tests/krb5: Use Python bindings for LZ77+Huffman compression
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
bloba3fcc701ab000af4163f0b926c6d837a8fea170d
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from enum import Enum
31 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
32 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
33 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
34 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
36 from pyasn1.codec.ber.encoder import BitStringEncoder
38 from pyasn1.error import PyAsn1Error
40 from samba.compression import huffman_decompress
41 from samba.credentials import Credentials
42 from samba.dcerpc import claims, krb5pac, netlogon, security
43 from samba.gensec import FEATURE_SEAL
44 from samba.ndr import ndr_pack, ndr_unpack
45 from samba.dcerpc.misc import (
46 SEC_CHAN_WKSTA,
47 SEC_CHAN_BDC,
50 import samba.tests
51 from samba.tests import TestCaseInTempDir
53 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
54 from samba.tests.krb5.rfc4120_constants import (
55 AD_IF_RELEVANT,
56 AD_WIN2K_PAC,
57 FX_FAST_ARMOR_AP_REQUEST,
58 KDC_ERR_CLIENT_REVOKED,
59 KDC_ERR_GENERIC,
60 KDC_ERR_POLICY,
61 KDC_ERR_PREAUTH_FAILED,
62 KDC_ERR_SKEW,
63 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
64 KERB_ERR_TYPE_EXTENDED,
65 KRB_AP_REP,
66 KRB_AP_REQ,
67 KRB_AS_REP,
68 KRB_AS_REQ,
69 KRB_ERROR,
70 KRB_PRIV,
71 KRB_TGS_REP,
72 KRB_TGS_REQ,
73 KU_AP_REQ_AUTH,
74 KU_AS_REP_ENC_PART,
75 KU_AP_REQ_ENC_PART,
76 KU_AS_REQ,
77 KU_ENC_CHALLENGE_KDC,
78 KU_FAST_ENC,
79 KU_FAST_FINISHED,
80 KU_FAST_REP,
81 KU_FAST_REQ_CHKSUM,
82 KU_KRB_PRIV,
83 KU_NON_KERB_CKSUM_SALT,
84 KU_TGS_REP_ENC_PART_SESSION,
85 KU_TGS_REP_ENC_PART_SUB_KEY,
86 KU_TGS_REQ_AUTH,
87 KU_TGS_REQ_AUTH_CKSUM,
88 KU_TGS_REQ_AUTH_DAT_SESSION,
89 KU_TGS_REQ_AUTH_DAT_SUBKEY,
90 KU_TICKET,
91 NT_PRINCIPAL,
92 NT_SRV_INST,
93 NT_WELLKNOWN,
94 PADATA_ENCRYPTED_CHALLENGE,
95 PADATA_ENC_TIMESTAMP,
96 PADATA_ETYPE_INFO,
97 PADATA_ETYPE_INFO2,
98 PADATA_FOR_USER,
99 PADATA_FX_COOKIE,
100 PADATA_FX_ERROR,
101 PADATA_FX_FAST,
102 PADATA_GSS,
103 PADATA_KDC_REQ,
104 PADATA_PAC_OPTIONS,
105 PADATA_PAC_REQUEST,
106 PADATA_PKINIT_KX,
107 PADATA_PK_AS_REQ,
108 PADATA_PK_AS_REP_19,
109 PADATA_SUPPORTED_ETYPES,
110 PADATA_REQ_ENC_PA_REP
112 import samba.tests.krb5.kcrypto as kcrypto
115 def BitStringEncoder_encodeValue32(
116 self, value, asn1Spec, encodeFun, **options):
118 # BitStrings like KDCOptions or TicketFlags should at least
119 # be 32-Bit on the wire
121 if asn1Spec is not None:
122 # TODO: try to avoid ASN.1 schema instantiation
123 value = asn1Spec.clone(value)
125 valueLength = len(value)
126 if valueLength % 8:
127 alignedValue = value << (8 - valueLength % 8)
128 else:
129 alignedValue = value
131 substrate = alignedValue.asOctets()
132 length = len(substrate)
133 # We need at least 32-Bit / 4-Bytes
134 if length < 4:
135 padding = 4 - length
136 else:
137 padding = 0
138 ret = b'\x00' + substrate + (b'\x00' * padding)
139 return ret, False, True
142 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
145 def BitString_NamedValues_prettyPrint(self, scope=0):
146 ret = "%s" % self.asBinary()
147 bits = []
148 highest_bit = 32
149 for byte in self.asNumbers():
150 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
151 mask = 1 << bit
152 if byte & mask:
153 val = 1
154 else:
155 val = 0
156 bits.append(val)
157 if len(bits) < highest_bit:
158 for bitPosition in range(len(bits), highest_bit):
159 bits.append(0)
160 indent = " " * scope
161 delim = ": (\n%s " % indent
162 for bitPosition in range(highest_bit):
163 if bitPosition in self.prettyPrintNamedValues:
164 name = self.prettyPrintNamedValues[bitPosition]
165 elif bits[bitPosition] != 0:
166 name = "unknown-bit-%u" % bitPosition
167 else:
168 continue
169 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
170 delim = ",\n%s " % indent
171 ret += "\n%s)" % indent
172 return ret
175 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
176 krb5_asn1.TicketFlagsValues.namedValues
177 krb5_asn1.TicketFlags.namedValues =\
178 krb5_asn1.TicketFlagsValues.namedValues
179 krb5_asn1.TicketFlags.prettyPrint =\
180 BitString_NamedValues_prettyPrint
181 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
182 krb5_asn1.KDCOptionsValues.namedValues
183 krb5_asn1.KDCOptions.namedValues =\
184 krb5_asn1.KDCOptionsValues.namedValues
185 krb5_asn1.KDCOptions.prettyPrint =\
186 BitString_NamedValues_prettyPrint
187 krb5_asn1.APOptions.prettyPrintNamedValues =\
188 krb5_asn1.APOptionsValues.namedValues
189 krb5_asn1.APOptions.namedValues =\
190 krb5_asn1.APOptionsValues.namedValues
191 krb5_asn1.APOptions.prettyPrint =\
192 BitString_NamedValues_prettyPrint
193 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
194 krb5_asn1.PACOptionFlagsValues.namedValues
195 krb5_asn1.PACOptionFlags.namedValues =\
196 krb5_asn1.PACOptionFlagsValues.namedValues
197 krb5_asn1.PACOptionFlags.prettyPrint =\
198 BitString_NamedValues_prettyPrint
201 def Integer_NamedValues_prettyPrint(self, scope=0):
202 intval = int(self)
203 if intval in self.prettyPrintNamedValues:
204 name = self.prettyPrintNamedValues[intval]
205 else:
206 name = "<__unknown__>"
207 ret = "%d (0x%x) %s" % (intval, intval, name)
208 return ret
211 krb5_asn1.NameType.prettyPrintNamedValues =\
212 krb5_asn1.NameTypeValues.namedValues
213 krb5_asn1.NameType.prettyPrint =\
214 Integer_NamedValues_prettyPrint
215 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
216 krb5_asn1.AuthDataTypeValues.namedValues
217 krb5_asn1.AuthDataType.prettyPrint =\
218 Integer_NamedValues_prettyPrint
219 krb5_asn1.PADataType.prettyPrintNamedValues =\
220 krb5_asn1.PADataTypeValues.namedValues
221 krb5_asn1.PADataType.prettyPrint =\
222 Integer_NamedValues_prettyPrint
223 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
224 krb5_asn1.EncryptionTypeValues.namedValues
225 krb5_asn1.EncryptionType.prettyPrint =\
226 Integer_NamedValues_prettyPrint
227 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
228 krb5_asn1.ChecksumTypeValues.namedValues
229 krb5_asn1.ChecksumType.prettyPrint =\
230 Integer_NamedValues_prettyPrint
231 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
232 krb5_asn1.KerbErrorDataTypeValues.namedValues
233 krb5_asn1.KerbErrorDataType.prettyPrint =\
234 Integer_NamedValues_prettyPrint
237 class Krb5EncryptionKey:
238 def __init__(self, key, kvno):
239 EncTypeChecksum = {
240 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
241 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
242 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
244 self.key = key
245 self.etype = key.enctype
246 self.ctype = EncTypeChecksum[self.etype]
247 self.kvno = kvno
249 def __str__(self):
250 return "etype=%d ctype=%d kvno=%d key=%s" % (
251 self.etype, self.ctype, self.kvno, self.key)
253 def encrypt(self, usage, plaintext):
254 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
255 return ciphertext
257 def decrypt(self, usage, ciphertext):
258 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
259 return plaintext
261 def make_zeroed_checksum(self, ctype=None):
262 if ctype is None:
263 ctype = self.ctype
265 checksum_len = kcrypto.checksum_len(ctype)
266 return bytes(checksum_len)
268 def make_checksum(self, usage, plaintext, ctype=None):
269 if ctype is None:
270 ctype = self.ctype
271 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
272 return cksum
274 def verify_checksum(self, usage, plaintext, ctype, cksum):
275 if self.ctype != ctype:
276 raise AssertionError(f'key checksum type ({self.ctype}) != '
277 f'checksum type ({ctype})')
279 kcrypto.verify_checksum(ctype,
280 self.key,
281 usage,
282 plaintext,
283 cksum)
285 def export_obj(self):
286 EncryptionKey_obj = {
287 'keytype': self.etype,
288 'keyvalue': self.key.contents,
290 return EncryptionKey_obj
293 class RodcPacEncryptionKey(Krb5EncryptionKey):
294 def __init__(self, key, kvno, rodc_id=None):
295 super().__init__(key, kvno)
297 if rodc_id is None:
298 kvno = self.kvno
299 if kvno is not None:
300 kvno >>= 16
301 kvno &= (1 << 16) - 1
303 rodc_id = kvno or None
305 if rodc_id is not None:
306 self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
307 else:
308 self.rodc_id = b''
310 def make_rodc_zeroed_checksum(self, ctype=None):
311 checksum = super().make_zeroed_checksum(ctype)
312 return checksum + bytes(len(self.rodc_id))
314 def make_rodc_checksum(self, usage, plaintext, ctype=None):
315 checksum = super().make_checksum(usage, plaintext, ctype)
316 return checksum + self.rodc_id
318 def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
319 if self.rodc_id:
320 cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
322 if self.rodc_id != cksum_rodc_id:
323 raise AssertionError(f'{self.rodc_id.hex()} != '
324 f'{cksum_rodc_id.hex()}')
326 super().verify_checksum(usage,
327 plaintext,
328 ctype,
329 cksum)
332 class ZeroedChecksumKey(RodcPacEncryptionKey):
333 def make_checksum(self, usage, plaintext, ctype=None):
334 return self.make_zeroed_checksum(ctype)
336 def make_rodc_checksum(self, usage, plaintext, ctype=None):
337 return self.make_rodc_zeroed_checksum(ctype)
340 class WrongLengthChecksumKey(RodcPacEncryptionKey):
341 def __init__(self, key, kvno, length):
342 super().__init__(key, kvno)
344 self._length = length
346 @classmethod
347 def _adjust_to_length(cls, checksum, length):
348 diff = length - len(checksum)
349 if diff > 0:
350 checksum += bytes(diff)
351 elif diff < 0:
352 checksum = checksum[:length]
354 return checksum
356 def make_zeroed_checksum(self, ctype=None):
357 return bytes(self._length)
359 def make_checksum(self, usage, plaintext, ctype=None):
360 checksum = super().make_checksum(usage, plaintext, ctype)
361 return self._adjust_to_length(checksum, self._length)
363 def make_rodc_zeroed_checksum(self, ctype=None):
364 return bytes(self._length)
366 def make_rodc_checksum(self, usage, plaintext, ctype=None):
367 checksum = super().make_rodc_checksum(usage, plaintext, ctype)
368 return self._adjust_to_length(checksum, self._length)
371 class KerberosCredentials(Credentials):
373 fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
374 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
375 security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
377 non_etype_bits = fast_supported_bits | (
378 security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED) | (
379 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK)
381 def __init__(self):
382 super(KerberosCredentials, self).__init__()
383 all_enc_types = 0
384 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
385 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
386 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
388 self.as_supported_enctypes = all_enc_types
389 self.tgs_supported_enctypes = all_enc_types
390 self.ap_supported_enctypes = all_enc_types
392 self.kvno = None
393 self.forced_keys = {}
395 self.forced_salt = None
397 self.dn = None
398 self.upn = None
399 self.spn = None
401 def set_as_supported_enctypes(self, value):
402 self.as_supported_enctypes = int(value)
404 def set_tgs_supported_enctypes(self, value):
405 self.tgs_supported_enctypes = int(value)
407 def set_ap_supported_enctypes(self, value):
408 self.ap_supported_enctypes = int(value)
410 etype_map = collections.OrderedDict([
411 (kcrypto.Enctype.AES256,
412 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
413 (kcrypto.Enctype.AES128,
414 security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
415 (kcrypto.Enctype.RC4,
416 security.KERB_ENCTYPE_RC4_HMAC_MD5),
417 (kcrypto.Enctype.DES_MD5,
418 security.KERB_ENCTYPE_DES_CBC_MD5),
419 (kcrypto.Enctype.DES_CRC,
420 security.KERB_ENCTYPE_DES_CBC_CRC)
423 @classmethod
424 def etypes_to_bits(cls, etypes):
425 bits = 0
426 for etype in etypes:
427 bit = cls.etype_map[etype]
428 if bits & bit:
429 raise ValueError(f'Got duplicate etype: {etype}')
430 bits |= bit
432 return bits
434 @classmethod
435 def bits_to_etypes(cls, bits):
436 etypes = ()
437 for etype, bit in cls.etype_map.items():
438 if bit & bits:
439 bits &= ~bit
440 etypes += (etype,)
442 bits &= ~cls.non_etype_bits
443 if bits != 0:
444 raise ValueError(f'Unsupported etype bits: {bits}')
446 return etypes
448 def get_as_krb5_etypes(self):
449 return self.bits_to_etypes(self.as_supported_enctypes)
451 def get_tgs_krb5_etypes(self):
452 return self.bits_to_etypes(self.tgs_supported_enctypes)
454 def get_ap_krb5_etypes(self):
455 return self.bits_to_etypes(self.ap_supported_enctypes)
457 def set_kvno(self, kvno):
458 # Sign-extend from 32 bits.
459 if kvno & 1 << 31:
460 kvno |= -1 << 31
461 self.kvno = kvno
463 def get_kvno(self):
464 return self.kvno
466 def set_forced_key(self, etype, hexkey):
467 etype = int(etype)
468 contents = binascii.a2b_hex(hexkey)
469 key = kcrypto.Key(etype, contents)
470 self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
472 def get_forced_key(self, etype):
473 etype = int(etype)
474 return self.forced_keys.get(etype)
476 def set_forced_salt(self, salt):
477 self.forced_salt = bytes(salt)
479 def get_forced_salt(self):
480 return self.forced_salt
482 def get_salt(self):
483 if self.forced_salt is not None:
484 return self.forced_salt
486 upn = self.get_upn()
487 if upn is not None:
488 salt_name = upn.rsplit('@', 1)[0].replace('/', '')
489 else:
490 salt_name = self.get_username()
492 secure_schannel_type = self.get_secure_channel_type()
493 if secure_schannel_type in [SEC_CHAN_WKSTA,SEC_CHAN_BDC]:
494 salt_name = self.get_username().lower()
495 if salt_name[-1] == '$':
496 salt_name = salt_name[:-1]
497 salt_string = '%shost%s.%s' % (
498 self.get_realm().upper(),
499 salt_name,
500 self.get_realm().lower())
501 else:
502 salt_string = self.get_realm().upper() + salt_name
504 return salt_string.encode('utf-8')
506 def set_dn(self, dn):
507 self.dn = dn
509 def get_dn(self):
510 return self.dn
512 def set_spn(self, spn):
513 self.spn = spn
515 def get_spn(self):
516 return self.spn
518 def set_upn(self, upn):
519 self.upn = upn
521 def get_upn(self):
522 return self.upn
524 def update_password(self, password):
525 self.set_password(password)
526 self.set_kvno(self.get_kvno() + 1)
529 class KerberosTicketCreds:
530 def __init__(self, ticket, session_key,
531 crealm=None, cname=None,
532 srealm=None, sname=None,
533 decryption_key=None,
534 ticket_private=None,
535 encpart_private=None):
536 self.ticket = ticket
537 self.session_key = session_key
538 self.crealm = crealm
539 self.cname = cname
540 self.srealm = srealm
541 self.sname = sname
542 self.decryption_key = decryption_key
543 self.ticket_private = ticket_private
544 self.encpart_private = encpart_private
546 def set_sname(self, sname):
547 self.ticket['sname'] = sname
548 self.sname = sname
551 class RawKerberosTest(TestCaseInTempDir):
552 """A raw Kerberos Test case."""
554 class KpasswdMode(Enum):
555 SET = object()
556 CHANGE = object()
558 # The location of a SID within the PAC
559 class SidType(Enum):
560 BASE_SID = object() # in info3.base.groups
561 EXTRA_SID = object() # in info3.sids
562 RESOURCE_SID = object() # in resource_groups
564 pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
565 krb5pac.PAC_TYPE_KDC_CHECKSUM,
566 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
567 krb5pac.PAC_TYPE_FULL_CHECKSUM}
569 etypes_to_test = (
570 {"value": -1111, "name": "dummy", },
571 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
572 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
573 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
576 expect_padata_outer = object()
578 setup_etype_test_permutations_done = False
580 @classmethod
581 def setup_etype_test_permutations(cls):
582 if cls.setup_etype_test_permutations_done:
583 return
585 res = []
587 num_idxs = len(cls.etypes_to_test)
588 permutations = []
589 for num in range(1, num_idxs + 1):
590 chunk = list(itertools.permutations(range(num_idxs), num))
591 for e in chunk:
592 el = list(e)
593 permutations.append(el)
595 for p in permutations:
596 name = None
597 etypes = ()
598 for idx in p:
599 n = cls.etypes_to_test[idx]["name"]
600 if name is None:
601 name = n
602 else:
603 name += "_%s" % n
604 etypes += (cls.etypes_to_test[idx]["value"],)
606 r = {"name": name, "etypes": etypes, }
607 res.append(r)
609 cls.etype_test_permutations = res
610 cls.setup_etype_test_permutations_done = True
612 @classmethod
613 def etype_test_permutation_name_idx(cls):
614 cls.setup_etype_test_permutations()
615 res = []
616 idx = 0
617 for e in cls.etype_test_permutations:
618 r = (e['name'], idx)
619 idx += 1
620 res.append(r)
621 return res
623 def etype_test_permutation_by_idx(self, idx):
624 e = self.etype_test_permutations[idx]
625 return (e['name'], e['etypes'])
627 @classmethod
628 def setUpClass(cls):
629 super().setUpClass()
631 cls.host = samba.tests.env_get_var_value('SERVER')
632 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
634 # A dictionary containing credentials that have already been
635 # obtained.
636 cls.creds_dict = {}
638 kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT',
639 allow_missing=True)
640 if kdc_fast_support is None:
641 kdc_fast_support = '0'
642 cls.kdc_fast_support = bool(int(kdc_fast_support))
644 kdc_claims_support = samba.tests.env_get_var_value('CLAIMS_SUPPORT',
645 allow_missing=True)
646 if kdc_claims_support is None:
647 kdc_claims_support = '0'
648 cls.kdc_claims_support = bool(int(kdc_claims_support))
650 kdc_compound_id_support = samba.tests.env_get_var_value(
651 'COMPOUND_ID_SUPPORT',
652 allow_missing=True)
653 if kdc_compound_id_support is None:
654 kdc_compound_id_support = '0'
655 cls.kdc_compound_id_support = bool(int(kdc_compound_id_support))
657 tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT',
658 allow_missing=True)
659 if tkt_sig_support is None:
660 tkt_sig_support = '0'
661 cls.tkt_sig_support = bool(int(tkt_sig_support))
663 full_sig_support = samba.tests.env_get_var_value('FULL_SIG_SUPPORT',
664 allow_missing=True)
665 if full_sig_support is None:
666 full_sig_support = '0'
667 cls.full_sig_support = bool(int(full_sig_support))
669 gnutls_pbkdf2_support = samba.tests.env_get_var_value(
670 'GNUTLS_PBKDF2_SUPPORT',
671 allow_missing=True)
672 if gnutls_pbkdf2_support is None:
673 gnutls_pbkdf2_support = '1'
674 cls.gnutls_pbkdf2_support = bool(int(gnutls_pbkdf2_support))
676 expect_pac = samba.tests.env_get_var_value('EXPECT_PAC',
677 allow_missing=True)
678 if expect_pac is None:
679 expect_pac = '1'
680 cls.expect_pac = bool(int(expect_pac))
682 expect_extra_pac_buffers = samba.tests.env_get_var_value(
683 'EXPECT_EXTRA_PAC_BUFFERS',
684 allow_missing=True)
685 if expect_extra_pac_buffers is None:
686 expect_extra_pac_buffers = '1'
687 cls.expect_extra_pac_buffers = bool(int(expect_extra_pac_buffers))
689 cname_checking = samba.tests.env_get_var_value('CHECK_CNAME',
690 allow_missing=True)
691 if cname_checking is None:
692 cname_checking = '1'
693 cls.cname_checking = bool(int(cname_checking))
695 padata_checking = samba.tests.env_get_var_value('CHECK_PADATA',
696 allow_missing=True)
697 if padata_checking is None:
698 padata_checking = '1'
699 cls.padata_checking = bool(int(padata_checking))
701 kadmin_is_tgs = samba.tests.env_get_var_value('KADMIN_IS_TGS',
702 allow_missing=True)
703 if kadmin_is_tgs is None:
704 kadmin_is_tgs = '0'
705 cls.kadmin_is_tgs = bool(int(kadmin_is_tgs))
707 default_etypes = samba.tests.env_get_var_value('DEFAULT_ETYPES',
708 allow_missing=True)
709 if default_etypes is not None:
710 default_etypes = int(default_etypes)
711 cls.default_etypes = default_etypes
713 forced_rc4 = samba.tests.env_get_var_value('FORCED_RC4',
714 allow_missing=True)
715 if forced_rc4 is None:
716 forced_rc4 = '0'
717 cls.forced_rc4 = bool(int(forced_rc4))
719 def setUp(self):
720 super().setUp()
721 self.do_asn1_print = False
722 self.do_hexdump = False
724 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
725 allow_missing=True)
726 if strict_checking is None:
727 strict_checking = '1'
728 self.strict_checking = bool(int(strict_checking))
730 self.s = None
732 self.unspecified_kvno = object()
734 def tearDown(self):
735 self._disconnect("tearDown")
736 super().tearDown()
738 def _disconnect(self, reason):
739 if self.s is None:
740 return
741 self.s.close()
742 self.s = None
743 if self.do_hexdump:
744 sys.stderr.write("disconnect[%s]\n" % reason)
746 def _connect_tcp(self, host, port=None):
747 if port is None:
748 port = 88
749 try:
750 self.a = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
751 socket.SOCK_STREAM, socket.SOL_TCP,
753 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
754 self.s.settimeout(10)
755 self.s.connect(self.a[0][4])
756 except socket.error:
757 self.s.close()
758 raise
759 except IOError:
760 self.s.close()
761 raise
763 def connect(self, host, port=None):
764 self.assertNotConnected()
765 self._connect_tcp(host, port)
766 if self.do_hexdump:
767 sys.stderr.write("connected[%s]\n" % host)
769 def env_get_var(self, varname, prefix,
770 fallback_default=True,
771 allow_missing=False):
772 val = None
773 if prefix is not None:
774 allow_missing_prefix = allow_missing or fallback_default
775 val = samba.tests.env_get_var_value(
776 '%s_%s' % (prefix, varname),
777 allow_missing=allow_missing_prefix)
778 else:
779 fallback_default = True
780 if val is None and fallback_default:
781 val = samba.tests.env_get_var_value(varname,
782 allow_missing=allow_missing)
783 return val
785 def _get_krb5_creds_from_env(self, prefix,
786 default_username=None,
787 allow_missing_password=False,
788 allow_missing_keys=True,
789 require_strongest_key=False):
790 c = KerberosCredentials()
791 c.guess()
793 domain = self.env_get_var('DOMAIN', prefix)
794 realm = self.env_get_var('REALM', prefix)
795 allow_missing_username = default_username is not None
796 username = self.env_get_var('USERNAME', prefix,
797 fallback_default=False,
798 allow_missing=allow_missing_username)
799 if username is None:
800 username = default_username
801 password = self.env_get_var('PASSWORD', prefix,
802 fallback_default=False,
803 allow_missing=allow_missing_password)
804 c.set_domain(domain)
805 c.set_realm(realm)
806 c.set_username(username)
807 if password is not None:
808 c.set_password(password)
809 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
810 prefix, allow_missing=True)
811 if as_supported_enctypes is not None:
812 c.set_as_supported_enctypes(as_supported_enctypes)
813 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
814 prefix, allow_missing=True)
815 if tgs_supported_enctypes is not None:
816 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
817 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
818 prefix, allow_missing=True)
819 if ap_supported_enctypes is not None:
820 c.set_ap_supported_enctypes(ap_supported_enctypes)
822 if require_strongest_key:
823 kvno_allow_missing = False
824 if password is None:
825 aes256_allow_missing = False
826 else:
827 aes256_allow_missing = True
828 else:
829 kvno_allow_missing = allow_missing_keys
830 aes256_allow_missing = allow_missing_keys
831 kvno = self.env_get_var('KVNO', prefix,
832 fallback_default=False,
833 allow_missing=kvno_allow_missing)
834 if kvno is not None:
835 c.set_kvno(int(kvno))
836 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
837 fallback_default=False,
838 allow_missing=aes256_allow_missing)
839 if aes256_key is not None:
840 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
841 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
842 fallback_default=False,
843 allow_missing=True)
844 if aes128_key is not None:
845 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
846 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
847 fallback_default=False, allow_missing=True)
848 if rc4_key is not None:
849 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
851 if not allow_missing_keys:
852 self.assertTrue(c.forced_keys,
853 'Please supply %s encryption keys '
854 'in environment' % prefix)
856 return c
858 def _get_krb5_creds(self,
859 prefix,
860 default_username=None,
861 allow_missing_password=False,
862 allow_missing_keys=True,
863 require_strongest_key=False,
864 fallback_creds_fn=None):
865 if prefix in self.creds_dict:
866 return self.creds_dict[prefix]
868 # We don't have the credentials already
869 creds = None
870 env_err = None
871 try:
872 # Try to obtain them from the environment
873 creds = self._get_krb5_creds_from_env(
874 prefix,
875 default_username=default_username,
876 allow_missing_password=allow_missing_password,
877 allow_missing_keys=allow_missing_keys,
878 require_strongest_key=require_strongest_key)
879 except Exception as err:
880 # An error occurred, so save it for later
881 env_err = err
882 else:
883 self.assertIsNotNone(creds)
884 # Save the obtained credentials
885 self.creds_dict[prefix] = creds
886 return creds
888 if fallback_creds_fn is not None:
889 try:
890 # Try to use the fallback method
891 creds = fallback_creds_fn()
892 except Exception as err:
893 print("ERROR FROM ENV: %r" % (env_err))
894 print("FALLBACK-FN: %s" % (fallback_creds_fn))
895 print("FALLBACK-ERROR: %r" % (err))
896 else:
897 self.assertIsNotNone(creds)
898 # Save the obtained credentials
899 self.creds_dict[prefix] = creds
900 return creds
902 # Both methods failed, so raise the exception from the
903 # environment method
904 raise env_err
906 def get_user_creds(self,
907 allow_missing_password=False,
908 allow_missing_keys=True):
909 c = self._get_krb5_creds(prefix=None,
910 allow_missing_password=allow_missing_password,
911 allow_missing_keys=allow_missing_keys)
912 return c
914 def get_service_creds(self,
915 allow_missing_password=False,
916 allow_missing_keys=True):
917 c = self._get_krb5_creds(prefix='SERVICE',
918 allow_missing_password=allow_missing_password,
919 allow_missing_keys=allow_missing_keys)
920 return c
922 def get_client_creds(self,
923 allow_missing_password=False,
924 allow_missing_keys=True):
925 c = self._get_krb5_creds(prefix='CLIENT',
926 allow_missing_password=allow_missing_password,
927 allow_missing_keys=allow_missing_keys)
928 return c
930 def get_server_creds(self,
931 allow_missing_password=False,
932 allow_missing_keys=True):
933 c = self._get_krb5_creds(prefix='SERVER',
934 allow_missing_password=allow_missing_password,
935 allow_missing_keys=allow_missing_keys)
936 return c
938 def get_admin_creds(self,
939 allow_missing_password=False,
940 allow_missing_keys=True):
941 c = self._get_krb5_creds(prefix='ADMIN',
942 allow_missing_password=allow_missing_password,
943 allow_missing_keys=allow_missing_keys)
944 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
945 c.set_workstation('')
946 return c
948 def get_rodc_krbtgt_creds(self,
949 require_keys=True,
950 require_strongest_key=False):
951 if require_strongest_key:
952 self.assertTrue(require_keys)
953 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
954 allow_missing_password=True,
955 allow_missing_keys=not require_keys,
956 require_strongest_key=require_strongest_key)
957 return c
959 def get_krbtgt_creds(self,
960 require_keys=True,
961 require_strongest_key=False):
962 if require_strongest_key:
963 self.assertTrue(require_keys)
964 c = self._get_krb5_creds(prefix='KRBTGT',
965 default_username='krbtgt',
966 allow_missing_password=True,
967 allow_missing_keys=not require_keys,
968 require_strongest_key=require_strongest_key)
969 return c
971 def get_anon_creds(self):
972 c = Credentials()
973 c.set_anonymous()
974 return c
976 def asn1_dump(self, name, obj, asn1_print=None):
977 if asn1_print is None:
978 asn1_print = self.do_asn1_print
979 if asn1_print:
980 if name is not None:
981 sys.stderr.write("%s:\n%s" % (name, obj))
982 else:
983 sys.stderr.write("%s" % (obj))
985 def hex_dump(self, name, blob, hexdump=None):
986 if hexdump is None:
987 hexdump = self.do_hexdump
988 if hexdump:
989 sys.stderr.write(
990 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
992 def der_decode(
993 self,
994 blob,
995 asn1Spec=None,
996 native_encode=True,
997 asn1_print=None,
998 hexdump=None):
999 if asn1Spec is not None:
1000 class_name = type(asn1Spec).__name__.split(':')[0]
1001 else:
1002 class_name = "<None-asn1Spec>"
1003 self.hex_dump(class_name, blob, hexdump=hexdump)
1004 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
1005 self.asn1_dump(None, obj, asn1_print=asn1_print)
1006 if native_encode:
1007 obj = pyasn1_native_encode(obj)
1008 return obj
1010 def der_encode(
1011 self,
1012 obj,
1013 asn1Spec=None,
1014 native_decode=True,
1015 asn1_print=None,
1016 hexdump=None):
1017 if native_decode:
1018 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
1019 class_name = type(obj).__name__.split(':')[0]
1020 if class_name is not None:
1021 self.asn1_dump(None, obj, asn1_print=asn1_print)
1022 blob = pyasn1_der_encode(obj)
1023 if class_name is not None:
1024 self.hex_dump(class_name, blob, hexdump=hexdump)
1025 return blob
1027 def send_pdu(self, req, asn1_print=None, hexdump=None):
1028 k5_pdu = self.der_encode(
1029 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
1030 self.send_msg(k5_pdu, hexdump=hexdump)
1032 def send_msg(self, msg, hexdump=None):
1033 header = struct.pack('>I', len(msg))
1034 req_pdu = header
1035 req_pdu += msg
1036 self.hex_dump("send_msg", header, hexdump=hexdump)
1037 self.hex_dump("send_msg", msg, hexdump=hexdump)
1039 try:
1040 while True:
1041 sent = self.s.send(req_pdu, 0)
1042 if sent == len(req_pdu):
1043 return
1044 req_pdu = req_pdu[sent:]
1045 except socket.error as e:
1046 self._disconnect("send_msg: %s" % e)
1047 raise
1048 except IOError as e:
1049 self._disconnect("send_msg: %s" % e)
1050 raise
1052 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
1053 rep_pdu = None
1054 try:
1055 if timeout is not None:
1056 self.s.settimeout(timeout)
1057 rep_pdu = self.s.recv(num_recv, 0)
1058 self.s.settimeout(10)
1059 if len(rep_pdu) == 0:
1060 self._disconnect("recv_raw: EOF")
1061 return None
1062 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
1063 except socket.timeout:
1064 self.s.settimeout(10)
1065 sys.stderr.write("recv_raw: TIMEOUT\n")
1066 except socket.error as e:
1067 self._disconnect("recv_raw: %s" % e)
1068 raise
1069 except IOError as e:
1070 self._disconnect("recv_raw: %s" % e)
1071 raise
1072 return rep_pdu
1074 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
1075 raw_pdu = self.recv_raw(
1076 num_recv=4, hexdump=hexdump, timeout=timeout)
1077 if raw_pdu is None:
1078 return None
1079 header = struct.unpack(">I", raw_pdu[0:4])
1080 k5_len = header[0]
1081 if k5_len == 0:
1082 return ""
1083 missing = k5_len
1084 rep_pdu = b''
1085 while missing > 0:
1086 raw_pdu = self.recv_raw(
1087 num_recv=missing, hexdump=hexdump, timeout=timeout)
1088 self.assertGreaterEqual(len(raw_pdu), 1)
1089 rep_pdu += raw_pdu
1090 missing = k5_len - len(rep_pdu)
1091 return rep_pdu
1093 def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
1094 rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print,
1095 hexdump=hexdump,
1096 timeout=timeout)
1097 if not rep_pdu:
1098 return None, rep_pdu
1099 k5_raw = self.der_decode(
1100 rep_pdu,
1101 asn1Spec=None,
1102 native_encode=False,
1103 asn1_print=False,
1104 hexdump=False)
1105 pvno = k5_raw['field-0']
1106 self.assertEqual(pvno, 5)
1107 msg_type = k5_raw['field-1']
1108 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
1109 if msg_type == KRB_AS_REP:
1110 asn1Spec = krb5_asn1.AS_REP()
1111 elif msg_type == KRB_TGS_REP:
1112 asn1Spec = krb5_asn1.TGS_REP()
1113 elif msg_type == KRB_ERROR:
1114 asn1Spec = krb5_asn1.KRB_ERROR()
1115 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
1116 asn1_print=asn1_print, hexdump=False)
1117 return (rep, rep_pdu)
1119 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
1120 (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print,
1121 hexdump=hexdump,
1122 timeout=timeout)
1123 return rep
1125 def assertIsConnected(self):
1126 self.assertIsNotNone(self.s, msg="Not connected")
1128 def assertNotConnected(self):
1129 self.assertIsNone(self.s, msg="Is connected")
1131 def send_recv_transaction(
1132 self,
1133 req,
1134 asn1_print=None,
1135 hexdump=None,
1136 timeout=None,
1137 to_rodc=False):
1138 host = self.host if to_rodc else self.dc_host
1139 self.connect(host)
1140 try:
1141 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
1142 rep = self.recv_pdu(
1143 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
1144 except Exception:
1145 self._disconnect("transaction failed")
1146 raise
1147 self._disconnect("transaction done")
1148 return rep
1150 def assertNoValue(self, value):
1151 self.assertTrue(value.isNoValue)
1153 def assertHasValue(self, value):
1154 self.assertIsNotNone(value)
1156 def getElementValue(self, obj, elem):
1157 return obj.get(elem)
1159 def assertElementMissing(self, obj, elem):
1160 v = self.getElementValue(obj, elem)
1161 self.assertIsNone(v)
1163 def assertElementPresent(self, obj, elem, expect_empty=False):
1164 v = self.getElementValue(obj, elem)
1165 self.assertIsNotNone(v)
1166 if self.strict_checking:
1167 if isinstance(v, collections.abc.Container):
1168 if expect_empty:
1169 self.assertEqual(0, len(v))
1170 else:
1171 self.assertNotEqual(0, len(v))
1173 def assertElementEqual(self, obj, elem, value):
1174 v = self.getElementValue(obj, elem)
1175 self.assertIsNotNone(v)
1176 self.assertEqual(v, value)
1178 def assertElementEqualUTF8(self, obj, elem, value):
1179 v = self.getElementValue(obj, elem)
1180 self.assertIsNotNone(v)
1181 self.assertEqual(v, bytes(value, 'utf8'))
1183 def assertPrincipalEqual(self, princ1, princ2):
1184 self.assertEqual(princ1['name-type'], princ2['name-type'])
1185 self.assertEqual(
1186 len(princ1['name-string']),
1187 len(princ2['name-string']),
1188 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1189 for idx in range(len(princ1['name-string'])):
1190 self.assertEqual(
1191 princ1['name-string'][idx],
1192 princ2['name-string'][idx],
1193 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1195 def assertElementEqualPrincipal(self, obj, elem, value):
1196 v = self.getElementValue(obj, elem)
1197 self.assertIsNotNone(v)
1198 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
1199 self.assertPrincipalEqual(v, value)
1201 def assertElementKVNO(self, obj, elem, value):
1202 v = self.getElementValue(obj, elem)
1203 if value == "autodetect":
1204 value = v
1205 if value is not None:
1206 self.assertIsNotNone(v)
1207 # The value on the wire should never be 0
1208 self.assertNotEqual(v, 0)
1209 # unspecified_kvno means we don't know the kvno,
1210 # but want to enforce its presence
1211 if value is not self.unspecified_kvno:
1212 value = int(value)
1213 self.assertNotEqual(value, 0)
1214 self.assertEqual(v, value)
1215 else:
1216 self.assertIsNone(v)
1218 def assertElementFlags(self, obj, elem, expected, unexpected):
1219 v = self.getElementValue(obj, elem)
1220 self.assertIsNotNone(v)
1221 if expected is not None:
1222 self.assertIsInstance(expected, krb5_asn1.TicketFlags)
1223 for i, flag in enumerate(expected):
1224 if flag == 1:
1225 self.assertEqual('1', v[i],
1226 f"'{expected.namedValues[i]}' "
1227 f"expected in {v}")
1228 if unexpected is not None:
1229 self.assertIsInstance(unexpected, krb5_asn1.TicketFlags)
1230 for i, flag in enumerate(unexpected):
1231 if flag == 1:
1232 self.assertEqual('0', v[i],
1233 f"'{unexpected.namedValues[i]}' "
1234 f"unexpected in {v}")
1236 def assertSequenceElementsEqual(self, expected, got, *,
1237 require_strict=None,
1238 unchecked=None,
1239 require_ordered=True):
1240 if self.strict_checking and require_ordered and not unchecked:
1241 self.assertEqual(expected, got)
1242 else:
1243 fail_msg = f'expected: {expected} got: {got}'
1245 ignored = set()
1246 if unchecked:
1247 ignored.update(unchecked)
1248 if require_strict and not self.strict_checking:
1249 ignored.update(require_strict)
1251 if ignored:
1252 fail_msg += f' (ignoring: {ignored})'
1253 expected = (x for x in expected if x not in ignored)
1254 got = (x for x in got if x not in ignored)
1256 self.assertCountEqual(expected, got, fail_msg)
1258 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1259 if epoch is None:
1260 epoch = time.time()
1261 if offset is not None:
1262 epoch = epoch + int(offset)
1263 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1264 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1266 def get_KerberosTime(self, epoch=None, offset=None):
1267 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1268 return s
1270 def get_EpochFromKerberosTime(self, kerberos_time):
1271 if isinstance(kerberos_time, bytes):
1272 kerberos_time = kerberos_time.decode()
1274 epoch = datetime.datetime.strptime(kerberos_time,
1275 '%Y%m%d%H%M%SZ')
1276 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1277 epoch = int(epoch.timestamp())
1279 return epoch
1281 def get_Nonce(self):
1282 nonce_min = 0x7f000000
1283 nonce_max = 0x7fffffff
1284 v = random.randint(nonce_min, nonce_max)
1285 return v
1287 def get_pa_dict(self, pa_data):
1288 pa_dict = {}
1290 if pa_data is not None:
1291 for pa in pa_data:
1292 pa_type = pa['padata-type']
1293 if pa_type in pa_dict:
1294 raise RuntimeError(f'Duplicate type {pa_type}')
1295 pa_dict[pa_type] = pa['padata-value']
1297 return pa_dict
1299 def SessionKey_create(self, etype, contents, kvno=None):
1300 key = kcrypto.Key(etype, contents)
1301 return RodcPacEncryptionKey(key, kvno)
1303 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None,
1304 params=None):
1305 self.assertIsNotNone(pwd)
1306 self.assertIsNotNone(salt)
1307 key = kcrypto.string_to_key(etype, pwd, salt, params=params)
1308 return RodcPacEncryptionKey(key, kvno)
1310 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
1311 e = etype_info2['etype']
1312 salt = etype_info2.get('salt')
1313 params = etype_info2.get('s2kparams')
1314 return self.PasswordKey_from_etype(creds, e,
1315 kvno=kvno,
1316 salt=salt,
1317 params=params)
1319 def PasswordKey_from_creds(self, creds, etype):
1320 kvno = creds.get_kvno()
1321 salt = creds.get_salt()
1322 return self.PasswordKey_from_etype(creds, etype,
1323 kvno=kvno,
1324 salt=salt)
1326 def PasswordKey_from_etype(self, creds, etype, kvno=None, salt=None, params=None):
1327 if etype == kcrypto.Enctype.RC4:
1328 nthash = creds.get_nt_hash()
1329 return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno)
1331 password = creds.get_password().encode('utf-8')
1332 return self.PasswordKey_create(
1333 etype=etype, pwd=password, salt=salt, kvno=kvno)
1335 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1337 if etype is None:
1338 etypes = creds.get_tgs_krb5_etypes()
1339 if etypes and etypes[0] not in (kcrypto.Enctype.DES_CRC,
1340 kcrypto.Enctype.DES_MD5):
1341 etype = etypes[0]
1342 else:
1343 etype = kcrypto.Enctype.RC4
1345 forced_key = creds.get_forced_key(etype)
1346 if forced_key is not None:
1347 return forced_key
1349 kvno = creds.get_kvno()
1351 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1352 "nor a password specified, " % (
1353 creds.get_username(), etype, kvno))
1355 if etype == kcrypto.Enctype.RC4:
1356 nthash = creds.get_nt_hash()
1357 self.assertIsNotNone(nthash, msg=fail_msg)
1358 return self.SessionKey_create(etype=etype,
1359 contents=nthash,
1360 kvno=kvno)
1362 password = creds.get_password()
1363 self.assertIsNotNone(password, msg=fail_msg)
1364 salt = creds.get_salt()
1365 return self.PasswordKey_create(etype=etype,
1366 pwd=password,
1367 salt=salt,
1368 kvno=kvno)
1370 def RandomKey(self, etype):
1371 e = kcrypto._get_enctype_profile(etype)
1372 contents = samba.generate_random_bytes(e.keysize)
1373 return self.SessionKey_create(etype=etype, contents=contents)
1375 def EncryptionKey_import(self, EncryptionKey_obj):
1376 return self.SessionKey_create(EncryptionKey_obj['keytype'],
1377 EncryptionKey_obj['keyvalue'])
1379 def EncryptedData_create(self, key, usage, plaintext):
1380 # EncryptedData ::= SEQUENCE {
1381 # etype [0] Int32 -- EncryptionType --,
1382 # kvno [1] Int32 OPTIONAL,
1383 # cipher [2] OCTET STRING -- ciphertext
1385 ciphertext = key.encrypt(usage, plaintext)
1386 EncryptedData_obj = {
1387 'etype': key.etype,
1388 'cipher': ciphertext
1390 if key.kvno is not None:
1391 EncryptedData_obj['kvno'] = key.kvno
1392 return EncryptedData_obj
1394 def Checksum_create(self, key, usage, plaintext, ctype=None):
1395 # Checksum ::= SEQUENCE {
1396 # cksumtype [0] Int32,
1397 # checksum [1] OCTET STRING
1399 if ctype is None:
1400 ctype = key.ctype
1401 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1402 Checksum_obj = {
1403 'cksumtype': ctype,
1404 'checksum': checksum,
1406 return Checksum_obj
1408 @classmethod
1409 def PrincipalName_create(cls, name_type, names):
1410 # PrincipalName ::= SEQUENCE {
1411 # name-type [0] Int32,
1412 # name-string [1] SEQUENCE OF KerberosString
1414 PrincipalName_obj = {
1415 'name-type': name_type,
1416 'name-string': names,
1418 return PrincipalName_obj
1420 def AuthorizationData_create(self, ad_type, ad_data):
1421 # AuthorizationData ::= SEQUENCE {
1422 # ad-type [0] Int32,
1423 # ad-data [1] OCTET STRING
1425 AUTH_DATA_obj = {
1426 'ad-type': ad_type,
1427 'ad-data': ad_data
1429 return AUTH_DATA_obj
1431 def PA_DATA_create(self, padata_type, padata_value):
1432 # PA-DATA ::= SEQUENCE {
1433 # -- NOTE: first tag is [1], not [0]
1434 # padata-type [1] Int32,
1435 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1437 PA_DATA_obj = {
1438 'padata-type': padata_type,
1439 'padata-value': padata_value,
1441 return PA_DATA_obj
1443 def PA_ENC_TS_ENC_create(self, ts, usec):
1444 # PA-ENC-TS-ENC ::= SEQUENCE {
1445 # patimestamp[0] KerberosTime, -- client's time
1446 # pausec[1] krb5int32 OPTIONAL
1448 PA_ENC_TS_ENC_obj = {
1449 'patimestamp': ts,
1450 'pausec': usec,
1452 return PA_ENC_TS_ENC_obj
1454 def PA_PAC_OPTIONS_create(self, options):
1455 # PA-PAC-OPTIONS ::= SEQUENCE {
1456 # options [0] PACOptionFlags
1458 PA_PAC_OPTIONS_obj = {
1459 'options': options
1461 return PA_PAC_OPTIONS_obj
1463 def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1464 # KrbFastArmor ::= SEQUENCE {
1465 # armor-type [0] Int32,
1466 # armor-value [1] OCTET STRING,
1467 # ...
1469 KRB_FAST_ARMOR_obj = {
1470 'armor-type': armor_type,
1471 'armor-value': armor_value
1473 return KRB_FAST_ARMOR_obj
1475 def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1476 # KrbFastReq ::= SEQUENCE {
1477 # fast-options [0] FastOptions,
1478 # padata [1] SEQUENCE OF PA-DATA,
1479 # req-body [2] KDC-REQ-BODY,
1480 # ...
1482 KRB_FAST_REQ_obj = {
1483 'fast-options': fast_options,
1484 'padata': padata,
1485 'req-body': req_body
1487 return KRB_FAST_REQ_obj
1489 def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1490 # KrbFastArmoredReq ::= SEQUENCE {
1491 # armor [0] KrbFastArmor OPTIONAL,
1492 # req-checksum [1] Checksum,
1493 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1495 KRB_FAST_ARMORED_REQ_obj = {
1496 'req-checksum': req_checksum,
1497 'enc-fast-req': enc_fast_req
1499 if armor is not None:
1500 KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1501 return KRB_FAST_ARMORED_REQ_obj
1503 def PA_FX_FAST_REQUEST_create(self, armored_data):
1504 # PA-FX-FAST-REQUEST ::= CHOICE {
1505 # armored-data [0] KrbFastArmoredReq,
1506 # ...
1508 PA_FX_FAST_REQUEST_obj = {
1509 'armored-data': armored_data
1511 return PA_FX_FAST_REQUEST_obj
1513 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1514 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1515 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1516 # -- include PAC.
1517 # --If FALSE, and PAC present,
1518 # -- remove PAC.
1520 KERB_PA_PAC_REQUEST_obj = {
1521 'include-pac': include_pac,
1523 if not pa_data_create:
1524 return KERB_PA_PAC_REQUEST_obj
1525 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1526 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1527 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1528 return pa_data
1530 def get_pa_pac_options(self, options):
1531 pac_options = self.PA_PAC_OPTIONS_create(options)
1532 pac_options = self.der_encode(pac_options,
1533 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1534 pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1536 return pac_options
1538 def KDC_REQ_BODY_create(self,
1539 kdc_options,
1540 cname,
1541 realm,
1542 sname,
1543 from_time,
1544 till_time,
1545 renew_time,
1546 nonce,
1547 etypes,
1548 addresses,
1549 additional_tickets,
1550 EncAuthorizationData,
1551 EncAuthorizationData_key,
1552 EncAuthorizationData_usage,
1553 asn1_print=None,
1554 hexdump=None):
1555 # KDC-REQ-BODY ::= SEQUENCE {
1556 # kdc-options [0] KDCOptions,
1557 # cname [1] PrincipalName OPTIONAL
1558 # -- Used only in AS-REQ --,
1559 # realm [2] Realm
1560 # -- Server's realm
1561 # -- Also client's in AS-REQ --,
1562 # sname [3] PrincipalName OPTIONAL,
1563 # from [4] KerberosTime OPTIONAL,
1564 # till [5] KerberosTime,
1565 # rtime [6] KerberosTime OPTIONAL,
1566 # nonce [7] UInt32,
1567 # etype [8] SEQUENCE OF Int32
1568 # -- EncryptionType
1569 # -- in preference order --,
1570 # addresses [9] HostAddresses OPTIONAL,
1571 # enc-authorization-data [10] EncryptedData OPTIONAL
1572 # -- AuthorizationData --,
1573 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1574 # -- NOTE: not empty
1576 if EncAuthorizationData is not None:
1577 enc_ad_plain = self.der_encode(
1578 EncAuthorizationData,
1579 asn1Spec=krb5_asn1.AuthorizationData(),
1580 asn1_print=asn1_print,
1581 hexdump=hexdump)
1582 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1583 EncAuthorizationData_usage,
1584 enc_ad_plain)
1585 else:
1586 enc_ad = None
1587 KDC_REQ_BODY_obj = {
1588 'kdc-options': kdc_options,
1589 'realm': realm,
1590 'till': till_time,
1591 'nonce': nonce,
1592 'etype': etypes,
1594 if cname is not None:
1595 KDC_REQ_BODY_obj['cname'] = cname
1596 if sname is not None:
1597 KDC_REQ_BODY_obj['sname'] = sname
1598 if from_time is not None:
1599 KDC_REQ_BODY_obj['from'] = from_time
1600 if renew_time is not None:
1601 KDC_REQ_BODY_obj['rtime'] = renew_time
1602 if addresses is not None:
1603 KDC_REQ_BODY_obj['addresses'] = addresses
1604 if enc_ad is not None:
1605 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1606 if additional_tickets is not None:
1607 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1608 return KDC_REQ_BODY_obj
1610 def KDC_REQ_create(self,
1611 msg_type,
1612 padata,
1613 req_body,
1614 asn1Spec=None,
1615 asn1_print=None,
1616 hexdump=None):
1617 # KDC-REQ ::= SEQUENCE {
1618 # -- NOTE: first tag is [1], not [0]
1619 # pvno [1] INTEGER (5) ,
1620 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1621 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1622 # -- NOTE: not empty --,
1623 # req-body [4] KDC-REQ-BODY
1626 KDC_REQ_obj = {
1627 'pvno': 5,
1628 'msg-type': msg_type,
1629 'req-body': req_body,
1631 if padata is not None:
1632 KDC_REQ_obj['padata'] = padata
1633 if asn1Spec is not None:
1634 KDC_REQ_decoded = pyasn1_native_decode(
1635 KDC_REQ_obj, asn1Spec=asn1Spec)
1636 else:
1637 KDC_REQ_decoded = None
1638 return KDC_REQ_obj, KDC_REQ_decoded
1640 def AS_REQ_create(self,
1641 padata, # optional
1642 kdc_options, # required
1643 cname, # optional
1644 realm, # required
1645 sname, # optional
1646 from_time, # optional
1647 till_time, # required
1648 renew_time, # optional
1649 nonce, # required
1650 etypes, # required
1651 addresses, # optional
1652 additional_tickets,
1653 native_decoded_only=True,
1654 asn1_print=None,
1655 hexdump=None):
1656 # KDC-REQ ::= SEQUENCE {
1657 # -- NOTE: first tag is [1], not [0]
1658 # pvno [1] INTEGER (5) ,
1659 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1660 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1661 # -- NOTE: not empty --,
1662 # req-body [4] KDC-REQ-BODY
1665 # KDC-REQ-BODY ::= SEQUENCE {
1666 # kdc-options [0] KDCOptions,
1667 # cname [1] PrincipalName OPTIONAL
1668 # -- Used only in AS-REQ --,
1669 # realm [2] Realm
1670 # -- Server's realm
1671 # -- Also client's in AS-REQ --,
1672 # sname [3] PrincipalName OPTIONAL,
1673 # from [4] KerberosTime OPTIONAL,
1674 # till [5] KerberosTime,
1675 # rtime [6] KerberosTime OPTIONAL,
1676 # nonce [7] UInt32,
1677 # etype [8] SEQUENCE OF Int32
1678 # -- EncryptionType
1679 # -- in preference order --,
1680 # addresses [9] HostAddresses OPTIONAL,
1681 # enc-authorization-data [10] EncryptedData OPTIONAL
1682 # -- AuthorizationData --,
1683 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1684 # -- NOTE: not empty
1686 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1687 kdc_options,
1688 cname,
1689 realm,
1690 sname,
1691 from_time,
1692 till_time,
1693 renew_time,
1694 nonce,
1695 etypes,
1696 addresses,
1697 additional_tickets,
1698 EncAuthorizationData=None,
1699 EncAuthorizationData_key=None,
1700 EncAuthorizationData_usage=None,
1701 asn1_print=asn1_print,
1702 hexdump=hexdump)
1703 obj, decoded = self.KDC_REQ_create(
1704 msg_type=KRB_AS_REQ,
1705 padata=padata,
1706 req_body=KDC_REQ_BODY_obj,
1707 asn1Spec=krb5_asn1.AS_REQ(),
1708 asn1_print=asn1_print,
1709 hexdump=hexdump)
1710 if native_decoded_only:
1711 return decoded
1712 return decoded, obj
1714 def AP_REQ_create(self, ap_options, ticket, authenticator):
1715 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1716 # pvno [0] INTEGER (5),
1717 # msg-type [1] INTEGER (14),
1718 # ap-options [2] APOptions,
1719 # ticket [3] Ticket,
1720 # authenticator [4] EncryptedData -- Authenticator
1722 AP_REQ_obj = {
1723 'pvno': 5,
1724 'msg-type': KRB_AP_REQ,
1725 'ap-options': ap_options,
1726 'ticket': ticket,
1727 'authenticator': authenticator,
1729 return AP_REQ_obj
1731 def Authenticator_create(
1732 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1733 authorization_data):
1734 # -- Unencrypted authenticator
1735 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1736 # authenticator-vno [0] INTEGER (5),
1737 # crealm [1] Realm,
1738 # cname [2] PrincipalName,
1739 # cksum [3] Checksum OPTIONAL,
1740 # cusec [4] Microseconds,
1741 # ctime [5] KerberosTime,
1742 # subkey [6] EncryptionKey OPTIONAL,
1743 # seq-number [7] UInt32 OPTIONAL,
1744 # authorization-data [8] AuthorizationData OPTIONAL
1746 Authenticator_obj = {
1747 'authenticator-vno': 5,
1748 'crealm': crealm,
1749 'cname': cname,
1750 'cusec': cusec,
1751 'ctime': ctime,
1753 if cksum is not None:
1754 Authenticator_obj['cksum'] = cksum
1755 if subkey is not None:
1756 Authenticator_obj['subkey'] = subkey
1757 if seq_number is not None:
1758 Authenticator_obj['seq-number'] = seq_number
1759 if authorization_data is not None:
1760 Authenticator_obj['authorization-data'] = authorization_data
1761 return Authenticator_obj
1763 def TGS_REQ_create(self,
1764 padata, # optional
1765 cusec,
1766 ctime,
1767 ticket,
1768 kdc_options, # required
1769 cname, # optional
1770 realm, # required
1771 sname, # optional
1772 from_time, # optional
1773 till_time, # required
1774 renew_time, # optional
1775 nonce, # required
1776 etypes, # required
1777 addresses, # optional
1778 EncAuthorizationData,
1779 EncAuthorizationData_key,
1780 additional_tickets,
1781 ticket_session_key,
1782 authenticator_subkey=None,
1783 body_checksum_type=None,
1784 native_decoded_only=True,
1785 asn1_print=None,
1786 hexdump=None):
1787 # KDC-REQ ::= SEQUENCE {
1788 # -- NOTE: first tag is [1], not [0]
1789 # pvno [1] INTEGER (5) ,
1790 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1791 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1792 # -- NOTE: not empty --,
1793 # req-body [4] KDC-REQ-BODY
1796 # KDC-REQ-BODY ::= SEQUENCE {
1797 # kdc-options [0] KDCOptions,
1798 # cname [1] PrincipalName OPTIONAL
1799 # -- Used only in AS-REQ --,
1800 # realm [2] Realm
1801 # -- Server's realm
1802 # -- Also client's in AS-REQ --,
1803 # sname [3] PrincipalName OPTIONAL,
1804 # from [4] KerberosTime OPTIONAL,
1805 # till [5] KerberosTime,
1806 # rtime [6] KerberosTime OPTIONAL,
1807 # nonce [7] UInt32,
1808 # etype [8] SEQUENCE OF Int32
1809 # -- EncryptionType
1810 # -- in preference order --,
1811 # addresses [9] HostAddresses OPTIONAL,
1812 # enc-authorization-data [10] EncryptedData OPTIONAL
1813 # -- AuthorizationData --,
1814 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1815 # -- NOTE: not empty
1818 if authenticator_subkey is not None:
1819 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1820 else:
1821 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1823 req_body = self.KDC_REQ_BODY_create(
1824 kdc_options=kdc_options,
1825 cname=None,
1826 realm=realm,
1827 sname=sname,
1828 from_time=from_time,
1829 till_time=till_time,
1830 renew_time=renew_time,
1831 nonce=nonce,
1832 etypes=etypes,
1833 addresses=addresses,
1834 additional_tickets=additional_tickets,
1835 EncAuthorizationData=EncAuthorizationData,
1836 EncAuthorizationData_key=EncAuthorizationData_key,
1837 EncAuthorizationData_usage=EncAuthorizationData_usage)
1838 req_body_blob = self.der_encode(req_body,
1839 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1840 asn1_print=asn1_print, hexdump=hexdump)
1842 req_body_checksum = self.Checksum_create(ticket_session_key,
1843 KU_TGS_REQ_AUTH_CKSUM,
1844 req_body_blob,
1845 ctype=body_checksum_type)
1847 subkey_obj = None
1848 if authenticator_subkey is not None:
1849 subkey_obj = authenticator_subkey.export_obj()
1850 seq_number = random.randint(0, 0xfffffffe)
1851 authenticator = self.Authenticator_create(
1852 crealm=realm,
1853 cname=cname,
1854 cksum=req_body_checksum,
1855 cusec=cusec,
1856 ctime=ctime,
1857 subkey=subkey_obj,
1858 seq_number=seq_number,
1859 authorization_data=None)
1860 authenticator = self.der_encode(
1861 authenticator,
1862 asn1Spec=krb5_asn1.Authenticator(),
1863 asn1_print=asn1_print,
1864 hexdump=hexdump)
1866 authenticator = self.EncryptedData_create(
1867 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1869 ap_options = krb5_asn1.APOptions('0')
1870 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1871 ticket=ticket,
1872 authenticator=authenticator)
1873 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1874 asn1_print=asn1_print, hexdump=hexdump)
1875 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1876 if padata is not None:
1877 padata.append(pa_tgs_req)
1878 else:
1879 padata = [pa_tgs_req]
1881 obj, decoded = self.KDC_REQ_create(
1882 msg_type=KRB_TGS_REQ,
1883 padata=padata,
1884 req_body=req_body,
1885 asn1Spec=krb5_asn1.TGS_REQ(),
1886 asn1_print=asn1_print,
1887 hexdump=hexdump)
1888 if native_decoded_only:
1889 return decoded
1890 return decoded, obj
1892 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1893 # PA-S4U2Self ::= SEQUENCE {
1894 # name [0] PrincipalName,
1895 # realm [1] Realm,
1896 # cksum [2] Checksum,
1897 # auth [3] GeneralString
1899 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1900 for n in name['name-string']:
1901 cksum_data += n.encode()
1902 cksum_data += realm.encode()
1903 cksum_data += "Kerberos".encode()
1904 cksum = self.Checksum_create(tgt_session_key,
1905 KU_NON_KERB_CKSUM_SALT,
1906 cksum_data,
1907 ctype)
1909 PA_S4U2Self_obj = {
1910 'name': name,
1911 'realm': realm,
1912 'cksum': cksum,
1913 'auth': "Kerberos",
1915 pa_s4u2self = self.der_encode(
1916 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1917 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1919 def ChangePasswdDataMS_create(self,
1920 new_password,
1921 target_princ=None,
1922 target_realm=None):
1923 ChangePasswdDataMS_obj = {
1924 'newpasswd': new_password,
1926 if target_princ is not None:
1927 ChangePasswdDataMS_obj['targname'] = target_princ
1928 if target_realm is not None:
1929 ChangePasswdDataMS_obj['targrealm'] = target_realm
1931 change_password_data = self.der_encode(
1932 ChangePasswdDataMS_obj, asn1Spec=krb5_asn1.ChangePasswdDataMS())
1934 return change_password_data
1936 def KRB_PRIV_create(self,
1937 subkey,
1938 user_data,
1939 s_address,
1940 timestamp=None,
1941 usec=None,
1942 seq_number=None,
1943 r_address=None):
1944 EncKrbPrivPart_obj = {
1945 'user-data': user_data,
1946 's-address': s_address,
1948 if timestamp is not None:
1949 EncKrbPrivPart_obj['timestamp'] = timestamp
1950 if usec is not None:
1951 EncKrbPrivPart_obj['usec'] = usec
1952 if seq_number is not None:
1953 EncKrbPrivPart_obj['seq-number'] = seq_number
1954 if r_address is not None:
1955 EncKrbPrivPart_obj['r-address'] = r_address
1957 enc_krb_priv_part = self.der_encode(
1958 EncKrbPrivPart_obj, asn1Spec=krb5_asn1.EncKrbPrivPart())
1960 enc_data = self.EncryptedData_create(subkey,
1961 KU_KRB_PRIV,
1962 enc_krb_priv_part)
1964 KRB_PRIV_obj = {
1965 'pvno': 5,
1966 'msg-type': KRB_PRIV,
1967 'enc-part': enc_data,
1970 krb_priv = self.der_encode(
1971 KRB_PRIV_obj, asn1Spec=krb5_asn1.KRB_PRIV())
1973 return krb_priv
1975 def kpasswd_create(self,
1976 subkey,
1977 user_data,
1978 version,
1979 seq_number,
1980 ap_req,
1981 local_address,
1982 remote_address):
1983 self.assertIsNotNone(self.s, 'call self.connect() first')
1985 timestamp, usec = self.get_KerberosTimeWithUsec()
1987 krb_priv = self.KRB_PRIV_create(subkey,
1988 user_data,
1989 s_address=local_address,
1990 timestamp=timestamp,
1991 usec=usec,
1992 seq_number=seq_number,
1993 r_address=remote_address)
1995 size = 6 + len(ap_req) + len(krb_priv)
1996 self.assertLess(size, 0x10000)
1998 msg = bytearray()
1999 msg.append(size >> 8)
2000 msg.append(size & 0xff)
2001 msg.append(version >> 8)
2002 msg.append(version & 0xff)
2003 msg.append(len(ap_req) >> 8)
2004 msg.append(len(ap_req) & 0xff)
2005 # Note: for sets, there could be a little-endian four-byte length here.
2007 msg.extend(ap_req)
2008 msg.extend(krb_priv)
2010 return msg
2012 def get_enc_part(self, obj, key, usage):
2013 self.assertElementEqual(obj, 'pvno', 5)
2015 enc_part = obj['enc-part']
2016 self.assertElementEqual(enc_part, 'etype', key.etype)
2017 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
2019 enc_part = key.decrypt(usage, enc_part['cipher'])
2021 return enc_part
2023 def kpasswd_exchange(self,
2024 ticket,
2025 new_password,
2026 expected_code,
2027 expected_msg,
2028 mode,
2029 target_princ=None,
2030 target_realm=None,
2031 ap_options=None,
2032 send_seq_number=True):
2033 if mode is self.KpasswdMode.SET:
2034 version = 0xff80
2035 user_data = self.ChangePasswdDataMS_create(new_password,
2036 target_princ,
2037 target_realm)
2038 elif mode is self.KpasswdMode.CHANGE:
2039 self.assertIsNone(target_princ,
2040 'target_princ only valid for pw set')
2041 self.assertIsNone(target_realm,
2042 'target_realm only valid for pw set')
2044 version = 1
2045 user_data = new_password.encode('utf-8')
2046 else:
2047 self.fail(f'invalid mode {mode}')
2049 subkey = self.RandomKey(kcrypto.Enctype.AES256)
2051 if ap_options is None:
2052 ap_options = '0'
2053 ap_options = str(krb5_asn1.APOptions(ap_options))
2055 kdc_exchange_dict = {
2056 'tgt': ticket,
2057 'authenticator_subkey': subkey,
2058 'auth_data': None,
2059 'ap_options': ap_options,
2062 if send_seq_number:
2063 seq_number = random.randint(0, 0xfffffffe)
2064 else:
2065 seq_number = None
2067 ap_req = self.generate_ap_req(kdc_exchange_dict,
2068 None,
2069 req_body=None,
2070 armor=False,
2071 usage=KU_AP_REQ_AUTH,
2072 seq_number=seq_number)
2074 self.connect(self.host, port=464)
2075 self.assertIsNotNone(self.s)
2077 family = self.s.family
2079 if family == socket.AF_INET:
2080 addr_type = 2 # IPv4
2081 elif family == socket.AF_INET6:
2082 addr_type = 24 # IPv6
2083 else:
2084 self.fail(f'unknown family {family}')
2086 def create_address(ip):
2087 return {
2088 'addr-type': addr_type,
2089 'address': socket.inet_pton(family, ip),
2092 local_ip = self.s.getsockname()[0]
2093 local_address = create_address(local_ip)
2095 # remote_ip = self.s.getpeername()[0]
2096 # remote_address = create_address(remote_ip)
2098 # TODO: due to a bug (?), MIT Kerberos will not accept the request
2099 # unless r-address is set to our _local_ address. Heimdal, on the other
2100 # hand, requires the r-address is set to the remote address (as
2101 # expected). To avoid problems, avoid sending r-address for now.
2102 remote_address = None
2104 msg = self.kpasswd_create(subkey,
2105 user_data,
2106 version,
2107 seq_number,
2108 ap_req,
2109 local_address,
2110 remote_address)
2112 self.send_msg(msg)
2113 rep_pdu = self.recv_pdu_raw()
2115 self._disconnect('transaction done')
2117 self.assertIsNotNone(rep_pdu)
2119 header = rep_pdu[:6]
2120 reply = rep_pdu[6:]
2122 reply_len = (header[0] << 8) | header[1]
2123 reply_version = (header[2] << 8) | header[3]
2124 ap_rep_len = (header[4] << 8) | header[5]
2126 self.assertEqual(reply_len, len(rep_pdu))
2127 self.assertEqual(1, reply_version) # KRB5_KPASSWD_VERS_CHANGEPW
2128 self.assertLess(ap_rep_len, reply_len)
2130 self.assertNotEqual(0x7e, rep_pdu[1])
2131 self.assertNotEqual(0x5e, rep_pdu[1])
2133 if ap_rep_len:
2134 # We received an AP-REQ and KRB-PRIV as a response. This may or may
2135 # not indicate an error, depending on the status code.
2136 ap_rep = reply[:ap_rep_len]
2137 krb_priv = reply[ap_rep_len:]
2139 key = ticket.session_key
2141 ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP())
2142 self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP)
2143 enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART)
2144 enc_part = self.der_decode(
2145 enc_part, asn1Spec=krb5_asn1.EncAPRepPart())
2147 self.assertElementPresent(enc_part, 'ctime')
2148 self.assertElementPresent(enc_part, 'cusec')
2149 # self.assertElementMissing(enc_part, 'subkey') # TODO
2150 # self.assertElementPresent(enc_part, 'seq-number') # TODO
2152 try:
2153 krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV())
2154 except PyAsn1Error:
2155 self.fail()
2157 self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV)
2158 priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV)
2159 priv_enc_part = self.der_decode(
2160 priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart())
2162 self.assertElementMissing(priv_enc_part, 'timestamp')
2163 self.assertElementMissing(priv_enc_part, 'usec')
2164 # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO
2165 # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO
2166 # self.assertElementMissing(priv_enc_part, 'r-address') # TODO
2168 result_data = priv_enc_part['user-data']
2169 else:
2170 # We received a KRB-ERROR as a response, indicating an error.
2171 krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR())
2173 sname = self.PrincipalName_create(
2174 name_type=NT_PRINCIPAL,
2175 names=['kadmin', 'changepw'])
2176 realm = self.get_krbtgt_creds().get_realm().upper()
2178 self.assertElementEqual(krb_error, 'pvno', 5)
2179 self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR)
2180 self.assertElementMissing(krb_error, 'ctime')
2181 self.assertElementMissing(krb_error, 'usec')
2182 self.assertElementPresent(krb_error, 'stime')
2183 self.assertElementPresent(krb_error, 'susec')
2185 error_code = krb_error['error-code']
2186 if isinstance(expected_code, int):
2187 self.assertEqual(error_code, expected_code)
2188 else:
2189 self.assertIn(error_code, expected_code)
2191 self.assertElementMissing(krb_error, 'crealm')
2192 self.assertElementMissing(krb_error, 'cname')
2193 self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8'))
2194 self.assertElementEqualPrincipal(krb_error, 'sname', sname)
2195 self.assertElementMissing(krb_error, 'e-text')
2197 result_data = krb_error['e-data']
2199 status = result_data[:2]
2200 message = result_data[2:]
2202 status_code = (status[0] << 8) | status[1]
2203 if isinstance(expected_code, int):
2204 self.assertEqual(status_code, expected_code)
2205 else:
2206 self.assertIn(status_code, expected_code)
2208 if not message:
2209 self.assertEqual(0, status_code,
2210 'got an error result, but no message')
2211 return
2213 # Check the first character of the message.
2214 if message[0]:
2215 if isinstance(expected_msg, bytes):
2216 self.assertEqual(message, expected_msg)
2217 else:
2218 self.assertIn(message, expected_msg)
2219 else:
2220 # We got AD password policy information.
2221 self.assertEqual(30, len(message))
2223 (empty_bytes,
2224 min_length,
2225 history_length,
2226 properties,
2227 expire_time,
2228 min_age) = struct.unpack('>HIIIQQ', message)
2230 def _generic_kdc_exchange(self,
2231 kdc_exchange_dict, # required
2232 cname=None, # optional
2233 realm=None, # required
2234 sname=None, # optional
2235 from_time=None, # optional
2236 till_time=None, # required
2237 renew_time=None, # optional
2238 etypes=None, # required
2239 addresses=None, # optional
2240 additional_tickets=None, # optional
2241 EncAuthorizationData=None, # optional
2242 EncAuthorizationData_key=None, # optional
2243 EncAuthorizationData_usage=None): # optional
2245 check_error_fn = kdc_exchange_dict['check_error_fn']
2246 check_rep_fn = kdc_exchange_dict['check_rep_fn']
2247 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
2248 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
2249 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
2250 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
2251 callback_dict = kdc_exchange_dict['callback_dict']
2252 req_msg_type = kdc_exchange_dict['req_msg_type']
2253 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
2254 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2256 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2257 kdc_options = kdc_exchange_dict['kdc_options']
2259 pac_request = kdc_exchange_dict['pac_request']
2260 pac_options = kdc_exchange_dict['pac_options']
2262 # Parameters specific to the inner request body
2263 inner_req = kdc_exchange_dict['inner_req']
2265 # Parameters specific to the outer request body
2266 outer_req = kdc_exchange_dict['outer_req']
2268 if till_time is None:
2269 till_time = self.get_KerberosTime(offset=36000)
2271 if 'nonce' in kdc_exchange_dict:
2272 nonce = kdc_exchange_dict['nonce']
2273 else:
2274 nonce = self.get_Nonce()
2275 kdc_exchange_dict['nonce'] = nonce
2277 req_body = self.KDC_REQ_BODY_create(
2278 kdc_options=kdc_options,
2279 cname=cname,
2280 realm=realm,
2281 sname=sname,
2282 from_time=from_time,
2283 till_time=till_time,
2284 renew_time=renew_time,
2285 nonce=nonce,
2286 etypes=etypes,
2287 addresses=addresses,
2288 additional_tickets=additional_tickets,
2289 EncAuthorizationData=EncAuthorizationData,
2290 EncAuthorizationData_key=EncAuthorizationData_key,
2291 EncAuthorizationData_usage=EncAuthorizationData_usage)
2293 inner_req_body = dict(req_body)
2294 if inner_req is not None:
2295 for key, value in inner_req.items():
2296 if value is not None:
2297 inner_req_body[key] = value
2298 else:
2299 del inner_req_body[key]
2300 if outer_req is not None:
2301 for key, value in outer_req.items():
2302 if value is not None:
2303 req_body[key] = value
2304 else:
2305 del req_body[key]
2307 additional_padata = []
2308 if pac_request is not None:
2309 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
2310 additional_padata.append(pa_pac_request)
2311 if pac_options is not None:
2312 pa_pac_options = self.get_pa_pac_options(pac_options)
2313 additional_padata.append(pa_pac_options)
2315 if req_msg_type == KRB_AS_REQ:
2316 tgs_req = None
2317 tgs_req_padata = None
2318 else:
2319 self.assertEqual(KRB_TGS_REQ, req_msg_type)
2321 tgs_req = self.generate_ap_req(kdc_exchange_dict,
2322 callback_dict,
2323 req_body,
2324 armor=False)
2325 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
2327 if generate_fast_padata_fn is not None:
2328 self.assertIsNotNone(generate_fast_fn)
2329 # This can alter req_body...
2330 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
2331 callback_dict,
2332 req_body)
2333 else:
2334 fast_padata = []
2336 if generate_fast_armor_fn is not None:
2337 self.assertIsNotNone(generate_fast_fn)
2338 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
2339 callback_dict,
2340 None,
2341 armor=True)
2343 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2344 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
2345 fast_ap_req)
2346 else:
2347 fast_armor = None
2349 if generate_padata_fn is not None:
2350 # This can alter req_body...
2351 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
2352 callback_dict,
2353 req_body)
2354 self.assertIsNotNone(outer_padata)
2355 self.assertNotIn(PADATA_KDC_REQ,
2356 [pa['padata-type'] for pa in outer_padata],
2357 'Don\'t create TGS-REQ manually')
2358 else:
2359 outer_padata = None
2361 if generate_fast_fn is not None:
2362 armor_key = kdc_exchange_dict['armor_key']
2363 self.assertIsNotNone(armor_key)
2365 if req_msg_type == KRB_AS_REQ:
2366 checksum_blob = self.der_encode(
2367 req_body,
2368 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2369 else:
2370 self.assertEqual(KRB_TGS_REQ, req_msg_type)
2371 checksum_blob = tgs_req
2373 checksum = self.Checksum_create(armor_key,
2374 KU_FAST_REQ_CHKSUM,
2375 checksum_blob)
2377 fast_padata += additional_padata
2378 fast = generate_fast_fn(kdc_exchange_dict,
2379 callback_dict,
2380 inner_req_body,
2381 fast_padata,
2382 fast_armor,
2383 checksum)
2384 else:
2385 fast = None
2387 padata = []
2389 if tgs_req_padata is not None:
2390 padata.append(tgs_req_padata)
2392 if fast is not None:
2393 padata.append(fast)
2395 if outer_padata is not None:
2396 padata += outer_padata
2398 if fast is None:
2399 padata += additional_padata
2401 if not padata:
2402 padata = None
2404 kdc_exchange_dict['req_padata'] = padata
2405 kdc_exchange_dict['fast_padata'] = fast_padata
2406 kdc_exchange_dict['req_body'] = inner_req_body
2408 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
2409 padata=padata,
2410 req_body=req_body,
2411 asn1Spec=req_asn1Spec())
2413 kdc_exchange_dict['req_obj'] = req_obj
2415 to_rodc = kdc_exchange_dict['to_rodc']
2417 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
2418 self.assertIsNotNone(rep)
2420 msg_type = self.getElementValue(rep, 'msg-type')
2421 self.assertIsNotNone(msg_type)
2423 expected_msg_type = None
2424 if check_error_fn is not None:
2425 expected_msg_type = KRB_ERROR
2426 self.assertIsNone(check_rep_fn)
2427 self.assertNotEqual(0, len(expected_error_mode))
2428 self.assertNotIn(0, expected_error_mode)
2429 if check_rep_fn is not None:
2430 expected_msg_type = rep_msg_type
2431 self.assertIsNone(check_error_fn)
2432 self.assertEqual(0, len(expected_error_mode))
2433 self.assertIsNotNone(expected_msg_type)
2434 if msg_type == KRB_ERROR:
2435 error_code = self.getElementValue(rep, 'error-code')
2436 fail_msg = f'Got unexpected error: {error_code}'
2437 else:
2438 fail_msg = f'Expected to fail with error: {expected_error_mode}'
2439 self.assertEqual(msg_type, expected_msg_type, fail_msg)
2441 if msg_type == KRB_ERROR:
2442 return check_error_fn(kdc_exchange_dict,
2443 callback_dict,
2444 rep)
2446 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
2448 def as_exchange_dict(self,
2449 expected_crealm=None,
2450 expected_cname=None,
2451 expected_anon=False,
2452 expected_srealm=None,
2453 expected_sname=None,
2454 expected_account_name=None,
2455 expected_groups=None,
2456 unexpected_groups=None,
2457 expected_upn_name=None,
2458 expected_sid=None,
2459 expected_domain_sid=None,
2460 expected_supported_etypes=None,
2461 expected_flags=None,
2462 unexpected_flags=None,
2463 ticket_decryption_key=None,
2464 expect_ticket_checksum=None,
2465 expect_full_checksum=None,
2466 generate_fast_fn=None,
2467 generate_fast_armor_fn=None,
2468 generate_fast_padata_fn=None,
2469 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2470 generate_padata_fn=None,
2471 check_error_fn=None,
2472 check_rep_fn=None,
2473 check_kdc_private_fn=None,
2474 callback_dict=None,
2475 expected_error_mode=0,
2476 expected_status=None,
2477 client_as_etypes=None,
2478 expected_salt=None,
2479 authenticator_subkey=None,
2480 preauth_key=None,
2481 armor_key=None,
2482 armor_tgt=None,
2483 armor_subkey=None,
2484 auth_data=None,
2485 kdc_options='',
2486 inner_req=None,
2487 outer_req=None,
2488 pac_request=None,
2489 pac_options=None,
2490 ap_options=None,
2491 fast_ap_options=None,
2492 strict_edata_checking=True,
2493 expect_edata=None,
2494 expect_pac=True,
2495 expect_client_claims=None,
2496 expect_device_info=None,
2497 expect_device_claims=None,
2498 expect_upn_dns_info_ex=None,
2499 expect_pac_attrs=None,
2500 expect_pac_attrs_pac_request=None,
2501 expect_requester_sid=None,
2502 rc4_support=True,
2503 expected_client_claims=None,
2504 unexpected_client_claims=None,
2505 expected_device_claims=None,
2506 unexpected_device_claims=None,
2507 to_rodc=False):
2508 if expected_error_mode == 0:
2509 expected_error_mode = ()
2510 elif not isinstance(expected_error_mode, collections.abc.Container):
2511 expected_error_mode = (expected_error_mode,)
2513 kdc_exchange_dict = {
2514 'req_msg_type': KRB_AS_REQ,
2515 'req_asn1Spec': krb5_asn1.AS_REQ,
2516 'rep_msg_type': KRB_AS_REP,
2517 'rep_asn1Spec': krb5_asn1.AS_REP,
2518 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
2519 'expected_crealm': expected_crealm,
2520 'expected_cname': expected_cname,
2521 'expected_anon': expected_anon,
2522 'expected_srealm': expected_srealm,
2523 'expected_sname': expected_sname,
2524 'expected_account_name': expected_account_name,
2525 'expected_groups': expected_groups,
2526 'unexpected_groups': unexpected_groups,
2527 'expected_upn_name': expected_upn_name,
2528 'expected_sid': expected_sid,
2529 'expected_domain_sid': expected_domain_sid,
2530 'expected_supported_etypes': expected_supported_etypes,
2531 'expected_flags': expected_flags,
2532 'unexpected_flags': unexpected_flags,
2533 'ticket_decryption_key': ticket_decryption_key,
2534 'expect_ticket_checksum': expect_ticket_checksum,
2535 'expect_full_checksum': expect_full_checksum,
2536 'generate_fast_fn': generate_fast_fn,
2537 'generate_fast_armor_fn': generate_fast_armor_fn,
2538 'generate_fast_padata_fn': generate_fast_padata_fn,
2539 'fast_armor_type': fast_armor_type,
2540 'generate_padata_fn': generate_padata_fn,
2541 'check_error_fn': check_error_fn,
2542 'check_rep_fn': check_rep_fn,
2543 'check_kdc_private_fn': check_kdc_private_fn,
2544 'callback_dict': callback_dict,
2545 'expected_error_mode': expected_error_mode,
2546 'expected_status': expected_status,
2547 'client_as_etypes': client_as_etypes,
2548 'expected_salt': expected_salt,
2549 'authenticator_subkey': authenticator_subkey,
2550 'preauth_key': preauth_key,
2551 'armor_key': armor_key,
2552 'armor_tgt': armor_tgt,
2553 'armor_subkey': armor_subkey,
2554 'auth_data': auth_data,
2555 'kdc_options': kdc_options,
2556 'inner_req': inner_req,
2557 'outer_req': outer_req,
2558 'pac_request': pac_request,
2559 'pac_options': pac_options,
2560 'ap_options': ap_options,
2561 'fast_ap_options': fast_ap_options,
2562 'strict_edata_checking': strict_edata_checking,
2563 'expect_edata': expect_edata,
2564 'expect_pac': expect_pac,
2565 'expect_client_claims': expect_client_claims,
2566 'expect_device_info': expect_device_info,
2567 'expect_device_claims': expect_device_claims,
2568 'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2569 'expect_pac_attrs': expect_pac_attrs,
2570 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2571 'expect_requester_sid': expect_requester_sid,
2572 'rc4_support': rc4_support,
2573 'expected_client_claims': expected_client_claims,
2574 'unexpected_client_claims': unexpected_client_claims,
2575 'expected_device_claims': expected_device_claims,
2576 'unexpected_device_claims': unexpected_device_claims,
2577 'to_rodc': to_rodc
2579 if callback_dict is None:
2580 callback_dict = {}
2582 return kdc_exchange_dict
2584 def tgs_exchange_dict(self,
2585 expected_crealm=None,
2586 expected_cname=None,
2587 expected_anon=False,
2588 expected_srealm=None,
2589 expected_sname=None,
2590 expected_account_name=None,
2591 expected_groups=None,
2592 unexpected_groups=None,
2593 expected_upn_name=None,
2594 expected_sid=None,
2595 expected_domain_sid=None,
2596 expected_supported_etypes=None,
2597 expected_flags=None,
2598 unexpected_flags=None,
2599 ticket_decryption_key=None,
2600 expect_ticket_checksum=None,
2601 expect_full_checksum=None,
2602 generate_fast_fn=None,
2603 generate_fast_armor_fn=None,
2604 generate_fast_padata_fn=None,
2605 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2606 generate_padata_fn=None,
2607 check_error_fn=None,
2608 check_rep_fn=None,
2609 check_kdc_private_fn=None,
2610 expected_error_mode=0,
2611 expected_status=None,
2612 callback_dict=None,
2613 tgt=None,
2614 armor_key=None,
2615 armor_tgt=None,
2616 armor_subkey=None,
2617 authenticator_subkey=None,
2618 auth_data=None,
2619 body_checksum_type=None,
2620 kdc_options='',
2621 inner_req=None,
2622 outer_req=None,
2623 pac_request=None,
2624 pac_options=None,
2625 ap_options=None,
2626 fast_ap_options=None,
2627 strict_edata_checking=True,
2628 expect_edata=None,
2629 expect_pac=True,
2630 expect_client_claims=None,
2631 expect_device_info=None,
2632 expect_device_claims=None,
2633 expect_upn_dns_info_ex=None,
2634 expect_pac_attrs=None,
2635 expect_pac_attrs_pac_request=None,
2636 expect_requester_sid=None,
2637 expected_proxy_target=None,
2638 expected_transited_services=None,
2639 rc4_support=True,
2640 expected_client_claims=None,
2641 unexpected_client_claims=None,
2642 expected_device_claims=None,
2643 unexpected_device_claims=None,
2644 to_rodc=False):
2645 if expected_error_mode == 0:
2646 expected_error_mode = ()
2647 elif not isinstance(expected_error_mode, collections.abc.Container):
2648 expected_error_mode = (expected_error_mode,)
2650 kdc_exchange_dict = {
2651 'req_msg_type': KRB_TGS_REQ,
2652 'req_asn1Spec': krb5_asn1.TGS_REQ,
2653 'rep_msg_type': KRB_TGS_REP,
2654 'rep_asn1Spec': krb5_asn1.TGS_REP,
2655 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2656 'expected_crealm': expected_crealm,
2657 'expected_cname': expected_cname,
2658 'expected_anon': expected_anon,
2659 'expected_srealm': expected_srealm,
2660 'expected_sname': expected_sname,
2661 'expected_account_name': expected_account_name,
2662 'expected_groups': expected_groups,
2663 'unexpected_groups': unexpected_groups,
2664 'expected_upn_name': expected_upn_name,
2665 'expected_sid': expected_sid,
2666 'expected_domain_sid': expected_domain_sid,
2667 'expected_supported_etypes': expected_supported_etypes,
2668 'expected_flags': expected_flags,
2669 'unexpected_flags': unexpected_flags,
2670 'ticket_decryption_key': ticket_decryption_key,
2671 'expect_ticket_checksum': expect_ticket_checksum,
2672 'expect_full_checksum': expect_full_checksum,
2673 'generate_fast_fn': generate_fast_fn,
2674 'generate_fast_armor_fn': generate_fast_armor_fn,
2675 'generate_fast_padata_fn': generate_fast_padata_fn,
2676 'fast_armor_type': fast_armor_type,
2677 'generate_padata_fn': generate_padata_fn,
2678 'check_error_fn': check_error_fn,
2679 'check_rep_fn': check_rep_fn,
2680 'check_kdc_private_fn': check_kdc_private_fn,
2681 'callback_dict': callback_dict,
2682 'expected_error_mode': expected_error_mode,
2683 'expected_status': expected_status,
2684 'tgt': tgt,
2685 'body_checksum_type': body_checksum_type,
2686 'armor_key': armor_key,
2687 'armor_tgt': armor_tgt,
2688 'armor_subkey': armor_subkey,
2689 'auth_data': auth_data,
2690 'authenticator_subkey': authenticator_subkey,
2691 'kdc_options': kdc_options,
2692 'inner_req': inner_req,
2693 'outer_req': outer_req,
2694 'pac_request': pac_request,
2695 'pac_options': pac_options,
2696 'ap_options': ap_options,
2697 'fast_ap_options': fast_ap_options,
2698 'strict_edata_checking': strict_edata_checking,
2699 'expect_edata': expect_edata,
2700 'expect_pac': expect_pac,
2701 'expect_client_claims': expect_client_claims,
2702 'expect_device_info': expect_device_info,
2703 'expect_device_claims': expect_device_claims,
2704 'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2705 'expect_pac_attrs': expect_pac_attrs,
2706 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2707 'expect_requester_sid': expect_requester_sid,
2708 'expected_proxy_target': expected_proxy_target,
2709 'expected_transited_services': expected_transited_services,
2710 'rc4_support': rc4_support,
2711 'expected_client_claims': expected_client_claims,
2712 'unexpected_client_claims': unexpected_client_claims,
2713 'expected_device_claims': expected_device_claims,
2714 'unexpected_device_claims': unexpected_device_claims,
2715 'to_rodc': to_rodc
2717 if callback_dict is None:
2718 callback_dict = {}
2720 return kdc_exchange_dict
2722 def generic_check_kdc_rep(self,
2723 kdc_exchange_dict,
2724 callback_dict,
2725 rep):
2727 expected_crealm = kdc_exchange_dict['expected_crealm']
2728 expected_anon = kdc_exchange_dict['expected_anon']
2729 expected_srealm = kdc_exchange_dict['expected_srealm']
2730 expected_sname = kdc_exchange_dict['expected_sname']
2731 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2732 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2733 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2734 msg_type = kdc_exchange_dict['rep_msg_type']
2735 armor_key = kdc_exchange_dict['armor_key']
2737 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
2738 padata = self.getElementValue(rep, 'padata')
2739 if self.strict_checking:
2740 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
2741 if self.cname_checking:
2742 if expected_anon:
2743 expected_cname = self.PrincipalName_create(
2744 name_type=NT_WELLKNOWN,
2745 names=['WELLKNOWN', 'ANONYMOUS'])
2746 else:
2747 expected_cname = kdc_exchange_dict['expected_cname']
2748 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2749 self.assertElementPresent(rep, 'ticket')
2750 ticket = self.getElementValue(rep, 'ticket')
2751 ticket_encpart = None
2752 ticket_cipher = None
2753 self.assertIsNotNone(ticket)
2754 if ticket is not None: # Never None, but gives indentation
2755 self.assertElementEqual(ticket, 'tkt-vno', 5)
2756 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2757 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2758 self.assertElementPresent(ticket, 'enc-part')
2759 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2760 self.assertIsNotNone(ticket_encpart)
2761 if ticket_encpart is not None: # Never None, but gives indentation
2762 self.assertElementPresent(ticket_encpart, 'etype')
2764 kdc_options = kdc_exchange_dict['kdc_options']
2765 pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1
2766 expect_kvno = (pos >= len(kdc_options)
2767 or kdc_options[pos] != '1')
2768 if expect_kvno:
2769 # 'unspecified' means present, with any value != 0
2770 self.assertElementKVNO(ticket_encpart, 'kvno',
2771 self.unspecified_kvno)
2772 else:
2773 # For user-to-user, don't expect a kvno.
2774 self.assertElementMissing(ticket_encpart, 'kvno')
2776 self.assertElementPresent(ticket_encpart, 'cipher')
2777 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2778 self.assertElementPresent(rep, 'enc-part')
2779 encpart = self.getElementValue(rep, 'enc-part')
2780 encpart_cipher = None
2781 self.assertIsNotNone(encpart)
2782 if encpart is not None: # Never None, but gives indentation
2783 self.assertElementPresent(encpart, 'etype')
2784 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2785 self.assertElementPresent(encpart, 'cipher')
2786 encpart_cipher = self.getElementValue(encpart, 'cipher')
2788 if self.padata_checking:
2789 self.check_reply_padata(kdc_exchange_dict,
2790 callback_dict,
2791 encpart,
2792 padata)
2794 ticket_checksum = None
2796 # Get the decryption key for the encrypted part
2797 encpart_decryption_key, encpart_decryption_usage = (
2798 self.get_preauth_key(kdc_exchange_dict))
2800 if armor_key is not None:
2801 pa_dict = self.get_pa_dict(padata)
2803 if PADATA_FX_FAST in pa_dict:
2804 fx_fast_data = pa_dict[PADATA_FX_FAST]
2805 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2806 fx_fast_data,
2807 armor_key,
2808 finished=True)
2810 if 'strengthen-key' in fast_response:
2811 strengthen_key = self.EncryptionKey_import(
2812 fast_response['strengthen-key'])
2813 encpart_decryption_key = (
2814 self.generate_strengthen_reply_key(
2815 strengthen_key,
2816 encpart_decryption_key))
2818 fast_finished = fast_response.get('finished')
2819 if fast_finished is not None:
2820 ticket_checksum = fast_finished['ticket-checksum']
2822 self.check_rep_padata(kdc_exchange_dict,
2823 callback_dict,
2824 fast_response['padata'],
2825 error_code=0)
2827 ticket_private = None
2828 if ticket_decryption_key is not None:
2829 self.assertElementEqual(ticket_encpart, 'etype',
2830 ticket_decryption_key.etype)
2831 self.assertElementKVNO(ticket_encpart, 'kvno',
2832 ticket_decryption_key.kvno)
2833 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2834 ticket_cipher)
2835 ticket_private = self.der_decode(
2836 ticket_decpart,
2837 asn1Spec=krb5_asn1.EncTicketPart())
2839 encpart_private = None
2840 self.assertIsNotNone(encpart_decryption_key)
2841 if encpart_decryption_key is not None:
2842 self.assertElementEqual(encpart, 'etype',
2843 encpart_decryption_key.etype)
2844 if self.strict_checking:
2845 self.assertElementKVNO(encpart, 'kvno',
2846 encpart_decryption_key.kvno)
2847 rep_decpart = encpart_decryption_key.decrypt(
2848 encpart_decryption_usage,
2849 encpart_cipher)
2850 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2851 # application tag 26
2852 try:
2853 encpart_private = self.der_decode(
2854 rep_decpart,
2855 asn1Spec=rep_encpart_asn1Spec())
2856 except Exception:
2857 encpart_private = self.der_decode(
2858 rep_decpart,
2859 asn1Spec=krb5_asn1.EncTGSRepPart())
2861 kdc_exchange_dict['reply_key'] = encpart_decryption_key
2863 self.assertIsNotNone(check_kdc_private_fn)
2864 if check_kdc_private_fn is not None:
2865 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2866 rep, ticket_private, encpart_private,
2867 ticket_checksum)
2869 return rep
2871 def check_fx_fast_data(self,
2872 kdc_exchange_dict,
2873 fx_fast_data,
2874 armor_key,
2875 finished=False,
2876 expect_strengthen_key=True):
2877 fx_fast_data = self.der_decode(fx_fast_data,
2878 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2880 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2881 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2883 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2885 fast_response = self.der_decode(fast_rep,
2886 asn1Spec=krb5_asn1.KrbFastResponse())
2888 if expect_strengthen_key and self.strict_checking:
2889 self.assertIn('strengthen-key', fast_response)
2891 if finished:
2892 self.assertIn('finished', fast_response)
2894 # Ensure that the nonce matches the nonce in the body of the request
2895 # (RFC6113 5.4.3).
2896 nonce = kdc_exchange_dict['nonce']
2897 self.assertEqual(nonce, fast_response['nonce'])
2899 return fast_response
2901 def generic_check_kdc_private(self,
2902 kdc_exchange_dict,
2903 callback_dict,
2904 rep,
2905 ticket_private,
2906 encpart_private,
2907 ticket_checksum):
2908 kdc_options = kdc_exchange_dict['kdc_options']
2909 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2910 canonicalize = (canon_pos < len(kdc_options)
2911 and kdc_options[canon_pos] == '1')
2912 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2913 renewable = (renewable_pos < len(kdc_options)
2914 and kdc_options[renewable_pos] == '1')
2915 renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1
2916 renew = (renew_pos < len(kdc_options)
2917 and kdc_options[renew_pos] == '1')
2918 expect_renew_till = renewable or renew
2920 expected_crealm = kdc_exchange_dict['expected_crealm']
2921 expected_cname = kdc_exchange_dict['expected_cname']
2922 expected_srealm = kdc_exchange_dict['expected_srealm']
2923 expected_sname = kdc_exchange_dict['expected_sname']
2924 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2926 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2928 expected_flags = kdc_exchange_dict.get('expected_flags')
2929 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2931 ticket = self.getElementValue(rep, 'ticket')
2933 if ticket_checksum is not None:
2934 armor_key = kdc_exchange_dict['armor_key']
2935 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2937 to_rodc = kdc_exchange_dict['to_rodc']
2938 if to_rodc:
2939 krbtgt_creds = self.get_rodc_krbtgt_creds()
2940 else:
2941 krbtgt_creds = self.get_krbtgt_creds()
2942 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2944 krbtgt_keys = [krbtgt_key]
2945 if not self.strict_checking:
2946 krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds(
2947 krbtgt_creds,
2948 etype=kcrypto.Enctype.RC4)
2949 krbtgt_keys.append(krbtgt_key_rc4)
2951 if self.expect_pac and self.is_tgs(expected_sname):
2952 expect_pac = True
2953 else:
2954 expect_pac = kdc_exchange_dict['expect_pac']
2956 ticket_session_key = None
2957 if ticket_private is not None:
2958 self.assertElementFlags(ticket_private, 'flags',
2959 expected_flags,
2960 unexpected_flags)
2961 self.assertElementPresent(ticket_private, 'key')
2962 ticket_key = self.getElementValue(ticket_private, 'key')
2963 self.assertIsNotNone(ticket_key)
2964 if ticket_key is not None: # Never None, but gives indentation
2965 self.assertElementPresent(ticket_key, 'keytype')
2966 self.assertElementPresent(ticket_key, 'keyvalue')
2967 ticket_session_key = self.EncryptionKey_import(ticket_key)
2968 self.assertElementEqualUTF8(ticket_private, 'crealm',
2969 expected_crealm)
2970 if self.cname_checking:
2971 self.assertElementEqualPrincipal(ticket_private, 'cname',
2972 expected_cname)
2973 self.assertElementPresent(ticket_private, 'transited')
2974 self.assertElementPresent(ticket_private, 'authtime')
2975 if self.strict_checking:
2976 self.assertElementPresent(ticket_private, 'starttime')
2977 self.assertElementPresent(ticket_private, 'endtime')
2978 if self.strict_checking:
2979 if expect_renew_till:
2980 self.assertElementPresent(ticket_private, 'renew-till')
2981 else:
2982 self.assertElementMissing(ticket_private, 'renew-till')
2983 if self.strict_checking:
2984 self.assertElementMissing(ticket_private, 'caddr')
2985 if expect_pac is not None:
2986 if expect_pac:
2987 self.assertElementPresent(ticket_private,
2988 'authorization-data',
2989 expect_empty=not expect_pac)
2990 else:
2991 # It is more correct to not have an authorization-data
2992 # present than an empty one.
2994 # https://github.com/krb5/krb5/pull/1225#issuecomment-995104193
2995 v = self.getElementValue(ticket_private,
2996 'authorization-data')
2997 if v is not None:
2998 self.assertElementPresent(ticket_private,
2999 'authorization-data',
3000 expect_empty=True)
3002 encpart_session_key = None
3003 if encpart_private is not None:
3004 self.assertElementPresent(encpart_private, 'key')
3005 encpart_key = self.getElementValue(encpart_private, 'key')
3006 self.assertIsNotNone(encpart_key)
3007 if encpart_key is not None: # Never None, but gives indentation
3008 self.assertElementPresent(encpart_key, 'keytype')
3009 self.assertElementPresent(encpart_key, 'keyvalue')
3010 encpart_session_key = self.EncryptionKey_import(encpart_key)
3011 self.assertElementPresent(encpart_private, 'last-req')
3012 self.assertElementEqual(encpart_private, 'nonce',
3013 kdc_exchange_dict['nonce'])
3014 if rep_msg_type == KRB_AS_REP:
3015 if self.strict_checking:
3016 self.assertElementPresent(encpart_private,
3017 'key-expiration')
3018 else:
3019 self.assertElementMissing(encpart_private,
3020 'key-expiration')
3021 self.assertElementFlags(encpart_private, 'flags',
3022 expected_flags,
3023 unexpected_flags)
3024 self.assertElementPresent(encpart_private, 'authtime')
3025 if self.strict_checking:
3026 self.assertElementPresent(encpart_private, 'starttime')
3027 self.assertElementPresent(encpart_private, 'endtime')
3028 if self.strict_checking:
3029 if expect_renew_till:
3030 self.assertElementPresent(encpart_private, 'renew-till')
3031 else:
3032 self.assertElementMissing(encpart_private, 'renew-till')
3033 self.assertElementEqualUTF8(encpart_private, 'srealm',
3034 expected_srealm)
3035 self.assertElementEqualPrincipal(encpart_private, 'sname',
3036 expected_sname)
3037 if self.strict_checking:
3038 self.assertElementMissing(encpart_private, 'caddr')
3040 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
3042 sent_enc_pa_rep = self.sent_enc_pa_rep(kdc_exchange_dict)
3044 enc_padata = self.getElementValue(encpart_private,
3045 'encrypted-pa-data')
3046 if (canonicalize or '1' in sent_pac_options or (
3047 rep_msg_type == KRB_AS_REP and sent_enc_pa_rep)):
3048 if self.strict_checking:
3049 self.assertIsNotNone(enc_padata)
3051 if enc_padata is not None:
3052 enc_pa_dict = self.get_pa_dict(enc_padata)
3053 if self.strict_checking:
3054 if canonicalize:
3055 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
3056 else:
3057 self.assertNotIn(PADATA_SUPPORTED_ETYPES,
3058 enc_pa_dict)
3060 if '1' in sent_pac_options:
3061 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
3062 else:
3063 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
3065 if rep_msg_type == KRB_AS_REP and sent_enc_pa_rep:
3066 self.assertIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict)
3067 else:
3068 self.assertNotIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict)
3070 if PADATA_SUPPORTED_ETYPES in enc_pa_dict:
3071 expected_supported_etypes = kdc_exchange_dict[
3072 'expected_supported_etypes']
3074 (supported_etypes,) = struct.unpack(
3075 '<L',
3076 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
3078 ignore_bits = (security.KERB_ENCTYPE_DES_CBC_CRC |
3079 security.KERB_ENCTYPE_DES_CBC_MD5)
3081 self.assertEqual(
3082 supported_etypes & ~ignore_bits,
3083 expected_supported_etypes & ~ignore_bits,
3084 f'PADATA_SUPPORTED_ETYPES: got: {supported_etypes} (0x{supported_etypes:X}), '
3085 f'expected: {expected_supported_etypes} (0x{expected_supported_etypes:X})')
3087 if PADATA_PAC_OPTIONS in enc_pa_dict:
3088 pac_options = self.der_decode(
3089 enc_pa_dict[PADATA_PAC_OPTIONS],
3090 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3092 self.assertElementEqual(pac_options, 'options',
3093 sent_pac_options)
3095 if PADATA_REQ_ENC_PA_REP in enc_pa_dict:
3096 enc_pa_rep = enc_pa_dict[PADATA_REQ_ENC_PA_REP]
3098 enc_pa_rep = self.der_decode(
3099 enc_pa_rep,
3100 asn1Spec=krb5_asn1.Checksum())
3102 reply_key = kdc_exchange_dict['reply_key']
3103 req_obj = kdc_exchange_dict['req_obj']
3104 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
3106 req_obj = self.der_encode(req_obj,
3107 asn1Spec=req_asn1Spec())
3109 checksum = enc_pa_rep['checksum']
3110 ctype = enc_pa_rep['cksumtype']
3112 reply_key.verify_checksum(KU_AS_REQ,
3113 req_obj,
3114 ctype,
3115 checksum)
3116 else:
3117 if enc_padata is not None:
3118 self.assertEqual(enc_padata, [])
3120 if ticket_session_key is not None and encpart_session_key is not None:
3121 self.assertEqual(ticket_session_key.etype,
3122 encpart_session_key.etype)
3123 self.assertEqual(ticket_session_key.key.contents,
3124 encpart_session_key.key.contents)
3125 if encpart_session_key is not None:
3126 session_key = encpart_session_key
3127 else:
3128 session_key = ticket_session_key
3129 ticket_creds = KerberosTicketCreds(
3130 ticket,
3131 session_key,
3132 crealm=expected_crealm,
3133 cname=expected_cname,
3134 srealm=expected_srealm,
3135 sname=expected_sname,
3136 decryption_key=ticket_decryption_key,
3137 ticket_private=ticket_private,
3138 encpart_private=encpart_private)
3140 if ticket_private is not None:
3141 pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac)
3142 if expect_pac is True:
3143 self.assertIsNotNone(pac_data)
3144 elif expect_pac is False:
3145 self.assertIsNone(pac_data)
3147 if pac_data is not None:
3148 self.check_pac_buffers(pac_data, kdc_exchange_dict)
3150 expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
3151 expect_full_checksum = kdc_exchange_dict['expect_full_checksum']
3152 if expect_ticket_checksum or expect_full_checksum:
3153 self.assertIsNotNone(ticket_decryption_key)
3155 if ticket_decryption_key is not None:
3156 service_ticket = (rep_msg_type == KRB_TGS_REP
3157 and not self.is_tgs_principal(expected_sname))
3158 self.verify_ticket(ticket_creds, krbtgt_keys,
3159 service_ticket=service_ticket,
3160 expect_pac=expect_pac,
3161 expect_ticket_checksum=expect_ticket_checksum
3162 or self.tkt_sig_support,
3163 expect_full_checksum=expect_full_checksum
3164 or self.full_sig_support)
3166 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
3168 # Check the SIDs in a LOGON_INFO PAC buffer.
3169 def check_logon_info_sids(self, logon_info_buffer, kdc_exchange_dict):
3170 info3 = logon_info_buffer.info.info.info3
3171 logon_info = info3.base
3172 resource_groups = logon_info_buffer.info.info.resource_groups
3174 expected_groups = kdc_exchange_dict['expected_groups']
3175 unexpected_groups = kdc_exchange_dict['unexpected_groups']
3176 expected_domain_sid = kdc_exchange_dict['expected_domain_sid']
3177 expected_sid = kdc_exchange_dict['expected_sid']
3179 domain_sid = logon_info.domain_sid
3180 if expected_domain_sid is not None:
3181 self.assertEqual(expected_domain_sid, str(domain_sid))
3183 if expected_sid is not None:
3184 got_sid = f'{domain_sid}-{logon_info.rid}'
3185 self.assertEqual(expected_sid, got_sid)
3187 if expected_groups is None and unexpected_groups is None:
3188 # Nothing more to do.
3189 return
3191 # Check the SIDs in the PAC.
3193 # A representation of the PAC.
3194 pac_sids = set()
3196 # Collect the Extra SIDs.
3197 if info3.sids is not None:
3198 self.assertTrue(logon_info.user_flags & (
3199 netlogon.NETLOGON_EXTRA_SIDS),
3200 'extra SIDs present, but EXTRA_SIDS flag not set')
3201 self.assertTrue(info3.sids, 'got empty SIDs')
3203 for sid_attr in info3.sids:
3204 got_sid = str(sid_attr.sid)
3205 if unexpected_groups is not None:
3206 self.assertNotIn(got_sid, unexpected_groups)
3208 pac_sid = (got_sid,
3209 self.SidType.EXTRA_SID,
3210 sid_attr.attributes)
3211 self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
3212 pac_sids.add(pac_sid)
3213 else:
3214 self.assertFalse(logon_info.user_flags & (
3215 netlogon.NETLOGON_EXTRA_SIDS),
3216 'no extra SIDs present, but EXTRA_SIDS flag set')
3218 # Collect the Base RIDs.
3219 if logon_info.groups.rids is not None:
3220 self.assertTrue(logon_info.groups.rids, 'got empty RIDs')
3222 for group in logon_info.groups.rids:
3223 got_sid = f'{domain_sid}-{group.rid}'
3224 if unexpected_groups is not None:
3225 self.assertNotIn(got_sid, unexpected_groups)
3227 pac_sid = (got_sid, self.SidType.BASE_SID, group.attributes)
3228 self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
3229 pac_sids.add(pac_sid)
3231 # Collect the Resource SIDs.
3232 if resource_groups.groups.rids is not None:
3233 self.assertTrue(logon_info.user_flags & (
3234 netlogon.NETLOGON_RESOURCE_GROUPS),
3235 'resource groups present, but RESOURCE_GROUPS '
3236 'flag not set')
3237 self.assertTrue(resource_groups.groups.rids, 'got empty RIDs')
3239 resource_group_sid = resource_groups.domain_sid
3240 for resource_group in resource_groups.groups.rids:
3241 got_sid = f'{resource_group_sid}-{resource_group.rid}'
3242 if unexpected_groups is not None:
3243 self.assertNotIn(got_sid, unexpected_groups)
3245 pac_sid = (got_sid,
3246 self.SidType.RESOURCE_SID,
3247 resource_group.attributes)
3248 self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
3249 pac_sids.add(pac_sid)
3250 else:
3251 self.assertFalse(logon_info.user_flags & (
3252 netlogon.NETLOGON_RESOURCE_GROUPS),
3253 'no resource groups present, but RESOURCE_GROUPS '
3254 'flag set')
3256 # Compare the aggregated SIDs against the set of expected SIDs.
3257 if expected_groups is not None:
3258 if ... in expected_groups:
3259 # The caller is only interested in asserting the
3260 # presence of particular groups, and doesn't mind if
3261 # other groups are present as well.
3262 pac_sids.add(...)
3263 self.assertLessEqual(expected_groups, pac_sids,
3264 'expected groups')
3265 else:
3266 # The caller wants to make sure the groups match
3267 # exactly.
3268 self.assertEqual(expected_groups, pac_sids,
3269 'expected != got')
3271 def check_pac_buffers(self, pac_data, kdc_exchange_dict):
3272 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3274 rep_msg_type = kdc_exchange_dict['rep_msg_type']
3275 armor_tgt = kdc_exchange_dict['armor_tgt']
3277 compound_id = rep_msg_type == KRB_TGS_REP and armor_tgt is not None
3279 expected_sname = kdc_exchange_dict['expected_sname']
3280 expect_client_claims = kdc_exchange_dict['expect_client_claims']
3281 expect_device_info = kdc_exchange_dict['expect_device_info']
3282 expect_device_claims = kdc_exchange_dict['expect_device_claims']
3284 expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
3285 krb5pac.PAC_TYPE_SRV_CHECKSUM,
3286 krb5pac.PAC_TYPE_KDC_CHECKSUM,
3287 krb5pac.PAC_TYPE_LOGON_NAME,
3288 krb5pac.PAC_TYPE_UPN_DNS_INFO]
3290 kdc_options = kdc_exchange_dict['kdc_options']
3291 pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
3292 constrained_delegation = (pos < len(kdc_options)
3293 and kdc_options[pos] == '1')
3294 if constrained_delegation:
3295 expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)
3297 require_strict = set()
3298 unchecked = set()
3299 if not self.tkt_sig_support:
3300 require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
3301 if not self.full_sig_support:
3302 require_strict.add(krb5pac.PAC_TYPE_FULL_CHECKSUM)
3304 expected_client_claims = kdc_exchange_dict['expected_client_claims']
3305 unexpected_client_claims = kdc_exchange_dict[
3306 'unexpected_client_claims']
3308 if self.kdc_claims_support and expect_client_claims:
3309 expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
3310 else:
3311 self.assertFalse(
3312 expected_client_claims,
3313 'expected client claims, but client claims not expected in '
3314 'PAC')
3315 self.assertFalse(
3316 unexpected_client_claims,
3317 'unexpected client claims, but client claims not expected in '
3318 'PAC')
3320 if expect_client_claims is None:
3321 unchecked.add(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
3323 expected_device_claims = kdc_exchange_dict['expected_device_claims']
3324 unexpected_device_claims = kdc_exchange_dict['unexpected_device_claims']
3326 if (self.kdc_claims_support and self.kdc_compound_id_support
3327 and expect_device_claims and compound_id):
3328 expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
3329 else:
3330 self.assertFalse(
3331 expect_device_claims,
3332 'expected device claims buffer, but client claims not '
3333 'expected in PAC')
3334 self.assertFalse(
3335 expected_device_claims,
3336 'expected device claims, but device claims not expected in '
3337 'PAC')
3338 self.assertFalse(
3339 unexpected_device_claims,
3340 'unexpected device claims, but device claims not expected in '
3341 'PAC')
3343 if expect_device_claims is None and compound_id:
3344 unchecked.add(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
3346 if self.kdc_compound_id_support and compound_id and expect_device_info:
3347 expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
3348 else:
3349 self.assertFalse(expect_device_info,
3350 'expected device info with no armor TGT or '
3351 'for non-TGS request')
3353 if expect_device_info is None and compound_id:
3354 unchecked.add(krb5pac.PAC_TYPE_DEVICE_INFO)
3356 if rep_msg_type == KRB_TGS_REP:
3357 if not self.is_tgs_principal(expected_sname):
3358 expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
3359 expected_types.append(krb5pac.PAC_TYPE_FULL_CHECKSUM)
3361 expect_extra_pac_buffers = self.is_tgs(expected_sname)
3363 expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs']
3365 if expect_pac_attrs:
3366 expect_pac_attrs_pac_request = kdc_exchange_dict[
3367 'expect_pac_attrs_pac_request']
3368 else:
3369 expect_pac_attrs_pac_request = kdc_exchange_dict[
3370 'pac_request']
3372 if expect_pac_attrs is None:
3373 if self.expect_extra_pac_buffers:
3374 expect_pac_attrs = expect_extra_pac_buffers
3375 else:
3376 require_strict.add(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
3377 if expect_pac_attrs:
3378 expected_types.append(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
3380 expect_requester_sid = kdc_exchange_dict['expect_requester_sid']
3382 if expect_requester_sid is None:
3383 if self.expect_extra_pac_buffers:
3384 expect_requester_sid = expect_extra_pac_buffers
3385 else:
3386 require_strict.add(krb5pac.PAC_TYPE_REQUESTER_SID)
3387 if expect_requester_sid:
3388 expected_types.append(krb5pac.PAC_TYPE_REQUESTER_SID)
3390 buffer_types = [pac_buffer.type
3391 for pac_buffer in pac.buffers]
3392 self.assertSequenceElementsEqual(
3393 expected_types, buffer_types,
3394 require_ordered=False,
3395 require_strict=require_strict,
3396 unchecked=unchecked)
3398 expected_account_name = kdc_exchange_dict['expected_account_name']
3399 expected_sid = kdc_exchange_dict['expected_sid']
3401 expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
3402 if expect_upn_dns_info_ex is None and (
3403 expected_account_name is not None
3404 or expected_sid is not None):
3405 expect_upn_dns_info_ex = True
3407 for pac_buffer in pac.buffers:
3408 if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
3409 expected_proxy_target = kdc_exchange_dict[
3410 'expected_proxy_target']
3411 expected_transited_services = kdc_exchange_dict[
3412 'expected_transited_services']
3414 delegation_info = pac_buffer.info.info
3416 self.assertEqual(expected_proxy_target,
3417 str(delegation_info.proxy_target))
3419 transited_services = list(map(
3420 str, delegation_info.transited_services))
3421 self.assertEqual(expected_transited_services,
3422 transited_services)
3424 elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
3425 expected_cname = kdc_exchange_dict['expected_cname']
3426 account_name = '/'.join(expected_cname['name-string'])
3428 self.assertEqual(account_name, pac_buffer.info.account_name)
3430 elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
3431 info3 = pac_buffer.info.info.info3
3432 logon_info = info3.base
3434 if expected_account_name is not None:
3435 self.assertEqual(expected_account_name,
3436 str(logon_info.account_name))
3438 self.check_logon_info_sids(pac_buffer, kdc_exchange_dict)
3440 elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
3441 upn_dns_info = pac_buffer.info
3442 upn_dns_info_ex = upn_dns_info.ex
3444 expected_realm = kdc_exchange_dict['expected_crealm']
3445 self.assertEqual(expected_realm,
3446 upn_dns_info.dns_domain_name)
3448 expected_upn_name = kdc_exchange_dict['expected_upn_name']
3449 if expected_upn_name is not None:
3450 self.assertEqual(expected_upn_name,
3451 upn_dns_info.upn_name)
3453 if expect_upn_dns_info_ex:
3454 self.assertIsNotNone(upn_dns_info_ex)
3456 if upn_dns_info_ex is not None:
3457 if expected_account_name is not None:
3458 self.assertEqual(expected_account_name,
3459 upn_dns_info_ex.samaccountname)
3461 if expected_sid is not None:
3462 self.assertEqual(expected_sid,
3463 str(upn_dns_info_ex.objectsid))
3465 elif (pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO
3466 and expect_pac_attrs):
3467 attr_info = pac_buffer.info
3469 self.assertEqual(2, attr_info.flags_length)
3471 flags = attr_info.flags
3473 requested_pac = bool(flags & 1)
3474 given_pac = bool(flags & 2)
3476 self.assertEqual(expect_pac_attrs_pac_request is True,
3477 requested_pac)
3478 self.assertEqual(expect_pac_attrs_pac_request is None,
3479 given_pac)
3481 elif (pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID
3482 and expect_requester_sid):
3483 requester_sid = pac_buffer.info.sid
3485 if expected_sid is not None:
3486 self.assertEqual(expected_sid, str(requester_sid))
3488 elif pac_buffer.type in {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO,
3489 krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO}:
3490 remaining = pac_buffer.info.remaining
3492 if pac_buffer.type == krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO:
3493 claims_type = 'client claims'
3494 expected_claims = expected_client_claims
3495 unexpected_claims = unexpected_client_claims
3496 else:
3497 claims_type = 'device claims'
3498 expected_claims = expected_device_claims
3499 unexpected_claims = unexpected_device_claims
3501 if not remaining:
3502 # Windows may produce an empty claims buffer.
3503 self.assertFalse(expected_claims,
3504 f'expected {claims_type}, but the PAC '
3505 f'buffer was empty')
3506 continue
3508 if expected_claims:
3509 empty_msg = ', and {claims_type} were expected'
3510 else:
3511 empty_msg = ' for {claims_type} (should be missing)'
3513 client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR,
3514 remaining)
3515 client_claims = client_claims.claims.metadata
3516 self.assertIsNotNone(client_claims,
3517 f'got empty CLAIMS_SET_METADATA_NDR '
3518 f'inner structure {empty_msg}')
3520 claims_data = bytes(client_claims.claims_set)
3521 self.assertIsNotNone(claims_data,
3522 f'got empty CLAIMS_SET_METADATA '
3523 f'structure {empty_msg}')
3524 self.assertGreater(len(claims_data), 0,
3525 f'got empty encoded claims data '
3526 f'{empty_msg}')
3527 self.assertEqual(len(claims_data),
3528 client_claims.claims_set_size,
3529 f'encoded {claims_type} data size mismatch')
3531 uncompressed_size = client_claims.uncompressed_claims_set_size
3532 compression_format = client_claims.compression_format
3534 if self.strict_checking:
3535 if uncompressed_size < 384:
3536 self.assertEqual(claims.CLAIMS_COMPRESSION_FORMAT_NONE,
3537 compression_format,
3538 f'{claims_type} unexpectedly '
3539 f'compressed ({uncompressed_size} '
3540 f'bytes uncompressed)')
3541 else:
3542 self.assertEqual(
3543 claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF,
3544 compression_format,
3545 f'{claims_type} unexpectedly not compressed '
3546 f'({uncompressed_size} bytes uncompressed)')
3548 claims_data = huffman_decompress(claims_data,
3549 uncompressed_size)
3551 claims_set = ndr_unpack(claims.CLAIMS_SET_NDR,
3552 claims_data)
3553 claims_set = claims_set.claims.claims
3554 self.assertIsNotNone(claims_set,
3555 f'got empty CLAIMS_SET_NDR inner '
3556 f'structure {empty_msg}')
3558 claims_arrays = claims_set.claims_arrays
3559 self.assertIsNotNone(claims_arrays,
3560 f'got empty CLAIMS_SET structure '
3561 f'{empty_msg}')
3562 self.assertGreater(len(claims_arrays), 0,
3563 f'got empty claims array {empty_msg}')
3564 self.assertEqual(len(claims_arrays),
3565 claims_set.claims_array_count,
3566 f'{claims_type} arrays size mismatch')
3568 got_claims = {}
3570 for claims_array in claims_arrays:
3571 claim_entries = claims_array.claim_entries
3572 self.assertIsNotNone(claim_entries,
3573 f'got empty CLAIMS_ARRAY structure '
3574 f'{empty_msg}')
3575 self.assertGreater(len(claim_entries), 0,
3576 f'got empty claim entries array '
3577 f'{empty_msg}')
3578 self.assertEqual(len(claim_entries),
3579 claims_array.claims_count,
3580 f'{claims_type} entries array size '
3581 f'mismatch')
3583 for entry in claim_entries:
3584 if unexpected_claims is not None:
3585 self.assertNotIn(entry.id, unexpected_claims,
3586 f'got unexpected {claims_type} '
3587 f'in PAC')
3588 if expected_claims is None:
3589 continue
3591 expected_claim = expected_claims.get(entry.id)
3592 if expected_claim is None:
3593 continue
3595 self.assertNotIn(entry.id, got_claims,
3596 f'got duplicate {claims_type}')
3598 self.assertIsNotNone(entry.values.values,
3599 f'got {claims_type} with no '
3600 f'values')
3601 self.assertGreater(len(entry.values.values), 0,
3602 f'got empty {claims_type} values '
3603 f'array')
3604 self.assertEqual(len(entry.values.values),
3605 entry.values.value_count,
3606 f'{claims_type} values array size '
3607 f'mismatch')
3609 expected_claim_values = expected_claim.get('values')
3610 self.assertIsNotNone(expected_claim_values,
3611 f'got expected {claims_type} '
3612 f'with no values')
3614 values = type(expected_claim_values)(
3615 entry.values.values)
3617 got_claims[entry.id] = {
3618 'source_type': claims_array.claims_source_type,
3619 'type': entry.type,
3620 'values': values,
3623 self.assertEqual(expected_claims, got_claims or None,
3624 f'{claims_type} did not match expectations')
3626 elif pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO:
3627 device_info = pac_buffer.info.info
3629 armor_auth_data = armor_tgt.ticket_private.get(
3630 'authorization-data')
3631 self.assertIsNotNone(armor_auth_data,
3632 'missing authdata for armor TGT')
3633 armor_pac_data = self.get_pac(armor_auth_data)
3634 armor_pac = ndr_unpack(krb5pac.PAC_DATA, armor_pac_data)
3635 for armor_pac_buffer in armor_pac.buffers:
3636 if armor_pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
3637 armor_info = armor_pac_buffer.info.info.info3
3638 break
3639 else:
3640 self.fail('missing logon info for armor PAC')
3642 self.assertEqual(armor_info.base.rid, device_info.rid)
3644 self.assertEqual(armor_info.base.primary_gid,
3645 device_info.primary_gid)
3646 self.assertEqual(security.DOMAIN_RID_DOMAIN_MEMBERS,
3647 device_info.primary_gid)
3649 self.assertEqual(armor_info.base.domain_sid,
3650 device_info.domain_sid)
3652 def get_groups(groups):
3653 return [(x.rid, x.attributes) for x in groups.rids]
3655 self.assertEqual(get_groups(armor_info.base.groups),
3656 get_groups(device_info.groups))
3658 self.assertEqual(1, device_info.sid_count)
3659 self.assertEqual(
3660 security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
3661 str(device_info.sids[0].sid))
3663 claims_valid_sid, claims_valid_rid = (
3664 security.SID_CLAIMS_VALID.rsplit('-', 1))
3666 self.assertEqual(1, device_info.domain_group_count)
3667 domain_group = device_info.domain_groups[0]
3668 self.assertEqual(claims_valid_sid,
3669 str(domain_group.domain_sid))
3671 self.assertEqual(1, domain_group.groups.count)
3672 self.assertEqual(int(claims_valid_rid),
3673 domain_group.groups.rids[0].rid)
3675 def generic_check_kdc_error(self,
3676 kdc_exchange_dict,
3677 callback_dict,
3678 rep,
3679 inner=False):
3681 rep_msg_type = kdc_exchange_dict['rep_msg_type']
3683 expected_anon = kdc_exchange_dict['expected_anon']
3684 expected_srealm = kdc_exchange_dict['expected_srealm']
3685 expected_sname = kdc_exchange_dict['expected_sname']
3686 expected_error_mode = kdc_exchange_dict['expected_error_mode']
3688 sent_fast = self.sent_fast(kdc_exchange_dict)
3690 fast_armor_type = kdc_exchange_dict['fast_armor_type']
3692 self.assertElementEqual(rep, 'pvno', 5)
3693 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
3694 error_code = self.getElementValue(rep, 'error-code')
3695 self.assertIn(error_code, expected_error_mode)
3696 if self.strict_checking:
3697 self.assertElementMissing(rep, 'ctime')
3698 self.assertElementMissing(rep, 'cusec')
3699 self.assertElementPresent(rep, 'stime')
3700 self.assertElementPresent(rep, 'susec')
3701 # error-code checked above
3702 if expected_anon and not inner:
3703 expected_cname = self.PrincipalName_create(
3704 name_type=NT_WELLKNOWN,
3705 names=['WELLKNOWN', 'ANONYMOUS'])
3706 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
3707 elif self.strict_checking:
3708 self.assertElementMissing(rep, 'cname')
3709 if self.strict_checking:
3710 self.assertElementMissing(rep, 'crealm')
3711 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
3712 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
3713 self.assertElementMissing(rep, 'e-text')
3714 expected_status = kdc_exchange_dict['expected_status']
3715 expect_edata = kdc_exchange_dict['expect_edata']
3716 if expect_edata is None:
3717 expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
3718 and (not sent_fast or fast_armor_type is None
3719 or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
3720 and not inner)
3721 if inner and expect_edata is self.expect_padata_outer:
3722 expect_edata = False
3723 if not expect_edata:
3724 self.assertIsNone(expected_status)
3725 if self.strict_checking:
3726 self.assertElementMissing(rep, 'e-data')
3727 return rep
3728 edata = self.getElementValue(rep, 'e-data')
3729 if self.strict_checking:
3730 self.assertIsNotNone(edata)
3731 if edata is not None:
3732 if rep_msg_type == KRB_TGS_REP and not sent_fast:
3733 error_data = self.der_decode(
3734 edata,
3735 asn1Spec=krb5_asn1.KERB_ERROR_DATA())
3736 self.assertEqual(KERB_ERR_TYPE_EXTENDED,
3737 error_data['data-type'])
3739 extended_error = error_data['data-value']
3741 self.assertEqual(12, len(extended_error))
3743 status = int.from_bytes(extended_error[:4], 'little')
3744 flags = int.from_bytes(extended_error[8:], 'little')
3746 self.assertEqual(expected_status, status)
3748 self.assertEqual(3, flags)
3749 else:
3750 self.assertIsNone(expected_status)
3752 rep_padata = self.der_decode(edata,
3753 asn1Spec=krb5_asn1.METHOD_DATA())
3754 self.assertGreater(len(rep_padata), 0)
3756 if sent_fast:
3757 self.assertEqual(1, len(rep_padata))
3758 rep_pa_dict = self.get_pa_dict(rep_padata)
3759 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
3761 armor_key = kdc_exchange_dict['armor_key']
3762 self.assertIsNotNone(armor_key)
3763 fast_response = self.check_fx_fast_data(
3764 kdc_exchange_dict,
3765 rep_pa_dict[PADATA_FX_FAST],
3766 armor_key,
3767 expect_strengthen_key=False)
3769 rep_padata = fast_response['padata']
3771 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
3772 callback_dict,
3773 rep_padata,
3774 error_code)
3776 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
3778 return rep
3780 def check_reply_padata(self,
3781 kdc_exchange_dict,
3782 callback_dict,
3783 encpart,
3784 rep_padata):
3785 expected_patypes = ()
3787 sent_fast = self.sent_fast(kdc_exchange_dict)
3788 rep_msg_type = kdc_exchange_dict['rep_msg_type']
3790 if sent_fast:
3791 expected_patypes += (PADATA_FX_FAST,)
3792 elif rep_msg_type == KRB_AS_REP:
3793 chosen_etype = self.getElementValue(encpart, 'etype')
3794 self.assertIsNotNone(chosen_etype)
3796 if chosen_etype in {kcrypto.Enctype.AES256,
3797 kcrypto.Enctype.AES128}:
3798 expected_patypes += (PADATA_ETYPE_INFO2,)
3800 preauth_key = kdc_exchange_dict['preauth_key']
3801 if preauth_key.etype == kcrypto.Enctype.RC4 and rep_padata is None:
3802 rep_padata = ()
3803 elif rep_msg_type == KRB_TGS_REP:
3804 if expected_patypes == () and rep_padata is None:
3805 rep_padata = ()
3807 if not self.strict_checking and rep_padata is None:
3808 rep_padata = ()
3810 self.assertIsNotNone(rep_padata)
3811 got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
3812 self.assertSequenceElementsEqual(expected_patypes, got_patypes)
3814 if len(expected_patypes) == 0:
3815 return None
3817 pa_dict = self.get_pa_dict(rep_padata)
3819 etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
3820 if etype_info2 is not None:
3821 etype_info2 = self.der_decode(etype_info2,
3822 asn1Spec=krb5_asn1.ETYPE_INFO2())
3823 self.assertEqual(len(etype_info2), 1)
3824 elem = etype_info2[0]
3826 e = self.getElementValue(elem, 'etype')
3827 self.assertEqual(e, chosen_etype)
3828 salt = self.getElementValue(elem, 'salt')
3829 self.assertIsNotNone(salt)
3830 expected_salt = kdc_exchange_dict['expected_salt']
3831 if expected_salt is not None:
3832 self.assertEqual(salt, expected_salt)
3833 s2kparams = self.getElementValue(elem, 's2kparams')
3834 if self.strict_checking:
3835 self.assertIsNone(s2kparams)
3837 def check_rep_padata(self,
3838 kdc_exchange_dict,
3839 callback_dict,
3840 rep_padata,
3841 error_code):
3842 rep_msg_type = kdc_exchange_dict['rep_msg_type']
3844 req_body = kdc_exchange_dict['req_body']
3845 proposed_etypes = req_body['etype']
3846 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
3848 sent_fast = self.sent_fast(kdc_exchange_dict)
3849 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
3851 if rep_msg_type == KRB_TGS_REP:
3852 self.assertTrue(sent_fast)
3854 rc4_support = kdc_exchange_dict['rc4_support']
3856 expect_etype_info2 = ()
3857 expect_etype_info = False
3858 expected_aes_type = 0
3859 expected_rc4_type = 0
3860 if kcrypto.Enctype.RC4 in proposed_etypes:
3861 expect_etype_info = True
3862 for etype in proposed_etypes:
3863 if etype not in client_as_etypes:
3864 continue
3865 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
3866 expect_etype_info = False
3867 if etype > expected_aes_type:
3868 expected_aes_type = etype
3869 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
3870 if etype > expected_rc4_type and rc4_support:
3871 expected_rc4_type = etype
3873 if expected_aes_type != 0:
3874 expect_etype_info2 += (expected_aes_type,)
3875 if expected_rc4_type != 0:
3876 expect_etype_info2 += (expected_rc4_type,)
3878 expected_patypes = ()
3879 if sent_fast and error_code != 0:
3880 expected_patypes += (PADATA_FX_ERROR,)
3881 expected_patypes += (PADATA_FX_COOKIE,)
3883 if rep_msg_type == KRB_TGS_REP:
3884 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
3885 if ('1' in sent_pac_options
3886 and error_code not in (0, KDC_ERR_GENERIC)):
3887 expected_patypes += (PADATA_PAC_OPTIONS,)
3888 elif error_code != KDC_ERR_GENERIC:
3889 if expect_etype_info:
3890 if rc4_support:
3891 self.assertGreater(len(expect_etype_info2), 0)
3892 expected_patypes += (PADATA_ETYPE_INFO,)
3893 if len(expect_etype_info2) != 0:
3894 expected_patypes += (PADATA_ETYPE_INFO2,)
3896 if error_code not in (KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW,
3897 KDC_ERR_POLICY, KDC_ERR_CLIENT_REVOKED):
3898 if sent_fast:
3899 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
3900 else:
3901 expected_patypes += (PADATA_ENC_TIMESTAMP,)
3903 if not sent_enc_challenge:
3904 expected_patypes += (PADATA_PK_AS_REQ,)
3905 expected_patypes += (PADATA_PK_AS_REP_19,)
3907 if (self.kdc_fast_support
3908 and not sent_fast
3909 and not sent_enc_challenge):
3910 expected_patypes += (PADATA_FX_FAST,)
3911 expected_patypes += (PADATA_FX_COOKIE,)
3913 require_strict = {PADATA_FX_COOKIE,
3914 PADATA_FX_FAST,
3915 PADATA_PAC_OPTIONS,
3916 PADATA_PK_AS_REP_19,
3917 PADATA_PK_AS_REQ,
3918 PADATA_PKINIT_KX,
3919 PADATA_GSS}
3920 strict_edata_checking = kdc_exchange_dict['strict_edata_checking']
3921 if not strict_edata_checking:
3922 require_strict.add(PADATA_ETYPE_INFO2)
3923 require_strict.add(PADATA_ENCRYPTED_CHALLENGE)
3925 got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
3926 self.assertSequenceElementsEqual(expected_patypes, got_patypes,
3927 require_strict=require_strict)
3929 if not expected_patypes:
3930 return None
3932 pa_dict = self.get_pa_dict(rep_padata)
3934 enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
3935 if enc_timestamp is not None:
3936 self.assertEqual(len(enc_timestamp), 0)
3938 pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
3939 if pk_as_req is not None:
3940 self.assertEqual(len(pk_as_req), 0)
3942 pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
3943 if pk_as_rep19 is not None:
3944 self.assertEqual(len(pk_as_rep19), 0)
3946 fx_fast = pa_dict.get(PADATA_FX_FAST)
3947 if fx_fast is not None:
3948 self.assertEqual(len(fx_fast), 0)
3950 fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
3951 if fast_cookie is not None:
3952 kdc_exchange_dict['fast_cookie'] = fast_cookie
3954 fast_error = pa_dict.get(PADATA_FX_ERROR)
3955 if fast_error is not None:
3956 fast_error = self.der_decode(fast_error,
3957 asn1Spec=krb5_asn1.KRB_ERROR())
3958 self.generic_check_kdc_error(kdc_exchange_dict,
3959 callback_dict,
3960 fast_error,
3961 inner=True)
3963 pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
3964 if pac_options is not None:
3965 pac_options = self.der_decode(
3966 pac_options,
3967 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3968 self.assertElementEqual(pac_options, 'options', sent_pac_options)
3970 enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
3971 if enc_challenge is not None:
3972 if not sent_enc_challenge:
3973 self.assertEqual(len(enc_challenge), 0)
3974 else:
3975 armor_key = kdc_exchange_dict['armor_key']
3976 self.assertIsNotNone(armor_key)
3978 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
3980 kdc_challenge_key = self.generate_kdc_challenge_key(
3981 armor_key, preauth_key)
3983 # Ensure that the encrypted challenge FAST factor is supported
3984 # (RFC6113 5.4.6).
3985 if self.strict_checking:
3986 self.assertNotEqual(len(enc_challenge), 0)
3987 if len(enc_challenge) != 0:
3988 encrypted_challenge = self.der_decode(
3989 enc_challenge,
3990 asn1Spec=krb5_asn1.EncryptedData())
3991 self.assertEqual(encrypted_challenge['etype'],
3992 kdc_challenge_key.etype)
3994 challenge = kdc_challenge_key.decrypt(
3995 KU_ENC_CHALLENGE_KDC,
3996 encrypted_challenge['cipher'])
3997 challenge = self.der_decode(
3998 challenge,
3999 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
4001 # Retrieve the returned timestamp.
4002 rep_patime = challenge['patimestamp']
4003 self.assertIn('pausec', challenge)
4005 # Ensure the returned time is within five minutes of the
4006 # current time.
4007 rep_time = self.get_EpochFromKerberosTime(rep_patime)
4008 current_time = time.time()
4010 self.assertLess(current_time - 300, rep_time)
4011 self.assertLess(rep_time, current_time + 300)
4013 etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
4014 if etype_info2 is not None:
4015 etype_info2 = self.der_decode(etype_info2,
4016 asn1Spec=krb5_asn1.ETYPE_INFO2())
4017 self.assertGreaterEqual(len(etype_info2), 1)
4018 if self.strict_checking:
4019 self.assertEqual(len(etype_info2), len(expect_etype_info2))
4020 for i in range(0, len(etype_info2)):
4021 e = self.getElementValue(etype_info2[i], 'etype')
4022 if self.strict_checking:
4023 self.assertEqual(e, expect_etype_info2[i])
4024 salt = self.getElementValue(etype_info2[i], 'salt')
4025 if e == kcrypto.Enctype.RC4:
4026 if self.strict_checking:
4027 self.assertIsNone(salt)
4028 else:
4029 self.assertIsNotNone(salt)
4030 expected_salt = kdc_exchange_dict['expected_salt']
4031 if expected_salt is not None:
4032 self.assertEqual(salt, expected_salt)
4033 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
4034 if self.strict_checking:
4035 self.assertIsNone(s2kparams)
4037 etype_info = pa_dict.get(PADATA_ETYPE_INFO)
4038 if etype_info is not None:
4039 etype_info = self.der_decode(etype_info,
4040 asn1Spec=krb5_asn1.ETYPE_INFO())
4041 self.assertEqual(len(etype_info), 1)
4042 e = self.getElementValue(etype_info[0], 'etype')
4043 self.assertEqual(e, kcrypto.Enctype.RC4)
4044 if rc4_support:
4045 self.assertEqual(e, expect_etype_info2[0])
4046 salt = self.getElementValue(etype_info[0], 'salt')
4047 if self.strict_checking:
4048 self.assertIsNotNone(salt)
4049 self.assertEqual(len(salt), 0)
4051 return etype_info2
4053 def generate_simple_fast(self,
4054 kdc_exchange_dict,
4055 _callback_dict,
4056 req_body,
4057 fast_padata,
4058 fast_armor,
4059 checksum,
4060 fast_options=''):
4061 armor_key = kdc_exchange_dict['armor_key']
4063 fast_req = self.KRB_FAST_REQ_create(fast_options,
4064 fast_padata,
4065 req_body)
4066 fast_req = self.der_encode(fast_req,
4067 asn1Spec=krb5_asn1.KrbFastReq())
4068 fast_req = self.EncryptedData_create(armor_key,
4069 KU_FAST_ENC,
4070 fast_req)
4072 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
4073 checksum,
4074 fast_req)
4076 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
4077 fx_fast_request = self.der_encode(
4078 fx_fast_request,
4079 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
4081 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
4082 fx_fast_request)
4084 return fast_padata
4086 def generate_ap_req(self,
4087 kdc_exchange_dict,
4088 _callback_dict,
4089 req_body,
4090 armor,
4091 usage=None,
4092 seq_number=None):
4093 req_body_checksum = None
4095 if armor:
4096 self.assertIsNone(req_body)
4098 tgt = kdc_exchange_dict['armor_tgt']
4099 authenticator_subkey = kdc_exchange_dict['armor_subkey']
4100 else:
4101 tgt = kdc_exchange_dict['tgt']
4102 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
4104 if req_body is not None:
4105 body_checksum_type = kdc_exchange_dict['body_checksum_type']
4107 req_body_blob = self.der_encode(
4108 req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
4110 req_body_checksum = self.Checksum_create(
4111 tgt.session_key,
4112 KU_TGS_REQ_AUTH_CKSUM,
4113 req_body_blob,
4114 ctype=body_checksum_type)
4116 auth_data = kdc_exchange_dict['auth_data']
4118 subkey_obj = None
4119 if authenticator_subkey is not None:
4120 subkey_obj = authenticator_subkey.export_obj()
4121 if seq_number is None:
4122 seq_number = random.randint(0, 0xfffffffe)
4123 (ctime, cusec) = self.get_KerberosTimeWithUsec()
4124 authenticator_obj = self.Authenticator_create(
4125 crealm=tgt.crealm,
4126 cname=tgt.cname,
4127 cksum=req_body_checksum,
4128 cusec=cusec,
4129 ctime=ctime,
4130 subkey=subkey_obj,
4131 seq_number=seq_number,
4132 authorization_data=auth_data)
4133 authenticator_blob = self.der_encode(
4134 authenticator_obj,
4135 asn1Spec=krb5_asn1.Authenticator())
4137 if usage is None:
4138 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
4139 authenticator = self.EncryptedData_create(tgt.session_key,
4140 usage,
4141 authenticator_blob)
4143 if armor:
4144 ap_options = kdc_exchange_dict['fast_ap_options']
4145 else:
4146 ap_options = kdc_exchange_dict['ap_options']
4147 if ap_options is None:
4148 ap_options = str(krb5_asn1.APOptions('0'))
4149 ap_req_obj = self.AP_REQ_create(ap_options=ap_options,
4150 ticket=tgt.ticket,
4151 authenticator=authenticator)
4152 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
4154 return ap_req
4156 def generate_simple_tgs_padata(self,
4157 kdc_exchange_dict,
4158 callback_dict,
4159 req_body):
4160 ap_req = self.generate_ap_req(kdc_exchange_dict,
4161 callback_dict,
4162 req_body,
4163 armor=False)
4164 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
4165 padata = [pa_tgs_req]
4167 return padata, req_body
4169 def get_preauth_key(self, kdc_exchange_dict):
4170 msg_type = kdc_exchange_dict['rep_msg_type']
4172 if msg_type == KRB_AS_REP:
4173 key = kdc_exchange_dict['preauth_key']
4174 usage = KU_AS_REP_ENC_PART
4175 else: # KRB_TGS_REP
4176 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
4177 if authenticator_subkey is not None:
4178 key = authenticator_subkey
4179 usage = KU_TGS_REP_ENC_PART_SUB_KEY
4180 else:
4181 tgt = kdc_exchange_dict['tgt']
4182 key = tgt.session_key
4183 usage = KU_TGS_REP_ENC_PART_SESSION
4185 self.assertIsNotNone(key)
4187 return key, usage
4189 def generate_armor_key(self, subkey, session_key):
4190 armor_key = kcrypto.cf2(subkey.key,
4191 session_key.key,
4192 b'subkeyarmor',
4193 b'ticketarmor')
4194 armor_key = Krb5EncryptionKey(armor_key, None)
4196 return armor_key
4198 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
4199 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
4200 reply_key.key,
4201 b'strengthenkey',
4202 b'replykey')
4203 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
4204 reply_key.kvno)
4206 return strengthen_reply_key
4208 def generate_client_challenge_key(self, armor_key, longterm_key):
4209 client_challenge_key = kcrypto.cf2(armor_key.key,
4210 longterm_key.key,
4211 b'clientchallengearmor',
4212 b'challengelongterm')
4213 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
4215 return client_challenge_key
4217 def generate_kdc_challenge_key(self, armor_key, longterm_key):
4218 kdc_challenge_key = kcrypto.cf2(armor_key.key,
4219 longterm_key.key,
4220 b'kdcchallengearmor',
4221 b'challengelongterm')
4222 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
4224 return kdc_challenge_key
4226 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
4227 expected_type = expected_checksum['cksumtype']
4228 self.assertEqual(armor_key.ctype, expected_type)
4230 ticket_blob = self.der_encode(ticket,
4231 asn1Spec=krb5_asn1.Ticket())
4232 checksum = self.Checksum_create(armor_key,
4233 KU_FAST_FINISHED,
4234 ticket_blob)
4235 self.assertEqual(expected_checksum, checksum)
4237 def verify_ticket(self, ticket, krbtgt_keys, service_ticket,
4238 expect_pac=True,
4239 expect_ticket_checksum=True,
4240 expect_full_checksum=None):
4241 # Decrypt the ticket.
4243 key = ticket.decryption_key
4244 enc_part = ticket.ticket['enc-part']
4246 self.assertElementEqual(enc_part, 'etype', key.etype)
4247 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
4249 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
4250 enc_part = self.der_decode(
4251 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
4253 # Fetch the authorization data from the ticket.
4254 auth_data = enc_part.get('authorization-data')
4255 if expect_pac:
4256 self.assertIsNotNone(auth_data)
4257 elif auth_data is None:
4258 return
4260 # Get a copy of the authdata with an empty PAC, and the existing PAC
4261 # (if present).
4262 empty_pac = self.get_empty_pac()
4263 auth_data, pac_data = self.replace_pac(auth_data,
4264 empty_pac,
4265 expect_pac=expect_pac)
4266 if not expect_pac:
4267 return
4269 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
4270 # raw type to create a new PAC with zeroed signatures for
4271 # verification. This is because on Windows, the resource_groups field
4272 # is added to PAC_LOGON_INFO after the info3 field has been created,
4273 # which results in a different ordering of pointer values than Samba
4274 # (see commit 0e201ecdc53). Using the raw type avoids changing
4275 # PAC_LOGON_INFO, so verification against Windows can work. We still
4276 # need the PAC_DATA type to retrieve the actual checksums, because the
4277 # signatures in the raw type may contain padding bytes.
4278 pac = ndr_unpack(krb5pac.PAC_DATA,
4279 pac_data)
4280 raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
4281 pac_data)
4283 checksums = {}
4285 full_checksum_buffer = None
4287 for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
4288 buffer_type = pac_buffer.type
4289 if buffer_type in self.pac_checksum_types:
4290 self.assertNotIn(buffer_type, checksums,
4291 f'Duplicate checksum type {buffer_type}')
4293 # Fetch the checksum and the checksum type from the PAC buffer.
4294 checksum = pac_buffer.info.signature
4295 ctype = pac_buffer.info.type
4296 if ctype & 1 << 31:
4297 ctype |= -1 << 31
4299 checksums[buffer_type] = checksum, ctype
4301 if buffer_type == krb5pac.PAC_TYPE_FULL_CHECKSUM:
4302 full_checksum_buffer = raw_pac_buffer
4303 elif buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
4304 # Zero the checksum field so that we can later verify the
4305 # checksums. The ticket checksum field is not zeroed.
4307 signature = ndr_unpack(
4308 krb5pac.PAC_SIGNATURE_DATA,
4309 raw_pac_buffer.info.remaining)
4310 signature.signature = bytes(len(checksum))
4311 raw_pac_buffer.info.remaining = ndr_pack(
4312 signature)
4314 # Re-encode the PAC.
4315 pac_data = ndr_pack(raw_pac)
4317 if full_checksum_buffer is not None:
4318 signature = ndr_unpack(
4319 krb5pac.PAC_SIGNATURE_DATA,
4320 full_checksum_buffer.info.remaining)
4321 signature.signature = bytes(len(checksum))
4322 full_checksum_buffer.info.remaining = ndr_pack(
4323 signature)
4325 # Re-encode the PAC.
4326 full_pac_data = ndr_pack(raw_pac)
4328 # Verify the signatures.
4330 server_checksum, server_ctype = checksums[
4331 krb5pac.PAC_TYPE_SRV_CHECKSUM]
4332 key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
4333 pac_data,
4334 server_ctype,
4335 server_checksum)
4337 kdc_checksum, kdc_ctype = checksums[
4338 krb5pac.PAC_TYPE_KDC_CHECKSUM]
4340 if isinstance(krbtgt_keys, collections.abc.Container):
4341 if self.strict_checking:
4342 krbtgt_key = krbtgt_keys[0]
4343 else:
4344 krbtgt_key = next(key for key in krbtgt_keys
4345 if key.ctype == kdc_ctype)
4346 else:
4347 krbtgt_key = krbtgt_keys
4349 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
4350 server_checksum,
4351 kdc_ctype,
4352 kdc_checksum)
4354 if not service_ticket:
4355 self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
4356 self.assertNotIn(krb5pac.PAC_TYPE_FULL_CHECKSUM, checksums)
4357 else:
4358 ticket_checksum, ticket_ctype = checksums.get(
4359 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
4360 (None, None))
4361 if expect_ticket_checksum:
4362 self.assertIsNotNone(ticket_checksum)
4363 elif expect_ticket_checksum is False:
4364 self.assertIsNone(ticket_checksum)
4365 if ticket_checksum is not None:
4366 enc_part['authorization-data'] = auth_data
4367 enc_part = self.der_encode(enc_part,
4368 asn1Spec=krb5_asn1.EncTicketPart())
4370 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
4371 enc_part,
4372 ticket_ctype,
4373 ticket_checksum)
4375 full_checksum, full_ctype = checksums.get(
4376 krb5pac.PAC_TYPE_FULL_CHECKSUM,
4377 (None, None))
4378 if expect_full_checksum:
4379 self.assertIsNotNone(full_checksum)
4380 elif expect_full_checksum is False:
4381 self.assertIsNone(full_checksum)
4382 if full_checksum is not None:
4383 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
4384 full_pac_data,
4385 full_ctype,
4386 full_checksum)
4388 def modified_ticket(self,
4389 ticket, *,
4390 new_ticket_key=None,
4391 modify_fn=None,
4392 modify_pac_fn=None,
4393 exclude_pac=False,
4394 allow_empty_authdata=False,
4395 update_pac_checksums=True,
4396 checksum_keys=None,
4397 include_checksums=None):
4398 if checksum_keys is None:
4399 # A dict containing a key for each checksum type to be created in
4400 # the PAC.
4401 checksum_keys = {}
4403 if include_checksums is None:
4404 # A dict containing a value for each checksum type; True if the
4405 # checksum type is to be included in the PAC, False if it is to be
4406 # excluded, or None/not present if the checksum is to be included
4407 # based on its presence in the original PAC.
4408 include_checksums = {}
4410 # Check that the values passed in by the caller make sense.
4412 self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
4413 self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
4415 if exclude_pac:
4416 self.assertIsNone(modify_pac_fn)
4418 update_pac_checksums = False
4420 if not update_pac_checksums:
4421 self.assertFalse(checksum_keys)
4422 self.assertFalse(include_checksums)
4424 expect_pac = modify_pac_fn is not None
4426 key = ticket.decryption_key
4428 if new_ticket_key is None:
4429 # Use the same key to re-encrypt the ticket.
4430 new_ticket_key = key
4432 if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
4433 # If the server signature key is not present, fall back to the key
4434 # used to encrypt the ticket.
4435 checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
4437 if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
4438 # If the ticket signature key is not present, fall back to the key
4439 # used for the KDC signature.
4440 kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
4441 if kdc_checksum_key is not None:
4442 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
4443 kdc_checksum_key)
4445 if krb5pac.PAC_TYPE_FULL_CHECKSUM not in checksum_keys:
4446 # If the full signature key is not present, fall back to the key
4447 # used for the KDC signature.
4448 kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
4449 if kdc_checksum_key is not None:
4450 checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM] = (
4451 kdc_checksum_key)
4453 # Decrypt the ticket.
4455 enc_part = ticket.ticket['enc-part']
4457 self.assertElementEqual(enc_part, 'etype', key.etype)
4458 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
4460 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
4461 enc_part = self.der_decode(
4462 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
4464 # Modify the ticket here.
4465 if modify_fn is not None:
4466 enc_part = modify_fn(enc_part)
4468 auth_data = enc_part.get('authorization-data')
4469 if expect_pac:
4470 self.assertIsNotNone(auth_data)
4471 if auth_data is not None:
4472 new_pac = None
4473 if not exclude_pac:
4474 # Get a copy of the authdata with an empty PAC, and the
4475 # existing PAC (if present).
4476 empty_pac = self.get_empty_pac()
4477 empty_pac_auth_data, pac_data = self.replace_pac(
4478 auth_data,
4479 empty_pac,
4480 expect_pac=expect_pac)
4482 if pac_data is not None:
4483 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
4485 # Modify the PAC here.
4486 if modify_pac_fn is not None:
4487 pac = modify_pac_fn(pac)
4489 if update_pac_checksums:
4490 # Get the enc-part with an empty PAC, which is needed
4491 # to create a ticket signature.
4492 enc_part_to_sign = enc_part.copy()
4493 enc_part_to_sign['authorization-data'] = (
4494 empty_pac_auth_data)
4495 enc_part_to_sign = self.der_encode(
4496 enc_part_to_sign,
4497 asn1Spec=krb5_asn1.EncTicketPart())
4499 self.update_pac_checksums(pac,
4500 checksum_keys,
4501 include_checksums,
4502 enc_part_to_sign)
4504 # Re-encode the PAC.
4505 pac_data = ndr_pack(pac)
4506 new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
4507 pac_data)
4509 # Replace the PAC in the authorization data and re-add it to the
4510 # ticket enc-part.
4511 auth_data, _ = self.replace_pac(
4512 auth_data, new_pac,
4513 expect_pac=expect_pac,
4514 allow_empty_authdata=allow_empty_authdata)
4515 enc_part['authorization-data'] = auth_data
4517 # Re-encrypt the ticket enc-part with the new key.
4518 enc_part_new = self.der_encode(enc_part,
4519 asn1Spec=krb5_asn1.EncTicketPart())
4520 enc_part_new = self.EncryptedData_create(new_ticket_key,
4521 KU_TICKET,
4522 enc_part_new)
4524 # Create a copy of the ticket with the new enc-part.
4525 new_ticket = ticket.ticket.copy()
4526 new_ticket['enc-part'] = enc_part_new
4528 new_ticket_creds = KerberosTicketCreds(
4529 new_ticket,
4530 session_key=ticket.session_key,
4531 crealm=ticket.crealm,
4532 cname=ticket.cname,
4533 srealm=ticket.srealm,
4534 sname=ticket.sname,
4535 decryption_key=new_ticket_key,
4536 ticket_private=enc_part,
4537 encpart_private=ticket.encpart_private)
4539 return new_ticket_creds
4541 def update_pac_checksums(self,
4542 pac,
4543 checksum_keys,
4544 include_checksums,
4545 enc_part=None):
4546 pac_buffers = pac.buffers
4547 checksum_buffers = {}
4549 # Find the relevant PAC checksum buffers.
4550 for pac_buffer in pac_buffers:
4551 buffer_type = pac_buffer.type
4552 if buffer_type in self.pac_checksum_types:
4553 self.assertNotIn(buffer_type, checksum_buffers,
4554 f'Duplicate checksum type {buffer_type}')
4556 checksum_buffers[buffer_type] = pac_buffer
4558 # Create any additional buffers that were requested but not
4559 # present. Conversely, remove any buffers that were requested to be
4560 # removed.
4561 for buffer_type in self.pac_checksum_types:
4562 if buffer_type in checksum_buffers:
4563 if include_checksums.get(buffer_type) is False:
4564 checksum_buffer = checksum_buffers.pop(buffer_type)
4566 pac.num_buffers -= 1
4567 pac_buffers.remove(checksum_buffer)
4569 elif include_checksums.get(buffer_type) is True:
4570 info = krb5pac.PAC_SIGNATURE_DATA()
4572 checksum_buffer = krb5pac.PAC_BUFFER()
4573 checksum_buffer.type = buffer_type
4574 checksum_buffer.info = info
4576 pac_buffers.append(checksum_buffer)
4577 pac.num_buffers += 1
4579 checksum_buffers[buffer_type] = checksum_buffer
4581 # Fill the relevant checksum buffers.
4582 for buffer_type, checksum_buffer in checksum_buffers.items():
4583 checksum_key = checksum_keys[buffer_type]
4584 ctype = checksum_key.ctype & ((1 << 32) - 1)
4586 if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
4587 self.assertIsNotNone(enc_part)
4589 signature = checksum_key.make_rodc_checksum(
4590 KU_NON_KERB_CKSUM_SALT,
4591 enc_part)
4593 elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
4594 signature = checksum_key.make_zeroed_checksum()
4596 else:
4597 signature = checksum_key.make_rodc_zeroed_checksum()
4599 checksum_buffer.info.signature = signature
4600 checksum_buffer.info.type = ctype
4602 # Add the new checksum buffers to the PAC.
4603 pac.buffers = pac_buffers
4605 # Calculate the full checksum and insert it into the PAC.
4606 full_checksum_buffer = checksum_buffers.get(
4607 krb5pac.PAC_TYPE_FULL_CHECKSUM)
4608 if full_checksum_buffer is not None:
4609 full_checksum_key = checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM]
4611 pac_data = ndr_pack(pac)
4612 full_checksum = full_checksum_key.make_checksum(
4613 KU_NON_KERB_CKSUM_SALT,
4614 pac_data)
4616 full_checksum_buffer.info.signature = full_checksum
4618 # Calculate the server and KDC checksums and insert them into the PAC.
4620 server_checksum_buffer = checksum_buffers.get(
4621 krb5pac.PAC_TYPE_SRV_CHECKSUM)
4622 if server_checksum_buffer is not None:
4623 server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
4625 pac_data = ndr_pack(pac)
4626 server_checksum = server_checksum_key.make_checksum(
4627 KU_NON_KERB_CKSUM_SALT,
4628 pac_data)
4630 server_checksum_buffer.info.signature = server_checksum
4632 kdc_checksum_buffer = checksum_buffers.get(
4633 krb5pac.PAC_TYPE_KDC_CHECKSUM)
4634 if kdc_checksum_buffer is not None:
4635 if server_checksum_buffer is None:
4636 # There's no server signature to make the checksum over, so
4637 # just make the checksum over an empty bytes object.
4638 server_checksum = bytes()
4640 kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
4642 kdc_checksum = kdc_checksum_key.make_rodc_checksum(
4643 KU_NON_KERB_CKSUM_SALT,
4644 server_checksum)
4646 kdc_checksum_buffer.info.signature = kdc_checksum
4648 def replace_pac(self, auth_data, new_pac, expect_pac=True,
4649 allow_empty_authdata=False):
4650 if new_pac is not None:
4651 self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
4652 self.assertElementPresent(new_pac, 'ad-data')
4654 new_auth_data = []
4656 ad_relevant = None
4657 old_pac = None
4659 for authdata_elem in auth_data:
4660 if authdata_elem['ad-type'] == AD_IF_RELEVANT:
4661 ad_relevant = self.der_decode(
4662 authdata_elem['ad-data'],
4663 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
4665 relevant_elems = []
4666 for relevant_elem in ad_relevant:
4667 if relevant_elem['ad-type'] == AD_WIN2K_PAC:
4668 self.assertIsNone(old_pac, 'Multiple PACs detected')
4669 old_pac = relevant_elem['ad-data']
4671 if new_pac is not None:
4672 relevant_elems.append(new_pac)
4673 else:
4674 relevant_elems.append(relevant_elem)
4675 if expect_pac:
4676 self.assertIsNotNone(old_pac, 'Expected PAC')
4678 if relevant_elems or allow_empty_authdata:
4679 ad_relevant = self.der_encode(
4680 relevant_elems,
4681 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
4683 authdata_elem = self.AuthorizationData_create(
4684 AD_IF_RELEVANT,
4685 ad_relevant)
4686 else:
4687 authdata_elem = None
4689 if authdata_elem is not None or allow_empty_authdata:
4690 new_auth_data.append(authdata_elem)
4692 if expect_pac:
4693 self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
4695 return new_auth_data, old_pac
4697 def get_pac(self, auth_data, expect_pac=True):
4698 _, pac = self.replace_pac(auth_data, None, expect_pac)
4699 return pac
4701 def get_ticket_pac(self, ticket, expect_pac=True):
4702 auth_data = ticket.ticket_private.get('authorization-data')
4703 if expect_pac:
4704 self.assertIsNotNone(auth_data)
4705 elif auth_data is None:
4706 return None
4708 return self.get_pac(auth_data, expect_pac=expect_pac)
4710 def get_krbtgt_checksum_key(self):
4711 krbtgt_creds = self.get_krbtgt_creds()
4712 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
4714 return {
4715 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
4718 def is_tgs_principal(self, principal):
4719 if self.is_tgs(principal):
4720 return True
4722 if self.kadmin_is_tgs and self.is_kadmin(principal):
4723 return True
4725 return False
4727 def is_kadmin(self, principal):
4728 name = principal['name-string'][0]
4729 return name in ('kadmin', b'kadmin')
4731 def is_tgs(self, principal):
4732 name = principal['name-string'][0]
4733 return name in ('krbtgt', b'krbtgt')
4735 def is_tgt(self, ticket):
4736 sname = ticket.ticket['sname']
4737 return self.is_tgs(sname)
4739 def get_empty_pac(self):
4740 return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1))
4742 def get_outer_pa_dict(self, kdc_exchange_dict):
4743 return self.get_pa_dict(kdc_exchange_dict['req_padata'])
4745 def get_fast_pa_dict(self, kdc_exchange_dict):
4746 req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
4748 if req_pa_dict:
4749 return req_pa_dict
4751 return self.get_outer_pa_dict(kdc_exchange_dict)
4753 def sent_fast(self, kdc_exchange_dict):
4754 outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
4756 return PADATA_FX_FAST in outer_pa_dict
4758 def sent_enc_challenge(self, kdc_exchange_dict):
4759 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
4761 return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
4763 def sent_enc_pa_rep(self, kdc_exchange_dict):
4764 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
4766 return PADATA_REQ_ENC_PA_REP in fast_pa_dict
4768 def get_sent_pac_options(self, kdc_exchange_dict):
4769 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
4771 if PADATA_PAC_OPTIONS not in fast_pa_dict:
4772 return ''
4774 pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
4775 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
4776 pac_options = pac_options['options']
4778 # Mask out unsupported bits.
4779 pac_options, remaining = pac_options[:4], pac_options[4:]
4780 pac_options += '0' * len(remaining)
4782 return pac_options
4784 def get_krbtgt_sname(self):
4785 krbtgt_creds = self.get_krbtgt_creds()
4786 krbtgt_username = krbtgt_creds.get_username()
4787 krbtgt_realm = krbtgt_creds.get_realm()
4788 krbtgt_sname = self.PrincipalName_create(
4789 name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
4791 return krbtgt_sname
4793 def _test_as_exchange(self,
4794 cname,
4795 realm,
4796 sname,
4797 till,
4798 client_as_etypes,
4799 expected_error_mode,
4800 expected_crealm,
4801 expected_cname,
4802 expected_srealm,
4803 expected_sname,
4804 expected_salt,
4805 etypes,
4806 padata,
4807 kdc_options,
4808 renew_time=None,
4809 expected_account_name=None,
4810 expected_groups=None,
4811 unexpected_groups=None,
4812 expected_upn_name=None,
4813 expected_sid=None,
4814 expected_domain_sid=None,
4815 expected_flags=None,
4816 unexpected_flags=None,
4817 expected_supported_etypes=None,
4818 preauth_key=None,
4819 ticket_decryption_key=None,
4820 pac_request=None,
4821 pac_options=None,
4822 expect_pac=True,
4823 expect_pac_attrs=None,
4824 expect_pac_attrs_pac_request=None,
4825 expect_requester_sid=None,
4826 expect_client_claims=None,
4827 expect_device_claims=None,
4828 expected_client_claims=None,
4829 unexpected_client_claims=None,
4830 expected_device_claims=None,
4831 unexpected_device_claims=None,
4832 expect_edata=None,
4833 rc4_support=True,
4834 to_rodc=False):
4836 def _generate_padata_copy(_kdc_exchange_dict,
4837 _callback_dict,
4838 req_body):
4839 return padata, req_body
4841 if not expected_error_mode:
4842 check_error_fn = None
4843 check_rep_fn = self.generic_check_kdc_rep
4844 else:
4845 check_error_fn = self.generic_check_kdc_error
4846 check_rep_fn = None
4848 if padata is not None:
4849 generate_padata_fn = _generate_padata_copy
4850 else:
4851 generate_padata_fn = None
4853 kdc_exchange_dict = self.as_exchange_dict(
4854 expected_crealm=expected_crealm,
4855 expected_cname=expected_cname,
4856 expected_srealm=expected_srealm,
4857 expected_sname=expected_sname,
4858 expected_account_name=expected_account_name,
4859 expected_groups=expected_groups,
4860 unexpected_groups=unexpected_groups,
4861 expected_upn_name=expected_upn_name,
4862 expected_sid=expected_sid,
4863 expected_domain_sid=expected_domain_sid,
4864 expected_supported_etypes=expected_supported_etypes,
4865 ticket_decryption_key=ticket_decryption_key,
4866 generate_padata_fn=generate_padata_fn,
4867 check_error_fn=check_error_fn,
4868 check_rep_fn=check_rep_fn,
4869 check_kdc_private_fn=self.generic_check_kdc_private,
4870 expected_error_mode=expected_error_mode,
4871 client_as_etypes=client_as_etypes,
4872 expected_salt=expected_salt,
4873 expected_flags=expected_flags,
4874 unexpected_flags=unexpected_flags,
4875 preauth_key=preauth_key,
4876 kdc_options=str(kdc_options),
4877 pac_request=pac_request,
4878 pac_options=pac_options,
4879 expect_pac=expect_pac,
4880 expect_pac_attrs=expect_pac_attrs,
4881 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
4882 expect_requester_sid=expect_requester_sid,
4883 expect_client_claims=expect_client_claims,
4884 expect_device_claims=expect_device_claims,
4885 expected_client_claims=expected_client_claims,
4886 unexpected_client_claims=unexpected_client_claims,
4887 expected_device_claims=expected_device_claims,
4888 unexpected_device_claims=unexpected_device_claims,
4889 expect_edata=expect_edata,
4890 rc4_support=rc4_support,
4891 to_rodc=to_rodc)
4893 rep = self._generic_kdc_exchange(kdc_exchange_dict,
4894 cname=cname,
4895 realm=realm,
4896 sname=sname,
4897 till_time=till,
4898 renew_time=renew_time,
4899 etypes=etypes)
4901 return rep, kdc_exchange_dict