tests/krb5: Allow excluding the PAC server checksum
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob4c1aedbca0f186b663bc010553515f29a663c1f7
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import krb5pac, security
38 from samba.gensec import FEATURE_SEAL
39 from samba.ndr import ndr_pack, ndr_unpack
41 import samba.tests
42 from samba.tests import TestCaseInTempDir
44 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
45 from samba.tests.krb5.rfc4120_constants import (
46 AD_IF_RELEVANT,
47 AD_WIN2K_PAC,
48 FX_FAST_ARMOR_AP_REQUEST,
49 KDC_ERR_GENERIC,
50 KDC_ERR_PREAUTH_FAILED,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
52 KRB_AP_REQ,
53 KRB_AS_REP,
54 KRB_AS_REQ,
55 KRB_ERROR,
56 KRB_TGS_REP,
57 KRB_TGS_REQ,
58 KU_AP_REQ_AUTH,
59 KU_AS_REP_ENC_PART,
60 KU_ENC_CHALLENGE_KDC,
61 KU_FAST_ENC,
62 KU_FAST_FINISHED,
63 KU_FAST_REP,
64 KU_FAST_REQ_CHKSUM,
65 KU_NON_KERB_CKSUM_SALT,
66 KU_TGS_REP_ENC_PART_SESSION,
67 KU_TGS_REP_ENC_PART_SUB_KEY,
68 KU_TGS_REQ_AUTH,
69 KU_TGS_REQ_AUTH_CKSUM,
70 KU_TGS_REQ_AUTH_DAT_SESSION,
71 KU_TGS_REQ_AUTH_DAT_SUBKEY,
72 KU_TICKET,
73 NT_SRV_INST,
74 NT_WELLKNOWN,
75 PADATA_ENCRYPTED_CHALLENGE,
76 PADATA_ENC_TIMESTAMP,
77 PADATA_ETYPE_INFO,
78 PADATA_ETYPE_INFO2,
79 PADATA_FOR_USER,
80 PADATA_FX_COOKIE,
81 PADATA_FX_ERROR,
82 PADATA_FX_FAST,
83 PADATA_KDC_REQ,
84 PADATA_PAC_OPTIONS,
85 PADATA_PAC_REQUEST,
86 PADATA_PK_AS_REQ,
87 PADATA_PK_AS_REP_19,
88 PADATA_PW_SALT,
89 PADATA_SUPPORTED_ETYPES
91 import samba.tests.krb5.kcrypto as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self, value, asn1Spec, encodeFun, **options):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value = asn1Spec.clone(value)
104 valueLength = len(value)
105 if valueLength % 8:
106 alignedValue = value << (8 - valueLength % 8)
107 else:
108 alignedValue = value
110 substrate = alignedValue.asOctets()
111 length = len(substrate)
112 # We need at least 32-Bit / 4-Bytes
113 if length < 4:
114 padding = 4 - length
115 else:
116 padding = 0
117 ret = b'\x00' + substrate + (b'\x00' * padding)
118 return ret, False, True
121 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self, scope=0):
125 ret = "%s" % self.asBinary()
126 bits = []
127 highest_bit = 32
128 for byte in self.asNumbers():
129 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
130 mask = 1 << bit
131 if byte & mask:
132 val = 1
133 else:
134 val = 0
135 bits.append(val)
136 if len(bits) < highest_bit:
137 for bitPosition in range(len(bits), highest_bit):
138 bits.append(0)
139 indent = " " * scope
140 delim = ": (\n%s " % indent
141 for bitPosition in range(highest_bit):
142 if bitPosition in self.prettyPrintNamedValues:
143 name = self.prettyPrintNamedValues[bitPosition]
144 elif bits[bitPosition] != 0:
145 name = "unknown-bit-%u" % bitPosition
146 else:
147 continue
148 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
149 delim = ",\n%s " % indent
150 ret += "\n%s)" % indent
151 return ret
154 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
155 krb5_asn1.TicketFlagsValues.namedValues
156 krb5_asn1.TicketFlags.namedValues =\
157 krb5_asn1.TicketFlagsValues.namedValues
158 krb5_asn1.TicketFlags.prettyPrint =\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
161 krb5_asn1.KDCOptionsValues.namedValues
162 krb5_asn1.KDCOptions.namedValues =\
163 krb5_asn1.KDCOptionsValues.namedValues
164 krb5_asn1.KDCOptions.prettyPrint =\
165 BitString_NamedValues_prettyPrint
166 krb5_asn1.APOptions.prettyPrintNamedValues =\
167 krb5_asn1.APOptionsValues.namedValues
168 krb5_asn1.APOptions.namedValues =\
169 krb5_asn1.APOptionsValues.namedValues
170 krb5_asn1.APOptions.prettyPrint =\
171 BitString_NamedValues_prettyPrint
172 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
173 krb5_asn1.PACOptionFlagsValues.namedValues
174 krb5_asn1.PACOptionFlags.namedValues =\
175 krb5_asn1.PACOptionFlagsValues.namedValues
176 krb5_asn1.PACOptionFlags.prettyPrint =\
177 BitString_NamedValues_prettyPrint
180 def Integer_NamedValues_prettyPrint(self, scope=0):
181 intval = int(self)
182 if intval in self.prettyPrintNamedValues:
183 name = self.prettyPrintNamedValues[intval]
184 else:
185 name = "<__unknown__>"
186 ret = "%d (0x%x) %s" % (intval, intval, name)
187 return ret
190 krb5_asn1.NameType.prettyPrintNamedValues =\
191 krb5_asn1.NameTypeValues.namedValues
192 krb5_asn1.NameType.prettyPrint =\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
195 krb5_asn1.AuthDataTypeValues.namedValues
196 krb5_asn1.AuthDataType.prettyPrint =\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1.PADataType.prettyPrintNamedValues =\
199 krb5_asn1.PADataTypeValues.namedValues
200 krb5_asn1.PADataType.prettyPrint =\
201 Integer_NamedValues_prettyPrint
202 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
203 krb5_asn1.EncryptionTypeValues.namedValues
204 krb5_asn1.EncryptionType.prettyPrint =\
205 Integer_NamedValues_prettyPrint
206 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
207 krb5_asn1.ChecksumTypeValues.namedValues
208 krb5_asn1.ChecksumType.prettyPrint =\
209 Integer_NamedValues_prettyPrint
210 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
211 krb5_asn1.KerbErrorDataTypeValues.namedValues
212 krb5_asn1.KerbErrorDataType.prettyPrint =\
213 Integer_NamedValues_prettyPrint
216 class Krb5EncryptionKey:
217 def __init__(self, key, kvno):
218 EncTypeChecksum = {
219 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
220 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
221 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
223 self.key = key
224 self.etype = key.enctype
225 self.ctype = EncTypeChecksum[self.etype]
226 self.kvno = kvno
228 def encrypt(self, usage, plaintext):
229 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
230 return ciphertext
232 def decrypt(self, usage, ciphertext):
233 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
234 return plaintext
236 def make_zeroed_checksum(self, ctype=None):
237 if ctype is None:
238 ctype = self.ctype
240 checksum_len = kcrypto.checksum_len(ctype)
241 return bytes(checksum_len)
243 def make_checksum(self, usage, plaintext, ctype=None):
244 if ctype is None:
245 ctype = self.ctype
246 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
247 return cksum
249 def verify_checksum(self, usage, plaintext, ctype, cksum):
250 if self.ctype != ctype:
251 raise AssertionError(f'{self.ctype} != {ctype}')
253 kcrypto.verify_checksum(ctype,
254 self.key,
255 usage,
256 plaintext,
257 cksum)
259 def export_obj(self):
260 EncryptionKey_obj = {
261 'keytype': self.etype,
262 'keyvalue': self.key.contents,
264 return EncryptionKey_obj
267 class RodcPacEncryptionKey(Krb5EncryptionKey):
268 def __init__(self, key, kvno, rodc_id=None):
269 super().__init__(key, kvno)
271 if rodc_id is None:
272 kvno = self.kvno
273 if kvno is not None:
274 kvno >>= 16
275 kvno &= (1 << 16) - 1
277 rodc_id = kvno or None
279 if rodc_id is not None:
280 self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
281 else:
282 self.rodc_id = b''
284 def make_rodc_zeroed_checksum(self, ctype=None):
285 checksum = super().make_zeroed_checksum(ctype)
286 return checksum + bytes(len(self.rodc_id))
288 def make_rodc_checksum(self, usage, plaintext, ctype=None):
289 checksum = super().make_checksum(usage, plaintext, ctype)
290 return checksum + self.rodc_id
292 def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
293 if self.rodc_id:
294 cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
296 if self.rodc_id != cksum_rodc_id:
297 raise AssertionError(f'{self.rodc_id.hex()} != '
298 f'{cksum_rodc_id.hex()}')
300 super().verify_checksum(usage,
301 plaintext,
302 ctype,
303 cksum)
306 class ZeroedChecksumKey(RodcPacEncryptionKey):
307 def make_checksum(self, usage, plaintext, ctype=None):
308 return self.make_zeroed_checksum(ctype)
310 def make_rodc_checksum(self, usage, plaintext, ctype=None):
311 return self.make_rodc_zeroed_checksum(ctype)
314 class WrongLengthChecksumKey(RodcPacEncryptionKey):
315 def __init__(self, key, kvno, length):
316 super().__init__(key, kvno)
318 self._length = length
320 @classmethod
321 def _adjust_to_length(cls, checksum, length):
322 diff = length - len(checksum)
323 if diff > 0:
324 checksum += bytes(diff)
325 elif diff < 0:
326 checksum = checksum[:length]
328 return checksum
330 def make_zeroed_checksum(self, ctype=None):
331 return bytes(self._length)
333 def make_checksum(self, usage, plaintext, ctype=None):
334 checksum = super().make_checksum(usage, plaintext, ctype)
335 return self._adjust_to_length(checksum, self._length)
337 def make_rodc_zeroed_checksum(self, ctype=None):
338 return bytes(self._length)
340 def make_rodc_checksum(self, usage, plaintext, ctype=None):
341 checksum = super().make_rodc_checksum(usage, plaintext, ctype)
342 return self._adjust_to_length(checksum, self._length)
345 class KerberosCredentials(Credentials):
347 fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
348 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
349 security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
351 def __init__(self):
352 super(KerberosCredentials, self).__init__()
353 all_enc_types = 0
354 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
355 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
356 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
358 self.as_supported_enctypes = all_enc_types
359 self.tgs_supported_enctypes = all_enc_types
360 self.ap_supported_enctypes = all_enc_types
362 self.kvno = None
363 self.forced_keys = {}
365 self.forced_salt = None
367 self.dn = None
369 def set_as_supported_enctypes(self, value):
370 self.as_supported_enctypes = int(value)
372 def set_tgs_supported_enctypes(self, value):
373 self.tgs_supported_enctypes = int(value)
375 def set_ap_supported_enctypes(self, value):
376 self.ap_supported_enctypes = int(value)
378 etype_map = collections.OrderedDict([
379 (kcrypto.Enctype.AES256,
380 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
381 (kcrypto.Enctype.AES128,
382 security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
383 (kcrypto.Enctype.RC4,
384 security.KERB_ENCTYPE_RC4_HMAC_MD5),
385 (kcrypto.Enctype.DES_MD5,
386 security.KERB_ENCTYPE_DES_CBC_MD5),
387 (kcrypto.Enctype.DES_CRC,
388 security.KERB_ENCTYPE_DES_CBC_CRC)
391 @classmethod
392 def etypes_to_bits(cls, etypes):
393 bits = 0
394 for etype in etypes:
395 bit = cls.etype_map[etype]
396 if bits & bit:
397 raise ValueError(f'Got duplicate etype: {etype}')
398 bits |= bit
400 return bits
402 @classmethod
403 def bits_to_etypes(cls, bits):
404 etypes = ()
405 for etype, bit in cls.etype_map.items():
406 if bit & bits:
407 bits &= ~bit
408 etypes += (etype,)
410 bits &= ~cls.fast_supported_bits
411 if bits != 0:
412 raise ValueError(f'Unsupported etype bits: {bits}')
414 return etypes
416 def get_as_krb5_etypes(self):
417 return self.bits_to_etypes(self.as_supported_enctypes)
419 def get_tgs_krb5_etypes(self):
420 return self.bits_to_etypes(self.tgs_supported_enctypes)
422 def get_ap_krb5_etypes(self):
423 return self.bits_to_etypes(self.ap_supported_enctypes)
425 def set_kvno(self, kvno):
426 # Sign-extend from 32 bits.
427 if kvno & 1 << 31:
428 kvno |= -1 << 31
429 self.kvno = kvno
431 def get_kvno(self):
432 return self.kvno
434 def set_forced_key(self, etype, hexkey):
435 etype = int(etype)
436 contents = binascii.a2b_hex(hexkey)
437 key = kcrypto.Key(etype, contents)
438 self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
440 def get_forced_key(self, etype):
441 etype = int(etype)
442 return self.forced_keys.get(etype)
444 def set_forced_salt(self, salt):
445 self.forced_salt = bytes(salt)
447 def get_forced_salt(self):
448 return self.forced_salt
450 def get_salt(self):
451 if self.forced_salt is not None:
452 return self.forced_salt
454 if self.get_workstation():
455 salt_string = '%shost%s.%s' % (
456 self.get_realm().upper(),
457 self.get_username().lower().rsplit('$', 1)[0],
458 self.get_realm().lower())
459 else:
460 salt_string = self.get_realm().upper() + self.get_username()
462 return salt_string.encode('utf-8')
464 def set_dn(self, dn):
465 self.dn = dn
467 def get_dn(self):
468 return self.dn
471 class KerberosTicketCreds:
472 def __init__(self, ticket, session_key,
473 crealm=None, cname=None,
474 srealm=None, sname=None,
475 decryption_key=None,
476 ticket_private=None,
477 encpart_private=None):
478 self.ticket = ticket
479 self.session_key = session_key
480 self.crealm = crealm
481 self.cname = cname
482 self.srealm = srealm
483 self.sname = sname
484 self.decryption_key = decryption_key
485 self.ticket_private = ticket_private
486 self.encpart_private = encpart_private
489 class RawKerberosTest(TestCaseInTempDir):
490 """A raw Kerberos Test case."""
492 pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
493 krb5pac.PAC_TYPE_KDC_CHECKSUM,
494 krb5pac.PAC_TYPE_TICKET_CHECKSUM}
496 etypes_to_test = (
497 {"value": -1111, "name": "dummy", },
498 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
499 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
500 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
503 setup_etype_test_permutations_done = False
505 @classmethod
506 def setup_etype_test_permutations(cls):
507 if cls.setup_etype_test_permutations_done:
508 return
510 res = []
512 num_idxs = len(cls.etypes_to_test)
513 permutations = []
514 for num in range(1, num_idxs + 1):
515 chunk = list(itertools.permutations(range(num_idxs), num))
516 for e in chunk:
517 el = list(e)
518 permutations.append(el)
520 for p in permutations:
521 name = None
522 etypes = ()
523 for idx in p:
524 n = cls.etypes_to_test[idx]["name"]
525 if name is None:
526 name = n
527 else:
528 name += "_%s" % n
529 etypes += (cls.etypes_to_test[idx]["value"],)
531 r = {"name": name, "etypes": etypes, }
532 res.append(r)
534 cls.etype_test_permutations = res
535 cls.setup_etype_test_permutations_done = True
537 @classmethod
538 def etype_test_permutation_name_idx(cls):
539 cls.setup_etype_test_permutations()
540 res = []
541 idx = 0
542 for e in cls.etype_test_permutations:
543 r = (e['name'], idx)
544 idx += 1
545 res.append(r)
546 return res
548 def etype_test_permutation_by_idx(self, idx):
549 e = self.etype_test_permutations[idx]
550 return (e['name'], e['etypes'])
552 @classmethod
553 def setUpClass(cls):
554 super().setUpClass()
556 cls.host = samba.tests.env_get_var_value('SERVER')
557 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
559 # A dictionary containing credentials that have already been
560 # obtained.
561 cls.creds_dict = {}
563 cls.kdc_fast_support = False
565 def setUp(self):
566 super().setUp()
567 self.do_asn1_print = False
568 self.do_hexdump = False
570 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
571 allow_missing=True)
572 if strict_checking is None:
573 strict_checking = '1'
574 self.strict_checking = bool(int(strict_checking))
576 self.s = None
578 self.unspecified_kvno = object()
580 def tearDown(self):
581 self._disconnect("tearDown")
582 super().tearDown()
584 def _disconnect(self, reason):
585 if self.s is None:
586 return
587 self.s.close()
588 self.s = None
589 if self.do_hexdump:
590 sys.stderr.write("disconnect[%s]\n" % reason)
592 def _connect_tcp(self, host):
593 tcp_port = 88
594 try:
595 self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
596 socket.SOCK_STREAM, socket.SOL_TCP,
598 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
599 self.s.settimeout(10)
600 self.s.connect(self.a[0][4])
601 except socket.error:
602 self.s.close()
603 raise
604 except IOError:
605 self.s.close()
606 raise
608 def connect(self, host):
609 self.assertNotConnected()
610 self._connect_tcp(host)
611 if self.do_hexdump:
612 sys.stderr.write("connected[%s]\n" % host)
614 def env_get_var(self, varname, prefix,
615 fallback_default=True,
616 allow_missing=False):
617 val = None
618 if prefix is not None:
619 allow_missing_prefix = allow_missing or fallback_default
620 val = samba.tests.env_get_var_value(
621 '%s_%s' % (prefix, varname),
622 allow_missing=allow_missing_prefix)
623 else:
624 fallback_default = True
625 if val is None and fallback_default:
626 val = samba.tests.env_get_var_value(varname,
627 allow_missing=allow_missing)
628 return val
630 def _get_krb5_creds_from_env(self, prefix,
631 default_username=None,
632 allow_missing_password=False,
633 allow_missing_keys=True,
634 require_strongest_key=False):
635 c = KerberosCredentials()
636 c.guess()
638 domain = self.env_get_var('DOMAIN', prefix)
639 realm = self.env_get_var('REALM', prefix)
640 allow_missing_username = default_username is not None
641 username = self.env_get_var('USERNAME', prefix,
642 fallback_default=False,
643 allow_missing=allow_missing_username)
644 if username is None:
645 username = default_username
646 password = self.env_get_var('PASSWORD', prefix,
647 fallback_default=False,
648 allow_missing=allow_missing_password)
649 c.set_domain(domain)
650 c.set_realm(realm)
651 c.set_username(username)
652 if password is not None:
653 c.set_password(password)
654 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
655 prefix, allow_missing=True)
656 if as_supported_enctypes is not None:
657 c.set_as_supported_enctypes(as_supported_enctypes)
658 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
659 prefix, allow_missing=True)
660 if tgs_supported_enctypes is not None:
661 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
662 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
663 prefix, allow_missing=True)
664 if ap_supported_enctypes is not None:
665 c.set_ap_supported_enctypes(ap_supported_enctypes)
667 if require_strongest_key:
668 kvno_allow_missing = False
669 if password is None:
670 aes256_allow_missing = False
671 else:
672 aes256_allow_missing = True
673 else:
674 kvno_allow_missing = allow_missing_keys
675 aes256_allow_missing = allow_missing_keys
676 kvno = self.env_get_var('KVNO', prefix,
677 fallback_default=False,
678 allow_missing=kvno_allow_missing)
679 if kvno is not None:
680 c.set_kvno(kvno)
681 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
682 fallback_default=False,
683 allow_missing=aes256_allow_missing)
684 if aes256_key is not None:
685 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
686 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
687 fallback_default=False,
688 allow_missing=True)
689 if aes128_key is not None:
690 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
691 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
692 fallback_default=False, allow_missing=True)
693 if rc4_key is not None:
694 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
696 if not allow_missing_keys:
697 self.assertTrue(c.forced_keys,
698 'Please supply %s encryption keys '
699 'in environment' % prefix)
701 return c
703 def _get_krb5_creds(self,
704 prefix,
705 default_username=None,
706 allow_missing_password=False,
707 allow_missing_keys=True,
708 require_strongest_key=False,
709 fallback_creds_fn=None):
710 if prefix in self.creds_dict:
711 return self.creds_dict[prefix]
713 # We don't have the credentials already
714 creds = None
715 env_err = None
716 try:
717 # Try to obtain them from the environment
718 creds = self._get_krb5_creds_from_env(
719 prefix,
720 default_username=default_username,
721 allow_missing_password=allow_missing_password,
722 allow_missing_keys=allow_missing_keys,
723 require_strongest_key=require_strongest_key)
724 except Exception as err:
725 # An error occurred, so save it for later
726 env_err = err
727 else:
728 self.assertIsNotNone(creds)
729 # Save the obtained credentials
730 self.creds_dict[prefix] = creds
731 return creds
733 if fallback_creds_fn is not None:
734 try:
735 # Try to use the fallback method
736 creds = fallback_creds_fn()
737 except Exception as err:
738 print("ERROR FROM ENV: %r" % (env_err))
739 print("FALLBACK-FN: %s" % (fallback_creds_fn))
740 print("FALLBACK-ERROR: %r" % (err))
741 else:
742 self.assertIsNotNone(creds)
743 # Save the obtained credentials
744 self.creds_dict[prefix] = creds
745 return creds
747 # Both methods failed, so raise the exception from the
748 # environment method
749 raise env_err
751 def get_user_creds(self,
752 allow_missing_password=False,
753 allow_missing_keys=True):
754 c = self._get_krb5_creds(prefix=None,
755 allow_missing_password=allow_missing_password,
756 allow_missing_keys=allow_missing_keys)
757 return c
759 def get_service_creds(self,
760 allow_missing_password=False,
761 allow_missing_keys=True):
762 c = self._get_krb5_creds(prefix='SERVICE',
763 allow_missing_password=allow_missing_password,
764 allow_missing_keys=allow_missing_keys)
765 return c
767 def get_client_creds(self,
768 allow_missing_password=False,
769 allow_missing_keys=True):
770 c = self._get_krb5_creds(prefix='CLIENT',
771 allow_missing_password=allow_missing_password,
772 allow_missing_keys=allow_missing_keys)
773 return c
775 def get_server_creds(self,
776 allow_missing_password=False,
777 allow_missing_keys=True):
778 c = self._get_krb5_creds(prefix='SERVER',
779 allow_missing_password=allow_missing_password,
780 allow_missing_keys=allow_missing_keys)
781 return c
783 def get_admin_creds(self,
784 allow_missing_password=False,
785 allow_missing_keys=True):
786 c = self._get_krb5_creds(prefix='ADMIN',
787 allow_missing_password=allow_missing_password,
788 allow_missing_keys=allow_missing_keys)
789 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
790 return c
792 def get_rodc_krbtgt_creds(self,
793 require_keys=True,
794 require_strongest_key=False):
795 if require_strongest_key:
796 self.assertTrue(require_keys)
797 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
798 allow_missing_password=True,
799 allow_missing_keys=not require_keys,
800 require_strongest_key=require_strongest_key)
801 return c
803 def get_krbtgt_creds(self,
804 require_keys=True,
805 require_strongest_key=False):
806 if require_strongest_key:
807 self.assertTrue(require_keys)
808 c = self._get_krb5_creds(prefix='KRBTGT',
809 default_username='krbtgt',
810 allow_missing_password=True,
811 allow_missing_keys=not require_keys,
812 require_strongest_key=require_strongest_key)
813 return c
815 def get_anon_creds(self):
816 c = Credentials()
817 c.set_anonymous()
818 return c
820 def asn1_dump(self, name, obj, asn1_print=None):
821 if asn1_print is None:
822 asn1_print = self.do_asn1_print
823 if asn1_print:
824 if name is not None:
825 sys.stderr.write("%s:\n%s" % (name, obj))
826 else:
827 sys.stderr.write("%s" % (obj))
829 def hex_dump(self, name, blob, hexdump=None):
830 if hexdump is None:
831 hexdump = self.do_hexdump
832 if hexdump:
833 sys.stderr.write(
834 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
836 def der_decode(
837 self,
838 blob,
839 asn1Spec=None,
840 native_encode=True,
841 asn1_print=None,
842 hexdump=None):
843 if asn1Spec is not None:
844 class_name = type(asn1Spec).__name__.split(':')[0]
845 else:
846 class_name = "<None-asn1Spec>"
847 self.hex_dump(class_name, blob, hexdump=hexdump)
848 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
849 self.asn1_dump(None, obj, asn1_print=asn1_print)
850 if native_encode:
851 obj = pyasn1_native_encode(obj)
852 return obj
854 def der_encode(
855 self,
856 obj,
857 asn1Spec=None,
858 native_decode=True,
859 asn1_print=None,
860 hexdump=None):
861 if native_decode:
862 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
863 class_name = type(obj).__name__.split(':')[0]
864 if class_name is not None:
865 self.asn1_dump(None, obj, asn1_print=asn1_print)
866 blob = pyasn1_der_encode(obj)
867 if class_name is not None:
868 self.hex_dump(class_name, blob, hexdump=hexdump)
869 return blob
871 def send_pdu(self, req, asn1_print=None, hexdump=None):
872 try:
873 k5_pdu = self.der_encode(
874 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
875 header = struct.pack('>I', len(k5_pdu))
876 req_pdu = header
877 req_pdu += k5_pdu
878 self.hex_dump("send_pdu", header, hexdump=hexdump)
879 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
880 while True:
881 sent = self.s.send(req_pdu, 0)
882 if sent == len(req_pdu):
883 break
884 req_pdu = req_pdu[sent:]
885 except socket.error as e:
886 self._disconnect("send_pdu: %s" % e)
887 raise
888 except IOError as e:
889 self._disconnect("send_pdu: %s" % e)
890 raise
892 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
893 rep_pdu = None
894 try:
895 if timeout is not None:
896 self.s.settimeout(timeout)
897 rep_pdu = self.s.recv(num_recv, 0)
898 self.s.settimeout(10)
899 if len(rep_pdu) == 0:
900 self._disconnect("recv_raw: EOF")
901 return None
902 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
903 except socket.timeout:
904 self.s.settimeout(10)
905 sys.stderr.write("recv_raw: TIMEOUT\n")
906 except socket.error as e:
907 self._disconnect("recv_raw: %s" % e)
908 raise
909 except IOError as e:
910 self._disconnect("recv_raw: %s" % e)
911 raise
912 return rep_pdu
914 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
915 rep_pdu = None
916 rep = None
917 raw_pdu = self.recv_raw(
918 num_recv=4, hexdump=hexdump, timeout=timeout)
919 if raw_pdu is None:
920 return (None, None)
921 header = struct.unpack(">I", raw_pdu[0:4])
922 k5_len = header[0]
923 if k5_len == 0:
924 return (None, "")
925 missing = k5_len
926 rep_pdu = b''
927 while missing > 0:
928 raw_pdu = self.recv_raw(
929 num_recv=missing, hexdump=hexdump, timeout=timeout)
930 self.assertGreaterEqual(len(raw_pdu), 1)
931 rep_pdu += raw_pdu
932 missing = k5_len - len(rep_pdu)
933 k5_raw = self.der_decode(
934 rep_pdu,
935 asn1Spec=None,
936 native_encode=False,
937 asn1_print=False,
938 hexdump=False)
939 pvno = k5_raw['field-0']
940 self.assertEqual(pvno, 5)
941 msg_type = k5_raw['field-1']
942 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
943 if msg_type == KRB_AS_REP:
944 asn1Spec = krb5_asn1.AS_REP()
945 elif msg_type == KRB_TGS_REP:
946 asn1Spec = krb5_asn1.TGS_REP()
947 elif msg_type == KRB_ERROR:
948 asn1Spec = krb5_asn1.KRB_ERROR()
949 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
950 asn1_print=asn1_print, hexdump=False)
951 return (rep, rep_pdu)
953 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
954 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
955 hexdump=hexdump,
956 timeout=timeout)
957 return rep
959 def assertIsConnected(self):
960 self.assertIsNotNone(self.s, msg="Not connected")
962 def assertNotConnected(self):
963 self.assertIsNone(self.s, msg="Is connected")
965 def send_recv_transaction(
966 self,
967 req,
968 asn1_print=None,
969 hexdump=None,
970 timeout=None,
971 to_rodc=False):
972 host = self.host if to_rodc else self.dc_host
973 self.connect(host)
974 try:
975 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
976 rep = self.recv_pdu(
977 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
978 except Exception:
979 self._disconnect("transaction failed")
980 raise
981 self._disconnect("transaction done")
982 return rep
984 def assertNoValue(self, value):
985 self.assertTrue(value.isNoValue)
987 def assertHasValue(self, value):
988 self.assertIsNotNone(value)
990 def getElementValue(self, obj, elem):
991 return obj.get(elem)
993 def assertElementMissing(self, obj, elem):
994 v = self.getElementValue(obj, elem)
995 self.assertIsNone(v)
997 def assertElementPresent(self, obj, elem, expect_empty=False):
998 v = self.getElementValue(obj, elem)
999 self.assertIsNotNone(v)
1000 if self.strict_checking:
1001 if isinstance(v, collections.abc.Container):
1002 if expect_empty:
1003 self.assertEqual(0, len(v))
1004 else:
1005 self.assertNotEqual(0, len(v))
1007 def assertElementEqual(self, obj, elem, value):
1008 v = self.getElementValue(obj, elem)
1009 self.assertIsNotNone(v)
1010 self.assertEqual(v, value)
1012 def assertElementEqualUTF8(self, obj, elem, value):
1013 v = self.getElementValue(obj, elem)
1014 self.assertIsNotNone(v)
1015 self.assertEqual(v, bytes(value, 'utf8'))
1017 def assertPrincipalEqual(self, princ1, princ2):
1018 self.assertEqual(princ1['name-type'], princ2['name-type'])
1019 self.assertEqual(
1020 len(princ1['name-string']),
1021 len(princ2['name-string']),
1022 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1023 for idx in range(len(princ1['name-string'])):
1024 self.assertEqual(
1025 princ1['name-string'][idx],
1026 princ2['name-string'][idx],
1027 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1029 def assertElementEqualPrincipal(self, obj, elem, value):
1030 v = self.getElementValue(obj, elem)
1031 self.assertIsNotNone(v)
1032 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
1033 self.assertPrincipalEqual(v, value)
1035 def assertElementKVNO(self, obj, elem, value):
1036 v = self.getElementValue(obj, elem)
1037 if value == "autodetect":
1038 value = v
1039 if value is not None:
1040 self.assertIsNotNone(v)
1041 # The value on the wire should never be 0
1042 self.assertNotEqual(v, 0)
1043 # unspecified_kvno means we don't know the kvno,
1044 # but want to enforce its presence
1045 if value is not self.unspecified_kvno:
1046 value = int(value)
1047 self.assertNotEqual(value, 0)
1048 self.assertEqual(v, value)
1049 else:
1050 self.assertIsNone(v)
1052 def assertElementFlags(self, obj, elem, expected, unexpected):
1053 v = self.getElementValue(obj, elem)
1054 self.assertIsNotNone(v)
1055 if expected is not None:
1056 self.assertIsInstance(expected, krb5_asn1.KDCOptions)
1057 for i, flag in enumerate(expected):
1058 if flag == 1:
1059 self.assertEqual('1', v[i],
1060 f"'{expected.namedValues[i]}' "
1061 f"expected in {v}")
1062 if unexpected is not None:
1063 self.assertIsInstance(unexpected, krb5_asn1.KDCOptions)
1064 for i, flag in enumerate(unexpected):
1065 if flag == 1:
1066 self.assertEqual('0', v[i],
1067 f"'{unexpected.namedValues[i]}' "
1068 f"unexpected in {v}")
1070 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1071 if epoch is None:
1072 epoch = time.time()
1073 if offset is not None:
1074 epoch = epoch + int(offset)
1075 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1076 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1078 def get_KerberosTime(self, epoch=None, offset=None):
1079 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1080 return s
1082 def get_EpochFromKerberosTime(self, kerberos_time):
1083 if isinstance(kerberos_time, bytes):
1084 kerberos_time = kerberos_time.decode()
1086 epoch = datetime.datetime.strptime(kerberos_time,
1087 '%Y%m%d%H%M%SZ')
1088 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1089 epoch = int(epoch.timestamp())
1091 return epoch
1093 def get_Nonce(self):
1094 nonce_min = 0x7f000000
1095 nonce_max = 0x7fffffff
1096 v = random.randint(nonce_min, nonce_max)
1097 return v
1099 def get_pa_dict(self, pa_data):
1100 pa_dict = {}
1102 if pa_data is not None:
1103 for pa in pa_data:
1104 pa_type = pa['padata-type']
1105 if pa_type in pa_dict:
1106 raise RuntimeError(f'Duplicate type {pa_type}')
1107 pa_dict[pa_type] = pa['padata-value']
1109 return pa_dict
1111 def SessionKey_create(self, etype, contents, kvno=None):
1112 key = kcrypto.Key(etype, contents)
1113 return RodcPacEncryptionKey(key, kvno)
1115 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
1116 self.assertIsNotNone(pwd)
1117 self.assertIsNotNone(salt)
1118 key = kcrypto.string_to_key(etype, pwd, salt)
1119 return RodcPacEncryptionKey(key, kvno)
1121 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
1122 e = etype_info2['etype']
1124 salt = etype_info2.get('salt')
1126 if e == kcrypto.Enctype.RC4:
1127 nthash = creds.get_nt_hash()
1128 return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1130 password = creds.get_password()
1131 return self.PasswordKey_create(
1132 etype=e, pwd=password, salt=salt, kvno=kvno)
1134 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1136 if etype is None:
1137 etypes = creds.get_tgs_krb5_etypes()
1138 if etypes:
1139 etype = etypes[0]
1140 else:
1141 etype = kcrypto.Enctype.RC4
1143 forced_key = creds.get_forced_key(etype)
1144 if forced_key is not None:
1145 return forced_key
1147 kvno = creds.get_kvno()
1149 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1150 "nor a password specified, " % (
1151 creds.get_username(), etype, kvno))
1153 if etype == kcrypto.Enctype.RC4:
1154 nthash = creds.get_nt_hash()
1155 self.assertIsNotNone(nthash, msg=fail_msg)
1156 return self.SessionKey_create(etype=etype,
1157 contents=nthash,
1158 kvno=kvno)
1160 password = creds.get_password()
1161 self.assertIsNotNone(password, msg=fail_msg)
1162 salt = creds.get_salt()
1163 return self.PasswordKey_create(etype=etype,
1164 pwd=password,
1165 salt=salt,
1166 kvno=kvno)
1168 def RandomKey(self, etype):
1169 e = kcrypto._get_enctype_profile(etype)
1170 contents = samba.generate_random_bytes(e.keysize)
1171 return self.SessionKey_create(etype=etype, contents=contents)
1173 def EncryptionKey_import(self, EncryptionKey_obj):
1174 return self.SessionKey_create(EncryptionKey_obj['keytype'],
1175 EncryptionKey_obj['keyvalue'])
1177 def EncryptedData_create(self, key, usage, plaintext):
1178 # EncryptedData ::= SEQUENCE {
1179 # etype [0] Int32 -- EncryptionType --,
1180 # kvno [1] Int32 OPTIONAL,
1181 # cipher [2] OCTET STRING -- ciphertext
1183 ciphertext = key.encrypt(usage, plaintext)
1184 EncryptedData_obj = {
1185 'etype': key.etype,
1186 'cipher': ciphertext
1188 if key.kvno is not None:
1189 EncryptedData_obj['kvno'] = key.kvno
1190 return EncryptedData_obj
1192 def Checksum_create(self, key, usage, plaintext, ctype=None):
1193 # Checksum ::= SEQUENCE {
1194 # cksumtype [0] Int32,
1195 # checksum [1] OCTET STRING
1197 if ctype is None:
1198 ctype = key.ctype
1199 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1200 Checksum_obj = {
1201 'cksumtype': ctype,
1202 'checksum': checksum,
1204 return Checksum_obj
1206 @classmethod
1207 def PrincipalName_create(cls, name_type, names):
1208 # PrincipalName ::= SEQUENCE {
1209 # name-type [0] Int32,
1210 # name-string [1] SEQUENCE OF KerberosString
1212 PrincipalName_obj = {
1213 'name-type': name_type,
1214 'name-string': names,
1216 return PrincipalName_obj
1218 def AuthorizationData_create(self, ad_type, ad_data):
1219 # AuthorizationData ::= SEQUENCE {
1220 # ad-type [0] Int32,
1221 # ad-data [1] OCTET STRING
1223 AUTH_DATA_obj = {
1224 'ad-type': ad_type,
1225 'ad-data': ad_data
1227 return AUTH_DATA_obj
1229 def PA_DATA_create(self, padata_type, padata_value):
1230 # PA-DATA ::= SEQUENCE {
1231 # -- NOTE: first tag is [1], not [0]
1232 # padata-type [1] Int32,
1233 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1235 PA_DATA_obj = {
1236 'padata-type': padata_type,
1237 'padata-value': padata_value,
1239 return PA_DATA_obj
1241 def PA_ENC_TS_ENC_create(self, ts, usec):
1242 # PA-ENC-TS-ENC ::= SEQUENCE {
1243 # patimestamp[0] KerberosTime, -- client's time
1244 # pausec[1] krb5int32 OPTIONAL
1246 PA_ENC_TS_ENC_obj = {
1247 'patimestamp': ts,
1248 'pausec': usec,
1250 return PA_ENC_TS_ENC_obj
1252 def PA_PAC_OPTIONS_create(self, options):
1253 # PA-PAC-OPTIONS ::= SEQUENCE {
1254 # options [0] PACOptionFlags
1256 PA_PAC_OPTIONS_obj = {
1257 'options': options
1259 return PA_PAC_OPTIONS_obj
1261 def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1262 # KrbFastArmor ::= SEQUENCE {
1263 # armor-type [0] Int32,
1264 # armor-value [1] OCTET STRING,
1265 # ...
1267 KRB_FAST_ARMOR_obj = {
1268 'armor-type': armor_type,
1269 'armor-value': armor_value
1271 return KRB_FAST_ARMOR_obj
1273 def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1274 # KrbFastReq ::= SEQUENCE {
1275 # fast-options [0] FastOptions,
1276 # padata [1] SEQUENCE OF PA-DATA,
1277 # req-body [2] KDC-REQ-BODY,
1278 # ...
1280 KRB_FAST_REQ_obj = {
1281 'fast-options': fast_options,
1282 'padata': padata,
1283 'req-body': req_body
1285 return KRB_FAST_REQ_obj
1287 def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1288 # KrbFastArmoredReq ::= SEQUENCE {
1289 # armor [0] KrbFastArmor OPTIONAL,
1290 # req-checksum [1] Checksum,
1291 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1293 KRB_FAST_ARMORED_REQ_obj = {
1294 'req-checksum': req_checksum,
1295 'enc-fast-req': enc_fast_req
1297 if armor is not None:
1298 KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1299 return KRB_FAST_ARMORED_REQ_obj
1301 def PA_FX_FAST_REQUEST_create(self, armored_data):
1302 # PA-FX-FAST-REQUEST ::= CHOICE {
1303 # armored-data [0] KrbFastArmoredReq,
1304 # ...
1306 PA_FX_FAST_REQUEST_obj = {
1307 'armored-data': armored_data
1309 return PA_FX_FAST_REQUEST_obj
1311 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1312 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1313 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1314 # -- include PAC.
1315 # --If FALSE, and PAC present,
1316 # -- remove PAC.
1318 KERB_PA_PAC_REQUEST_obj = {
1319 'include-pac': include_pac,
1321 if not pa_data_create:
1322 return KERB_PA_PAC_REQUEST_obj
1323 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1324 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1325 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1326 return pa_data
1328 def get_pa_pac_options(self, options):
1329 pac_options = self.PA_PAC_OPTIONS_create(options)
1330 pac_options = self.der_encode(pac_options,
1331 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1332 pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1334 return pac_options
1336 def KDC_REQ_BODY_create(self,
1337 kdc_options,
1338 cname,
1339 realm,
1340 sname,
1341 from_time,
1342 till_time,
1343 renew_time,
1344 nonce,
1345 etypes,
1346 addresses,
1347 additional_tickets,
1348 EncAuthorizationData,
1349 EncAuthorizationData_key,
1350 EncAuthorizationData_usage,
1351 asn1_print=None,
1352 hexdump=None):
1353 # KDC-REQ-BODY ::= SEQUENCE {
1354 # kdc-options [0] KDCOptions,
1355 # cname [1] PrincipalName OPTIONAL
1356 # -- Used only in AS-REQ --,
1357 # realm [2] Realm
1358 # -- Server's realm
1359 # -- Also client's in AS-REQ --,
1360 # sname [3] PrincipalName OPTIONAL,
1361 # from [4] KerberosTime OPTIONAL,
1362 # till [5] KerberosTime,
1363 # rtime [6] KerberosTime OPTIONAL,
1364 # nonce [7] UInt32,
1365 # etype [8] SEQUENCE OF Int32
1366 # -- EncryptionType
1367 # -- in preference order --,
1368 # addresses [9] HostAddresses OPTIONAL,
1369 # enc-authorization-data [10] EncryptedData OPTIONAL
1370 # -- AuthorizationData --,
1371 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1372 # -- NOTE: not empty
1374 if EncAuthorizationData is not None:
1375 enc_ad_plain = self.der_encode(
1376 EncAuthorizationData,
1377 asn1Spec=krb5_asn1.AuthorizationData(),
1378 asn1_print=asn1_print,
1379 hexdump=hexdump)
1380 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1381 EncAuthorizationData_usage,
1382 enc_ad_plain)
1383 else:
1384 enc_ad = None
1385 KDC_REQ_BODY_obj = {
1386 'kdc-options': kdc_options,
1387 'realm': realm,
1388 'till': till_time,
1389 'nonce': nonce,
1390 'etype': etypes,
1392 if cname is not None:
1393 KDC_REQ_BODY_obj['cname'] = cname
1394 if sname is not None:
1395 KDC_REQ_BODY_obj['sname'] = sname
1396 if from_time is not None:
1397 KDC_REQ_BODY_obj['from'] = from_time
1398 if renew_time is not None:
1399 KDC_REQ_BODY_obj['rtime'] = renew_time
1400 if addresses is not None:
1401 KDC_REQ_BODY_obj['addresses'] = addresses
1402 if enc_ad is not None:
1403 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1404 if additional_tickets is not None:
1405 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1406 return KDC_REQ_BODY_obj
1408 def KDC_REQ_create(self,
1409 msg_type,
1410 padata,
1411 req_body,
1412 asn1Spec=None,
1413 asn1_print=None,
1414 hexdump=None):
1415 # KDC-REQ ::= SEQUENCE {
1416 # -- NOTE: first tag is [1], not [0]
1417 # pvno [1] INTEGER (5) ,
1418 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1419 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1420 # -- NOTE: not empty --,
1421 # req-body [4] KDC-REQ-BODY
1424 KDC_REQ_obj = {
1425 'pvno': 5,
1426 'msg-type': msg_type,
1427 'req-body': req_body,
1429 if padata is not None:
1430 KDC_REQ_obj['padata'] = padata
1431 if asn1Spec is not None:
1432 KDC_REQ_decoded = pyasn1_native_decode(
1433 KDC_REQ_obj, asn1Spec=asn1Spec)
1434 else:
1435 KDC_REQ_decoded = None
1436 return KDC_REQ_obj, KDC_REQ_decoded
1438 def AS_REQ_create(self,
1439 padata, # optional
1440 kdc_options, # required
1441 cname, # optional
1442 realm, # required
1443 sname, # optional
1444 from_time, # optional
1445 till_time, # required
1446 renew_time, # optional
1447 nonce, # required
1448 etypes, # required
1449 addresses, # optional
1450 additional_tickets,
1451 native_decoded_only=True,
1452 asn1_print=None,
1453 hexdump=None):
1454 # KDC-REQ ::= SEQUENCE {
1455 # -- NOTE: first tag is [1], not [0]
1456 # pvno [1] INTEGER (5) ,
1457 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1458 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1459 # -- NOTE: not empty --,
1460 # req-body [4] KDC-REQ-BODY
1463 # KDC-REQ-BODY ::= SEQUENCE {
1464 # kdc-options [0] KDCOptions,
1465 # cname [1] PrincipalName OPTIONAL
1466 # -- Used only in AS-REQ --,
1467 # realm [2] Realm
1468 # -- Server's realm
1469 # -- Also client's in AS-REQ --,
1470 # sname [3] PrincipalName OPTIONAL,
1471 # from [4] KerberosTime OPTIONAL,
1472 # till [5] KerberosTime,
1473 # rtime [6] KerberosTime OPTIONAL,
1474 # nonce [7] UInt32,
1475 # etype [8] SEQUENCE OF Int32
1476 # -- EncryptionType
1477 # -- in preference order --,
1478 # addresses [9] HostAddresses OPTIONAL,
1479 # enc-authorization-data [10] EncryptedData OPTIONAL
1480 # -- AuthorizationData --,
1481 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1482 # -- NOTE: not empty
1484 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1485 kdc_options,
1486 cname,
1487 realm,
1488 sname,
1489 from_time,
1490 till_time,
1491 renew_time,
1492 nonce,
1493 etypes,
1494 addresses,
1495 additional_tickets,
1496 EncAuthorizationData=None,
1497 EncAuthorizationData_key=None,
1498 EncAuthorizationData_usage=None,
1499 asn1_print=asn1_print,
1500 hexdump=hexdump)
1501 obj, decoded = self.KDC_REQ_create(
1502 msg_type=KRB_AS_REQ,
1503 padata=padata,
1504 req_body=KDC_REQ_BODY_obj,
1505 asn1Spec=krb5_asn1.AS_REQ(),
1506 asn1_print=asn1_print,
1507 hexdump=hexdump)
1508 if native_decoded_only:
1509 return decoded
1510 return decoded, obj
1512 def AP_REQ_create(self, ap_options, ticket, authenticator):
1513 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1514 # pvno [0] INTEGER (5),
1515 # msg-type [1] INTEGER (14),
1516 # ap-options [2] APOptions,
1517 # ticket [3] Ticket,
1518 # authenticator [4] EncryptedData -- Authenticator
1520 AP_REQ_obj = {
1521 'pvno': 5,
1522 'msg-type': KRB_AP_REQ,
1523 'ap-options': ap_options,
1524 'ticket': ticket,
1525 'authenticator': authenticator,
1527 return AP_REQ_obj
1529 def Authenticator_create(
1530 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1531 authorization_data):
1532 # -- Unencrypted authenticator
1533 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1534 # authenticator-vno [0] INTEGER (5),
1535 # crealm [1] Realm,
1536 # cname [2] PrincipalName,
1537 # cksum [3] Checksum OPTIONAL,
1538 # cusec [4] Microseconds,
1539 # ctime [5] KerberosTime,
1540 # subkey [6] EncryptionKey OPTIONAL,
1541 # seq-number [7] UInt32 OPTIONAL,
1542 # authorization-data [8] AuthorizationData OPTIONAL
1544 Authenticator_obj = {
1545 'authenticator-vno': 5,
1546 'crealm': crealm,
1547 'cname': cname,
1548 'cusec': cusec,
1549 'ctime': ctime,
1551 if cksum is not None:
1552 Authenticator_obj['cksum'] = cksum
1553 if subkey is not None:
1554 Authenticator_obj['subkey'] = subkey
1555 if seq_number is not None:
1556 Authenticator_obj['seq-number'] = seq_number
1557 if authorization_data is not None:
1558 Authenticator_obj['authorization-data'] = authorization_data
1559 return Authenticator_obj
1561 def TGS_REQ_create(self,
1562 padata, # optional
1563 cusec,
1564 ctime,
1565 ticket,
1566 kdc_options, # required
1567 cname, # optional
1568 realm, # required
1569 sname, # optional
1570 from_time, # optional
1571 till_time, # required
1572 renew_time, # optional
1573 nonce, # required
1574 etypes, # required
1575 addresses, # optional
1576 EncAuthorizationData,
1577 EncAuthorizationData_key,
1578 additional_tickets,
1579 ticket_session_key,
1580 authenticator_subkey=None,
1581 body_checksum_type=None,
1582 native_decoded_only=True,
1583 asn1_print=None,
1584 hexdump=None):
1585 # KDC-REQ ::= SEQUENCE {
1586 # -- NOTE: first tag is [1], not [0]
1587 # pvno [1] INTEGER (5) ,
1588 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1589 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1590 # -- NOTE: not empty --,
1591 # req-body [4] KDC-REQ-BODY
1594 # KDC-REQ-BODY ::= SEQUENCE {
1595 # kdc-options [0] KDCOptions,
1596 # cname [1] PrincipalName OPTIONAL
1597 # -- Used only in AS-REQ --,
1598 # realm [2] Realm
1599 # -- Server's realm
1600 # -- Also client's in AS-REQ --,
1601 # sname [3] PrincipalName OPTIONAL,
1602 # from [4] KerberosTime OPTIONAL,
1603 # till [5] KerberosTime,
1604 # rtime [6] KerberosTime OPTIONAL,
1605 # nonce [7] UInt32,
1606 # etype [8] SEQUENCE OF Int32
1607 # -- EncryptionType
1608 # -- in preference order --,
1609 # addresses [9] HostAddresses OPTIONAL,
1610 # enc-authorization-data [10] EncryptedData OPTIONAL
1611 # -- AuthorizationData --,
1612 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1613 # -- NOTE: not empty
1616 if authenticator_subkey is not None:
1617 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1618 else:
1619 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1621 req_body = self.KDC_REQ_BODY_create(
1622 kdc_options=kdc_options,
1623 cname=None,
1624 realm=realm,
1625 sname=sname,
1626 from_time=from_time,
1627 till_time=till_time,
1628 renew_time=renew_time,
1629 nonce=nonce,
1630 etypes=etypes,
1631 addresses=addresses,
1632 additional_tickets=additional_tickets,
1633 EncAuthorizationData=EncAuthorizationData,
1634 EncAuthorizationData_key=EncAuthorizationData_key,
1635 EncAuthorizationData_usage=EncAuthorizationData_usage)
1636 req_body_blob = self.der_encode(req_body,
1637 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1638 asn1_print=asn1_print, hexdump=hexdump)
1640 req_body_checksum = self.Checksum_create(ticket_session_key,
1641 KU_TGS_REQ_AUTH_CKSUM,
1642 req_body_blob,
1643 ctype=body_checksum_type)
1645 subkey_obj = None
1646 if authenticator_subkey is not None:
1647 subkey_obj = authenticator_subkey.export_obj()
1648 seq_number = random.randint(0, 0xfffffffe)
1649 authenticator = self.Authenticator_create(
1650 crealm=realm,
1651 cname=cname,
1652 cksum=req_body_checksum,
1653 cusec=cusec,
1654 ctime=ctime,
1655 subkey=subkey_obj,
1656 seq_number=seq_number,
1657 authorization_data=None)
1658 authenticator = self.der_encode(
1659 authenticator,
1660 asn1Spec=krb5_asn1.Authenticator(),
1661 asn1_print=asn1_print,
1662 hexdump=hexdump)
1664 authenticator = self.EncryptedData_create(
1665 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1667 ap_options = krb5_asn1.APOptions('0')
1668 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1669 ticket=ticket,
1670 authenticator=authenticator)
1671 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1672 asn1_print=asn1_print, hexdump=hexdump)
1673 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1674 if padata is not None:
1675 padata.append(pa_tgs_req)
1676 else:
1677 padata = [pa_tgs_req]
1679 obj, decoded = self.KDC_REQ_create(
1680 msg_type=KRB_TGS_REQ,
1681 padata=padata,
1682 req_body=req_body,
1683 asn1Spec=krb5_asn1.TGS_REQ(),
1684 asn1_print=asn1_print,
1685 hexdump=hexdump)
1686 if native_decoded_only:
1687 return decoded
1688 return decoded, obj
1690 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1691 # PA-S4U2Self ::= SEQUENCE {
1692 # name [0] PrincipalName,
1693 # realm [1] Realm,
1694 # cksum [2] Checksum,
1695 # auth [3] GeneralString
1697 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1698 for n in name['name-string']:
1699 cksum_data += n.encode()
1700 cksum_data += realm.encode()
1701 cksum_data += "Kerberos".encode()
1702 cksum = self.Checksum_create(tgt_session_key,
1703 KU_NON_KERB_CKSUM_SALT,
1704 cksum_data,
1705 ctype)
1707 PA_S4U2Self_obj = {
1708 'name': name,
1709 'realm': realm,
1710 'cksum': cksum,
1711 'auth': "Kerberos",
1713 pa_s4u2self = self.der_encode(
1714 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1715 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1717 def _generic_kdc_exchange(self,
1718 kdc_exchange_dict, # required
1719 cname=None, # optional
1720 realm=None, # required
1721 sname=None, # optional
1722 from_time=None, # optional
1723 till_time=None, # required
1724 renew_time=None, # optional
1725 etypes=None, # required
1726 addresses=None, # optional
1727 additional_tickets=None, # optional
1728 EncAuthorizationData=None, # optional
1729 EncAuthorizationData_key=None, # optional
1730 EncAuthorizationData_usage=None): # optional
1732 check_error_fn = kdc_exchange_dict['check_error_fn']
1733 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1734 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1735 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1736 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1737 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1738 callback_dict = kdc_exchange_dict['callback_dict']
1739 req_msg_type = kdc_exchange_dict['req_msg_type']
1740 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1741 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1743 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1744 kdc_options = kdc_exchange_dict['kdc_options']
1746 pac_request = kdc_exchange_dict['pac_request']
1747 pac_options = kdc_exchange_dict['pac_options']
1749 # Parameters specific to the inner request body
1750 inner_req = kdc_exchange_dict['inner_req']
1752 # Parameters specific to the outer request body
1753 outer_req = kdc_exchange_dict['outer_req']
1755 if till_time is None:
1756 till_time = self.get_KerberosTime(offset=36000)
1758 if 'nonce' in kdc_exchange_dict:
1759 nonce = kdc_exchange_dict['nonce']
1760 else:
1761 nonce = self.get_Nonce()
1762 kdc_exchange_dict['nonce'] = nonce
1764 req_body = self.KDC_REQ_BODY_create(
1765 kdc_options=kdc_options,
1766 cname=cname,
1767 realm=realm,
1768 sname=sname,
1769 from_time=from_time,
1770 till_time=till_time,
1771 renew_time=renew_time,
1772 nonce=nonce,
1773 etypes=etypes,
1774 addresses=addresses,
1775 additional_tickets=additional_tickets,
1776 EncAuthorizationData=EncAuthorizationData,
1777 EncAuthorizationData_key=EncAuthorizationData_key,
1778 EncAuthorizationData_usage=EncAuthorizationData_usage)
1780 inner_req_body = dict(req_body)
1781 if inner_req is not None:
1782 for key, value in inner_req.items():
1783 if value is not None:
1784 inner_req_body[key] = value
1785 else:
1786 del inner_req_body[key]
1787 if outer_req is not None:
1788 for key, value in outer_req.items():
1789 if value is not None:
1790 req_body[key] = value
1791 else:
1792 del req_body[key]
1794 additional_padata = []
1795 if pac_request is not None:
1796 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
1797 additional_padata.append(pa_pac_request)
1798 if pac_options is not None:
1799 pa_pac_options = self.get_pa_pac_options(pac_options)
1800 additional_padata.append(pa_pac_options)
1802 if req_msg_type == KRB_AS_REQ:
1803 tgs_req = None
1804 tgs_req_padata = None
1805 else:
1806 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1808 tgs_req = self.generate_ap_req(kdc_exchange_dict,
1809 callback_dict,
1810 req_body,
1811 armor=False)
1812 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1814 if generate_fast_padata_fn is not None:
1815 self.assertIsNotNone(generate_fast_fn)
1816 # This can alter req_body...
1817 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1818 callback_dict,
1819 req_body)
1820 else:
1821 fast_padata = []
1823 if generate_fast_armor_fn is not None:
1824 self.assertIsNotNone(generate_fast_fn)
1825 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1826 callback_dict,
1827 req_body,
1828 armor=True)
1830 fast_armor_type = kdc_exchange_dict['fast_armor_type']
1831 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1832 fast_ap_req)
1833 else:
1834 fast_armor = None
1836 if generate_padata_fn is not None:
1837 # This can alter req_body...
1838 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1839 callback_dict,
1840 req_body)
1841 self.assertIsNotNone(outer_padata)
1842 self.assertNotIn(PADATA_KDC_REQ,
1843 [pa['padata-type'] for pa in outer_padata],
1844 'Don\'t create TGS-REQ manually')
1845 else:
1846 outer_padata = None
1848 if generate_fast_fn is not None:
1849 armor_key = kdc_exchange_dict['armor_key']
1850 self.assertIsNotNone(armor_key)
1852 if req_msg_type == KRB_AS_REQ:
1853 checksum_blob = self.der_encode(
1854 req_body,
1855 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1856 else:
1857 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1858 checksum_blob = tgs_req
1860 checksum = self.Checksum_create(armor_key,
1861 KU_FAST_REQ_CHKSUM,
1862 checksum_blob)
1864 fast_padata += additional_padata
1865 fast = generate_fast_fn(kdc_exchange_dict,
1866 callback_dict,
1867 inner_req_body,
1868 fast_padata,
1869 fast_armor,
1870 checksum)
1871 else:
1872 fast = None
1874 padata = []
1876 if tgs_req_padata is not None:
1877 padata.append(tgs_req_padata)
1879 if fast is not None:
1880 padata.append(fast)
1882 if outer_padata is not None:
1883 padata += outer_padata
1885 if fast is None:
1886 padata += additional_padata
1888 if not padata:
1889 padata = None
1891 kdc_exchange_dict['req_padata'] = padata
1892 kdc_exchange_dict['fast_padata'] = fast_padata
1893 kdc_exchange_dict['req_body'] = inner_req_body
1895 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1896 padata=padata,
1897 req_body=req_body,
1898 asn1Spec=req_asn1Spec())
1900 to_rodc = kdc_exchange_dict['to_rodc']
1902 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
1903 self.assertIsNotNone(rep)
1905 msg_type = self.getElementValue(rep, 'msg-type')
1906 self.assertIsNotNone(msg_type)
1908 expected_msg_type = None
1909 if check_error_fn is not None:
1910 expected_msg_type = KRB_ERROR
1911 self.assertIsNone(check_rep_fn)
1912 self.assertNotEqual(0, len(expected_error_mode))
1913 self.assertNotIn(0, expected_error_mode)
1914 if check_rep_fn is not None:
1915 expected_msg_type = rep_msg_type
1916 self.assertIsNone(check_error_fn)
1917 self.assertEqual(0, len(expected_error_mode))
1918 self.assertIsNotNone(expected_msg_type)
1919 self.assertEqual(msg_type, expected_msg_type)
1921 if msg_type == KRB_ERROR:
1922 return check_error_fn(kdc_exchange_dict,
1923 callback_dict,
1924 rep)
1926 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1928 def as_exchange_dict(self,
1929 expected_crealm=None,
1930 expected_cname=None,
1931 expected_anon=False,
1932 expected_srealm=None,
1933 expected_sname=None,
1934 expected_supported_etypes=None,
1935 expected_flags=None,
1936 unexpected_flags=None,
1937 ticket_decryption_key=None,
1938 generate_fast_fn=None,
1939 generate_fast_armor_fn=None,
1940 generate_fast_padata_fn=None,
1941 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1942 generate_padata_fn=None,
1943 check_error_fn=None,
1944 check_rep_fn=None,
1945 check_kdc_private_fn=None,
1946 callback_dict=None,
1947 expected_error_mode=0,
1948 expected_status=None,
1949 client_as_etypes=None,
1950 expected_salt=None,
1951 authenticator_subkey=None,
1952 preauth_key=None,
1953 armor_key=None,
1954 armor_tgt=None,
1955 armor_subkey=None,
1956 auth_data=None,
1957 kdc_options='',
1958 inner_req=None,
1959 outer_req=None,
1960 pac_request=None,
1961 pac_options=None,
1962 expect_pac=True,
1963 to_rodc=False):
1964 if expected_error_mode == 0:
1965 expected_error_mode = ()
1966 elif not isinstance(expected_error_mode, collections.abc.Container):
1967 expected_error_mode = (expected_error_mode,)
1969 kdc_exchange_dict = {
1970 'req_msg_type': KRB_AS_REQ,
1971 'req_asn1Spec': krb5_asn1.AS_REQ,
1972 'rep_msg_type': KRB_AS_REP,
1973 'rep_asn1Spec': krb5_asn1.AS_REP,
1974 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
1975 'expected_crealm': expected_crealm,
1976 'expected_cname': expected_cname,
1977 'expected_anon': expected_anon,
1978 'expected_srealm': expected_srealm,
1979 'expected_sname': expected_sname,
1980 'expected_supported_etypes': expected_supported_etypes,
1981 'expected_flags': expected_flags,
1982 'unexpected_flags': unexpected_flags,
1983 'ticket_decryption_key': ticket_decryption_key,
1984 'generate_fast_fn': generate_fast_fn,
1985 'generate_fast_armor_fn': generate_fast_armor_fn,
1986 'generate_fast_padata_fn': generate_fast_padata_fn,
1987 'fast_armor_type': fast_armor_type,
1988 'generate_padata_fn': generate_padata_fn,
1989 'check_error_fn': check_error_fn,
1990 'check_rep_fn': check_rep_fn,
1991 'check_kdc_private_fn': check_kdc_private_fn,
1992 'callback_dict': callback_dict,
1993 'expected_error_mode': expected_error_mode,
1994 'expected_status': expected_status,
1995 'client_as_etypes': client_as_etypes,
1996 'expected_salt': expected_salt,
1997 'authenticator_subkey': authenticator_subkey,
1998 'preauth_key': preauth_key,
1999 'armor_key': armor_key,
2000 'armor_tgt': armor_tgt,
2001 'armor_subkey': armor_subkey,
2002 'auth_data': auth_data,
2003 'kdc_options': kdc_options,
2004 'inner_req': inner_req,
2005 'outer_req': outer_req,
2006 'pac_request': pac_request,
2007 'pac_options': pac_options,
2008 'expect_pac': expect_pac,
2009 'to_rodc': to_rodc
2011 if callback_dict is None:
2012 callback_dict = {}
2014 return kdc_exchange_dict
2016 def tgs_exchange_dict(self,
2017 expected_crealm=None,
2018 expected_cname=None,
2019 expected_anon=False,
2020 expected_srealm=None,
2021 expected_sname=None,
2022 expected_supported_etypes=None,
2023 expected_flags=None,
2024 unexpected_flags=None,
2025 ticket_decryption_key=None,
2026 generate_fast_fn=None,
2027 generate_fast_armor_fn=None,
2028 generate_fast_padata_fn=None,
2029 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2030 generate_padata_fn=None,
2031 check_error_fn=None,
2032 check_rep_fn=None,
2033 check_kdc_private_fn=None,
2034 expected_error_mode=0,
2035 expected_status=None,
2036 callback_dict=None,
2037 tgt=None,
2038 armor_key=None,
2039 armor_tgt=None,
2040 armor_subkey=None,
2041 authenticator_subkey=None,
2042 auth_data=None,
2043 body_checksum_type=None,
2044 kdc_options='',
2045 inner_req=None,
2046 outer_req=None,
2047 pac_request=None,
2048 pac_options=None,
2049 expect_pac=True,
2050 to_rodc=False):
2051 if expected_error_mode == 0:
2052 expected_error_mode = ()
2053 elif not isinstance(expected_error_mode, collections.abc.Container):
2054 expected_error_mode = (expected_error_mode,)
2056 kdc_exchange_dict = {
2057 'req_msg_type': KRB_TGS_REQ,
2058 'req_asn1Spec': krb5_asn1.TGS_REQ,
2059 'rep_msg_type': KRB_TGS_REP,
2060 'rep_asn1Spec': krb5_asn1.TGS_REP,
2061 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2062 'expected_crealm': expected_crealm,
2063 'expected_cname': expected_cname,
2064 'expected_anon': expected_anon,
2065 'expected_srealm': expected_srealm,
2066 'expected_sname': expected_sname,
2067 'expected_supported_etypes': expected_supported_etypes,
2068 'expected_flags': expected_flags,
2069 'unexpected_flags': unexpected_flags,
2070 'ticket_decryption_key': ticket_decryption_key,
2071 'generate_fast_fn': generate_fast_fn,
2072 'generate_fast_armor_fn': generate_fast_armor_fn,
2073 'generate_fast_padata_fn': generate_fast_padata_fn,
2074 'fast_armor_type': fast_armor_type,
2075 'generate_padata_fn': generate_padata_fn,
2076 'check_error_fn': check_error_fn,
2077 'check_rep_fn': check_rep_fn,
2078 'check_kdc_private_fn': check_kdc_private_fn,
2079 'callback_dict': callback_dict,
2080 'expected_error_mode': expected_error_mode,
2081 'expected_status': expected_status,
2082 'tgt': tgt,
2083 'body_checksum_type': body_checksum_type,
2084 'armor_key': armor_key,
2085 'armor_tgt': armor_tgt,
2086 'armor_subkey': armor_subkey,
2087 'auth_data': auth_data,
2088 'authenticator_subkey': authenticator_subkey,
2089 'kdc_options': kdc_options,
2090 'inner_req': inner_req,
2091 'outer_req': outer_req,
2092 'pac_request': pac_request,
2093 'pac_options': pac_options,
2094 'expect_pac': expect_pac,
2095 'to_rodc': to_rodc
2097 if callback_dict is None:
2098 callback_dict = {}
2100 return kdc_exchange_dict
2102 def generic_check_kdc_rep(self,
2103 kdc_exchange_dict,
2104 callback_dict,
2105 rep):
2107 expected_crealm = kdc_exchange_dict['expected_crealm']
2108 expected_anon = kdc_exchange_dict['expected_anon']
2109 expected_srealm = kdc_exchange_dict['expected_srealm']
2110 expected_sname = kdc_exchange_dict['expected_sname']
2111 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2112 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2113 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2114 msg_type = kdc_exchange_dict['rep_msg_type']
2115 armor_key = kdc_exchange_dict['armor_key']
2117 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
2118 padata = self.getElementValue(rep, 'padata')
2119 if self.strict_checking:
2120 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
2121 if expected_anon:
2122 expected_cname = self.PrincipalName_create(
2123 name_type=NT_WELLKNOWN,
2124 names=['WELLKNOWN', 'ANONYMOUS'])
2125 else:
2126 expected_cname = kdc_exchange_dict['expected_cname']
2127 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2128 self.assertElementPresent(rep, 'ticket')
2129 ticket = self.getElementValue(rep, 'ticket')
2130 ticket_encpart = None
2131 ticket_cipher = None
2132 self.assertIsNotNone(ticket)
2133 if ticket is not None: # Never None, but gives indentation
2134 self.assertElementEqual(ticket, 'tkt-vno', 5)
2135 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2136 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2137 self.assertElementPresent(ticket, 'enc-part')
2138 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2139 self.assertIsNotNone(ticket_encpart)
2140 if ticket_encpart is not None: # Never None, but gives indentation
2141 self.assertElementPresent(ticket_encpart, 'etype')
2142 # 'unspecified' means present, with any value != 0
2143 self.assertElementKVNO(ticket_encpart, 'kvno',
2144 self.unspecified_kvno)
2145 self.assertElementPresent(ticket_encpart, 'cipher')
2146 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2147 self.assertElementPresent(rep, 'enc-part')
2148 encpart = self.getElementValue(rep, 'enc-part')
2149 encpart_cipher = None
2150 self.assertIsNotNone(encpart)
2151 if encpart is not None: # Never None, but gives indentation
2152 self.assertElementPresent(encpart, 'etype')
2153 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2154 self.assertElementPresent(encpart, 'cipher')
2155 encpart_cipher = self.getElementValue(encpart, 'cipher')
2157 ticket_checksum = None
2159 # Get the decryption key for the encrypted part
2160 encpart_decryption_key, encpart_decryption_usage = (
2161 self.get_preauth_key(kdc_exchange_dict))
2163 if armor_key is not None:
2164 pa_dict = self.get_pa_dict(padata)
2166 if PADATA_FX_FAST in pa_dict:
2167 fx_fast_data = pa_dict[PADATA_FX_FAST]
2168 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2169 fx_fast_data,
2170 armor_key,
2171 finished=True)
2173 if 'strengthen-key' in fast_response:
2174 strengthen_key = self.EncryptionKey_import(
2175 fast_response['strengthen-key'])
2176 encpart_decryption_key = (
2177 self.generate_strengthen_reply_key(
2178 strengthen_key,
2179 encpart_decryption_key))
2181 fast_finished = fast_response.get('finished')
2182 if fast_finished is not None:
2183 ticket_checksum = fast_finished['ticket-checksum']
2185 self.check_rep_padata(kdc_exchange_dict,
2186 callback_dict,
2187 fast_response['padata'],
2188 error_code=0)
2190 ticket_private = None
2191 if ticket_decryption_key is not None:
2192 self.assertElementEqual(ticket_encpart, 'etype',
2193 ticket_decryption_key.etype)
2194 self.assertElementKVNO(ticket_encpart, 'kvno',
2195 ticket_decryption_key.kvno)
2196 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2197 ticket_cipher)
2198 ticket_private = self.der_decode(
2199 ticket_decpart,
2200 asn1Spec=krb5_asn1.EncTicketPart())
2202 encpart_private = None
2203 self.assertIsNotNone(encpart_decryption_key)
2204 if encpart_decryption_key is not None:
2205 self.assertElementEqual(encpart, 'etype',
2206 encpart_decryption_key.etype)
2207 if self.strict_checking:
2208 self.assertElementKVNO(encpart, 'kvno',
2209 encpart_decryption_key.kvno)
2210 rep_decpart = encpart_decryption_key.decrypt(
2211 encpart_decryption_usage,
2212 encpart_cipher)
2213 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2214 # application tag 26
2215 try:
2216 encpart_private = self.der_decode(
2217 rep_decpart,
2218 asn1Spec=rep_encpart_asn1Spec())
2219 except Exception:
2220 encpart_private = self.der_decode(
2221 rep_decpart,
2222 asn1Spec=krb5_asn1.EncTGSRepPart())
2224 self.assertIsNotNone(check_kdc_private_fn)
2225 if check_kdc_private_fn is not None:
2226 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2227 rep, ticket_private, encpart_private,
2228 ticket_checksum)
2230 return rep
2232 def check_fx_fast_data(self,
2233 kdc_exchange_dict,
2234 fx_fast_data,
2235 armor_key,
2236 finished=False,
2237 expect_strengthen_key=True):
2238 fx_fast_data = self.der_decode(fx_fast_data,
2239 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2241 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2242 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2244 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2246 fast_response = self.der_decode(fast_rep,
2247 asn1Spec=krb5_asn1.KrbFastResponse())
2249 if expect_strengthen_key and self.strict_checking:
2250 self.assertIn('strengthen-key', fast_response)
2252 if finished:
2253 self.assertIn('finished', fast_response)
2255 # Ensure that the nonce matches the nonce in the body of the request
2256 # (RFC6113 5.4.3).
2257 nonce = kdc_exchange_dict['nonce']
2258 self.assertEqual(nonce, fast_response['nonce'])
2260 return fast_response
2262 def generic_check_kdc_private(self,
2263 kdc_exchange_dict,
2264 callback_dict,
2265 rep,
2266 ticket_private,
2267 encpart_private,
2268 ticket_checksum):
2269 kdc_options = kdc_exchange_dict['kdc_options']
2270 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2271 canonicalize = (canon_pos < len(kdc_options)
2272 and kdc_options[canon_pos] == '1')
2273 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2274 renewable = (renewable_pos < len(kdc_options)
2275 and kdc_options[renewable_pos] == '1')
2277 expected_crealm = kdc_exchange_dict['expected_crealm']
2278 expected_cname = kdc_exchange_dict['expected_cname']
2279 expected_srealm = kdc_exchange_dict['expected_srealm']
2280 expected_sname = kdc_exchange_dict['expected_sname']
2281 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2283 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2285 expected_flags = kdc_exchange_dict.get('expected_flags')
2286 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2288 ticket = self.getElementValue(rep, 'ticket')
2290 if ticket_checksum is not None:
2291 armor_key = kdc_exchange_dict['armor_key']
2292 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2294 to_rodc = kdc_exchange_dict['to_rodc']
2295 if to_rodc:
2296 krbtgt_creds = self.get_rodc_krbtgt_creds()
2297 else:
2298 krbtgt_creds = self.get_krbtgt_creds()
2299 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2301 expect_pac = kdc_exchange_dict['expect_pac']
2303 ticket_session_key = None
2304 if ticket_private is not None:
2305 self.assertElementFlags(ticket_private, 'flags',
2306 expected_flags,
2307 unexpected_flags)
2308 self.assertElementPresent(ticket_private, 'key')
2309 ticket_key = self.getElementValue(ticket_private, 'key')
2310 self.assertIsNotNone(ticket_key)
2311 if ticket_key is not None: # Never None, but gives indentation
2312 self.assertElementPresent(ticket_key, 'keytype')
2313 self.assertElementPresent(ticket_key, 'keyvalue')
2314 ticket_session_key = self.EncryptionKey_import(ticket_key)
2315 self.assertElementEqualUTF8(ticket_private, 'crealm',
2316 expected_crealm)
2317 if self.strict_checking:
2318 self.assertElementEqualPrincipal(ticket_private, 'cname',
2319 expected_cname)
2320 self.assertElementPresent(ticket_private, 'transited')
2321 self.assertElementPresent(ticket_private, 'authtime')
2322 if self.strict_checking:
2323 self.assertElementPresent(ticket_private, 'starttime')
2324 self.assertElementPresent(ticket_private, 'endtime')
2325 if renewable:
2326 if self.strict_checking:
2327 self.assertElementPresent(ticket_private, 'renew-till')
2328 else:
2329 self.assertElementMissing(ticket_private, 'renew-till')
2330 if self.strict_checking:
2331 self.assertElementEqual(ticket_private, 'caddr', [])
2332 self.assertElementPresent(ticket_private, 'authorization-data',
2333 expect_empty=not expect_pac)
2335 encpart_session_key = None
2336 if encpart_private is not None:
2337 self.assertElementPresent(encpart_private, 'key')
2338 encpart_key = self.getElementValue(encpart_private, 'key')
2339 self.assertIsNotNone(encpart_key)
2340 if encpart_key is not None: # Never None, but gives indentation
2341 self.assertElementPresent(encpart_key, 'keytype')
2342 self.assertElementPresent(encpart_key, 'keyvalue')
2343 encpart_session_key = self.EncryptionKey_import(encpart_key)
2344 self.assertElementPresent(encpart_private, 'last-req')
2345 self.assertElementEqual(encpart_private, 'nonce',
2346 kdc_exchange_dict['nonce'])
2347 if rep_msg_type == KRB_AS_REP:
2348 if self.strict_checking:
2349 self.assertElementPresent(encpart_private,
2350 'key-expiration')
2351 else:
2352 self.assertElementMissing(encpart_private,
2353 'key-expiration')
2354 self.assertElementFlags(encpart_private, 'flags',
2355 expected_flags,
2356 unexpected_flags)
2357 self.assertElementPresent(encpart_private, 'authtime')
2358 if self.strict_checking:
2359 self.assertElementPresent(encpart_private, 'starttime')
2360 self.assertElementPresent(encpart_private, 'endtime')
2361 if renewable:
2362 if self.strict_checking:
2363 self.assertElementPresent(encpart_private, 'renew-till')
2364 else:
2365 self.assertElementMissing(encpart_private, 'renew-till')
2366 self.assertElementEqualUTF8(encpart_private, 'srealm',
2367 expected_srealm)
2368 self.assertElementEqualPrincipal(encpart_private, 'sname',
2369 expected_sname)
2370 if self.strict_checking:
2371 self.assertElementEqual(encpart_private, 'caddr', [])
2373 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2375 if self.strict_checking:
2376 if canonicalize or '1' in sent_pac_options:
2377 self.assertElementPresent(encpart_private,
2378 'encrypted-pa-data')
2379 enc_pa_dict = self.get_pa_dict(
2380 encpart_private['encrypted-pa-data'])
2381 if canonicalize:
2382 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2384 expected_supported_etypes = kdc_exchange_dict[
2385 'expected_supported_etypes']
2386 expected_supported_etypes |= (
2387 security.KERB_ENCTYPE_DES_CBC_CRC |
2388 security.KERB_ENCTYPE_DES_CBC_MD5 |
2389 security.KERB_ENCTYPE_RC4_HMAC_MD5)
2391 (supported_etypes,) = struct.unpack(
2392 '<L',
2393 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2395 self.assertEqual(supported_etypes,
2396 expected_supported_etypes)
2397 else:
2398 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2400 if '1' in sent_pac_options:
2401 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2403 pac_options = self.der_decode(
2404 enc_pa_dict[PADATA_PAC_OPTIONS],
2405 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2407 self.assertElementEqual(pac_options, 'options',
2408 sent_pac_options)
2409 else:
2410 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2411 else:
2412 self.assertElementEqual(encpart_private,
2413 'encrypted-pa-data',
2416 if ticket_session_key is not None and encpart_session_key is not None:
2417 self.assertEqual(ticket_session_key.etype,
2418 encpart_session_key.etype)
2419 self.assertEqual(ticket_session_key.key.contents,
2420 encpart_session_key.key.contents)
2421 if encpart_session_key is not None:
2422 session_key = encpart_session_key
2423 else:
2424 session_key = ticket_session_key
2425 ticket_creds = KerberosTicketCreds(
2426 ticket,
2427 session_key,
2428 crealm=expected_crealm,
2429 cname=expected_cname,
2430 srealm=expected_srealm,
2431 sname=expected_sname,
2432 decryption_key=ticket_decryption_key,
2433 ticket_private=ticket_private,
2434 encpart_private=encpart_private)
2436 if ticket_decryption_key is not None:
2437 self.verify_ticket(ticket_creds, krbtgt_key, expect_pac=expect_pac)
2439 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2441 def generic_check_kdc_error(self,
2442 kdc_exchange_dict,
2443 callback_dict,
2444 rep,
2445 inner=False):
2447 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2449 expected_anon = kdc_exchange_dict['expected_anon']
2450 expected_srealm = kdc_exchange_dict['expected_srealm']
2451 expected_sname = kdc_exchange_dict['expected_sname']
2452 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2454 sent_fast = self.sent_fast(kdc_exchange_dict)
2456 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2458 self.assertElementEqual(rep, 'pvno', 5)
2459 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2460 error_code = self.getElementValue(rep, 'error-code')
2461 self.assertIn(error_code, expected_error_mode)
2462 if self.strict_checking:
2463 self.assertElementMissing(rep, 'ctime')
2464 self.assertElementMissing(rep, 'cusec')
2465 self.assertElementPresent(rep, 'stime')
2466 self.assertElementPresent(rep, 'susec')
2467 # error-code checked above
2468 if self.strict_checking:
2469 self.assertElementMissing(rep, 'crealm')
2470 if expected_anon and not inner:
2471 expected_cname = self.PrincipalName_create(
2472 name_type=NT_WELLKNOWN,
2473 names=['WELLKNOWN', 'ANONYMOUS'])
2474 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2475 else:
2476 self.assertElementMissing(rep, 'cname')
2477 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2478 if sent_fast and error_code == KDC_ERR_GENERIC:
2479 self.assertElementEqualPrincipal(rep, 'sname',
2480 self.get_krbtgt_sname())
2481 else:
2482 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2483 self.assertElementMissing(rep, 'e-text')
2484 if (error_code == KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2485 or (rep_msg_type == KRB_TGS_REP
2486 and not sent_fast)
2487 or (sent_fast and fast_armor_type is not None
2488 and fast_armor_type != FX_FAST_ARMOR_AP_REQUEST)
2489 or inner):
2490 self.assertElementMissing(rep, 'e-data')
2491 return rep
2492 edata = self.getElementValue(rep, 'e-data')
2493 if self.strict_checking:
2494 if error_code != KDC_ERR_GENERIC:
2495 # Predicting whether an ERR_GENERIC error contains e-data is
2496 # more complicated.
2497 self.assertIsNotNone(edata)
2498 if edata is not None:
2499 if rep_msg_type == KRB_TGS_REP and not sent_fast:
2500 rep_padata = [self.der_decode(edata,
2501 asn1Spec=krb5_asn1.PA_DATA())]
2502 else:
2503 rep_padata = self.der_decode(edata,
2504 asn1Spec=krb5_asn1.METHOD_DATA())
2505 self.assertGreater(len(rep_padata), 0)
2507 if sent_fast:
2508 self.assertEqual(1, len(rep_padata))
2509 rep_pa_dict = self.get_pa_dict(rep_padata)
2510 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
2512 armor_key = kdc_exchange_dict['armor_key']
2513 self.assertIsNotNone(armor_key)
2514 fast_response = self.check_fx_fast_data(
2515 kdc_exchange_dict,
2516 rep_pa_dict[PADATA_FX_FAST],
2517 armor_key,
2518 expect_strengthen_key=False)
2520 rep_padata = fast_response['padata']
2522 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2523 callback_dict,
2524 rep_padata,
2525 error_code)
2527 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2529 return rep
2531 def check_rep_padata(self,
2532 kdc_exchange_dict,
2533 callback_dict,
2534 rep_padata,
2535 error_code):
2536 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2538 req_body = kdc_exchange_dict['req_body']
2539 proposed_etypes = req_body['etype']
2540 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2542 sent_fast = self.sent_fast(kdc_exchange_dict)
2543 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
2545 if rep_msg_type == KRB_TGS_REP:
2546 self.assertTrue(sent_fast)
2548 expect_etype_info2 = ()
2549 expect_etype_info = False
2550 unexpect_etype_info = True
2551 expected_aes_type = 0
2552 expected_rc4_type = 0
2553 if kcrypto.Enctype.RC4 in proposed_etypes:
2554 expect_etype_info = True
2555 for etype in proposed_etypes:
2556 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2557 expect_etype_info = False
2558 if etype not in client_as_etypes:
2559 continue
2560 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2561 if etype > expected_aes_type:
2562 expected_aes_type = etype
2563 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
2564 unexpect_etype_info = False
2565 if etype > expected_rc4_type:
2566 expected_rc4_type = etype
2568 if expected_aes_type != 0:
2569 expect_etype_info2 += (expected_aes_type,)
2570 if expected_rc4_type != 0:
2571 expect_etype_info2 += (expected_rc4_type,)
2573 expected_patypes = ()
2574 if sent_fast and error_code != 0:
2575 expected_patypes += (PADATA_FX_ERROR,)
2576 expected_patypes += (PADATA_FX_COOKIE,)
2578 if rep_msg_type == KRB_TGS_REP:
2579 if not sent_fast and error_code != 0:
2580 expected_patypes += (PADATA_PW_SALT,)
2581 else:
2582 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2583 if ('1' in sent_pac_options
2584 and error_code not in (0, KDC_ERR_GENERIC)):
2585 expected_patypes += (PADATA_PAC_OPTIONS,)
2586 elif error_code != KDC_ERR_GENERIC:
2587 if expect_etype_info:
2588 self.assertGreater(len(expect_etype_info2), 0)
2589 expected_patypes += (PADATA_ETYPE_INFO,)
2590 if len(expect_etype_info2) != 0:
2591 expected_patypes += (PADATA_ETYPE_INFO2,)
2593 if error_code != KDC_ERR_PREAUTH_FAILED:
2594 if sent_fast:
2595 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
2596 else:
2597 expected_patypes += (PADATA_ENC_TIMESTAMP,)
2599 if not sent_enc_challenge:
2600 expected_patypes += (PADATA_PK_AS_REQ,)
2601 expected_patypes += (PADATA_PK_AS_REP_19,)
2603 if (self.kdc_fast_support
2604 and not sent_fast
2605 and not sent_enc_challenge):
2606 expected_patypes += (PADATA_FX_FAST,)
2607 expected_patypes += (PADATA_FX_COOKIE,)
2609 if self.strict_checking:
2610 for i, patype in enumerate(expected_patypes):
2611 self.assertElementEqual(rep_padata[i], 'padata-type', patype)
2612 self.assertEqual(len(rep_padata), len(expected_patypes))
2614 etype_info2 = None
2615 etype_info = None
2616 enc_timestamp = None
2617 enc_challenge = None
2618 pk_as_req = None
2619 pk_as_rep19 = None
2620 fast_cookie = None
2621 fast_error = None
2622 fx_fast = None
2623 pac_options = None
2624 pw_salt = None
2625 for pa in rep_padata:
2626 patype = self.getElementValue(pa, 'padata-type')
2627 pavalue = self.getElementValue(pa, 'padata-value')
2628 if patype == PADATA_ETYPE_INFO2:
2629 self.assertIsNone(etype_info2)
2630 etype_info2 = self.der_decode(pavalue,
2631 asn1Spec=krb5_asn1.ETYPE_INFO2())
2632 continue
2633 if patype == PADATA_ETYPE_INFO:
2634 self.assertIsNone(etype_info)
2635 etype_info = self.der_decode(pavalue,
2636 asn1Spec=krb5_asn1.ETYPE_INFO())
2637 continue
2638 if patype == PADATA_ENC_TIMESTAMP:
2639 self.assertIsNone(enc_timestamp)
2640 enc_timestamp = pavalue
2641 self.assertEqual(len(enc_timestamp), 0)
2642 continue
2643 if patype == PADATA_ENCRYPTED_CHALLENGE:
2644 self.assertIsNone(enc_challenge)
2645 enc_challenge = pavalue
2646 continue
2647 if patype == PADATA_PK_AS_REQ:
2648 self.assertIsNone(pk_as_req)
2649 pk_as_req = pavalue
2650 self.assertEqual(len(pk_as_req), 0)
2651 continue
2652 if patype == PADATA_PK_AS_REP_19:
2653 self.assertIsNone(pk_as_rep19)
2654 pk_as_rep19 = pavalue
2655 self.assertEqual(len(pk_as_rep19), 0)
2656 continue
2657 if patype == PADATA_FX_COOKIE:
2658 self.assertIsNone(fast_cookie)
2659 fast_cookie = pavalue
2660 self.assertIsNotNone(fast_cookie)
2661 continue
2662 if patype == PADATA_FX_ERROR:
2663 self.assertIsNone(fast_error)
2664 fast_error = pavalue
2665 self.assertIsNotNone(fast_error)
2666 continue
2667 if patype == PADATA_FX_FAST:
2668 self.assertIsNone(fx_fast)
2669 fx_fast = pavalue
2670 self.assertEqual(len(fx_fast), 0)
2671 continue
2672 if patype == PADATA_PAC_OPTIONS:
2673 self.assertIsNone(pac_options)
2674 pac_options = self.der_decode(
2675 pavalue,
2676 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2677 continue
2678 if patype == PADATA_PW_SALT:
2679 self.assertIsNone(pw_salt)
2680 pw_salt = pavalue
2681 self.assertIsNotNone(pw_salt)
2682 continue
2684 if fast_cookie is not None:
2685 kdc_exchange_dict['fast_cookie'] = fast_cookie
2687 if fast_error is not None:
2688 fast_error = self.der_decode(fast_error,
2689 asn1Spec=krb5_asn1.KRB_ERROR())
2690 self.generic_check_kdc_error(kdc_exchange_dict,
2691 callback_dict,
2692 fast_error,
2693 inner=True)
2695 if pac_options is not None:
2696 self.assertElementEqual(pac_options, 'options', sent_pac_options)
2698 if pw_salt is not None:
2699 self.assertEqual(12, len(pw_salt))
2701 status = int.from_bytes(pw_salt[:4], 'little')
2702 flags = int.from_bytes(pw_salt[8:], 'little')
2704 expected_status = kdc_exchange_dict['expected_status']
2705 self.assertEqual(expected_status, status)
2707 self.assertEqual(3, flags)
2708 else:
2709 self.assertIsNone(kdc_exchange_dict.get('expected_status'))
2711 if enc_challenge is not None:
2712 if not sent_enc_challenge:
2713 self.assertEqual(len(enc_challenge), 0)
2714 else:
2715 armor_key = kdc_exchange_dict['armor_key']
2716 self.assertIsNotNone(armor_key)
2718 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
2720 kdc_challenge_key = self.generate_kdc_challenge_key(
2721 armor_key, preauth_key)
2723 # Ensure that the encrypted challenge FAST factor is supported
2724 # (RFC6113 5.4.6).
2725 if self.strict_checking:
2726 self.assertNotEqual(len(enc_challenge), 0)
2727 if len(enc_challenge) != 0:
2728 encrypted_challenge = self.der_decode(
2729 enc_challenge,
2730 asn1Spec=krb5_asn1.EncryptedData())
2731 self.assertEqual(encrypted_challenge['etype'],
2732 kdc_challenge_key.etype)
2734 challenge = kdc_challenge_key.decrypt(
2735 KU_ENC_CHALLENGE_KDC,
2736 encrypted_challenge['cipher'])
2737 challenge = self.der_decode(
2738 challenge,
2739 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
2741 # Retrieve the returned timestamp.
2742 rep_patime = challenge['patimestamp']
2743 self.assertIn('pausec', challenge)
2745 # Ensure the returned time is within five minutes of the
2746 # current time.
2747 rep_time = self.get_EpochFromKerberosTime(rep_patime)
2748 current_time = time.time()
2750 self.assertLess(current_time - 300, rep_time)
2751 self.assertLess(rep_time, current_time + 300)
2753 if all(etype not in client_as_etypes or etype not in proposed_etypes
2754 for etype in (kcrypto.Enctype.AES256,
2755 kcrypto.Enctype.AES128,
2756 kcrypto.Enctype.RC4)):
2757 self.assertIsNone(etype_info2)
2758 self.assertIsNone(etype_info)
2759 if rep_msg_type == KRB_AS_REP:
2760 if self.strict_checking:
2761 if sent_fast:
2762 self.assertIsNotNone(enc_challenge)
2763 self.assertIsNone(enc_timestamp)
2764 else:
2765 self.assertIsNotNone(enc_timestamp)
2766 self.assertIsNone(enc_challenge)
2767 self.assertIsNotNone(pk_as_req)
2768 self.assertIsNotNone(pk_as_rep19)
2769 else:
2770 self.assertIsNone(enc_timestamp)
2771 self.assertIsNone(enc_challenge)
2772 self.assertIsNone(pk_as_req)
2773 self.assertIsNone(pk_as_rep19)
2774 return None
2776 if error_code != KDC_ERR_GENERIC:
2777 if self.strict_checking:
2778 self.assertIsNotNone(etype_info2)
2779 else:
2780 self.assertIsNone(etype_info2)
2781 if expect_etype_info:
2782 self.assertIsNotNone(etype_info)
2783 else:
2784 if self.strict_checking:
2785 self.assertIsNone(etype_info)
2786 if unexpect_etype_info:
2787 self.assertIsNone(etype_info)
2789 if error_code != KDC_ERR_GENERIC and self.strict_checking:
2790 self.assertGreaterEqual(len(etype_info2), 1)
2791 self.assertEqual(len(etype_info2), len(expect_etype_info2))
2792 for i in range(0, len(etype_info2)):
2793 e = self.getElementValue(etype_info2[i], 'etype')
2794 self.assertEqual(e, expect_etype_info2[i])
2795 salt = self.getElementValue(etype_info2[i], 'salt')
2796 if e == kcrypto.Enctype.RC4:
2797 self.assertIsNone(salt)
2798 else:
2799 self.assertIsNotNone(salt)
2800 expected_salt = kdc_exchange_dict['expected_salt']
2801 if expected_salt is not None:
2802 self.assertEqual(salt, expected_salt)
2803 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
2804 if self.strict_checking:
2805 self.assertIsNone(s2kparams)
2806 if etype_info is not None:
2807 self.assertEqual(len(etype_info), 1)
2808 e = self.getElementValue(etype_info[0], 'etype')
2809 self.assertEqual(e, kcrypto.Enctype.RC4)
2810 self.assertEqual(e, expect_etype_info2[0])
2811 salt = self.getElementValue(etype_info[0], 'salt')
2812 if self.strict_checking:
2813 self.assertIsNotNone(salt)
2814 self.assertEqual(len(salt), 0)
2816 if error_code not in (KDC_ERR_PREAUTH_FAILED,
2817 KDC_ERR_GENERIC):
2818 if sent_fast:
2819 self.assertIsNotNone(enc_challenge)
2820 if self.strict_checking:
2821 self.assertIsNone(enc_timestamp)
2822 else:
2823 self.assertIsNotNone(enc_timestamp)
2824 if self.strict_checking:
2825 self.assertIsNone(enc_challenge)
2826 if not sent_enc_challenge:
2827 if self.strict_checking:
2828 self.assertIsNotNone(pk_as_req)
2829 self.assertIsNotNone(pk_as_rep19)
2830 else:
2831 self.assertIsNone(pk_as_req)
2832 self.assertIsNone(pk_as_rep19)
2833 else:
2834 if self.strict_checking:
2835 self.assertIsNone(enc_timestamp)
2836 self.assertIsNone(enc_challenge)
2837 self.assertIsNone(pk_as_req)
2838 self.assertIsNone(pk_as_rep19)
2840 return etype_info2
2842 def generate_simple_fast(self,
2843 kdc_exchange_dict,
2844 _callback_dict,
2845 req_body,
2846 fast_padata,
2847 fast_armor,
2848 checksum,
2849 fast_options=''):
2850 armor_key = kdc_exchange_dict['armor_key']
2852 fast_req = self.KRB_FAST_REQ_create(fast_options,
2853 fast_padata,
2854 req_body)
2855 fast_req = self.der_encode(fast_req,
2856 asn1Spec=krb5_asn1.KrbFastReq())
2857 fast_req = self.EncryptedData_create(armor_key,
2858 KU_FAST_ENC,
2859 fast_req)
2861 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
2862 checksum,
2863 fast_req)
2865 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
2866 fx_fast_request = self.der_encode(
2867 fx_fast_request,
2868 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
2870 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
2871 fx_fast_request)
2873 return fast_padata
2875 def generate_ap_req(self,
2876 kdc_exchange_dict,
2877 _callback_dict,
2878 req_body,
2879 armor):
2880 if armor:
2881 tgt = kdc_exchange_dict['armor_tgt']
2882 authenticator_subkey = kdc_exchange_dict['armor_subkey']
2884 req_body_checksum = None
2885 else:
2886 tgt = kdc_exchange_dict['tgt']
2887 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2888 body_checksum_type = kdc_exchange_dict['body_checksum_type']
2890 req_body_blob = self.der_encode(req_body,
2891 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2893 req_body_checksum = self.Checksum_create(tgt.session_key,
2894 KU_TGS_REQ_AUTH_CKSUM,
2895 req_body_blob,
2896 ctype=body_checksum_type)
2898 auth_data = kdc_exchange_dict['auth_data']
2900 subkey_obj = None
2901 if authenticator_subkey is not None:
2902 subkey_obj = authenticator_subkey.export_obj()
2903 seq_number = random.randint(0, 0xfffffffe)
2904 (ctime, cusec) = self.get_KerberosTimeWithUsec()
2905 authenticator_obj = self.Authenticator_create(
2906 crealm=tgt.crealm,
2907 cname=tgt.cname,
2908 cksum=req_body_checksum,
2909 cusec=cusec,
2910 ctime=ctime,
2911 subkey=subkey_obj,
2912 seq_number=seq_number,
2913 authorization_data=auth_data)
2914 authenticator_blob = self.der_encode(
2915 authenticator_obj,
2916 asn1Spec=krb5_asn1.Authenticator())
2918 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
2919 authenticator = self.EncryptedData_create(tgt.session_key,
2920 usage,
2921 authenticator_blob)
2923 ap_options = krb5_asn1.APOptions('0')
2924 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2925 ticket=tgt.ticket,
2926 authenticator=authenticator)
2927 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2929 return ap_req
2931 def generate_simple_tgs_padata(self,
2932 kdc_exchange_dict,
2933 callback_dict,
2934 req_body):
2935 ap_req = self.generate_ap_req(kdc_exchange_dict,
2936 callback_dict,
2937 req_body,
2938 armor=False)
2939 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2940 padata = [pa_tgs_req]
2942 return padata, req_body
2944 def get_preauth_key(self, kdc_exchange_dict):
2945 msg_type = kdc_exchange_dict['rep_msg_type']
2947 if msg_type == KRB_AS_REP:
2948 key = kdc_exchange_dict['preauth_key']
2949 usage = KU_AS_REP_ENC_PART
2950 else: # KRB_TGS_REP
2951 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2952 if authenticator_subkey is not None:
2953 key = authenticator_subkey
2954 usage = KU_TGS_REP_ENC_PART_SUB_KEY
2955 else:
2956 tgt = kdc_exchange_dict['tgt']
2957 key = tgt.session_key
2958 usage = KU_TGS_REP_ENC_PART_SESSION
2960 self.assertIsNotNone(key)
2962 return key, usage
2964 def generate_armor_key(self, subkey, session_key):
2965 armor_key = kcrypto.cf2(subkey.key,
2966 session_key.key,
2967 b'subkeyarmor',
2968 b'ticketarmor')
2969 armor_key = Krb5EncryptionKey(armor_key, None)
2971 return armor_key
2973 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
2974 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
2975 reply_key.key,
2976 b'strengthenkey',
2977 b'replykey')
2978 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
2979 reply_key.kvno)
2981 return strengthen_reply_key
2983 def generate_client_challenge_key(self, armor_key, longterm_key):
2984 client_challenge_key = kcrypto.cf2(armor_key.key,
2985 longterm_key.key,
2986 b'clientchallengearmor',
2987 b'challengelongterm')
2988 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
2990 return client_challenge_key
2992 def generate_kdc_challenge_key(self, armor_key, longterm_key):
2993 kdc_challenge_key = kcrypto.cf2(armor_key.key,
2994 longterm_key.key,
2995 b'kdcchallengearmor',
2996 b'challengelongterm')
2997 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
2999 return kdc_challenge_key
3001 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
3002 expected_type = expected_checksum['cksumtype']
3003 self.assertEqual(armor_key.ctype, expected_type)
3005 ticket_blob = self.der_encode(ticket,
3006 asn1Spec=krb5_asn1.Ticket())
3007 checksum = self.Checksum_create(armor_key,
3008 KU_FAST_FINISHED,
3009 ticket_blob)
3010 self.assertEqual(expected_checksum, checksum)
3012 def verify_ticket(self, ticket, krbtgt_key, expect_pac=True):
3013 # Check if the ticket is a TGT.
3014 sname = ticket.ticket['sname']
3015 is_tgt = self.is_tgs(sname)
3017 # Decrypt the ticket.
3019 key = ticket.decryption_key
3020 enc_part = ticket.ticket['enc-part']
3022 self.assertElementEqual(enc_part, 'etype', key.etype)
3023 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3025 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3026 enc_part = self.der_decode(
3027 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3029 # Fetch the authorization data from the ticket.
3030 auth_data = enc_part.get('authorization-data')
3031 if expect_pac:
3032 self.assertIsNotNone(auth_data)
3033 elif auth_data is None:
3034 return
3036 # Get a copy of the authdata with an empty PAC, and the existing PAC
3037 # (if present).
3038 empty_pac = self.get_empty_pac()
3039 auth_data, pac_data = self.replace_pac(auth_data,
3040 empty_pac,
3041 expect_pac=expect_pac)
3042 if not expect_pac:
3043 return
3045 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3046 # raw type to create a new PAC with zeroed signatures for
3047 # verification. This is because on Windows, the resource_groups field
3048 # is added to PAC_LOGON_INFO after the info3 field has been created,
3049 # which results in a different ordering of pointer values than Samba
3050 # (see commit 0e201ecdc53). Using the raw type avoids changing
3051 # PAC_LOGON_INFO, so verification against Windows can work. We still
3052 # need the PAC_DATA type to retrieve the actual checksums, because the
3053 # signatures in the raw type may contain padding bytes.
3054 pac = ndr_unpack(krb5pac.PAC_DATA,
3055 pac_data)
3056 raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
3057 pac_data)
3059 checksums = {}
3061 for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
3062 buffer_type = pac_buffer.type
3063 if buffer_type in self.pac_checksum_types:
3064 self.assertNotIn(buffer_type, checksums,
3065 f'Duplicate checksum type {buffer_type}')
3067 # Fetch the checksum and the checksum type from the PAC buffer.
3068 checksum = pac_buffer.info.signature
3069 ctype = pac_buffer.info.type
3070 if ctype & 1 << 31:
3071 ctype |= -1 << 31
3073 checksums[buffer_type] = checksum, ctype
3075 if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3076 # Zero the checksum field so that we can later verify the
3077 # checksums. The ticket checksum field is not zeroed.
3079 signature = ndr_unpack(
3080 krb5pac.PAC_SIGNATURE_DATA,
3081 raw_pac_buffer.info.remaining)
3082 signature.signature = bytes(len(checksum))
3083 raw_pac_buffer.info.remaining = ndr_pack(
3084 signature)
3086 # Re-encode the PAC.
3087 pac_data = ndr_pack(raw_pac)
3089 # Verify the signatures.
3091 server_checksum, server_ctype = checksums[
3092 krb5pac.PAC_TYPE_SRV_CHECKSUM]
3093 key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3094 pac_data,
3095 server_ctype,
3096 server_checksum)
3098 kdc_checksum, kdc_ctype = checksums[
3099 krb5pac.PAC_TYPE_KDC_CHECKSUM]
3100 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3101 server_checksum,
3102 kdc_ctype,
3103 kdc_checksum)
3105 if is_tgt:
3106 self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
3107 else:
3108 ticket_checksum, ticket_ctype = checksums.get(
3109 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
3110 (None, None))
3111 if self.strict_checking:
3112 self.assertIsNotNone(ticket_checksum)
3113 if ticket_checksum is not None:
3114 enc_part['authorization-data'] = auth_data
3115 enc_part = self.der_encode(enc_part,
3116 asn1Spec=krb5_asn1.EncTicketPart())
3118 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3119 enc_part,
3120 ticket_ctype,
3121 ticket_checksum)
3123 def modified_ticket(self,
3124 ticket, *,
3125 new_ticket_key=None,
3126 modify_fn=None,
3127 modify_pac_fn=None,
3128 exclude_pac=False,
3129 update_pac_checksums=True,
3130 checksum_keys=None,
3131 include_checksums=None):
3132 if checksum_keys is None:
3133 # A dict containing a key for each checksum type to be created in
3134 # the PAC.
3135 checksum_keys = {}
3137 if include_checksums is None:
3138 # A dict containing a value for each checksum type; True if the
3139 # checksum type is to be included in the PAC, False if it is to be
3140 # excluded, or None/not present if the checksum is to be included
3141 # based on its presence in the original PAC.
3142 include_checksums = {}
3144 # Check that the values passed in by the caller make sense.
3146 self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
3147 self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
3149 if exclude_pac:
3150 self.assertIsNone(modify_pac_fn)
3152 update_pac_checksums = False
3154 if not update_pac_checksums:
3155 self.assertFalse(checksum_keys)
3156 self.assertFalse(include_checksums)
3158 expect_pac = update_pac_checksums or modify_pac_fn is not None
3160 key = ticket.decryption_key
3162 if new_ticket_key is None:
3163 # Use the same key to re-encrypt the ticket.
3164 new_ticket_key = key
3166 if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
3167 # If the server signature key is not present, fall back to the key
3168 # used to encrypt the ticket.
3169 checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
3171 if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
3172 # If the ticket signature key is not present, fall back to the key
3173 # used for the KDC signature.
3174 kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
3175 if kdc_checksum_key is not None:
3176 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
3177 kdc_checksum_key)
3179 # Decrypt the ticket.
3181 enc_part = ticket.ticket['enc-part']
3183 self.assertElementEqual(enc_part, 'etype', key.etype)
3184 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3186 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3187 enc_part = self.der_decode(
3188 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3190 # Modify the ticket here.
3191 if modify_fn is not None:
3192 enc_part = modify_fn(enc_part)
3194 auth_data = enc_part.get('authorization-data')
3195 if expect_pac:
3196 self.assertIsNotNone(auth_data)
3197 if auth_data is not None:
3198 new_pac = None
3199 if not exclude_pac:
3200 # Get a copy of the authdata with an empty PAC, and the
3201 # existing PAC (if present).
3202 empty_pac = self.get_empty_pac()
3203 empty_pac_auth_data, pac_data = self.replace_pac(auth_data,
3204 empty_pac)
3206 if expect_pac:
3207 self.assertIsNotNone(pac_data)
3208 if pac_data is not None:
3209 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3211 # Modify the PAC here.
3212 if modify_pac_fn is not None:
3213 pac = modify_pac_fn(pac)
3215 if update_pac_checksums:
3216 # Get the enc-part with an empty PAC, which is needed
3217 # to create a ticket signature.
3218 enc_part_to_sign = enc_part.copy()
3219 enc_part_to_sign['authorization-data'] = (
3220 empty_pac_auth_data)
3221 enc_part_to_sign = self.der_encode(
3222 enc_part_to_sign,
3223 asn1Spec=krb5_asn1.EncTicketPart())
3225 self.update_pac_checksums(pac,
3226 checksum_keys,
3227 include_checksums,
3228 enc_part_to_sign)
3230 # Re-encode the PAC.
3231 pac_data = ndr_pack(pac)
3232 new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
3233 pac_data)
3235 # Replace the PAC in the authorization data and re-add it to the
3236 # ticket enc-part.
3237 auth_data, _ = self.replace_pac(auth_data, new_pac)
3238 enc_part['authorization-data'] = auth_data
3240 # Re-encrypt the ticket enc-part with the new key.
3241 enc_part_new = self.der_encode(enc_part,
3242 asn1Spec=krb5_asn1.EncTicketPart())
3243 enc_part_new = self.EncryptedData_create(new_ticket_key,
3244 KU_TICKET,
3245 enc_part_new)
3247 # Create a copy of the ticket with the new enc-part.
3248 new_ticket = ticket.ticket.copy()
3249 new_ticket['enc-part'] = enc_part_new
3251 new_ticket_creds = KerberosTicketCreds(
3252 new_ticket,
3253 session_key=ticket.session_key,
3254 crealm=ticket.crealm,
3255 cname=ticket.cname,
3256 srealm=ticket.srealm,
3257 sname=ticket.sname,
3258 decryption_key=new_ticket_key,
3259 ticket_private=enc_part,
3260 encpart_private=ticket.encpart_private)
3262 return new_ticket_creds
3264 def update_pac_checksums(self,
3265 pac,
3266 checksum_keys,
3267 include_checksums,
3268 enc_part=None):
3269 pac_buffers = pac.buffers
3270 checksum_buffers = {}
3272 # Find the relevant PAC checksum buffers.
3273 for pac_buffer in pac_buffers:
3274 buffer_type = pac_buffer.type
3275 if buffer_type in self.pac_checksum_types:
3276 self.assertNotIn(buffer_type, checksum_buffers,
3277 f'Duplicate checksum type {buffer_type}')
3279 checksum_buffers[buffer_type] = pac_buffer
3281 # Create any additional buffers that were requested but not
3282 # present. Conversely, remove any buffers that were requested to be
3283 # removed.
3284 for buffer_type in self.pac_checksum_types:
3285 if buffer_type in checksum_buffers:
3286 if include_checksums.get(buffer_type) is False:
3287 checksum_buffer = checksum_buffers.pop(buffer_type)
3289 pac.num_buffers -= 1
3290 pac_buffers.remove(checksum_buffer)
3292 elif include_checksums.get(buffer_type) is True:
3293 info = krb5pac.PAC_SIGNATURE_DATA()
3295 checksum_buffer = krb5pac.PAC_BUFFER()
3296 checksum_buffer.type = buffer_type
3297 checksum_buffer.info = info
3299 pac_buffers.append(checksum_buffer)
3300 pac.num_buffers += 1
3302 checksum_buffers[buffer_type] = checksum_buffer
3304 # Fill the relevant checksum buffers.
3305 for buffer_type, checksum_buffer in checksum_buffers.items():
3306 checksum_key = checksum_keys[buffer_type]
3307 ctype = checksum_key.ctype & ((1 << 32) - 1)
3309 if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3310 self.assertIsNotNone(enc_part)
3312 signature = checksum_key.make_rodc_checksum(
3313 KU_NON_KERB_CKSUM_SALT,
3314 enc_part)
3316 elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
3317 signature = checksum_key.make_zeroed_checksum()
3319 else:
3320 signature = checksum_key.make_rodc_zeroed_checksum()
3322 checksum_buffer.info.signature = signature
3323 checksum_buffer.info.type = ctype
3325 # Add the new checksum buffers to the PAC.
3326 pac.buffers = pac_buffers
3328 # Calculate the server and KDC checksums and insert them into the PAC.
3330 server_checksum_buffer = checksum_buffers.get(
3331 krb5pac.PAC_TYPE_SRV_CHECKSUM)
3332 if server_checksum_buffer is not None:
3333 server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
3335 pac_data = ndr_pack(pac)
3336 server_checksum = server_checksum_key.make_checksum(
3337 KU_NON_KERB_CKSUM_SALT,
3338 pac_data)
3340 server_checksum_buffer.info.signature = server_checksum
3342 kdc_checksum_buffer = checksum_buffers.get(
3343 krb5pac.PAC_TYPE_KDC_CHECKSUM)
3344 if kdc_checksum_buffer is not None:
3345 if server_checksum_buffer is None:
3346 # There's no server signature to make the checksum over, so
3347 # just make the checksum over an empty bytes object.
3348 server_checksum = bytes()
3350 kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
3352 kdc_checksum = kdc_checksum_key.make_rodc_checksum(
3353 KU_NON_KERB_CKSUM_SALT,
3354 server_checksum)
3356 kdc_checksum_buffer.info.signature = kdc_checksum
3358 def replace_pac(self, auth_data, new_pac, expect_pac=True):
3359 if new_pac is not None:
3360 self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
3361 self.assertElementPresent(new_pac, 'ad-data')
3363 new_auth_data = []
3365 ad_relevant = None
3366 old_pac = None
3368 for authdata_elem in auth_data:
3369 if authdata_elem['ad-type'] == AD_IF_RELEVANT:
3370 ad_relevant = self.der_decode(
3371 authdata_elem['ad-data'],
3372 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3374 relevant_elems = []
3375 for relevant_elem in ad_relevant:
3376 if relevant_elem['ad-type'] == AD_WIN2K_PAC:
3377 self.assertIsNone(old_pac, 'Multiple PACs detected')
3378 old_pac = relevant_elem['ad-data']
3380 if new_pac is not None:
3381 relevant_elems.append(new_pac)
3382 else:
3383 relevant_elems.append(relevant_elem)
3384 if expect_pac:
3385 self.assertIsNotNone(old_pac, 'Expected PAC')
3387 ad_relevant = self.der_encode(
3388 relevant_elems,
3389 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3391 authdata_elem = self.AuthorizationData_create(AD_IF_RELEVANT,
3392 ad_relevant)
3394 new_auth_data.append(authdata_elem)
3396 if expect_pac:
3397 self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
3399 return new_auth_data, old_pac
3401 def get_krbtgt_checksum_key(self):
3402 krbtgt_creds = self.get_krbtgt_creds()
3403 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
3405 return {
3406 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
3409 def is_tgs(self, principal):
3410 name = principal['name-string'][0]
3411 return name in ('krbtgt', b'krbtgt')
3413 def get_empty_pac(self):
3414 return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1))
3416 def get_outer_pa_dict(self, kdc_exchange_dict):
3417 return self.get_pa_dict(kdc_exchange_dict['req_padata'])
3419 def get_fast_pa_dict(self, kdc_exchange_dict):
3420 req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
3422 if req_pa_dict:
3423 return req_pa_dict
3425 return self.get_outer_pa_dict(kdc_exchange_dict)
3427 def sent_fast(self, kdc_exchange_dict):
3428 outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
3430 return PADATA_FX_FAST in outer_pa_dict
3432 def sent_enc_challenge(self, kdc_exchange_dict):
3433 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3435 return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
3437 def get_sent_pac_options(self, kdc_exchange_dict):
3438 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3440 if PADATA_PAC_OPTIONS not in fast_pa_dict:
3441 return ''
3443 pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
3444 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3445 pac_options = pac_options['options']
3447 # Mask out unsupported bits.
3448 pac_options, remaining = pac_options[:4], pac_options[4:]
3449 pac_options += '0' * len(remaining)
3451 return pac_options
3453 def get_krbtgt_sname(self):
3454 krbtgt_creds = self.get_krbtgt_creds()
3455 krbtgt_username = krbtgt_creds.get_username()
3456 krbtgt_realm = krbtgt_creds.get_realm()
3457 krbtgt_sname = self.PrincipalName_create(
3458 name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
3460 return krbtgt_sname
3462 def _test_as_exchange(self,
3463 cname,
3464 realm,
3465 sname,
3466 till,
3467 client_as_etypes,
3468 expected_error_mode,
3469 expected_crealm,
3470 expected_cname,
3471 expected_srealm,
3472 expected_sname,
3473 expected_salt,
3474 etypes,
3475 padata,
3476 kdc_options,
3477 expected_flags=None,
3478 unexpected_flags=None,
3479 expected_supported_etypes=None,
3480 preauth_key=None,
3481 ticket_decryption_key=None,
3482 pac_request=None,
3483 pac_options=None,
3484 to_rodc=False):
3486 def _generate_padata_copy(_kdc_exchange_dict,
3487 _callback_dict,
3488 req_body):
3489 return padata, req_body
3491 if not expected_error_mode:
3492 check_error_fn = None
3493 check_rep_fn = self.generic_check_kdc_rep
3494 else:
3495 check_error_fn = self.generic_check_kdc_error
3496 check_rep_fn = None
3498 if padata is not None:
3499 generate_padata_fn = _generate_padata_copy
3500 else:
3501 generate_padata_fn = None
3503 kdc_exchange_dict = self.as_exchange_dict(
3504 expected_crealm=expected_crealm,
3505 expected_cname=expected_cname,
3506 expected_srealm=expected_srealm,
3507 expected_sname=expected_sname,
3508 expected_supported_etypes=expected_supported_etypes,
3509 ticket_decryption_key=ticket_decryption_key,
3510 generate_padata_fn=generate_padata_fn,
3511 check_error_fn=check_error_fn,
3512 check_rep_fn=check_rep_fn,
3513 check_kdc_private_fn=self.generic_check_kdc_private,
3514 expected_error_mode=expected_error_mode,
3515 client_as_etypes=client_as_etypes,
3516 expected_salt=expected_salt,
3517 expected_flags=expected_flags,
3518 unexpected_flags=unexpected_flags,
3519 preauth_key=preauth_key,
3520 kdc_options=str(kdc_options),
3521 pac_request=pac_request,
3522 pac_options=pac_options,
3523 to_rodc=to_rodc)
3525 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3526 cname=cname,
3527 realm=realm,
3528 sname=sname,
3529 till_time=till,
3530 etypes=etypes)
3532 return rep, kdc_exchange_dict