tests/krb5: Allow specifying whether to expect a PAC with _test_as_exchange()
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob29dc5f397b6889bde5be3c1aaeacbafbdc7a605d
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import krb5pac, security
38 from samba.gensec import FEATURE_SEAL
39 from samba.ndr import ndr_pack, ndr_unpack
41 import samba.tests
42 from samba.tests import TestCaseInTempDir
44 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
45 from samba.tests.krb5.rfc4120_constants import (
46 AD_IF_RELEVANT,
47 AD_WIN2K_PAC,
48 FX_FAST_ARMOR_AP_REQUEST,
49 KDC_ERR_GENERIC,
50 KDC_ERR_PREAUTH_FAILED,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
52 KERB_ERR_TYPE_EXTENDED,
53 KRB_AP_REQ,
54 KRB_AS_REP,
55 KRB_AS_REQ,
56 KRB_ERROR,
57 KRB_TGS_REP,
58 KRB_TGS_REQ,
59 KU_AP_REQ_AUTH,
60 KU_AS_REP_ENC_PART,
61 KU_ENC_CHALLENGE_KDC,
62 KU_FAST_ENC,
63 KU_FAST_FINISHED,
64 KU_FAST_REP,
65 KU_FAST_REQ_CHKSUM,
66 KU_NON_KERB_CKSUM_SALT,
67 KU_TGS_REP_ENC_PART_SESSION,
68 KU_TGS_REP_ENC_PART_SUB_KEY,
69 KU_TGS_REQ_AUTH,
70 KU_TGS_REQ_AUTH_CKSUM,
71 KU_TGS_REQ_AUTH_DAT_SESSION,
72 KU_TGS_REQ_AUTH_DAT_SUBKEY,
73 KU_TICKET,
74 NT_SRV_INST,
75 NT_WELLKNOWN,
76 PADATA_ENCRYPTED_CHALLENGE,
77 PADATA_ENC_TIMESTAMP,
78 PADATA_ETYPE_INFO,
79 PADATA_ETYPE_INFO2,
80 PADATA_FOR_USER,
81 PADATA_FX_COOKIE,
82 PADATA_FX_ERROR,
83 PADATA_FX_FAST,
84 PADATA_KDC_REQ,
85 PADATA_PAC_OPTIONS,
86 PADATA_PAC_REQUEST,
87 PADATA_PK_AS_REQ,
88 PADATA_PK_AS_REP_19,
89 PADATA_SUPPORTED_ETYPES
91 import samba.tests.krb5.kcrypto as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self, value, asn1Spec, encodeFun, **options):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value = asn1Spec.clone(value)
104 valueLength = len(value)
105 if valueLength % 8:
106 alignedValue = value << (8 - valueLength % 8)
107 else:
108 alignedValue = value
110 substrate = alignedValue.asOctets()
111 length = len(substrate)
112 # We need at least 32-Bit / 4-Bytes
113 if length < 4:
114 padding = 4 - length
115 else:
116 padding = 0
117 ret = b'\x00' + substrate + (b'\x00' * padding)
118 return ret, False, True
121 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self, scope=0):
125 ret = "%s" % self.asBinary()
126 bits = []
127 highest_bit = 32
128 for byte in self.asNumbers():
129 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
130 mask = 1 << bit
131 if byte & mask:
132 val = 1
133 else:
134 val = 0
135 bits.append(val)
136 if len(bits) < highest_bit:
137 for bitPosition in range(len(bits), highest_bit):
138 bits.append(0)
139 indent = " " * scope
140 delim = ": (\n%s " % indent
141 for bitPosition in range(highest_bit):
142 if bitPosition in self.prettyPrintNamedValues:
143 name = self.prettyPrintNamedValues[bitPosition]
144 elif bits[bitPosition] != 0:
145 name = "unknown-bit-%u" % bitPosition
146 else:
147 continue
148 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
149 delim = ",\n%s " % indent
150 ret += "\n%s)" % indent
151 return ret
154 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
155 krb5_asn1.TicketFlagsValues.namedValues
156 krb5_asn1.TicketFlags.namedValues =\
157 krb5_asn1.TicketFlagsValues.namedValues
158 krb5_asn1.TicketFlags.prettyPrint =\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
161 krb5_asn1.KDCOptionsValues.namedValues
162 krb5_asn1.KDCOptions.namedValues =\
163 krb5_asn1.KDCOptionsValues.namedValues
164 krb5_asn1.KDCOptions.prettyPrint =\
165 BitString_NamedValues_prettyPrint
166 krb5_asn1.APOptions.prettyPrintNamedValues =\
167 krb5_asn1.APOptionsValues.namedValues
168 krb5_asn1.APOptions.namedValues =\
169 krb5_asn1.APOptionsValues.namedValues
170 krb5_asn1.APOptions.prettyPrint =\
171 BitString_NamedValues_prettyPrint
172 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
173 krb5_asn1.PACOptionFlagsValues.namedValues
174 krb5_asn1.PACOptionFlags.namedValues =\
175 krb5_asn1.PACOptionFlagsValues.namedValues
176 krb5_asn1.PACOptionFlags.prettyPrint =\
177 BitString_NamedValues_prettyPrint
180 def Integer_NamedValues_prettyPrint(self, scope=0):
181 intval = int(self)
182 if intval in self.prettyPrintNamedValues:
183 name = self.prettyPrintNamedValues[intval]
184 else:
185 name = "<__unknown__>"
186 ret = "%d (0x%x) %s" % (intval, intval, name)
187 return ret
190 krb5_asn1.NameType.prettyPrintNamedValues =\
191 krb5_asn1.NameTypeValues.namedValues
192 krb5_asn1.NameType.prettyPrint =\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
195 krb5_asn1.AuthDataTypeValues.namedValues
196 krb5_asn1.AuthDataType.prettyPrint =\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1.PADataType.prettyPrintNamedValues =\
199 krb5_asn1.PADataTypeValues.namedValues
200 krb5_asn1.PADataType.prettyPrint =\
201 Integer_NamedValues_prettyPrint
202 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
203 krb5_asn1.EncryptionTypeValues.namedValues
204 krb5_asn1.EncryptionType.prettyPrint =\
205 Integer_NamedValues_prettyPrint
206 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
207 krb5_asn1.ChecksumTypeValues.namedValues
208 krb5_asn1.ChecksumType.prettyPrint =\
209 Integer_NamedValues_prettyPrint
210 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
211 krb5_asn1.KerbErrorDataTypeValues.namedValues
212 krb5_asn1.KerbErrorDataType.prettyPrint =\
213 Integer_NamedValues_prettyPrint
216 class Krb5EncryptionKey:
217 def __init__(self, key, kvno):
218 EncTypeChecksum = {
219 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
220 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
221 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
223 self.key = key
224 self.etype = key.enctype
225 self.ctype = EncTypeChecksum[self.etype]
226 self.kvno = kvno
228 def encrypt(self, usage, plaintext):
229 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
230 return ciphertext
232 def decrypt(self, usage, ciphertext):
233 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
234 return plaintext
236 def make_zeroed_checksum(self, ctype=None):
237 if ctype is None:
238 ctype = self.ctype
240 checksum_len = kcrypto.checksum_len(ctype)
241 return bytes(checksum_len)
243 def make_checksum(self, usage, plaintext, ctype=None):
244 if ctype is None:
245 ctype = self.ctype
246 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
247 return cksum
249 def verify_checksum(self, usage, plaintext, ctype, cksum):
250 if self.ctype != ctype:
251 raise AssertionError(f'key checksum type ({self.ctype}) != '
252 f'checksum type ({ctype})')
254 kcrypto.verify_checksum(ctype,
255 self.key,
256 usage,
257 plaintext,
258 cksum)
260 def export_obj(self):
261 EncryptionKey_obj = {
262 'keytype': self.etype,
263 'keyvalue': self.key.contents,
265 return EncryptionKey_obj
268 class RodcPacEncryptionKey(Krb5EncryptionKey):
269 def __init__(self, key, kvno, rodc_id=None):
270 super().__init__(key, kvno)
272 if rodc_id is None:
273 kvno = self.kvno
274 if kvno is not None:
275 kvno >>= 16
276 kvno &= (1 << 16) - 1
278 rodc_id = kvno or None
280 if rodc_id is not None:
281 self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
282 else:
283 self.rodc_id = b''
285 def make_rodc_zeroed_checksum(self, ctype=None):
286 checksum = super().make_zeroed_checksum(ctype)
287 return checksum + bytes(len(self.rodc_id))
289 def make_rodc_checksum(self, usage, plaintext, ctype=None):
290 checksum = super().make_checksum(usage, plaintext, ctype)
291 return checksum + self.rodc_id
293 def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
294 if self.rodc_id:
295 cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
297 if self.rodc_id != cksum_rodc_id:
298 raise AssertionError(f'{self.rodc_id.hex()} != '
299 f'{cksum_rodc_id.hex()}')
301 super().verify_checksum(usage,
302 plaintext,
303 ctype,
304 cksum)
307 class ZeroedChecksumKey(RodcPacEncryptionKey):
308 def make_checksum(self, usage, plaintext, ctype=None):
309 return self.make_zeroed_checksum(ctype)
311 def make_rodc_checksum(self, usage, plaintext, ctype=None):
312 return self.make_rodc_zeroed_checksum(ctype)
315 class WrongLengthChecksumKey(RodcPacEncryptionKey):
316 def __init__(self, key, kvno, length):
317 super().__init__(key, kvno)
319 self._length = length
321 @classmethod
322 def _adjust_to_length(cls, checksum, length):
323 diff = length - len(checksum)
324 if diff > 0:
325 checksum += bytes(diff)
326 elif diff < 0:
327 checksum = checksum[:length]
329 return checksum
331 def make_zeroed_checksum(self, ctype=None):
332 return bytes(self._length)
334 def make_checksum(self, usage, plaintext, ctype=None):
335 checksum = super().make_checksum(usage, plaintext, ctype)
336 return self._adjust_to_length(checksum, self._length)
338 def make_rodc_zeroed_checksum(self, ctype=None):
339 return bytes(self._length)
341 def make_rodc_checksum(self, usage, plaintext, ctype=None):
342 checksum = super().make_rodc_checksum(usage, plaintext, ctype)
343 return self._adjust_to_length(checksum, self._length)
346 class KerberosCredentials(Credentials):
348 fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
349 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
350 security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
352 def __init__(self):
353 super(KerberosCredentials, self).__init__()
354 all_enc_types = 0
355 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
356 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
357 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
359 self.as_supported_enctypes = all_enc_types
360 self.tgs_supported_enctypes = all_enc_types
361 self.ap_supported_enctypes = all_enc_types
363 self.kvno = None
364 self.forced_keys = {}
366 self.forced_salt = None
368 self.dn = None
369 self.spn = None
371 def set_as_supported_enctypes(self, value):
372 self.as_supported_enctypes = int(value)
374 def set_tgs_supported_enctypes(self, value):
375 self.tgs_supported_enctypes = int(value)
377 def set_ap_supported_enctypes(self, value):
378 self.ap_supported_enctypes = int(value)
380 etype_map = collections.OrderedDict([
381 (kcrypto.Enctype.AES256,
382 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
383 (kcrypto.Enctype.AES128,
384 security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
385 (kcrypto.Enctype.RC4,
386 security.KERB_ENCTYPE_RC4_HMAC_MD5),
387 (kcrypto.Enctype.DES_MD5,
388 security.KERB_ENCTYPE_DES_CBC_MD5),
389 (kcrypto.Enctype.DES_CRC,
390 security.KERB_ENCTYPE_DES_CBC_CRC)
393 @classmethod
394 def etypes_to_bits(cls, etypes):
395 bits = 0
396 for etype in etypes:
397 bit = cls.etype_map[etype]
398 if bits & bit:
399 raise ValueError(f'Got duplicate etype: {etype}')
400 bits |= bit
402 return bits
404 @classmethod
405 def bits_to_etypes(cls, bits):
406 etypes = ()
407 for etype, bit in cls.etype_map.items():
408 if bit & bits:
409 bits &= ~bit
410 etypes += (etype,)
412 bits &= ~cls.fast_supported_bits
413 if bits != 0:
414 raise ValueError(f'Unsupported etype bits: {bits}')
416 return etypes
418 def get_as_krb5_etypes(self):
419 return self.bits_to_etypes(self.as_supported_enctypes)
421 def get_tgs_krb5_etypes(self):
422 return self.bits_to_etypes(self.tgs_supported_enctypes)
424 def get_ap_krb5_etypes(self):
425 return self.bits_to_etypes(self.ap_supported_enctypes)
427 def set_kvno(self, kvno):
428 # Sign-extend from 32 bits.
429 if kvno & 1 << 31:
430 kvno |= -1 << 31
431 self.kvno = kvno
433 def get_kvno(self):
434 return self.kvno
436 def set_forced_key(self, etype, hexkey):
437 etype = int(etype)
438 contents = binascii.a2b_hex(hexkey)
439 key = kcrypto.Key(etype, contents)
440 self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
442 def get_forced_key(self, etype):
443 etype = int(etype)
444 return self.forced_keys.get(etype)
446 def set_forced_salt(self, salt):
447 self.forced_salt = bytes(salt)
449 def get_forced_salt(self):
450 return self.forced_salt
452 def get_salt(self):
453 if self.forced_salt is not None:
454 return self.forced_salt
456 if self.get_workstation():
457 salt_string = '%shost%s.%s' % (
458 self.get_realm().upper(),
459 self.get_username().lower().rsplit('$', 1)[0],
460 self.get_realm().lower())
461 else:
462 salt_string = self.get_realm().upper() + self.get_username()
464 return salt_string.encode('utf-8')
466 def set_dn(self, dn):
467 self.dn = dn
469 def get_dn(self):
470 return self.dn
472 def set_spn(self, spn):
473 self.spn = spn
475 def get_spn(self):
476 return self.spn
479 class KerberosTicketCreds:
480 def __init__(self, ticket, session_key,
481 crealm=None, cname=None,
482 srealm=None, sname=None,
483 decryption_key=None,
484 ticket_private=None,
485 encpart_private=None):
486 self.ticket = ticket
487 self.session_key = session_key
488 self.crealm = crealm
489 self.cname = cname
490 self.srealm = srealm
491 self.sname = sname
492 self.decryption_key = decryption_key
493 self.ticket_private = ticket_private
494 self.encpart_private = encpart_private
497 class RawKerberosTest(TestCaseInTempDir):
498 """A raw Kerberos Test case."""
500 pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
501 krb5pac.PAC_TYPE_KDC_CHECKSUM,
502 krb5pac.PAC_TYPE_TICKET_CHECKSUM}
504 etypes_to_test = (
505 {"value": -1111, "name": "dummy", },
506 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
507 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
508 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
511 setup_etype_test_permutations_done = False
513 @classmethod
514 def setup_etype_test_permutations(cls):
515 if cls.setup_etype_test_permutations_done:
516 return
518 res = []
520 num_idxs = len(cls.etypes_to_test)
521 permutations = []
522 for num in range(1, num_idxs + 1):
523 chunk = list(itertools.permutations(range(num_idxs), num))
524 for e in chunk:
525 el = list(e)
526 permutations.append(el)
528 for p in permutations:
529 name = None
530 etypes = ()
531 for idx in p:
532 n = cls.etypes_to_test[idx]["name"]
533 if name is None:
534 name = n
535 else:
536 name += "_%s" % n
537 etypes += (cls.etypes_to_test[idx]["value"],)
539 r = {"name": name, "etypes": etypes, }
540 res.append(r)
542 cls.etype_test_permutations = res
543 cls.setup_etype_test_permutations_done = True
545 @classmethod
546 def etype_test_permutation_name_idx(cls):
547 cls.setup_etype_test_permutations()
548 res = []
549 idx = 0
550 for e in cls.etype_test_permutations:
551 r = (e['name'], idx)
552 idx += 1
553 res.append(r)
554 return res
556 def etype_test_permutation_by_idx(self, idx):
557 e = self.etype_test_permutations[idx]
558 return (e['name'], e['etypes'])
560 @classmethod
561 def setUpClass(cls):
562 super().setUpClass()
564 cls.host = samba.tests.env_get_var_value('SERVER')
565 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
567 # A dictionary containing credentials that have already been
568 # obtained.
569 cls.creds_dict = {}
571 kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT',
572 allow_missing=True)
573 if kdc_fast_support is None:
574 kdc_fast_support = '0'
575 cls.kdc_fast_support = bool(int(kdc_fast_support))
577 tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT',
578 allow_missing=True)
579 if tkt_sig_support is None:
580 tkt_sig_support = '0'
581 cls.tkt_sig_support = bool(int(tkt_sig_support))
583 def setUp(self):
584 super().setUp()
585 self.do_asn1_print = False
586 self.do_hexdump = False
588 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
589 allow_missing=True)
590 if strict_checking is None:
591 strict_checking = '1'
592 self.strict_checking = bool(int(strict_checking))
594 self.s = None
596 self.unspecified_kvno = object()
598 def tearDown(self):
599 self._disconnect("tearDown")
600 super().tearDown()
602 def _disconnect(self, reason):
603 if self.s is None:
604 return
605 self.s.close()
606 self.s = None
607 if self.do_hexdump:
608 sys.stderr.write("disconnect[%s]\n" % reason)
610 def _connect_tcp(self, host):
611 tcp_port = 88
612 try:
613 self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
614 socket.SOCK_STREAM, socket.SOL_TCP,
616 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
617 self.s.settimeout(10)
618 self.s.connect(self.a[0][4])
619 except socket.error:
620 self.s.close()
621 raise
622 except IOError:
623 self.s.close()
624 raise
626 def connect(self, host):
627 self.assertNotConnected()
628 self._connect_tcp(host)
629 if self.do_hexdump:
630 sys.stderr.write("connected[%s]\n" % host)
632 def env_get_var(self, varname, prefix,
633 fallback_default=True,
634 allow_missing=False):
635 val = None
636 if prefix is not None:
637 allow_missing_prefix = allow_missing or fallback_default
638 val = samba.tests.env_get_var_value(
639 '%s_%s' % (prefix, varname),
640 allow_missing=allow_missing_prefix)
641 else:
642 fallback_default = True
643 if val is None and fallback_default:
644 val = samba.tests.env_get_var_value(varname,
645 allow_missing=allow_missing)
646 return val
648 def _get_krb5_creds_from_env(self, prefix,
649 default_username=None,
650 allow_missing_password=False,
651 allow_missing_keys=True,
652 require_strongest_key=False):
653 c = KerberosCredentials()
654 c.guess()
656 domain = self.env_get_var('DOMAIN', prefix)
657 realm = self.env_get_var('REALM', prefix)
658 allow_missing_username = default_username is not None
659 username = self.env_get_var('USERNAME', prefix,
660 fallback_default=False,
661 allow_missing=allow_missing_username)
662 if username is None:
663 username = default_username
664 password = self.env_get_var('PASSWORD', prefix,
665 fallback_default=False,
666 allow_missing=allow_missing_password)
667 c.set_domain(domain)
668 c.set_realm(realm)
669 c.set_username(username)
670 if password is not None:
671 c.set_password(password)
672 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
673 prefix, allow_missing=True)
674 if as_supported_enctypes is not None:
675 c.set_as_supported_enctypes(as_supported_enctypes)
676 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
677 prefix, allow_missing=True)
678 if tgs_supported_enctypes is not None:
679 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
680 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
681 prefix, allow_missing=True)
682 if ap_supported_enctypes is not None:
683 c.set_ap_supported_enctypes(ap_supported_enctypes)
685 if require_strongest_key:
686 kvno_allow_missing = False
687 if password is None:
688 aes256_allow_missing = False
689 else:
690 aes256_allow_missing = True
691 else:
692 kvno_allow_missing = allow_missing_keys
693 aes256_allow_missing = allow_missing_keys
694 kvno = self.env_get_var('KVNO', prefix,
695 fallback_default=False,
696 allow_missing=kvno_allow_missing)
697 if kvno is not None:
698 c.set_kvno(kvno)
699 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
700 fallback_default=False,
701 allow_missing=aes256_allow_missing)
702 if aes256_key is not None:
703 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
704 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
705 fallback_default=False,
706 allow_missing=True)
707 if aes128_key is not None:
708 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
709 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
710 fallback_default=False, allow_missing=True)
711 if rc4_key is not None:
712 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
714 if not allow_missing_keys:
715 self.assertTrue(c.forced_keys,
716 'Please supply %s encryption keys '
717 'in environment' % prefix)
719 return c
721 def _get_krb5_creds(self,
722 prefix,
723 default_username=None,
724 allow_missing_password=False,
725 allow_missing_keys=True,
726 require_strongest_key=False,
727 fallback_creds_fn=None):
728 if prefix in self.creds_dict:
729 return self.creds_dict[prefix]
731 # We don't have the credentials already
732 creds = None
733 env_err = None
734 try:
735 # Try to obtain them from the environment
736 creds = self._get_krb5_creds_from_env(
737 prefix,
738 default_username=default_username,
739 allow_missing_password=allow_missing_password,
740 allow_missing_keys=allow_missing_keys,
741 require_strongest_key=require_strongest_key)
742 except Exception as err:
743 # An error occurred, so save it for later
744 env_err = err
745 else:
746 self.assertIsNotNone(creds)
747 # Save the obtained credentials
748 self.creds_dict[prefix] = creds
749 return creds
751 if fallback_creds_fn is not None:
752 try:
753 # Try to use the fallback method
754 creds = fallback_creds_fn()
755 except Exception as err:
756 print("ERROR FROM ENV: %r" % (env_err))
757 print("FALLBACK-FN: %s" % (fallback_creds_fn))
758 print("FALLBACK-ERROR: %r" % (err))
759 else:
760 self.assertIsNotNone(creds)
761 # Save the obtained credentials
762 self.creds_dict[prefix] = creds
763 return creds
765 # Both methods failed, so raise the exception from the
766 # environment method
767 raise env_err
769 def get_user_creds(self,
770 allow_missing_password=False,
771 allow_missing_keys=True):
772 c = self._get_krb5_creds(prefix=None,
773 allow_missing_password=allow_missing_password,
774 allow_missing_keys=allow_missing_keys)
775 return c
777 def get_service_creds(self,
778 allow_missing_password=False,
779 allow_missing_keys=True):
780 c = self._get_krb5_creds(prefix='SERVICE',
781 allow_missing_password=allow_missing_password,
782 allow_missing_keys=allow_missing_keys)
783 return c
785 def get_client_creds(self,
786 allow_missing_password=False,
787 allow_missing_keys=True):
788 c = self._get_krb5_creds(prefix='CLIENT',
789 allow_missing_password=allow_missing_password,
790 allow_missing_keys=allow_missing_keys)
791 return c
793 def get_server_creds(self,
794 allow_missing_password=False,
795 allow_missing_keys=True):
796 c = self._get_krb5_creds(prefix='SERVER',
797 allow_missing_password=allow_missing_password,
798 allow_missing_keys=allow_missing_keys)
799 return c
801 def get_admin_creds(self,
802 allow_missing_password=False,
803 allow_missing_keys=True):
804 c = self._get_krb5_creds(prefix='ADMIN',
805 allow_missing_password=allow_missing_password,
806 allow_missing_keys=allow_missing_keys)
807 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
808 return c
810 def get_rodc_krbtgt_creds(self,
811 require_keys=True,
812 require_strongest_key=False):
813 if require_strongest_key:
814 self.assertTrue(require_keys)
815 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
816 allow_missing_password=True,
817 allow_missing_keys=not require_keys,
818 require_strongest_key=require_strongest_key)
819 return c
821 def get_krbtgt_creds(self,
822 require_keys=True,
823 require_strongest_key=False):
824 if require_strongest_key:
825 self.assertTrue(require_keys)
826 c = self._get_krb5_creds(prefix='KRBTGT',
827 default_username='krbtgt',
828 allow_missing_password=True,
829 allow_missing_keys=not require_keys,
830 require_strongest_key=require_strongest_key)
831 return c
833 def get_anon_creds(self):
834 c = Credentials()
835 c.set_anonymous()
836 return c
838 def asn1_dump(self, name, obj, asn1_print=None):
839 if asn1_print is None:
840 asn1_print = self.do_asn1_print
841 if asn1_print:
842 if name is not None:
843 sys.stderr.write("%s:\n%s" % (name, obj))
844 else:
845 sys.stderr.write("%s" % (obj))
847 def hex_dump(self, name, blob, hexdump=None):
848 if hexdump is None:
849 hexdump = self.do_hexdump
850 if hexdump:
851 sys.stderr.write(
852 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
854 def der_decode(
855 self,
856 blob,
857 asn1Spec=None,
858 native_encode=True,
859 asn1_print=None,
860 hexdump=None):
861 if asn1Spec is not None:
862 class_name = type(asn1Spec).__name__.split(':')[0]
863 else:
864 class_name = "<None-asn1Spec>"
865 self.hex_dump(class_name, blob, hexdump=hexdump)
866 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
867 self.asn1_dump(None, obj, asn1_print=asn1_print)
868 if native_encode:
869 obj = pyasn1_native_encode(obj)
870 return obj
872 def der_encode(
873 self,
874 obj,
875 asn1Spec=None,
876 native_decode=True,
877 asn1_print=None,
878 hexdump=None):
879 if native_decode:
880 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
881 class_name = type(obj).__name__.split(':')[0]
882 if class_name is not None:
883 self.asn1_dump(None, obj, asn1_print=asn1_print)
884 blob = pyasn1_der_encode(obj)
885 if class_name is not None:
886 self.hex_dump(class_name, blob, hexdump=hexdump)
887 return blob
889 def send_pdu(self, req, asn1_print=None, hexdump=None):
890 try:
891 k5_pdu = self.der_encode(
892 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
893 header = struct.pack('>I', len(k5_pdu))
894 req_pdu = header
895 req_pdu += k5_pdu
896 self.hex_dump("send_pdu", header, hexdump=hexdump)
897 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
898 while True:
899 sent = self.s.send(req_pdu, 0)
900 if sent == len(req_pdu):
901 break
902 req_pdu = req_pdu[sent:]
903 except socket.error as e:
904 self._disconnect("send_pdu: %s" % e)
905 raise
906 except IOError as e:
907 self._disconnect("send_pdu: %s" % e)
908 raise
910 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
911 rep_pdu = None
912 try:
913 if timeout is not None:
914 self.s.settimeout(timeout)
915 rep_pdu = self.s.recv(num_recv, 0)
916 self.s.settimeout(10)
917 if len(rep_pdu) == 0:
918 self._disconnect("recv_raw: EOF")
919 return None
920 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
921 except socket.timeout:
922 self.s.settimeout(10)
923 sys.stderr.write("recv_raw: TIMEOUT\n")
924 except socket.error as e:
925 self._disconnect("recv_raw: %s" % e)
926 raise
927 except IOError as e:
928 self._disconnect("recv_raw: %s" % e)
929 raise
930 return rep_pdu
932 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
933 rep_pdu = None
934 rep = None
935 raw_pdu = self.recv_raw(
936 num_recv=4, hexdump=hexdump, timeout=timeout)
937 if raw_pdu is None:
938 return (None, None)
939 header = struct.unpack(">I", raw_pdu[0:4])
940 k5_len = header[0]
941 if k5_len == 0:
942 return (None, "")
943 missing = k5_len
944 rep_pdu = b''
945 while missing > 0:
946 raw_pdu = self.recv_raw(
947 num_recv=missing, hexdump=hexdump, timeout=timeout)
948 self.assertGreaterEqual(len(raw_pdu), 1)
949 rep_pdu += raw_pdu
950 missing = k5_len - len(rep_pdu)
951 k5_raw = self.der_decode(
952 rep_pdu,
953 asn1Spec=None,
954 native_encode=False,
955 asn1_print=False,
956 hexdump=False)
957 pvno = k5_raw['field-0']
958 self.assertEqual(pvno, 5)
959 msg_type = k5_raw['field-1']
960 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
961 if msg_type == KRB_AS_REP:
962 asn1Spec = krb5_asn1.AS_REP()
963 elif msg_type == KRB_TGS_REP:
964 asn1Spec = krb5_asn1.TGS_REP()
965 elif msg_type == KRB_ERROR:
966 asn1Spec = krb5_asn1.KRB_ERROR()
967 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
968 asn1_print=asn1_print, hexdump=False)
969 return (rep, rep_pdu)
971 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
972 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
973 hexdump=hexdump,
974 timeout=timeout)
975 return rep
977 def assertIsConnected(self):
978 self.assertIsNotNone(self.s, msg="Not connected")
980 def assertNotConnected(self):
981 self.assertIsNone(self.s, msg="Is connected")
983 def send_recv_transaction(
984 self,
985 req,
986 asn1_print=None,
987 hexdump=None,
988 timeout=None,
989 to_rodc=False):
990 host = self.host if to_rodc else self.dc_host
991 self.connect(host)
992 try:
993 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
994 rep = self.recv_pdu(
995 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
996 except Exception:
997 self._disconnect("transaction failed")
998 raise
999 self._disconnect("transaction done")
1000 return rep
1002 def assertNoValue(self, value):
1003 self.assertTrue(value.isNoValue)
1005 def assertHasValue(self, value):
1006 self.assertIsNotNone(value)
1008 def getElementValue(self, obj, elem):
1009 return obj.get(elem)
1011 def assertElementMissing(self, obj, elem):
1012 v = self.getElementValue(obj, elem)
1013 self.assertIsNone(v)
1015 def assertElementPresent(self, obj, elem, expect_empty=False):
1016 v = self.getElementValue(obj, elem)
1017 self.assertIsNotNone(v)
1018 if self.strict_checking:
1019 if isinstance(v, collections.abc.Container):
1020 if expect_empty:
1021 self.assertEqual(0, len(v))
1022 else:
1023 self.assertNotEqual(0, len(v))
1025 def assertElementEqual(self, obj, elem, value):
1026 v = self.getElementValue(obj, elem)
1027 self.assertIsNotNone(v)
1028 self.assertEqual(v, value)
1030 def assertElementEqualUTF8(self, obj, elem, value):
1031 v = self.getElementValue(obj, elem)
1032 self.assertIsNotNone(v)
1033 self.assertEqual(v, bytes(value, 'utf8'))
1035 def assertPrincipalEqual(self, princ1, princ2):
1036 self.assertEqual(princ1['name-type'], princ2['name-type'])
1037 self.assertEqual(
1038 len(princ1['name-string']),
1039 len(princ2['name-string']),
1040 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1041 for idx in range(len(princ1['name-string'])):
1042 self.assertEqual(
1043 princ1['name-string'][idx],
1044 princ2['name-string'][idx],
1045 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1047 def assertElementEqualPrincipal(self, obj, elem, value):
1048 v = self.getElementValue(obj, elem)
1049 self.assertIsNotNone(v)
1050 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
1051 self.assertPrincipalEqual(v, value)
1053 def assertElementKVNO(self, obj, elem, value):
1054 v = self.getElementValue(obj, elem)
1055 if value == "autodetect":
1056 value = v
1057 if value is not None:
1058 self.assertIsNotNone(v)
1059 # The value on the wire should never be 0
1060 self.assertNotEqual(v, 0)
1061 # unspecified_kvno means we don't know the kvno,
1062 # but want to enforce its presence
1063 if value is not self.unspecified_kvno:
1064 value = int(value)
1065 self.assertNotEqual(value, 0)
1066 self.assertEqual(v, value)
1067 else:
1068 self.assertIsNone(v)
1070 def assertElementFlags(self, obj, elem, expected, unexpected):
1071 v = self.getElementValue(obj, elem)
1072 self.assertIsNotNone(v)
1073 if expected is not None:
1074 self.assertIsInstance(expected, krb5_asn1.TicketFlags)
1075 for i, flag in enumerate(expected):
1076 if flag == 1:
1077 self.assertEqual('1', v[i],
1078 f"'{expected.namedValues[i]}' "
1079 f"expected in {v}")
1080 if unexpected is not None:
1081 self.assertIsInstance(unexpected, krb5_asn1.TicketFlags)
1082 for i, flag in enumerate(unexpected):
1083 if flag == 1:
1084 self.assertEqual('0', v[i],
1085 f"'{unexpected.namedValues[i]}' "
1086 f"unexpected in {v}")
1088 def assertSequenceElementsEqual(self, expected, got, *,
1089 require_strict=None):
1090 if self.strict_checking:
1091 self.assertEqual(expected, got)
1092 else:
1093 fail_msg = f'expected: {expected} got: {got}'
1095 if require_strict is not None:
1096 fail_msg += f' (ignoring: {require_strict})'
1097 expected = (x for x in expected if x not in require_strict)
1098 got = (x for x in got if x not in require_strict)
1100 self.assertCountEqual(expected, got, fail_msg)
1102 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1103 if epoch is None:
1104 epoch = time.time()
1105 if offset is not None:
1106 epoch = epoch + int(offset)
1107 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1108 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1110 def get_KerberosTime(self, epoch=None, offset=None):
1111 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1112 return s
1114 def get_EpochFromKerberosTime(self, kerberos_time):
1115 if isinstance(kerberos_time, bytes):
1116 kerberos_time = kerberos_time.decode()
1118 epoch = datetime.datetime.strptime(kerberos_time,
1119 '%Y%m%d%H%M%SZ')
1120 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1121 epoch = int(epoch.timestamp())
1123 return epoch
1125 def get_Nonce(self):
1126 nonce_min = 0x7f000000
1127 nonce_max = 0x7fffffff
1128 v = random.randint(nonce_min, nonce_max)
1129 return v
1131 def get_pa_dict(self, pa_data):
1132 pa_dict = {}
1134 if pa_data is not None:
1135 for pa in pa_data:
1136 pa_type = pa['padata-type']
1137 if pa_type in pa_dict:
1138 raise RuntimeError(f'Duplicate type {pa_type}')
1139 pa_dict[pa_type] = pa['padata-value']
1141 return pa_dict
1143 def SessionKey_create(self, etype, contents, kvno=None):
1144 key = kcrypto.Key(etype, contents)
1145 return RodcPacEncryptionKey(key, kvno)
1147 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
1148 self.assertIsNotNone(pwd)
1149 self.assertIsNotNone(salt)
1150 key = kcrypto.string_to_key(etype, pwd, salt)
1151 return RodcPacEncryptionKey(key, kvno)
1153 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
1154 e = etype_info2['etype']
1156 salt = etype_info2.get('salt')
1158 if e == kcrypto.Enctype.RC4:
1159 nthash = creds.get_nt_hash()
1160 return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1162 password = creds.get_password()
1163 return self.PasswordKey_create(
1164 etype=e, pwd=password, salt=salt, kvno=kvno)
1166 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1168 if etype is None:
1169 etypes = creds.get_tgs_krb5_etypes()
1170 if etypes:
1171 etype = etypes[0]
1172 else:
1173 etype = kcrypto.Enctype.RC4
1175 forced_key = creds.get_forced_key(etype)
1176 if forced_key is not None:
1177 return forced_key
1179 kvno = creds.get_kvno()
1181 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1182 "nor a password specified, " % (
1183 creds.get_username(), etype, kvno))
1185 if etype == kcrypto.Enctype.RC4:
1186 nthash = creds.get_nt_hash()
1187 self.assertIsNotNone(nthash, msg=fail_msg)
1188 return self.SessionKey_create(etype=etype,
1189 contents=nthash,
1190 kvno=kvno)
1192 password = creds.get_password()
1193 self.assertIsNotNone(password, msg=fail_msg)
1194 salt = creds.get_salt()
1195 return self.PasswordKey_create(etype=etype,
1196 pwd=password,
1197 salt=salt,
1198 kvno=kvno)
1200 def RandomKey(self, etype):
1201 e = kcrypto._get_enctype_profile(etype)
1202 contents = samba.generate_random_bytes(e.keysize)
1203 return self.SessionKey_create(etype=etype, contents=contents)
1205 def EncryptionKey_import(self, EncryptionKey_obj):
1206 return self.SessionKey_create(EncryptionKey_obj['keytype'],
1207 EncryptionKey_obj['keyvalue'])
1209 def EncryptedData_create(self, key, usage, plaintext):
1210 # EncryptedData ::= SEQUENCE {
1211 # etype [0] Int32 -- EncryptionType --,
1212 # kvno [1] Int32 OPTIONAL,
1213 # cipher [2] OCTET STRING -- ciphertext
1215 ciphertext = key.encrypt(usage, plaintext)
1216 EncryptedData_obj = {
1217 'etype': key.etype,
1218 'cipher': ciphertext
1220 if key.kvno is not None:
1221 EncryptedData_obj['kvno'] = key.kvno
1222 return EncryptedData_obj
1224 def Checksum_create(self, key, usage, plaintext, ctype=None):
1225 # Checksum ::= SEQUENCE {
1226 # cksumtype [0] Int32,
1227 # checksum [1] OCTET STRING
1229 if ctype is None:
1230 ctype = key.ctype
1231 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1232 Checksum_obj = {
1233 'cksumtype': ctype,
1234 'checksum': checksum,
1236 return Checksum_obj
1238 @classmethod
1239 def PrincipalName_create(cls, name_type, names):
1240 # PrincipalName ::= SEQUENCE {
1241 # name-type [0] Int32,
1242 # name-string [1] SEQUENCE OF KerberosString
1244 PrincipalName_obj = {
1245 'name-type': name_type,
1246 'name-string': names,
1248 return PrincipalName_obj
1250 def AuthorizationData_create(self, ad_type, ad_data):
1251 # AuthorizationData ::= SEQUENCE {
1252 # ad-type [0] Int32,
1253 # ad-data [1] OCTET STRING
1255 AUTH_DATA_obj = {
1256 'ad-type': ad_type,
1257 'ad-data': ad_data
1259 return AUTH_DATA_obj
1261 def PA_DATA_create(self, padata_type, padata_value):
1262 # PA-DATA ::= SEQUENCE {
1263 # -- NOTE: first tag is [1], not [0]
1264 # padata-type [1] Int32,
1265 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1267 PA_DATA_obj = {
1268 'padata-type': padata_type,
1269 'padata-value': padata_value,
1271 return PA_DATA_obj
1273 def PA_ENC_TS_ENC_create(self, ts, usec):
1274 # PA-ENC-TS-ENC ::= SEQUENCE {
1275 # patimestamp[0] KerberosTime, -- client's time
1276 # pausec[1] krb5int32 OPTIONAL
1278 PA_ENC_TS_ENC_obj = {
1279 'patimestamp': ts,
1280 'pausec': usec,
1282 return PA_ENC_TS_ENC_obj
1284 def PA_PAC_OPTIONS_create(self, options):
1285 # PA-PAC-OPTIONS ::= SEQUENCE {
1286 # options [0] PACOptionFlags
1288 PA_PAC_OPTIONS_obj = {
1289 'options': options
1291 return PA_PAC_OPTIONS_obj
1293 def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1294 # KrbFastArmor ::= SEQUENCE {
1295 # armor-type [0] Int32,
1296 # armor-value [1] OCTET STRING,
1297 # ...
1299 KRB_FAST_ARMOR_obj = {
1300 'armor-type': armor_type,
1301 'armor-value': armor_value
1303 return KRB_FAST_ARMOR_obj
1305 def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1306 # KrbFastReq ::= SEQUENCE {
1307 # fast-options [0] FastOptions,
1308 # padata [1] SEQUENCE OF PA-DATA,
1309 # req-body [2] KDC-REQ-BODY,
1310 # ...
1312 KRB_FAST_REQ_obj = {
1313 'fast-options': fast_options,
1314 'padata': padata,
1315 'req-body': req_body
1317 return KRB_FAST_REQ_obj
1319 def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1320 # KrbFastArmoredReq ::= SEQUENCE {
1321 # armor [0] KrbFastArmor OPTIONAL,
1322 # req-checksum [1] Checksum,
1323 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1325 KRB_FAST_ARMORED_REQ_obj = {
1326 'req-checksum': req_checksum,
1327 'enc-fast-req': enc_fast_req
1329 if armor is not None:
1330 KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1331 return KRB_FAST_ARMORED_REQ_obj
1333 def PA_FX_FAST_REQUEST_create(self, armored_data):
1334 # PA-FX-FAST-REQUEST ::= CHOICE {
1335 # armored-data [0] KrbFastArmoredReq,
1336 # ...
1338 PA_FX_FAST_REQUEST_obj = {
1339 'armored-data': armored_data
1341 return PA_FX_FAST_REQUEST_obj
1343 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1344 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1345 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1346 # -- include PAC.
1347 # --If FALSE, and PAC present,
1348 # -- remove PAC.
1350 KERB_PA_PAC_REQUEST_obj = {
1351 'include-pac': include_pac,
1353 if not pa_data_create:
1354 return KERB_PA_PAC_REQUEST_obj
1355 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1356 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1357 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1358 return pa_data
1360 def get_pa_pac_options(self, options):
1361 pac_options = self.PA_PAC_OPTIONS_create(options)
1362 pac_options = self.der_encode(pac_options,
1363 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1364 pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1366 return pac_options
1368 def KDC_REQ_BODY_create(self,
1369 kdc_options,
1370 cname,
1371 realm,
1372 sname,
1373 from_time,
1374 till_time,
1375 renew_time,
1376 nonce,
1377 etypes,
1378 addresses,
1379 additional_tickets,
1380 EncAuthorizationData,
1381 EncAuthorizationData_key,
1382 EncAuthorizationData_usage,
1383 asn1_print=None,
1384 hexdump=None):
1385 # KDC-REQ-BODY ::= SEQUENCE {
1386 # kdc-options [0] KDCOptions,
1387 # cname [1] PrincipalName OPTIONAL
1388 # -- Used only in AS-REQ --,
1389 # realm [2] Realm
1390 # -- Server's realm
1391 # -- Also client's in AS-REQ --,
1392 # sname [3] PrincipalName OPTIONAL,
1393 # from [4] KerberosTime OPTIONAL,
1394 # till [5] KerberosTime,
1395 # rtime [6] KerberosTime OPTIONAL,
1396 # nonce [7] UInt32,
1397 # etype [8] SEQUENCE OF Int32
1398 # -- EncryptionType
1399 # -- in preference order --,
1400 # addresses [9] HostAddresses OPTIONAL,
1401 # enc-authorization-data [10] EncryptedData OPTIONAL
1402 # -- AuthorizationData --,
1403 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1404 # -- NOTE: not empty
1406 if EncAuthorizationData is not None:
1407 enc_ad_plain = self.der_encode(
1408 EncAuthorizationData,
1409 asn1Spec=krb5_asn1.AuthorizationData(),
1410 asn1_print=asn1_print,
1411 hexdump=hexdump)
1412 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1413 EncAuthorizationData_usage,
1414 enc_ad_plain)
1415 else:
1416 enc_ad = None
1417 KDC_REQ_BODY_obj = {
1418 'kdc-options': kdc_options,
1419 'realm': realm,
1420 'till': till_time,
1421 'nonce': nonce,
1422 'etype': etypes,
1424 if cname is not None:
1425 KDC_REQ_BODY_obj['cname'] = cname
1426 if sname is not None:
1427 KDC_REQ_BODY_obj['sname'] = sname
1428 if from_time is not None:
1429 KDC_REQ_BODY_obj['from'] = from_time
1430 if renew_time is not None:
1431 KDC_REQ_BODY_obj['rtime'] = renew_time
1432 if addresses is not None:
1433 KDC_REQ_BODY_obj['addresses'] = addresses
1434 if enc_ad is not None:
1435 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1436 if additional_tickets is not None:
1437 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1438 return KDC_REQ_BODY_obj
1440 def KDC_REQ_create(self,
1441 msg_type,
1442 padata,
1443 req_body,
1444 asn1Spec=None,
1445 asn1_print=None,
1446 hexdump=None):
1447 # KDC-REQ ::= SEQUENCE {
1448 # -- NOTE: first tag is [1], not [0]
1449 # pvno [1] INTEGER (5) ,
1450 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1451 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1452 # -- NOTE: not empty --,
1453 # req-body [4] KDC-REQ-BODY
1456 KDC_REQ_obj = {
1457 'pvno': 5,
1458 'msg-type': msg_type,
1459 'req-body': req_body,
1461 if padata is not None:
1462 KDC_REQ_obj['padata'] = padata
1463 if asn1Spec is not None:
1464 KDC_REQ_decoded = pyasn1_native_decode(
1465 KDC_REQ_obj, asn1Spec=asn1Spec)
1466 else:
1467 KDC_REQ_decoded = None
1468 return KDC_REQ_obj, KDC_REQ_decoded
1470 def AS_REQ_create(self,
1471 padata, # optional
1472 kdc_options, # required
1473 cname, # optional
1474 realm, # required
1475 sname, # optional
1476 from_time, # optional
1477 till_time, # required
1478 renew_time, # optional
1479 nonce, # required
1480 etypes, # required
1481 addresses, # optional
1482 additional_tickets,
1483 native_decoded_only=True,
1484 asn1_print=None,
1485 hexdump=None):
1486 # KDC-REQ ::= SEQUENCE {
1487 # -- NOTE: first tag is [1], not [0]
1488 # pvno [1] INTEGER (5) ,
1489 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1490 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1491 # -- NOTE: not empty --,
1492 # req-body [4] KDC-REQ-BODY
1495 # KDC-REQ-BODY ::= SEQUENCE {
1496 # kdc-options [0] KDCOptions,
1497 # cname [1] PrincipalName OPTIONAL
1498 # -- Used only in AS-REQ --,
1499 # realm [2] Realm
1500 # -- Server's realm
1501 # -- Also client's in AS-REQ --,
1502 # sname [3] PrincipalName OPTIONAL,
1503 # from [4] KerberosTime OPTIONAL,
1504 # till [5] KerberosTime,
1505 # rtime [6] KerberosTime OPTIONAL,
1506 # nonce [7] UInt32,
1507 # etype [8] SEQUENCE OF Int32
1508 # -- EncryptionType
1509 # -- in preference order --,
1510 # addresses [9] HostAddresses OPTIONAL,
1511 # enc-authorization-data [10] EncryptedData OPTIONAL
1512 # -- AuthorizationData --,
1513 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1514 # -- NOTE: not empty
1516 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1517 kdc_options,
1518 cname,
1519 realm,
1520 sname,
1521 from_time,
1522 till_time,
1523 renew_time,
1524 nonce,
1525 etypes,
1526 addresses,
1527 additional_tickets,
1528 EncAuthorizationData=None,
1529 EncAuthorizationData_key=None,
1530 EncAuthorizationData_usage=None,
1531 asn1_print=asn1_print,
1532 hexdump=hexdump)
1533 obj, decoded = self.KDC_REQ_create(
1534 msg_type=KRB_AS_REQ,
1535 padata=padata,
1536 req_body=KDC_REQ_BODY_obj,
1537 asn1Spec=krb5_asn1.AS_REQ(),
1538 asn1_print=asn1_print,
1539 hexdump=hexdump)
1540 if native_decoded_only:
1541 return decoded
1542 return decoded, obj
1544 def AP_REQ_create(self, ap_options, ticket, authenticator):
1545 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1546 # pvno [0] INTEGER (5),
1547 # msg-type [1] INTEGER (14),
1548 # ap-options [2] APOptions,
1549 # ticket [3] Ticket,
1550 # authenticator [4] EncryptedData -- Authenticator
1552 AP_REQ_obj = {
1553 'pvno': 5,
1554 'msg-type': KRB_AP_REQ,
1555 'ap-options': ap_options,
1556 'ticket': ticket,
1557 'authenticator': authenticator,
1559 return AP_REQ_obj
1561 def Authenticator_create(
1562 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1563 authorization_data):
1564 # -- Unencrypted authenticator
1565 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1566 # authenticator-vno [0] INTEGER (5),
1567 # crealm [1] Realm,
1568 # cname [2] PrincipalName,
1569 # cksum [3] Checksum OPTIONAL,
1570 # cusec [4] Microseconds,
1571 # ctime [5] KerberosTime,
1572 # subkey [6] EncryptionKey OPTIONAL,
1573 # seq-number [7] UInt32 OPTIONAL,
1574 # authorization-data [8] AuthorizationData OPTIONAL
1576 Authenticator_obj = {
1577 'authenticator-vno': 5,
1578 'crealm': crealm,
1579 'cname': cname,
1580 'cusec': cusec,
1581 'ctime': ctime,
1583 if cksum is not None:
1584 Authenticator_obj['cksum'] = cksum
1585 if subkey is not None:
1586 Authenticator_obj['subkey'] = subkey
1587 if seq_number is not None:
1588 Authenticator_obj['seq-number'] = seq_number
1589 if authorization_data is not None:
1590 Authenticator_obj['authorization-data'] = authorization_data
1591 return Authenticator_obj
1593 def TGS_REQ_create(self,
1594 padata, # optional
1595 cusec,
1596 ctime,
1597 ticket,
1598 kdc_options, # required
1599 cname, # optional
1600 realm, # required
1601 sname, # optional
1602 from_time, # optional
1603 till_time, # required
1604 renew_time, # optional
1605 nonce, # required
1606 etypes, # required
1607 addresses, # optional
1608 EncAuthorizationData,
1609 EncAuthorizationData_key,
1610 additional_tickets,
1611 ticket_session_key,
1612 authenticator_subkey=None,
1613 body_checksum_type=None,
1614 native_decoded_only=True,
1615 asn1_print=None,
1616 hexdump=None):
1617 # KDC-REQ ::= SEQUENCE {
1618 # -- NOTE: first tag is [1], not [0]
1619 # pvno [1] INTEGER (5) ,
1620 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1621 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1622 # -- NOTE: not empty --,
1623 # req-body [4] KDC-REQ-BODY
1626 # KDC-REQ-BODY ::= SEQUENCE {
1627 # kdc-options [0] KDCOptions,
1628 # cname [1] PrincipalName OPTIONAL
1629 # -- Used only in AS-REQ --,
1630 # realm [2] Realm
1631 # -- Server's realm
1632 # -- Also client's in AS-REQ --,
1633 # sname [3] PrincipalName OPTIONAL,
1634 # from [4] KerberosTime OPTIONAL,
1635 # till [5] KerberosTime,
1636 # rtime [6] KerberosTime OPTIONAL,
1637 # nonce [7] UInt32,
1638 # etype [8] SEQUENCE OF Int32
1639 # -- EncryptionType
1640 # -- in preference order --,
1641 # addresses [9] HostAddresses OPTIONAL,
1642 # enc-authorization-data [10] EncryptedData OPTIONAL
1643 # -- AuthorizationData --,
1644 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1645 # -- NOTE: not empty
1648 if authenticator_subkey is not None:
1649 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1650 else:
1651 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1653 req_body = self.KDC_REQ_BODY_create(
1654 kdc_options=kdc_options,
1655 cname=None,
1656 realm=realm,
1657 sname=sname,
1658 from_time=from_time,
1659 till_time=till_time,
1660 renew_time=renew_time,
1661 nonce=nonce,
1662 etypes=etypes,
1663 addresses=addresses,
1664 additional_tickets=additional_tickets,
1665 EncAuthorizationData=EncAuthorizationData,
1666 EncAuthorizationData_key=EncAuthorizationData_key,
1667 EncAuthorizationData_usage=EncAuthorizationData_usage)
1668 req_body_blob = self.der_encode(req_body,
1669 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1670 asn1_print=asn1_print, hexdump=hexdump)
1672 req_body_checksum = self.Checksum_create(ticket_session_key,
1673 KU_TGS_REQ_AUTH_CKSUM,
1674 req_body_blob,
1675 ctype=body_checksum_type)
1677 subkey_obj = None
1678 if authenticator_subkey is not None:
1679 subkey_obj = authenticator_subkey.export_obj()
1680 seq_number = random.randint(0, 0xfffffffe)
1681 authenticator = self.Authenticator_create(
1682 crealm=realm,
1683 cname=cname,
1684 cksum=req_body_checksum,
1685 cusec=cusec,
1686 ctime=ctime,
1687 subkey=subkey_obj,
1688 seq_number=seq_number,
1689 authorization_data=None)
1690 authenticator = self.der_encode(
1691 authenticator,
1692 asn1Spec=krb5_asn1.Authenticator(),
1693 asn1_print=asn1_print,
1694 hexdump=hexdump)
1696 authenticator = self.EncryptedData_create(
1697 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1699 ap_options = krb5_asn1.APOptions('0')
1700 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1701 ticket=ticket,
1702 authenticator=authenticator)
1703 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1704 asn1_print=asn1_print, hexdump=hexdump)
1705 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1706 if padata is not None:
1707 padata.append(pa_tgs_req)
1708 else:
1709 padata = [pa_tgs_req]
1711 obj, decoded = self.KDC_REQ_create(
1712 msg_type=KRB_TGS_REQ,
1713 padata=padata,
1714 req_body=req_body,
1715 asn1Spec=krb5_asn1.TGS_REQ(),
1716 asn1_print=asn1_print,
1717 hexdump=hexdump)
1718 if native_decoded_only:
1719 return decoded
1720 return decoded, obj
1722 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1723 # PA-S4U2Self ::= SEQUENCE {
1724 # name [0] PrincipalName,
1725 # realm [1] Realm,
1726 # cksum [2] Checksum,
1727 # auth [3] GeneralString
1729 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1730 for n in name['name-string']:
1731 cksum_data += n.encode()
1732 cksum_data += realm.encode()
1733 cksum_data += "Kerberos".encode()
1734 cksum = self.Checksum_create(tgt_session_key,
1735 KU_NON_KERB_CKSUM_SALT,
1736 cksum_data,
1737 ctype)
1739 PA_S4U2Self_obj = {
1740 'name': name,
1741 'realm': realm,
1742 'cksum': cksum,
1743 'auth': "Kerberos",
1745 pa_s4u2self = self.der_encode(
1746 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1747 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1749 def _generic_kdc_exchange(self,
1750 kdc_exchange_dict, # required
1751 cname=None, # optional
1752 realm=None, # required
1753 sname=None, # optional
1754 from_time=None, # optional
1755 till_time=None, # required
1756 renew_time=None, # optional
1757 etypes=None, # required
1758 addresses=None, # optional
1759 additional_tickets=None, # optional
1760 EncAuthorizationData=None, # optional
1761 EncAuthorizationData_key=None, # optional
1762 EncAuthorizationData_usage=None): # optional
1764 check_error_fn = kdc_exchange_dict['check_error_fn']
1765 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1766 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1767 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1768 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1769 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1770 callback_dict = kdc_exchange_dict['callback_dict']
1771 req_msg_type = kdc_exchange_dict['req_msg_type']
1772 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1773 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1775 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1776 kdc_options = kdc_exchange_dict['kdc_options']
1778 pac_request = kdc_exchange_dict['pac_request']
1779 pac_options = kdc_exchange_dict['pac_options']
1781 # Parameters specific to the inner request body
1782 inner_req = kdc_exchange_dict['inner_req']
1784 # Parameters specific to the outer request body
1785 outer_req = kdc_exchange_dict['outer_req']
1787 if till_time is None:
1788 till_time = self.get_KerberosTime(offset=36000)
1790 if 'nonce' in kdc_exchange_dict:
1791 nonce = kdc_exchange_dict['nonce']
1792 else:
1793 nonce = self.get_Nonce()
1794 kdc_exchange_dict['nonce'] = nonce
1796 req_body = self.KDC_REQ_BODY_create(
1797 kdc_options=kdc_options,
1798 cname=cname,
1799 realm=realm,
1800 sname=sname,
1801 from_time=from_time,
1802 till_time=till_time,
1803 renew_time=renew_time,
1804 nonce=nonce,
1805 etypes=etypes,
1806 addresses=addresses,
1807 additional_tickets=additional_tickets,
1808 EncAuthorizationData=EncAuthorizationData,
1809 EncAuthorizationData_key=EncAuthorizationData_key,
1810 EncAuthorizationData_usage=EncAuthorizationData_usage)
1812 inner_req_body = dict(req_body)
1813 if inner_req is not None:
1814 for key, value in inner_req.items():
1815 if value is not None:
1816 inner_req_body[key] = value
1817 else:
1818 del inner_req_body[key]
1819 if outer_req is not None:
1820 for key, value in outer_req.items():
1821 if value is not None:
1822 req_body[key] = value
1823 else:
1824 del req_body[key]
1826 additional_padata = []
1827 if pac_request is not None:
1828 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
1829 additional_padata.append(pa_pac_request)
1830 if pac_options is not None:
1831 pa_pac_options = self.get_pa_pac_options(pac_options)
1832 additional_padata.append(pa_pac_options)
1834 if req_msg_type == KRB_AS_REQ:
1835 tgs_req = None
1836 tgs_req_padata = None
1837 else:
1838 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1840 tgs_req = self.generate_ap_req(kdc_exchange_dict,
1841 callback_dict,
1842 req_body,
1843 armor=False)
1844 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1846 if generate_fast_padata_fn is not None:
1847 self.assertIsNotNone(generate_fast_fn)
1848 # This can alter req_body...
1849 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1850 callback_dict,
1851 req_body)
1852 else:
1853 fast_padata = []
1855 if generate_fast_armor_fn is not None:
1856 self.assertIsNotNone(generate_fast_fn)
1857 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1858 callback_dict,
1859 req_body,
1860 armor=True)
1862 fast_armor_type = kdc_exchange_dict['fast_armor_type']
1863 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1864 fast_ap_req)
1865 else:
1866 fast_armor = None
1868 if generate_padata_fn is not None:
1869 # This can alter req_body...
1870 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1871 callback_dict,
1872 req_body)
1873 self.assertIsNotNone(outer_padata)
1874 self.assertNotIn(PADATA_KDC_REQ,
1875 [pa['padata-type'] for pa in outer_padata],
1876 'Don\'t create TGS-REQ manually')
1877 else:
1878 outer_padata = None
1880 if generate_fast_fn is not None:
1881 armor_key = kdc_exchange_dict['armor_key']
1882 self.assertIsNotNone(armor_key)
1884 if req_msg_type == KRB_AS_REQ:
1885 checksum_blob = self.der_encode(
1886 req_body,
1887 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1888 else:
1889 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1890 checksum_blob = tgs_req
1892 checksum = self.Checksum_create(armor_key,
1893 KU_FAST_REQ_CHKSUM,
1894 checksum_blob)
1896 fast_padata += additional_padata
1897 fast = generate_fast_fn(kdc_exchange_dict,
1898 callback_dict,
1899 inner_req_body,
1900 fast_padata,
1901 fast_armor,
1902 checksum)
1903 else:
1904 fast = None
1906 padata = []
1908 if tgs_req_padata is not None:
1909 padata.append(tgs_req_padata)
1911 if fast is not None:
1912 padata.append(fast)
1914 if outer_padata is not None:
1915 padata += outer_padata
1917 if fast is None:
1918 padata += additional_padata
1920 if not padata:
1921 padata = None
1923 kdc_exchange_dict['req_padata'] = padata
1924 kdc_exchange_dict['fast_padata'] = fast_padata
1925 kdc_exchange_dict['req_body'] = inner_req_body
1927 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1928 padata=padata,
1929 req_body=req_body,
1930 asn1Spec=req_asn1Spec())
1932 to_rodc = kdc_exchange_dict['to_rodc']
1934 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
1935 self.assertIsNotNone(rep)
1937 msg_type = self.getElementValue(rep, 'msg-type')
1938 self.assertIsNotNone(msg_type)
1940 expected_msg_type = None
1941 if check_error_fn is not None:
1942 expected_msg_type = KRB_ERROR
1943 self.assertIsNone(check_rep_fn)
1944 self.assertNotEqual(0, len(expected_error_mode))
1945 self.assertNotIn(0, expected_error_mode)
1946 if check_rep_fn is not None:
1947 expected_msg_type = rep_msg_type
1948 self.assertIsNone(check_error_fn)
1949 self.assertEqual(0, len(expected_error_mode))
1950 self.assertIsNotNone(expected_msg_type)
1951 if msg_type == KRB_ERROR:
1952 error_code = self.getElementValue(rep, 'error-code')
1953 fail_msg = f'Got unexpected error: {error_code}'
1954 else:
1955 fail_msg = f'Expected to fail with error: {expected_error_mode}'
1956 self.assertEqual(msg_type, expected_msg_type, fail_msg)
1958 if msg_type == KRB_ERROR:
1959 return check_error_fn(kdc_exchange_dict,
1960 callback_dict,
1961 rep)
1963 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1965 def as_exchange_dict(self,
1966 expected_crealm=None,
1967 expected_cname=None,
1968 expected_anon=False,
1969 expected_srealm=None,
1970 expected_sname=None,
1971 expected_supported_etypes=None,
1972 expected_flags=None,
1973 unexpected_flags=None,
1974 ticket_decryption_key=None,
1975 expect_ticket_checksum=None,
1976 generate_fast_fn=None,
1977 generate_fast_armor_fn=None,
1978 generate_fast_padata_fn=None,
1979 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1980 generate_padata_fn=None,
1981 check_error_fn=None,
1982 check_rep_fn=None,
1983 check_kdc_private_fn=None,
1984 callback_dict=None,
1985 expected_error_mode=0,
1986 expected_status=None,
1987 client_as_etypes=None,
1988 expected_salt=None,
1989 authenticator_subkey=None,
1990 preauth_key=None,
1991 armor_key=None,
1992 armor_tgt=None,
1993 armor_subkey=None,
1994 auth_data=None,
1995 kdc_options='',
1996 inner_req=None,
1997 outer_req=None,
1998 pac_request=None,
1999 pac_options=None,
2000 expect_edata=None,
2001 expect_pac=True,
2002 expect_claims=True,
2003 to_rodc=False):
2004 if expected_error_mode == 0:
2005 expected_error_mode = ()
2006 elif not isinstance(expected_error_mode, collections.abc.Container):
2007 expected_error_mode = (expected_error_mode,)
2009 kdc_exchange_dict = {
2010 'req_msg_type': KRB_AS_REQ,
2011 'req_asn1Spec': krb5_asn1.AS_REQ,
2012 'rep_msg_type': KRB_AS_REP,
2013 'rep_asn1Spec': krb5_asn1.AS_REP,
2014 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
2015 'expected_crealm': expected_crealm,
2016 'expected_cname': expected_cname,
2017 'expected_anon': expected_anon,
2018 'expected_srealm': expected_srealm,
2019 'expected_sname': expected_sname,
2020 'expected_supported_etypes': expected_supported_etypes,
2021 'expected_flags': expected_flags,
2022 'unexpected_flags': unexpected_flags,
2023 'ticket_decryption_key': ticket_decryption_key,
2024 'expect_ticket_checksum': expect_ticket_checksum,
2025 'generate_fast_fn': generate_fast_fn,
2026 'generate_fast_armor_fn': generate_fast_armor_fn,
2027 'generate_fast_padata_fn': generate_fast_padata_fn,
2028 'fast_armor_type': fast_armor_type,
2029 'generate_padata_fn': generate_padata_fn,
2030 'check_error_fn': check_error_fn,
2031 'check_rep_fn': check_rep_fn,
2032 'check_kdc_private_fn': check_kdc_private_fn,
2033 'callback_dict': callback_dict,
2034 'expected_error_mode': expected_error_mode,
2035 'expected_status': expected_status,
2036 'client_as_etypes': client_as_etypes,
2037 'expected_salt': expected_salt,
2038 'authenticator_subkey': authenticator_subkey,
2039 'preauth_key': preauth_key,
2040 'armor_key': armor_key,
2041 'armor_tgt': armor_tgt,
2042 'armor_subkey': armor_subkey,
2043 'auth_data': auth_data,
2044 'kdc_options': kdc_options,
2045 'inner_req': inner_req,
2046 'outer_req': outer_req,
2047 'pac_request': pac_request,
2048 'pac_options': pac_options,
2049 'expect_edata': expect_edata,
2050 'expect_pac': expect_pac,
2051 'expect_claims': expect_claims,
2052 'to_rodc': to_rodc
2054 if callback_dict is None:
2055 callback_dict = {}
2057 return kdc_exchange_dict
2059 def tgs_exchange_dict(self,
2060 expected_crealm=None,
2061 expected_cname=None,
2062 expected_anon=False,
2063 expected_srealm=None,
2064 expected_sname=None,
2065 expected_supported_etypes=None,
2066 expected_flags=None,
2067 unexpected_flags=None,
2068 ticket_decryption_key=None,
2069 expect_ticket_checksum=None,
2070 generate_fast_fn=None,
2071 generate_fast_armor_fn=None,
2072 generate_fast_padata_fn=None,
2073 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2074 generate_padata_fn=None,
2075 check_error_fn=None,
2076 check_rep_fn=None,
2077 check_kdc_private_fn=None,
2078 expected_error_mode=0,
2079 expected_status=None,
2080 callback_dict=None,
2081 tgt=None,
2082 armor_key=None,
2083 armor_tgt=None,
2084 armor_subkey=None,
2085 authenticator_subkey=None,
2086 auth_data=None,
2087 body_checksum_type=None,
2088 kdc_options='',
2089 inner_req=None,
2090 outer_req=None,
2091 pac_request=None,
2092 pac_options=None,
2093 expect_edata=None,
2094 expect_pac=True,
2095 expect_claims=True,
2096 expected_proxy_target=None,
2097 expected_transited_services=None,
2098 to_rodc=False):
2099 if expected_error_mode == 0:
2100 expected_error_mode = ()
2101 elif not isinstance(expected_error_mode, collections.abc.Container):
2102 expected_error_mode = (expected_error_mode,)
2104 kdc_exchange_dict = {
2105 'req_msg_type': KRB_TGS_REQ,
2106 'req_asn1Spec': krb5_asn1.TGS_REQ,
2107 'rep_msg_type': KRB_TGS_REP,
2108 'rep_asn1Spec': krb5_asn1.TGS_REP,
2109 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2110 'expected_crealm': expected_crealm,
2111 'expected_cname': expected_cname,
2112 'expected_anon': expected_anon,
2113 'expected_srealm': expected_srealm,
2114 'expected_sname': expected_sname,
2115 'expected_supported_etypes': expected_supported_etypes,
2116 'expected_flags': expected_flags,
2117 'unexpected_flags': unexpected_flags,
2118 'ticket_decryption_key': ticket_decryption_key,
2119 'expect_ticket_checksum': expect_ticket_checksum,
2120 'generate_fast_fn': generate_fast_fn,
2121 'generate_fast_armor_fn': generate_fast_armor_fn,
2122 'generate_fast_padata_fn': generate_fast_padata_fn,
2123 'fast_armor_type': fast_armor_type,
2124 'generate_padata_fn': generate_padata_fn,
2125 'check_error_fn': check_error_fn,
2126 'check_rep_fn': check_rep_fn,
2127 'check_kdc_private_fn': check_kdc_private_fn,
2128 'callback_dict': callback_dict,
2129 'expected_error_mode': expected_error_mode,
2130 'expected_status': expected_status,
2131 'tgt': tgt,
2132 'body_checksum_type': body_checksum_type,
2133 'armor_key': armor_key,
2134 'armor_tgt': armor_tgt,
2135 'armor_subkey': armor_subkey,
2136 'auth_data': auth_data,
2137 'authenticator_subkey': authenticator_subkey,
2138 'kdc_options': kdc_options,
2139 'inner_req': inner_req,
2140 'outer_req': outer_req,
2141 'pac_request': pac_request,
2142 'pac_options': pac_options,
2143 'expect_edata': expect_edata,
2144 'expect_pac': expect_pac,
2145 'expect_claims': expect_claims,
2146 'expected_proxy_target': expected_proxy_target,
2147 'expected_transited_services': expected_transited_services,
2148 'to_rodc': to_rodc
2150 if callback_dict is None:
2151 callback_dict = {}
2153 return kdc_exchange_dict
2155 def generic_check_kdc_rep(self,
2156 kdc_exchange_dict,
2157 callback_dict,
2158 rep):
2160 expected_crealm = kdc_exchange_dict['expected_crealm']
2161 expected_anon = kdc_exchange_dict['expected_anon']
2162 expected_srealm = kdc_exchange_dict['expected_srealm']
2163 expected_sname = kdc_exchange_dict['expected_sname']
2164 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2165 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2166 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2167 msg_type = kdc_exchange_dict['rep_msg_type']
2168 armor_key = kdc_exchange_dict['armor_key']
2170 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
2171 padata = self.getElementValue(rep, 'padata')
2172 if self.strict_checking:
2173 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
2174 if expected_anon:
2175 expected_cname = self.PrincipalName_create(
2176 name_type=NT_WELLKNOWN,
2177 names=['WELLKNOWN', 'ANONYMOUS'])
2178 else:
2179 expected_cname = kdc_exchange_dict['expected_cname']
2180 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2181 self.assertElementPresent(rep, 'ticket')
2182 ticket = self.getElementValue(rep, 'ticket')
2183 ticket_encpart = None
2184 ticket_cipher = None
2185 self.assertIsNotNone(ticket)
2186 if ticket is not None: # Never None, but gives indentation
2187 self.assertElementEqual(ticket, 'tkt-vno', 5)
2188 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2189 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2190 self.assertElementPresent(ticket, 'enc-part')
2191 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2192 self.assertIsNotNone(ticket_encpart)
2193 if ticket_encpart is not None: # Never None, but gives indentation
2194 self.assertElementPresent(ticket_encpart, 'etype')
2195 # 'unspecified' means present, with any value != 0
2196 self.assertElementKVNO(ticket_encpart, 'kvno',
2197 self.unspecified_kvno)
2198 self.assertElementPresent(ticket_encpart, 'cipher')
2199 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2200 self.assertElementPresent(rep, 'enc-part')
2201 encpart = self.getElementValue(rep, 'enc-part')
2202 encpart_cipher = None
2203 self.assertIsNotNone(encpart)
2204 if encpart is not None: # Never None, but gives indentation
2205 self.assertElementPresent(encpart, 'etype')
2206 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2207 self.assertElementPresent(encpart, 'cipher')
2208 encpart_cipher = self.getElementValue(encpart, 'cipher')
2210 ticket_checksum = None
2212 # Get the decryption key for the encrypted part
2213 encpart_decryption_key, encpart_decryption_usage = (
2214 self.get_preauth_key(kdc_exchange_dict))
2216 if armor_key is not None:
2217 pa_dict = self.get_pa_dict(padata)
2219 if PADATA_FX_FAST in pa_dict:
2220 fx_fast_data = pa_dict[PADATA_FX_FAST]
2221 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2222 fx_fast_data,
2223 armor_key,
2224 finished=True)
2226 if 'strengthen-key' in fast_response:
2227 strengthen_key = self.EncryptionKey_import(
2228 fast_response['strengthen-key'])
2229 encpart_decryption_key = (
2230 self.generate_strengthen_reply_key(
2231 strengthen_key,
2232 encpart_decryption_key))
2234 fast_finished = fast_response.get('finished')
2235 if fast_finished is not None:
2236 ticket_checksum = fast_finished['ticket-checksum']
2238 self.check_rep_padata(kdc_exchange_dict,
2239 callback_dict,
2240 fast_response['padata'],
2241 error_code=0)
2243 ticket_private = None
2244 if ticket_decryption_key is not None:
2245 self.assertElementEqual(ticket_encpart, 'etype',
2246 ticket_decryption_key.etype)
2247 self.assertElementKVNO(ticket_encpart, 'kvno',
2248 ticket_decryption_key.kvno)
2249 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2250 ticket_cipher)
2251 ticket_private = self.der_decode(
2252 ticket_decpart,
2253 asn1Spec=krb5_asn1.EncTicketPart())
2255 encpart_private = None
2256 self.assertIsNotNone(encpart_decryption_key)
2257 if encpart_decryption_key is not None:
2258 self.assertElementEqual(encpart, 'etype',
2259 encpart_decryption_key.etype)
2260 if self.strict_checking:
2261 self.assertElementKVNO(encpart, 'kvno',
2262 encpart_decryption_key.kvno)
2263 rep_decpart = encpart_decryption_key.decrypt(
2264 encpart_decryption_usage,
2265 encpart_cipher)
2266 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2267 # application tag 26
2268 try:
2269 encpart_private = self.der_decode(
2270 rep_decpart,
2271 asn1Spec=rep_encpart_asn1Spec())
2272 except Exception:
2273 encpart_private = self.der_decode(
2274 rep_decpart,
2275 asn1Spec=krb5_asn1.EncTGSRepPart())
2277 self.assertIsNotNone(check_kdc_private_fn)
2278 if check_kdc_private_fn is not None:
2279 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2280 rep, ticket_private, encpart_private,
2281 ticket_checksum)
2283 return rep
2285 def check_fx_fast_data(self,
2286 kdc_exchange_dict,
2287 fx_fast_data,
2288 armor_key,
2289 finished=False,
2290 expect_strengthen_key=True):
2291 fx_fast_data = self.der_decode(fx_fast_data,
2292 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2294 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2295 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2297 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2299 fast_response = self.der_decode(fast_rep,
2300 asn1Spec=krb5_asn1.KrbFastResponse())
2302 if expect_strengthen_key and self.strict_checking:
2303 self.assertIn('strengthen-key', fast_response)
2305 if finished:
2306 self.assertIn('finished', fast_response)
2308 # Ensure that the nonce matches the nonce in the body of the request
2309 # (RFC6113 5.4.3).
2310 nonce = kdc_exchange_dict['nonce']
2311 self.assertEqual(nonce, fast_response['nonce'])
2313 return fast_response
2315 def generic_check_kdc_private(self,
2316 kdc_exchange_dict,
2317 callback_dict,
2318 rep,
2319 ticket_private,
2320 encpart_private,
2321 ticket_checksum):
2322 kdc_options = kdc_exchange_dict['kdc_options']
2323 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2324 canonicalize = (canon_pos < len(kdc_options)
2325 and kdc_options[canon_pos] == '1')
2326 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2327 renewable = (renewable_pos < len(kdc_options)
2328 and kdc_options[renewable_pos] == '1')
2330 expected_crealm = kdc_exchange_dict['expected_crealm']
2331 expected_cname = kdc_exchange_dict['expected_cname']
2332 expected_srealm = kdc_exchange_dict['expected_srealm']
2333 expected_sname = kdc_exchange_dict['expected_sname']
2334 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2336 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2338 expected_flags = kdc_exchange_dict.get('expected_flags')
2339 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2341 ticket = self.getElementValue(rep, 'ticket')
2343 if ticket_checksum is not None:
2344 armor_key = kdc_exchange_dict['armor_key']
2345 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2347 to_rodc = kdc_exchange_dict['to_rodc']
2348 if to_rodc:
2349 krbtgt_creds = self.get_rodc_krbtgt_creds()
2350 else:
2351 krbtgt_creds = self.get_krbtgt_creds()
2352 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2354 expect_pac = kdc_exchange_dict['expect_pac']
2356 ticket_session_key = None
2357 if ticket_private is not None:
2358 self.assertElementFlags(ticket_private, 'flags',
2359 expected_flags,
2360 unexpected_flags)
2361 self.assertElementPresent(ticket_private, 'key')
2362 ticket_key = self.getElementValue(ticket_private, 'key')
2363 self.assertIsNotNone(ticket_key)
2364 if ticket_key is not None: # Never None, but gives indentation
2365 self.assertElementPresent(ticket_key, 'keytype')
2366 self.assertElementPresent(ticket_key, 'keyvalue')
2367 ticket_session_key = self.EncryptionKey_import(ticket_key)
2368 self.assertElementEqualUTF8(ticket_private, 'crealm',
2369 expected_crealm)
2370 if self.strict_checking:
2371 self.assertElementEqualPrincipal(ticket_private, 'cname',
2372 expected_cname)
2373 self.assertElementPresent(ticket_private, 'transited')
2374 self.assertElementPresent(ticket_private, 'authtime')
2375 if self.strict_checking:
2376 self.assertElementPresent(ticket_private, 'starttime')
2377 self.assertElementPresent(ticket_private, 'endtime')
2378 if renewable:
2379 if self.strict_checking:
2380 self.assertElementPresent(ticket_private, 'renew-till')
2381 else:
2382 self.assertElementMissing(ticket_private, 'renew-till')
2383 if self.strict_checking:
2384 self.assertElementEqual(ticket_private, 'caddr', [])
2385 self.assertElementPresent(ticket_private, 'authorization-data',
2386 expect_empty=not expect_pac)
2388 if expect_pac:
2389 authorization_data = self.getElementValue(ticket_private,
2390 'authorization-data')
2391 pac_data = self.get_pac(authorization_data)
2393 self.check_pac_buffers(pac_data, kdc_exchange_dict)
2395 encpart_session_key = None
2396 if encpart_private is not None:
2397 self.assertElementPresent(encpart_private, 'key')
2398 encpart_key = self.getElementValue(encpart_private, 'key')
2399 self.assertIsNotNone(encpart_key)
2400 if encpart_key is not None: # Never None, but gives indentation
2401 self.assertElementPresent(encpart_key, 'keytype')
2402 self.assertElementPresent(encpart_key, 'keyvalue')
2403 encpart_session_key = self.EncryptionKey_import(encpart_key)
2404 self.assertElementPresent(encpart_private, 'last-req')
2405 self.assertElementEqual(encpart_private, 'nonce',
2406 kdc_exchange_dict['nonce'])
2407 if rep_msg_type == KRB_AS_REP:
2408 if self.strict_checking:
2409 self.assertElementPresent(encpart_private,
2410 'key-expiration')
2411 else:
2412 self.assertElementMissing(encpart_private,
2413 'key-expiration')
2414 self.assertElementFlags(encpart_private, 'flags',
2415 expected_flags,
2416 unexpected_flags)
2417 self.assertElementPresent(encpart_private, 'authtime')
2418 if self.strict_checking:
2419 self.assertElementPresent(encpart_private, 'starttime')
2420 self.assertElementPresent(encpart_private, 'endtime')
2421 if renewable:
2422 if self.strict_checking:
2423 self.assertElementPresent(encpart_private, 'renew-till')
2424 else:
2425 self.assertElementMissing(encpart_private, 'renew-till')
2426 self.assertElementEqualUTF8(encpart_private, 'srealm',
2427 expected_srealm)
2428 self.assertElementEqualPrincipal(encpart_private, 'sname',
2429 expected_sname)
2430 if self.strict_checking:
2431 self.assertElementEqual(encpart_private, 'caddr', [])
2433 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2435 if self.strict_checking:
2436 if canonicalize or '1' in sent_pac_options:
2437 self.assertElementPresent(encpart_private,
2438 'encrypted-pa-data')
2439 enc_pa_dict = self.get_pa_dict(
2440 encpart_private['encrypted-pa-data'])
2441 if canonicalize:
2442 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2444 expected_supported_etypes = kdc_exchange_dict[
2445 'expected_supported_etypes']
2446 expected_supported_etypes |= (
2447 security.KERB_ENCTYPE_DES_CBC_CRC |
2448 security.KERB_ENCTYPE_DES_CBC_MD5 |
2449 security.KERB_ENCTYPE_RC4_HMAC_MD5)
2451 (supported_etypes,) = struct.unpack(
2452 '<L',
2453 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2455 self.assertEqual(supported_etypes,
2456 expected_supported_etypes)
2457 else:
2458 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2460 if '1' in sent_pac_options:
2461 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2463 pac_options = self.der_decode(
2464 enc_pa_dict[PADATA_PAC_OPTIONS],
2465 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2467 self.assertElementEqual(pac_options, 'options',
2468 sent_pac_options)
2469 else:
2470 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2471 else:
2472 self.assertElementEqual(encpart_private,
2473 'encrypted-pa-data',
2476 if ticket_session_key is not None and encpart_session_key is not None:
2477 self.assertEqual(ticket_session_key.etype,
2478 encpart_session_key.etype)
2479 self.assertEqual(ticket_session_key.key.contents,
2480 encpart_session_key.key.contents)
2481 if encpart_session_key is not None:
2482 session_key = encpart_session_key
2483 else:
2484 session_key = ticket_session_key
2485 ticket_creds = KerberosTicketCreds(
2486 ticket,
2487 session_key,
2488 crealm=expected_crealm,
2489 cname=expected_cname,
2490 srealm=expected_srealm,
2491 sname=expected_sname,
2492 decryption_key=ticket_decryption_key,
2493 ticket_private=ticket_private,
2494 encpart_private=encpart_private)
2496 expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
2497 if expect_ticket_checksum:
2498 self.assertIsNotNone(ticket_decryption_key)
2500 if ticket_decryption_key is not None:
2501 self.verify_ticket(ticket_creds, krbtgt_key, expect_pac=expect_pac,
2502 expect_ticket_checksum=expect_ticket_checksum
2503 or self.tkt_sig_support)
2505 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2507 def check_pac_buffers(self, pac_data, kdc_exchange_dict):
2508 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
2510 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2511 armor_tgt = kdc_exchange_dict['armor_tgt']
2513 expected_sname = kdc_exchange_dict['expected_sname']
2514 expect_claims = kdc_exchange_dict['expect_claims']
2516 expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
2517 krb5pac.PAC_TYPE_SRV_CHECKSUM,
2518 krb5pac.PAC_TYPE_KDC_CHECKSUM,
2519 krb5pac.PAC_TYPE_LOGON_NAME,
2520 krb5pac.PAC_TYPE_UPN_DNS_INFO]
2522 kdc_options = kdc_exchange_dict['kdc_options']
2523 pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
2524 constrained_delegation = (pos < len(kdc_options)
2525 and kdc_options[pos] == '1')
2526 if constrained_delegation:
2527 expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)
2529 if self.kdc_fast_support:
2530 if expect_claims:
2531 expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
2533 if (rep_msg_type == KRB_TGS_REP
2534 and armor_tgt is not None):
2535 expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
2536 expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
2538 if not self.is_tgs(expected_sname):
2539 expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
2541 if self.strict_checking:
2542 buffer_types = [pac_buffer.type
2543 for pac_buffer in pac.buffers]
2544 self.assertCountEqual(expected_types, buffer_types,
2545 f'expected: {expected_types} '
2546 f'got: {buffer_types}')
2548 for pac_buffer in pac.buffers:
2549 if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
2550 expected_proxy_target = kdc_exchange_dict[
2551 'expected_proxy_target']
2552 expected_transited_services = kdc_exchange_dict[
2553 'expected_transited_services']
2555 delegation_info = pac_buffer.info.info
2557 self.assertEqual(expected_proxy_target,
2558 str(delegation_info.proxy_target))
2560 transited_services = list(map(
2561 str, delegation_info.transited_services))
2562 self.assertEqual(expected_transited_services,
2563 transited_services)
2565 elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2566 expected_cname = kdc_exchange_dict['expected_cname']
2567 account_name = expected_cname['name-string'][0]
2569 self.assertEqual(account_name, pac_buffer.info.account_name)
2571 def generic_check_kdc_error(self,
2572 kdc_exchange_dict,
2573 callback_dict,
2574 rep,
2575 inner=False):
2577 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2579 expected_anon = kdc_exchange_dict['expected_anon']
2580 expected_srealm = kdc_exchange_dict['expected_srealm']
2581 expected_sname = kdc_exchange_dict['expected_sname']
2582 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2584 sent_fast = self.sent_fast(kdc_exchange_dict)
2586 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2588 self.assertElementEqual(rep, 'pvno', 5)
2589 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2590 error_code = self.getElementValue(rep, 'error-code')
2591 self.assertIn(error_code, expected_error_mode)
2592 if self.strict_checking:
2593 self.assertElementMissing(rep, 'ctime')
2594 self.assertElementMissing(rep, 'cusec')
2595 self.assertElementPresent(rep, 'stime')
2596 self.assertElementPresent(rep, 'susec')
2597 # error-code checked above
2598 if self.strict_checking:
2599 self.assertElementMissing(rep, 'crealm')
2600 if expected_anon and not inner:
2601 expected_cname = self.PrincipalName_create(
2602 name_type=NT_WELLKNOWN,
2603 names=['WELLKNOWN', 'ANONYMOUS'])
2604 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2605 else:
2606 self.assertElementMissing(rep, 'cname')
2607 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2608 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2609 self.assertElementMissing(rep, 'e-text')
2610 expected_status = kdc_exchange_dict['expected_status']
2611 expect_edata = kdc_exchange_dict['expect_edata']
2612 if expect_edata is None:
2613 expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2614 and (not sent_fast or fast_armor_type is None
2615 or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
2616 and not inner)
2617 if not expect_edata:
2618 self.assertIsNone(expected_status)
2619 self.assertElementMissing(rep, 'e-data')
2620 return rep
2621 edata = self.getElementValue(rep, 'e-data')
2622 if self.strict_checking:
2623 self.assertIsNotNone(edata)
2624 if edata is not None:
2625 if rep_msg_type == KRB_TGS_REP and not sent_fast:
2626 error_data = self.der_decode(
2627 edata,
2628 asn1Spec=krb5_asn1.KERB_ERROR_DATA())
2629 self.assertEqual(KERB_ERR_TYPE_EXTENDED,
2630 error_data['data-type'])
2632 extended_error = error_data['data-value']
2634 self.assertEqual(12, len(extended_error))
2636 status = int.from_bytes(extended_error[:4], 'little')
2637 flags = int.from_bytes(extended_error[8:], 'little')
2639 self.assertEqual(expected_status, status)
2641 self.assertEqual(3, flags)
2642 else:
2643 self.assertIsNone(expected_status)
2645 rep_padata = self.der_decode(edata,
2646 asn1Spec=krb5_asn1.METHOD_DATA())
2647 self.assertGreater(len(rep_padata), 0)
2649 if sent_fast:
2650 self.assertEqual(1, len(rep_padata))
2651 rep_pa_dict = self.get_pa_dict(rep_padata)
2652 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
2654 armor_key = kdc_exchange_dict['armor_key']
2655 self.assertIsNotNone(armor_key)
2656 fast_response = self.check_fx_fast_data(
2657 kdc_exchange_dict,
2658 rep_pa_dict[PADATA_FX_FAST],
2659 armor_key,
2660 expect_strengthen_key=False)
2662 rep_padata = fast_response['padata']
2664 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2665 callback_dict,
2666 rep_padata,
2667 error_code)
2669 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2671 return rep
2673 def check_rep_padata(self,
2674 kdc_exchange_dict,
2675 callback_dict,
2676 rep_padata,
2677 error_code):
2678 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2680 req_body = kdc_exchange_dict['req_body']
2681 proposed_etypes = req_body['etype']
2682 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2684 sent_fast = self.sent_fast(kdc_exchange_dict)
2685 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
2687 if rep_msg_type == KRB_TGS_REP:
2688 self.assertTrue(sent_fast)
2690 expect_etype_info2 = ()
2691 expect_etype_info = False
2692 unexpect_etype_info = True
2693 expected_aes_type = 0
2694 expected_rc4_type = 0
2695 if kcrypto.Enctype.RC4 in proposed_etypes:
2696 expect_etype_info = True
2697 for etype in proposed_etypes:
2698 if etype not in client_as_etypes:
2699 continue
2700 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2701 expect_etype_info = False
2702 if etype > expected_aes_type:
2703 expected_aes_type = etype
2704 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
2705 unexpect_etype_info = False
2706 if etype > expected_rc4_type:
2707 expected_rc4_type = etype
2709 if expected_aes_type != 0:
2710 expect_etype_info2 += (expected_aes_type,)
2711 if expected_rc4_type != 0:
2712 expect_etype_info2 += (expected_rc4_type,)
2714 expected_patypes = ()
2715 if sent_fast and error_code != 0:
2716 expected_patypes += (PADATA_FX_ERROR,)
2717 expected_patypes += (PADATA_FX_COOKIE,)
2719 if rep_msg_type == KRB_TGS_REP:
2720 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2721 if ('1' in sent_pac_options
2722 and error_code not in (0, KDC_ERR_GENERIC)):
2723 expected_patypes += (PADATA_PAC_OPTIONS,)
2724 elif error_code != KDC_ERR_GENERIC:
2725 if expect_etype_info:
2726 self.assertGreater(len(expect_etype_info2), 0)
2727 expected_patypes += (PADATA_ETYPE_INFO,)
2728 if len(expect_etype_info2) != 0:
2729 expected_patypes += (PADATA_ETYPE_INFO2,)
2731 if error_code != KDC_ERR_PREAUTH_FAILED:
2732 if sent_fast:
2733 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
2734 else:
2735 expected_patypes += (PADATA_ENC_TIMESTAMP,)
2737 if not sent_enc_challenge:
2738 expected_patypes += (PADATA_PK_AS_REQ,)
2739 expected_patypes += (PADATA_PK_AS_REP_19,)
2741 if (self.kdc_fast_support
2742 and not sent_fast
2743 and not sent_enc_challenge):
2744 expected_patypes += (PADATA_FX_FAST,)
2745 expected_patypes += (PADATA_FX_COOKIE,)
2747 got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
2748 self.assertSequenceElementsEqual(expected_patypes, got_patypes,
2749 require_strict={PADATA_FX_COOKIE,
2750 PADATA_FX_FAST,
2751 PADATA_PAC_OPTIONS,
2752 PADATA_PK_AS_REP_19,
2753 PADATA_PK_AS_REQ})
2755 if not expected_patypes:
2756 return None
2758 pa_dict = self.get_pa_dict(rep_padata)
2760 enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
2761 if enc_timestamp is not None:
2762 self.assertEqual(len(enc_timestamp), 0)
2764 pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
2765 if pk_as_req is not None:
2766 self.assertEqual(len(pk_as_req), 0)
2768 pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
2769 if pk_as_rep19 is not None:
2770 self.assertEqual(len(pk_as_rep19), 0)
2772 fx_fast = pa_dict.get(PADATA_FX_FAST)
2773 if fx_fast is not None:
2774 self.assertEqual(len(fx_fast), 0)
2776 fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
2777 if fast_cookie is not None:
2778 kdc_exchange_dict['fast_cookie'] = fast_cookie
2780 fast_error = pa_dict.get(PADATA_FX_ERROR)
2781 if fast_error is not None:
2782 fast_error = self.der_decode(fast_error,
2783 asn1Spec=krb5_asn1.KRB_ERROR())
2784 self.generic_check_kdc_error(kdc_exchange_dict,
2785 callback_dict,
2786 fast_error,
2787 inner=True)
2789 pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
2790 if pac_options is not None:
2791 pac_options = self.der_decode(
2792 pac_options,
2793 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2794 self.assertElementEqual(pac_options, 'options', sent_pac_options)
2796 enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
2797 if enc_challenge is not None:
2798 if not sent_enc_challenge:
2799 self.assertEqual(len(enc_challenge), 0)
2800 else:
2801 armor_key = kdc_exchange_dict['armor_key']
2802 self.assertIsNotNone(armor_key)
2804 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
2806 kdc_challenge_key = self.generate_kdc_challenge_key(
2807 armor_key, preauth_key)
2809 # Ensure that the encrypted challenge FAST factor is supported
2810 # (RFC6113 5.4.6).
2811 if self.strict_checking:
2812 self.assertNotEqual(len(enc_challenge), 0)
2813 if len(enc_challenge) != 0:
2814 encrypted_challenge = self.der_decode(
2815 enc_challenge,
2816 asn1Spec=krb5_asn1.EncryptedData())
2817 self.assertEqual(encrypted_challenge['etype'],
2818 kdc_challenge_key.etype)
2820 challenge = kdc_challenge_key.decrypt(
2821 KU_ENC_CHALLENGE_KDC,
2822 encrypted_challenge['cipher'])
2823 challenge = self.der_decode(
2824 challenge,
2825 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
2827 # Retrieve the returned timestamp.
2828 rep_patime = challenge['patimestamp']
2829 self.assertIn('pausec', challenge)
2831 # Ensure the returned time is within five minutes of the
2832 # current time.
2833 rep_time = self.get_EpochFromKerberosTime(rep_patime)
2834 current_time = time.time()
2836 self.assertLess(current_time - 300, rep_time)
2837 self.assertLess(rep_time, current_time + 300)
2839 etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
2840 if etype_info2 is not None:
2841 etype_info2 = self.der_decode(etype_info2,
2842 asn1Spec=krb5_asn1.ETYPE_INFO2())
2843 self.assertGreaterEqual(len(etype_info2), 1)
2844 if self.strict_checking:
2845 self.assertEqual(len(etype_info2), len(expect_etype_info2))
2846 for i in range(0, len(etype_info2)):
2847 e = self.getElementValue(etype_info2[i], 'etype')
2848 if self.strict_checking:
2849 self.assertEqual(e, expect_etype_info2[i])
2850 salt = self.getElementValue(etype_info2[i], 'salt')
2851 if e == kcrypto.Enctype.RC4:
2852 if self.strict_checking:
2853 self.assertIsNone(salt)
2854 else:
2855 self.assertIsNotNone(salt)
2856 expected_salt = kdc_exchange_dict['expected_salt']
2857 if expected_salt is not None:
2858 self.assertEqual(salt, expected_salt)
2859 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
2860 if self.strict_checking:
2861 self.assertIsNone(s2kparams)
2863 etype_info = pa_dict.get(PADATA_ETYPE_INFO)
2864 if etype_info is not None:
2865 etype_info = self.der_decode(etype_info,
2866 asn1Spec=krb5_asn1.ETYPE_INFO())
2867 self.assertEqual(len(etype_info), 1)
2868 e = self.getElementValue(etype_info[0], 'etype')
2869 self.assertEqual(e, kcrypto.Enctype.RC4)
2870 self.assertEqual(e, expect_etype_info2[0])
2871 salt = self.getElementValue(etype_info[0], 'salt')
2872 if self.strict_checking:
2873 self.assertIsNotNone(salt)
2874 self.assertEqual(len(salt), 0)
2876 return etype_info2
2878 def generate_simple_fast(self,
2879 kdc_exchange_dict,
2880 _callback_dict,
2881 req_body,
2882 fast_padata,
2883 fast_armor,
2884 checksum,
2885 fast_options=''):
2886 armor_key = kdc_exchange_dict['armor_key']
2888 fast_req = self.KRB_FAST_REQ_create(fast_options,
2889 fast_padata,
2890 req_body)
2891 fast_req = self.der_encode(fast_req,
2892 asn1Spec=krb5_asn1.KrbFastReq())
2893 fast_req = self.EncryptedData_create(armor_key,
2894 KU_FAST_ENC,
2895 fast_req)
2897 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
2898 checksum,
2899 fast_req)
2901 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
2902 fx_fast_request = self.der_encode(
2903 fx_fast_request,
2904 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
2906 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
2907 fx_fast_request)
2909 return fast_padata
2911 def generate_ap_req(self,
2912 kdc_exchange_dict,
2913 _callback_dict,
2914 req_body,
2915 armor):
2916 if armor:
2917 tgt = kdc_exchange_dict['armor_tgt']
2918 authenticator_subkey = kdc_exchange_dict['armor_subkey']
2920 req_body_checksum = None
2921 else:
2922 tgt = kdc_exchange_dict['tgt']
2923 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2924 body_checksum_type = kdc_exchange_dict['body_checksum_type']
2926 req_body_blob = self.der_encode(req_body,
2927 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2929 req_body_checksum = self.Checksum_create(tgt.session_key,
2930 KU_TGS_REQ_AUTH_CKSUM,
2931 req_body_blob,
2932 ctype=body_checksum_type)
2934 auth_data = kdc_exchange_dict['auth_data']
2936 subkey_obj = None
2937 if authenticator_subkey is not None:
2938 subkey_obj = authenticator_subkey.export_obj()
2939 seq_number = random.randint(0, 0xfffffffe)
2940 (ctime, cusec) = self.get_KerberosTimeWithUsec()
2941 authenticator_obj = self.Authenticator_create(
2942 crealm=tgt.crealm,
2943 cname=tgt.cname,
2944 cksum=req_body_checksum,
2945 cusec=cusec,
2946 ctime=ctime,
2947 subkey=subkey_obj,
2948 seq_number=seq_number,
2949 authorization_data=auth_data)
2950 authenticator_blob = self.der_encode(
2951 authenticator_obj,
2952 asn1Spec=krb5_asn1.Authenticator())
2954 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
2955 authenticator = self.EncryptedData_create(tgt.session_key,
2956 usage,
2957 authenticator_blob)
2959 ap_options = krb5_asn1.APOptions('0')
2960 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2961 ticket=tgt.ticket,
2962 authenticator=authenticator)
2963 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2965 return ap_req
2967 def generate_simple_tgs_padata(self,
2968 kdc_exchange_dict,
2969 callback_dict,
2970 req_body):
2971 ap_req = self.generate_ap_req(kdc_exchange_dict,
2972 callback_dict,
2973 req_body,
2974 armor=False)
2975 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2976 padata = [pa_tgs_req]
2978 return padata, req_body
2980 def get_preauth_key(self, kdc_exchange_dict):
2981 msg_type = kdc_exchange_dict['rep_msg_type']
2983 if msg_type == KRB_AS_REP:
2984 key = kdc_exchange_dict['preauth_key']
2985 usage = KU_AS_REP_ENC_PART
2986 else: # KRB_TGS_REP
2987 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2988 if authenticator_subkey is not None:
2989 key = authenticator_subkey
2990 usage = KU_TGS_REP_ENC_PART_SUB_KEY
2991 else:
2992 tgt = kdc_exchange_dict['tgt']
2993 key = tgt.session_key
2994 usage = KU_TGS_REP_ENC_PART_SESSION
2996 self.assertIsNotNone(key)
2998 return key, usage
3000 def generate_armor_key(self, subkey, session_key):
3001 armor_key = kcrypto.cf2(subkey.key,
3002 session_key.key,
3003 b'subkeyarmor',
3004 b'ticketarmor')
3005 armor_key = Krb5EncryptionKey(armor_key, None)
3007 return armor_key
3009 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
3010 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
3011 reply_key.key,
3012 b'strengthenkey',
3013 b'replykey')
3014 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
3015 reply_key.kvno)
3017 return strengthen_reply_key
3019 def generate_client_challenge_key(self, armor_key, longterm_key):
3020 client_challenge_key = kcrypto.cf2(armor_key.key,
3021 longterm_key.key,
3022 b'clientchallengearmor',
3023 b'challengelongterm')
3024 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
3026 return client_challenge_key
3028 def generate_kdc_challenge_key(self, armor_key, longterm_key):
3029 kdc_challenge_key = kcrypto.cf2(armor_key.key,
3030 longterm_key.key,
3031 b'kdcchallengearmor',
3032 b'challengelongterm')
3033 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
3035 return kdc_challenge_key
3037 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
3038 expected_type = expected_checksum['cksumtype']
3039 self.assertEqual(armor_key.ctype, expected_type)
3041 ticket_blob = self.der_encode(ticket,
3042 asn1Spec=krb5_asn1.Ticket())
3043 checksum = self.Checksum_create(armor_key,
3044 KU_FAST_FINISHED,
3045 ticket_blob)
3046 self.assertEqual(expected_checksum, checksum)
3048 def verify_ticket(self, ticket, krbtgt_key, expect_pac=True,
3049 expect_ticket_checksum=True):
3050 # Check if the ticket is a TGT.
3051 sname = ticket.ticket['sname']
3052 is_tgt = self.is_tgs(sname)
3054 # Decrypt the ticket.
3056 key = ticket.decryption_key
3057 enc_part = ticket.ticket['enc-part']
3059 self.assertElementEqual(enc_part, 'etype', key.etype)
3060 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3062 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3063 enc_part = self.der_decode(
3064 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3066 # Fetch the authorization data from the ticket.
3067 auth_data = enc_part.get('authorization-data')
3068 if expect_pac:
3069 self.assertIsNotNone(auth_data)
3070 elif auth_data is None:
3071 return
3073 # Get a copy of the authdata with an empty PAC, and the existing PAC
3074 # (if present).
3075 empty_pac = self.get_empty_pac()
3076 auth_data, pac_data = self.replace_pac(auth_data,
3077 empty_pac,
3078 expect_pac=expect_pac)
3079 if not expect_pac:
3080 return
3082 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3083 # raw type to create a new PAC with zeroed signatures for
3084 # verification. This is because on Windows, the resource_groups field
3085 # is added to PAC_LOGON_INFO after the info3 field has been created,
3086 # which results in a different ordering of pointer values than Samba
3087 # (see commit 0e201ecdc53). Using the raw type avoids changing
3088 # PAC_LOGON_INFO, so verification against Windows can work. We still
3089 # need the PAC_DATA type to retrieve the actual checksums, because the
3090 # signatures in the raw type may contain padding bytes.
3091 pac = ndr_unpack(krb5pac.PAC_DATA,
3092 pac_data)
3093 raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
3094 pac_data)
3096 checksums = {}
3098 for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
3099 buffer_type = pac_buffer.type
3100 if buffer_type in self.pac_checksum_types:
3101 self.assertNotIn(buffer_type, checksums,
3102 f'Duplicate checksum type {buffer_type}')
3104 # Fetch the checksum and the checksum type from the PAC buffer.
3105 checksum = pac_buffer.info.signature
3106 ctype = pac_buffer.info.type
3107 if ctype & 1 << 31:
3108 ctype |= -1 << 31
3110 checksums[buffer_type] = checksum, ctype
3112 if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3113 # Zero the checksum field so that we can later verify the
3114 # checksums. The ticket checksum field is not zeroed.
3116 signature = ndr_unpack(
3117 krb5pac.PAC_SIGNATURE_DATA,
3118 raw_pac_buffer.info.remaining)
3119 signature.signature = bytes(len(checksum))
3120 raw_pac_buffer.info.remaining = ndr_pack(
3121 signature)
3123 # Re-encode the PAC.
3124 pac_data = ndr_pack(raw_pac)
3126 # Verify the signatures.
3128 server_checksum, server_ctype = checksums[
3129 krb5pac.PAC_TYPE_SRV_CHECKSUM]
3130 key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3131 pac_data,
3132 server_ctype,
3133 server_checksum)
3135 kdc_checksum, kdc_ctype = checksums[
3136 krb5pac.PAC_TYPE_KDC_CHECKSUM]
3137 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3138 server_checksum,
3139 kdc_ctype,
3140 kdc_checksum)
3142 if is_tgt:
3143 self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
3144 else:
3145 ticket_checksum, ticket_ctype = checksums.get(
3146 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
3147 (None, None))
3148 if expect_ticket_checksum:
3149 self.assertIsNotNone(ticket_checksum)
3150 elif expect_ticket_checksum is False:
3151 self.assertIsNone(ticket_checksum)
3152 if ticket_checksum is not None:
3153 enc_part['authorization-data'] = auth_data
3154 enc_part = self.der_encode(enc_part,
3155 asn1Spec=krb5_asn1.EncTicketPart())
3157 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3158 enc_part,
3159 ticket_ctype,
3160 ticket_checksum)
3162 def modified_ticket(self,
3163 ticket, *,
3164 new_ticket_key=None,
3165 modify_fn=None,
3166 modify_pac_fn=None,
3167 exclude_pac=False,
3168 update_pac_checksums=True,
3169 checksum_keys=None,
3170 include_checksums=None):
3171 if checksum_keys is None:
3172 # A dict containing a key for each checksum type to be created in
3173 # the PAC.
3174 checksum_keys = {}
3176 if include_checksums is None:
3177 # A dict containing a value for each checksum type; True if the
3178 # checksum type is to be included in the PAC, False if it is to be
3179 # excluded, or None/not present if the checksum is to be included
3180 # based on its presence in the original PAC.
3181 include_checksums = {}
3183 # Check that the values passed in by the caller make sense.
3185 self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
3186 self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
3188 if exclude_pac:
3189 self.assertIsNone(modify_pac_fn)
3191 update_pac_checksums = False
3193 if not update_pac_checksums:
3194 self.assertFalse(checksum_keys)
3195 self.assertFalse(include_checksums)
3197 expect_pac = update_pac_checksums or modify_pac_fn is not None
3199 key = ticket.decryption_key
3201 if new_ticket_key is None:
3202 # Use the same key to re-encrypt the ticket.
3203 new_ticket_key = key
3205 if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
3206 # If the server signature key is not present, fall back to the key
3207 # used to encrypt the ticket.
3208 checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
3210 if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
3211 # If the ticket signature key is not present, fall back to the key
3212 # used for the KDC signature.
3213 kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
3214 if kdc_checksum_key is not None:
3215 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
3216 kdc_checksum_key)
3218 # Decrypt the ticket.
3220 enc_part = ticket.ticket['enc-part']
3222 self.assertElementEqual(enc_part, 'etype', key.etype)
3223 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3225 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3226 enc_part = self.der_decode(
3227 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3229 # Modify the ticket here.
3230 if modify_fn is not None:
3231 enc_part = modify_fn(enc_part)
3233 auth_data = enc_part.get('authorization-data')
3234 if expect_pac:
3235 self.assertIsNotNone(auth_data)
3236 if auth_data is not None:
3237 new_pac = None
3238 if not exclude_pac:
3239 # Get a copy of the authdata with an empty PAC, and the
3240 # existing PAC (if present).
3241 empty_pac = self.get_empty_pac()
3242 empty_pac_auth_data, pac_data = self.replace_pac(
3243 auth_data,
3244 empty_pac,
3245 expect_pac=expect_pac)
3247 if pac_data is not None:
3248 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3250 # Modify the PAC here.
3251 if modify_pac_fn is not None:
3252 pac = modify_pac_fn(pac)
3254 if update_pac_checksums:
3255 # Get the enc-part with an empty PAC, which is needed
3256 # to create a ticket signature.
3257 enc_part_to_sign = enc_part.copy()
3258 enc_part_to_sign['authorization-data'] = (
3259 empty_pac_auth_data)
3260 enc_part_to_sign = self.der_encode(
3261 enc_part_to_sign,
3262 asn1Spec=krb5_asn1.EncTicketPart())
3264 self.update_pac_checksums(pac,
3265 checksum_keys,
3266 include_checksums,
3267 enc_part_to_sign)
3269 # Re-encode the PAC.
3270 pac_data = ndr_pack(pac)
3271 new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
3272 pac_data)
3274 # Replace the PAC in the authorization data and re-add it to the
3275 # ticket enc-part.
3276 auth_data, _ = self.replace_pac(auth_data, new_pac,
3277 expect_pac=expect_pac)
3278 enc_part['authorization-data'] = auth_data
3280 # Re-encrypt the ticket enc-part with the new key.
3281 enc_part_new = self.der_encode(enc_part,
3282 asn1Spec=krb5_asn1.EncTicketPart())
3283 enc_part_new = self.EncryptedData_create(new_ticket_key,
3284 KU_TICKET,
3285 enc_part_new)
3287 # Create a copy of the ticket with the new enc-part.
3288 new_ticket = ticket.ticket.copy()
3289 new_ticket['enc-part'] = enc_part_new
3291 new_ticket_creds = KerberosTicketCreds(
3292 new_ticket,
3293 session_key=ticket.session_key,
3294 crealm=ticket.crealm,
3295 cname=ticket.cname,
3296 srealm=ticket.srealm,
3297 sname=ticket.sname,
3298 decryption_key=new_ticket_key,
3299 ticket_private=enc_part,
3300 encpart_private=ticket.encpart_private)
3302 return new_ticket_creds
3304 def update_pac_checksums(self,
3305 pac,
3306 checksum_keys,
3307 include_checksums,
3308 enc_part=None):
3309 pac_buffers = pac.buffers
3310 checksum_buffers = {}
3312 # Find the relevant PAC checksum buffers.
3313 for pac_buffer in pac_buffers:
3314 buffer_type = pac_buffer.type
3315 if buffer_type in self.pac_checksum_types:
3316 self.assertNotIn(buffer_type, checksum_buffers,
3317 f'Duplicate checksum type {buffer_type}')
3319 checksum_buffers[buffer_type] = pac_buffer
3321 # Create any additional buffers that were requested but not
3322 # present. Conversely, remove any buffers that were requested to be
3323 # removed.
3324 for buffer_type in self.pac_checksum_types:
3325 if buffer_type in checksum_buffers:
3326 if include_checksums.get(buffer_type) is False:
3327 checksum_buffer = checksum_buffers.pop(buffer_type)
3329 pac.num_buffers -= 1
3330 pac_buffers.remove(checksum_buffer)
3332 elif include_checksums.get(buffer_type) is True:
3333 info = krb5pac.PAC_SIGNATURE_DATA()
3335 checksum_buffer = krb5pac.PAC_BUFFER()
3336 checksum_buffer.type = buffer_type
3337 checksum_buffer.info = info
3339 pac_buffers.append(checksum_buffer)
3340 pac.num_buffers += 1
3342 checksum_buffers[buffer_type] = checksum_buffer
3344 # Fill the relevant checksum buffers.
3345 for buffer_type, checksum_buffer in checksum_buffers.items():
3346 checksum_key = checksum_keys[buffer_type]
3347 ctype = checksum_key.ctype & ((1 << 32) - 1)
3349 if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3350 self.assertIsNotNone(enc_part)
3352 signature = checksum_key.make_rodc_checksum(
3353 KU_NON_KERB_CKSUM_SALT,
3354 enc_part)
3356 elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
3357 signature = checksum_key.make_zeroed_checksum()
3359 else:
3360 signature = checksum_key.make_rodc_zeroed_checksum()
3362 checksum_buffer.info.signature = signature
3363 checksum_buffer.info.type = ctype
3365 # Add the new checksum buffers to the PAC.
3366 pac.buffers = pac_buffers
3368 # Calculate the server and KDC checksums and insert them into the PAC.
3370 server_checksum_buffer = checksum_buffers.get(
3371 krb5pac.PAC_TYPE_SRV_CHECKSUM)
3372 if server_checksum_buffer is not None:
3373 server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
3375 pac_data = ndr_pack(pac)
3376 server_checksum = server_checksum_key.make_checksum(
3377 KU_NON_KERB_CKSUM_SALT,
3378 pac_data)
3380 server_checksum_buffer.info.signature = server_checksum
3382 kdc_checksum_buffer = checksum_buffers.get(
3383 krb5pac.PAC_TYPE_KDC_CHECKSUM)
3384 if kdc_checksum_buffer is not None:
3385 if server_checksum_buffer is None:
3386 # There's no server signature to make the checksum over, so
3387 # just make the checksum over an empty bytes object.
3388 server_checksum = bytes()
3390 kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
3392 kdc_checksum = kdc_checksum_key.make_rodc_checksum(
3393 KU_NON_KERB_CKSUM_SALT,
3394 server_checksum)
3396 kdc_checksum_buffer.info.signature = kdc_checksum
3398 def replace_pac(self, auth_data, new_pac, expect_pac=True):
3399 if new_pac is not None:
3400 self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
3401 self.assertElementPresent(new_pac, 'ad-data')
3403 new_auth_data = []
3405 ad_relevant = None
3406 old_pac = None
3408 for authdata_elem in auth_data:
3409 if authdata_elem['ad-type'] == AD_IF_RELEVANT:
3410 ad_relevant = self.der_decode(
3411 authdata_elem['ad-data'],
3412 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3414 relevant_elems = []
3415 for relevant_elem in ad_relevant:
3416 if relevant_elem['ad-type'] == AD_WIN2K_PAC:
3417 self.assertIsNone(old_pac, 'Multiple PACs detected')
3418 old_pac = relevant_elem['ad-data']
3420 if new_pac is not None:
3421 relevant_elems.append(new_pac)
3422 else:
3423 relevant_elems.append(relevant_elem)
3424 if expect_pac:
3425 self.assertIsNotNone(old_pac, 'Expected PAC')
3427 if relevant_elems:
3428 ad_relevant = self.der_encode(
3429 relevant_elems,
3430 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3432 authdata_elem = self.AuthorizationData_create(
3433 AD_IF_RELEVANT,
3434 ad_relevant)
3435 else:
3436 authdata_elem = None
3438 if authdata_elem is not None:
3439 new_auth_data.append(authdata_elem)
3441 if expect_pac:
3442 self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
3444 return new_auth_data, old_pac
3446 def get_pac(self, auth_data, expect_pac=True):
3447 _, pac = self.replace_pac(auth_data, None, expect_pac)
3448 return pac
3450 def get_krbtgt_checksum_key(self):
3451 krbtgt_creds = self.get_krbtgt_creds()
3452 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
3454 return {
3455 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
3458 def is_tgs(self, principal):
3459 name = principal['name-string'][0]
3460 return name in ('krbtgt', b'krbtgt')
3462 def get_empty_pac(self):
3463 return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1))
3465 def get_outer_pa_dict(self, kdc_exchange_dict):
3466 return self.get_pa_dict(kdc_exchange_dict['req_padata'])
3468 def get_fast_pa_dict(self, kdc_exchange_dict):
3469 req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
3471 if req_pa_dict:
3472 return req_pa_dict
3474 return self.get_outer_pa_dict(kdc_exchange_dict)
3476 def sent_fast(self, kdc_exchange_dict):
3477 outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
3479 return PADATA_FX_FAST in outer_pa_dict
3481 def sent_enc_challenge(self, kdc_exchange_dict):
3482 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3484 return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
3486 def get_sent_pac_options(self, kdc_exchange_dict):
3487 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3489 if PADATA_PAC_OPTIONS not in fast_pa_dict:
3490 return ''
3492 pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
3493 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3494 pac_options = pac_options['options']
3496 # Mask out unsupported bits.
3497 pac_options, remaining = pac_options[:4], pac_options[4:]
3498 pac_options += '0' * len(remaining)
3500 return pac_options
3502 def get_krbtgt_sname(self):
3503 krbtgt_creds = self.get_krbtgt_creds()
3504 krbtgt_username = krbtgt_creds.get_username()
3505 krbtgt_realm = krbtgt_creds.get_realm()
3506 krbtgt_sname = self.PrincipalName_create(
3507 name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
3509 return krbtgt_sname
3511 def _test_as_exchange(self,
3512 cname,
3513 realm,
3514 sname,
3515 till,
3516 client_as_etypes,
3517 expected_error_mode,
3518 expected_crealm,
3519 expected_cname,
3520 expected_srealm,
3521 expected_sname,
3522 expected_salt,
3523 etypes,
3524 padata,
3525 kdc_options,
3526 expected_flags=None,
3527 unexpected_flags=None,
3528 expected_supported_etypes=None,
3529 preauth_key=None,
3530 ticket_decryption_key=None,
3531 pac_request=None,
3532 pac_options=None,
3533 expect_pac=True,
3534 to_rodc=False):
3536 def _generate_padata_copy(_kdc_exchange_dict,
3537 _callback_dict,
3538 req_body):
3539 return padata, req_body
3541 if not expected_error_mode:
3542 check_error_fn = None
3543 check_rep_fn = self.generic_check_kdc_rep
3544 else:
3545 check_error_fn = self.generic_check_kdc_error
3546 check_rep_fn = None
3548 if padata is not None:
3549 generate_padata_fn = _generate_padata_copy
3550 else:
3551 generate_padata_fn = None
3553 kdc_exchange_dict = self.as_exchange_dict(
3554 expected_crealm=expected_crealm,
3555 expected_cname=expected_cname,
3556 expected_srealm=expected_srealm,
3557 expected_sname=expected_sname,
3558 expected_supported_etypes=expected_supported_etypes,
3559 ticket_decryption_key=ticket_decryption_key,
3560 generate_padata_fn=generate_padata_fn,
3561 check_error_fn=check_error_fn,
3562 check_rep_fn=check_rep_fn,
3563 check_kdc_private_fn=self.generic_check_kdc_private,
3564 expected_error_mode=expected_error_mode,
3565 client_as_etypes=client_as_etypes,
3566 expected_salt=expected_salt,
3567 expected_flags=expected_flags,
3568 unexpected_flags=unexpected_flags,
3569 preauth_key=preauth_key,
3570 kdc_options=str(kdc_options),
3571 pac_request=pac_request,
3572 pac_options=pac_options,
3573 expect_pac=expect_pac,
3574 to_rodc=to_rodc)
3576 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3577 cname=cname,
3578 realm=realm,
3579 sname=sname,
3580 till_time=till,
3581 etypes=etypes)
3583 return rep, kdc_exchange_dict