tests/krb5: Fix account salt calculation to match Windows
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blobf352615db1fdffd08640d6a789933706f5703a81
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import krb5pac, security
38 from samba.gensec import FEATURE_SEAL
39 from samba.ndr import ndr_pack, ndr_unpack
41 import samba.tests
42 from samba.tests import TestCaseInTempDir
44 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
45 from samba.tests.krb5.rfc4120_constants import (
46 AD_IF_RELEVANT,
47 AD_WIN2K_PAC,
48 FX_FAST_ARMOR_AP_REQUEST,
49 KDC_ERR_GENERIC,
50 KDC_ERR_PREAUTH_FAILED,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
52 KERB_ERR_TYPE_EXTENDED,
53 KRB_AP_REQ,
54 KRB_AS_REP,
55 KRB_AS_REQ,
56 KRB_ERROR,
57 KRB_TGS_REP,
58 KRB_TGS_REQ,
59 KU_AP_REQ_AUTH,
60 KU_AS_REP_ENC_PART,
61 KU_ENC_CHALLENGE_KDC,
62 KU_FAST_ENC,
63 KU_FAST_FINISHED,
64 KU_FAST_REP,
65 KU_FAST_REQ_CHKSUM,
66 KU_NON_KERB_CKSUM_SALT,
67 KU_TGS_REP_ENC_PART_SESSION,
68 KU_TGS_REP_ENC_PART_SUB_KEY,
69 KU_TGS_REQ_AUTH,
70 KU_TGS_REQ_AUTH_CKSUM,
71 KU_TGS_REQ_AUTH_DAT_SESSION,
72 KU_TGS_REQ_AUTH_DAT_SUBKEY,
73 KU_TICKET,
74 NT_SRV_INST,
75 NT_WELLKNOWN,
76 PADATA_ENCRYPTED_CHALLENGE,
77 PADATA_ENC_TIMESTAMP,
78 PADATA_ETYPE_INFO,
79 PADATA_ETYPE_INFO2,
80 PADATA_FOR_USER,
81 PADATA_FX_COOKIE,
82 PADATA_FX_ERROR,
83 PADATA_FX_FAST,
84 PADATA_KDC_REQ,
85 PADATA_PAC_OPTIONS,
86 PADATA_PAC_REQUEST,
87 PADATA_PK_AS_REQ,
88 PADATA_PK_AS_REP_19,
89 PADATA_SUPPORTED_ETYPES
91 import samba.tests.krb5.kcrypto as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self, value, asn1Spec, encodeFun, **options):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value = asn1Spec.clone(value)
104 valueLength = len(value)
105 if valueLength % 8:
106 alignedValue = value << (8 - valueLength % 8)
107 else:
108 alignedValue = value
110 substrate = alignedValue.asOctets()
111 length = len(substrate)
112 # We need at least 32-Bit / 4-Bytes
113 if length < 4:
114 padding = 4 - length
115 else:
116 padding = 0
117 ret = b'\x00' + substrate + (b'\x00' * padding)
118 return ret, False, True
121 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self, scope=0):
125 ret = "%s" % self.asBinary()
126 bits = []
127 highest_bit = 32
128 for byte in self.asNumbers():
129 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
130 mask = 1 << bit
131 if byte & mask:
132 val = 1
133 else:
134 val = 0
135 bits.append(val)
136 if len(bits) < highest_bit:
137 for bitPosition in range(len(bits), highest_bit):
138 bits.append(0)
139 indent = " " * scope
140 delim = ": (\n%s " % indent
141 for bitPosition in range(highest_bit):
142 if bitPosition in self.prettyPrintNamedValues:
143 name = self.prettyPrintNamedValues[bitPosition]
144 elif bits[bitPosition] != 0:
145 name = "unknown-bit-%u" % bitPosition
146 else:
147 continue
148 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
149 delim = ",\n%s " % indent
150 ret += "\n%s)" % indent
151 return ret
154 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
155 krb5_asn1.TicketFlagsValues.namedValues
156 krb5_asn1.TicketFlags.namedValues =\
157 krb5_asn1.TicketFlagsValues.namedValues
158 krb5_asn1.TicketFlags.prettyPrint =\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
161 krb5_asn1.KDCOptionsValues.namedValues
162 krb5_asn1.KDCOptions.namedValues =\
163 krb5_asn1.KDCOptionsValues.namedValues
164 krb5_asn1.KDCOptions.prettyPrint =\
165 BitString_NamedValues_prettyPrint
166 krb5_asn1.APOptions.prettyPrintNamedValues =\
167 krb5_asn1.APOptionsValues.namedValues
168 krb5_asn1.APOptions.namedValues =\
169 krb5_asn1.APOptionsValues.namedValues
170 krb5_asn1.APOptions.prettyPrint =\
171 BitString_NamedValues_prettyPrint
172 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
173 krb5_asn1.PACOptionFlagsValues.namedValues
174 krb5_asn1.PACOptionFlags.namedValues =\
175 krb5_asn1.PACOptionFlagsValues.namedValues
176 krb5_asn1.PACOptionFlags.prettyPrint =\
177 BitString_NamedValues_prettyPrint
180 def Integer_NamedValues_prettyPrint(self, scope=0):
181 intval = int(self)
182 if intval in self.prettyPrintNamedValues:
183 name = self.prettyPrintNamedValues[intval]
184 else:
185 name = "<__unknown__>"
186 ret = "%d (0x%x) %s" % (intval, intval, name)
187 return ret
190 krb5_asn1.NameType.prettyPrintNamedValues =\
191 krb5_asn1.NameTypeValues.namedValues
192 krb5_asn1.NameType.prettyPrint =\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
195 krb5_asn1.AuthDataTypeValues.namedValues
196 krb5_asn1.AuthDataType.prettyPrint =\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1.PADataType.prettyPrintNamedValues =\
199 krb5_asn1.PADataTypeValues.namedValues
200 krb5_asn1.PADataType.prettyPrint =\
201 Integer_NamedValues_prettyPrint
202 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
203 krb5_asn1.EncryptionTypeValues.namedValues
204 krb5_asn1.EncryptionType.prettyPrint =\
205 Integer_NamedValues_prettyPrint
206 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
207 krb5_asn1.ChecksumTypeValues.namedValues
208 krb5_asn1.ChecksumType.prettyPrint =\
209 Integer_NamedValues_prettyPrint
210 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
211 krb5_asn1.KerbErrorDataTypeValues.namedValues
212 krb5_asn1.KerbErrorDataType.prettyPrint =\
213 Integer_NamedValues_prettyPrint
216 class Krb5EncryptionKey:
217 def __init__(self, key, kvno):
218 EncTypeChecksum = {
219 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
220 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
221 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
223 self.key = key
224 self.etype = key.enctype
225 self.ctype = EncTypeChecksum[self.etype]
226 self.kvno = kvno
228 def encrypt(self, usage, plaintext):
229 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
230 return ciphertext
232 def decrypt(self, usage, ciphertext):
233 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
234 return plaintext
236 def make_zeroed_checksum(self, ctype=None):
237 if ctype is None:
238 ctype = self.ctype
240 checksum_len = kcrypto.checksum_len(ctype)
241 return bytes(checksum_len)
243 def make_checksum(self, usage, plaintext, ctype=None):
244 if ctype is None:
245 ctype = self.ctype
246 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
247 return cksum
249 def verify_checksum(self, usage, plaintext, ctype, cksum):
250 if self.ctype != ctype:
251 raise AssertionError(f'key checksum type ({self.ctype}) != '
252 f'checksum type ({ctype})')
254 kcrypto.verify_checksum(ctype,
255 self.key,
256 usage,
257 plaintext,
258 cksum)
260 def export_obj(self):
261 EncryptionKey_obj = {
262 'keytype': self.etype,
263 'keyvalue': self.key.contents,
265 return EncryptionKey_obj
268 class RodcPacEncryptionKey(Krb5EncryptionKey):
269 def __init__(self, key, kvno, rodc_id=None):
270 super().__init__(key, kvno)
272 if rodc_id is None:
273 kvno = self.kvno
274 if kvno is not None:
275 kvno >>= 16
276 kvno &= (1 << 16) - 1
278 rodc_id = kvno or None
280 if rodc_id is not None:
281 self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
282 else:
283 self.rodc_id = b''
285 def make_rodc_zeroed_checksum(self, ctype=None):
286 checksum = super().make_zeroed_checksum(ctype)
287 return checksum + bytes(len(self.rodc_id))
289 def make_rodc_checksum(self, usage, plaintext, ctype=None):
290 checksum = super().make_checksum(usage, plaintext, ctype)
291 return checksum + self.rodc_id
293 def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
294 if self.rodc_id:
295 cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
297 if self.rodc_id != cksum_rodc_id:
298 raise AssertionError(f'{self.rodc_id.hex()} != '
299 f'{cksum_rodc_id.hex()}')
301 super().verify_checksum(usage,
302 plaintext,
303 ctype,
304 cksum)
307 class ZeroedChecksumKey(RodcPacEncryptionKey):
308 def make_checksum(self, usage, plaintext, ctype=None):
309 return self.make_zeroed_checksum(ctype)
311 def make_rodc_checksum(self, usage, plaintext, ctype=None):
312 return self.make_rodc_zeroed_checksum(ctype)
315 class WrongLengthChecksumKey(RodcPacEncryptionKey):
316 def __init__(self, key, kvno, length):
317 super().__init__(key, kvno)
319 self._length = length
321 @classmethod
322 def _adjust_to_length(cls, checksum, length):
323 diff = length - len(checksum)
324 if diff > 0:
325 checksum += bytes(diff)
326 elif diff < 0:
327 checksum = checksum[:length]
329 return checksum
331 def make_zeroed_checksum(self, ctype=None):
332 return bytes(self._length)
334 def make_checksum(self, usage, plaintext, ctype=None):
335 checksum = super().make_checksum(usage, plaintext, ctype)
336 return self._adjust_to_length(checksum, self._length)
338 def make_rodc_zeroed_checksum(self, ctype=None):
339 return bytes(self._length)
341 def make_rodc_checksum(self, usage, plaintext, ctype=None):
342 checksum = super().make_rodc_checksum(usage, plaintext, ctype)
343 return self._adjust_to_length(checksum, self._length)
346 class KerberosCredentials(Credentials):
348 fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
349 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
350 security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
352 def __init__(self):
353 super(KerberosCredentials, self).__init__()
354 all_enc_types = 0
355 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
356 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
357 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
359 self.as_supported_enctypes = all_enc_types
360 self.tgs_supported_enctypes = all_enc_types
361 self.ap_supported_enctypes = all_enc_types
363 self.kvno = None
364 self.forced_keys = {}
366 self.forced_salt = None
368 self.dn = None
369 self.upn = None
370 self.spn = None
372 def set_as_supported_enctypes(self, value):
373 self.as_supported_enctypes = int(value)
375 def set_tgs_supported_enctypes(self, value):
376 self.tgs_supported_enctypes = int(value)
378 def set_ap_supported_enctypes(self, value):
379 self.ap_supported_enctypes = int(value)
381 etype_map = collections.OrderedDict([
382 (kcrypto.Enctype.AES256,
383 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
384 (kcrypto.Enctype.AES128,
385 security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
386 (kcrypto.Enctype.RC4,
387 security.KERB_ENCTYPE_RC4_HMAC_MD5),
388 (kcrypto.Enctype.DES_MD5,
389 security.KERB_ENCTYPE_DES_CBC_MD5),
390 (kcrypto.Enctype.DES_CRC,
391 security.KERB_ENCTYPE_DES_CBC_CRC)
394 @classmethod
395 def etypes_to_bits(cls, etypes):
396 bits = 0
397 for etype in etypes:
398 bit = cls.etype_map[etype]
399 if bits & bit:
400 raise ValueError(f'Got duplicate etype: {etype}')
401 bits |= bit
403 return bits
405 @classmethod
406 def bits_to_etypes(cls, bits):
407 etypes = ()
408 for etype, bit in cls.etype_map.items():
409 if bit & bits:
410 bits &= ~bit
411 etypes += (etype,)
413 bits &= ~cls.fast_supported_bits
414 if bits != 0:
415 raise ValueError(f'Unsupported etype bits: {bits}')
417 return etypes
419 def get_as_krb5_etypes(self):
420 return self.bits_to_etypes(self.as_supported_enctypes)
422 def get_tgs_krb5_etypes(self):
423 return self.bits_to_etypes(self.tgs_supported_enctypes)
425 def get_ap_krb5_etypes(self):
426 return self.bits_to_etypes(self.ap_supported_enctypes)
428 def set_kvno(self, kvno):
429 # Sign-extend from 32 bits.
430 if kvno & 1 << 31:
431 kvno |= -1 << 31
432 self.kvno = kvno
434 def get_kvno(self):
435 return self.kvno
437 def set_forced_key(self, etype, hexkey):
438 etype = int(etype)
439 contents = binascii.a2b_hex(hexkey)
440 key = kcrypto.Key(etype, contents)
441 self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
443 def get_forced_key(self, etype):
444 etype = int(etype)
445 return self.forced_keys.get(etype)
447 def set_forced_salt(self, salt):
448 self.forced_salt = bytes(salt)
450 def get_forced_salt(self):
451 return self.forced_salt
453 def get_salt(self):
454 if self.forced_salt is not None:
455 return self.forced_salt
457 upn = self.get_upn()
458 if upn is not None:
459 salt_name = upn.rsplit('@', 1)[0].replace('/', '')
460 else:
461 salt_name = self.get_username()
463 if self.get_workstation():
464 salt_name = self.get_username().lower()
465 if salt_name[-1] == '$':
466 salt_name = salt_name[:-1]
467 salt_string = '%shost%s.%s' % (
468 self.get_realm().upper(),
469 salt_name,
470 self.get_realm().lower())
471 else:
472 salt_string = self.get_realm().upper() + salt_name
474 return salt_string.encode('utf-8')
476 def set_dn(self, dn):
477 self.dn = dn
479 def get_dn(self):
480 return self.dn
482 def set_spn(self, spn):
483 self.spn = spn
485 def get_spn(self):
486 return self.spn
488 def set_upn(self, upn):
489 self.upn = upn
491 def get_upn(self):
492 return self.upn
495 class KerberosTicketCreds:
496 def __init__(self, ticket, session_key,
497 crealm=None, cname=None,
498 srealm=None, sname=None,
499 decryption_key=None,
500 ticket_private=None,
501 encpart_private=None):
502 self.ticket = ticket
503 self.session_key = session_key
504 self.crealm = crealm
505 self.cname = cname
506 self.srealm = srealm
507 self.sname = sname
508 self.decryption_key = decryption_key
509 self.ticket_private = ticket_private
510 self.encpart_private = encpart_private
513 class RawKerberosTest(TestCaseInTempDir):
514 """A raw Kerberos Test case."""
516 pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
517 krb5pac.PAC_TYPE_KDC_CHECKSUM,
518 krb5pac.PAC_TYPE_TICKET_CHECKSUM}
520 etypes_to_test = (
521 {"value": -1111, "name": "dummy", },
522 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
523 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
524 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
527 setup_etype_test_permutations_done = False
529 @classmethod
530 def setup_etype_test_permutations(cls):
531 if cls.setup_etype_test_permutations_done:
532 return
534 res = []
536 num_idxs = len(cls.etypes_to_test)
537 permutations = []
538 for num in range(1, num_idxs + 1):
539 chunk = list(itertools.permutations(range(num_idxs), num))
540 for e in chunk:
541 el = list(e)
542 permutations.append(el)
544 for p in permutations:
545 name = None
546 etypes = ()
547 for idx in p:
548 n = cls.etypes_to_test[idx]["name"]
549 if name is None:
550 name = n
551 else:
552 name += "_%s" % n
553 etypes += (cls.etypes_to_test[idx]["value"],)
555 r = {"name": name, "etypes": etypes, }
556 res.append(r)
558 cls.etype_test_permutations = res
559 cls.setup_etype_test_permutations_done = True
561 @classmethod
562 def etype_test_permutation_name_idx(cls):
563 cls.setup_etype_test_permutations()
564 res = []
565 idx = 0
566 for e in cls.etype_test_permutations:
567 r = (e['name'], idx)
568 idx += 1
569 res.append(r)
570 return res
572 def etype_test_permutation_by_idx(self, idx):
573 e = self.etype_test_permutations[idx]
574 return (e['name'], e['etypes'])
576 @classmethod
577 def setUpClass(cls):
578 super().setUpClass()
580 cls.host = samba.tests.env_get_var_value('SERVER')
581 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
583 # A dictionary containing credentials that have already been
584 # obtained.
585 cls.creds_dict = {}
587 kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT',
588 allow_missing=True)
589 if kdc_fast_support is None:
590 kdc_fast_support = '0'
591 cls.kdc_fast_support = bool(int(kdc_fast_support))
593 tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT',
594 allow_missing=True)
595 if tkt_sig_support is None:
596 tkt_sig_support = '0'
597 cls.tkt_sig_support = bool(int(tkt_sig_support))
599 def setUp(self):
600 super().setUp()
601 self.do_asn1_print = False
602 self.do_hexdump = False
604 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
605 allow_missing=True)
606 if strict_checking is None:
607 strict_checking = '1'
608 self.strict_checking = bool(int(strict_checking))
610 self.s = None
612 self.unspecified_kvno = object()
614 def tearDown(self):
615 self._disconnect("tearDown")
616 super().tearDown()
618 def _disconnect(self, reason):
619 if self.s is None:
620 return
621 self.s.close()
622 self.s = None
623 if self.do_hexdump:
624 sys.stderr.write("disconnect[%s]\n" % reason)
626 def _connect_tcp(self, host):
627 tcp_port = 88
628 try:
629 self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
630 socket.SOCK_STREAM, socket.SOL_TCP,
632 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
633 self.s.settimeout(10)
634 self.s.connect(self.a[0][4])
635 except socket.error:
636 self.s.close()
637 raise
638 except IOError:
639 self.s.close()
640 raise
642 def connect(self, host):
643 self.assertNotConnected()
644 self._connect_tcp(host)
645 if self.do_hexdump:
646 sys.stderr.write("connected[%s]\n" % host)
648 def env_get_var(self, varname, prefix,
649 fallback_default=True,
650 allow_missing=False):
651 val = None
652 if prefix is not None:
653 allow_missing_prefix = allow_missing or fallback_default
654 val = samba.tests.env_get_var_value(
655 '%s_%s' % (prefix, varname),
656 allow_missing=allow_missing_prefix)
657 else:
658 fallback_default = True
659 if val is None and fallback_default:
660 val = samba.tests.env_get_var_value(varname,
661 allow_missing=allow_missing)
662 return val
664 def _get_krb5_creds_from_env(self, prefix,
665 default_username=None,
666 allow_missing_password=False,
667 allow_missing_keys=True,
668 require_strongest_key=False):
669 c = KerberosCredentials()
670 c.guess()
672 domain = self.env_get_var('DOMAIN', prefix)
673 realm = self.env_get_var('REALM', prefix)
674 allow_missing_username = default_username is not None
675 username = self.env_get_var('USERNAME', prefix,
676 fallback_default=False,
677 allow_missing=allow_missing_username)
678 if username is None:
679 username = default_username
680 password = self.env_get_var('PASSWORD', prefix,
681 fallback_default=False,
682 allow_missing=allow_missing_password)
683 c.set_domain(domain)
684 c.set_realm(realm)
685 c.set_username(username)
686 if password is not None:
687 c.set_password(password)
688 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
689 prefix, allow_missing=True)
690 if as_supported_enctypes is not None:
691 c.set_as_supported_enctypes(as_supported_enctypes)
692 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
693 prefix, allow_missing=True)
694 if tgs_supported_enctypes is not None:
695 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
696 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
697 prefix, allow_missing=True)
698 if ap_supported_enctypes is not None:
699 c.set_ap_supported_enctypes(ap_supported_enctypes)
701 if require_strongest_key:
702 kvno_allow_missing = False
703 if password is None:
704 aes256_allow_missing = False
705 else:
706 aes256_allow_missing = True
707 else:
708 kvno_allow_missing = allow_missing_keys
709 aes256_allow_missing = allow_missing_keys
710 kvno = self.env_get_var('KVNO', prefix,
711 fallback_default=False,
712 allow_missing=kvno_allow_missing)
713 if kvno is not None:
714 c.set_kvno(kvno)
715 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
716 fallback_default=False,
717 allow_missing=aes256_allow_missing)
718 if aes256_key is not None:
719 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
720 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
721 fallback_default=False,
722 allow_missing=True)
723 if aes128_key is not None:
724 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
725 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
726 fallback_default=False, allow_missing=True)
727 if rc4_key is not None:
728 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
730 if not allow_missing_keys:
731 self.assertTrue(c.forced_keys,
732 'Please supply %s encryption keys '
733 'in environment' % prefix)
735 return c
737 def _get_krb5_creds(self,
738 prefix,
739 default_username=None,
740 allow_missing_password=False,
741 allow_missing_keys=True,
742 require_strongest_key=False,
743 fallback_creds_fn=None):
744 if prefix in self.creds_dict:
745 return self.creds_dict[prefix]
747 # We don't have the credentials already
748 creds = None
749 env_err = None
750 try:
751 # Try to obtain them from the environment
752 creds = self._get_krb5_creds_from_env(
753 prefix,
754 default_username=default_username,
755 allow_missing_password=allow_missing_password,
756 allow_missing_keys=allow_missing_keys,
757 require_strongest_key=require_strongest_key)
758 except Exception as err:
759 # An error occurred, so save it for later
760 env_err = err
761 else:
762 self.assertIsNotNone(creds)
763 # Save the obtained credentials
764 self.creds_dict[prefix] = creds
765 return creds
767 if fallback_creds_fn is not None:
768 try:
769 # Try to use the fallback method
770 creds = fallback_creds_fn()
771 except Exception as err:
772 print("ERROR FROM ENV: %r" % (env_err))
773 print("FALLBACK-FN: %s" % (fallback_creds_fn))
774 print("FALLBACK-ERROR: %r" % (err))
775 else:
776 self.assertIsNotNone(creds)
777 # Save the obtained credentials
778 self.creds_dict[prefix] = creds
779 return creds
781 # Both methods failed, so raise the exception from the
782 # environment method
783 raise env_err
785 def get_user_creds(self,
786 allow_missing_password=False,
787 allow_missing_keys=True):
788 c = self._get_krb5_creds(prefix=None,
789 allow_missing_password=allow_missing_password,
790 allow_missing_keys=allow_missing_keys)
791 return c
793 def get_service_creds(self,
794 allow_missing_password=False,
795 allow_missing_keys=True):
796 c = self._get_krb5_creds(prefix='SERVICE',
797 allow_missing_password=allow_missing_password,
798 allow_missing_keys=allow_missing_keys)
799 return c
801 def get_client_creds(self,
802 allow_missing_password=False,
803 allow_missing_keys=True):
804 c = self._get_krb5_creds(prefix='CLIENT',
805 allow_missing_password=allow_missing_password,
806 allow_missing_keys=allow_missing_keys)
807 return c
809 def get_server_creds(self,
810 allow_missing_password=False,
811 allow_missing_keys=True):
812 c = self._get_krb5_creds(prefix='SERVER',
813 allow_missing_password=allow_missing_password,
814 allow_missing_keys=allow_missing_keys)
815 return c
817 def get_admin_creds(self,
818 allow_missing_password=False,
819 allow_missing_keys=True):
820 c = self._get_krb5_creds(prefix='ADMIN',
821 allow_missing_password=allow_missing_password,
822 allow_missing_keys=allow_missing_keys)
823 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
824 return c
826 def get_rodc_krbtgt_creds(self,
827 require_keys=True,
828 require_strongest_key=False):
829 if require_strongest_key:
830 self.assertTrue(require_keys)
831 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
832 allow_missing_password=True,
833 allow_missing_keys=not require_keys,
834 require_strongest_key=require_strongest_key)
835 return c
837 def get_krbtgt_creds(self,
838 require_keys=True,
839 require_strongest_key=False):
840 if require_strongest_key:
841 self.assertTrue(require_keys)
842 c = self._get_krb5_creds(prefix='KRBTGT',
843 default_username='krbtgt',
844 allow_missing_password=True,
845 allow_missing_keys=not require_keys,
846 require_strongest_key=require_strongest_key)
847 return c
849 def get_anon_creds(self):
850 c = Credentials()
851 c.set_anonymous()
852 return c
854 def asn1_dump(self, name, obj, asn1_print=None):
855 if asn1_print is None:
856 asn1_print = self.do_asn1_print
857 if asn1_print:
858 if name is not None:
859 sys.stderr.write("%s:\n%s" % (name, obj))
860 else:
861 sys.stderr.write("%s" % (obj))
863 def hex_dump(self, name, blob, hexdump=None):
864 if hexdump is None:
865 hexdump = self.do_hexdump
866 if hexdump:
867 sys.stderr.write(
868 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
870 def der_decode(
871 self,
872 blob,
873 asn1Spec=None,
874 native_encode=True,
875 asn1_print=None,
876 hexdump=None):
877 if asn1Spec is not None:
878 class_name = type(asn1Spec).__name__.split(':')[0]
879 else:
880 class_name = "<None-asn1Spec>"
881 self.hex_dump(class_name, blob, hexdump=hexdump)
882 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
883 self.asn1_dump(None, obj, asn1_print=asn1_print)
884 if native_encode:
885 obj = pyasn1_native_encode(obj)
886 return obj
888 def der_encode(
889 self,
890 obj,
891 asn1Spec=None,
892 native_decode=True,
893 asn1_print=None,
894 hexdump=None):
895 if native_decode:
896 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
897 class_name = type(obj).__name__.split(':')[0]
898 if class_name is not None:
899 self.asn1_dump(None, obj, asn1_print=asn1_print)
900 blob = pyasn1_der_encode(obj)
901 if class_name is not None:
902 self.hex_dump(class_name, blob, hexdump=hexdump)
903 return blob
905 def send_pdu(self, req, asn1_print=None, hexdump=None):
906 try:
907 k5_pdu = self.der_encode(
908 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
909 header = struct.pack('>I', len(k5_pdu))
910 req_pdu = header
911 req_pdu += k5_pdu
912 self.hex_dump("send_pdu", header, hexdump=hexdump)
913 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
914 while True:
915 sent = self.s.send(req_pdu, 0)
916 if sent == len(req_pdu):
917 break
918 req_pdu = req_pdu[sent:]
919 except socket.error as e:
920 self._disconnect("send_pdu: %s" % e)
921 raise
922 except IOError as e:
923 self._disconnect("send_pdu: %s" % e)
924 raise
926 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
927 rep_pdu = None
928 try:
929 if timeout is not None:
930 self.s.settimeout(timeout)
931 rep_pdu = self.s.recv(num_recv, 0)
932 self.s.settimeout(10)
933 if len(rep_pdu) == 0:
934 self._disconnect("recv_raw: EOF")
935 return None
936 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
937 except socket.timeout:
938 self.s.settimeout(10)
939 sys.stderr.write("recv_raw: TIMEOUT\n")
940 except socket.error as e:
941 self._disconnect("recv_raw: %s" % e)
942 raise
943 except IOError as e:
944 self._disconnect("recv_raw: %s" % e)
945 raise
946 return rep_pdu
948 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
949 rep_pdu = None
950 rep = None
951 raw_pdu = self.recv_raw(
952 num_recv=4, hexdump=hexdump, timeout=timeout)
953 if raw_pdu is None:
954 return (None, None)
955 header = struct.unpack(">I", raw_pdu[0:4])
956 k5_len = header[0]
957 if k5_len == 0:
958 return (None, "")
959 missing = k5_len
960 rep_pdu = b''
961 while missing > 0:
962 raw_pdu = self.recv_raw(
963 num_recv=missing, hexdump=hexdump, timeout=timeout)
964 self.assertGreaterEqual(len(raw_pdu), 1)
965 rep_pdu += raw_pdu
966 missing = k5_len - len(rep_pdu)
967 k5_raw = self.der_decode(
968 rep_pdu,
969 asn1Spec=None,
970 native_encode=False,
971 asn1_print=False,
972 hexdump=False)
973 pvno = k5_raw['field-0']
974 self.assertEqual(pvno, 5)
975 msg_type = k5_raw['field-1']
976 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
977 if msg_type == KRB_AS_REP:
978 asn1Spec = krb5_asn1.AS_REP()
979 elif msg_type == KRB_TGS_REP:
980 asn1Spec = krb5_asn1.TGS_REP()
981 elif msg_type == KRB_ERROR:
982 asn1Spec = krb5_asn1.KRB_ERROR()
983 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
984 asn1_print=asn1_print, hexdump=False)
985 return (rep, rep_pdu)
987 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
988 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
989 hexdump=hexdump,
990 timeout=timeout)
991 return rep
993 def assertIsConnected(self):
994 self.assertIsNotNone(self.s, msg="Not connected")
996 def assertNotConnected(self):
997 self.assertIsNone(self.s, msg="Is connected")
999 def send_recv_transaction(
1000 self,
1001 req,
1002 asn1_print=None,
1003 hexdump=None,
1004 timeout=None,
1005 to_rodc=False):
1006 host = self.host if to_rodc else self.dc_host
1007 self.connect(host)
1008 try:
1009 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
1010 rep = self.recv_pdu(
1011 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
1012 except Exception:
1013 self._disconnect("transaction failed")
1014 raise
1015 self._disconnect("transaction done")
1016 return rep
1018 def assertNoValue(self, value):
1019 self.assertTrue(value.isNoValue)
1021 def assertHasValue(self, value):
1022 self.assertIsNotNone(value)
1024 def getElementValue(self, obj, elem):
1025 return obj.get(elem)
1027 def assertElementMissing(self, obj, elem):
1028 v = self.getElementValue(obj, elem)
1029 self.assertIsNone(v)
1031 def assertElementPresent(self, obj, elem, expect_empty=False):
1032 v = self.getElementValue(obj, elem)
1033 self.assertIsNotNone(v)
1034 if self.strict_checking:
1035 if isinstance(v, collections.abc.Container):
1036 if expect_empty:
1037 self.assertEqual(0, len(v))
1038 else:
1039 self.assertNotEqual(0, len(v))
1041 def assertElementEqual(self, obj, elem, value):
1042 v = self.getElementValue(obj, elem)
1043 self.assertIsNotNone(v)
1044 self.assertEqual(v, value)
1046 def assertElementEqualUTF8(self, obj, elem, value):
1047 v = self.getElementValue(obj, elem)
1048 self.assertIsNotNone(v)
1049 self.assertEqual(v, bytes(value, 'utf8'))
1051 def assertPrincipalEqual(self, princ1, princ2):
1052 self.assertEqual(princ1['name-type'], princ2['name-type'])
1053 self.assertEqual(
1054 len(princ1['name-string']),
1055 len(princ2['name-string']),
1056 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1057 for idx in range(len(princ1['name-string'])):
1058 self.assertEqual(
1059 princ1['name-string'][idx],
1060 princ2['name-string'][idx],
1061 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1063 def assertElementEqualPrincipal(self, obj, elem, value):
1064 v = self.getElementValue(obj, elem)
1065 self.assertIsNotNone(v)
1066 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
1067 self.assertPrincipalEqual(v, value)
1069 def assertElementKVNO(self, obj, elem, value):
1070 v = self.getElementValue(obj, elem)
1071 if value == "autodetect":
1072 value = v
1073 if value is not None:
1074 self.assertIsNotNone(v)
1075 # The value on the wire should never be 0
1076 self.assertNotEqual(v, 0)
1077 # unspecified_kvno means we don't know the kvno,
1078 # but want to enforce its presence
1079 if value is not self.unspecified_kvno:
1080 value = int(value)
1081 self.assertNotEqual(value, 0)
1082 self.assertEqual(v, value)
1083 else:
1084 self.assertIsNone(v)
1086 def assertElementFlags(self, obj, elem, expected, unexpected):
1087 v = self.getElementValue(obj, elem)
1088 self.assertIsNotNone(v)
1089 if expected is not None:
1090 self.assertIsInstance(expected, krb5_asn1.TicketFlags)
1091 for i, flag in enumerate(expected):
1092 if flag == 1:
1093 self.assertEqual('1', v[i],
1094 f"'{expected.namedValues[i]}' "
1095 f"expected in {v}")
1096 if unexpected is not None:
1097 self.assertIsInstance(unexpected, krb5_asn1.TicketFlags)
1098 for i, flag in enumerate(unexpected):
1099 if flag == 1:
1100 self.assertEqual('0', v[i],
1101 f"'{unexpected.namedValues[i]}' "
1102 f"unexpected in {v}")
1104 def assertSequenceElementsEqual(self, expected, got, *,
1105 require_strict=None):
1106 if self.strict_checking:
1107 self.assertEqual(expected, got)
1108 else:
1109 fail_msg = f'expected: {expected} got: {got}'
1111 if require_strict is not None:
1112 fail_msg += f' (ignoring: {require_strict})'
1113 expected = (x for x in expected if x not in require_strict)
1114 got = (x for x in got if x not in require_strict)
1116 self.assertCountEqual(expected, got, fail_msg)
1118 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1119 if epoch is None:
1120 epoch = time.time()
1121 if offset is not None:
1122 epoch = epoch + int(offset)
1123 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1124 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1126 def get_KerberosTime(self, epoch=None, offset=None):
1127 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1128 return s
1130 def get_EpochFromKerberosTime(self, kerberos_time):
1131 if isinstance(kerberos_time, bytes):
1132 kerberos_time = kerberos_time.decode()
1134 epoch = datetime.datetime.strptime(kerberos_time,
1135 '%Y%m%d%H%M%SZ')
1136 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1137 epoch = int(epoch.timestamp())
1139 return epoch
1141 def get_Nonce(self):
1142 nonce_min = 0x7f000000
1143 nonce_max = 0x7fffffff
1144 v = random.randint(nonce_min, nonce_max)
1145 return v
1147 def get_pa_dict(self, pa_data):
1148 pa_dict = {}
1150 if pa_data is not None:
1151 for pa in pa_data:
1152 pa_type = pa['padata-type']
1153 if pa_type in pa_dict:
1154 raise RuntimeError(f'Duplicate type {pa_type}')
1155 pa_dict[pa_type] = pa['padata-value']
1157 return pa_dict
1159 def SessionKey_create(self, etype, contents, kvno=None):
1160 key = kcrypto.Key(etype, contents)
1161 return RodcPacEncryptionKey(key, kvno)
1163 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
1164 self.assertIsNotNone(pwd)
1165 self.assertIsNotNone(salt)
1166 key = kcrypto.string_to_key(etype, pwd, salt)
1167 return RodcPacEncryptionKey(key, kvno)
1169 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
1170 e = etype_info2['etype']
1172 salt = etype_info2.get('salt')
1174 if e == kcrypto.Enctype.RC4:
1175 nthash = creds.get_nt_hash()
1176 return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1178 password = creds.get_password()
1179 return self.PasswordKey_create(
1180 etype=e, pwd=password, salt=salt, kvno=kvno)
1182 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1184 if etype is None:
1185 etypes = creds.get_tgs_krb5_etypes()
1186 if etypes:
1187 etype = etypes[0]
1188 else:
1189 etype = kcrypto.Enctype.RC4
1191 forced_key = creds.get_forced_key(etype)
1192 if forced_key is not None:
1193 return forced_key
1195 kvno = creds.get_kvno()
1197 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1198 "nor a password specified, " % (
1199 creds.get_username(), etype, kvno))
1201 if etype == kcrypto.Enctype.RC4:
1202 nthash = creds.get_nt_hash()
1203 self.assertIsNotNone(nthash, msg=fail_msg)
1204 return self.SessionKey_create(etype=etype,
1205 contents=nthash,
1206 kvno=kvno)
1208 password = creds.get_password()
1209 self.assertIsNotNone(password, msg=fail_msg)
1210 salt = creds.get_salt()
1211 return self.PasswordKey_create(etype=etype,
1212 pwd=password,
1213 salt=salt,
1214 kvno=kvno)
1216 def RandomKey(self, etype):
1217 e = kcrypto._get_enctype_profile(etype)
1218 contents = samba.generate_random_bytes(e.keysize)
1219 return self.SessionKey_create(etype=etype, contents=contents)
1221 def EncryptionKey_import(self, EncryptionKey_obj):
1222 return self.SessionKey_create(EncryptionKey_obj['keytype'],
1223 EncryptionKey_obj['keyvalue'])
1225 def EncryptedData_create(self, key, usage, plaintext):
1226 # EncryptedData ::= SEQUENCE {
1227 # etype [0] Int32 -- EncryptionType --,
1228 # kvno [1] Int32 OPTIONAL,
1229 # cipher [2] OCTET STRING -- ciphertext
1231 ciphertext = key.encrypt(usage, plaintext)
1232 EncryptedData_obj = {
1233 'etype': key.etype,
1234 'cipher': ciphertext
1236 if key.kvno is not None:
1237 EncryptedData_obj['kvno'] = key.kvno
1238 return EncryptedData_obj
1240 def Checksum_create(self, key, usage, plaintext, ctype=None):
1241 # Checksum ::= SEQUENCE {
1242 # cksumtype [0] Int32,
1243 # checksum [1] OCTET STRING
1245 if ctype is None:
1246 ctype = key.ctype
1247 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1248 Checksum_obj = {
1249 'cksumtype': ctype,
1250 'checksum': checksum,
1252 return Checksum_obj
1254 @classmethod
1255 def PrincipalName_create(cls, name_type, names):
1256 # PrincipalName ::= SEQUENCE {
1257 # name-type [0] Int32,
1258 # name-string [1] SEQUENCE OF KerberosString
1260 PrincipalName_obj = {
1261 'name-type': name_type,
1262 'name-string': names,
1264 return PrincipalName_obj
1266 def AuthorizationData_create(self, ad_type, ad_data):
1267 # AuthorizationData ::= SEQUENCE {
1268 # ad-type [0] Int32,
1269 # ad-data [1] OCTET STRING
1271 AUTH_DATA_obj = {
1272 'ad-type': ad_type,
1273 'ad-data': ad_data
1275 return AUTH_DATA_obj
1277 def PA_DATA_create(self, padata_type, padata_value):
1278 # PA-DATA ::= SEQUENCE {
1279 # -- NOTE: first tag is [1], not [0]
1280 # padata-type [1] Int32,
1281 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1283 PA_DATA_obj = {
1284 'padata-type': padata_type,
1285 'padata-value': padata_value,
1287 return PA_DATA_obj
1289 def PA_ENC_TS_ENC_create(self, ts, usec):
1290 # PA-ENC-TS-ENC ::= SEQUENCE {
1291 # patimestamp[0] KerberosTime, -- client's time
1292 # pausec[1] krb5int32 OPTIONAL
1294 PA_ENC_TS_ENC_obj = {
1295 'patimestamp': ts,
1296 'pausec': usec,
1298 return PA_ENC_TS_ENC_obj
1300 def PA_PAC_OPTIONS_create(self, options):
1301 # PA-PAC-OPTIONS ::= SEQUENCE {
1302 # options [0] PACOptionFlags
1304 PA_PAC_OPTIONS_obj = {
1305 'options': options
1307 return PA_PAC_OPTIONS_obj
1309 def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1310 # KrbFastArmor ::= SEQUENCE {
1311 # armor-type [0] Int32,
1312 # armor-value [1] OCTET STRING,
1313 # ...
1315 KRB_FAST_ARMOR_obj = {
1316 'armor-type': armor_type,
1317 'armor-value': armor_value
1319 return KRB_FAST_ARMOR_obj
1321 def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1322 # KrbFastReq ::= SEQUENCE {
1323 # fast-options [0] FastOptions,
1324 # padata [1] SEQUENCE OF PA-DATA,
1325 # req-body [2] KDC-REQ-BODY,
1326 # ...
1328 KRB_FAST_REQ_obj = {
1329 'fast-options': fast_options,
1330 'padata': padata,
1331 'req-body': req_body
1333 return KRB_FAST_REQ_obj
1335 def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1336 # KrbFastArmoredReq ::= SEQUENCE {
1337 # armor [0] KrbFastArmor OPTIONAL,
1338 # req-checksum [1] Checksum,
1339 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1341 KRB_FAST_ARMORED_REQ_obj = {
1342 'req-checksum': req_checksum,
1343 'enc-fast-req': enc_fast_req
1345 if armor is not None:
1346 KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1347 return KRB_FAST_ARMORED_REQ_obj
1349 def PA_FX_FAST_REQUEST_create(self, armored_data):
1350 # PA-FX-FAST-REQUEST ::= CHOICE {
1351 # armored-data [0] KrbFastArmoredReq,
1352 # ...
1354 PA_FX_FAST_REQUEST_obj = {
1355 'armored-data': armored_data
1357 return PA_FX_FAST_REQUEST_obj
1359 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1360 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1361 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1362 # -- include PAC.
1363 # --If FALSE, and PAC present,
1364 # -- remove PAC.
1366 KERB_PA_PAC_REQUEST_obj = {
1367 'include-pac': include_pac,
1369 if not pa_data_create:
1370 return KERB_PA_PAC_REQUEST_obj
1371 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1372 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1373 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1374 return pa_data
1376 def get_pa_pac_options(self, options):
1377 pac_options = self.PA_PAC_OPTIONS_create(options)
1378 pac_options = self.der_encode(pac_options,
1379 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1380 pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1382 return pac_options
1384 def KDC_REQ_BODY_create(self,
1385 kdc_options,
1386 cname,
1387 realm,
1388 sname,
1389 from_time,
1390 till_time,
1391 renew_time,
1392 nonce,
1393 etypes,
1394 addresses,
1395 additional_tickets,
1396 EncAuthorizationData,
1397 EncAuthorizationData_key,
1398 EncAuthorizationData_usage,
1399 asn1_print=None,
1400 hexdump=None):
1401 # KDC-REQ-BODY ::= SEQUENCE {
1402 # kdc-options [0] KDCOptions,
1403 # cname [1] PrincipalName OPTIONAL
1404 # -- Used only in AS-REQ --,
1405 # realm [2] Realm
1406 # -- Server's realm
1407 # -- Also client's in AS-REQ --,
1408 # sname [3] PrincipalName OPTIONAL,
1409 # from [4] KerberosTime OPTIONAL,
1410 # till [5] KerberosTime,
1411 # rtime [6] KerberosTime OPTIONAL,
1412 # nonce [7] UInt32,
1413 # etype [8] SEQUENCE OF Int32
1414 # -- EncryptionType
1415 # -- in preference order --,
1416 # addresses [9] HostAddresses OPTIONAL,
1417 # enc-authorization-data [10] EncryptedData OPTIONAL
1418 # -- AuthorizationData --,
1419 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1420 # -- NOTE: not empty
1422 if EncAuthorizationData is not None:
1423 enc_ad_plain = self.der_encode(
1424 EncAuthorizationData,
1425 asn1Spec=krb5_asn1.AuthorizationData(),
1426 asn1_print=asn1_print,
1427 hexdump=hexdump)
1428 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1429 EncAuthorizationData_usage,
1430 enc_ad_plain)
1431 else:
1432 enc_ad = None
1433 KDC_REQ_BODY_obj = {
1434 'kdc-options': kdc_options,
1435 'realm': realm,
1436 'till': till_time,
1437 'nonce': nonce,
1438 'etype': etypes,
1440 if cname is not None:
1441 KDC_REQ_BODY_obj['cname'] = cname
1442 if sname is not None:
1443 KDC_REQ_BODY_obj['sname'] = sname
1444 if from_time is not None:
1445 KDC_REQ_BODY_obj['from'] = from_time
1446 if renew_time is not None:
1447 KDC_REQ_BODY_obj['rtime'] = renew_time
1448 if addresses is not None:
1449 KDC_REQ_BODY_obj['addresses'] = addresses
1450 if enc_ad is not None:
1451 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1452 if additional_tickets is not None:
1453 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1454 return KDC_REQ_BODY_obj
1456 def KDC_REQ_create(self,
1457 msg_type,
1458 padata,
1459 req_body,
1460 asn1Spec=None,
1461 asn1_print=None,
1462 hexdump=None):
1463 # KDC-REQ ::= SEQUENCE {
1464 # -- NOTE: first tag is [1], not [0]
1465 # pvno [1] INTEGER (5) ,
1466 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1467 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1468 # -- NOTE: not empty --,
1469 # req-body [4] KDC-REQ-BODY
1472 KDC_REQ_obj = {
1473 'pvno': 5,
1474 'msg-type': msg_type,
1475 'req-body': req_body,
1477 if padata is not None:
1478 KDC_REQ_obj['padata'] = padata
1479 if asn1Spec is not None:
1480 KDC_REQ_decoded = pyasn1_native_decode(
1481 KDC_REQ_obj, asn1Spec=asn1Spec)
1482 else:
1483 KDC_REQ_decoded = None
1484 return KDC_REQ_obj, KDC_REQ_decoded
1486 def AS_REQ_create(self,
1487 padata, # optional
1488 kdc_options, # required
1489 cname, # optional
1490 realm, # required
1491 sname, # optional
1492 from_time, # optional
1493 till_time, # required
1494 renew_time, # optional
1495 nonce, # required
1496 etypes, # required
1497 addresses, # optional
1498 additional_tickets,
1499 native_decoded_only=True,
1500 asn1_print=None,
1501 hexdump=None):
1502 # KDC-REQ ::= SEQUENCE {
1503 # -- NOTE: first tag is [1], not [0]
1504 # pvno [1] INTEGER (5) ,
1505 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1506 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1507 # -- NOTE: not empty --,
1508 # req-body [4] KDC-REQ-BODY
1511 # KDC-REQ-BODY ::= SEQUENCE {
1512 # kdc-options [0] KDCOptions,
1513 # cname [1] PrincipalName OPTIONAL
1514 # -- Used only in AS-REQ --,
1515 # realm [2] Realm
1516 # -- Server's realm
1517 # -- Also client's in AS-REQ --,
1518 # sname [3] PrincipalName OPTIONAL,
1519 # from [4] KerberosTime OPTIONAL,
1520 # till [5] KerberosTime,
1521 # rtime [6] KerberosTime OPTIONAL,
1522 # nonce [7] UInt32,
1523 # etype [8] SEQUENCE OF Int32
1524 # -- EncryptionType
1525 # -- in preference order --,
1526 # addresses [9] HostAddresses OPTIONAL,
1527 # enc-authorization-data [10] EncryptedData OPTIONAL
1528 # -- AuthorizationData --,
1529 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1530 # -- NOTE: not empty
1532 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1533 kdc_options,
1534 cname,
1535 realm,
1536 sname,
1537 from_time,
1538 till_time,
1539 renew_time,
1540 nonce,
1541 etypes,
1542 addresses,
1543 additional_tickets,
1544 EncAuthorizationData=None,
1545 EncAuthorizationData_key=None,
1546 EncAuthorizationData_usage=None,
1547 asn1_print=asn1_print,
1548 hexdump=hexdump)
1549 obj, decoded = self.KDC_REQ_create(
1550 msg_type=KRB_AS_REQ,
1551 padata=padata,
1552 req_body=KDC_REQ_BODY_obj,
1553 asn1Spec=krb5_asn1.AS_REQ(),
1554 asn1_print=asn1_print,
1555 hexdump=hexdump)
1556 if native_decoded_only:
1557 return decoded
1558 return decoded, obj
1560 def AP_REQ_create(self, ap_options, ticket, authenticator):
1561 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1562 # pvno [0] INTEGER (5),
1563 # msg-type [1] INTEGER (14),
1564 # ap-options [2] APOptions,
1565 # ticket [3] Ticket,
1566 # authenticator [4] EncryptedData -- Authenticator
1568 AP_REQ_obj = {
1569 'pvno': 5,
1570 'msg-type': KRB_AP_REQ,
1571 'ap-options': ap_options,
1572 'ticket': ticket,
1573 'authenticator': authenticator,
1575 return AP_REQ_obj
1577 def Authenticator_create(
1578 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1579 authorization_data):
1580 # -- Unencrypted authenticator
1581 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1582 # authenticator-vno [0] INTEGER (5),
1583 # crealm [1] Realm,
1584 # cname [2] PrincipalName,
1585 # cksum [3] Checksum OPTIONAL,
1586 # cusec [4] Microseconds,
1587 # ctime [5] KerberosTime,
1588 # subkey [6] EncryptionKey OPTIONAL,
1589 # seq-number [7] UInt32 OPTIONAL,
1590 # authorization-data [8] AuthorizationData OPTIONAL
1592 Authenticator_obj = {
1593 'authenticator-vno': 5,
1594 'crealm': crealm,
1595 'cname': cname,
1596 'cusec': cusec,
1597 'ctime': ctime,
1599 if cksum is not None:
1600 Authenticator_obj['cksum'] = cksum
1601 if subkey is not None:
1602 Authenticator_obj['subkey'] = subkey
1603 if seq_number is not None:
1604 Authenticator_obj['seq-number'] = seq_number
1605 if authorization_data is not None:
1606 Authenticator_obj['authorization-data'] = authorization_data
1607 return Authenticator_obj
1609 def TGS_REQ_create(self,
1610 padata, # optional
1611 cusec,
1612 ctime,
1613 ticket,
1614 kdc_options, # required
1615 cname, # optional
1616 realm, # required
1617 sname, # optional
1618 from_time, # optional
1619 till_time, # required
1620 renew_time, # optional
1621 nonce, # required
1622 etypes, # required
1623 addresses, # optional
1624 EncAuthorizationData,
1625 EncAuthorizationData_key,
1626 additional_tickets,
1627 ticket_session_key,
1628 authenticator_subkey=None,
1629 body_checksum_type=None,
1630 native_decoded_only=True,
1631 asn1_print=None,
1632 hexdump=None):
1633 # KDC-REQ ::= SEQUENCE {
1634 # -- NOTE: first tag is [1], not [0]
1635 # pvno [1] INTEGER (5) ,
1636 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1637 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1638 # -- NOTE: not empty --,
1639 # req-body [4] KDC-REQ-BODY
1642 # KDC-REQ-BODY ::= SEQUENCE {
1643 # kdc-options [0] KDCOptions,
1644 # cname [1] PrincipalName OPTIONAL
1645 # -- Used only in AS-REQ --,
1646 # realm [2] Realm
1647 # -- Server's realm
1648 # -- Also client's in AS-REQ --,
1649 # sname [3] PrincipalName OPTIONAL,
1650 # from [4] KerberosTime OPTIONAL,
1651 # till [5] KerberosTime,
1652 # rtime [6] KerberosTime OPTIONAL,
1653 # nonce [7] UInt32,
1654 # etype [8] SEQUENCE OF Int32
1655 # -- EncryptionType
1656 # -- in preference order --,
1657 # addresses [9] HostAddresses OPTIONAL,
1658 # enc-authorization-data [10] EncryptedData OPTIONAL
1659 # -- AuthorizationData --,
1660 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1661 # -- NOTE: not empty
1664 if authenticator_subkey is not None:
1665 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1666 else:
1667 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1669 req_body = self.KDC_REQ_BODY_create(
1670 kdc_options=kdc_options,
1671 cname=None,
1672 realm=realm,
1673 sname=sname,
1674 from_time=from_time,
1675 till_time=till_time,
1676 renew_time=renew_time,
1677 nonce=nonce,
1678 etypes=etypes,
1679 addresses=addresses,
1680 additional_tickets=additional_tickets,
1681 EncAuthorizationData=EncAuthorizationData,
1682 EncAuthorizationData_key=EncAuthorizationData_key,
1683 EncAuthorizationData_usage=EncAuthorizationData_usage)
1684 req_body_blob = self.der_encode(req_body,
1685 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1686 asn1_print=asn1_print, hexdump=hexdump)
1688 req_body_checksum = self.Checksum_create(ticket_session_key,
1689 KU_TGS_REQ_AUTH_CKSUM,
1690 req_body_blob,
1691 ctype=body_checksum_type)
1693 subkey_obj = None
1694 if authenticator_subkey is not None:
1695 subkey_obj = authenticator_subkey.export_obj()
1696 seq_number = random.randint(0, 0xfffffffe)
1697 authenticator = self.Authenticator_create(
1698 crealm=realm,
1699 cname=cname,
1700 cksum=req_body_checksum,
1701 cusec=cusec,
1702 ctime=ctime,
1703 subkey=subkey_obj,
1704 seq_number=seq_number,
1705 authorization_data=None)
1706 authenticator = self.der_encode(
1707 authenticator,
1708 asn1Spec=krb5_asn1.Authenticator(),
1709 asn1_print=asn1_print,
1710 hexdump=hexdump)
1712 authenticator = self.EncryptedData_create(
1713 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1715 ap_options = krb5_asn1.APOptions('0')
1716 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1717 ticket=ticket,
1718 authenticator=authenticator)
1719 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1720 asn1_print=asn1_print, hexdump=hexdump)
1721 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1722 if padata is not None:
1723 padata.append(pa_tgs_req)
1724 else:
1725 padata = [pa_tgs_req]
1727 obj, decoded = self.KDC_REQ_create(
1728 msg_type=KRB_TGS_REQ,
1729 padata=padata,
1730 req_body=req_body,
1731 asn1Spec=krb5_asn1.TGS_REQ(),
1732 asn1_print=asn1_print,
1733 hexdump=hexdump)
1734 if native_decoded_only:
1735 return decoded
1736 return decoded, obj
1738 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1739 # PA-S4U2Self ::= SEQUENCE {
1740 # name [0] PrincipalName,
1741 # realm [1] Realm,
1742 # cksum [2] Checksum,
1743 # auth [3] GeneralString
1745 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1746 for n in name['name-string']:
1747 cksum_data += n.encode()
1748 cksum_data += realm.encode()
1749 cksum_data += "Kerberos".encode()
1750 cksum = self.Checksum_create(tgt_session_key,
1751 KU_NON_KERB_CKSUM_SALT,
1752 cksum_data,
1753 ctype)
1755 PA_S4U2Self_obj = {
1756 'name': name,
1757 'realm': realm,
1758 'cksum': cksum,
1759 'auth': "Kerberos",
1761 pa_s4u2self = self.der_encode(
1762 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1763 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1765 def _generic_kdc_exchange(self,
1766 kdc_exchange_dict, # required
1767 cname=None, # optional
1768 realm=None, # required
1769 sname=None, # optional
1770 from_time=None, # optional
1771 till_time=None, # required
1772 renew_time=None, # optional
1773 etypes=None, # required
1774 addresses=None, # optional
1775 additional_tickets=None, # optional
1776 EncAuthorizationData=None, # optional
1777 EncAuthorizationData_key=None, # optional
1778 EncAuthorizationData_usage=None): # optional
1780 check_error_fn = kdc_exchange_dict['check_error_fn']
1781 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1782 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1783 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1784 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1785 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1786 callback_dict = kdc_exchange_dict['callback_dict']
1787 req_msg_type = kdc_exchange_dict['req_msg_type']
1788 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1789 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1791 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1792 kdc_options = kdc_exchange_dict['kdc_options']
1794 pac_request = kdc_exchange_dict['pac_request']
1795 pac_options = kdc_exchange_dict['pac_options']
1797 # Parameters specific to the inner request body
1798 inner_req = kdc_exchange_dict['inner_req']
1800 # Parameters specific to the outer request body
1801 outer_req = kdc_exchange_dict['outer_req']
1803 if till_time is None:
1804 till_time = self.get_KerberosTime(offset=36000)
1806 if 'nonce' in kdc_exchange_dict:
1807 nonce = kdc_exchange_dict['nonce']
1808 else:
1809 nonce = self.get_Nonce()
1810 kdc_exchange_dict['nonce'] = nonce
1812 req_body = self.KDC_REQ_BODY_create(
1813 kdc_options=kdc_options,
1814 cname=cname,
1815 realm=realm,
1816 sname=sname,
1817 from_time=from_time,
1818 till_time=till_time,
1819 renew_time=renew_time,
1820 nonce=nonce,
1821 etypes=etypes,
1822 addresses=addresses,
1823 additional_tickets=additional_tickets,
1824 EncAuthorizationData=EncAuthorizationData,
1825 EncAuthorizationData_key=EncAuthorizationData_key,
1826 EncAuthorizationData_usage=EncAuthorizationData_usage)
1828 inner_req_body = dict(req_body)
1829 if inner_req is not None:
1830 for key, value in inner_req.items():
1831 if value is not None:
1832 inner_req_body[key] = value
1833 else:
1834 del inner_req_body[key]
1835 if outer_req is not None:
1836 for key, value in outer_req.items():
1837 if value is not None:
1838 req_body[key] = value
1839 else:
1840 del req_body[key]
1842 additional_padata = []
1843 if pac_request is not None:
1844 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
1845 additional_padata.append(pa_pac_request)
1846 if pac_options is not None:
1847 pa_pac_options = self.get_pa_pac_options(pac_options)
1848 additional_padata.append(pa_pac_options)
1850 if req_msg_type == KRB_AS_REQ:
1851 tgs_req = None
1852 tgs_req_padata = None
1853 else:
1854 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1856 tgs_req = self.generate_ap_req(kdc_exchange_dict,
1857 callback_dict,
1858 req_body,
1859 armor=False)
1860 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1862 if generate_fast_padata_fn is not None:
1863 self.assertIsNotNone(generate_fast_fn)
1864 # This can alter req_body...
1865 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1866 callback_dict,
1867 req_body)
1868 else:
1869 fast_padata = []
1871 if generate_fast_armor_fn is not None:
1872 self.assertIsNotNone(generate_fast_fn)
1873 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1874 callback_dict,
1875 req_body,
1876 armor=True)
1878 fast_armor_type = kdc_exchange_dict['fast_armor_type']
1879 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1880 fast_ap_req)
1881 else:
1882 fast_armor = None
1884 if generate_padata_fn is not None:
1885 # This can alter req_body...
1886 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1887 callback_dict,
1888 req_body)
1889 self.assertIsNotNone(outer_padata)
1890 self.assertNotIn(PADATA_KDC_REQ,
1891 [pa['padata-type'] for pa in outer_padata],
1892 'Don\'t create TGS-REQ manually')
1893 else:
1894 outer_padata = None
1896 if generate_fast_fn is not None:
1897 armor_key = kdc_exchange_dict['armor_key']
1898 self.assertIsNotNone(armor_key)
1900 if req_msg_type == KRB_AS_REQ:
1901 checksum_blob = self.der_encode(
1902 req_body,
1903 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1904 else:
1905 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1906 checksum_blob = tgs_req
1908 checksum = self.Checksum_create(armor_key,
1909 KU_FAST_REQ_CHKSUM,
1910 checksum_blob)
1912 fast_padata += additional_padata
1913 fast = generate_fast_fn(kdc_exchange_dict,
1914 callback_dict,
1915 inner_req_body,
1916 fast_padata,
1917 fast_armor,
1918 checksum)
1919 else:
1920 fast = None
1922 padata = []
1924 if tgs_req_padata is not None:
1925 padata.append(tgs_req_padata)
1927 if fast is not None:
1928 padata.append(fast)
1930 if outer_padata is not None:
1931 padata += outer_padata
1933 if fast is None:
1934 padata += additional_padata
1936 if not padata:
1937 padata = None
1939 kdc_exchange_dict['req_padata'] = padata
1940 kdc_exchange_dict['fast_padata'] = fast_padata
1941 kdc_exchange_dict['req_body'] = inner_req_body
1943 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1944 padata=padata,
1945 req_body=req_body,
1946 asn1Spec=req_asn1Spec())
1948 to_rodc = kdc_exchange_dict['to_rodc']
1950 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
1951 self.assertIsNotNone(rep)
1953 msg_type = self.getElementValue(rep, 'msg-type')
1954 self.assertIsNotNone(msg_type)
1956 expected_msg_type = None
1957 if check_error_fn is not None:
1958 expected_msg_type = KRB_ERROR
1959 self.assertIsNone(check_rep_fn)
1960 self.assertNotEqual(0, len(expected_error_mode))
1961 self.assertNotIn(0, expected_error_mode)
1962 if check_rep_fn is not None:
1963 expected_msg_type = rep_msg_type
1964 self.assertIsNone(check_error_fn)
1965 self.assertEqual(0, len(expected_error_mode))
1966 self.assertIsNotNone(expected_msg_type)
1967 if msg_type == KRB_ERROR:
1968 error_code = self.getElementValue(rep, 'error-code')
1969 fail_msg = f'Got unexpected error: {error_code}'
1970 else:
1971 fail_msg = f'Expected to fail with error: {expected_error_mode}'
1972 self.assertEqual(msg_type, expected_msg_type, fail_msg)
1974 if msg_type == KRB_ERROR:
1975 return check_error_fn(kdc_exchange_dict,
1976 callback_dict,
1977 rep)
1979 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1981 def as_exchange_dict(self,
1982 expected_crealm=None,
1983 expected_cname=None,
1984 expected_anon=False,
1985 expected_srealm=None,
1986 expected_sname=None,
1987 expected_supported_etypes=None,
1988 expected_flags=None,
1989 unexpected_flags=None,
1990 ticket_decryption_key=None,
1991 expect_ticket_checksum=None,
1992 generate_fast_fn=None,
1993 generate_fast_armor_fn=None,
1994 generate_fast_padata_fn=None,
1995 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1996 generate_padata_fn=None,
1997 check_error_fn=None,
1998 check_rep_fn=None,
1999 check_kdc_private_fn=None,
2000 callback_dict=None,
2001 expected_error_mode=0,
2002 expected_status=None,
2003 client_as_etypes=None,
2004 expected_salt=None,
2005 authenticator_subkey=None,
2006 preauth_key=None,
2007 armor_key=None,
2008 armor_tgt=None,
2009 armor_subkey=None,
2010 auth_data=None,
2011 kdc_options='',
2012 inner_req=None,
2013 outer_req=None,
2014 pac_request=None,
2015 pac_options=None,
2016 expect_edata=None,
2017 expect_pac=True,
2018 expect_claims=True,
2019 to_rodc=False):
2020 if expected_error_mode == 0:
2021 expected_error_mode = ()
2022 elif not isinstance(expected_error_mode, collections.abc.Container):
2023 expected_error_mode = (expected_error_mode,)
2025 kdc_exchange_dict = {
2026 'req_msg_type': KRB_AS_REQ,
2027 'req_asn1Spec': krb5_asn1.AS_REQ,
2028 'rep_msg_type': KRB_AS_REP,
2029 'rep_asn1Spec': krb5_asn1.AS_REP,
2030 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
2031 'expected_crealm': expected_crealm,
2032 'expected_cname': expected_cname,
2033 'expected_anon': expected_anon,
2034 'expected_srealm': expected_srealm,
2035 'expected_sname': expected_sname,
2036 'expected_supported_etypes': expected_supported_etypes,
2037 'expected_flags': expected_flags,
2038 'unexpected_flags': unexpected_flags,
2039 'ticket_decryption_key': ticket_decryption_key,
2040 'expect_ticket_checksum': expect_ticket_checksum,
2041 'generate_fast_fn': generate_fast_fn,
2042 'generate_fast_armor_fn': generate_fast_armor_fn,
2043 'generate_fast_padata_fn': generate_fast_padata_fn,
2044 'fast_armor_type': fast_armor_type,
2045 'generate_padata_fn': generate_padata_fn,
2046 'check_error_fn': check_error_fn,
2047 'check_rep_fn': check_rep_fn,
2048 'check_kdc_private_fn': check_kdc_private_fn,
2049 'callback_dict': callback_dict,
2050 'expected_error_mode': expected_error_mode,
2051 'expected_status': expected_status,
2052 'client_as_etypes': client_as_etypes,
2053 'expected_salt': expected_salt,
2054 'authenticator_subkey': authenticator_subkey,
2055 'preauth_key': preauth_key,
2056 'armor_key': armor_key,
2057 'armor_tgt': armor_tgt,
2058 'armor_subkey': armor_subkey,
2059 'auth_data': auth_data,
2060 'kdc_options': kdc_options,
2061 'inner_req': inner_req,
2062 'outer_req': outer_req,
2063 'pac_request': pac_request,
2064 'pac_options': pac_options,
2065 'expect_edata': expect_edata,
2066 'expect_pac': expect_pac,
2067 'expect_claims': expect_claims,
2068 'to_rodc': to_rodc
2070 if callback_dict is None:
2071 callback_dict = {}
2073 return kdc_exchange_dict
2075 def tgs_exchange_dict(self,
2076 expected_crealm=None,
2077 expected_cname=None,
2078 expected_anon=False,
2079 expected_srealm=None,
2080 expected_sname=None,
2081 expected_supported_etypes=None,
2082 expected_flags=None,
2083 unexpected_flags=None,
2084 ticket_decryption_key=None,
2085 expect_ticket_checksum=None,
2086 generate_fast_fn=None,
2087 generate_fast_armor_fn=None,
2088 generate_fast_padata_fn=None,
2089 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2090 generate_padata_fn=None,
2091 check_error_fn=None,
2092 check_rep_fn=None,
2093 check_kdc_private_fn=None,
2094 expected_error_mode=0,
2095 expected_status=None,
2096 callback_dict=None,
2097 tgt=None,
2098 armor_key=None,
2099 armor_tgt=None,
2100 armor_subkey=None,
2101 authenticator_subkey=None,
2102 auth_data=None,
2103 body_checksum_type=None,
2104 kdc_options='',
2105 inner_req=None,
2106 outer_req=None,
2107 pac_request=None,
2108 pac_options=None,
2109 expect_edata=None,
2110 expect_pac=True,
2111 expect_claims=True,
2112 expected_proxy_target=None,
2113 expected_transited_services=None,
2114 to_rodc=False):
2115 if expected_error_mode == 0:
2116 expected_error_mode = ()
2117 elif not isinstance(expected_error_mode, collections.abc.Container):
2118 expected_error_mode = (expected_error_mode,)
2120 kdc_exchange_dict = {
2121 'req_msg_type': KRB_TGS_REQ,
2122 'req_asn1Spec': krb5_asn1.TGS_REQ,
2123 'rep_msg_type': KRB_TGS_REP,
2124 'rep_asn1Spec': krb5_asn1.TGS_REP,
2125 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2126 'expected_crealm': expected_crealm,
2127 'expected_cname': expected_cname,
2128 'expected_anon': expected_anon,
2129 'expected_srealm': expected_srealm,
2130 'expected_sname': expected_sname,
2131 'expected_supported_etypes': expected_supported_etypes,
2132 'expected_flags': expected_flags,
2133 'unexpected_flags': unexpected_flags,
2134 'ticket_decryption_key': ticket_decryption_key,
2135 'expect_ticket_checksum': expect_ticket_checksum,
2136 'generate_fast_fn': generate_fast_fn,
2137 'generate_fast_armor_fn': generate_fast_armor_fn,
2138 'generate_fast_padata_fn': generate_fast_padata_fn,
2139 'fast_armor_type': fast_armor_type,
2140 'generate_padata_fn': generate_padata_fn,
2141 'check_error_fn': check_error_fn,
2142 'check_rep_fn': check_rep_fn,
2143 'check_kdc_private_fn': check_kdc_private_fn,
2144 'callback_dict': callback_dict,
2145 'expected_error_mode': expected_error_mode,
2146 'expected_status': expected_status,
2147 'tgt': tgt,
2148 'body_checksum_type': body_checksum_type,
2149 'armor_key': armor_key,
2150 'armor_tgt': armor_tgt,
2151 'armor_subkey': armor_subkey,
2152 'auth_data': auth_data,
2153 'authenticator_subkey': authenticator_subkey,
2154 'kdc_options': kdc_options,
2155 'inner_req': inner_req,
2156 'outer_req': outer_req,
2157 'pac_request': pac_request,
2158 'pac_options': pac_options,
2159 'expect_edata': expect_edata,
2160 'expect_pac': expect_pac,
2161 'expect_claims': expect_claims,
2162 'expected_proxy_target': expected_proxy_target,
2163 'expected_transited_services': expected_transited_services,
2164 'to_rodc': to_rodc
2166 if callback_dict is None:
2167 callback_dict = {}
2169 return kdc_exchange_dict
2171 def generic_check_kdc_rep(self,
2172 kdc_exchange_dict,
2173 callback_dict,
2174 rep):
2176 expected_crealm = kdc_exchange_dict['expected_crealm']
2177 expected_anon = kdc_exchange_dict['expected_anon']
2178 expected_srealm = kdc_exchange_dict['expected_srealm']
2179 expected_sname = kdc_exchange_dict['expected_sname']
2180 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2181 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2182 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2183 msg_type = kdc_exchange_dict['rep_msg_type']
2184 armor_key = kdc_exchange_dict['armor_key']
2186 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
2187 padata = self.getElementValue(rep, 'padata')
2188 if self.strict_checking:
2189 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
2190 if expected_anon:
2191 expected_cname = self.PrincipalName_create(
2192 name_type=NT_WELLKNOWN,
2193 names=['WELLKNOWN', 'ANONYMOUS'])
2194 else:
2195 expected_cname = kdc_exchange_dict['expected_cname']
2196 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2197 self.assertElementPresent(rep, 'ticket')
2198 ticket = self.getElementValue(rep, 'ticket')
2199 ticket_encpart = None
2200 ticket_cipher = None
2201 self.assertIsNotNone(ticket)
2202 if ticket is not None: # Never None, but gives indentation
2203 self.assertElementEqual(ticket, 'tkt-vno', 5)
2204 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2205 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2206 self.assertElementPresent(ticket, 'enc-part')
2207 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2208 self.assertIsNotNone(ticket_encpart)
2209 if ticket_encpart is not None: # Never None, but gives indentation
2210 self.assertElementPresent(ticket_encpart, 'etype')
2211 # 'unspecified' means present, with any value != 0
2212 self.assertElementKVNO(ticket_encpart, 'kvno',
2213 self.unspecified_kvno)
2214 self.assertElementPresent(ticket_encpart, 'cipher')
2215 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2216 self.assertElementPresent(rep, 'enc-part')
2217 encpart = self.getElementValue(rep, 'enc-part')
2218 encpart_cipher = None
2219 self.assertIsNotNone(encpart)
2220 if encpart is not None: # Never None, but gives indentation
2221 self.assertElementPresent(encpart, 'etype')
2222 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2223 self.assertElementPresent(encpart, 'cipher')
2224 encpart_cipher = self.getElementValue(encpart, 'cipher')
2226 ticket_checksum = None
2228 # Get the decryption key for the encrypted part
2229 encpart_decryption_key, encpart_decryption_usage = (
2230 self.get_preauth_key(kdc_exchange_dict))
2232 if armor_key is not None:
2233 pa_dict = self.get_pa_dict(padata)
2235 if PADATA_FX_FAST in pa_dict:
2236 fx_fast_data = pa_dict[PADATA_FX_FAST]
2237 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2238 fx_fast_data,
2239 armor_key,
2240 finished=True)
2242 if 'strengthen-key' in fast_response:
2243 strengthen_key = self.EncryptionKey_import(
2244 fast_response['strengthen-key'])
2245 encpart_decryption_key = (
2246 self.generate_strengthen_reply_key(
2247 strengthen_key,
2248 encpart_decryption_key))
2250 fast_finished = fast_response.get('finished')
2251 if fast_finished is not None:
2252 ticket_checksum = fast_finished['ticket-checksum']
2254 self.check_rep_padata(kdc_exchange_dict,
2255 callback_dict,
2256 fast_response['padata'],
2257 error_code=0)
2259 ticket_private = None
2260 if ticket_decryption_key is not None:
2261 self.assertElementEqual(ticket_encpart, 'etype',
2262 ticket_decryption_key.etype)
2263 self.assertElementKVNO(ticket_encpart, 'kvno',
2264 ticket_decryption_key.kvno)
2265 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2266 ticket_cipher)
2267 ticket_private = self.der_decode(
2268 ticket_decpart,
2269 asn1Spec=krb5_asn1.EncTicketPart())
2271 encpart_private = None
2272 self.assertIsNotNone(encpart_decryption_key)
2273 if encpart_decryption_key is not None:
2274 self.assertElementEqual(encpart, 'etype',
2275 encpart_decryption_key.etype)
2276 if self.strict_checking:
2277 self.assertElementKVNO(encpart, 'kvno',
2278 encpart_decryption_key.kvno)
2279 rep_decpart = encpart_decryption_key.decrypt(
2280 encpart_decryption_usage,
2281 encpart_cipher)
2282 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2283 # application tag 26
2284 try:
2285 encpart_private = self.der_decode(
2286 rep_decpart,
2287 asn1Spec=rep_encpart_asn1Spec())
2288 except Exception:
2289 encpart_private = self.der_decode(
2290 rep_decpart,
2291 asn1Spec=krb5_asn1.EncTGSRepPart())
2293 self.assertIsNotNone(check_kdc_private_fn)
2294 if check_kdc_private_fn is not None:
2295 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2296 rep, ticket_private, encpart_private,
2297 ticket_checksum)
2299 return rep
2301 def check_fx_fast_data(self,
2302 kdc_exchange_dict,
2303 fx_fast_data,
2304 armor_key,
2305 finished=False,
2306 expect_strengthen_key=True):
2307 fx_fast_data = self.der_decode(fx_fast_data,
2308 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2310 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2311 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2313 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2315 fast_response = self.der_decode(fast_rep,
2316 asn1Spec=krb5_asn1.KrbFastResponse())
2318 if expect_strengthen_key and self.strict_checking:
2319 self.assertIn('strengthen-key', fast_response)
2321 if finished:
2322 self.assertIn('finished', fast_response)
2324 # Ensure that the nonce matches the nonce in the body of the request
2325 # (RFC6113 5.4.3).
2326 nonce = kdc_exchange_dict['nonce']
2327 self.assertEqual(nonce, fast_response['nonce'])
2329 return fast_response
2331 def generic_check_kdc_private(self,
2332 kdc_exchange_dict,
2333 callback_dict,
2334 rep,
2335 ticket_private,
2336 encpart_private,
2337 ticket_checksum):
2338 kdc_options = kdc_exchange_dict['kdc_options']
2339 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2340 canonicalize = (canon_pos < len(kdc_options)
2341 and kdc_options[canon_pos] == '1')
2342 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2343 renewable = (renewable_pos < len(kdc_options)
2344 and kdc_options[renewable_pos] == '1')
2346 expected_crealm = kdc_exchange_dict['expected_crealm']
2347 expected_cname = kdc_exchange_dict['expected_cname']
2348 expected_srealm = kdc_exchange_dict['expected_srealm']
2349 expected_sname = kdc_exchange_dict['expected_sname']
2350 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2352 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2354 expected_flags = kdc_exchange_dict.get('expected_flags')
2355 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2357 ticket = self.getElementValue(rep, 'ticket')
2359 if ticket_checksum is not None:
2360 armor_key = kdc_exchange_dict['armor_key']
2361 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2363 to_rodc = kdc_exchange_dict['to_rodc']
2364 if to_rodc:
2365 krbtgt_creds = self.get_rodc_krbtgt_creds()
2366 else:
2367 krbtgt_creds = self.get_krbtgt_creds()
2368 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2370 expect_pac = kdc_exchange_dict['expect_pac']
2372 ticket_session_key = None
2373 if ticket_private is not None:
2374 self.assertElementFlags(ticket_private, 'flags',
2375 expected_flags,
2376 unexpected_flags)
2377 self.assertElementPresent(ticket_private, 'key')
2378 ticket_key = self.getElementValue(ticket_private, 'key')
2379 self.assertIsNotNone(ticket_key)
2380 if ticket_key is not None: # Never None, but gives indentation
2381 self.assertElementPresent(ticket_key, 'keytype')
2382 self.assertElementPresent(ticket_key, 'keyvalue')
2383 ticket_session_key = self.EncryptionKey_import(ticket_key)
2384 self.assertElementEqualUTF8(ticket_private, 'crealm',
2385 expected_crealm)
2386 if self.strict_checking:
2387 self.assertElementEqualPrincipal(ticket_private, 'cname',
2388 expected_cname)
2389 self.assertElementPresent(ticket_private, 'transited')
2390 self.assertElementPresent(ticket_private, 'authtime')
2391 if self.strict_checking:
2392 self.assertElementPresent(ticket_private, 'starttime')
2393 self.assertElementPresent(ticket_private, 'endtime')
2394 if renewable:
2395 if self.strict_checking:
2396 self.assertElementPresent(ticket_private, 'renew-till')
2397 else:
2398 self.assertElementMissing(ticket_private, 'renew-till')
2399 if self.strict_checking:
2400 self.assertElementEqual(ticket_private, 'caddr', [])
2401 self.assertElementPresent(ticket_private, 'authorization-data',
2402 expect_empty=not expect_pac)
2404 encpart_session_key = None
2405 if encpart_private is not None:
2406 self.assertElementPresent(encpart_private, 'key')
2407 encpart_key = self.getElementValue(encpart_private, 'key')
2408 self.assertIsNotNone(encpart_key)
2409 if encpart_key is not None: # Never None, but gives indentation
2410 self.assertElementPresent(encpart_key, 'keytype')
2411 self.assertElementPresent(encpart_key, 'keyvalue')
2412 encpart_session_key = self.EncryptionKey_import(encpart_key)
2413 self.assertElementPresent(encpart_private, 'last-req')
2414 self.assertElementEqual(encpart_private, 'nonce',
2415 kdc_exchange_dict['nonce'])
2416 if rep_msg_type == KRB_AS_REP:
2417 if self.strict_checking:
2418 self.assertElementPresent(encpart_private,
2419 'key-expiration')
2420 else:
2421 self.assertElementMissing(encpart_private,
2422 'key-expiration')
2423 self.assertElementFlags(encpart_private, 'flags',
2424 expected_flags,
2425 unexpected_flags)
2426 self.assertElementPresent(encpart_private, 'authtime')
2427 if self.strict_checking:
2428 self.assertElementPresent(encpart_private, 'starttime')
2429 self.assertElementPresent(encpart_private, 'endtime')
2430 if renewable:
2431 if self.strict_checking:
2432 self.assertElementPresent(encpart_private, 'renew-till')
2433 else:
2434 self.assertElementMissing(encpart_private, 'renew-till')
2435 self.assertElementEqualUTF8(encpart_private, 'srealm',
2436 expected_srealm)
2437 self.assertElementEqualPrincipal(encpart_private, 'sname',
2438 expected_sname)
2439 if self.strict_checking:
2440 self.assertElementEqual(encpart_private, 'caddr', [])
2442 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2444 if self.strict_checking:
2445 if canonicalize or '1' in sent_pac_options:
2446 self.assertElementPresent(encpart_private,
2447 'encrypted-pa-data')
2448 enc_pa_dict = self.get_pa_dict(
2449 encpart_private['encrypted-pa-data'])
2450 if canonicalize:
2451 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2453 expected_supported_etypes = kdc_exchange_dict[
2454 'expected_supported_etypes']
2455 expected_supported_etypes |= (
2456 security.KERB_ENCTYPE_DES_CBC_CRC |
2457 security.KERB_ENCTYPE_DES_CBC_MD5 |
2458 security.KERB_ENCTYPE_RC4_HMAC_MD5)
2460 (supported_etypes,) = struct.unpack(
2461 '<L',
2462 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2464 self.assertEqual(supported_etypes,
2465 expected_supported_etypes)
2466 else:
2467 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2469 if '1' in sent_pac_options:
2470 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2472 pac_options = self.der_decode(
2473 enc_pa_dict[PADATA_PAC_OPTIONS],
2474 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2476 self.assertElementEqual(pac_options, 'options',
2477 sent_pac_options)
2478 else:
2479 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2480 else:
2481 self.assertElementEqual(encpart_private,
2482 'encrypted-pa-data',
2485 if ticket_session_key is not None and encpart_session_key is not None:
2486 self.assertEqual(ticket_session_key.etype,
2487 encpart_session_key.etype)
2488 self.assertEqual(ticket_session_key.key.contents,
2489 encpart_session_key.key.contents)
2490 if encpart_session_key is not None:
2491 session_key = encpart_session_key
2492 else:
2493 session_key = ticket_session_key
2494 ticket_creds = KerberosTicketCreds(
2495 ticket,
2496 session_key,
2497 crealm=expected_crealm,
2498 cname=expected_cname,
2499 srealm=expected_srealm,
2500 sname=expected_sname,
2501 decryption_key=ticket_decryption_key,
2502 ticket_private=ticket_private,
2503 encpart_private=encpart_private)
2505 if ticket_private is not None:
2506 pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac)
2507 if expect_pac:
2508 self.check_pac_buffers(pac_data, kdc_exchange_dict)
2509 else:
2510 self.assertIsNone(pac_data)
2512 expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
2513 if expect_ticket_checksum:
2514 self.assertIsNotNone(ticket_decryption_key)
2516 if ticket_decryption_key is not None:
2517 self.verify_ticket(ticket_creds, krbtgt_key, expect_pac=expect_pac,
2518 expect_ticket_checksum=expect_ticket_checksum
2519 or self.tkt_sig_support)
2521 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2523 def check_pac_buffers(self, pac_data, kdc_exchange_dict):
2524 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
2526 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2527 armor_tgt = kdc_exchange_dict['armor_tgt']
2529 expected_sname = kdc_exchange_dict['expected_sname']
2530 expect_claims = kdc_exchange_dict['expect_claims']
2532 expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
2533 krb5pac.PAC_TYPE_SRV_CHECKSUM,
2534 krb5pac.PAC_TYPE_KDC_CHECKSUM,
2535 krb5pac.PAC_TYPE_LOGON_NAME,
2536 krb5pac.PAC_TYPE_UPN_DNS_INFO]
2538 kdc_options = kdc_exchange_dict['kdc_options']
2539 pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
2540 constrained_delegation = (pos < len(kdc_options)
2541 and kdc_options[pos] == '1')
2542 if constrained_delegation:
2543 expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)
2545 if self.kdc_fast_support:
2546 if expect_claims:
2547 expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
2549 if (rep_msg_type == KRB_TGS_REP
2550 and armor_tgt is not None):
2551 expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
2552 expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
2554 if not self.is_tgs(expected_sname):
2555 expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
2557 if self.strict_checking:
2558 buffer_types = [pac_buffer.type
2559 for pac_buffer in pac.buffers]
2560 self.assertCountEqual(expected_types, buffer_types,
2561 f'expected: {expected_types} '
2562 f'got: {buffer_types}')
2564 for pac_buffer in pac.buffers:
2565 if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
2566 expected_proxy_target = kdc_exchange_dict[
2567 'expected_proxy_target']
2568 expected_transited_services = kdc_exchange_dict[
2569 'expected_transited_services']
2571 delegation_info = pac_buffer.info.info
2573 self.assertEqual(expected_proxy_target,
2574 str(delegation_info.proxy_target))
2576 transited_services = list(map(
2577 str, delegation_info.transited_services))
2578 self.assertEqual(expected_transited_services,
2579 transited_services)
2581 elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2582 expected_cname = kdc_exchange_dict['expected_cname']
2583 account_name = expected_cname['name-string'][0]
2585 self.assertEqual(account_name, pac_buffer.info.account_name)
2587 def generic_check_kdc_error(self,
2588 kdc_exchange_dict,
2589 callback_dict,
2590 rep,
2591 inner=False):
2593 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2595 expected_anon = kdc_exchange_dict['expected_anon']
2596 expected_srealm = kdc_exchange_dict['expected_srealm']
2597 expected_sname = kdc_exchange_dict['expected_sname']
2598 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2600 sent_fast = self.sent_fast(kdc_exchange_dict)
2602 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2604 self.assertElementEqual(rep, 'pvno', 5)
2605 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2606 error_code = self.getElementValue(rep, 'error-code')
2607 self.assertIn(error_code, expected_error_mode)
2608 if self.strict_checking:
2609 self.assertElementMissing(rep, 'ctime')
2610 self.assertElementMissing(rep, 'cusec')
2611 self.assertElementPresent(rep, 'stime')
2612 self.assertElementPresent(rep, 'susec')
2613 # error-code checked above
2614 if self.strict_checking:
2615 self.assertElementMissing(rep, 'crealm')
2616 if expected_anon and not inner:
2617 expected_cname = self.PrincipalName_create(
2618 name_type=NT_WELLKNOWN,
2619 names=['WELLKNOWN', 'ANONYMOUS'])
2620 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2621 else:
2622 self.assertElementMissing(rep, 'cname')
2623 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2624 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2625 self.assertElementMissing(rep, 'e-text')
2626 expected_status = kdc_exchange_dict['expected_status']
2627 expect_edata = kdc_exchange_dict['expect_edata']
2628 if expect_edata is None:
2629 expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2630 and (not sent_fast or fast_armor_type is None
2631 or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
2632 and not inner)
2633 if not expect_edata:
2634 self.assertIsNone(expected_status)
2635 self.assertElementMissing(rep, 'e-data')
2636 return rep
2637 edata = self.getElementValue(rep, 'e-data')
2638 if self.strict_checking:
2639 self.assertIsNotNone(edata)
2640 if edata is not None:
2641 if rep_msg_type == KRB_TGS_REP and not sent_fast:
2642 error_data = self.der_decode(
2643 edata,
2644 asn1Spec=krb5_asn1.KERB_ERROR_DATA())
2645 self.assertEqual(KERB_ERR_TYPE_EXTENDED,
2646 error_data['data-type'])
2648 extended_error = error_data['data-value']
2650 self.assertEqual(12, len(extended_error))
2652 status = int.from_bytes(extended_error[:4], 'little')
2653 flags = int.from_bytes(extended_error[8:], 'little')
2655 self.assertEqual(expected_status, status)
2657 self.assertEqual(3, flags)
2658 else:
2659 self.assertIsNone(expected_status)
2661 rep_padata = self.der_decode(edata,
2662 asn1Spec=krb5_asn1.METHOD_DATA())
2663 self.assertGreater(len(rep_padata), 0)
2665 if sent_fast:
2666 self.assertEqual(1, len(rep_padata))
2667 rep_pa_dict = self.get_pa_dict(rep_padata)
2668 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
2670 armor_key = kdc_exchange_dict['armor_key']
2671 self.assertIsNotNone(armor_key)
2672 fast_response = self.check_fx_fast_data(
2673 kdc_exchange_dict,
2674 rep_pa_dict[PADATA_FX_FAST],
2675 armor_key,
2676 expect_strengthen_key=False)
2678 rep_padata = fast_response['padata']
2680 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2681 callback_dict,
2682 rep_padata,
2683 error_code)
2685 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2687 return rep
2689 def check_rep_padata(self,
2690 kdc_exchange_dict,
2691 callback_dict,
2692 rep_padata,
2693 error_code):
2694 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2696 req_body = kdc_exchange_dict['req_body']
2697 proposed_etypes = req_body['etype']
2698 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2700 sent_fast = self.sent_fast(kdc_exchange_dict)
2701 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
2703 if rep_msg_type == KRB_TGS_REP:
2704 self.assertTrue(sent_fast)
2706 expect_etype_info2 = ()
2707 expect_etype_info = False
2708 unexpect_etype_info = True
2709 expected_aes_type = 0
2710 expected_rc4_type = 0
2711 if kcrypto.Enctype.RC4 in proposed_etypes:
2712 expect_etype_info = True
2713 for etype in proposed_etypes:
2714 if etype not in client_as_etypes:
2715 continue
2716 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2717 expect_etype_info = False
2718 if etype > expected_aes_type:
2719 expected_aes_type = etype
2720 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
2721 unexpect_etype_info = False
2722 if etype > expected_rc4_type:
2723 expected_rc4_type = etype
2725 if expected_aes_type != 0:
2726 expect_etype_info2 += (expected_aes_type,)
2727 if expected_rc4_type != 0:
2728 expect_etype_info2 += (expected_rc4_type,)
2730 expected_patypes = ()
2731 if sent_fast and error_code != 0:
2732 expected_patypes += (PADATA_FX_ERROR,)
2733 expected_patypes += (PADATA_FX_COOKIE,)
2735 if rep_msg_type == KRB_TGS_REP:
2736 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2737 if ('1' in sent_pac_options
2738 and error_code not in (0, KDC_ERR_GENERIC)):
2739 expected_patypes += (PADATA_PAC_OPTIONS,)
2740 elif error_code != KDC_ERR_GENERIC:
2741 if expect_etype_info:
2742 self.assertGreater(len(expect_etype_info2), 0)
2743 expected_patypes += (PADATA_ETYPE_INFO,)
2744 if len(expect_etype_info2) != 0:
2745 expected_patypes += (PADATA_ETYPE_INFO2,)
2747 if error_code != KDC_ERR_PREAUTH_FAILED:
2748 if sent_fast:
2749 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
2750 else:
2751 expected_patypes += (PADATA_ENC_TIMESTAMP,)
2753 if not sent_enc_challenge:
2754 expected_patypes += (PADATA_PK_AS_REQ,)
2755 expected_patypes += (PADATA_PK_AS_REP_19,)
2757 if (self.kdc_fast_support
2758 and not sent_fast
2759 and not sent_enc_challenge):
2760 expected_patypes += (PADATA_FX_FAST,)
2761 expected_patypes += (PADATA_FX_COOKIE,)
2763 got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
2764 self.assertSequenceElementsEqual(expected_patypes, got_patypes,
2765 require_strict={PADATA_FX_COOKIE,
2766 PADATA_FX_FAST,
2767 PADATA_PAC_OPTIONS,
2768 PADATA_PK_AS_REP_19,
2769 PADATA_PK_AS_REQ})
2771 if not expected_patypes:
2772 return None
2774 pa_dict = self.get_pa_dict(rep_padata)
2776 enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
2777 if enc_timestamp is not None:
2778 self.assertEqual(len(enc_timestamp), 0)
2780 pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
2781 if pk_as_req is not None:
2782 self.assertEqual(len(pk_as_req), 0)
2784 pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
2785 if pk_as_rep19 is not None:
2786 self.assertEqual(len(pk_as_rep19), 0)
2788 fx_fast = pa_dict.get(PADATA_FX_FAST)
2789 if fx_fast is not None:
2790 self.assertEqual(len(fx_fast), 0)
2792 fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
2793 if fast_cookie is not None:
2794 kdc_exchange_dict['fast_cookie'] = fast_cookie
2796 fast_error = pa_dict.get(PADATA_FX_ERROR)
2797 if fast_error is not None:
2798 fast_error = self.der_decode(fast_error,
2799 asn1Spec=krb5_asn1.KRB_ERROR())
2800 self.generic_check_kdc_error(kdc_exchange_dict,
2801 callback_dict,
2802 fast_error,
2803 inner=True)
2805 pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
2806 if pac_options is not None:
2807 pac_options = self.der_decode(
2808 pac_options,
2809 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2810 self.assertElementEqual(pac_options, 'options', sent_pac_options)
2812 enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
2813 if enc_challenge is not None:
2814 if not sent_enc_challenge:
2815 self.assertEqual(len(enc_challenge), 0)
2816 else:
2817 armor_key = kdc_exchange_dict['armor_key']
2818 self.assertIsNotNone(armor_key)
2820 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
2822 kdc_challenge_key = self.generate_kdc_challenge_key(
2823 armor_key, preauth_key)
2825 # Ensure that the encrypted challenge FAST factor is supported
2826 # (RFC6113 5.4.6).
2827 if self.strict_checking:
2828 self.assertNotEqual(len(enc_challenge), 0)
2829 if len(enc_challenge) != 0:
2830 encrypted_challenge = self.der_decode(
2831 enc_challenge,
2832 asn1Spec=krb5_asn1.EncryptedData())
2833 self.assertEqual(encrypted_challenge['etype'],
2834 kdc_challenge_key.etype)
2836 challenge = kdc_challenge_key.decrypt(
2837 KU_ENC_CHALLENGE_KDC,
2838 encrypted_challenge['cipher'])
2839 challenge = self.der_decode(
2840 challenge,
2841 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
2843 # Retrieve the returned timestamp.
2844 rep_patime = challenge['patimestamp']
2845 self.assertIn('pausec', challenge)
2847 # Ensure the returned time is within five minutes of the
2848 # current time.
2849 rep_time = self.get_EpochFromKerberosTime(rep_patime)
2850 current_time = time.time()
2852 self.assertLess(current_time - 300, rep_time)
2853 self.assertLess(rep_time, current_time + 300)
2855 etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
2856 if etype_info2 is not None:
2857 etype_info2 = self.der_decode(etype_info2,
2858 asn1Spec=krb5_asn1.ETYPE_INFO2())
2859 self.assertGreaterEqual(len(etype_info2), 1)
2860 if self.strict_checking:
2861 self.assertEqual(len(etype_info2), len(expect_etype_info2))
2862 for i in range(0, len(etype_info2)):
2863 e = self.getElementValue(etype_info2[i], 'etype')
2864 if self.strict_checking:
2865 self.assertEqual(e, expect_etype_info2[i])
2866 salt = self.getElementValue(etype_info2[i], 'salt')
2867 if e == kcrypto.Enctype.RC4:
2868 if self.strict_checking:
2869 self.assertIsNone(salt)
2870 else:
2871 self.assertIsNotNone(salt)
2872 expected_salt = kdc_exchange_dict['expected_salt']
2873 if expected_salt is not None:
2874 self.assertEqual(salt, expected_salt)
2875 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
2876 if self.strict_checking:
2877 self.assertIsNone(s2kparams)
2879 etype_info = pa_dict.get(PADATA_ETYPE_INFO)
2880 if etype_info is not None:
2881 etype_info = self.der_decode(etype_info,
2882 asn1Spec=krb5_asn1.ETYPE_INFO())
2883 self.assertEqual(len(etype_info), 1)
2884 e = self.getElementValue(etype_info[0], 'etype')
2885 self.assertEqual(e, kcrypto.Enctype.RC4)
2886 self.assertEqual(e, expect_etype_info2[0])
2887 salt = self.getElementValue(etype_info[0], 'salt')
2888 if self.strict_checking:
2889 self.assertIsNotNone(salt)
2890 self.assertEqual(len(salt), 0)
2892 return etype_info2
2894 def generate_simple_fast(self,
2895 kdc_exchange_dict,
2896 _callback_dict,
2897 req_body,
2898 fast_padata,
2899 fast_armor,
2900 checksum,
2901 fast_options=''):
2902 armor_key = kdc_exchange_dict['armor_key']
2904 fast_req = self.KRB_FAST_REQ_create(fast_options,
2905 fast_padata,
2906 req_body)
2907 fast_req = self.der_encode(fast_req,
2908 asn1Spec=krb5_asn1.KrbFastReq())
2909 fast_req = self.EncryptedData_create(armor_key,
2910 KU_FAST_ENC,
2911 fast_req)
2913 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
2914 checksum,
2915 fast_req)
2917 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
2918 fx_fast_request = self.der_encode(
2919 fx_fast_request,
2920 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
2922 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
2923 fx_fast_request)
2925 return fast_padata
2927 def generate_ap_req(self,
2928 kdc_exchange_dict,
2929 _callback_dict,
2930 req_body,
2931 armor):
2932 if armor:
2933 tgt = kdc_exchange_dict['armor_tgt']
2934 authenticator_subkey = kdc_exchange_dict['armor_subkey']
2936 req_body_checksum = None
2937 else:
2938 tgt = kdc_exchange_dict['tgt']
2939 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2940 body_checksum_type = kdc_exchange_dict['body_checksum_type']
2942 req_body_blob = self.der_encode(req_body,
2943 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2945 req_body_checksum = self.Checksum_create(tgt.session_key,
2946 KU_TGS_REQ_AUTH_CKSUM,
2947 req_body_blob,
2948 ctype=body_checksum_type)
2950 auth_data = kdc_exchange_dict['auth_data']
2952 subkey_obj = None
2953 if authenticator_subkey is not None:
2954 subkey_obj = authenticator_subkey.export_obj()
2955 seq_number = random.randint(0, 0xfffffffe)
2956 (ctime, cusec) = self.get_KerberosTimeWithUsec()
2957 authenticator_obj = self.Authenticator_create(
2958 crealm=tgt.crealm,
2959 cname=tgt.cname,
2960 cksum=req_body_checksum,
2961 cusec=cusec,
2962 ctime=ctime,
2963 subkey=subkey_obj,
2964 seq_number=seq_number,
2965 authorization_data=auth_data)
2966 authenticator_blob = self.der_encode(
2967 authenticator_obj,
2968 asn1Spec=krb5_asn1.Authenticator())
2970 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
2971 authenticator = self.EncryptedData_create(tgt.session_key,
2972 usage,
2973 authenticator_blob)
2975 ap_options = krb5_asn1.APOptions('0')
2976 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2977 ticket=tgt.ticket,
2978 authenticator=authenticator)
2979 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2981 return ap_req
2983 def generate_simple_tgs_padata(self,
2984 kdc_exchange_dict,
2985 callback_dict,
2986 req_body):
2987 ap_req = self.generate_ap_req(kdc_exchange_dict,
2988 callback_dict,
2989 req_body,
2990 armor=False)
2991 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2992 padata = [pa_tgs_req]
2994 return padata, req_body
2996 def get_preauth_key(self, kdc_exchange_dict):
2997 msg_type = kdc_exchange_dict['rep_msg_type']
2999 if msg_type == KRB_AS_REP:
3000 key = kdc_exchange_dict['preauth_key']
3001 usage = KU_AS_REP_ENC_PART
3002 else: # KRB_TGS_REP
3003 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
3004 if authenticator_subkey is not None:
3005 key = authenticator_subkey
3006 usage = KU_TGS_REP_ENC_PART_SUB_KEY
3007 else:
3008 tgt = kdc_exchange_dict['tgt']
3009 key = tgt.session_key
3010 usage = KU_TGS_REP_ENC_PART_SESSION
3012 self.assertIsNotNone(key)
3014 return key, usage
3016 def generate_armor_key(self, subkey, session_key):
3017 armor_key = kcrypto.cf2(subkey.key,
3018 session_key.key,
3019 b'subkeyarmor',
3020 b'ticketarmor')
3021 armor_key = Krb5EncryptionKey(armor_key, None)
3023 return armor_key
3025 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
3026 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
3027 reply_key.key,
3028 b'strengthenkey',
3029 b'replykey')
3030 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
3031 reply_key.kvno)
3033 return strengthen_reply_key
3035 def generate_client_challenge_key(self, armor_key, longterm_key):
3036 client_challenge_key = kcrypto.cf2(armor_key.key,
3037 longterm_key.key,
3038 b'clientchallengearmor',
3039 b'challengelongterm')
3040 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
3042 return client_challenge_key
3044 def generate_kdc_challenge_key(self, armor_key, longterm_key):
3045 kdc_challenge_key = kcrypto.cf2(armor_key.key,
3046 longterm_key.key,
3047 b'kdcchallengearmor',
3048 b'challengelongterm')
3049 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
3051 return kdc_challenge_key
3053 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
3054 expected_type = expected_checksum['cksumtype']
3055 self.assertEqual(armor_key.ctype, expected_type)
3057 ticket_blob = self.der_encode(ticket,
3058 asn1Spec=krb5_asn1.Ticket())
3059 checksum = self.Checksum_create(armor_key,
3060 KU_FAST_FINISHED,
3061 ticket_blob)
3062 self.assertEqual(expected_checksum, checksum)
3064 def verify_ticket(self, ticket, krbtgt_key, expect_pac=True,
3065 expect_ticket_checksum=True):
3066 # Check if the ticket is a TGT.
3067 sname = ticket.ticket['sname']
3068 is_tgt = self.is_tgs(sname)
3070 # Decrypt the ticket.
3072 key = ticket.decryption_key
3073 enc_part = ticket.ticket['enc-part']
3075 self.assertElementEqual(enc_part, 'etype', key.etype)
3076 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3078 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3079 enc_part = self.der_decode(
3080 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3082 # Fetch the authorization data from the ticket.
3083 auth_data = enc_part.get('authorization-data')
3084 if expect_pac:
3085 self.assertIsNotNone(auth_data)
3086 elif auth_data is None:
3087 return
3089 # Get a copy of the authdata with an empty PAC, and the existing PAC
3090 # (if present).
3091 empty_pac = self.get_empty_pac()
3092 auth_data, pac_data = self.replace_pac(auth_data,
3093 empty_pac,
3094 expect_pac=expect_pac)
3095 if not expect_pac:
3096 return
3098 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3099 # raw type to create a new PAC with zeroed signatures for
3100 # verification. This is because on Windows, the resource_groups field
3101 # is added to PAC_LOGON_INFO after the info3 field has been created,
3102 # which results in a different ordering of pointer values than Samba
3103 # (see commit 0e201ecdc53). Using the raw type avoids changing
3104 # PAC_LOGON_INFO, so verification against Windows can work. We still
3105 # need the PAC_DATA type to retrieve the actual checksums, because the
3106 # signatures in the raw type may contain padding bytes.
3107 pac = ndr_unpack(krb5pac.PAC_DATA,
3108 pac_data)
3109 raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
3110 pac_data)
3112 checksums = {}
3114 for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
3115 buffer_type = pac_buffer.type
3116 if buffer_type in self.pac_checksum_types:
3117 self.assertNotIn(buffer_type, checksums,
3118 f'Duplicate checksum type {buffer_type}')
3120 # Fetch the checksum and the checksum type from the PAC buffer.
3121 checksum = pac_buffer.info.signature
3122 ctype = pac_buffer.info.type
3123 if ctype & 1 << 31:
3124 ctype |= -1 << 31
3126 checksums[buffer_type] = checksum, ctype
3128 if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3129 # Zero the checksum field so that we can later verify the
3130 # checksums. The ticket checksum field is not zeroed.
3132 signature = ndr_unpack(
3133 krb5pac.PAC_SIGNATURE_DATA,
3134 raw_pac_buffer.info.remaining)
3135 signature.signature = bytes(len(checksum))
3136 raw_pac_buffer.info.remaining = ndr_pack(
3137 signature)
3139 # Re-encode the PAC.
3140 pac_data = ndr_pack(raw_pac)
3142 # Verify the signatures.
3144 server_checksum, server_ctype = checksums[
3145 krb5pac.PAC_TYPE_SRV_CHECKSUM]
3146 key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3147 pac_data,
3148 server_ctype,
3149 server_checksum)
3151 kdc_checksum, kdc_ctype = checksums[
3152 krb5pac.PAC_TYPE_KDC_CHECKSUM]
3153 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3154 server_checksum,
3155 kdc_ctype,
3156 kdc_checksum)
3158 if is_tgt:
3159 self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
3160 else:
3161 ticket_checksum, ticket_ctype = checksums.get(
3162 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
3163 (None, None))
3164 if expect_ticket_checksum:
3165 self.assertIsNotNone(ticket_checksum)
3166 elif expect_ticket_checksum is False:
3167 self.assertIsNone(ticket_checksum)
3168 if ticket_checksum is not None:
3169 enc_part['authorization-data'] = auth_data
3170 enc_part = self.der_encode(enc_part,
3171 asn1Spec=krb5_asn1.EncTicketPart())
3173 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3174 enc_part,
3175 ticket_ctype,
3176 ticket_checksum)
3178 def modified_ticket(self,
3179 ticket, *,
3180 new_ticket_key=None,
3181 modify_fn=None,
3182 modify_pac_fn=None,
3183 exclude_pac=False,
3184 update_pac_checksums=True,
3185 checksum_keys=None,
3186 include_checksums=None):
3187 if checksum_keys is None:
3188 # A dict containing a key for each checksum type to be created in
3189 # the PAC.
3190 checksum_keys = {}
3192 if include_checksums is None:
3193 # A dict containing a value for each checksum type; True if the
3194 # checksum type is to be included in the PAC, False if it is to be
3195 # excluded, or None/not present if the checksum is to be included
3196 # based on its presence in the original PAC.
3197 include_checksums = {}
3199 # Check that the values passed in by the caller make sense.
3201 self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
3202 self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
3204 if exclude_pac:
3205 self.assertIsNone(modify_pac_fn)
3207 update_pac_checksums = False
3209 if not update_pac_checksums:
3210 self.assertFalse(checksum_keys)
3211 self.assertFalse(include_checksums)
3213 expect_pac = update_pac_checksums or modify_pac_fn is not None
3215 key = ticket.decryption_key
3217 if new_ticket_key is None:
3218 # Use the same key to re-encrypt the ticket.
3219 new_ticket_key = key
3221 if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
3222 # If the server signature key is not present, fall back to the key
3223 # used to encrypt the ticket.
3224 checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
3226 if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
3227 # If the ticket signature key is not present, fall back to the key
3228 # used for the KDC signature.
3229 kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
3230 if kdc_checksum_key is not None:
3231 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
3232 kdc_checksum_key)
3234 # Decrypt the ticket.
3236 enc_part = ticket.ticket['enc-part']
3238 self.assertElementEqual(enc_part, 'etype', key.etype)
3239 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3241 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3242 enc_part = self.der_decode(
3243 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3245 # Modify the ticket here.
3246 if modify_fn is not None:
3247 enc_part = modify_fn(enc_part)
3249 auth_data = enc_part.get('authorization-data')
3250 if expect_pac:
3251 self.assertIsNotNone(auth_data)
3252 if auth_data is not None:
3253 new_pac = None
3254 if not exclude_pac:
3255 # Get a copy of the authdata with an empty PAC, and the
3256 # existing PAC (if present).
3257 empty_pac = self.get_empty_pac()
3258 empty_pac_auth_data, pac_data = self.replace_pac(
3259 auth_data,
3260 empty_pac,
3261 expect_pac=expect_pac)
3263 if pac_data is not None:
3264 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3266 # Modify the PAC here.
3267 if modify_pac_fn is not None:
3268 pac = modify_pac_fn(pac)
3270 if update_pac_checksums:
3271 # Get the enc-part with an empty PAC, which is needed
3272 # to create a ticket signature.
3273 enc_part_to_sign = enc_part.copy()
3274 enc_part_to_sign['authorization-data'] = (
3275 empty_pac_auth_data)
3276 enc_part_to_sign = self.der_encode(
3277 enc_part_to_sign,
3278 asn1Spec=krb5_asn1.EncTicketPart())
3280 self.update_pac_checksums(pac,
3281 checksum_keys,
3282 include_checksums,
3283 enc_part_to_sign)
3285 # Re-encode the PAC.
3286 pac_data = ndr_pack(pac)
3287 new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
3288 pac_data)
3290 # Replace the PAC in the authorization data and re-add it to the
3291 # ticket enc-part.
3292 auth_data, _ = self.replace_pac(auth_data, new_pac,
3293 expect_pac=expect_pac)
3294 enc_part['authorization-data'] = auth_data
3296 # Re-encrypt the ticket enc-part with the new key.
3297 enc_part_new = self.der_encode(enc_part,
3298 asn1Spec=krb5_asn1.EncTicketPart())
3299 enc_part_new = self.EncryptedData_create(new_ticket_key,
3300 KU_TICKET,
3301 enc_part_new)
3303 # Create a copy of the ticket with the new enc-part.
3304 new_ticket = ticket.ticket.copy()
3305 new_ticket['enc-part'] = enc_part_new
3307 new_ticket_creds = KerberosTicketCreds(
3308 new_ticket,
3309 session_key=ticket.session_key,
3310 crealm=ticket.crealm,
3311 cname=ticket.cname,
3312 srealm=ticket.srealm,
3313 sname=ticket.sname,
3314 decryption_key=new_ticket_key,
3315 ticket_private=enc_part,
3316 encpart_private=ticket.encpart_private)
3318 return new_ticket_creds
3320 def update_pac_checksums(self,
3321 pac,
3322 checksum_keys,
3323 include_checksums,
3324 enc_part=None):
3325 pac_buffers = pac.buffers
3326 checksum_buffers = {}
3328 # Find the relevant PAC checksum buffers.
3329 for pac_buffer in pac_buffers:
3330 buffer_type = pac_buffer.type
3331 if buffer_type in self.pac_checksum_types:
3332 self.assertNotIn(buffer_type, checksum_buffers,
3333 f'Duplicate checksum type {buffer_type}')
3335 checksum_buffers[buffer_type] = pac_buffer
3337 # Create any additional buffers that were requested but not
3338 # present. Conversely, remove any buffers that were requested to be
3339 # removed.
3340 for buffer_type in self.pac_checksum_types:
3341 if buffer_type in checksum_buffers:
3342 if include_checksums.get(buffer_type) is False:
3343 checksum_buffer = checksum_buffers.pop(buffer_type)
3345 pac.num_buffers -= 1
3346 pac_buffers.remove(checksum_buffer)
3348 elif include_checksums.get(buffer_type) is True:
3349 info = krb5pac.PAC_SIGNATURE_DATA()
3351 checksum_buffer = krb5pac.PAC_BUFFER()
3352 checksum_buffer.type = buffer_type
3353 checksum_buffer.info = info
3355 pac_buffers.append(checksum_buffer)
3356 pac.num_buffers += 1
3358 checksum_buffers[buffer_type] = checksum_buffer
3360 # Fill the relevant checksum buffers.
3361 for buffer_type, checksum_buffer in checksum_buffers.items():
3362 checksum_key = checksum_keys[buffer_type]
3363 ctype = checksum_key.ctype & ((1 << 32) - 1)
3365 if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3366 self.assertIsNotNone(enc_part)
3368 signature = checksum_key.make_rodc_checksum(
3369 KU_NON_KERB_CKSUM_SALT,
3370 enc_part)
3372 elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
3373 signature = checksum_key.make_zeroed_checksum()
3375 else:
3376 signature = checksum_key.make_rodc_zeroed_checksum()
3378 checksum_buffer.info.signature = signature
3379 checksum_buffer.info.type = ctype
3381 # Add the new checksum buffers to the PAC.
3382 pac.buffers = pac_buffers
3384 # Calculate the server and KDC checksums and insert them into the PAC.
3386 server_checksum_buffer = checksum_buffers.get(
3387 krb5pac.PAC_TYPE_SRV_CHECKSUM)
3388 if server_checksum_buffer is not None:
3389 server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
3391 pac_data = ndr_pack(pac)
3392 server_checksum = server_checksum_key.make_checksum(
3393 KU_NON_KERB_CKSUM_SALT,
3394 pac_data)
3396 server_checksum_buffer.info.signature = server_checksum
3398 kdc_checksum_buffer = checksum_buffers.get(
3399 krb5pac.PAC_TYPE_KDC_CHECKSUM)
3400 if kdc_checksum_buffer is not None:
3401 if server_checksum_buffer is None:
3402 # There's no server signature to make the checksum over, so
3403 # just make the checksum over an empty bytes object.
3404 server_checksum = bytes()
3406 kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
3408 kdc_checksum = kdc_checksum_key.make_rodc_checksum(
3409 KU_NON_KERB_CKSUM_SALT,
3410 server_checksum)
3412 kdc_checksum_buffer.info.signature = kdc_checksum
3414 def replace_pac(self, auth_data, new_pac, expect_pac=True):
3415 if new_pac is not None:
3416 self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
3417 self.assertElementPresent(new_pac, 'ad-data')
3419 new_auth_data = []
3421 ad_relevant = None
3422 old_pac = None
3424 for authdata_elem in auth_data:
3425 if authdata_elem['ad-type'] == AD_IF_RELEVANT:
3426 ad_relevant = self.der_decode(
3427 authdata_elem['ad-data'],
3428 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3430 relevant_elems = []
3431 for relevant_elem in ad_relevant:
3432 if relevant_elem['ad-type'] == AD_WIN2K_PAC:
3433 self.assertIsNone(old_pac, 'Multiple PACs detected')
3434 old_pac = relevant_elem['ad-data']
3436 if new_pac is not None:
3437 relevant_elems.append(new_pac)
3438 else:
3439 relevant_elems.append(relevant_elem)
3440 if expect_pac:
3441 self.assertIsNotNone(old_pac, 'Expected PAC')
3443 if relevant_elems:
3444 ad_relevant = self.der_encode(
3445 relevant_elems,
3446 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3448 authdata_elem = self.AuthorizationData_create(
3449 AD_IF_RELEVANT,
3450 ad_relevant)
3451 else:
3452 authdata_elem = None
3454 if authdata_elem is not None:
3455 new_auth_data.append(authdata_elem)
3457 if expect_pac:
3458 self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
3460 return new_auth_data, old_pac
3462 def get_pac(self, auth_data, expect_pac=True):
3463 _, pac = self.replace_pac(auth_data, None, expect_pac)
3464 return pac
3466 def get_ticket_pac(self, ticket, expect_pac=True):
3467 auth_data = ticket.ticket_private.get('authorization-data')
3468 if expect_pac:
3469 self.assertIsNotNone(auth_data)
3470 elif auth_data is None:
3471 return None
3473 return self.get_pac(auth_data, expect_pac=expect_pac)
3475 def get_krbtgt_checksum_key(self):
3476 krbtgt_creds = self.get_krbtgt_creds()
3477 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
3479 return {
3480 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
3483 def is_tgs(self, principal):
3484 name = principal['name-string'][0]
3485 return name in ('krbtgt', b'krbtgt')
3487 def get_empty_pac(self):
3488 return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1))
3490 def get_outer_pa_dict(self, kdc_exchange_dict):
3491 return self.get_pa_dict(kdc_exchange_dict['req_padata'])
3493 def get_fast_pa_dict(self, kdc_exchange_dict):
3494 req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
3496 if req_pa_dict:
3497 return req_pa_dict
3499 return self.get_outer_pa_dict(kdc_exchange_dict)
3501 def sent_fast(self, kdc_exchange_dict):
3502 outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
3504 return PADATA_FX_FAST in outer_pa_dict
3506 def sent_enc_challenge(self, kdc_exchange_dict):
3507 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3509 return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
3511 def get_sent_pac_options(self, kdc_exchange_dict):
3512 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3514 if PADATA_PAC_OPTIONS not in fast_pa_dict:
3515 return ''
3517 pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
3518 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3519 pac_options = pac_options['options']
3521 # Mask out unsupported bits.
3522 pac_options, remaining = pac_options[:4], pac_options[4:]
3523 pac_options += '0' * len(remaining)
3525 return pac_options
3527 def get_krbtgt_sname(self):
3528 krbtgt_creds = self.get_krbtgt_creds()
3529 krbtgt_username = krbtgt_creds.get_username()
3530 krbtgt_realm = krbtgt_creds.get_realm()
3531 krbtgt_sname = self.PrincipalName_create(
3532 name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
3534 return krbtgt_sname
3536 def _test_as_exchange(self,
3537 cname,
3538 realm,
3539 sname,
3540 till,
3541 client_as_etypes,
3542 expected_error_mode,
3543 expected_crealm,
3544 expected_cname,
3545 expected_srealm,
3546 expected_sname,
3547 expected_salt,
3548 etypes,
3549 padata,
3550 kdc_options,
3551 expected_flags=None,
3552 unexpected_flags=None,
3553 expected_supported_etypes=None,
3554 preauth_key=None,
3555 ticket_decryption_key=None,
3556 pac_request=None,
3557 pac_options=None,
3558 expect_pac=True,
3559 to_rodc=False):
3561 def _generate_padata_copy(_kdc_exchange_dict,
3562 _callback_dict,
3563 req_body):
3564 return padata, req_body
3566 if not expected_error_mode:
3567 check_error_fn = None
3568 check_rep_fn = self.generic_check_kdc_rep
3569 else:
3570 check_error_fn = self.generic_check_kdc_error
3571 check_rep_fn = None
3573 if padata is not None:
3574 generate_padata_fn = _generate_padata_copy
3575 else:
3576 generate_padata_fn = None
3578 kdc_exchange_dict = self.as_exchange_dict(
3579 expected_crealm=expected_crealm,
3580 expected_cname=expected_cname,
3581 expected_srealm=expected_srealm,
3582 expected_sname=expected_sname,
3583 expected_supported_etypes=expected_supported_etypes,
3584 ticket_decryption_key=ticket_decryption_key,
3585 generate_padata_fn=generate_padata_fn,
3586 check_error_fn=check_error_fn,
3587 check_rep_fn=check_rep_fn,
3588 check_kdc_private_fn=self.generic_check_kdc_private,
3589 expected_error_mode=expected_error_mode,
3590 client_as_etypes=client_as_etypes,
3591 expected_salt=expected_salt,
3592 expected_flags=expected_flags,
3593 unexpected_flags=unexpected_flags,
3594 preauth_key=preauth_key,
3595 kdc_options=str(kdc_options),
3596 pac_request=pac_request,
3597 pac_options=pac_options,
3598 expect_pac=expect_pac,
3599 to_rodc=to_rodc)
3601 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3602 cname=cname,
3603 realm=realm,
3604 sname=sname,
3605 till_time=till,
3606 etypes=etypes)
3608 return rep, kdc_exchange_dict