tests/krb5: Add parameter to enforce presence of ticket checksums
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob72a39b23b0e402378099bfe7794042e9b5081736
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import krb5pac, security
38 from samba.gensec import FEATURE_SEAL
39 from samba.ndr import ndr_pack, ndr_unpack
41 import samba.tests
42 from samba.tests import TestCaseInTempDir
44 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
45 from samba.tests.krb5.rfc4120_constants import (
46 AD_IF_RELEVANT,
47 AD_WIN2K_PAC,
48 FX_FAST_ARMOR_AP_REQUEST,
49 KDC_ERR_GENERIC,
50 KDC_ERR_PREAUTH_FAILED,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
52 KERB_ERR_TYPE_EXTENDED,
53 KRB_AP_REQ,
54 KRB_AS_REP,
55 KRB_AS_REQ,
56 KRB_ERROR,
57 KRB_TGS_REP,
58 KRB_TGS_REQ,
59 KU_AP_REQ_AUTH,
60 KU_AS_REP_ENC_PART,
61 KU_ENC_CHALLENGE_KDC,
62 KU_FAST_ENC,
63 KU_FAST_FINISHED,
64 KU_FAST_REP,
65 KU_FAST_REQ_CHKSUM,
66 KU_NON_KERB_CKSUM_SALT,
67 KU_TGS_REP_ENC_PART_SESSION,
68 KU_TGS_REP_ENC_PART_SUB_KEY,
69 KU_TGS_REQ_AUTH,
70 KU_TGS_REQ_AUTH_CKSUM,
71 KU_TGS_REQ_AUTH_DAT_SESSION,
72 KU_TGS_REQ_AUTH_DAT_SUBKEY,
73 KU_TICKET,
74 NT_SRV_INST,
75 NT_WELLKNOWN,
76 PADATA_ENCRYPTED_CHALLENGE,
77 PADATA_ENC_TIMESTAMP,
78 PADATA_ETYPE_INFO,
79 PADATA_ETYPE_INFO2,
80 PADATA_FOR_USER,
81 PADATA_FX_COOKIE,
82 PADATA_FX_ERROR,
83 PADATA_FX_FAST,
84 PADATA_KDC_REQ,
85 PADATA_PAC_OPTIONS,
86 PADATA_PAC_REQUEST,
87 PADATA_PK_AS_REQ,
88 PADATA_PK_AS_REP_19,
89 PADATA_SUPPORTED_ETYPES
91 import samba.tests.krb5.kcrypto as kcrypto
def BitStringEncoder_encodeValue32(
        self, value, asn1Spec, encodeFun, **options):
    """Encode a BIT STRING so that it carries at least 32 bits.

    BitStrings like KDCOptions or TicketFlags should at least be 32-Bit
    on the wire, so shorter values are padded with trailing zero octets.

    :param value: the bit string to encode; cloned through ``asn1Spec``
        first when a spec is given.
    :param asn1Spec: optional ASN.1 spec used to instantiate ``value``.
    :param encodeFun: unused here.
    :return: a ``(substrate, False, True)`` tuple; the substrate starts
        with a single zero octet followed by the octet-aligned bits.
        NOTE(review): the two flags are presumably pyasn1's
        isConstructed/isOctets markers - confirm against pyasn1.
    """
    if asn1Spec is not None:
        # TODO: try to avoid ASN.1 schema instantiation
        value = asn1Spec.clone(value)

    # Shift the value left so that it fills a whole number of octets.
    trailing_bits = len(value) % 8
    aligned = value << (8 - trailing_bits) if trailing_bits else value

    octets = aligned.asOctets()
    # We need at least 32-Bit / 4-Bytes
    pad_len = max(0, 4 - len(octets))
    return b'\x00' + octets + bytes(pad_len), False, True
# Replace the encoder's encodeValue with the variant above, so every
# BIT STRING encoded through BitStringEncoder is at least 32 bits long.
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
def BitString_NamedValues_prettyPrint(self, scope=0):
    """Render a bit string as its binary form plus a list of named bits.

    The first 32 bit positions are inspected.  A position is listed when
    it has a name in ``self.prettyPrintNamedValues`` (whether set or not)
    or when it is set but unnamed, in which case it is shown as
    ``unknown-bit-<position>``.

    :param scope: nesting depth; each level indents by one space.
    :return: the formatted string.
    """
    highest_bit = 32
    ret = "%s" % self.asBinary()

    # Expand every octet into its bits, most significant bit first.
    bits = [1 if octet & (1 << shift) else 0
            for octet in self.asNumbers()
            for shift in range(7, -1, -1)]
    # Treat missing positions up to highest_bit as unset.
    bits.extend([0] * (highest_bit - len(bits)))

    entries = []
    for position in range(highest_bit):
        if position in self.prettyPrintNamedValues:
            name = self.prettyPrintNamedValues[position]
        elif bits[position] != 0:
            name = "unknown-bit-%u" % position
        else:
            continue
        entries.append("%s:%u" % (name, bits[position]))

    indent = " " * scope
    if entries:
        ret += ": (\n%s " % indent
        ret += (",\n%s " % indent).join(entries)
    ret += "\n%s)" % indent
    return ret
# Give each bit-string flag type the named values of its companion
# "...Values" class and the readable prettyPrint() defined above.
for _flags_type, _flags_values in (
        (krb5_asn1.TicketFlags, krb5_asn1.TicketFlagsValues),
        (krb5_asn1.KDCOptions, krb5_asn1.KDCOptionsValues),
        (krb5_asn1.APOptions, krb5_asn1.APOptionsValues),
        (krb5_asn1.PACOptionFlags, krb5_asn1.PACOptionFlagsValues)):
    _flags_type.prettyPrintNamedValues = _flags_values.namedValues
    _flags_type.namedValues = _flags_values.namedValues
    _flags_type.prettyPrint = BitString_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _flags_type, _flags_values
def Integer_NamedValues_prettyPrint(self, scope=0):
    """Render an integer as decimal, hexadecimal and symbolic name.

    The name is looked up in ``self.prettyPrintNamedValues``; values
    without an entry are labelled ``<__unknown__>``.

    :param scope: accepted for prettyPrint() compatibility; unused.
    :return: a string such as ``"18 (0x12) name"``.
    """
    number = int(self)
    known = self.prettyPrintNamedValues
    label = known[number] if number in known else "<__unknown__>"
    return "%d (0x%x) %s" % (number, number, label)
# Give each enumerated integer type the named values of its companion
# "...Values" class and the readable prettyPrint() defined above.
for _int_type, _int_values in (
        (krb5_asn1.NameType, krb5_asn1.NameTypeValues),
        (krb5_asn1.AuthDataType, krb5_asn1.AuthDataTypeValues),
        (krb5_asn1.PADataType, krb5_asn1.PADataTypeValues),
        (krb5_asn1.EncryptionType, krb5_asn1.EncryptionTypeValues),
        (krb5_asn1.ChecksumType, krb5_asn1.ChecksumTypeValues),
        (krb5_asn1.KerbErrorDataType, krb5_asn1.KerbErrorDataTypeValues)):
    _int_type.prettyPrintNamedValues = _int_values.namedValues
    _int_type.prettyPrint = Integer_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _int_type, _int_values
class Krb5EncryptionKey:
    """A kcrypto key together with its enctype, checksum type and kvno.

    Attributes set by the constructor:
      key   - the wrapped key object (its ``enctype`` is read here and
              its ``contents`` in export_obj())
      etype - ``key.enctype``
      ctype - the checksum type paired with ``etype``
      kvno  - the key version number handed to the constructor
    """

    def __init__(self, key, kvno):
        """Wrap ``key``.

        :raises KeyError: if ``key.enctype`` is not AES256, AES128 or RC4.
        """
        EncTypeChecksum = {
            kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
            kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
            kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
        }
        self.key = key
        self.etype = key.enctype
        self.ctype = EncTypeChecksum[self.etype]
        self.kvno = kvno

    def encrypt(self, usage, plaintext):
        """Encrypt ``plaintext`` with this key for key usage ``usage``."""
        return kcrypto.encrypt(self.key, usage, plaintext)

    def decrypt(self, usage, ciphertext):
        """Decrypt ``ciphertext`` with this key for key usage ``usage``."""
        return kcrypto.decrypt(self.key, usage, ciphertext)

    def make_zeroed_checksum(self, ctype=None):
        """Return an all-zero checksum of the length used by ``ctype``.

        ``ctype`` defaults to this key's own checksum type.
        """
        if ctype is None:
            ctype = self.ctype

        return bytes(kcrypto.checksum_len(ctype))

    def make_checksum(self, usage, plaintext, ctype=None):
        """Compute a checksum over ``plaintext``.

        ``ctype`` defaults to this key's own checksum type.
        """
        if ctype is None:
            ctype = self.ctype
        return kcrypto.make_checksum(ctype, self.key, usage, plaintext)

    def verify_checksum(self, usage, plaintext, ctype, cksum):
        """Verify ``cksum`` over ``plaintext``.

        :raises AssertionError: if ``ctype`` differs from this key's
            checksum type.  Failures of the checksum itself are reported
            by ``kcrypto.verify_checksum``.
        """
        if self.ctype != ctype:
            raise AssertionError(f'{self.ctype} != {ctype}')

        kcrypto.verify_checksum(ctype,
                                self.key,
                                usage,
                                plaintext,
                                cksum)

    def export_obj(self):
        """Return the key as a dict with 'keytype' and 'keyvalue'."""
        EncryptionKey_obj = {
            'keytype': self.etype,
            'keyvalue': self.key.contents,
        }
        return EncryptionKey_obj
class RodcPacEncryptionKey(Krb5EncryptionKey):
    """Encryption key whose checksums may carry a two-byte RODC id.

    When no ``rodc_id`` is passed, it is taken from bits 16-31 of the
    kvno; a result of zero (or a missing kvno) means "no identifier".
    ``self.rodc_id`` holds the identifier as two little-endian bytes, or
    ``b''`` when there is none.
    """

    def __init__(self, key, kvno, rodc_id=None):
        super().__init__(key, kvno)

        if rodc_id is None and self.kvno is not None:
            # Upper 16 bits of the kvno; zero counts as "none".
            rodc_id = ((self.kvno >> 16) & 0xffff) or None

        self.rodc_id = (b'' if rodc_id is None
                        else rodc_id.to_bytes(2, byteorder='little'))

    def make_rodc_zeroed_checksum(self, ctype=None):
        """Return a zeroed checksum extended by a zeroed RODC id."""
        zeroed = super().make_zeroed_checksum(ctype)
        return zeroed + bytes(len(self.rodc_id))

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return a checksum over ``plaintext`` followed by the RODC id."""
        return super().make_checksum(usage, plaintext, ctype) + self.rodc_id

    def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
        """Verify a checksum that may end in an RODC id.

        When this key has an RODC id, the last two bytes of ``cksum``
        must equal it and are removed before the checksum is verified.

        :raises AssertionError: if the trailing identifier differs.
        """
        if self.rodc_id:
            cksum_rodc_id = cksum[-2:]
            cksum = cksum[:-2]

            if cksum_rodc_id != self.rodc_id:
                raise AssertionError(f'{self.rodc_id.hex()} != '
                                     f'{cksum_rodc_id.hex()}')

        super().verify_checksum(usage,
                                plaintext,
                                ctype,
                                cksum)
class ZeroedChecksumKey(RodcPacEncryptionKey):
    """Key that produces all-zero checksums instead of real ones."""

    def make_checksum(self, usage, plaintext, ctype=None):
        """Ignore the input and return a zeroed checksum."""
        zeroed = self.make_zeroed_checksum(ctype)
        return zeroed

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Ignore the input and return a zeroed RODC checksum."""
        zeroed = self.make_rodc_zeroed_checksum(ctype)
        return zeroed
class WrongLengthChecksumKey(RodcPacEncryptionKey):
    """Key whose checksums are forced to a fixed length.

    Real checksums are truncated or zero-padded to ``length`` bytes;
    zeroed checksums are simply ``length`` zero bytes.
    """

    def __init__(self, key, kvno, length):
        super().__init__(key, kvno)

        self._length = length

    @classmethod
    def _adjust_to_length(cls, checksum, length):
        """Truncate ``checksum`` to ``length`` or pad it with zero bytes."""
        return checksum[:length].ljust(length, b'\x00')

    def make_zeroed_checksum(self, ctype=None):
        """Return ``self._length`` zero bytes, whatever ``ctype`` is."""
        return bytes(self._length)

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the real checksum adjusted to the forced length."""
        real = super().make_checksum(usage, plaintext, ctype)
        return self._adjust_to_length(real, self._length)

    def make_rodc_zeroed_checksum(self, ctype=None):
        """Return ``self._length`` zero bytes, whatever ``ctype`` is."""
        return bytes(self._length)

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the real RODC checksum adjusted to the forced length."""
        real = super().make_rodc_checksum(usage, plaintext, ctype)
        return self._adjust_to_length(real, self._length)
class KerberosCredentials(Credentials):
    """Credentials extended with the state the Kerberos tests need.

    On top of the base ``Credentials`` this keeps supported-enctype
    bitmasks for AS, TGS and AP exchanges, an optional key version
    number, explicitly supplied ("forced") keys indexed by enctype, an
    optional forced salt, and the account's DN and SPN.
    """

    # Bits that may appear in a supported-enctypes value without naming
    # an encryption type; bits_to_etypes() ignores them.
    fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
                           security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
                           security.KERB_ENCTYPE_CLAIMS_SUPPORTED)

    def __init__(self):
        super(KerberosCredentials, self).__init__()
        all_enc_types = 0
        all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
        all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
        all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96

        # Until told otherwise, RC4, AES128 and AES256 are supported.
        self.as_supported_enctypes = all_enc_types
        self.tgs_supported_enctypes = all_enc_types
        self.ap_supported_enctypes = all_enc_types

        self.kvno = None
        self.forced_keys = {}

        self.forced_salt = None

        self.dn = None
        self.spn = None

    def set_as_supported_enctypes(self, value):
        """Set the AS supported-enctypes bitmask (coerced with int())."""
        self.as_supported_enctypes = int(value)

    def set_tgs_supported_enctypes(self, value):
        """Set the TGS supported-enctypes bitmask (coerced with int())."""
        self.tgs_supported_enctypes = int(value)

    def set_ap_supported_enctypes(self, value):
        """Set the AP supported-enctypes bitmask (coerced with int())."""
        self.ap_supported_enctypes = int(value)

    # Maps each kcrypto enctype to its supported-enctypes bit.  The
    # insertion order is the order in which bits_to_etypes() returns them.
    etype_map = collections.OrderedDict([
        (kcrypto.Enctype.AES256,
         security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
        (kcrypto.Enctype.AES128,
         security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
        (kcrypto.Enctype.RC4,
         security.KERB_ENCTYPE_RC4_HMAC_MD5),
        (kcrypto.Enctype.DES_MD5,
         security.KERB_ENCTYPE_DES_CBC_MD5),
        (kcrypto.Enctype.DES_CRC,
         security.KERB_ENCTYPE_DES_CBC_CRC)
    ])

    @classmethod
    def etypes_to_bits(cls, etypes):
        """Convert an iterable of enctypes into a bitmask.

        :raises KeyError: for an enctype missing from ``etype_map``.
        :raises ValueError: if an enctype occurs more than once.
        """
        bits = 0
        for etype in etypes:
            bit = cls.etype_map[etype]
            if bits & bit:
                raise ValueError(f'Got duplicate etype: {etype}')
            bits |= bit

        return bits

    @classmethod
    def bits_to_etypes(cls, bits):
        """Convert a bitmask into a tuple of enctypes in etype_map order.

        Bits in ``fast_supported_bits`` are ignored.

        :raises ValueError: if any other unknown bit is set.
        """
        etypes = ()
        for etype, bit in cls.etype_map.items():
            if bit & bits:
                bits &= ~bit
                etypes += (etype,)

        bits &= ~cls.fast_supported_bits
        if bits != 0:
            raise ValueError(f'Unsupported etype bits: {bits}')

        return etypes

    def get_as_krb5_etypes(self):
        """Return the AS supported enctypes as a tuple."""
        return self.bits_to_etypes(self.as_supported_enctypes)

    def get_tgs_krb5_etypes(self):
        """Return the TGS supported enctypes as a tuple."""
        return self.bits_to_etypes(self.tgs_supported_enctypes)

    def get_ap_krb5_etypes(self):
        """Return the AP supported enctypes as a tuple."""
        return self.bits_to_etypes(self.ap_supported_enctypes)

    def set_kvno(self, kvno):
        """Store the key version number, sign-extended from 32 bits.

        The value is coerced with int() first, like the other setters,
        so a numeric string (for example one read from the environment)
        is accepted instead of failing in the bit operations below.
        """
        kvno = int(kvno)
        # Sign-extend from 32 bits.
        if kvno & 1 << 31:
            kvno |= -1 << 31
        self.kvno = kvno

    def get_kvno(self):
        """Return the stored key version number, or None."""
        return self.kvno

    def set_forced_key(self, etype, hexkey):
        """Register a key for ``etype`` from its hexadecimal form.

        The key is tagged with the kvno stored at the time of the call.
        """
        etype = int(etype)
        contents = binascii.a2b_hex(hexkey)
        key = kcrypto.Key(etype, contents)
        self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)

    def get_forced_key(self, etype):
        """Return the forced key for ``etype``, or None if none is set."""
        etype = int(etype)
        return self.forced_keys.get(etype)

    def set_forced_salt(self, salt):
        """Force get_salt() to return ``salt`` (stored as bytes)."""
        self.forced_salt = bytes(salt)

    def get_forced_salt(self):
        """Return the forced salt, or None."""
        return self.forced_salt

    def get_salt(self):
        """Return the salt as UTF-8 bytes.

        A forced salt wins.  Otherwise, when get_workstation() returns a
        true value, the salt is the upper-cased realm, ``host``, the
        lower-cased username up to its last ``$``, a dot and the
        lower-cased realm.  In every other case it is the upper-cased
        realm followed by the username.
        """
        if self.forced_salt is not None:
            return self.forced_salt

        if self.get_workstation():
            salt_string = '%shost%s.%s' % (
                self.get_realm().upper(),
                self.get_username().lower().rsplit('$', 1)[0],
                self.get_realm().lower())
        else:
            salt_string = self.get_realm().upper() + self.get_username()

        return salt_string.encode('utf-8')

    def set_dn(self, dn):
        """Remember the account's DN."""
        self.dn = dn

    def get_dn(self):
        """Return the account's DN, or None."""
        return self.dn

    def set_spn(self, spn):
        """Remember the account's SPN."""
        self.spn = spn

    def get_spn(self):
        """Return the account's SPN, or None."""
        return self.spn
class KerberosTicketCreds:
    """Plain container bundling a ticket with everything known about it.

    Only ``ticket`` and ``session_key`` are required; the client and
    server names and realms, the decryption key and the two "private"
    parts all default to None.
    """

    def __init__(self, ticket, session_key,
                 crealm=None, cname=None,
                 srealm=None, sname=None,
                 decryption_key=None,
                 ticket_private=None,
                 encpart_private=None):
        # The ticket and the key needed to use it.
        self.ticket, self.session_key = ticket, session_key
        # Client realm and name.
        self.crealm, self.cname = crealm, cname
        # Server realm and name.
        self.srealm, self.sname = srealm, sname
        self.decryption_key = decryption_key
        self.ticket_private = ticket_private
        self.encpart_private = encpart_private
class RawKerberosTest(TestCaseInTempDir):
    """A raw Kerberos Test case."""

    # The PAC buffer types that hold a checksum.
    pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
                          krb5pac.PAC_TYPE_KDC_CHECKSUM,
                          krb5pac.PAC_TYPE_TICKET_CHECKSUM}

    # Encryption types that setup_etype_test_permutations() combines;
    # "name" becomes part of each permutation's name.
    # NOTE(review): the "aes128" and "aes256" names look swapped relative
    # to their enctype values.  They are left untouched here because the
    # permutation names are derived from them - confirm nothing depends
    # on the current names before correcting.
    etypes_to_test = (
        {"value": -1111, "name": "dummy", },
        {"value": kcrypto.Enctype.AES256, "name": "aes128", },
        {"value": kcrypto.Enctype.AES128, "name": "aes256", },
        {"value": kcrypto.Enctype.RC4, "name": "rc4", },
    )

    # Set once setup_etype_test_permutations() has filled its table.
    setup_etype_test_permutations_done = False
@classmethod
def setup_etype_test_permutations(cls):
    """Build ``cls.etype_test_permutations`` once.

    Every ordered selection of one or more entries of
    ``cls.etypes_to_test`` yields a dict with the entry names joined by
    ``_`` under "name" and the matching enctype values as a tuple under
    "etypes".  Shorter selections come first.  Later calls return
    immediately.
    """
    if cls.setup_etype_test_permutations_done:
        return

    entries = cls.etypes_to_test
    count = len(entries)

    table = []
    for length in range(1, count + 1):
        for indices in itertools.permutations(range(count), length):
            table.append({
                "name": "_".join(entries[i]["name"] for i in indices),
                "etypes": tuple(entries[i]["value"] for i in indices),
            })

    cls.etype_test_permutations = table
    cls.setup_etype_test_permutations_done = True
@classmethod
def etype_test_permutation_name_idx(cls):
    """Return ``(name, index)`` for every entry of the permutation table.

    The table is built first if that has not happened yet.
    """
    cls.setup_etype_test_permutations()
    return [(entry['name'], position)
            for position, entry in enumerate(cls.etype_test_permutations)]
def etype_test_permutation_by_idx(self, idx):
    """Return ``(name, etypes)`` of the permutation stored at ``idx``."""
    entry = self.etype_test_permutations[idx]
    name, etypes = entry['name'], entry['etypes']
    return (name, etypes)
@classmethod
def setUpClass(cls):
    """Read the target hosts from the environment and reset class state."""
    super().setUpClass()

    env_value = samba.tests.env_get_var_value
    cls.host = env_value('SERVER')
    cls.dc_host = env_value('DC_SERVER')

    # A dictionary containing credentials that have already been
    # obtained.
    cls.creds_dict = {}

    cls.kdc_fast_support = False
def setUp(self):
    """Reset the per-test state.

    Dumping is switched off, no socket is open, and ``strict_checking``
    is read from the STRICT_CHECKING environment variable as an integer
    (enabled when the variable is not set).
    """
    super().setUp()
    self.do_asn1_print = False
    self.do_hexdump = False

    raw_strict = samba.tests.env_get_var_value('STRICT_CHECKING',
                                               allow_missing=True)
    if raw_strict is None:
        raw_strict = '1'
    self.strict_checking = bool(int(raw_strict))

    self.s = None

    # Unique marker for "a kvno must be present, its value is unknown".
    self.unspecified_kvno = object()
def tearDown(self):
    """Close any socket still open before the base class cleans up."""
    self._disconnect("tearDown")
    super().tearDown()
def _disconnect(self, reason):
    """Close the socket, if one is open, and forget it.

    :param reason: text written to stderr when hex dumping is enabled.
    """
    sock = self.s
    if sock is None:
        return
    sock.close()
    self.s = None
    if self.do_hexdump:
        sys.stderr.write("disconnect[%s]\n" % reason)
def _connect_tcp(self, host):
    """Open a TCP connection to port 88 of ``host``.

    The first address returned by getaddrinfo() is used, and the socket
    gets a ten second timeout.  The address list is kept in ``self.a``
    and the socket in ``self.s``.

    :raises OSError: if resolving, creating the socket or connecting
        fails.  A socket created before the failure is closed and
        ``self.s`` is reset to None, so the original error is never
        masked and no closed socket is left behind.
    """
    tcp_port = 88
    try:
        self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
                                    socket.SOCK_STREAM, socket.SOL_TCP,
                                    0)
        self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
        self.s.settimeout(10)
        self.s.connect(self.a[0][4])
    except OSError:
        # socket.error and IOError are both aliases of OSError.  The
        # failure may have happened before a socket existed.
        if self.s is not None:
            self.s.close()
            self.s = None
        raise
def connect(self, host):
    """Connect to ``host``; asserts that no connection is open yet."""
    self.assertNotConnected()
    self._connect_tcp(host)
    if not self.do_hexdump:
        return
    sys.stderr.write("connected[%s]\n" % host)
def env_get_var(self, varname, prefix,
                fallback_default=True,
                allow_missing=False):
    """Look up an environment variable, optionally under a prefix.

    With a prefix, ``<prefix>_<varname>`` is tried first; if it yields
    nothing and ``fallback_default`` is true, plain ``varname`` is tried
    next.  Without a prefix, plain ``varname`` is always used.

    :param allow_missing: passed to the lookup of the plain name; for
        the prefixed name a miss is also tolerated whenever the
        fallback is enabled.
    :return: the value found, or None.
    """
    if prefix is None:
        return samba.tests.env_get_var_value(varname,
                                             allow_missing=allow_missing)

    val = samba.tests.env_get_var_value(
        '%s_%s' % (prefix, varname),
        allow_missing=allow_missing or fallback_default)
    if val is None and fallback_default:
        val = samba.tests.env_get_var_value(varname,
                                            allow_missing=allow_missing)
    return val
def _get_krb5_creds_from_env(self, prefix,
                             default_username=None,
                             allow_missing_password=False,
                             allow_missing_keys=True,
                             require_strongest_key=False):
    """Build KerberosCredentials from environment variables.

    Reads DOMAIN, REALM, USERNAME, PASSWORD, the three
    *_SUPPORTED_ENCTYPES values, KVNO and the AES256/AES128/RC4
    *_KEY_HEX values through env_get_var() with the given ``prefix``.

    :param default_username: used when USERNAME is not set; supplying it
        also makes USERNAME optional.
    :param allow_missing_password: tolerate a missing PASSWORD.
    :param allow_missing_keys: tolerate missing KVNO and AES256 key, and
        skip the final check that at least one key was supplied.
    :param require_strongest_key: require KVNO, and require the AES256
        key unless a password is available.
    :return: the populated credentials.
    """
    c = KerberosCredentials()
    c.guess()

    domain = self.env_get_var('DOMAIN', prefix)
    realm = self.env_get_var('REALM', prefix)
    allow_missing_username = default_username is not None
    username = self.env_get_var('USERNAME', prefix,
                                fallback_default=False,
                                allow_missing=allow_missing_username)
    if username is None:
        username = default_username
    password = self.env_get_var('PASSWORD', prefix,
                                fallback_default=False,
                                allow_missing=allow_missing_password)
    c.set_domain(domain)
    c.set_realm(realm)
    c.set_username(username)
    if password is not None:
        c.set_password(password)

    # The supported-enctypes values are all optional.
    for varname, setter in (
            ('AS_SUPPORTED_ENCTYPES', c.set_as_supported_enctypes),
            ('TGS_SUPPORTED_ENCTYPES', c.set_tgs_supported_enctypes),
            ('AP_SUPPORTED_ENCTYPES', c.set_ap_supported_enctypes)):
        supported_enctypes = self.env_get_var(varname, prefix,
                                              allow_missing=True)
        if supported_enctypes is not None:
            setter(supported_enctypes)

    if require_strongest_key:
        kvno_allow_missing = False
        # Without a password the AES256 key has to be supplied.
        aes256_allow_missing = password is not None
    else:
        kvno_allow_missing = allow_missing_keys
        aes256_allow_missing = allow_missing_keys

    # The kvno is set before the keys because set_forced_key() tags
    # each key with the kvno stored at that moment.
    kvno = self.env_get_var('KVNO', prefix,
                            fallback_default=False,
                            allow_missing=kvno_allow_missing)
    if kvno is not None:
        # The environment yields a string, but set_kvno() performs
        # integer bit operations on its argument.
        c.set_kvno(int(kvno))
    aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
                                  fallback_default=False,
                                  allow_missing=aes256_allow_missing)
    if aes256_key is not None:
        c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
    aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
                                  fallback_default=False,
                                  allow_missing=True)
    if aes128_key is not None:
        c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
    rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
                               fallback_default=False, allow_missing=True)
    if rc4_key is not None:
        c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)

    if not allow_missing_keys:
        self.assertTrue(c.forced_keys,
                        'Please supply %s encryption keys '
                        'in environment' % prefix)

    return c
def _get_krb5_creds(self,
                    prefix,
                    default_username=None,
                    allow_missing_password=False,
                    allow_missing_keys=True,
                    require_strongest_key=False,
                    fallback_creds_fn=None):
    """Return the credentials for ``prefix``, obtaining them if needed.

    Cached credentials are returned directly.  Otherwise the environment
    is tried first and, if that raises, ``fallback_creds_fn`` (when
    given).  Whatever succeeds is cached in ``self.creds_dict``.

    :raises Exception: the error from the environment lookup when no
        method succeeded; a fallback error is only printed.
    """
    if prefix in self.creds_dict:
        return self.creds_dict[prefix]

    def remember(obtained):
        # Save the obtained credentials
        self.assertIsNotNone(obtained)
        self.creds_dict[prefix] = obtained
        return obtained

    # We don't have the credentials already
    try:
        # Try to obtain them from the environment
        creds = self._get_krb5_creds_from_env(
            prefix,
            default_username=default_username,
            allow_missing_password=allow_missing_password,
            allow_missing_keys=allow_missing_keys,
            require_strongest_key=require_strongest_key)
    except Exception as err:
        # An error occurred, so save it for later
        env_err = err
    else:
        return remember(creds)

    if fallback_creds_fn is not None:
        try:
            # Try to use the fallback method
            creds = fallback_creds_fn()
        except Exception as err:
            print("ERROR FROM ENV: %r" % (env_err))
            print("FALLBACK-FN: %s" % (fallback_creds_fn))
            print("FALLBACK-ERROR: %r" % (err))
        else:
            return remember(creds)

    # Both methods failed, so raise the exception from the
    # environment method
    raise env_err
def get_user_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the credentials read without an environment prefix."""
    return self._get_krb5_creds(
        prefix=None,
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_service_creds(self,
                      allow_missing_password=False,
                      allow_missing_keys=True):
    """Return the credentials read with the 'SERVICE' prefix."""
    return self._get_krb5_creds(
        prefix='SERVICE',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials read with the 'CLIENT' prefix."""
    return self._get_krb5_creds(
        prefix='CLIENT',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_server_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials read with the 'SERVER' prefix."""
    return self._get_krb5_creds(
        prefix='SERVER',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_admin_creds(self,
                    allow_missing_password=False,
                    allow_missing_keys=True):
    """Return the 'ADMIN' credentials with FEATURE_SEAL switched on."""
    admin = self._get_krb5_creds(
        prefix='ADMIN',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
    features = admin.get_gensec_features() | FEATURE_SEAL
    admin.set_gensec_features(features)
    return admin
def get_rodc_krbtgt_creds(self,
                          require_keys=True,
                          require_strongest_key=False):
    """Return the credentials read with the 'RODC_KRBTGT' prefix.

    A password is never required.  Keys are required unless
    ``require_keys`` is false; ``require_strongest_key`` may only be
    combined with ``require_keys``.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)
    return self._get_krb5_creds(
        prefix='RODC_KRBTGT',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key)
def get_krbtgt_creds(self,
                     require_keys=True,
                     require_strongest_key=False):
    """Return the credentials read with the 'KRBTGT' prefix.

    The username defaults to 'krbtgt' and a password is never required.
    Keys are required unless ``require_keys`` is false;
    ``require_strongest_key`` may only be combined with ``require_keys``.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)
    return self._get_krb5_creds(
        prefix='KRBTGT',
        default_username='krbtgt',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key)
def get_anon_creds(self):
    """Return a fresh Credentials object set to anonymous."""
    anon = Credentials()
    anon.set_anonymous()
    return anon
def asn1_dump(self, name, obj, asn1_print=None):
    """Write ``obj`` to stderr when ASN.1 printing is enabled.

    :param name: optional heading printed on its own line before
        ``obj``.
    :param asn1_print: overrides ``self.do_asn1_print`` unless None.
    """
    if asn1_print is None:
        asn1_print = self.do_asn1_print
    if not asn1_print:
        return
    if name is None:
        sys.stderr.write("%s" % (obj))
    else:
        sys.stderr.write("%s:\n%s" % (name, obj))
def hex_dump(self, name, blob, hexdump=None):
    """Write ``blob``'s length and hex dump to stderr when enabled.

    :param hexdump: overrides ``self.do_hexdump`` unless None.
    """
    if hexdump is None:
        hexdump = self.do_hexdump
    if not hexdump:
        return
    sys.stderr.write(
        "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
def der_decode(
        self,
        blob,
        asn1Spec=None,
        native_encode=True,
        asn1_print=None,
        hexdump=None):
    """Decode the DER ``blob``, optionally dumping it on the way.

    :param asn1Spec: pyasn1 spec to decode against, if any.
    :param native_encode: when true, return the result converted by
        pyasn1's native encoder instead of the pyasn1 object.
    :param asn1_print: passed to asn1_dump().
    :param hexdump: passed to hex_dump().
    :return: the decoded object.
    """
    class_name = ("<None-asn1Spec>" if asn1Spec is None
                  else type(asn1Spec).__name__.split(':')[0])
    self.hex_dump(class_name, blob, hexdump=hexdump)
    decoded, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
    self.asn1_dump(None, decoded, asn1_print=asn1_print)
    return pyasn1_native_encode(decoded) if native_encode else decoded
def der_encode(
        self,
        obj,
        asn1Spec=None,
        native_decode=True,
        asn1_print=None,
        hexdump=None):
    """Encode ``obj`` as DER, optionally dumping it on the way.

    :param asn1Spec: pyasn1 spec used when ``native_decode`` is true.
    :param native_decode: when true, ``obj`` is first converted into a
        pyasn1 object with pyasn1's native decoder.
    :param asn1_print: passed to asn1_dump().
    :param hexdump: passed to hex_dump().
    :return: the DER encoding.
    """
    if native_decode:
        obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
    # split() always yields a string, so both dumps below always run.
    class_name = type(obj).__name__.split(':')[0]
    self.asn1_dump(None, obj, asn1_print=asn1_print)
    encoded = pyasn1_der_encode(obj)
    self.hex_dump(class_name, encoded, hexdump=hexdump)
    return encoded
def send_pdu(self, req, asn1_print=None, hexdump=None):
    """DER-encode ``req`` and send it with a 4-byte length prefix.

    The prefix is the big-endian length of the encoded request.
    send() is repeated until everything has been written.  On a socket
    error the connection is dropped and the error re-raised.
    """
    try:
        k5_pdu = self.der_encode(
            req, native_decode=False, asn1_print=asn1_print, hexdump=False)
        header = struct.pack('>I', len(k5_pdu))
        self.hex_dump("send_pdu", header, hexdump=hexdump)
        self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)

        pending = header + k5_pdu
        sent = self.s.send(pending, 0)
        while sent != len(pending):
            # Partial write: retry with whatever is left.
            pending = pending[sent:]
            sent = self.s.send(pending, 0)
    except (socket.error, IOError) as e:
        self._disconnect("send_pdu: %s" % e)
        raise
def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
    """Receive up to ``num_recv`` bytes from the socket.

    :param timeout: socket timeout for this read only; the timeout is
        set back to ten seconds afterwards.
    :return: the bytes read, or None on EOF (the connection is then
        dropped) or on timeout (a note is written to stderr).
    :raises OSError: on other socket errors, after dropping the
        connection.
    """
    data = None
    try:
        if timeout is not None:
            self.s.settimeout(timeout)
        data = self.s.recv(num_recv, 0)
        self.s.settimeout(10)
        if not len(data):
            self._disconnect("recv_raw: EOF")
            return None
        self.hex_dump("recv_raw", data, hexdump=hexdump)
    except socket.timeout:
        self.s.settimeout(10)
        sys.stderr.write("recv_raw: TIMEOUT\n")
    except (socket.error, IOError) as e:
        self._disconnect("recv_raw: %s" % e)
        raise
    return data
def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
    """Receive one length-prefixed Kerberos reply and decode it.

    :return: ``(rep, rep_pdu)`` - the decoded reply and its raw bytes.
        ``(None, None)`` when nothing could be read at all, and
        ``(None, "")`` when the length prefix announces zero bytes.

    Reads are repeated until the 4-byte big-endian length prefix, and
    then the announced number of bytes, are complete.  If the connection
    ends or times out part way through either, the test fails with an
    assertion rather than an unrelated struct or type error.  The reply
    must have pvno 5 and be an AS-REP, a TGS-REP or a KRB-ERROR.
    """
    header_len = 4
    header = b''
    while len(header) < header_len:
        chunk = self.recv_raw(num_recv=header_len - len(header),
                              hexdump=hexdump, timeout=timeout)
        if chunk is None:
            if not header:
                # Nothing arrived at all (EOF or timeout).
                return (None, None)
            self.fail("connection ended inside the length prefix "
                      "(got %d of %d bytes)" % (len(header), header_len))
        header += chunk
    (k5_len,) = struct.unpack(">I", header)
    if k5_len == 0:
        return (None, "")

    rep_pdu = b''
    while len(rep_pdu) < k5_len:
        chunk = self.recv_raw(num_recv=k5_len - len(rep_pdu),
                              hexdump=hexdump, timeout=timeout)
        self.assertIsNotNone(
            chunk,
            msg="connection ended after %d of %d reply bytes" % (
                len(rep_pdu), k5_len))
        self.assertGreaterEqual(len(chunk), 1)
        rep_pdu += chunk

    # Decode without a spec first, just to learn the message type.
    k5_raw = self.der_decode(
        rep_pdu,
        asn1Spec=None,
        native_encode=False,
        asn1_print=False,
        hexdump=False)
    pvno = k5_raw['field-0']
    self.assertEqual(pvno, 5)
    msg_type = k5_raw['field-1']
    self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
    if msg_type == KRB_AS_REP:
        asn1Spec = krb5_asn1.AS_REP()
    elif msg_type == KRB_TGS_REP:
        asn1Spec = krb5_asn1.TGS_REP()
    elif msg_type == KRB_ERROR:
        asn1Spec = krb5_asn1.KRB_ERROR()
    rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
                          asn1_print=asn1_print, hexdump=False)
    return (rep, rep_pdu)
def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
    """Receive one reply and return only its decoded form."""
    decoded, _raw = self.recv_pdu_raw(asn1_print=asn1_print,
                                      hexdump=hexdump,
                                      timeout=timeout)
    return decoded
def assertIsConnected(self):
    """Fail with "Not connected" unless a socket is open."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" if a socket is open."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
def send_recv_transaction(
        self,
        req,
        asn1_print=None,
        hexdump=None,
        timeout=None,
        to_rodc=False):
    """Connect, send ``req``, receive one reply and disconnect.

    :param to_rodc: connect to ``self.host`` instead of
        ``self.dc_host``.
    :return: the decoded reply from recv_pdu().

    The connection is closed whether or not the exchange succeeds; an
    error during the exchange is re-raised.
    """
    if to_rodc:
        target = self.host
    else:
        target = self.dc_host
    self.connect(target)
    try:
        self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
        reply = self.recv_pdu(
            asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
    except Exception:
        self._disconnect("transaction failed")
        raise
    self._disconnect("transaction done")
    return reply
def assertNoValue(self, value):
    """Assert that ``value.isNoValue`` is true."""
    is_unset = value.isNoValue
    self.assertTrue(is_unset)
def assertHasValue(self, value):
    """Assert that ``value`` is not None."""
    self.assertIsNotNone(value)
def getElementValue(self, obj, elem):
    """Return ``obj.get(elem)``."""
    value = obj.get(elem)
    return value
def assertElementMissing(self, obj, elem):
    """Assert that looking up ``elem`` in ``obj`` yields None."""
    self.assertIsNone(self.getElementValue(obj, elem))
def assertElementPresent(self, obj, elem, expect_empty=False):
    """Assert that ``elem`` is present in ``obj``.

    With strict checking enabled, a container value must in addition be
    empty when ``expect_empty`` is true and non-empty otherwise.
    """
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    if not self.strict_checking:
        return
    if not isinstance(found, collections.abc.Container):
        return
    if expect_empty:
        self.assertEqual(0, len(found))
    else:
        self.assertNotEqual(0, len(found))
def assertElementEqual(self, obj, elem, value):
    """Assert that ``elem`` is present in ``obj`` and equals ``value``."""
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    self.assertEqual(found, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that ``elem`` equals the UTF-8 encoding of ``value``."""
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    expected = bytes(value, 'utf8')
    self.assertEqual(found, expected)
def assertPrincipalEqual(self, princ1, princ2):
    """Assert that two principals match.

    Compares 'name-type', the number of 'name-string' components and
    then the components pairwise.
    """
    self.assertEqual(princ1['name-type'], princ2['name-type'])

    names1 = princ1['name-string']
    names2 = princ2['name-string']
    self.assertEqual(
        len(names1),
        len(names2),
        msg="princ1=%s != princ2=%s" % (princ1, princ2))
    for idx, component in enumerate(names1):
        self.assertEqual(
            component,
            names2[idx],
            msg="princ1=%s != princ2=%s" % (princ1, princ2))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that ``elem`` holds a principal equal to ``value``.

    The element is converted to a PrincipalName with pyasn1's native
    decoder before the comparison.
    """
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    principal = pyasn1_native_decode(found,
                                     asn1Spec=krb5_asn1.PrincipalName())
    self.assertPrincipalEqual(principal, value)
def assertElementKVNO(self, obj, elem, value):
    """Check the kvno stored under ``elem`` against ``value``.

    ``value`` may be:
      * ``"autodetect"`` - accept whatever is there, including nothing;
      * None - the element must be absent;
      * ``self.unspecified_kvno`` - the element must be present and
        non-zero, its value is not compared;
      * anything else - converted with int(); it must be non-zero and
        equal to the element.
    """
    found = self.getElementValue(obj, elem)
    if value == "autodetect":
        value = found

    if value is None:
        self.assertIsNone(found)
        return

    self.assertIsNotNone(found)
    # The value on the wire should never be 0
    self.assertNotEqual(found, 0)
    # unspecified_kvno means we don't know the kvno,
    # but want to enforce its presence
    if value is self.unspecified_kvno:
        return
    expected = int(value)
    self.assertNotEqual(expected, 0)
    self.assertEqual(found, expected)
def assertElementFlags(self, obj, elem, expected, unexpected):
    """Check individual flag bits of the element ``elem``.

    :param expected: TicketFlags whose set bits must be '1' in the
        element, or None to skip.
    :param unexpected: TicketFlags whose set bits must be '0' in the
        element, or None to skip.
    """
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)

    def check(flags, wanted, verdict):
        # Every bit set in ``flags`` must read ``wanted`` in the element.
        self.assertIsInstance(flags, krb5_asn1.TicketFlags)
        for position, flag in enumerate(flags):
            if flag == 1:
                self.assertEqual(wanted, found[position],
                                 f"'{flags.namedValues[position]}' "
                                 f"{verdict} in {found}")

    if expected is not None:
        check(expected, '1', "expected")
    if unexpected is not None:
        check(unexpected, '0', "unexpected")
def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
    """Return a UTC ``(YYYYmmddHHMMSSZ, microseconds)`` pair.

    :param epoch: seconds since the epoch; defaults to the current time.
    :param offset: seconds added to ``epoch`` after int() conversion.
    """
    when = time.time() if epoch is None else epoch
    if offset is not None:
        when = when + int(offset)
    moment = datetime.datetime.fromtimestamp(when,
                                             tz=datetime.timezone.utc)
    return (moment.strftime("%Y%m%d%H%M%SZ"), moment.microsecond)
def get_KerberosTime(self, epoch=None, offset=None):
    """Return only the time string of get_KerberosTimeWithUsec()."""
    stamp, _usec = self.get_KerberosTimeWithUsec(epoch=epoch,
                                                 offset=offset)
    return stamp
def get_EpochFromKerberosTime(self, kerberos_time):
    """Convert a ``YYYYmmddHHMMSSZ`` time (str or bytes) to epoch seconds.

    The time is interpreted as UTC and the result is an int.
    """
    if isinstance(kerberos_time, bytes):
        kerberos_time = kerberos_time.decode()

    parsed = datetime.datetime.strptime(kerberos_time, '%Y%m%d%H%M%SZ')
    as_utc = parsed.replace(tzinfo=datetime.timezone.utc)
    return int(as_utc.timestamp())
def get_Nonce(self):
    """Return a random nonce in the range 0x7f000000..0x7fffffff inclusive."""
    lowest, highest = 0x7f000000, 0x7fffffff
    return random.randint(lowest, highest)
def get_pa_dict(self, pa_data):
    """Index a PA-DATA sequence by padata-type.

    Returns a dict mapping each 'padata-type' to its 'padata-value'
    (empty when `pa_data` is None).  Raises RuntimeError if the same
    type occurs more than once.
    """
    by_type = {}
    if pa_data is None:
        return by_type

    for entry in pa_data:
        kind = entry['padata-type']
        if kind in by_type:
            raise RuntimeError(f'Duplicate type {kind}')
        by_type[kind] = entry['padata-value']

    return by_type
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap raw key bytes `contents` of type `etype` in a
    RodcPacEncryptionKey carrying the optional `kvno`."""
    return RodcPacEncryptionKey(kcrypto.Key(etype, contents), kvno)
def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
    """Derive a key from `pwd` and `salt` via kcrypto.string_to_key().

    Both `pwd` and `salt` must be non-None.  The derived key is
    returned wrapped in a RodcPacEncryptionKey together with `kvno`.
    """
    for required in (pwd, salt):
        self.assertIsNotNone(required)
    derived = kcrypto.string_to_key(etype, pwd, salt)
    return RodcPacEncryptionKey(derived, kvno)
def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
    """Build the client key described by one ETYPE-INFO2 entry.

    For RC4 the NT hash from `creds` is used directly as key contents;
    for every other etype the key is derived from the password in
    `creds` and the entry's optional 'salt'.
    """
    enctype = etype_info2['etype']
    salt = etype_info2.get('salt')

    if enctype == kcrypto.Enctype.RC4:
        return self.SessionKey_create(etype=enctype,
                                      contents=creds.get_nt_hash(),
                                      kvno=kvno)

    return self.PasswordKey_create(etype=enctype,
                                   pwd=creds.get_password(),
                                   salt=salt,
                                   kvno=kvno)
def TicketDecryptionKey_from_creds(self, creds, etype=None):
    """Return the key used to decrypt tickets issued to `creds`.

    When `etype` is None, the first entry of
    creds.get_tgs_krb5_etypes() is used, falling back to RC4 if that
    list is empty.  A forced key registered on `creds` for the etype
    takes precedence; otherwise the key comes from the NT hash (RC4)
    or is derived from the password and salt (all other etypes).
    """
    if etype is None:
        advertised = creds.get_tgs_krb5_etypes()
        etype = advertised[0] if advertised else kcrypto.Enctype.RC4

    forced_key = creds.get_forced_key(etype)
    if forced_key is not None:
        return forced_key

    kvno = creds.get_kvno()

    # Built up front so both branches below report the same context.
    fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
                "nor a password specified, " % (
                    creds.get_username(), etype, kvno))

    if etype == kcrypto.Enctype.RC4:
        nthash = creds.get_nt_hash()
        self.assertIsNotNone(nthash, msg=fail_msg)
        return self.SessionKey_create(etype=etype,
                                      contents=nthash,
                                      kvno=kvno)

    password = creds.get_password()
    self.assertIsNotNone(password, msg=fail_msg)
    return self.PasswordKey_create(etype=etype,
                                   pwd=password,
                                   salt=creds.get_salt(),
                                   kvno=kvno)
def RandomKey(self, etype):
    """Create a session key of type `etype` filled with random bytes,
    sized according to the enctype profile's keysize."""
    profile = kcrypto._get_enctype_profile(etype)
    random_bytes = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=random_bytes)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Turn a decoded EncryptionKey ('keytype' / 'keyvalue') into a
    key object via SessionKey_create()."""
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(keytype, keyvalue)
def EncryptedData_create(self, key, usage, plaintext):
    """Encrypt `plaintext` with `key` under key usage `usage` and
    return the corresponding EncryptedData dict.

    'kvno' is only included when the key carries one.
    """
    # EncryptedData   ::= SEQUENCE {
    #         etype   [0] Int32 -- EncryptionType --,
    #         kvno    [1] Int32 OPTIONAL,
    #         cipher  [2] OCTET STRING -- ciphertext
    # }
    cipher = key.encrypt(usage, plaintext)
    encrypted = {'etype': key.etype, 'cipher': cipher}
    kvno = key.kvno
    if kvno is not None:
        encrypted['kvno'] = kvno
    return encrypted
def Checksum_create(self, key, usage, plaintext, ctype=None):
    """Compute a checksum over `plaintext` and return it as a Checksum
    dict.  `ctype` defaults to the key's own checksum type."""
    # Checksum        ::= SEQUENCE {
    #        cksumtype       [0] Int32,
    #        checksum        [1] OCTET STRING
    # }
    chosen_type = key.ctype if ctype is None else ctype
    digest = key.make_checksum(usage, plaintext, ctype=chosen_type)
    return {'cksumtype': chosen_type, 'checksum': digest}
@classmethod
def PrincipalName_create(cls, name_type, names):
    """Return a PrincipalName dict for `name_type` and the name
    components `names` (stored as passed, not copied)."""
    # PrincipalName   ::= SEQUENCE {
    #         name-type       [0] Int32,
    #         name-string     [1] SEQUENCE OF KerberosString
    # }
    return {'name-type': name_type, 'name-string': names}
def AuthorizationData_create(self, ad_type, ad_data):
    """Return a single authorization-data element as a dict."""
    # AuthorizationData ::= SEQUENCE {
    #         ad-type         [0] Int32,
    #         ad-data         [1] OCTET STRING
    # }
    return {'ad-type': ad_type, 'ad-data': ad_data}
def PA_DATA_create(self, padata_type, padata_value):
    """Return a PA-DATA dict holding the given type and value."""
    # PA-DATA         ::= SEQUENCE {
    #         -- NOTE: first tag is [1], not [0]
    #         padata-type     [1] Int32,
    #         padata-value    [2] OCTET STRING -- might be encoded AP-REQ
    # }
    fields = ('padata-type', 'padata-value')
    return dict(zip(fields, (padata_type, padata_value)))
def PA_ENC_TS_ENC_create(self, ts, usec):
    """Return a PA-ENC-TS-ENC dict for timestamp `ts`.

    `usec` is the optional microseconds part.  As 'pausec' is OPTIONAL
    in the ASN.1 definition, it is left out of the result when `usec`
    is None instead of being stored as None, matching how the other
    builders in this class treat unset optional fields.
    """
    # PA-ENC-TS-ENC ::= SEQUENCE {
    #        patimestamp[0]          KerberosTime, -- client's time
    #        pausec[1]               krb5int32 OPTIONAL
    # }
    PA_ENC_TS_ENC_obj = {
        'patimestamp': ts,
    }
    if usec is not None:
        PA_ENC_TS_ENC_obj['pausec'] = usec
    return PA_ENC_TS_ENC_obj
def PA_PAC_OPTIONS_create(self, options):
    """Return a PA-PAC-OPTIONS dict wrapping `options`."""
    # PA-PAC-OPTIONS  ::= SEQUENCE {
    #         options         [0] PACOptionFlags
    # }
    return {'options': options}
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor dict for the given type and value."""
    # KrbFastArmor    ::= SEQUENCE {
    #         armor-type      [0] Int32,
    #         armor-value     [1] OCTET STRING,
    #         ...
    # }
    return {'armor-type': armor_type, 'armor-value': armor_value}
def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
    """Return a KrbFastReq dict from its three components."""
    # KrbFastReq      ::= SEQUENCE {
    #         fast-options    [0] FastOptions,
    #         padata          [1] SEQUENCE OF PA-DATA,
    #         req-body        [2] KDC-REQ-BODY,
    #         ...
    # }
    fields = ('fast-options', 'padata', 'req-body')
    return dict(zip(fields, (fast_options, padata, req_body)))
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return a KrbFastArmoredReq dict.

    The optional 'armor' field is only included when `armor` is not
    None.
    """
    # KrbFastArmoredReq ::= SEQUENCE {
    #         armor           [0] KrbFastArmor OPTIONAL,
    #         req-checksum    [1] Checksum,
    #         enc-fast-req    [2] EncryptedData -- KrbFastReq --
    # }
    armored_req = {'req-checksum': req_checksum,
                   'enc-fast-req': enc_fast_req}
    if armor is None:
        return armored_req
    armored_req['armor'] = armor
    return armored_req
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Return a PA-FX-FAST-REQUEST dict selecting the 'armored-data'
    choice."""
    # PA-FX-FAST-REQUEST ::= CHOICE {
    #         armored-data    [0] KrbFastArmoredReq,
    #         ...
    # }
    return {'armored-data': armored_data}
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
    """Build a KERB-PA-PAC-REQUEST.

    With `pa_data_create` true (the default) the request is DER
    encoded and returned wrapped in a PA-DATA of type
    PADATA_PAC_REQUEST; otherwise the plain dict is returned.
    """
    # KERB-PA-PAC-REQUEST ::= SEQUENCE {
    #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
    #                                --    include PAC.
    #                                --If FALSE, and PAC present,
    #                                --    remove PAC.
    # }
    request = {'include-pac': include_pac}
    if pa_data_create:
        encoded = self.der_encode(request,
                                  asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
        return self.PA_DATA_create(PADATA_PAC_REQUEST, encoded)
    return request
def get_pa_pac_options(self, options):
    """Return a PA-DATA of type PADATA_PAC_OPTIONS carrying the DER
    encoding of a PA-PAC-OPTIONS built from `options`."""
    plain = self.PA_PAC_OPTIONS_create(options)
    encoded = self.der_encode(plain,
                              asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    return self.PA_DATA_create(PADATA_PAC_OPTIONS, encoded)
def KDC_REQ_BODY_create(self,
                        kdc_options,
                        cname,
                        realm,
                        sname,
                        from_time,
                        till_time,
                        renew_time,
                        nonce,
                        etypes,
                        addresses,
                        additional_tickets,
                        EncAuthorizationData,
                        EncAuthorizationData_key,
                        EncAuthorizationData_usage,
                        asn1_print=None,
                        hexdump=None):
    """Return a KDC-REQ-BODY dict.

    Mandatory fields are always present; each OPTIONAL field is only
    added when its argument is not None.  If `EncAuthorizationData` is
    given it is DER encoded and encrypted with
    `EncAuthorizationData_key` under `EncAuthorizationData_usage` to
    form 'enc-authorization-data'.
    """
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                       -- NOTE: not empty
    # }
    enc_ad = None
    if EncAuthorizationData is not None:
        plain_ad = self.der_encode(
            EncAuthorizationData,
            asn1Spec=krb5_asn1.AuthorizationData(),
            asn1_print=asn1_print,
            hexdump=hexdump)
        enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
                                           EncAuthorizationData_usage,
                                           plain_ad)

    body = {
        'kdc-options': kdc_options,
        'realm': realm,
        'till': till_time,
        'nonce': nonce,
        'etype': etypes,
    }

    optional_fields = (
        ('cname', cname),
        ('sname', sname),
        ('from', from_time),
        ('rtime', renew_time),
        ('addresses', addresses),
        ('enc-authorization-data', enc_ad),
        ('additional-tickets', additional_tickets),
    )
    for field, field_value in optional_fields:
        if field_value is not None:
            body[field] = field_value

    return body
def KDC_REQ_create(self,
                   msg_type,
                   padata,
                   req_body,
                   asn1Spec=None,
                   asn1_print=None,
                   hexdump=None):
    """Return (KDC-REQ dict, decoded request).

    'padata' is only included when `padata` is not None.  The second
    element is the dict decoded against `asn1Spec` with the pyasn1
    native decoder, or None when no spec is given.  `asn1_print` and
    `hexdump` are accepted but not used here.
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    #
    request = {'pvno': 5, 'msg-type': msg_type, 'req-body': req_body}
    if padata is not None:
        request['padata'] = padata

    if asn1Spec is None:
        decoded = None
    else:
        decoded = pyasn1_native_decode(request, asn1Spec=asn1Spec)

    return request, decoded
def AS_REQ_create(self,
                  padata,        # optional
                  kdc_options,   # required
                  cname,         # optional
                  realm,         # required
                  sname,         # optional
                  from_time,     # optional
                  till_time,     # required
                  renew_time,    # optional
                  nonce,         # required
                  etypes,        # required
                  addresses,     # optional
                  additional_tickets,
                  native_decoded_only=True,
                  asn1_print=None,
                  hexdump=None):
    """Build an AS-REQ.

    Returns the request decoded against the AS_REQ spec, or a
    (decoded, plain dict) pair when `native_decoded_only` is false.
    No enc-authorization-data is ever included.
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                       -- NOTE: not empty
    # }
    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=None,
        EncAuthorizationData_key=None,
        EncAuthorizationData_usage=None,
        asn1_print=asn1_print,
        hexdump=hexdump)

    plain, decoded = self.KDC_REQ_create(
        msg_type=KRB_AS_REQ,
        padata=padata,
        req_body=req_body,
        asn1Spec=krb5_asn1.AS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)

    return decoded if native_decoded_only else (decoded, plain)
def AP_REQ_create(self, ap_options, ticket, authenticator):
    """Return an AP-REQ dict (pvno 5, msg-type KRB_AP_REQ) around the
    given options, ticket and encrypted authenticator."""
    # AP-REQ          ::= [APPLICATION 14] SEQUENCE {
    #         pvno            [0] INTEGER (5),
    #         msg-type        [1] INTEGER (14),
    #         ap-options      [2] APOptions,
    #         ticket          [3] Ticket,
    #         authenticator   [4] EncryptedData -- Authenticator
    # }
    return {
        'pvno': 5,
        'msg-type': KRB_AP_REQ,
        'ap-options': ap_options,
        'ticket': ticket,
        'authenticator': authenticator,
    }
def Authenticator_create(
        self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
        authorization_data):
    """Return an (unencrypted) Authenticator dict.

    The OPTIONAL fields cksum, subkey, seq-number and
    authorization-data are only added when not None.
    """
    # -- Unencrypted authenticator
    # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
    #        authenticator-vno       [0] INTEGER (5),
    #        crealm                  [1] Realm,
    #        cname                   [2] PrincipalName,
    #        cksum                   [3] Checksum OPTIONAL,
    #        cusec                   [4] Microseconds,
    #        ctime                   [5] KerberosTime,
    #        subkey                  [6] EncryptionKey OPTIONAL,
    #        seq-number              [7] UInt32 OPTIONAL,
    #        authorization-data      [8] AuthorizationData OPTIONAL
    # }
    authenticator = {
        'authenticator-vno': 5,
        'crealm': crealm,
        'cname': cname,
        'cusec': cusec,
        'ctime': ctime,
    }

    optional_fields = (
        ('cksum', cksum),
        ('subkey', subkey),
        ('seq-number', seq_number),
        ('authorization-data', authorization_data),
    )
    authenticator.update(
        (field, field_value)
        for field, field_value in optional_fields
        if field_value is not None)

    return authenticator
def TGS_REQ_create(self,
                   padata,       # optional
                   cusec,
                   ctime,
                   ticket,
                   kdc_options,  # required
                   cname,        # optional
                   realm,        # required
                   sname,        # optional
                   from_time,    # optional
                   till_time,    # required
                   renew_time,   # optional
                   nonce,        # required
                   etypes,       # required
                   addresses,    # optional
                   EncAuthorizationData,
                   EncAuthorizationData_key,
                   additional_tickets,
                   ticket_session_key,
                   authenticator_subkey=None,
                   body_checksum_type=None,
                   native_decoded_only=True,
                   asn1_print=None,
                   hexdump=None):
    """Build a TGS-REQ authenticated with `ticket`.

    The request body is DER encoded and checksummed with
    `ticket_session_key`; that checksum goes into an Authenticator
    (which uses `cname` and `realm`, while the body itself carries no
    cname), which is encrypted with the same key and wrapped together
    with `ticket` in an AP-REQ.  The AP-REQ is added to the padata as
    PADATA_KDC_REQ, after any entries the caller supplied.

    The caller's `padata` list is not modified.

    Returns the request decoded against the TGS_REQ spec, or a
    (decoded, plain dict) pair when `native_decoded_only` is false.
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                       -- NOTE: not empty
    # }

    # The key usage for enc-authorization-data depends on whether the
    # authenticator carries a subkey.
    if authenticator_subkey is not None:
        EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
    else:
        EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=None,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)
    req_body_blob = self.der_encode(req_body,
                                    asn1Spec=krb5_asn1.KDC_REQ_BODY(),
                                    asn1_print=asn1_print, hexdump=hexdump)

    req_body_checksum = self.Checksum_create(ticket_session_key,
                                             KU_TGS_REQ_AUTH_CKSUM,
                                             req_body_blob,
                                             ctype=body_checksum_type)

    subkey_obj = None
    if authenticator_subkey is not None:
        subkey_obj = authenticator_subkey.export_obj()
    seq_number = random.randint(0, 0xfffffffe)
    authenticator = self.Authenticator_create(
        crealm=realm,
        cname=cname,
        cksum=req_body_checksum,
        cusec=cusec,
        ctime=ctime,
        subkey=subkey_obj,
        seq_number=seq_number,
        authorization_data=None)
    authenticator = self.der_encode(
        authenticator,
        asn1Spec=krb5_asn1.Authenticator(),
        asn1_print=asn1_print,
        hexdump=hexdump)

    authenticator = self.EncryptedData_create(
        ticket_session_key, KU_TGS_REQ_AUTH, authenticator)

    ap_options = krb5_asn1.APOptions('0')
    ap_req = self.AP_REQ_create(ap_options=str(ap_options),
                                ticket=ticket,
                                authenticator=authenticator)
    ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
                             asn1_print=asn1_print, hexdump=hexdump)
    pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)

    # Build a new list instead of appending in place, so a padata list
    # the caller reuses for several requests does not accumulate one
    # AP-REQ entry per call.
    if padata is not None:
        padata = [*padata, pa_tgs_req]
    else:
        padata = [pa_tgs_req]

    obj, decoded = self.KDC_REQ_create(
        msg_type=KRB_TGS_REQ,
        padata=padata,
        req_body=req_body,
        asn1Spec=krb5_asn1.TGS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)
    if native_decoded_only:
        return decoded
    return decoded, obj
def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
    """Return a PA-DATA of type PADATA_FOR_USER holding a DER encoded
    PA-S4U2Self for principal `name` in `realm`.

    The checksum is computed with `tgt_session_key` over the
    little-endian 4-byte name type, each name component, the realm and
    the auth package string "Kerberos", concatenated in that order.
    """
    # PA-S4U2Self     ::= SEQUENCE {
    #        name            [0] PrincipalName,
    #        realm           [1] Realm,
    #        cksum           [2] Checksum,
    #        auth            [3] GeneralString
    # }
    pieces = [name['name-type'].to_bytes(4, byteorder='little')]
    pieces.extend(component.encode()
                  for component in name['name-string'])
    pieces.append(realm.encode())
    pieces.append("Kerberos".encode())
    cksum_data = b''.join(pieces)

    cksum = self.Checksum_create(tgt_session_key,
                                 KU_NON_KERB_CKSUM_SALT,
                                 cksum_data,
                                 ctype)

    s4u2self = {
        'name': name,
        'realm': realm,
        'cksum': cksum,
        'auth': "Kerberos",
    }
    encoded = self.der_encode(
        s4u2self, asn1Spec=krb5_asn1.PA_S4U2Self())
    return self.PA_DATA_create(PADATA_FOR_USER, encoded)
def _generic_kdc_exchange(self,
                          kdc_exchange_dict,  # required
                          cname=None,  # optional
                          realm=None,  # required
                          sname=None,  # optional
                          from_time=None,  # optional
                          till_time=None,  # required
                          renew_time=None,  # optional
                          etypes=None,  # required
                          addresses=None,  # optional
                          additional_tickets=None,  # optional
                          EncAuthorizationData=None,  # optional
                          EncAuthorizationData_key=None,  # optional
                          EncAuthorizationData_usage=None):  # optional
    """Run one AS or TGS exchange described by `kdc_exchange_dict`.

    Builds the request body, applies the 'inner_req' / 'outer_req'
    overrides, generates the padata (TGS AP-REQ, FAST, caller-supplied
    and PAC request/options), sends the request and hands the reply to
    either 'check_error_fn' or 'check_rep_fn', whose result is
    returned.

    Side effects on `kdc_exchange_dict`: 'nonce' is filled in if it
    was not supplied, and 'req_padata', 'fast_padata' and 'req_body'
    are stored.

    The list returned by 'generate_fast_padata_fn' is not modified.
    """

    check_error_fn = kdc_exchange_dict['check_error_fn']
    check_rep_fn = kdc_exchange_dict['check_rep_fn']
    generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
    generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
    generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
    generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
    callback_dict = kdc_exchange_dict['callback_dict']
    req_msg_type = kdc_exchange_dict['req_msg_type']
    req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_error_mode = kdc_exchange_dict['expected_error_mode']
    kdc_options = kdc_exchange_dict['kdc_options']

    pac_request = kdc_exchange_dict['pac_request']
    pac_options = kdc_exchange_dict['pac_options']

    # Parameters specific to the inner request body
    inner_req = kdc_exchange_dict['inner_req']

    # Parameters specific to the outer request body
    outer_req = kdc_exchange_dict['outer_req']

    if till_time is None:
        till_time = self.get_KerberosTime(offset=36000)

    if 'nonce' in kdc_exchange_dict:
        nonce = kdc_exchange_dict['nonce']
    else:
        nonce = self.get_Nonce()
        kdc_exchange_dict['nonce'] = nonce

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)

    # The inner (FAST) body starts as a copy of the outer one; a None
    # value in an override removes that field from the body.
    inner_req_body = dict(req_body)
    if inner_req is not None:
        for key, value in inner_req.items():
            if value is not None:
                inner_req_body[key] = value
            else:
                del inner_req_body[key]
    if outer_req is not None:
        for key, value in outer_req.items():
            if value is not None:
                req_body[key] = value
            else:
                del req_body[key]

    additional_padata = []
    if pac_request is not None:
        pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
        additional_padata.append(pa_pac_request)
    if pac_options is not None:
        pa_pac_options = self.get_pa_pac_options(pac_options)
        additional_padata.append(pa_pac_options)

    if req_msg_type == KRB_AS_REQ:
        tgs_req = None
        tgs_req_padata = None
    else:
        self.assertEqual(KRB_TGS_REQ, req_msg_type)

        tgs_req = self.generate_ap_req(kdc_exchange_dict,
                                       callback_dict,
                                       req_body,
                                       armor=False)
        tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)

    if generate_fast_padata_fn is not None:
        self.assertIsNotNone(generate_fast_fn)
        # This can alter req_body...
        fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
                                                        callback_dict,
                                                        req_body)
    else:
        fast_padata = []

    if generate_fast_armor_fn is not None:
        self.assertIsNotNone(generate_fast_fn)
        fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
                                             callback_dict,
                                             req_body,
                                             armor=True)

        fast_armor_type = kdc_exchange_dict['fast_armor_type']
        fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
                                                fast_ap_req)
    else:
        fast_armor = None

    if generate_padata_fn is not None:
        # This can alter req_body...
        outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
                                                    callback_dict,
                                                    req_body)
        self.assertIsNotNone(outer_padata)
        self.assertNotIn(PADATA_KDC_REQ,
                         [pa['padata-type'] for pa in outer_padata],
                         'Don\'t create TGS-REQ manually')
    else:
        outer_padata = None

    if generate_fast_fn is not None:
        armor_key = kdc_exchange_dict['armor_key']
        self.assertIsNotNone(armor_key)

        # The FAST request checksum covers the outer body for an
        # AS-REQ and the AP-REQ for a TGS-REQ.
        if req_msg_type == KRB_AS_REQ:
            checksum_blob = self.der_encode(
                req_body,
                asn1Spec=krb5_asn1.KDC_REQ_BODY())
        else:
            self.assertEqual(KRB_TGS_REQ, req_msg_type)
            checksum_blob = tgs_req

        checksum = self.Checksum_create(armor_key,
                                        KU_FAST_REQ_CHKSUM,
                                        checksum_blob)

        # Build a new list rather than extending in place: the list
        # came from a callback, which may hand out the same object on
        # every call.
        fast_padata = [*fast_padata, *additional_padata]
        fast = generate_fast_fn(kdc_exchange_dict,
                                callback_dict,
                                inner_req_body,
                                fast_padata,
                                fast_armor,
                                checksum)
    else:
        fast = None

    padata = []

    if tgs_req_padata is not None:
        padata.append(tgs_req_padata)

    if fast is not None:
        padata.append(fast)

    if outer_padata is not None:
        padata += outer_padata

    # Without FAST the PAC request/options go in the outer padata.
    if fast is None:
        padata += additional_padata

    if not padata:
        padata = None

    kdc_exchange_dict['req_padata'] = padata
    kdc_exchange_dict['fast_padata'] = fast_padata
    kdc_exchange_dict['req_body'] = inner_req_body

    req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
                                               padata=padata,
                                               req_body=req_body,
                                               asn1Spec=req_asn1Spec())

    to_rodc = kdc_exchange_dict['to_rodc']

    rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
    self.assertIsNotNone(rep)

    msg_type = self.getElementValue(rep, 'msg-type')
    self.assertIsNotNone(msg_type)

    # Exactly one of check_error_fn / check_rep_fn must be set, and it
    # must agree with expected_error_mode.
    expected_msg_type = None
    if check_error_fn is not None:
        expected_msg_type = KRB_ERROR
        self.assertIsNone(check_rep_fn)
        self.assertNotEqual(0, len(expected_error_mode))
        self.assertNotIn(0, expected_error_mode)
    if check_rep_fn is not None:
        expected_msg_type = rep_msg_type
        self.assertIsNone(check_error_fn)
        self.assertEqual(0, len(expected_error_mode))
    self.assertIsNotNone(expected_msg_type)
    self.assertEqual(msg_type, expected_msg_type)

    if msg_type == KRB_ERROR:
        return check_error_fn(kdc_exchange_dict,
                              callback_dict,
                              rep)

    return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
def as_exchange_dict(self,
                     expected_crealm=None,
                     expected_cname=None,
                     expected_anon=False,
                     expected_srealm=None,
                     expected_sname=None,
                     expected_supported_etypes=None,
                     expected_flags=None,
                     unexpected_flags=None,
                     ticket_decryption_key=None,
                     expect_ticket_checksum=None,
                     generate_fast_fn=None,
                     generate_fast_armor_fn=None,
                     generate_fast_padata_fn=None,
                     fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
                     generate_padata_fn=None,
                     check_error_fn=None,
                     check_rep_fn=None,
                     check_kdc_private_fn=None,
                     callback_dict=None,
                     expected_error_mode=0,
                     expected_status=None,
                     client_as_etypes=None,
                     expected_salt=None,
                     authenticator_subkey=None,
                     preauth_key=None,
                     armor_key=None,
                     armor_tgt=None,
                     armor_subkey=None,
                     auth_data=None,
                     kdc_options='',
                     inner_req=None,
                     outer_req=None,
                     pac_request=None,
                     pac_options=None,
                     expect_edata=None,
                     expect_pac=True,
                     expect_claims=True,
                     to_rodc=False):
    """Collect the parameters of an AS exchange into a dict.

    `expected_error_mode` is normalised to a container: 0 becomes an
    empty tuple and any other non-container value becomes a
    one-element tuple.  A `callback_dict` of None is replaced by a
    fresh empty dict, and that dict is what gets stored under
    'callback_dict'.
    """
    if expected_error_mode == 0:
        expected_error_mode = ()
    elif not isinstance(expected_error_mode, collections.abc.Container):
        expected_error_mode = (expected_error_mode,)

    # Apply the default before building the dict below; doing it
    # afterwards would leave None stored under 'callback_dict'.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_AS_REQ,
        'req_asn1Spec': krb5_asn1.AS_REQ,
        'rep_msg_type': KRB_AS_REP,
        'rep_asn1Spec': krb5_asn1.AS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_anon': expected_anon,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'expected_supported_etypes': expected_supported_etypes,
        'expected_flags': expected_flags,
        'unexpected_flags': unexpected_flags,
        'ticket_decryption_key': ticket_decryption_key,
        'expect_ticket_checksum': expect_ticket_checksum,
        'generate_fast_fn': generate_fast_fn,
        'generate_fast_armor_fn': generate_fast_armor_fn,
        'generate_fast_padata_fn': generate_fast_padata_fn,
        'fast_armor_type': fast_armor_type,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'callback_dict': callback_dict,
        'expected_error_mode': expected_error_mode,
        'expected_status': expected_status,
        'client_as_etypes': client_as_etypes,
        'expected_salt': expected_salt,
        'authenticator_subkey': authenticator_subkey,
        'preauth_key': preauth_key,
        'armor_key': armor_key,
        'armor_tgt': armor_tgt,
        'armor_subkey': armor_subkey,
        'auth_data': auth_data,
        'kdc_options': kdc_options,
        'inner_req': inner_req,
        'outer_req': outer_req,
        'pac_request': pac_request,
        'pac_options': pac_options,
        'expect_edata': expect_edata,
        'expect_pac': expect_pac,
        'expect_claims': expect_claims,
        'to_rodc': to_rodc
    }

    return kdc_exchange_dict
def tgs_exchange_dict(self,
                      expected_crealm=None,
                      expected_cname=None,
                      expected_anon=False,
                      expected_srealm=None,
                      expected_sname=None,
                      expected_supported_etypes=None,
                      expected_flags=None,
                      unexpected_flags=None,
                      ticket_decryption_key=None,
                      expect_ticket_checksum=None,
                      generate_fast_fn=None,
                      generate_fast_armor_fn=None,
                      generate_fast_padata_fn=None,
                      fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
                      generate_padata_fn=None,
                      check_error_fn=None,
                      check_rep_fn=None,
                      check_kdc_private_fn=None,
                      expected_error_mode=0,
                      expected_status=None,
                      callback_dict=None,
                      tgt=None,
                      armor_key=None,
                      armor_tgt=None,
                      armor_subkey=None,
                      authenticator_subkey=None,
                      auth_data=None,
                      body_checksum_type=None,
                      kdc_options='',
                      inner_req=None,
                      outer_req=None,
                      pac_request=None,
                      pac_options=None,
                      expect_edata=None,
                      expect_pac=True,
                      expect_claims=True,
                      expected_proxy_target=None,
                      expected_transited_services=None,
                      to_rodc=False):
    """Collect the parameters of a TGS exchange into a dict.

    `expected_error_mode` is normalised to a container: 0 becomes an
    empty tuple and any other non-container value becomes a
    one-element tuple.  A `callback_dict` of None is replaced by a
    fresh empty dict, and that dict is what gets stored under
    'callback_dict'.
    """
    if expected_error_mode == 0:
        expected_error_mode = ()
    elif not isinstance(expected_error_mode, collections.abc.Container):
        expected_error_mode = (expected_error_mode,)

    # Apply the default before building the dict below; doing it
    # afterwards would leave None stored under 'callback_dict'.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_TGS_REQ,
        'req_asn1Spec': krb5_asn1.TGS_REQ,
        'rep_msg_type': KRB_TGS_REP,
        'rep_asn1Spec': krb5_asn1.TGS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_anon': expected_anon,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'expected_supported_etypes': expected_supported_etypes,
        'expected_flags': expected_flags,
        'unexpected_flags': unexpected_flags,
        'ticket_decryption_key': ticket_decryption_key,
        'expect_ticket_checksum': expect_ticket_checksum,
        'generate_fast_fn': generate_fast_fn,
        'generate_fast_armor_fn': generate_fast_armor_fn,
        'generate_fast_padata_fn': generate_fast_padata_fn,
        'fast_armor_type': fast_armor_type,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'callback_dict': callback_dict,
        'expected_error_mode': expected_error_mode,
        'expected_status': expected_status,
        'tgt': tgt,
        'body_checksum_type': body_checksum_type,
        'armor_key': armor_key,
        'armor_tgt': armor_tgt,
        'armor_subkey': armor_subkey,
        'auth_data': auth_data,
        'authenticator_subkey': authenticator_subkey,
        'kdc_options': kdc_options,
        'inner_req': inner_req,
        'outer_req': outer_req,
        'pac_request': pac_request,
        'pac_options': pac_options,
        'expect_edata': expect_edata,
        'expect_pac': expect_pac,
        'expect_claims': expect_claims,
        'expected_proxy_target': expected_proxy_target,
        'expected_transited_services': expected_transited_services,
        'to_rodc': to_rodc
    }

    return kdc_exchange_dict
def generic_check_kdc_rep(self,
                          kdc_exchange_dict,
                          callback_dict,
                          rep):
    """Check a successful KDC reply (AS-REP or TGS-REP) and decrypt it.

    The outer reply and the enclosed ticket are checked against the
    expectations stored in kdc_exchange_dict. If an armor key is set and
    the reply carries PADATA_FX_FAST, the FAST response is checked as well
    and its strengthen key (if present) is combined with the reply key.
    The ticket (only when a ticket decryption key is available) and the
    reply's encrypted part are then decrypted and handed to the
    'check_kdc_private_fn' callback.

    :param kdc_exchange_dict: state of the exchange. The 'expected_crealm',
        'expected_anon', 'expected_cname', 'expected_srealm',
        'expected_sname', 'ticket_decryption_key', 'check_kdc_private_fn',
        'rep_encpart_asn1Spec', 'rep_msg_type' and 'armor_key' entries are
        read here.
    :param callback_dict: passed through to check_rep_padata() and to the
        private-part callback.
    :param rep: the decoded reply.
    :return: rep, unchanged.
    """

    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_anon = kdc_exchange_dict['expected_anon']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
    check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
    rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
    msg_type = kdc_exchange_dict['rep_msg_type']
    armor_key = kdc_exchange_dict['armor_key']

    self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
    padata = self.getElementValue(rep, 'padata')
    if self.strict_checking:
        self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
        if expected_anon:
            expected_cname = self.PrincipalName_create(
                name_type=NT_WELLKNOWN,
                names=['WELLKNOWN', 'ANONYMOUS'])
        else:
            expected_cname = kdc_exchange_dict['expected_cname']
        self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
    self.assertElementPresent(rep, 'ticket')
    ticket = self.getElementValue(rep, 'ticket')
    ticket_encpart = None
    ticket_cipher = None
    self.assertIsNotNone(ticket)
    if ticket is not None:  # Never None, but gives indentation
        self.assertElementEqual(ticket, 'tkt-vno', 5)
        self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
        self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
        self.assertElementPresent(ticket, 'enc-part')
        ticket_encpart = self.getElementValue(ticket, 'enc-part')
        self.assertIsNotNone(ticket_encpart)
        if ticket_encpart is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_encpart, 'etype')
            # 'unspecified' means present, with any value != 0
            self.assertElementKVNO(ticket_encpart, 'kvno',
                                   self.unspecified_kvno)
            self.assertElementPresent(ticket_encpart, 'cipher')
            ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
    self.assertElementPresent(rep, 'enc-part')
    encpart = self.getElementValue(rep, 'enc-part')
    encpart_cipher = None
    self.assertIsNotNone(encpart)
    if encpart is not None:  # Never None, but gives indentation
        self.assertElementPresent(encpart, 'etype')
        # Check the kvno of the reply's own enc-part. The ticket's
        # enc-part was already checked above; testing it again here
        # would leave the reply's kvno unchecked.
        self.assertElementKVNO(encpart, 'kvno', 'autodetect')
        self.assertElementPresent(encpart, 'cipher')
        encpart_cipher = self.getElementValue(encpart, 'cipher')

    ticket_checksum = None

    # Get the decryption key for the encrypted part
    encpart_decryption_key, encpart_decryption_usage = (
        self.get_preauth_key(kdc_exchange_dict))

    if armor_key is not None:
        pa_dict = self.get_pa_dict(padata)

        if PADATA_FX_FAST in pa_dict:
            fx_fast_data = pa_dict[PADATA_FX_FAST]
            fast_response = self.check_fx_fast_data(kdc_exchange_dict,
                                                    fx_fast_data,
                                                    armor_key,
                                                    finished=True)

            # A strengthen key, when supplied, replaces the reply key
            # with a key derived from both.
            if 'strengthen-key' in fast_response:
                strengthen_key = self.EncryptionKey_import(
                    fast_response['strengthen-key'])
                encpart_decryption_key = (
                    self.generate_strengthen_reply_key(
                        strengthen_key,
                        encpart_decryption_key))

            fast_finished = fast_response.get('finished')
            if fast_finished is not None:
                ticket_checksum = fast_finished['ticket-checksum']

            self.check_rep_padata(kdc_exchange_dict,
                                  callback_dict,
                                  fast_response['padata'],
                                  error_code=0)

    # The ticket can only be inspected if we hold the service's key.
    ticket_private = None
    if ticket_decryption_key is not None:
        self.assertElementEqual(ticket_encpart, 'etype',
                                ticket_decryption_key.etype)
        self.assertElementKVNO(ticket_encpart, 'kvno',
                               ticket_decryption_key.kvno)
        ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
                                                       ticket_cipher)
        ticket_private = self.der_decode(
            ticket_decpart,
            asn1Spec=krb5_asn1.EncTicketPart())

    encpart_private = None
    self.assertIsNotNone(encpart_decryption_key)
    if encpart_decryption_key is not None:
        self.assertElementEqual(encpart, 'etype',
                                encpart_decryption_key.etype)
        if self.strict_checking:
            self.assertElementKVNO(encpart, 'kvno',
                                   encpart_decryption_key.kvno)
        rep_decpart = encpart_decryption_key.decrypt(
            encpart_decryption_usage,
            encpart_cipher)
        # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
        # application tag 26
        try:
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=rep_encpart_asn1Spec())
        except Exception:
            # Deliberate fallback: retry with the other encoding.
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=krb5_asn1.EncTGSRepPart())

    self.assertIsNotNone(check_kdc_private_fn)
    if check_kdc_private_fn is not None:
        check_kdc_private_fn(kdc_exchange_dict, callback_dict,
                             rep, ticket_private, encpart_private,
                             ticket_checksum)

    return rep
def check_fx_fast_data(self,
                       kdc_exchange_dict,
                       fx_fast_data,
                       armor_key,
                       finished=False,
                       expect_strengthen_key=True):
    """Decode, decrypt and sanity-check a PA-FX-FAST reply.

    :param kdc_exchange_dict: state of the exchange; only 'nonce' is read.
    :param fx_fast_data: DER-encoded PA_FX_FAST_REPLY blob.
    :param armor_key: key used to decrypt the armored response; its etype
        must match that of the encrypted data.
    :param finished: if true, the response must contain 'finished'.
    :param expect_strengthen_key: if true (and strict checking is on), the
        response must contain 'strengthen-key'.
    :return: the decoded KrbFastResponse.
    """
    fast_reply = self.der_decode(fx_fast_data,
                                 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())

    armored_rep = fast_reply['armored-data']['enc-fast-rep']
    self.assertEqual(armored_rep['etype'], armor_key.etype)

    plaintext = armor_key.decrypt(KU_FAST_REP, armored_rep['cipher'])
    fast_response = self.der_decode(plaintext,
                                    asn1Spec=krb5_asn1.KrbFastResponse())

    if expect_strengthen_key:
        if self.strict_checking:
            self.assertIn('strengthen-key', fast_response)

    if finished:
        self.assertIn('finished', fast_response)

    # Ensure that the nonce matches the nonce in the body of the request
    # (RFC6113 5.4.3).
    self.assertEqual(kdc_exchange_dict['nonce'], fast_response['nonce'])

    return fast_response
def generic_check_kdc_private(self,
                              kdc_exchange_dict,
                              callback_dict,
                              rep,
                              ticket_private,
                              encpart_private,
                              ticket_checksum):
    """Check the decrypted parts of a successful KDC reply.

    Checks the decrypted ticket (if available) and the decrypted reply
    part against the expectations in kdc_exchange_dict, verifies the FAST
    ticket checksum when one was supplied, and stores the resulting
    KerberosTicketCreds under kdc_exchange_dict['rep_ticket_creds'].

    :param kdc_exchange_dict: state of the exchange (read and updated).
    :param callback_dict: not used in this method.
    :param rep: the decoded reply; only its 'ticket' element is read.
    :param ticket_private: decoded EncTicketPart, or None if the ticket
        could not be decrypted.
    :param encpart_private: decoded encrypted part of the reply, or None.
    :param ticket_checksum: checksum from the FAST 'finished' element, or
        None.
    """
    kdc_options = kdc_exchange_dict['kdc_options']
    # Locate the named flags inside the KDC options bit string. The
    # length check guards against an options string that is too short.
    canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
    canonicalize = (canon_pos < len(kdc_options)
                    and kdc_options[canon_pos] == '1')
    renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
    renewable = (renewable_pos < len(kdc_options)
                 and kdc_options[renewable_pos] == '1')

    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_cname = kdc_exchange_dict['expected_cname']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']

    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_flags = kdc_exchange_dict.get('expected_flags')
    unexpected_flags = kdc_exchange_dict.get('unexpected_flags')

    ticket = self.getElementValue(rep, 'ticket')

    if ticket_checksum is not None:
        armor_key = kdc_exchange_dict['armor_key']
        self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)

    to_rodc = kdc_exchange_dict['to_rodc']
    if to_rodc:
        krbtgt_creds = self.get_rodc_krbtgt_creds()
    else:
        krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    expect_pac = kdc_exchange_dict['expect_pac']

    ticket_session_key = None
    if ticket_private is not None:
        self.assertElementFlags(ticket_private, 'flags',
                                expected_flags,
                                unexpected_flags)
        self.assertElementPresent(ticket_private, 'key')
        ticket_key = self.getElementValue(ticket_private, 'key')
        self.assertIsNotNone(ticket_key)
        if ticket_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_key, 'keytype')
            self.assertElementPresent(ticket_key, 'keyvalue')
            ticket_session_key = self.EncryptionKey_import(ticket_key)
        self.assertElementEqualUTF8(ticket_private, 'crealm',
                                    expected_crealm)
        if self.strict_checking:
            self.assertElementEqualPrincipal(ticket_private, 'cname',
                                             expected_cname)
        self.assertElementPresent(ticket_private, 'transited')
        self.assertElementPresent(ticket_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(ticket_private, 'starttime')
        self.assertElementPresent(ticket_private, 'endtime')
        if renewable:
            if self.strict_checking:
                self.assertElementPresent(ticket_private, 'renew-till')
        else:
            self.assertElementMissing(ticket_private, 'renew-till')
        if self.strict_checking:
            self.assertElementEqual(ticket_private, 'caddr', [])
        self.assertElementPresent(ticket_private, 'authorization-data',
                                  expect_empty=not expect_pac)

        if expect_pac:
            authorization_data = self.getElementValue(ticket_private,
                                                      'authorization-data')
            pac_data = self.get_pac(authorization_data)

            self.check_pac_buffers(pac_data, kdc_exchange_dict)

    encpart_session_key = None
    if encpart_private is not None:
        self.assertElementPresent(encpart_private, 'key')
        encpart_key = self.getElementValue(encpart_private, 'key')
        self.assertIsNotNone(encpart_key)
        if encpart_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(encpart_key, 'keytype')
            self.assertElementPresent(encpart_key, 'keyvalue')
            encpart_session_key = self.EncryptionKey_import(encpart_key)
        self.assertElementPresent(encpart_private, 'last-req')
        self.assertElementEqual(encpart_private, 'nonce',
                                kdc_exchange_dict['nonce'])
        if rep_msg_type == KRB_AS_REP:
            if self.strict_checking:
                self.assertElementPresent(encpart_private,
                                          'key-expiration')
        else:
            self.assertElementMissing(encpart_private,
                                      'key-expiration')
        self.assertElementFlags(encpart_private, 'flags',
                                expected_flags,
                                unexpected_flags)
        self.assertElementPresent(encpart_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(encpart_private, 'starttime')
        self.assertElementPresent(encpart_private, 'endtime')
        if renewable:
            if self.strict_checking:
                self.assertElementPresent(encpart_private, 'renew-till')
        else:
            self.assertElementMissing(encpart_private, 'renew-till')
        self.assertElementEqualUTF8(encpart_private, 'srealm',
                                    expected_srealm)
        self.assertElementEqualPrincipal(encpart_private, 'sname',
                                         expected_sname)
        if self.strict_checking:
            self.assertElementEqual(encpart_private, 'caddr', [])

        sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)

        if self.strict_checking:
            if canonicalize or '1' in sent_pac_options:
                self.assertElementPresent(encpart_private,
                                          'encrypted-pa-data')
                enc_pa_dict = self.get_pa_dict(
                    encpart_private['encrypted-pa-data'])
                if canonicalize:
                    self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)

                    # These three enctype bits are always added to the
                    # expected set before comparing.
                    expected_supported_etypes = kdc_exchange_dict[
                        'expected_supported_etypes']
                    expected_supported_etypes |= (
                        security.KERB_ENCTYPE_DES_CBC_CRC |
                        security.KERB_ENCTYPE_DES_CBC_MD5 |
                        security.KERB_ENCTYPE_RC4_HMAC_MD5)

                    # The padata value is unpacked as one little-endian
                    # 32-bit unsigned integer.
                    (supported_etypes,) = struct.unpack(
                        '<L',
                        enc_pa_dict[PADATA_SUPPORTED_ETYPES])

                    self.assertEqual(supported_etypes,
                                     expected_supported_etypes)
                else:
                    self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)

                if '1' in sent_pac_options:
                    self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)

                    pac_options = self.der_decode(
                        enc_pa_dict[PADATA_PAC_OPTIONS],
                        asn1Spec=krb5_asn1.PA_PAC_OPTIONS())

                    self.assertElementEqual(pac_options, 'options',
                                            sent_pac_options)
                else:
                    self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
            else:
                # NOTE(review): the expected value of this call was
                # missing from the reviewed text; an empty list is assumed
                # here, mirroring the 'caddr' checks - confirm.
                self.assertElementEqual(encpart_private,
                                        'encrypted-pa-data',
                                        [])

    # The session key in the ticket and in the reply must agree.
    if ticket_session_key is not None and encpart_session_key is not None:
        self.assertEqual(ticket_session_key.etype,
                         encpart_session_key.etype)
        self.assertEqual(ticket_session_key.key.contents,
                         encpart_session_key.key.contents)
    if encpart_session_key is not None:
        session_key = encpart_session_key
    else:
        session_key = ticket_session_key
    ticket_creds = KerberosTicketCreds(
        ticket,
        session_key,
        crealm=expected_crealm,
        cname=expected_cname,
        srealm=expected_srealm,
        sname=expected_sname,
        decryption_key=ticket_decryption_key,
        ticket_private=ticket_private,
        encpart_private=encpart_private)

    # TODO: This parameter should be removed when all service tickets are
    # issued with ticket checksums.
    expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
    if expect_ticket_checksum:
        self.assertIsNotNone(ticket_decryption_key)

    if ticket_decryption_key is not None:
        self.verify_ticket(ticket_creds, krbtgt_key, expect_pac=expect_pac,
                           expect_ticket_checksum=expect_ticket_checksum)

    kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
def check_pac_buffers(self, pac_data, kdc_exchange_dict):
    """Check which buffers a PAC contains and its delegation info.

    :param pac_data: NDR-encoded PAC, unpacked here as krb5pac.PAC_DATA.
    :param kdc_exchange_dict: state of the exchange. 'rep_msg_type',
        'armor_tgt', 'expected_sname', 'expect_claims' and 'kdc_options'
        are always read; 'expected_proxy_target' and
        'expected_transited_services' are read only if a
        constrained-delegation buffer is present.
    """
    pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)

    rep_msg_type = kdc_exchange_dict['rep_msg_type']
    armor_tgt = kdc_exchange_dict['armor_tgt']

    expected_sname = kdc_exchange_dict['expected_sname']
    expect_claims = kdc_exchange_dict['expect_claims']

    # Buffers expected in every PAC.
    wanted_types = [krb5pac.PAC_TYPE_LOGON_INFO,
                    krb5pac.PAC_TYPE_SRV_CHECKSUM,
                    krb5pac.PAC_TYPE_KDC_CHECKSUM,
                    krb5pac.PAC_TYPE_LOGON_NAME,
                    krb5pac.PAC_TYPE_UPN_DNS_INFO]

    # A delegation buffer is expected when the request had the
    # 'cname-in-addl-tkt' option set.
    kdc_options = kdc_exchange_dict['kdc_options']
    deleg_pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
    if deleg_pos < len(kdc_options) and kdc_options[deleg_pos] == '1':
        wanted_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)

    if self.kdc_fast_support:
        if expect_claims:
            wanted_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)

        if rep_msg_type == KRB_TGS_REP and armor_tgt is not None:
            wanted_types.extend((krb5pac.PAC_TYPE_DEVICE_INFO,
                                 krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO))

    if not self.is_tgs(expected_sname):
        wanted_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)

    if self.strict_checking:
        found_types = [buf.type for buf in pac.buffers]
        self.assertCountEqual(wanted_types, found_types,
                              f'expected: {wanted_types} '
                              f'got: {found_types}')

    for buf in pac.buffers:
        if buf.type != krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
            continue

        expected_proxy_target = kdc_exchange_dict[
            'expected_proxy_target']
        expected_transited_services = kdc_exchange_dict[
            'expected_transited_services']

        deleg = buf.info.info

        self.assertEqual(expected_proxy_target,
                         str(deleg.proxy_target))

        seen_services = [str(svc) for svc in deleg.transited_services]
        self.assertEqual(expected_transited_services,
                         seen_services)
def generic_check_kdc_error(self,
                            kdc_exchange_dict,
                            callback_dict,
                            rep,
                            inner=False):
    """Check a KRB-ERROR reply against the expectations of the exchange.

    :param kdc_exchange_dict: state of the exchange. On the padata path
        the value returned by check_rep_padata() is stored under
        'preauth_etype_info2'.
    :param callback_dict: passed through to check_rep_padata().
    :param rep: the decoded KRB-ERROR.
    :param inner: True when called from check_rep_padata() for the error
        carried in a PADATA_FX_ERROR element; no e-data and no anonymous
        cname are expected by default in that case.
    :return: rep, unchanged.
    """

    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_anon = kdc_exchange_dict['expected_anon']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    expected_error_mode = kdc_exchange_dict['expected_error_mode']

    sent_fast = self.sent_fast(kdc_exchange_dict)

    fast_armor_type = kdc_exchange_dict['fast_armor_type']

    self.assertElementEqual(rep, 'pvno', 5)
    self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
    error_code = self.getElementValue(rep, 'error-code')
    # expected_error_mode is a collection of acceptable error codes.
    self.assertIn(error_code, expected_error_mode)
    if self.strict_checking:
        self.assertElementMissing(rep, 'ctime')
        self.assertElementMissing(rep, 'cusec')
    self.assertElementPresent(rep, 'stime')
    self.assertElementPresent(rep, 'susec')
    # error-code checked above
    if self.strict_checking:
        self.assertElementMissing(rep, 'crealm')
        if expected_anon and not inner:
            expected_cname = self.PrincipalName_create(
                name_type=NT_WELLKNOWN,
                names=['WELLKNOWN', 'ANONYMOUS'])
            self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
        else:
            self.assertElementMissing(rep, 'cname')
        self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
        self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
        self.assertElementMissing(rep, 'e-text')
    expected_status = kdc_exchange_dict['expected_status']
    expect_edata = kdc_exchange_dict['expect_edata']
    if expect_edata is None:
        # The caller did not say whether e-data is expected, so derive a
        # default from the error code, the FAST armor type and 'inner'.
        expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
                        and (not sent_fast or fast_armor_type is None
                             or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
                        and not inner)
    if not expect_edata:
        self.assertIsNone(expected_status)
        self.assertElementMissing(rep, 'e-data')
        return rep
    edata = self.getElementValue(rep, 'e-data')
    if self.strict_checking:
        self.assertIsNotNone(edata)
    if edata is not None:
        if rep_msg_type == KRB_TGS_REP and not sent_fast:
            # Unarmored TGS errors carry a KERB-ERROR-DATA structure
            # rather than a list of padata.
            error_data = self.der_decode(
                edata,
                asn1Spec=krb5_asn1.KERB_ERROR_DATA())
            self.assertEqual(KERB_ERR_TYPE_EXTENDED,
                             error_data['data-type'])

            extended_error = error_data['data-value']

            self.assertEqual(12, len(extended_error))

            # Bytes 0-3 are read as the little-endian status and bytes
            # 8-11 as the little-endian flags; bytes 4-7 are not checked.
            status = int.from_bytes(extended_error[:4], 'little')
            flags = int.from_bytes(extended_error[8:], 'little')

            self.assertEqual(expected_status, status)

            self.assertEqual(3, flags)
        else:
            self.assertIsNone(expected_status)

            rep_padata = self.der_decode(edata,
                                         asn1Spec=krb5_asn1.METHOD_DATA())
            self.assertGreater(len(rep_padata), 0)

            if sent_fast:
                # With FAST, the only outer element is PADATA_FX_FAST;
                # the real padata is inside the decrypted response.
                self.assertEqual(1, len(rep_padata))
                rep_pa_dict = self.get_pa_dict(rep_padata)
                self.assertIn(PADATA_FX_FAST, rep_pa_dict)

                armor_key = kdc_exchange_dict['armor_key']
                self.assertIsNotNone(armor_key)
                fast_response = self.check_fx_fast_data(
                    kdc_exchange_dict,
                    rep_pa_dict[PADATA_FX_FAST],
                    armor_key,
                    expect_strengthen_key=False)

                rep_padata = fast_response['padata']

            etype_info2 = self.check_rep_padata(kdc_exchange_dict,
                                                callback_dict,
                                                rep_padata,
                                                error_code)

            kdc_exchange_dict['preauth_etype_info2'] = etype_info2

    return rep
def check_rep_padata(self,
                     kdc_exchange_dict,
                     callback_dict,
                     rep_padata,
                     error_code):
    """Check the padata elements returned by the KDC.

    Works out which padata types are expected for this exchange, checks
    (under strict checking) that exactly those were returned in that
    order, and then validates the individual elements. A returned FAST
    cookie is stored under kdc_exchange_dict['fast_cookie'].

    :param kdc_exchange_dict: state of the exchange (read and updated).
    :param callback_dict: passed through to generic_check_kdc_error() for
        an inner FAST error.
    :param rep_padata: sequence of decoded PA-DATA elements.
    :param error_code: error code of the reply, or 0 for a successful one.
    :return: the decoded ETYPE-INFO2, or None if none was returned or none
        of the AES256/AES128/RC4 etypes is both proposed and listed in
        'client_as_etypes'.
    """
    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    req_body = kdc_exchange_dict['req_body']
    proposed_etypes = req_body['etype']
    client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])

    sent_fast = self.sent_fast(kdc_exchange_dict)
    sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)

    if rep_msg_type == KRB_TGS_REP:
        self.assertTrue(sent_fast)

    expect_etype_info2 = ()
    expect_etype_info = False
    unexpect_etype_info = True
    expected_aes_type = 0
    expected_rc4_type = 0
    if kcrypto.Enctype.RC4 in proposed_etypes:
        expect_etype_info = True
    for etype in proposed_etypes:
        if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
            expect_etype_info = False
        if etype not in client_as_etypes:
            continue
        if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
            if etype > expected_aes_type:
                expected_aes_type = etype
        if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
            unexpect_etype_info = False
            if etype > expected_rc4_type:
                expected_rc4_type = etype

    if expected_aes_type != 0:
        expect_etype_info2 += (expected_aes_type,)
    if expected_rc4_type != 0:
        expect_etype_info2 += (expected_rc4_type,)

    # Always work out the PAC options that were sent: they are needed
    # below whenever the reply carries PADATA_PAC_OPTIONS, not only for
    # TGS exchanges. (Previously the name was unbound for an AS exchange,
    # turning such a reply into an UnboundLocalError.)
    sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)

    expected_patypes = ()
    if sent_fast and error_code != 0:
        expected_patypes += (PADATA_FX_ERROR,)
        expected_patypes += (PADATA_FX_COOKIE,)

    if rep_msg_type == KRB_TGS_REP:
        if ('1' in sent_pac_options
                and error_code not in (0, KDC_ERR_GENERIC)):
            expected_patypes += (PADATA_PAC_OPTIONS,)
    elif error_code != KDC_ERR_GENERIC:
        if expect_etype_info:
            self.assertGreater(len(expect_etype_info2), 0)
            expected_patypes += (PADATA_ETYPE_INFO,)
        if len(expect_etype_info2) != 0:
            expected_patypes += (PADATA_ETYPE_INFO2,)

        if error_code != KDC_ERR_PREAUTH_FAILED:
            if sent_fast:
                expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
            else:
                expected_patypes += (PADATA_ENC_TIMESTAMP,)

            if not sent_enc_challenge:
                expected_patypes += (PADATA_PK_AS_REQ,)
                expected_patypes += (PADATA_PK_AS_REP_19,)

        if (self.kdc_fast_support
                and not sent_fast
                and not sent_enc_challenge):
            expected_patypes += (PADATA_FX_FAST,)
            expected_patypes += (PADATA_FX_COOKIE,)

    if self.strict_checking:
        for i, patype in enumerate(expected_patypes):
            self.assertElementEqual(rep_padata[i], 'padata-type', patype)
        self.assertEqual(len(rep_padata), len(expected_patypes))

    etype_info2 = None
    etype_info = None
    enc_timestamp = None
    enc_challenge = None
    pk_as_req = None
    pk_as_rep19 = None
    fast_cookie = None
    fast_error = None
    fx_fast = None
    pac_options = None
    # Collect each known padata type; the assertIsNone() calls reject
    # duplicates.
    for pa in rep_padata:
        patype = self.getElementValue(pa, 'padata-type')
        pavalue = self.getElementValue(pa, 'padata-value')
        if patype == PADATA_ETYPE_INFO2:
            self.assertIsNone(etype_info2)
            etype_info2 = self.der_decode(pavalue,
                                          asn1Spec=krb5_asn1.ETYPE_INFO2())
            continue
        if patype == PADATA_ETYPE_INFO:
            self.assertIsNone(etype_info)
            etype_info = self.der_decode(pavalue,
                                         asn1Spec=krb5_asn1.ETYPE_INFO())
            continue
        if patype == PADATA_ENC_TIMESTAMP:
            self.assertIsNone(enc_timestamp)
            enc_timestamp = pavalue
            self.assertEqual(len(enc_timestamp), 0)
            continue
        if patype == PADATA_ENCRYPTED_CHALLENGE:
            self.assertIsNone(enc_challenge)
            enc_challenge = pavalue
            continue
        if patype == PADATA_PK_AS_REQ:
            self.assertIsNone(pk_as_req)
            pk_as_req = pavalue
            self.assertEqual(len(pk_as_req), 0)
            continue
        if patype == PADATA_PK_AS_REP_19:
            self.assertIsNone(pk_as_rep19)
            pk_as_rep19 = pavalue
            self.assertEqual(len(pk_as_rep19), 0)
            continue
        if patype == PADATA_FX_COOKIE:
            self.assertIsNone(fast_cookie)
            fast_cookie = pavalue
            self.assertIsNotNone(fast_cookie)
            continue
        if patype == PADATA_FX_ERROR:
            self.assertIsNone(fast_error)
            fast_error = pavalue
            self.assertIsNotNone(fast_error)
            continue
        if patype == PADATA_FX_FAST:
            self.assertIsNone(fx_fast)
            fx_fast = pavalue
            self.assertEqual(len(fx_fast), 0)
            continue
        if patype == PADATA_PAC_OPTIONS:
            self.assertIsNone(pac_options)
            pac_options = self.der_decode(
                pavalue,
                asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
            continue

    if fast_cookie is not None:
        kdc_exchange_dict['fast_cookie'] = fast_cookie

    if fast_error is not None:
        fast_error = self.der_decode(fast_error,
                                     asn1Spec=krb5_asn1.KRB_ERROR())
        self.generic_check_kdc_error(kdc_exchange_dict,
                                     callback_dict,
                                     fast_error,
                                     inner=True)

    if pac_options is not None:
        self.assertElementEqual(pac_options, 'options', sent_pac_options)

    if enc_challenge is not None:
        if not sent_enc_challenge:
            self.assertEqual(len(enc_challenge), 0)
        else:
            armor_key = kdc_exchange_dict['armor_key']
            self.assertIsNotNone(armor_key)

            preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)

            kdc_challenge_key = self.generate_kdc_challenge_key(
                armor_key, preauth_key)

            # Ensure that the encrypted challenge FAST factor is supported
            # (RFC6113 5.4.6).
            if self.strict_checking:
                self.assertNotEqual(len(enc_challenge), 0)
            if len(enc_challenge) != 0:
                encrypted_challenge = self.der_decode(
                    enc_challenge,
                    asn1Spec=krb5_asn1.EncryptedData())
                self.assertEqual(encrypted_challenge['etype'],
                                 kdc_challenge_key.etype)

                challenge = kdc_challenge_key.decrypt(
                    KU_ENC_CHALLENGE_KDC,
                    encrypted_challenge['cipher'])
                challenge = self.der_decode(
                    challenge,
                    asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

                # Retrieve the returned timestamp.
                rep_patime = challenge['patimestamp']
                self.assertIn('pausec', challenge)

                # Ensure the returned time is within five minutes of the
                # current time.
                rep_time = self.get_EpochFromKerberosTime(rep_patime)
                current_time = time.time()

                self.assertLess(current_time - 300, rep_time)
                self.assertLess(rep_time, current_time + 300)

    # None of AES256/AES128/RC4 is both proposed and listed in
    # 'client_as_etypes': no etype info may be returned at all.
    if all(etype not in client_as_etypes or etype not in proposed_etypes
           for etype in (kcrypto.Enctype.AES256,
                         kcrypto.Enctype.AES128,
                         kcrypto.Enctype.RC4)):
        self.assertIsNone(etype_info2)
        self.assertIsNone(etype_info)
        if rep_msg_type == KRB_AS_REP:
            if self.strict_checking:
                if sent_fast:
                    self.assertIsNotNone(enc_challenge)
                    self.assertIsNone(enc_timestamp)
                else:
                    self.assertIsNotNone(enc_timestamp)
                    self.assertIsNone(enc_challenge)
                self.assertIsNotNone(pk_as_req)
                self.assertIsNotNone(pk_as_rep19)
        else:
            self.assertIsNone(enc_timestamp)
            self.assertIsNone(enc_challenge)
            self.assertIsNone(pk_as_req)
            self.assertIsNone(pk_as_rep19)
        return None

    if error_code != KDC_ERR_GENERIC:
        if self.strict_checking:
            self.assertIsNotNone(etype_info2)
    else:
        self.assertIsNone(etype_info2)
    if expect_etype_info:
        self.assertIsNotNone(etype_info)
    else:
        if self.strict_checking:
            self.assertIsNone(etype_info)
    if unexpect_etype_info:
        self.assertIsNone(etype_info)

    if error_code != KDC_ERR_GENERIC and self.strict_checking:
        self.assertGreaterEqual(len(etype_info2), 1)
        self.assertEqual(len(etype_info2), len(expect_etype_info2))
        for i, info2_entry in enumerate(etype_info2):
            e = self.getElementValue(info2_entry, 'etype')
            self.assertEqual(e, expect_etype_info2[i])
            salt = self.getElementValue(info2_entry, 'salt')
            if e == kcrypto.Enctype.RC4:
                self.assertIsNone(salt)
            else:
                self.assertIsNotNone(salt)
                expected_salt = kdc_exchange_dict['expected_salt']
                if expected_salt is not None:
                    self.assertEqual(salt, expected_salt)
            s2kparams = self.getElementValue(info2_entry, 's2kparams')
            if self.strict_checking:
                self.assertIsNone(s2kparams)
    if etype_info is not None:
        self.assertEqual(len(etype_info), 1)
        e = self.getElementValue(etype_info[0], 'etype')
        self.assertEqual(e, kcrypto.Enctype.RC4)
        self.assertEqual(e, expect_etype_info2[0])
        salt = self.getElementValue(etype_info[0], 'salt')
        if self.strict_checking:
            self.assertIsNotNone(salt)
            self.assertEqual(len(salt), 0)

    if error_code not in (KDC_ERR_PREAUTH_FAILED,
                          KDC_ERR_GENERIC):
        if sent_fast:
            self.assertIsNotNone(enc_challenge)
            if self.strict_checking:
                self.assertIsNone(enc_timestamp)
        else:
            self.assertIsNotNone(enc_timestamp)
            if self.strict_checking:
                self.assertIsNone(enc_challenge)
        if not sent_enc_challenge:
            if self.strict_checking:
                self.assertIsNotNone(pk_as_req)
                self.assertIsNotNone(pk_as_rep19)
        else:
            self.assertIsNone(pk_as_req)
            self.assertIsNone(pk_as_rep19)
    else:
        if self.strict_checking:
            self.assertIsNone(enc_timestamp)
            self.assertIsNone(enc_challenge)
            self.assertIsNone(pk_as_req)
            self.assertIsNone(pk_as_rep19)

    return etype_info2
def generate_simple_fast(self,
                         kdc_exchange_dict,
                         _callback_dict,
                         req_body,
                         fast_padata,
                         fast_armor,
                         checksum,
                         fast_options=''):
    """Build the PADATA_FX_FAST element wrapping a FAST request.

    :param kdc_exchange_dict: state of the exchange; 'armor_key' is used
        to encrypt the inner request.
    :param _callback_dict: unused.
    :param req_body: request body placed inside the FAST request.
    :param fast_padata: padata placed inside the FAST request.
    :param fast_armor: armor placed in the armored request.
    :param checksum: checksum placed in the armored request.
    :param fast_options: FAST options for the inner request.
    :return: the PA-DATA element of type PADATA_FX_FAST.
    """
    armor_key = kdc_exchange_dict['armor_key']

    # Inner request: create, DER-encode, then encrypt under the armor key.
    inner_req = self.KRB_FAST_REQ_create(fast_options,
                                         fast_padata,
                                         req_body)
    inner_blob = self.der_encode(inner_req,
                                 asn1Spec=krb5_asn1.KrbFastReq())
    enc_fast_req = self.EncryptedData_create(armor_key,
                                             KU_FAST_ENC,
                                             inner_blob)

    armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
                                                   checksum,
                                                   enc_fast_req)

    request_blob = self.der_encode(
        self.PA_FX_FAST_REQUEST_create(armored_req),
        asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())

    return self.PA_DATA_create(PADATA_FX_FAST, request_blob)
def generate_ap_req(self,
                    kdc_exchange_dict,
                    _callback_dict,
                    req_body,
                    armor):
    """Build a DER-encoded AP-REQ for a TGS request or for FAST armor.

    :param kdc_exchange_dict: state of the exchange. With armor, the
        'armor_tgt' and 'armor_subkey' entries are used; otherwise 'tgt',
        'authenticator_subkey' and 'body_checksum_type'. 'auth_data' is
        used in both cases.
    :param _callback_dict: unused.
    :param req_body: request body; checksummed only when not armoring.
    :param armor: true to build the AP-REQ used as FAST armor.
    :return: the DER-encoded AP-REQ.
    """
    if armor:
        tgt = kdc_exchange_dict['armor_tgt']
        subkey = kdc_exchange_dict['armor_subkey']

        # The armor AP-REQ carries no request body checksum.
        body_cksum = None
    else:
        tgt = kdc_exchange_dict['tgt']
        subkey = kdc_exchange_dict['authenticator_subkey']
        cksum_type = kdc_exchange_dict['body_checksum_type']

        encoded_body = self.der_encode(req_body,
                                       asn1Spec=krb5_asn1.KDC_REQ_BODY())

        body_cksum = self.Checksum_create(tgt.session_key,
                                          KU_TGS_REQ_AUTH_CKSUM,
                                          encoded_body,
                                          ctype=cksum_type)

    auth_data = kdc_exchange_dict['auth_data']

    exported_subkey = None if subkey is None else subkey.export_obj()
    seq_number = random.randint(0, 0xfffffffe)
    ctime, cusec = self.get_KerberosTimeWithUsec()
    authenticator_blob = self.der_encode(
        self.Authenticator_create(
            crealm=tgt.crealm,
            cname=tgt.cname,
            cksum=body_cksum,
            cusec=cusec,
            ctime=ctime,
            subkey=exported_subkey,
            seq_number=seq_number,
            authorization_data=auth_data),
        asn1Spec=krb5_asn1.Authenticator())

    # The key usage differs between the armor and the TGS authenticator.
    if armor:
        usage = KU_AP_REQ_AUTH
    else:
        usage = KU_TGS_REQ_AUTH
    authenticator = self.EncryptedData_create(tgt.session_key,
                                              usage,
                                              authenticator_blob)

    ap_options = krb5_asn1.APOptions('0')
    ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
                                    ticket=tgt.ticket,
                                    authenticator=authenticator)

    return self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
def generate_simple_tgs_padata(self,
                               kdc_exchange_dict,
                               callback_dict,
                               req_body):
    """Build the padata for a plain (unarmored) TGS request.

    :return: a tuple of a one-element list holding the PADATA_KDC_REQ
        element (wrapping a non-armor AP-REQ) and the unchanged req_body.
    """
    tgs_ap_req = self.generate_ap_req(kdc_exchange_dict,
                                      callback_dict,
                                      req_body,
                                      armor=False)

    return [self.PA_DATA_create(PADATA_KDC_REQ, tgs_ap_req)], req_body
def get_preauth_key(self, kdc_exchange_dict):
    """Pick the key and key usage for decrypting the reply's enc-part.

    For an AS-REP this is 'preauth_key'. Otherwise it is the
    authenticator subkey if one is set, else the TGT session key.

    :return: a (key, usage) tuple; the key is asserted to be not None.
    """
    if kdc_exchange_dict['rep_msg_type'] == KRB_AS_REP:
        key = kdc_exchange_dict['preauth_key']
        usage = KU_AS_REP_ENC_PART
    else:  # KRB_TGS_REP
        key = kdc_exchange_dict['authenticator_subkey']
        if key is not None:
            usage = KU_TGS_REP_ENC_PART_SUB_KEY
        else:
            key = kdc_exchange_dict['tgt'].session_key
            usage = KU_TGS_REP_ENC_PART_SESSION

    self.assertIsNotNone(key)

    return key, usage
def generate_armor_key(self, subkey, session_key):
    """Derive the FAST armor key from a subkey and a session key.

    The two keys are combined with kcrypto.cf2() using the pepper strings
    b'subkeyarmor' and b'ticketarmor'.

    :return: a Krb5EncryptionKey wrapping the combined key; None is passed
        as its second argument (presumably the kvno - confirm).
    """
    combined = kcrypto.cf2(subkey.key,
                           session_key.key,
                           b'subkeyarmor',
                           b'ticketarmor')

    return Krb5EncryptionKey(combined, None)
def generate_strengthen_reply_key(self, strengthen_key, reply_key):
    """Combine a FAST strengthen key with the reply key.

    The two keys are combined with kcrypto.cf2() using the pepper strings
    b'strengthenkey' and b'replykey'.

    :return: a Krb5EncryptionKey wrapping the combined key, created with
        the kvno of reply_key.
    """
    combined = kcrypto.cf2(strengthen_key.key,
                           reply_key.key,
                           b'strengthenkey',
                           b'replykey')

    return Krb5EncryptionKey(combined, reply_key.kvno)
def generate_client_challenge_key(self, armor_key, longterm_key):
    """Derive the client's encrypted-challenge key.

    The armor key and the long-term key are combined with kcrypto.cf2()
    using the pepper strings b'clientchallengearmor' and
    b'challengelongterm'.

    :return: a Krb5EncryptionKey wrapping the combined key; None is passed
        as its second argument (presumably the kvno - confirm).
    """
    combined = kcrypto.cf2(armor_key.key,
                           longterm_key.key,
                           b'clientchallengearmor',
                           b'challengelongterm')

    return Krb5EncryptionKey(combined, None)
def generate_kdc_challenge_key(self, armor_key, longterm_key):
    """Derive the KDC's encrypted-challenge key.

    The armor key and the long-term key are combined with kcrypto.cf2()
    using the pepper strings b'kdcchallengearmor' and
    b'challengelongterm'.

    :return: a Krb5EncryptionKey wrapping the combined key; None is passed
        as its second argument (presumably the kvno - confirm).
    """
    combined = kcrypto.cf2(armor_key.key,
                           longterm_key.key,
                           b'kdcchallengearmor',
                           b'challengelongterm')

    return Krb5EncryptionKey(combined, None)
def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
    """Check the FAST ticket checksum against one computed locally.

    :param ticket: the ticket from the reply; it is DER-encoded again
        before the checksum is computed over it.
    :param expected_checksum: checksum returned by the KDC; its
        'cksumtype' must equal the armor key's checksum type.
    :param armor_key: key used to compute the checksum.
    """
    cksum_type = expected_checksum['cksumtype']
    self.assertEqual(armor_key.ctype, cksum_type)

    encoded_ticket = self.der_encode(ticket,
                                     asn1Spec=krb5_asn1.Ticket())
    computed = self.Checksum_create(armor_key,
                                    KU_FAST_FINISHED,
                                    encoded_ticket)
    self.assertEqual(expected_checksum, computed)
def verify_ticket(self, ticket, krbtgt_key, expect_pac=True,
                  expect_ticket_checksum=True):
    """Decrypt a ticket and verify the checksums contained in its PAC.

    ticket -- ticket credentials object; its .ticket and .decryption_key
        attributes are used.
    krbtgt_key -- key used to verify the KDC checksum and, if present,
        the ticket checksum.
    expect_pac -- if true, authorization data and a PAC must be present
        and the PAC checksums are verified. If false, nothing is
        verified beyond decrypting the ticket and walking its
        authorization data.
    expect_ticket_checksum -- for non-TGT tickets: a true value requires
        a ticket checksum, False requires its absence, and None accepts
        either. A ticket checksum that is present is always verified.
        TGTs must never carry a ticket checksum.
    """
    # Check if the ticket is a TGT.
    sname = ticket.ticket['sname']
    is_tgt = self.is_tgs(sname)

    # Decrypt the ticket.

    key = ticket.decryption_key
    enc_part = ticket.ticket['enc-part']

    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_part = self.der_decode(
        enc_part, asn1Spec=krb5_asn1.EncTicketPart())

    # Fetch the authorization data from the ticket.
    auth_data = enc_part.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    elif auth_data is None:
        return

    # Get a copy of the authdata with an empty PAC, and the existing PAC
    # (if present).
    empty_pac = self.get_empty_pac()
    auth_data, pac_data = self.replace_pac(auth_data,
                                           empty_pac,
                                           expect_pac=expect_pac)
    if not expect_pac:
        return

    # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
    # raw type to create a new PAC with zeroed signatures for
    # verification. This is because on Windows, the resource_groups field
    # is added to PAC_LOGON_INFO after the info3 field has been created,
    # which results in a different ordering of pointer values than Samba
    # (see commit 0e201ecdc53). Using the raw type avoids changing
    # PAC_LOGON_INFO, so verification against Windows can work. We still
    # need the PAC_DATA type to retrieve the actual checksums, because the
    # signatures in the raw type may contain padding bytes.
    pac = ndr_unpack(krb5pac.PAC_DATA,
                     pac_data)
    raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
                         pac_data)

    # Maps checksum buffer type -> (signature bytes, checksum type).
    checksums = {}

    for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
        buffer_type = pac_buffer.type
        if buffer_type in self.pac_checksum_types:
            self.assertNotIn(buffer_type, checksums,
                             f'Duplicate checksum type {buffer_type}')

            # Fetch the checksum and the checksum type from the PAC buffer.
            checksum = pac_buffer.info.signature
            ctype = pac_buffer.info.type
            # If bit 31 is set, sign-extend the value so that it becomes
            # a negative Python integer.
            if ctype & 1 << 31:
                ctype |= -1 << 31

            checksums[buffer_type] = checksum, ctype

            if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
                # Zero the checksum field so that we can later verify the
                # checksums. The ticket checksum field is not zeroed.

                signature = ndr_unpack(
                    krb5pac.PAC_SIGNATURE_DATA,
                    raw_pac_buffer.info.remaining)
                signature.signature = bytes(len(checksum))
                raw_pac_buffer.info.remaining = ndr_pack(
                    signature)

    # Re-encode the PAC.
    pac_data = ndr_pack(raw_pac)

    # Verify the signatures.

    # Report a missing checksum buffer as a test failure with a clear
    # message rather than as a bare KeyError.
    self.assertIn(krb5pac.PAC_TYPE_SRV_CHECKSUM, checksums,
                  'Expected server checksum in PAC')
    self.assertIn(krb5pac.PAC_TYPE_KDC_CHECKSUM, checksums,
                  'Expected KDC checksum in PAC')

    # The server checksum is made over the PAC with zeroed signatures.
    server_checksum, server_ctype = checksums[
        krb5pac.PAC_TYPE_SRV_CHECKSUM]
    key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
                        pac_data,
                        server_ctype,
                        server_checksum)

    # The KDC checksum is made over the server checksum.
    kdc_checksum, kdc_ctype = checksums[
        krb5pac.PAC_TYPE_KDC_CHECKSUM]
    krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                    server_checksum,
                                    kdc_ctype,
                                    kdc_checksum)

    if is_tgt:
        self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
    else:
        ticket_checksum, ticket_ctype = checksums.get(
            krb5pac.PAC_TYPE_TICKET_CHECKSUM,
            (None, None))
        if expect_ticket_checksum:
            self.assertIsNotNone(ticket_checksum)
        elif expect_ticket_checksum is False:
            self.assertIsNone(ticket_checksum)
        if ticket_checksum is not None:
            # The ticket checksum is made over the enc-part containing
            # the empty PAC in place of the real one.
            enc_part['authorization-data'] = auth_data
            enc_part = self.der_encode(enc_part,
                                       asn1Spec=krb5_asn1.EncTicketPart())

            krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                            enc_part,
                                            ticket_ctype,
                                            ticket_checksum)
def modified_ticket(self,
                    ticket, *,
                    new_ticket_key=None,
                    modify_fn=None,
                    modify_pac_fn=None,
                    exclude_pac=False,
                    update_pac_checksums=True,
                    checksum_keys=None,
                    include_checksums=None):
    """Return a re-encrypted copy of a ticket, optionally modified.

    ticket -- ticket credentials object to copy; it is not modified.
    new_ticket_key -- key used to re-encrypt the ticket; defaults to the
        ticket's own decryption key.
    modify_fn -- optional callable that receives the decoded
        EncTicketPart and returns the enc-part to use instead.
    modify_pac_fn -- optional callable that receives the unpacked PAC and
        returns the PAC to use instead.
    exclude_pac -- if true, the PAC is removed from the ticket;
        modify_pac_fn must be None and no checksums are updated.
    update_pac_checksums -- if true, the PAC checksums are regenerated.
    checksum_keys -- dict mapping PAC checksum types to signing keys. It
        is copied, so the caller's dict is never modified.
    include_checksums -- dict mapping PAC checksum types to True (force
        the checksum to be included), False (force it to be excluded) or
        None/absent (keep it if present in the original PAC).

    Returns a new KerberosTicketCreds for the modified ticket.
    """
    if checksum_keys is None:
        # A dict containing a key for each checksum type to be created in
        # the PAC.
        checksum_keys = {}
    else:
        # Fallback keys are inserted below. Work on a private copy so
        # that they do not leak into the caller's dict, where a reused
        # dict would carry this ticket's key over to later calls.
        checksum_keys = dict(checksum_keys)

    if include_checksums is None:
        # A dict containing a value for each checksum type; True if the
        # checksum type is to be included in the PAC, False if it is to be
        # excluded, or None/not present if the checksum is to be included
        # based on its presence in the original PAC.
        include_checksums = {}

    # Check that the values passed in by the caller make sense.

    self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
    self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)

    if exclude_pac:
        self.assertIsNone(modify_pac_fn)

        update_pac_checksums = False

    if not update_pac_checksums:
        self.assertFalse(checksum_keys)
        self.assertFalse(include_checksums)

    expect_pac = update_pac_checksums or modify_pac_fn is not None

    key = ticket.decryption_key

    if new_ticket_key is None:
        # Use the same key to re-encrypt the ticket.
        new_ticket_key = key

    if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
        # If the server signature key is not present, fall back to the key
        # used to encrypt the ticket.
        checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key

    if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
        # If the ticket signature key is not present, fall back to the key
        # used for the KDC signature.
        kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
        if kdc_checksum_key is not None:
            checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
                kdc_checksum_key)

    # Decrypt the ticket.

    enc_part = ticket.ticket['enc-part']

    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_part = self.der_decode(
        enc_part, asn1Spec=krb5_asn1.EncTicketPart())

    # Modify the ticket here.
    if modify_fn is not None:
        enc_part = modify_fn(enc_part)

    auth_data = enc_part.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    if auth_data is not None:
        # Stays None when the PAC is excluded or absent, which makes
        # replace_pac() below drop any existing PAC.
        new_pac = None
        if not exclude_pac:
            # Get a copy of the authdata with an empty PAC, and the
            # existing PAC (if present).
            empty_pac = self.get_empty_pac()
            empty_pac_auth_data, pac_data = self.replace_pac(
                auth_data,
                empty_pac,
                expect_pac=expect_pac)

            if pac_data is not None:
                pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)

                # Modify the PAC here.
                if modify_pac_fn is not None:
                    pac = modify_pac_fn(pac)

                if update_pac_checksums:
                    # Get the enc-part with an empty PAC, which is needed
                    # to create a ticket signature.
                    enc_part_to_sign = enc_part.copy()
                    enc_part_to_sign['authorization-data'] = (
                        empty_pac_auth_data)
                    enc_part_to_sign = self.der_encode(
                        enc_part_to_sign,
                        asn1Spec=krb5_asn1.EncTicketPart())

                    self.update_pac_checksums(pac,
                                              checksum_keys,
                                              include_checksums,
                                              enc_part_to_sign)

                # Re-encode the PAC.
                pac_data = ndr_pack(pac)
                new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
                                                        pac_data)

        # Replace the PAC in the authorization data and re-add it to the
        # ticket enc-part.
        auth_data, _ = self.replace_pac(auth_data, new_pac,
                                        expect_pac=expect_pac)
        enc_part['authorization-data'] = auth_data

    # Re-encrypt the ticket enc-part with the new key.
    enc_part_new = self.der_encode(enc_part,
                                   asn1Spec=krb5_asn1.EncTicketPart())
    enc_part_new = self.EncryptedData_create(new_ticket_key,
                                             KU_TICKET,
                                             enc_part_new)

    # Create a copy of the ticket with the new enc-part.
    new_ticket = ticket.ticket.copy()
    new_ticket['enc-part'] = enc_part_new

    new_ticket_creds = KerberosTicketCreds(
        new_ticket,
        session_key=ticket.session_key,
        crealm=ticket.crealm,
        cname=ticket.cname,
        srealm=ticket.srealm,
        sname=ticket.sname,
        decryption_key=new_ticket_key,
        ticket_private=enc_part,
        encpart_private=ticket.encpart_private)

    return new_ticket_creds
def update_pac_checksums(self,
                         pac,
                         checksum_keys,
                         include_checksums,
                         enc_part=None):
    """Regenerate the checksum buffers of a PAC in place.

    pac -- unpacked PAC; its buffers and num_buffers are updated.
    checksum_keys -- dict mapping PAC checksum types to signing keys. A
        key is required for every checksum buffer that ends up in the
        PAC.
    include_checksums -- dict mapping PAC checksum types to True (add the
        buffer if missing), False (remove the buffer if present) or
        None/absent (leave its presence unchanged).
    enc_part -- encoded ticket enc-part that the ticket checksum is made
        over; required if the PAC ends up with a ticket checksum buffer.
    """
    pac_buffers = pac.buffers
    checksum_buffers = {}

    # Find the relevant PAC checksum buffers.
    for pac_buffer in pac_buffers:
        buffer_type = pac_buffer.type
        if buffer_type in self.pac_checksum_types:
            self.assertNotIn(buffer_type, checksum_buffers,
                             f'Duplicate checksum type {buffer_type}')

            checksum_buffers[buffer_type] = pac_buffer

    # Create any additional buffers that were requested but not
    # present. Conversely, remove any buffers that were requested to be
    # removed.
    for buffer_type in self.pac_checksum_types:
        if buffer_type in checksum_buffers:
            if include_checksums.get(buffer_type) is False:
                checksum_buffer = checksum_buffers.pop(buffer_type)

                pac.num_buffers -= 1
                pac_buffers.remove(checksum_buffer)

        elif include_checksums.get(buffer_type) is True:
            info = krb5pac.PAC_SIGNATURE_DATA()

            checksum_buffer = krb5pac.PAC_BUFFER()
            checksum_buffer.type = buffer_type
            checksum_buffer.info = info

            pac_buffers.append(checksum_buffer)
            pac.num_buffers += 1

            checksum_buffers[buffer_type] = checksum_buffer

    # Fill the relevant checksum buffers.
    for buffer_type, checksum_buffer in checksum_buffers.items():
        # Fail with a message naming the checksum type instead of a bare
        # KeyError when the caller did not supply a key for it.
        self.assertIn(buffer_type, checksum_keys,
                      f'Missing key for checksum type {buffer_type}')
        checksum_key = checksum_keys[buffer_type]
        # Reduce the checksum type to its unsigned 32-bit representation.
        ctype = checksum_key.ctype & ((1 << 32) - 1)

        if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
            self.assertIsNotNone(enc_part)

            signature = checksum_key.make_rodc_checksum(
                KU_NON_KERB_CKSUM_SALT,
                enc_part)

        elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
            signature = checksum_key.make_zeroed_checksum()

        else:
            signature = checksum_key.make_rodc_zeroed_checksum()

        checksum_buffer.info.signature = signature
        checksum_buffer.info.type = ctype

    # Add the new checksum buffers to the PAC.
    pac.buffers = pac_buffers

    # Calculate the server and KDC checksums and insert them into the PAC.

    server_checksum_buffer = checksum_buffers.get(
        krb5pac.PAC_TYPE_SRV_CHECKSUM)
    if server_checksum_buffer is not None:
        server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]

        # The server checksum is made over the packed PAC, in which the
        # server and KDC signatures are still the zeroed placeholders.
        pac_data = ndr_pack(pac)
        server_checksum = server_checksum_key.make_checksum(
            KU_NON_KERB_CKSUM_SALT,
            pac_data)

        server_checksum_buffer.info.signature = server_checksum

    kdc_checksum_buffer = checksum_buffers.get(
        krb5pac.PAC_TYPE_KDC_CHECKSUM)
    if kdc_checksum_buffer is not None:
        if server_checksum_buffer is None:
            # There's no server signature to make the checksum over, so
            # just make the checksum over an empty bytes object.
            server_checksum = bytes()

        kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]

        kdc_checksum = kdc_checksum_key.make_rodc_checksum(
            KU_NON_KERB_CKSUM_SALT,
            server_checksum)

        kdc_checksum_buffer.info.signature = kdc_checksum
def replace_pac(self, auth_data, new_pac, expect_pac=True):
    """Return a copy of auth_data with its PAC replaced, and the old PAC.

    auth_data -- iterable of authorization data elements.
    new_pac -- AD_WIN2K_PAC authorization data element that takes the
        place of the existing PAC, or None to drop the existing PAC.
    expect_pac -- if true, assert that an AD-IF-RELEVANT element was
        found and that a PAC has been seen by the time each such element
        has been processed.

    Only PACs nested inside AD-IF-RELEVANT elements are considered; all
    other elements are copied unchanged. At most one PAC may be present.

    Returns a tuple of the new authorization data list and the 'ad-data'
    of the old PAC (None if there was no PAC).
    """
    if new_pac is not None:
        self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
        self.assertElementPresent(new_pac, 'ad-data')

    old_pac = None
    # Stays None unless at least one AD-IF-RELEVANT element is found.
    encoded_relevant = None
    result = []

    for elem in auth_data:
        if elem['ad-type'] != AD_IF_RELEVANT:
            # Not a container that can hold the PAC; copy it as it is.
            result.append(elem)
            continue

        decoded = self.der_decode(
            elem['ad-data'],
            asn1Spec=krb5_asn1.AD_IF_RELEVANT())

        kept = []
        for inner in decoded:
            if inner['ad-type'] != AD_WIN2K_PAC:
                kept.append(inner)
                continue

            self.assertIsNone(old_pac, 'Multiple PACs detected')
            old_pac = inner['ad-data']

            # With no replacement, the PAC is simply left out.
            if new_pac is not None:
                kept.append(new_pac)

        if expect_pac:
            self.assertIsNotNone(old_pac, 'Expected PAC')

        encoded_relevant = self.der_encode(
            kept,
            asn1Spec=krb5_asn1.AD_IF_RELEVANT())
        result.append(self.AuthorizationData_create(AD_IF_RELEVANT,
                                                    encoded_relevant))

    if expect_pac:
        self.assertIsNotNone(encoded_relevant, 'Expected AD-RELEVANT')

    return result, old_pac
def get_pac(self, auth_data, expect_pac=True):
    """Return the PAC data found in auth_data.

    This is the second element of the tuple returned by replace_pac()
    when called with no replacement PAC; the rewritten authorization
    data is discarded.
    """
    return self.replace_pac(auth_data, None, expect_pac)[1]
def get_krbtgt_checksum_key(self):
    """Return a dict holding the krbtgt key as the KDC checksum key.

    The key is the ticket decryption key derived from the krbtgt
    credentials, stored under krb5pac.PAC_TYPE_KDC_CHECKSUM.
    """
    creds = self.get_krbtgt_creds()
    kdc_key = self.TicketDecryptionKey_from_creds(creds)

    keys = {}
    keys[krb5pac.PAC_TYPE_KDC_CHECKSUM] = kdc_key
    return keys
def is_tgs(self, principal):
    """Return True if the principal's first name component is krbtgt.

    principal -- mapping with a 'name-string' sequence of name
        components, given as either str or bytes.

    A principal without any name components is not krbtgt, so False is
    returned for it rather than raising IndexError.
    """
    names = principal['name-string']
    if not names:
        return False

    return names[0] in ('krbtgt', b'krbtgt')
def get_empty_pac(self):
    """Return an AD_WIN2K_PAC element whose data is a single zero byte."""
    placeholder = b'\x00'
    return self.AuthorizationData_create(AD_WIN2K_PAC, placeholder)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return get_pa_dict() of the exchange's 'req_padata' entry."""
    outer_padata = kdc_exchange_dict['req_padata']
    return self.get_pa_dict(outer_padata)
def get_fast_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict built from the exchange's 'fast_padata'.

    If that dict is empty (or otherwise false), the outer padata dict is
    returned instead.
    """
    fast_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
    return fast_pa_dict or self.get_outer_pa_dict(kdc_exchange_dict)
def sent_fast(self, kdc_exchange_dict):
    """Return True if the outer padata dict contains PADATA_FX_FAST."""
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Return True if the FAST padata dict contains an encrypted challenge.

    The dict consulted is the one returned by get_fast_pa_dict().
    """
    sent_padata = self.get_fast_pa_dict(kdc_exchange_dict)
    return PADATA_ENCRYPTED_CHALLENGE in sent_padata
def get_sent_pac_options(self, kdc_exchange_dict):
    """Return the PAC options that were sent, with unsupported bits cleared.

    Returns '' if no PADATA_PAC_OPTIONS element is present in the dict
    returned by get_fast_pa_dict(). Otherwise the decoded 'options' value
    is returned with its first four positions kept and every later
    position replaced by '0'. (The value is presumably a string of '0'
    and '1' characters -- confirm against the native ASN.1 decoding.)
    """
    sent_padata = self.get_fast_pa_dict(kdc_exchange_dict)

    try:
        encoded_options = sent_padata[PADATA_PAC_OPTIONS]
    except KeyError:
        return ''

    decoded = self.der_decode(encoded_options,
                              asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    options = decoded['options']

    # Mask out unsupported bits.
    supported = options[:4]
    unsupported = options[4:]

    return supported + '0' * len(unsupported)
def get_krbtgt_sname(self):
    """Return the krbtgt service principal name.

    The principal has name type NT_SRV_INST and two components: the
    username and the realm of the krbtgt credentials.
    """
    creds = self.get_krbtgt_creds()
    components = [creds.get_username(), creds.get_realm()]

    return self.PrincipalName_create(name_type=NT_SRV_INST,
                                     names=components)
3555 def _test_as_exchange(self,
3556 cname,
3557 realm,
3558 sname,
3559 till,
3560 client_as_etypes,
3561 expected_error_mode,
3562 expected_crealm,
3563 expected_cname,
3564 expected_srealm,
3565 expected_sname,
3566 expected_salt,
3567 etypes,
3568 padata,
3569 kdc_options,
3570 expected_flags=None,
3571 unexpected_flags=None,
3572 expected_supported_etypes=None,
3573 preauth_key=None,
3574 ticket_decryption_key=None,
3575 pac_request=None,
3576 pac_options=None,
3577 to_rodc=False):
3579 def _generate_padata_copy(_kdc_exchange_dict,
3580 _callback_dict,
3581 req_body):
3582 return padata, req_body
3584 if not expected_error_mode:
3585 check_error_fn = None
3586 check_rep_fn = self.generic_check_kdc_rep
3587 else:
3588 check_error_fn = self.generic_check_kdc_error
3589 check_rep_fn = None
3591 if padata is not None:
3592 generate_padata_fn = _generate_padata_copy
3593 else:
3594 generate_padata_fn = None
3596 kdc_exchange_dict = self.as_exchange_dict(
3597 expected_crealm=expected_crealm,
3598 expected_cname=expected_cname,
3599 expected_srealm=expected_srealm,
3600 expected_sname=expected_sname,
3601 expected_supported_etypes=expected_supported_etypes,
3602 ticket_decryption_key=ticket_decryption_key,
3603 generate_padata_fn=generate_padata_fn,
3604 check_error_fn=check_error_fn,
3605 check_rep_fn=check_rep_fn,
3606 check_kdc_private_fn=self.generic_check_kdc_private,
3607 expected_error_mode=expected_error_mode,
3608 client_as_etypes=client_as_etypes,
3609 expected_salt=expected_salt,
3610 expected_flags=expected_flags,
3611 unexpected_flags=unexpected_flags,
3612 preauth_key=preauth_key,
3613 kdc_options=str(kdc_options),
3614 pac_request=pac_request,
3615 pac_options=pac_options,
3616 to_rodc=to_rodc)
3618 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3619 cname=cname,
3620 realm=realm,
3621 sname=sname,
3622 till_time=till,
3623 etypes=etypes)
3625 return rep, kdc_exchange_dict