tests/krb5: Fix PA-PAC-OPTIONS checking
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob0217674ed2de58c16bd5fe82ad71e2e2bbe9bde4
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import krb5pac, security
38 from samba.gensec import FEATURE_SEAL
39 from samba.ndr import ndr_pack, ndr_unpack
41 import samba.tests
42 from samba.tests import TestCaseInTempDir
44 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
45 from samba.tests.krb5.rfc4120_constants import (
46 AD_IF_RELEVANT,
47 AD_WIN2K_PAC,
48 FX_FAST_ARMOR_AP_REQUEST,
49 KDC_ERR_GENERIC,
50 KDC_ERR_PREAUTH_FAILED,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
52 KRB_AP_REQ,
53 KRB_AS_REP,
54 KRB_AS_REQ,
55 KRB_ERROR,
56 KRB_TGS_REP,
57 KRB_TGS_REQ,
58 KU_AP_REQ_AUTH,
59 KU_AS_REP_ENC_PART,
60 KU_ENC_CHALLENGE_KDC,
61 KU_FAST_ENC,
62 KU_FAST_FINISHED,
63 KU_FAST_REP,
64 KU_FAST_REQ_CHKSUM,
65 KU_NON_KERB_CKSUM_SALT,
66 KU_TGS_REP_ENC_PART_SESSION,
67 KU_TGS_REP_ENC_PART_SUB_KEY,
68 KU_TGS_REQ_AUTH,
69 KU_TGS_REQ_AUTH_CKSUM,
70 KU_TGS_REQ_AUTH_DAT_SESSION,
71 KU_TGS_REQ_AUTH_DAT_SUBKEY,
72 KU_TICKET,
73 NT_SRV_INST,
74 NT_WELLKNOWN,
75 PADATA_ENCRYPTED_CHALLENGE,
76 PADATA_ENC_TIMESTAMP,
77 PADATA_ETYPE_INFO,
78 PADATA_ETYPE_INFO2,
79 PADATA_FOR_USER,
80 PADATA_FX_COOKIE,
81 PADATA_FX_ERROR,
82 PADATA_FX_FAST,
83 PADATA_KDC_REQ,
84 PADATA_PAC_OPTIONS,
85 PADATA_PAC_REQUEST,
86 PADATA_PK_AS_REQ,
87 PADATA_PK_AS_REP_19,
88 PADATA_PW_SALT,
89 PADATA_SUPPORTED_ETYPES
91 import samba.tests.krb5.kcrypto as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self, value, asn1Spec, encodeFun, **options):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value = asn1Spec.clone(value)
104 valueLength = len(value)
105 if valueLength % 8:
106 alignedValue = value << (8 - valueLength % 8)
107 else:
108 alignedValue = value
110 substrate = alignedValue.asOctets()
111 length = len(substrate)
112 # We need at least 32-Bit / 4-Bytes
113 if length < 4:
114 padding = 4 - length
115 else:
116 padding = 0
117 ret = b'\x00' + substrate + (b'\x00' * padding)
118 return ret, False, True
121 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self, scope=0):
125 ret = "%s" % self.asBinary()
126 bits = []
127 highest_bit = 32
128 for byte in self.asNumbers():
129 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
130 mask = 1 << bit
131 if byte & mask:
132 val = 1
133 else:
134 val = 0
135 bits.append(val)
136 if len(bits) < highest_bit:
137 for bitPosition in range(len(bits), highest_bit):
138 bits.append(0)
139 indent = " " * scope
140 delim = ": (\n%s " % indent
141 for bitPosition in range(highest_bit):
142 if bitPosition in self.prettyPrintNamedValues:
143 name = self.prettyPrintNamedValues[bitPosition]
144 elif bits[bitPosition] != 0:
145 name = "unknown-bit-%u" % bitPosition
146 else:
147 continue
148 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
149 delim = ",\n%s " % indent
150 ret += "\n%s)" % indent
151 return ret
154 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
155 krb5_asn1.TicketFlagsValues.namedValues
156 krb5_asn1.TicketFlags.namedValues =\
157 krb5_asn1.TicketFlagsValues.namedValues
158 krb5_asn1.TicketFlags.prettyPrint =\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
161 krb5_asn1.KDCOptionsValues.namedValues
162 krb5_asn1.KDCOptions.namedValues =\
163 krb5_asn1.KDCOptionsValues.namedValues
164 krb5_asn1.KDCOptions.prettyPrint =\
165 BitString_NamedValues_prettyPrint
166 krb5_asn1.APOptions.prettyPrintNamedValues =\
167 krb5_asn1.APOptionsValues.namedValues
168 krb5_asn1.APOptions.namedValues =\
169 krb5_asn1.APOptionsValues.namedValues
170 krb5_asn1.APOptions.prettyPrint =\
171 BitString_NamedValues_prettyPrint
172 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
173 krb5_asn1.PACOptionFlagsValues.namedValues
174 krb5_asn1.PACOptionFlags.namedValues =\
175 krb5_asn1.PACOptionFlagsValues.namedValues
176 krb5_asn1.PACOptionFlags.prettyPrint =\
177 BitString_NamedValues_prettyPrint
180 def Integer_NamedValues_prettyPrint(self, scope=0):
181 intval = int(self)
182 if intval in self.prettyPrintNamedValues:
183 name = self.prettyPrintNamedValues[intval]
184 else:
185 name = "<__unknown__>"
186 ret = "%d (0x%x) %s" % (intval, intval, name)
187 return ret
190 krb5_asn1.NameType.prettyPrintNamedValues =\
191 krb5_asn1.NameTypeValues.namedValues
192 krb5_asn1.NameType.prettyPrint =\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
195 krb5_asn1.AuthDataTypeValues.namedValues
196 krb5_asn1.AuthDataType.prettyPrint =\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1.PADataType.prettyPrintNamedValues =\
199 krb5_asn1.PADataTypeValues.namedValues
200 krb5_asn1.PADataType.prettyPrint =\
201 Integer_NamedValues_prettyPrint
202 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
203 krb5_asn1.EncryptionTypeValues.namedValues
204 krb5_asn1.EncryptionType.prettyPrint =\
205 Integer_NamedValues_prettyPrint
206 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
207 krb5_asn1.ChecksumTypeValues.namedValues
208 krb5_asn1.ChecksumType.prettyPrint =\
209 Integer_NamedValues_prettyPrint
210 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
211 krb5_asn1.KerbErrorDataTypeValues.namedValues
212 krb5_asn1.KerbErrorDataType.prettyPrint =\
213 Integer_NamedValues_prettyPrint
216 class Krb5EncryptionKey:
217 def __init__(self, key, kvno):
218 EncTypeChecksum = {
219 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
220 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
221 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
223 self.key = key
224 self.etype = key.enctype
225 self.ctype = EncTypeChecksum[self.etype]
226 self.kvno = kvno
228 def encrypt(self, usage, plaintext):
229 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
230 return ciphertext
232 def decrypt(self, usage, ciphertext):
233 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
234 return plaintext
236 def make_zeroed_checksum(self, ctype=None):
237 if ctype is None:
238 ctype = self.ctype
240 checksum_len = kcrypto.checksum_len(ctype)
241 return bytes(checksum_len)
243 def make_checksum(self, usage, plaintext, ctype=None):
244 if ctype is None:
245 ctype = self.ctype
246 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
247 return cksum
249 def verify_checksum(self, usage, plaintext, ctype, cksum):
250 if self.ctype != ctype:
251 raise AssertionError(f'{self.ctype} != {ctype}')
253 kcrypto.verify_checksum(ctype,
254 self.key,
255 usage,
256 plaintext,
257 cksum)
259 def export_obj(self):
260 EncryptionKey_obj = {
261 'keytype': self.etype,
262 'keyvalue': self.key.contents,
264 return EncryptionKey_obj
267 class RodcPacEncryptionKey(Krb5EncryptionKey):
268 def __init__(self, key, kvno, rodc_id=None):
269 super().__init__(key, kvno)
271 if rodc_id is None:
272 kvno = self.kvno
273 if kvno is not None:
274 kvno >>= 16
275 kvno &= (1 << 16) - 1
277 rodc_id = kvno or None
279 if rodc_id is not None:
280 self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
281 else:
282 self.rodc_id = b''
284 def make_zeroed_checksum(self, ctype=None):
285 checksum = super().make_zeroed_checksum(ctype)
286 return checksum + bytes(len(self.rodc_id))
288 def make_checksum(self, usage, plaintext, ctype=None):
289 checksum = super().make_checksum(usage, plaintext, ctype)
290 return checksum + self.rodc_id
292 def verify_checksum(self, usage, plaintext, ctype, cksum):
293 if self.rodc_id:
294 cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
296 if self.rodc_id != cksum_rodc_id:
297 raise AssertionError(f'{self.rodc_id.hex()} != '
298 f'{cksum_rodc_id.hex()}')
300 super().verify_checksum(usage,
301 plaintext,
302 ctype,
303 cksum)
306 class ZeroedChecksumKey(Krb5EncryptionKey):
307 def make_checksum(self, usage, plaintext, ctype=None):
308 return self.make_zeroed_checksum(ctype)
311 class WrongLengthChecksumKey(Krb5EncryptionKey):
312 def __init__(self, key, kvno, length):
313 super().__init__(key, kvno)
315 self._length = length
317 def make_checksum(self, usage, plaintext, ctype=None):
318 checksum = super().make_checksum(usage, plaintext, ctype)
320 diff = self._length - len(checksum)
321 if diff > 0:
322 checksum += bytes(diff)
323 elif diff < 0:
324 checksum = checksum[:self._length]
326 return checksum
329 class KerberosCredentials(Credentials):
331 fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
332 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
333 security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
335 def __init__(self):
336 super(KerberosCredentials, self).__init__()
337 all_enc_types = 0
338 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
339 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
340 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
342 self.as_supported_enctypes = all_enc_types
343 self.tgs_supported_enctypes = all_enc_types
344 self.ap_supported_enctypes = all_enc_types
346 self.kvno = None
347 self.forced_keys = {}
349 self.forced_salt = None
351 self.dn = None
353 def set_as_supported_enctypes(self, value):
354 self.as_supported_enctypes = int(value)
356 def set_tgs_supported_enctypes(self, value):
357 self.tgs_supported_enctypes = int(value)
359 def set_ap_supported_enctypes(self, value):
360 self.ap_supported_enctypes = int(value)
362 etype_map = collections.OrderedDict([
363 (kcrypto.Enctype.AES256,
364 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
365 (kcrypto.Enctype.AES128,
366 security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
367 (kcrypto.Enctype.RC4,
368 security.KERB_ENCTYPE_RC4_HMAC_MD5),
369 (kcrypto.Enctype.DES_MD5,
370 security.KERB_ENCTYPE_DES_CBC_MD5),
371 (kcrypto.Enctype.DES_CRC,
372 security.KERB_ENCTYPE_DES_CBC_CRC)
375 @classmethod
376 def etypes_to_bits(cls, etypes):
377 bits = 0
378 for etype in etypes:
379 bit = cls.etype_map[etype]
380 if bits & bit:
381 raise ValueError(f'Got duplicate etype: {etype}')
382 bits |= bit
384 return bits
386 @classmethod
387 def bits_to_etypes(cls, bits):
388 etypes = ()
389 for etype, bit in cls.etype_map.items():
390 if bit & bits:
391 bits &= ~bit
392 etypes += (etype,)
394 bits &= ~cls.fast_supported_bits
395 if bits != 0:
396 raise ValueError(f'Unsupported etype bits: {bits}')
398 return etypes
400 def get_as_krb5_etypes(self):
401 return self.bits_to_etypes(self.as_supported_enctypes)
403 def get_tgs_krb5_etypes(self):
404 return self.bits_to_etypes(self.tgs_supported_enctypes)
406 def get_ap_krb5_etypes(self):
407 return self.bits_to_etypes(self.ap_supported_enctypes)
409 def set_kvno(self, kvno):
410 # Sign-extend from 32 bits.
411 if kvno & 1 << 31:
412 kvno |= -1 << 31
413 self.kvno = kvno
415 def get_kvno(self):
416 return self.kvno
418 def set_forced_key(self, etype, hexkey):
419 etype = int(etype)
420 contents = binascii.a2b_hex(hexkey)
421 key = kcrypto.Key(etype, contents)
422 self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
424 def get_forced_key(self, etype):
425 etype = int(etype)
426 return self.forced_keys.get(etype)
428 def set_forced_salt(self, salt):
429 self.forced_salt = bytes(salt)
431 def get_forced_salt(self):
432 return self.forced_salt
434 def get_salt(self):
435 if self.forced_salt is not None:
436 return self.forced_salt
438 if self.get_workstation():
439 salt_string = '%shost%s.%s' % (
440 self.get_realm().upper(),
441 self.get_username().lower().rsplit('$', 1)[0],
442 self.get_realm().lower())
443 else:
444 salt_string = self.get_realm().upper() + self.get_username()
446 return salt_string.encode('utf-8')
448 def set_dn(self, dn):
449 self.dn = dn
451 def get_dn(self):
452 return self.dn
455 class KerberosTicketCreds:
456 def __init__(self, ticket, session_key,
457 crealm=None, cname=None,
458 srealm=None, sname=None,
459 decryption_key=None,
460 ticket_private=None,
461 encpart_private=None):
462 self.ticket = ticket
463 self.session_key = session_key
464 self.crealm = crealm
465 self.cname = cname
466 self.srealm = srealm
467 self.sname = sname
468 self.decryption_key = decryption_key
469 self.ticket_private = ticket_private
470 self.encpart_private = encpart_private
473 class RawKerberosTest(TestCaseInTempDir):
474 """A raw Kerberos Test case."""
476 pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
477 krb5pac.PAC_TYPE_KDC_CHECKSUM,
478 krb5pac.PAC_TYPE_TICKET_CHECKSUM}
480 etypes_to_test = (
481 {"value": -1111, "name": "dummy", },
482 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
483 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
484 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
487 setup_etype_test_permutations_done = False
489 @classmethod
490 def setup_etype_test_permutations(cls):
491 if cls.setup_etype_test_permutations_done:
492 return
494 res = []
496 num_idxs = len(cls.etypes_to_test)
497 permutations = []
498 for num in range(1, num_idxs + 1):
499 chunk = list(itertools.permutations(range(num_idxs), num))
500 for e in chunk:
501 el = list(e)
502 permutations.append(el)
504 for p in permutations:
505 name = None
506 etypes = ()
507 for idx in p:
508 n = cls.etypes_to_test[idx]["name"]
509 if name is None:
510 name = n
511 else:
512 name += "_%s" % n
513 etypes += (cls.etypes_to_test[idx]["value"],)
515 r = {"name": name, "etypes": etypes, }
516 res.append(r)
518 cls.etype_test_permutations = res
519 cls.setup_etype_test_permutations_done = True
521 @classmethod
522 def etype_test_permutation_name_idx(cls):
523 cls.setup_etype_test_permutations()
524 res = []
525 idx = 0
526 for e in cls.etype_test_permutations:
527 r = (e['name'], idx)
528 idx += 1
529 res.append(r)
530 return res
532 def etype_test_permutation_by_idx(self, idx):
533 e = self.etype_test_permutations[idx]
534 return (e['name'], e['etypes'])
536 @classmethod
537 def setUpClass(cls):
538 super().setUpClass()
540 cls.host = samba.tests.env_get_var_value('SERVER')
541 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
543 # A dictionary containing credentials that have already been
544 # obtained.
545 cls.creds_dict = {}
547 cls.kdc_fast_support = False
549 def setUp(self):
550 super().setUp()
551 self.do_asn1_print = False
552 self.do_hexdump = False
554 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
555 allow_missing=True)
556 if strict_checking is None:
557 strict_checking = '1'
558 self.strict_checking = bool(int(strict_checking))
560 self.s = None
562 self.unspecified_kvno = object()
564 def tearDown(self):
565 self._disconnect("tearDown")
566 super().tearDown()
568 def _disconnect(self, reason):
569 if self.s is None:
570 return
571 self.s.close()
572 self.s = None
573 if self.do_hexdump:
574 sys.stderr.write("disconnect[%s]\n" % reason)
576 def _connect_tcp(self, host):
577 tcp_port = 88
578 try:
579 self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
580 socket.SOCK_STREAM, socket.SOL_TCP,
582 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
583 self.s.settimeout(10)
584 self.s.connect(self.a[0][4])
585 except socket.error:
586 self.s.close()
587 raise
588 except IOError:
589 self.s.close()
590 raise
592 def connect(self, host):
593 self.assertNotConnected()
594 self._connect_tcp(host)
595 if self.do_hexdump:
596 sys.stderr.write("connected[%s]\n" % host)
598 def env_get_var(self, varname, prefix,
599 fallback_default=True,
600 allow_missing=False):
601 val = None
602 if prefix is not None:
603 allow_missing_prefix = allow_missing or fallback_default
604 val = samba.tests.env_get_var_value(
605 '%s_%s' % (prefix, varname),
606 allow_missing=allow_missing_prefix)
607 else:
608 fallback_default = True
609 if val is None and fallback_default:
610 val = samba.tests.env_get_var_value(varname,
611 allow_missing=allow_missing)
612 return val
614 def _get_krb5_creds_from_env(self, prefix,
615 default_username=None,
616 allow_missing_password=False,
617 allow_missing_keys=True,
618 require_strongest_key=False):
619 c = KerberosCredentials()
620 c.guess()
622 domain = self.env_get_var('DOMAIN', prefix)
623 realm = self.env_get_var('REALM', prefix)
624 allow_missing_username = default_username is not None
625 username = self.env_get_var('USERNAME', prefix,
626 fallback_default=False,
627 allow_missing=allow_missing_username)
628 if username is None:
629 username = default_username
630 password = self.env_get_var('PASSWORD', prefix,
631 fallback_default=False,
632 allow_missing=allow_missing_password)
633 c.set_domain(domain)
634 c.set_realm(realm)
635 c.set_username(username)
636 if password is not None:
637 c.set_password(password)
638 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
639 prefix, allow_missing=True)
640 if as_supported_enctypes is not None:
641 c.set_as_supported_enctypes(as_supported_enctypes)
642 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
643 prefix, allow_missing=True)
644 if tgs_supported_enctypes is not None:
645 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
646 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
647 prefix, allow_missing=True)
648 if ap_supported_enctypes is not None:
649 c.set_ap_supported_enctypes(ap_supported_enctypes)
651 if require_strongest_key:
652 kvno_allow_missing = False
653 if password is None:
654 aes256_allow_missing = False
655 else:
656 aes256_allow_missing = True
657 else:
658 kvno_allow_missing = allow_missing_keys
659 aes256_allow_missing = allow_missing_keys
660 kvno = self.env_get_var('KVNO', prefix,
661 fallback_default=False,
662 allow_missing=kvno_allow_missing)
663 if kvno is not None:
664 c.set_kvno(kvno)
665 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
666 fallback_default=False,
667 allow_missing=aes256_allow_missing)
668 if aes256_key is not None:
669 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
670 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
671 fallback_default=False,
672 allow_missing=True)
673 if aes128_key is not None:
674 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
675 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
676 fallback_default=False, allow_missing=True)
677 if rc4_key is not None:
678 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
680 if not allow_missing_keys:
681 self.assertTrue(c.forced_keys,
682 'Please supply %s encryption keys '
683 'in environment' % prefix)
685 return c
687 def _get_krb5_creds(self,
688 prefix,
689 default_username=None,
690 allow_missing_password=False,
691 allow_missing_keys=True,
692 require_strongest_key=False,
693 fallback_creds_fn=None):
694 if prefix in self.creds_dict:
695 return self.creds_dict[prefix]
697 # We don't have the credentials already
698 creds = None
699 env_err = None
700 try:
701 # Try to obtain them from the environment
702 creds = self._get_krb5_creds_from_env(
703 prefix,
704 default_username=default_username,
705 allow_missing_password=allow_missing_password,
706 allow_missing_keys=allow_missing_keys,
707 require_strongest_key=require_strongest_key)
708 except Exception as err:
709 # An error occurred, so save it for later
710 env_err = err
711 else:
712 self.assertIsNotNone(creds)
713 # Save the obtained credentials
714 self.creds_dict[prefix] = creds
715 return creds
717 if fallback_creds_fn is not None:
718 try:
719 # Try to use the fallback method
720 creds = fallback_creds_fn()
721 except Exception as err:
722 print("ERROR FROM ENV: %r" % (env_err))
723 print("FALLBACK-FN: %s" % (fallback_creds_fn))
724 print("FALLBACK-ERROR: %r" % (err))
725 else:
726 self.assertIsNotNone(creds)
727 # Save the obtained credentials
728 self.creds_dict[prefix] = creds
729 return creds
731 # Both methods failed, so raise the exception from the
732 # environment method
733 raise env_err
735 def get_user_creds(self,
736 allow_missing_password=False,
737 allow_missing_keys=True):
738 c = self._get_krb5_creds(prefix=None,
739 allow_missing_password=allow_missing_password,
740 allow_missing_keys=allow_missing_keys)
741 return c
743 def get_service_creds(self,
744 allow_missing_password=False,
745 allow_missing_keys=True):
746 c = self._get_krb5_creds(prefix='SERVICE',
747 allow_missing_password=allow_missing_password,
748 allow_missing_keys=allow_missing_keys)
749 return c
751 def get_client_creds(self,
752 allow_missing_password=False,
753 allow_missing_keys=True):
754 c = self._get_krb5_creds(prefix='CLIENT',
755 allow_missing_password=allow_missing_password,
756 allow_missing_keys=allow_missing_keys)
757 return c
759 def get_server_creds(self,
760 allow_missing_password=False,
761 allow_missing_keys=True):
762 c = self._get_krb5_creds(prefix='SERVER',
763 allow_missing_password=allow_missing_password,
764 allow_missing_keys=allow_missing_keys)
765 return c
767 def get_admin_creds(self,
768 allow_missing_password=False,
769 allow_missing_keys=True):
770 c = self._get_krb5_creds(prefix='ADMIN',
771 allow_missing_password=allow_missing_password,
772 allow_missing_keys=allow_missing_keys)
773 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
774 return c
776 def get_rodc_krbtgt_creds(self,
777 require_keys=True,
778 require_strongest_key=False):
779 if require_strongest_key:
780 self.assertTrue(require_keys)
781 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
782 allow_missing_password=True,
783 allow_missing_keys=not require_keys,
784 require_strongest_key=require_strongest_key)
785 return c
787 def get_krbtgt_creds(self,
788 require_keys=True,
789 require_strongest_key=False):
790 if require_strongest_key:
791 self.assertTrue(require_keys)
792 c = self._get_krb5_creds(prefix='KRBTGT',
793 default_username='krbtgt',
794 allow_missing_password=True,
795 allow_missing_keys=not require_keys,
796 require_strongest_key=require_strongest_key)
797 return c
799 def get_anon_creds(self):
800 c = Credentials()
801 c.set_anonymous()
802 return c
804 def asn1_dump(self, name, obj, asn1_print=None):
805 if asn1_print is None:
806 asn1_print = self.do_asn1_print
807 if asn1_print:
808 if name is not None:
809 sys.stderr.write("%s:\n%s" % (name, obj))
810 else:
811 sys.stderr.write("%s" % (obj))
813 def hex_dump(self, name, blob, hexdump=None):
814 if hexdump is None:
815 hexdump = self.do_hexdump
816 if hexdump:
817 sys.stderr.write(
818 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
820 def der_decode(
821 self,
822 blob,
823 asn1Spec=None,
824 native_encode=True,
825 asn1_print=None,
826 hexdump=None):
827 if asn1Spec is not None:
828 class_name = type(asn1Spec).__name__.split(':')[0]
829 else:
830 class_name = "<None-asn1Spec>"
831 self.hex_dump(class_name, blob, hexdump=hexdump)
832 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
833 self.asn1_dump(None, obj, asn1_print=asn1_print)
834 if native_encode:
835 obj = pyasn1_native_encode(obj)
836 return obj
838 def der_encode(
839 self,
840 obj,
841 asn1Spec=None,
842 native_decode=True,
843 asn1_print=None,
844 hexdump=None):
845 if native_decode:
846 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
847 class_name = type(obj).__name__.split(':')[0]
848 if class_name is not None:
849 self.asn1_dump(None, obj, asn1_print=asn1_print)
850 blob = pyasn1_der_encode(obj)
851 if class_name is not None:
852 self.hex_dump(class_name, blob, hexdump=hexdump)
853 return blob
855 def send_pdu(self, req, asn1_print=None, hexdump=None):
856 try:
857 k5_pdu = self.der_encode(
858 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
859 header = struct.pack('>I', len(k5_pdu))
860 req_pdu = header
861 req_pdu += k5_pdu
862 self.hex_dump("send_pdu", header, hexdump=hexdump)
863 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
864 while True:
865 sent = self.s.send(req_pdu, 0)
866 if sent == len(req_pdu):
867 break
868 req_pdu = req_pdu[sent:]
869 except socket.error as e:
870 self._disconnect("send_pdu: %s" % e)
871 raise
872 except IOError as e:
873 self._disconnect("send_pdu: %s" % e)
874 raise
876 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
877 rep_pdu = None
878 try:
879 if timeout is not None:
880 self.s.settimeout(timeout)
881 rep_pdu = self.s.recv(num_recv, 0)
882 self.s.settimeout(10)
883 if len(rep_pdu) == 0:
884 self._disconnect("recv_raw: EOF")
885 return None
886 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
887 except socket.timeout:
888 self.s.settimeout(10)
889 sys.stderr.write("recv_raw: TIMEOUT\n")
890 except socket.error as e:
891 self._disconnect("recv_raw: %s" % e)
892 raise
893 except IOError as e:
894 self._disconnect("recv_raw: %s" % e)
895 raise
896 return rep_pdu
898 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
899 rep_pdu = None
900 rep = None
901 raw_pdu = self.recv_raw(
902 num_recv=4, hexdump=hexdump, timeout=timeout)
903 if raw_pdu is None:
904 return (None, None)
905 header = struct.unpack(">I", raw_pdu[0:4])
906 k5_len = header[0]
907 if k5_len == 0:
908 return (None, "")
909 missing = k5_len
910 rep_pdu = b''
911 while missing > 0:
912 raw_pdu = self.recv_raw(
913 num_recv=missing, hexdump=hexdump, timeout=timeout)
914 self.assertGreaterEqual(len(raw_pdu), 1)
915 rep_pdu += raw_pdu
916 missing = k5_len - len(rep_pdu)
917 k5_raw = self.der_decode(
918 rep_pdu,
919 asn1Spec=None,
920 native_encode=False,
921 asn1_print=False,
922 hexdump=False)
923 pvno = k5_raw['field-0']
924 self.assertEqual(pvno, 5)
925 msg_type = k5_raw['field-1']
926 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
927 if msg_type == KRB_AS_REP:
928 asn1Spec = krb5_asn1.AS_REP()
929 elif msg_type == KRB_TGS_REP:
930 asn1Spec = krb5_asn1.TGS_REP()
931 elif msg_type == KRB_ERROR:
932 asn1Spec = krb5_asn1.KRB_ERROR()
933 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
934 asn1_print=asn1_print, hexdump=False)
935 return (rep, rep_pdu)
937 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
938 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
939 hexdump=hexdump,
940 timeout=timeout)
941 return rep
943 def assertIsConnected(self):
944 self.assertIsNotNone(self.s, msg="Not connected")
946 def assertNotConnected(self):
947 self.assertIsNone(self.s, msg="Is connected")
949 def send_recv_transaction(
950 self,
951 req,
952 asn1_print=None,
953 hexdump=None,
954 timeout=None,
955 to_rodc=False):
956 host = self.host if to_rodc else self.dc_host
957 self.connect(host)
958 try:
959 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
960 rep = self.recv_pdu(
961 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
962 except Exception:
963 self._disconnect("transaction failed")
964 raise
965 self._disconnect("transaction done")
966 return rep
968 def assertNoValue(self, value):
969 self.assertTrue(value.isNoValue)
971 def assertHasValue(self, value):
972 self.assertIsNotNone(value)
974 def getElementValue(self, obj, elem):
975 return obj.get(elem)
977 def assertElementMissing(self, obj, elem):
978 v = self.getElementValue(obj, elem)
979 self.assertIsNone(v)
981 def assertElementPresent(self, obj, elem, expect_empty=False):
982 v = self.getElementValue(obj, elem)
983 self.assertIsNotNone(v)
984 if self.strict_checking:
985 if isinstance(v, collections.abc.Container):
986 if expect_empty:
987 self.assertEqual(0, len(v))
988 else:
989 self.assertNotEqual(0, len(v))
991 def assertElementEqual(self, obj, elem, value):
992 v = self.getElementValue(obj, elem)
993 self.assertIsNotNone(v)
994 self.assertEqual(v, value)
996 def assertElementEqualUTF8(self, obj, elem, value):
997 v = self.getElementValue(obj, elem)
998 self.assertIsNotNone(v)
999 self.assertEqual(v, bytes(value, 'utf8'))
1001 def assertPrincipalEqual(self, princ1, princ2):
1002 self.assertEqual(princ1['name-type'], princ2['name-type'])
1003 self.assertEqual(
1004 len(princ1['name-string']),
1005 len(princ2['name-string']),
1006 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1007 for idx in range(len(princ1['name-string'])):
1008 self.assertEqual(
1009 princ1['name-string'][idx],
1010 princ2['name-string'][idx],
1011 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1013 def assertElementEqualPrincipal(self, obj, elem, value):
1014 v = self.getElementValue(obj, elem)
1015 self.assertIsNotNone(v)
1016 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
1017 self.assertPrincipalEqual(v, value)
1019 def assertElementKVNO(self, obj, elem, value):
1020 v = self.getElementValue(obj, elem)
1021 if value == "autodetect":
1022 value = v
1023 if value is not None:
1024 self.assertIsNotNone(v)
1025 # The value on the wire should never be 0
1026 self.assertNotEqual(v, 0)
1027 # unspecified_kvno means we don't know the kvno,
1028 # but want to enforce its presence
1029 if value is not self.unspecified_kvno:
1030 value = int(value)
1031 self.assertNotEqual(value, 0)
1032 self.assertEqual(v, value)
1033 else:
1034 self.assertIsNone(v)
1036 def assertElementFlags(self, obj, elem, expected, unexpected):
1037 v = self.getElementValue(obj, elem)
1038 self.assertIsNotNone(v)
1039 if expected is not None:
1040 self.assertIsInstance(expected, krb5_asn1.KDCOptions)
1041 for i, flag in enumerate(expected):
1042 if flag == 1:
1043 self.assertEqual('1', v[i],
1044 f"'{expected.namedValues[i]}' "
1045 f"expected in {v}")
1046 if unexpected is not None:
1047 self.assertIsInstance(unexpected, krb5_asn1.KDCOptions)
1048 for i, flag in enumerate(unexpected):
1049 if flag == 1:
1050 self.assertEqual('0', v[i],
1051 f"'{unexpected.namedValues[i]}' "
1052 f"unexpected in {v}")
1054 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1055 if epoch is None:
1056 epoch = time.time()
1057 if offset is not None:
1058 epoch = epoch + int(offset)
1059 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1060 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1062 def get_KerberosTime(self, epoch=None, offset=None):
1063 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1064 return s
1066 def get_EpochFromKerberosTime(self, kerberos_time):
1067 if isinstance(kerberos_time, bytes):
1068 kerberos_time = kerberos_time.decode()
1070 epoch = datetime.datetime.strptime(kerberos_time,
1071 '%Y%m%d%H%M%SZ')
1072 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1073 epoch = int(epoch.timestamp())
1075 return epoch
1077 def get_Nonce(self):
1078 nonce_min = 0x7f000000
1079 nonce_max = 0x7fffffff
1080 v = random.randint(nonce_min, nonce_max)
1081 return v
1083 def get_pa_dict(self, pa_data):
1084 pa_dict = {}
1086 if pa_data is not None:
1087 for pa in pa_data:
1088 pa_type = pa['padata-type']
1089 if pa_type in pa_dict:
1090 raise RuntimeError(f'Duplicate type {pa_type}')
1091 pa_dict[pa_type] = pa['padata-value']
1093 return pa_dict
1095 def SessionKey_create(self, etype, contents, kvno=None):
1096 key = kcrypto.Key(etype, contents)
1097 return RodcPacEncryptionKey(key, kvno)
1099 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
1100 self.assertIsNotNone(pwd)
1101 self.assertIsNotNone(salt)
1102 key = kcrypto.string_to_key(etype, pwd, salt)
1103 return RodcPacEncryptionKey(key, kvno)
1105 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
1106 e = etype_info2['etype']
1108 salt = etype_info2.get('salt')
1110 if e == kcrypto.Enctype.RC4:
1111 nthash = creds.get_nt_hash()
1112 return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1114 password = creds.get_password()
1115 return self.PasswordKey_create(
1116 etype=e, pwd=password, salt=salt, kvno=kvno)
1118 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1120 if etype is None:
1121 etypes = creds.get_tgs_krb5_etypes()
1122 if etypes:
1123 etype = etypes[0]
1124 else:
1125 etype = kcrypto.Enctype.RC4
1127 forced_key = creds.get_forced_key(etype)
1128 if forced_key is not None:
1129 return forced_key
1131 kvno = creds.get_kvno()
1133 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1134 "nor a password specified, " % (
1135 creds.get_username(), etype, kvno))
1137 if etype == kcrypto.Enctype.RC4:
1138 nthash = creds.get_nt_hash()
1139 self.assertIsNotNone(nthash, msg=fail_msg)
1140 return self.SessionKey_create(etype=etype,
1141 contents=nthash,
1142 kvno=kvno)
1144 password = creds.get_password()
1145 self.assertIsNotNone(password, msg=fail_msg)
1146 salt = creds.get_salt()
1147 return self.PasswordKey_create(etype=etype,
1148 pwd=password,
1149 salt=salt,
1150 kvno=kvno)
1152 def RandomKey(self, etype):
1153 e = kcrypto._get_enctype_profile(etype)
1154 contents = samba.generate_random_bytes(e.keysize)
1155 return self.SessionKey_create(etype=etype, contents=contents)
1157 def EncryptionKey_import(self, EncryptionKey_obj):
1158 return self.SessionKey_create(EncryptionKey_obj['keytype'],
1159 EncryptionKey_obj['keyvalue'])
1161 def EncryptedData_create(self, key, usage, plaintext):
1162 # EncryptedData ::= SEQUENCE {
1163 # etype [0] Int32 -- EncryptionType --,
1164 # kvno [1] Int32 OPTIONAL,
1165 # cipher [2] OCTET STRING -- ciphertext
1167 ciphertext = key.encrypt(usage, plaintext)
1168 EncryptedData_obj = {
1169 'etype': key.etype,
1170 'cipher': ciphertext
1172 if key.kvno is not None:
1173 EncryptedData_obj['kvno'] = key.kvno
1174 return EncryptedData_obj
1176 def Checksum_create(self, key, usage, plaintext, ctype=None):
1177 # Checksum ::= SEQUENCE {
1178 # cksumtype [0] Int32,
1179 # checksum [1] OCTET STRING
1181 if ctype is None:
1182 ctype = key.ctype
1183 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1184 Checksum_obj = {
1185 'cksumtype': ctype,
1186 'checksum': checksum,
1188 return Checksum_obj
1190 @classmethod
1191 def PrincipalName_create(cls, name_type, names):
1192 # PrincipalName ::= SEQUENCE {
1193 # name-type [0] Int32,
1194 # name-string [1] SEQUENCE OF KerberosString
1196 PrincipalName_obj = {
1197 'name-type': name_type,
1198 'name-string': names,
1200 return PrincipalName_obj
1202 def AuthorizationData_create(self, ad_type, ad_data):
1203 # AuthorizationData ::= SEQUENCE {
1204 # ad-type [0] Int32,
1205 # ad-data [1] OCTET STRING
1207 AUTH_DATA_obj = {
1208 'ad-type': ad_type,
1209 'ad-data': ad_data
1211 return AUTH_DATA_obj
1213 def PA_DATA_create(self, padata_type, padata_value):
1214 # PA-DATA ::= SEQUENCE {
1215 # -- NOTE: first tag is [1], not [0]
1216 # padata-type [1] Int32,
1217 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1219 PA_DATA_obj = {
1220 'padata-type': padata_type,
1221 'padata-value': padata_value,
1223 return PA_DATA_obj
1225 def PA_ENC_TS_ENC_create(self, ts, usec):
1226 # PA-ENC-TS-ENC ::= SEQUENCE {
1227 # patimestamp[0] KerberosTime, -- client's time
1228 # pausec[1] krb5int32 OPTIONAL
1230 PA_ENC_TS_ENC_obj = {
1231 'patimestamp': ts,
1232 'pausec': usec,
1234 return PA_ENC_TS_ENC_obj
1236 def PA_PAC_OPTIONS_create(self, options):
1237 # PA-PAC-OPTIONS ::= SEQUENCE {
1238 # options [0] PACOptionFlags
1240 PA_PAC_OPTIONS_obj = {
1241 'options': options
1243 return PA_PAC_OPTIONS_obj
1245 def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1246 # KrbFastArmor ::= SEQUENCE {
1247 # armor-type [0] Int32,
1248 # armor-value [1] OCTET STRING,
1249 # ...
1251 KRB_FAST_ARMOR_obj = {
1252 'armor-type': armor_type,
1253 'armor-value': armor_value
1255 return KRB_FAST_ARMOR_obj
1257 def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1258 # KrbFastReq ::= SEQUENCE {
1259 # fast-options [0] FastOptions,
1260 # padata [1] SEQUENCE OF PA-DATA,
1261 # req-body [2] KDC-REQ-BODY,
1262 # ...
1264 KRB_FAST_REQ_obj = {
1265 'fast-options': fast_options,
1266 'padata': padata,
1267 'req-body': req_body
1269 return KRB_FAST_REQ_obj
1271 def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1272 # KrbFastArmoredReq ::= SEQUENCE {
1273 # armor [0] KrbFastArmor OPTIONAL,
1274 # req-checksum [1] Checksum,
1275 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1277 KRB_FAST_ARMORED_REQ_obj = {
1278 'req-checksum': req_checksum,
1279 'enc-fast-req': enc_fast_req
1281 if armor is not None:
1282 KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1283 return KRB_FAST_ARMORED_REQ_obj
1285 def PA_FX_FAST_REQUEST_create(self, armored_data):
1286 # PA-FX-FAST-REQUEST ::= CHOICE {
1287 # armored-data [0] KrbFastArmoredReq,
1288 # ...
1290 PA_FX_FAST_REQUEST_obj = {
1291 'armored-data': armored_data
1293 return PA_FX_FAST_REQUEST_obj
1295 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1296 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1297 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1298 # -- include PAC.
1299 # --If FALSE, and PAC present,
1300 # -- remove PAC.
1302 KERB_PA_PAC_REQUEST_obj = {
1303 'include-pac': include_pac,
1305 if not pa_data_create:
1306 return KERB_PA_PAC_REQUEST_obj
1307 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1308 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1309 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1310 return pa_data
1312 def get_pa_pac_options(self, options):
1313 pac_options = self.PA_PAC_OPTIONS_create(options)
1314 pac_options = self.der_encode(pac_options,
1315 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1316 pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1318 return pac_options
1320 def KDC_REQ_BODY_create(self,
1321 kdc_options,
1322 cname,
1323 realm,
1324 sname,
1325 from_time,
1326 till_time,
1327 renew_time,
1328 nonce,
1329 etypes,
1330 addresses,
1331 additional_tickets,
1332 EncAuthorizationData,
1333 EncAuthorizationData_key,
1334 EncAuthorizationData_usage,
1335 asn1_print=None,
1336 hexdump=None):
1337 # KDC-REQ-BODY ::= SEQUENCE {
1338 # kdc-options [0] KDCOptions,
1339 # cname [1] PrincipalName OPTIONAL
1340 # -- Used only in AS-REQ --,
1341 # realm [2] Realm
1342 # -- Server's realm
1343 # -- Also client's in AS-REQ --,
1344 # sname [3] PrincipalName OPTIONAL,
1345 # from [4] KerberosTime OPTIONAL,
1346 # till [5] KerberosTime,
1347 # rtime [6] KerberosTime OPTIONAL,
1348 # nonce [7] UInt32,
1349 # etype [8] SEQUENCE OF Int32
1350 # -- EncryptionType
1351 # -- in preference order --,
1352 # addresses [9] HostAddresses OPTIONAL,
1353 # enc-authorization-data [10] EncryptedData OPTIONAL
1354 # -- AuthorizationData --,
1355 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1356 # -- NOTE: not empty
1358 if EncAuthorizationData is not None:
1359 enc_ad_plain = self.der_encode(
1360 EncAuthorizationData,
1361 asn1Spec=krb5_asn1.AuthorizationData(),
1362 asn1_print=asn1_print,
1363 hexdump=hexdump)
1364 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1365 EncAuthorizationData_usage,
1366 enc_ad_plain)
1367 else:
1368 enc_ad = None
1369 KDC_REQ_BODY_obj = {
1370 'kdc-options': kdc_options,
1371 'realm': realm,
1372 'till': till_time,
1373 'nonce': nonce,
1374 'etype': etypes,
1376 if cname is not None:
1377 KDC_REQ_BODY_obj['cname'] = cname
1378 if sname is not None:
1379 KDC_REQ_BODY_obj['sname'] = sname
1380 if from_time is not None:
1381 KDC_REQ_BODY_obj['from'] = from_time
1382 if renew_time is not None:
1383 KDC_REQ_BODY_obj['rtime'] = renew_time
1384 if addresses is not None:
1385 KDC_REQ_BODY_obj['addresses'] = addresses
1386 if enc_ad is not None:
1387 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1388 if additional_tickets is not None:
1389 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1390 return KDC_REQ_BODY_obj
1392 def KDC_REQ_create(self,
1393 msg_type,
1394 padata,
1395 req_body,
1396 asn1Spec=None,
1397 asn1_print=None,
1398 hexdump=None):
1399 # KDC-REQ ::= SEQUENCE {
1400 # -- NOTE: first tag is [1], not [0]
1401 # pvno [1] INTEGER (5) ,
1402 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1403 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1404 # -- NOTE: not empty --,
1405 # req-body [4] KDC-REQ-BODY
1408 KDC_REQ_obj = {
1409 'pvno': 5,
1410 'msg-type': msg_type,
1411 'req-body': req_body,
1413 if padata is not None:
1414 KDC_REQ_obj['padata'] = padata
1415 if asn1Spec is not None:
1416 KDC_REQ_decoded = pyasn1_native_decode(
1417 KDC_REQ_obj, asn1Spec=asn1Spec)
1418 else:
1419 KDC_REQ_decoded = None
1420 return KDC_REQ_obj, KDC_REQ_decoded
1422 def AS_REQ_create(self,
1423 padata, # optional
1424 kdc_options, # required
1425 cname, # optional
1426 realm, # required
1427 sname, # optional
1428 from_time, # optional
1429 till_time, # required
1430 renew_time, # optional
1431 nonce, # required
1432 etypes, # required
1433 addresses, # optional
1434 additional_tickets,
1435 native_decoded_only=True,
1436 asn1_print=None,
1437 hexdump=None):
1438 # KDC-REQ ::= SEQUENCE {
1439 # -- NOTE: first tag is [1], not [0]
1440 # pvno [1] INTEGER (5) ,
1441 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1442 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1443 # -- NOTE: not empty --,
1444 # req-body [4] KDC-REQ-BODY
1447 # KDC-REQ-BODY ::= SEQUENCE {
1448 # kdc-options [0] KDCOptions,
1449 # cname [1] PrincipalName OPTIONAL
1450 # -- Used only in AS-REQ --,
1451 # realm [2] Realm
1452 # -- Server's realm
1453 # -- Also client's in AS-REQ --,
1454 # sname [3] PrincipalName OPTIONAL,
1455 # from [4] KerberosTime OPTIONAL,
1456 # till [5] KerberosTime,
1457 # rtime [6] KerberosTime OPTIONAL,
1458 # nonce [7] UInt32,
1459 # etype [8] SEQUENCE OF Int32
1460 # -- EncryptionType
1461 # -- in preference order --,
1462 # addresses [9] HostAddresses OPTIONAL,
1463 # enc-authorization-data [10] EncryptedData OPTIONAL
1464 # -- AuthorizationData --,
1465 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1466 # -- NOTE: not empty
1468 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1469 kdc_options,
1470 cname,
1471 realm,
1472 sname,
1473 from_time,
1474 till_time,
1475 renew_time,
1476 nonce,
1477 etypes,
1478 addresses,
1479 additional_tickets,
1480 EncAuthorizationData=None,
1481 EncAuthorizationData_key=None,
1482 EncAuthorizationData_usage=None,
1483 asn1_print=asn1_print,
1484 hexdump=hexdump)
1485 obj, decoded = self.KDC_REQ_create(
1486 msg_type=KRB_AS_REQ,
1487 padata=padata,
1488 req_body=KDC_REQ_BODY_obj,
1489 asn1Spec=krb5_asn1.AS_REQ(),
1490 asn1_print=asn1_print,
1491 hexdump=hexdump)
1492 if native_decoded_only:
1493 return decoded
1494 return decoded, obj
1496 def AP_REQ_create(self, ap_options, ticket, authenticator):
1497 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1498 # pvno [0] INTEGER (5),
1499 # msg-type [1] INTEGER (14),
1500 # ap-options [2] APOptions,
1501 # ticket [3] Ticket,
1502 # authenticator [4] EncryptedData -- Authenticator
1504 AP_REQ_obj = {
1505 'pvno': 5,
1506 'msg-type': KRB_AP_REQ,
1507 'ap-options': ap_options,
1508 'ticket': ticket,
1509 'authenticator': authenticator,
1511 return AP_REQ_obj
1513 def Authenticator_create(
1514 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1515 authorization_data):
1516 # -- Unencrypted authenticator
1517 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1518 # authenticator-vno [0] INTEGER (5),
1519 # crealm [1] Realm,
1520 # cname [2] PrincipalName,
1521 # cksum [3] Checksum OPTIONAL,
1522 # cusec [4] Microseconds,
1523 # ctime [5] KerberosTime,
1524 # subkey [6] EncryptionKey OPTIONAL,
1525 # seq-number [7] UInt32 OPTIONAL,
1526 # authorization-data [8] AuthorizationData OPTIONAL
1528 Authenticator_obj = {
1529 'authenticator-vno': 5,
1530 'crealm': crealm,
1531 'cname': cname,
1532 'cusec': cusec,
1533 'ctime': ctime,
1535 if cksum is not None:
1536 Authenticator_obj['cksum'] = cksum
1537 if subkey is not None:
1538 Authenticator_obj['subkey'] = subkey
1539 if seq_number is not None:
1540 Authenticator_obj['seq-number'] = seq_number
1541 if authorization_data is not None:
1542 Authenticator_obj['authorization-data'] = authorization_data
1543 return Authenticator_obj
1545 def TGS_REQ_create(self,
1546 padata, # optional
1547 cusec,
1548 ctime,
1549 ticket,
1550 kdc_options, # required
1551 cname, # optional
1552 realm, # required
1553 sname, # optional
1554 from_time, # optional
1555 till_time, # required
1556 renew_time, # optional
1557 nonce, # required
1558 etypes, # required
1559 addresses, # optional
1560 EncAuthorizationData,
1561 EncAuthorizationData_key,
1562 additional_tickets,
1563 ticket_session_key,
1564 authenticator_subkey=None,
1565 body_checksum_type=None,
1566 native_decoded_only=True,
1567 asn1_print=None,
1568 hexdump=None):
1569 # KDC-REQ ::= SEQUENCE {
1570 # -- NOTE: first tag is [1], not [0]
1571 # pvno [1] INTEGER (5) ,
1572 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1573 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1574 # -- NOTE: not empty --,
1575 # req-body [4] KDC-REQ-BODY
1578 # KDC-REQ-BODY ::= SEQUENCE {
1579 # kdc-options [0] KDCOptions,
1580 # cname [1] PrincipalName OPTIONAL
1581 # -- Used only in AS-REQ --,
1582 # realm [2] Realm
1583 # -- Server's realm
1584 # -- Also client's in AS-REQ --,
1585 # sname [3] PrincipalName OPTIONAL,
1586 # from [4] KerberosTime OPTIONAL,
1587 # till [5] KerberosTime,
1588 # rtime [6] KerberosTime OPTIONAL,
1589 # nonce [7] UInt32,
1590 # etype [8] SEQUENCE OF Int32
1591 # -- EncryptionType
1592 # -- in preference order --,
1593 # addresses [9] HostAddresses OPTIONAL,
1594 # enc-authorization-data [10] EncryptedData OPTIONAL
1595 # -- AuthorizationData --,
1596 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1597 # -- NOTE: not empty
1600 if authenticator_subkey is not None:
1601 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1602 else:
1603 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1605 req_body = self.KDC_REQ_BODY_create(
1606 kdc_options=kdc_options,
1607 cname=None,
1608 realm=realm,
1609 sname=sname,
1610 from_time=from_time,
1611 till_time=till_time,
1612 renew_time=renew_time,
1613 nonce=nonce,
1614 etypes=etypes,
1615 addresses=addresses,
1616 additional_tickets=additional_tickets,
1617 EncAuthorizationData=EncAuthorizationData,
1618 EncAuthorizationData_key=EncAuthorizationData_key,
1619 EncAuthorizationData_usage=EncAuthorizationData_usage)
1620 req_body_blob = self.der_encode(req_body,
1621 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1622 asn1_print=asn1_print, hexdump=hexdump)
1624 req_body_checksum = self.Checksum_create(ticket_session_key,
1625 KU_TGS_REQ_AUTH_CKSUM,
1626 req_body_blob,
1627 ctype=body_checksum_type)
1629 subkey_obj = None
1630 if authenticator_subkey is not None:
1631 subkey_obj = authenticator_subkey.export_obj()
1632 seq_number = random.randint(0, 0xfffffffe)
1633 authenticator = self.Authenticator_create(
1634 crealm=realm,
1635 cname=cname,
1636 cksum=req_body_checksum,
1637 cusec=cusec,
1638 ctime=ctime,
1639 subkey=subkey_obj,
1640 seq_number=seq_number,
1641 authorization_data=None)
1642 authenticator = self.der_encode(
1643 authenticator,
1644 asn1Spec=krb5_asn1.Authenticator(),
1645 asn1_print=asn1_print,
1646 hexdump=hexdump)
1648 authenticator = self.EncryptedData_create(
1649 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1651 ap_options = krb5_asn1.APOptions('0')
1652 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1653 ticket=ticket,
1654 authenticator=authenticator)
1655 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1656 asn1_print=asn1_print, hexdump=hexdump)
1657 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1658 if padata is not None:
1659 padata.append(pa_tgs_req)
1660 else:
1661 padata = [pa_tgs_req]
1663 obj, decoded = self.KDC_REQ_create(
1664 msg_type=KRB_TGS_REQ,
1665 padata=padata,
1666 req_body=req_body,
1667 asn1Spec=krb5_asn1.TGS_REQ(),
1668 asn1_print=asn1_print,
1669 hexdump=hexdump)
1670 if native_decoded_only:
1671 return decoded
1672 return decoded, obj
1674 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1675 # PA-S4U2Self ::= SEQUENCE {
1676 # name [0] PrincipalName,
1677 # realm [1] Realm,
1678 # cksum [2] Checksum,
1679 # auth [3] GeneralString
1681 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1682 for n in name['name-string']:
1683 cksum_data += n.encode()
1684 cksum_data += realm.encode()
1685 cksum_data += "Kerberos".encode()
1686 cksum = self.Checksum_create(tgt_session_key,
1687 KU_NON_KERB_CKSUM_SALT,
1688 cksum_data,
1689 ctype)
1691 PA_S4U2Self_obj = {
1692 'name': name,
1693 'realm': realm,
1694 'cksum': cksum,
1695 'auth': "Kerberos",
1697 pa_s4u2self = self.der_encode(
1698 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1699 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1701 def _generic_kdc_exchange(self,
1702 kdc_exchange_dict, # required
1703 cname=None, # optional
1704 realm=None, # required
1705 sname=None, # optional
1706 from_time=None, # optional
1707 till_time=None, # required
1708 renew_time=None, # optional
1709 etypes=None, # required
1710 addresses=None, # optional
1711 additional_tickets=None, # optional
1712 EncAuthorizationData=None, # optional
1713 EncAuthorizationData_key=None, # optional
1714 EncAuthorizationData_usage=None): # optional
1716 check_error_fn = kdc_exchange_dict['check_error_fn']
1717 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1718 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1719 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1720 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1721 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1722 callback_dict = kdc_exchange_dict['callback_dict']
1723 req_msg_type = kdc_exchange_dict['req_msg_type']
1724 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1725 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1727 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1728 kdc_options = kdc_exchange_dict['kdc_options']
1730 pac_request = kdc_exchange_dict['pac_request']
1731 pac_options = kdc_exchange_dict['pac_options']
1733 # Parameters specific to the inner request body
1734 inner_req = kdc_exchange_dict['inner_req']
1736 # Parameters specific to the outer request body
1737 outer_req = kdc_exchange_dict['outer_req']
1739 if till_time is None:
1740 till_time = self.get_KerberosTime(offset=36000)
1742 if 'nonce' in kdc_exchange_dict:
1743 nonce = kdc_exchange_dict['nonce']
1744 else:
1745 nonce = self.get_Nonce()
1746 kdc_exchange_dict['nonce'] = nonce
1748 req_body = self.KDC_REQ_BODY_create(
1749 kdc_options=kdc_options,
1750 cname=cname,
1751 realm=realm,
1752 sname=sname,
1753 from_time=from_time,
1754 till_time=till_time,
1755 renew_time=renew_time,
1756 nonce=nonce,
1757 etypes=etypes,
1758 addresses=addresses,
1759 additional_tickets=additional_tickets,
1760 EncAuthorizationData=EncAuthorizationData,
1761 EncAuthorizationData_key=EncAuthorizationData_key,
1762 EncAuthorizationData_usage=EncAuthorizationData_usage)
1764 inner_req_body = dict(req_body)
1765 if inner_req is not None:
1766 for key, value in inner_req.items():
1767 if value is not None:
1768 inner_req_body[key] = value
1769 else:
1770 del inner_req_body[key]
1771 if outer_req is not None:
1772 for key, value in outer_req.items():
1773 if value is not None:
1774 req_body[key] = value
1775 else:
1776 del req_body[key]
1778 additional_padata = []
1779 if pac_request is not None:
1780 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
1781 additional_padata.append(pa_pac_request)
1782 if pac_options is not None:
1783 pa_pac_options = self.get_pa_pac_options(pac_options)
1784 additional_padata.append(pa_pac_options)
1786 if req_msg_type == KRB_AS_REQ:
1787 tgs_req = None
1788 tgs_req_padata = None
1789 else:
1790 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1792 tgs_req = self.generate_ap_req(kdc_exchange_dict,
1793 callback_dict,
1794 req_body,
1795 armor=False)
1796 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1798 if generate_fast_padata_fn is not None:
1799 self.assertIsNotNone(generate_fast_fn)
1800 # This can alter req_body...
1801 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1802 callback_dict,
1803 req_body)
1804 else:
1805 fast_padata = []
1807 if generate_fast_armor_fn is not None:
1808 self.assertIsNotNone(generate_fast_fn)
1809 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1810 callback_dict,
1811 req_body,
1812 armor=True)
1814 fast_armor_type = kdc_exchange_dict['fast_armor_type']
1815 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1816 fast_ap_req)
1817 else:
1818 fast_armor = None
1820 if generate_padata_fn is not None:
1821 # This can alter req_body...
1822 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1823 callback_dict,
1824 req_body)
1825 self.assertIsNotNone(outer_padata)
1826 self.assertNotIn(PADATA_KDC_REQ,
1827 [pa['padata-type'] for pa in outer_padata],
1828 'Don\'t create TGS-REQ manually')
1829 else:
1830 outer_padata = None
1832 if generate_fast_fn is not None:
1833 armor_key = kdc_exchange_dict['armor_key']
1834 self.assertIsNotNone(armor_key)
1836 if req_msg_type == KRB_AS_REQ:
1837 checksum_blob = self.der_encode(
1838 req_body,
1839 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1840 else:
1841 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1842 checksum_blob = tgs_req
1844 checksum = self.Checksum_create(armor_key,
1845 KU_FAST_REQ_CHKSUM,
1846 checksum_blob)
1848 fast_padata += additional_padata
1849 fast = generate_fast_fn(kdc_exchange_dict,
1850 callback_dict,
1851 inner_req_body,
1852 fast_padata,
1853 fast_armor,
1854 checksum)
1855 else:
1856 fast = None
1858 padata = []
1860 if tgs_req_padata is not None:
1861 padata.append(tgs_req_padata)
1863 if fast is not None:
1864 padata.append(fast)
1866 if outer_padata is not None:
1867 padata += outer_padata
1869 if fast is None:
1870 padata += additional_padata
1872 if not padata:
1873 padata = None
1875 kdc_exchange_dict['req_padata'] = padata
1876 kdc_exchange_dict['fast_padata'] = fast_padata
1877 kdc_exchange_dict['req_body'] = inner_req_body
1879 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1880 padata=padata,
1881 req_body=req_body,
1882 asn1Spec=req_asn1Spec())
1884 to_rodc = kdc_exchange_dict['to_rodc']
1886 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
1887 self.assertIsNotNone(rep)
1889 msg_type = self.getElementValue(rep, 'msg-type')
1890 self.assertIsNotNone(msg_type)
1892 expected_msg_type = None
1893 if check_error_fn is not None:
1894 expected_msg_type = KRB_ERROR
1895 self.assertIsNone(check_rep_fn)
1896 self.assertNotEqual(0, len(expected_error_mode))
1897 self.assertNotIn(0, expected_error_mode)
1898 if check_rep_fn is not None:
1899 expected_msg_type = rep_msg_type
1900 self.assertIsNone(check_error_fn)
1901 self.assertEqual(0, len(expected_error_mode))
1902 self.assertIsNotNone(expected_msg_type)
1903 self.assertEqual(msg_type, expected_msg_type)
1905 if msg_type == KRB_ERROR:
1906 return check_error_fn(kdc_exchange_dict,
1907 callback_dict,
1908 rep)
1910 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1912 def as_exchange_dict(self,
1913 expected_crealm=None,
1914 expected_cname=None,
1915 expected_anon=False,
1916 expected_srealm=None,
1917 expected_sname=None,
1918 expected_supported_etypes=None,
1919 expected_flags=None,
1920 unexpected_flags=None,
1921 ticket_decryption_key=None,
1922 generate_fast_fn=None,
1923 generate_fast_armor_fn=None,
1924 generate_fast_padata_fn=None,
1925 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1926 generate_padata_fn=None,
1927 check_error_fn=None,
1928 check_rep_fn=None,
1929 check_kdc_private_fn=None,
1930 callback_dict=None,
1931 expected_error_mode=0,
1932 expected_status=None,
1933 client_as_etypes=None,
1934 expected_salt=None,
1935 authenticator_subkey=None,
1936 preauth_key=None,
1937 armor_key=None,
1938 armor_tgt=None,
1939 armor_subkey=None,
1940 auth_data=None,
1941 kdc_options='',
1942 inner_req=None,
1943 outer_req=None,
1944 pac_request=None,
1945 pac_options=None,
1946 expect_pac=True,
1947 to_rodc=False):
1948 if expected_error_mode == 0:
1949 expected_error_mode = ()
1950 elif not isinstance(expected_error_mode, collections.abc.Container):
1951 expected_error_mode = (expected_error_mode,)
1953 kdc_exchange_dict = {
1954 'req_msg_type': KRB_AS_REQ,
1955 'req_asn1Spec': krb5_asn1.AS_REQ,
1956 'rep_msg_type': KRB_AS_REP,
1957 'rep_asn1Spec': krb5_asn1.AS_REP,
1958 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
1959 'expected_crealm': expected_crealm,
1960 'expected_cname': expected_cname,
1961 'expected_anon': expected_anon,
1962 'expected_srealm': expected_srealm,
1963 'expected_sname': expected_sname,
1964 'expected_supported_etypes': expected_supported_etypes,
1965 'expected_flags': expected_flags,
1966 'unexpected_flags': unexpected_flags,
1967 'ticket_decryption_key': ticket_decryption_key,
1968 'generate_fast_fn': generate_fast_fn,
1969 'generate_fast_armor_fn': generate_fast_armor_fn,
1970 'generate_fast_padata_fn': generate_fast_padata_fn,
1971 'fast_armor_type': fast_armor_type,
1972 'generate_padata_fn': generate_padata_fn,
1973 'check_error_fn': check_error_fn,
1974 'check_rep_fn': check_rep_fn,
1975 'check_kdc_private_fn': check_kdc_private_fn,
1976 'callback_dict': callback_dict,
1977 'expected_error_mode': expected_error_mode,
1978 'expected_status': expected_status,
1979 'client_as_etypes': client_as_etypes,
1980 'expected_salt': expected_salt,
1981 'authenticator_subkey': authenticator_subkey,
1982 'preauth_key': preauth_key,
1983 'armor_key': armor_key,
1984 'armor_tgt': armor_tgt,
1985 'armor_subkey': armor_subkey,
1986 'auth_data': auth_data,
1987 'kdc_options': kdc_options,
1988 'inner_req': inner_req,
1989 'outer_req': outer_req,
1990 'pac_request': pac_request,
1991 'pac_options': pac_options,
1992 'expect_pac': expect_pac,
1993 'to_rodc': to_rodc
1995 if callback_dict is None:
1996 callback_dict = {}
1998 return kdc_exchange_dict
2000 def tgs_exchange_dict(self,
2001 expected_crealm=None,
2002 expected_cname=None,
2003 expected_anon=False,
2004 expected_srealm=None,
2005 expected_sname=None,
2006 expected_supported_etypes=None,
2007 expected_flags=None,
2008 unexpected_flags=None,
2009 ticket_decryption_key=None,
2010 generate_fast_fn=None,
2011 generate_fast_armor_fn=None,
2012 generate_fast_padata_fn=None,
2013 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2014 generate_padata_fn=None,
2015 check_error_fn=None,
2016 check_rep_fn=None,
2017 check_kdc_private_fn=None,
2018 expected_error_mode=0,
2019 expected_status=None,
2020 callback_dict=None,
2021 tgt=None,
2022 armor_key=None,
2023 armor_tgt=None,
2024 armor_subkey=None,
2025 authenticator_subkey=None,
2026 auth_data=None,
2027 body_checksum_type=None,
2028 kdc_options='',
2029 inner_req=None,
2030 outer_req=None,
2031 pac_request=None,
2032 pac_options=None,
2033 expect_pac=True,
2034 to_rodc=False):
2035 if expected_error_mode == 0:
2036 expected_error_mode = ()
2037 elif not isinstance(expected_error_mode, collections.abc.Container):
2038 expected_error_mode = (expected_error_mode,)
2040 kdc_exchange_dict = {
2041 'req_msg_type': KRB_TGS_REQ,
2042 'req_asn1Spec': krb5_asn1.TGS_REQ,
2043 'rep_msg_type': KRB_TGS_REP,
2044 'rep_asn1Spec': krb5_asn1.TGS_REP,
2045 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2046 'expected_crealm': expected_crealm,
2047 'expected_cname': expected_cname,
2048 'expected_anon': expected_anon,
2049 'expected_srealm': expected_srealm,
2050 'expected_sname': expected_sname,
2051 'expected_supported_etypes': expected_supported_etypes,
2052 'expected_flags': expected_flags,
2053 'unexpected_flags': unexpected_flags,
2054 'ticket_decryption_key': ticket_decryption_key,
2055 'generate_fast_fn': generate_fast_fn,
2056 'generate_fast_armor_fn': generate_fast_armor_fn,
2057 'generate_fast_padata_fn': generate_fast_padata_fn,
2058 'fast_armor_type': fast_armor_type,
2059 'generate_padata_fn': generate_padata_fn,
2060 'check_error_fn': check_error_fn,
2061 'check_rep_fn': check_rep_fn,
2062 'check_kdc_private_fn': check_kdc_private_fn,
2063 'callback_dict': callback_dict,
2064 'expected_error_mode': expected_error_mode,
2065 'expected_status': expected_status,
2066 'tgt': tgt,
2067 'body_checksum_type': body_checksum_type,
2068 'armor_key': armor_key,
2069 'armor_tgt': armor_tgt,
2070 'armor_subkey': armor_subkey,
2071 'auth_data': auth_data,
2072 'authenticator_subkey': authenticator_subkey,
2073 'kdc_options': kdc_options,
2074 'inner_req': inner_req,
2075 'outer_req': outer_req,
2076 'pac_request': pac_request,
2077 'pac_options': pac_options,
2078 'expect_pac': expect_pac,
2079 'to_rodc': to_rodc
2081 if callback_dict is None:
2082 callback_dict = {}
2084 return kdc_exchange_dict
2086 def generic_check_kdc_rep(self,
2087 kdc_exchange_dict,
2088 callback_dict,
2089 rep):
2091 expected_crealm = kdc_exchange_dict['expected_crealm']
2092 expected_anon = kdc_exchange_dict['expected_anon']
2093 expected_srealm = kdc_exchange_dict['expected_srealm']
2094 expected_sname = kdc_exchange_dict['expected_sname']
2095 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2096 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2097 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2098 msg_type = kdc_exchange_dict['rep_msg_type']
2099 armor_key = kdc_exchange_dict['armor_key']
2101 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
2102 padata = self.getElementValue(rep, 'padata')
2103 if self.strict_checking:
2104 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
2105 if expected_anon:
2106 expected_cname = self.PrincipalName_create(
2107 name_type=NT_WELLKNOWN,
2108 names=['WELLKNOWN', 'ANONYMOUS'])
2109 else:
2110 expected_cname = kdc_exchange_dict['expected_cname']
2111 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2112 self.assertElementPresent(rep, 'ticket')
2113 ticket = self.getElementValue(rep, 'ticket')
2114 ticket_encpart = None
2115 ticket_cipher = None
2116 self.assertIsNotNone(ticket)
2117 if ticket is not None: # Never None, but gives indentation
2118 self.assertElementEqual(ticket, 'tkt-vno', 5)
2119 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2120 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2121 self.assertElementPresent(ticket, 'enc-part')
2122 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2123 self.assertIsNotNone(ticket_encpart)
2124 if ticket_encpart is not None: # Never None, but gives indentation
2125 self.assertElementPresent(ticket_encpart, 'etype')
2126 # 'unspecified' means present, with any value != 0
2127 self.assertElementKVNO(ticket_encpart, 'kvno',
2128 self.unspecified_kvno)
2129 self.assertElementPresent(ticket_encpart, 'cipher')
2130 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2131 self.assertElementPresent(rep, 'enc-part')
2132 encpart = self.getElementValue(rep, 'enc-part')
2133 encpart_cipher = None
2134 self.assertIsNotNone(encpart)
2135 if encpart is not None: # Never None, but gives indentation
2136 self.assertElementPresent(encpart, 'etype')
2137 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2138 self.assertElementPresent(encpart, 'cipher')
2139 encpart_cipher = self.getElementValue(encpart, 'cipher')
2141 ticket_checksum = None
2143 # Get the decryption key for the encrypted part
2144 encpart_decryption_key, encpart_decryption_usage = (
2145 self.get_preauth_key(kdc_exchange_dict))
2147 if armor_key is not None:
2148 pa_dict = self.get_pa_dict(padata)
2150 if PADATA_FX_FAST in pa_dict:
2151 fx_fast_data = pa_dict[PADATA_FX_FAST]
2152 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2153 fx_fast_data,
2154 armor_key,
2155 finished=True)
2157 if 'strengthen-key' in fast_response:
2158 strengthen_key = self.EncryptionKey_import(
2159 fast_response['strengthen-key'])
2160 encpart_decryption_key = (
2161 self.generate_strengthen_reply_key(
2162 strengthen_key,
2163 encpart_decryption_key))
2165 fast_finished = fast_response.get('finished')
2166 if fast_finished is not None:
2167 ticket_checksum = fast_finished['ticket-checksum']
2169 self.check_rep_padata(kdc_exchange_dict,
2170 callback_dict,
2171 fast_response['padata'],
2172 error_code=0)
2174 ticket_private = None
2175 if ticket_decryption_key is not None:
2176 self.assertElementEqual(ticket_encpart, 'etype',
2177 ticket_decryption_key.etype)
2178 self.assertElementKVNO(ticket_encpart, 'kvno',
2179 ticket_decryption_key.kvno)
2180 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2181 ticket_cipher)
2182 ticket_private = self.der_decode(
2183 ticket_decpart,
2184 asn1Spec=krb5_asn1.EncTicketPart())
2186 encpart_private = None
2187 self.assertIsNotNone(encpart_decryption_key)
2188 if encpart_decryption_key is not None:
2189 self.assertElementEqual(encpart, 'etype',
2190 encpart_decryption_key.etype)
2191 if self.strict_checking:
2192 self.assertElementKVNO(encpart, 'kvno',
2193 encpart_decryption_key.kvno)
2194 rep_decpart = encpart_decryption_key.decrypt(
2195 encpart_decryption_usage,
2196 encpart_cipher)
2197 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2198 # application tag 26
2199 try:
2200 encpart_private = self.der_decode(
2201 rep_decpart,
2202 asn1Spec=rep_encpart_asn1Spec())
2203 except Exception:
2204 encpart_private = self.der_decode(
2205 rep_decpart,
2206 asn1Spec=krb5_asn1.EncTGSRepPart())
2208 self.assertIsNotNone(check_kdc_private_fn)
2209 if check_kdc_private_fn is not None:
2210 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2211 rep, ticket_private, encpart_private,
2212 ticket_checksum)
2214 return rep
2216 def check_fx_fast_data(self,
2217 kdc_exchange_dict,
2218 fx_fast_data,
2219 armor_key,
2220 finished=False,
2221 expect_strengthen_key=True):
2222 fx_fast_data = self.der_decode(fx_fast_data,
2223 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2225 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2226 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2228 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2230 fast_response = self.der_decode(fast_rep,
2231 asn1Spec=krb5_asn1.KrbFastResponse())
2233 if expect_strengthen_key and self.strict_checking:
2234 self.assertIn('strengthen-key', fast_response)
2236 if finished:
2237 self.assertIn('finished', fast_response)
2239 # Ensure that the nonce matches the nonce in the body of the request
2240 # (RFC6113 5.4.3).
2241 nonce = kdc_exchange_dict['nonce']
2242 self.assertEqual(nonce, fast_response['nonce'])
2244 return fast_response
2246 def generic_check_kdc_private(self,
2247 kdc_exchange_dict,
2248 callback_dict,
2249 rep,
2250 ticket_private,
2251 encpart_private,
2252 ticket_checksum):
2253 kdc_options = kdc_exchange_dict['kdc_options']
2254 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2255 canonicalize = (canon_pos < len(kdc_options)
2256 and kdc_options[canon_pos] == '1')
2257 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2258 renewable = (renewable_pos < len(kdc_options)
2259 and kdc_options[renewable_pos] == '1')
2261 expected_crealm = kdc_exchange_dict['expected_crealm']
2262 expected_cname = kdc_exchange_dict['expected_cname']
2263 expected_srealm = kdc_exchange_dict['expected_srealm']
2264 expected_sname = kdc_exchange_dict['expected_sname']
2265 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2267 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2269 expected_flags = kdc_exchange_dict.get('expected_flags')
2270 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2272 ticket = self.getElementValue(rep, 'ticket')
2274 if ticket_checksum is not None:
2275 armor_key = kdc_exchange_dict['armor_key']
2276 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2278 to_rodc = kdc_exchange_dict['to_rodc']
2279 if to_rodc:
2280 krbtgt_creds = self.get_rodc_krbtgt_creds()
2281 else:
2282 krbtgt_creds = self.get_krbtgt_creds()
2283 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2285 expect_pac = kdc_exchange_dict['expect_pac']
2287 ticket_session_key = None
2288 if ticket_private is not None:
2289 self.assertElementFlags(ticket_private, 'flags',
2290 expected_flags,
2291 unexpected_flags)
2292 self.assertElementPresent(ticket_private, 'key')
2293 ticket_key = self.getElementValue(ticket_private, 'key')
2294 self.assertIsNotNone(ticket_key)
2295 if ticket_key is not None: # Never None, but gives indentation
2296 self.assertElementPresent(ticket_key, 'keytype')
2297 self.assertElementPresent(ticket_key, 'keyvalue')
2298 ticket_session_key = self.EncryptionKey_import(ticket_key)
2299 self.assertElementEqualUTF8(ticket_private, 'crealm',
2300 expected_crealm)
2301 if self.strict_checking:
2302 self.assertElementEqualPrincipal(ticket_private, 'cname',
2303 expected_cname)
2304 self.assertElementPresent(ticket_private, 'transited')
2305 self.assertElementPresent(ticket_private, 'authtime')
2306 if self.strict_checking:
2307 self.assertElementPresent(ticket_private, 'starttime')
2308 self.assertElementPresent(ticket_private, 'endtime')
2309 if renewable:
2310 if self.strict_checking:
2311 self.assertElementPresent(ticket_private, 'renew-till')
2312 else:
2313 self.assertElementMissing(ticket_private, 'renew-till')
2314 if self.strict_checking:
2315 self.assertElementEqual(ticket_private, 'caddr', [])
2316 self.assertElementPresent(ticket_private, 'authorization-data',
2317 expect_empty=not expect_pac)
2319 encpart_session_key = None
2320 if encpart_private is not None:
2321 self.assertElementPresent(encpart_private, 'key')
2322 encpart_key = self.getElementValue(encpart_private, 'key')
2323 self.assertIsNotNone(encpart_key)
2324 if encpart_key is not None: # Never None, but gives indentation
2325 self.assertElementPresent(encpart_key, 'keytype')
2326 self.assertElementPresent(encpart_key, 'keyvalue')
2327 encpart_session_key = self.EncryptionKey_import(encpart_key)
2328 self.assertElementPresent(encpart_private, 'last-req')
2329 self.assertElementEqual(encpart_private, 'nonce',
2330 kdc_exchange_dict['nonce'])
2331 if rep_msg_type == KRB_AS_REP:
2332 if self.strict_checking:
2333 self.assertElementPresent(encpart_private,
2334 'key-expiration')
2335 else:
2336 self.assertElementMissing(encpart_private,
2337 'key-expiration')
2338 self.assertElementFlags(encpart_private, 'flags',
2339 expected_flags,
2340 unexpected_flags)
2341 self.assertElementPresent(encpart_private, 'authtime')
2342 if self.strict_checking:
2343 self.assertElementPresent(encpart_private, 'starttime')
2344 self.assertElementPresent(encpart_private, 'endtime')
2345 if renewable:
2346 if self.strict_checking:
2347 self.assertElementPresent(encpart_private, 'renew-till')
2348 else:
2349 self.assertElementMissing(encpart_private, 'renew-till')
2350 self.assertElementEqualUTF8(encpart_private, 'srealm',
2351 expected_srealm)
2352 self.assertElementEqualPrincipal(encpart_private, 'sname',
2353 expected_sname)
2354 if self.strict_checking:
2355 self.assertElementEqual(encpart_private, 'caddr', [])
2357 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2359 if self.strict_checking:
2360 if canonicalize or '1' in sent_pac_options:
2361 self.assertElementPresent(encpart_private,
2362 'encrypted-pa-data')
2363 enc_pa_dict = self.get_pa_dict(
2364 encpart_private['encrypted-pa-data'])
2365 if canonicalize:
2366 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2368 expected_supported_etypes = kdc_exchange_dict[
2369 'expected_supported_etypes']
2370 expected_supported_etypes |= (
2371 security.KERB_ENCTYPE_DES_CBC_CRC |
2372 security.KERB_ENCTYPE_DES_CBC_MD5 |
2373 security.KERB_ENCTYPE_RC4_HMAC_MD5)
2375 (supported_etypes,) = struct.unpack(
2376 '<L',
2377 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2379 self.assertEqual(supported_etypes,
2380 expected_supported_etypes)
2381 else:
2382 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2384 if '1' in sent_pac_options:
2385 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2387 pac_options = self.der_decode(
2388 enc_pa_dict[PADATA_PAC_OPTIONS],
2389 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2391 self.assertElementEqual(pac_options, 'options',
2392 sent_pac_options)
2393 else:
2394 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2395 else:
2396 self.assertElementEqual(encpart_private,
2397 'encrypted-pa-data',
2400 if ticket_session_key is not None and encpart_session_key is not None:
2401 self.assertEqual(ticket_session_key.etype,
2402 encpart_session_key.etype)
2403 self.assertEqual(ticket_session_key.key.contents,
2404 encpart_session_key.key.contents)
2405 if encpart_session_key is not None:
2406 session_key = encpart_session_key
2407 else:
2408 session_key = ticket_session_key
2409 ticket_creds = KerberosTicketCreds(
2410 ticket,
2411 session_key,
2412 crealm=expected_crealm,
2413 cname=expected_cname,
2414 srealm=expected_srealm,
2415 sname=expected_sname,
2416 decryption_key=ticket_decryption_key,
2417 ticket_private=ticket_private,
2418 encpart_private=encpart_private)
2420 if ticket_decryption_key is not None:
2421 self.verify_ticket(ticket_creds, krbtgt_key, expect_pac=expect_pac)
2423 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2425 def generic_check_kdc_error(self,
2426 kdc_exchange_dict,
2427 callback_dict,
2428 rep,
2429 inner=False):
2431 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2433 expected_anon = kdc_exchange_dict['expected_anon']
2434 expected_srealm = kdc_exchange_dict['expected_srealm']
2435 expected_sname = kdc_exchange_dict['expected_sname']
2436 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2438 sent_fast = self.sent_fast(kdc_exchange_dict)
2440 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2442 self.assertElementEqual(rep, 'pvno', 5)
2443 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2444 error_code = self.getElementValue(rep, 'error-code')
2445 self.assertIn(error_code, expected_error_mode)
2446 if self.strict_checking:
2447 self.assertElementMissing(rep, 'ctime')
2448 self.assertElementMissing(rep, 'cusec')
2449 self.assertElementPresent(rep, 'stime')
2450 self.assertElementPresent(rep, 'susec')
2451 # error-code checked above
2452 if self.strict_checking:
2453 self.assertElementMissing(rep, 'crealm')
2454 if expected_anon and not inner:
2455 expected_cname = self.PrincipalName_create(
2456 name_type=NT_WELLKNOWN,
2457 names=['WELLKNOWN', 'ANONYMOUS'])
2458 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2459 else:
2460 self.assertElementMissing(rep, 'cname')
2461 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2462 if sent_fast and error_code == KDC_ERR_GENERIC:
2463 self.assertElementEqualPrincipal(rep, 'sname',
2464 self.get_krbtgt_sname())
2465 else:
2466 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2467 self.assertElementMissing(rep, 'e-text')
2468 if (error_code == KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2469 or (rep_msg_type == KRB_TGS_REP
2470 and not sent_fast)
2471 or (sent_fast and fast_armor_type is not None
2472 and fast_armor_type != FX_FAST_ARMOR_AP_REQUEST)
2473 or inner):
2474 self.assertElementMissing(rep, 'e-data')
2475 return rep
2476 edata = self.getElementValue(rep, 'e-data')
2477 if self.strict_checking:
2478 if error_code != KDC_ERR_GENERIC:
2479 # Predicting whether an ERR_GENERIC error contains e-data is
2480 # more complicated.
2481 self.assertIsNotNone(edata)
2482 if edata is not None:
2483 if rep_msg_type == KRB_TGS_REP and not sent_fast:
2484 rep_padata = [self.der_decode(edata,
2485 asn1Spec=krb5_asn1.PA_DATA())]
2486 else:
2487 rep_padata = self.der_decode(edata,
2488 asn1Spec=krb5_asn1.METHOD_DATA())
2489 self.assertGreater(len(rep_padata), 0)
2491 if sent_fast:
2492 self.assertEqual(1, len(rep_padata))
2493 rep_pa_dict = self.get_pa_dict(rep_padata)
2494 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
2496 armor_key = kdc_exchange_dict['armor_key']
2497 self.assertIsNotNone(armor_key)
2498 fast_response = self.check_fx_fast_data(
2499 kdc_exchange_dict,
2500 rep_pa_dict[PADATA_FX_FAST],
2501 armor_key,
2502 expect_strengthen_key=False)
2504 rep_padata = fast_response['padata']
2506 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2507 callback_dict,
2508 rep_padata,
2509 error_code)
2511 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2513 return rep
2515 def check_rep_padata(self,
2516 kdc_exchange_dict,
2517 callback_dict,
2518 rep_padata,
2519 error_code):
2520 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2522 req_body = kdc_exchange_dict['req_body']
2523 proposed_etypes = req_body['etype']
2524 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2526 sent_fast = self.sent_fast(kdc_exchange_dict)
2527 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
2529 if rep_msg_type == KRB_TGS_REP:
2530 self.assertTrue(sent_fast)
2532 expect_etype_info2 = ()
2533 expect_etype_info = False
2534 unexpect_etype_info = True
2535 expected_aes_type = 0
2536 expected_rc4_type = 0
2537 if kcrypto.Enctype.RC4 in proposed_etypes:
2538 expect_etype_info = True
2539 for etype in proposed_etypes:
2540 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2541 expect_etype_info = False
2542 if etype not in client_as_etypes:
2543 continue
2544 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2545 if etype > expected_aes_type:
2546 expected_aes_type = etype
2547 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
2548 unexpect_etype_info = False
2549 if etype > expected_rc4_type:
2550 expected_rc4_type = etype
2552 if expected_aes_type != 0:
2553 expect_etype_info2 += (expected_aes_type,)
2554 if expected_rc4_type != 0:
2555 expect_etype_info2 += (expected_rc4_type,)
2557 expected_patypes = ()
2558 if sent_fast and error_code != 0:
2559 expected_patypes += (PADATA_FX_ERROR,)
2560 expected_patypes += (PADATA_FX_COOKIE,)
2562 if rep_msg_type == KRB_TGS_REP:
2563 if not sent_fast and error_code != 0:
2564 expected_patypes += (PADATA_PW_SALT,)
2565 else:
2566 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2567 if ('1' in sent_pac_options
2568 and error_code not in (0, KDC_ERR_GENERIC)):
2569 expected_patypes += (PADATA_PAC_OPTIONS,)
2570 elif error_code != KDC_ERR_GENERIC:
2571 if expect_etype_info:
2572 self.assertGreater(len(expect_etype_info2), 0)
2573 expected_patypes += (PADATA_ETYPE_INFO,)
2574 if len(expect_etype_info2) != 0:
2575 expected_patypes += (PADATA_ETYPE_INFO2,)
2577 if error_code != KDC_ERR_PREAUTH_FAILED:
2578 if sent_fast:
2579 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
2580 else:
2581 expected_patypes += (PADATA_ENC_TIMESTAMP,)
2583 if not sent_enc_challenge:
2584 expected_patypes += (PADATA_PK_AS_REQ,)
2585 expected_patypes += (PADATA_PK_AS_REP_19,)
2587 if (self.kdc_fast_support
2588 and not sent_fast
2589 and not sent_enc_challenge):
2590 expected_patypes += (PADATA_FX_FAST,)
2591 expected_patypes += (PADATA_FX_COOKIE,)
2593 if self.strict_checking:
2594 for i, patype in enumerate(expected_patypes):
2595 self.assertElementEqual(rep_padata[i], 'padata-type', patype)
2596 self.assertEqual(len(rep_padata), len(expected_patypes))
2598 etype_info2 = None
2599 etype_info = None
2600 enc_timestamp = None
2601 enc_challenge = None
2602 pk_as_req = None
2603 pk_as_rep19 = None
2604 fast_cookie = None
2605 fast_error = None
2606 fx_fast = None
2607 pac_options = None
2608 pw_salt = None
2609 for pa in rep_padata:
2610 patype = self.getElementValue(pa, 'padata-type')
2611 pavalue = self.getElementValue(pa, 'padata-value')
2612 if patype == PADATA_ETYPE_INFO2:
2613 self.assertIsNone(etype_info2)
2614 etype_info2 = self.der_decode(pavalue,
2615 asn1Spec=krb5_asn1.ETYPE_INFO2())
2616 continue
2617 if patype == PADATA_ETYPE_INFO:
2618 self.assertIsNone(etype_info)
2619 etype_info = self.der_decode(pavalue,
2620 asn1Spec=krb5_asn1.ETYPE_INFO())
2621 continue
2622 if patype == PADATA_ENC_TIMESTAMP:
2623 self.assertIsNone(enc_timestamp)
2624 enc_timestamp = pavalue
2625 self.assertEqual(len(enc_timestamp), 0)
2626 continue
2627 if patype == PADATA_ENCRYPTED_CHALLENGE:
2628 self.assertIsNone(enc_challenge)
2629 enc_challenge = pavalue
2630 continue
2631 if patype == PADATA_PK_AS_REQ:
2632 self.assertIsNone(pk_as_req)
2633 pk_as_req = pavalue
2634 self.assertEqual(len(pk_as_req), 0)
2635 continue
2636 if patype == PADATA_PK_AS_REP_19:
2637 self.assertIsNone(pk_as_rep19)
2638 pk_as_rep19 = pavalue
2639 self.assertEqual(len(pk_as_rep19), 0)
2640 continue
2641 if patype == PADATA_FX_COOKIE:
2642 self.assertIsNone(fast_cookie)
2643 fast_cookie = pavalue
2644 self.assertIsNotNone(fast_cookie)
2645 continue
2646 if patype == PADATA_FX_ERROR:
2647 self.assertIsNone(fast_error)
2648 fast_error = pavalue
2649 self.assertIsNotNone(fast_error)
2650 continue
2651 if patype == PADATA_FX_FAST:
2652 self.assertIsNone(fx_fast)
2653 fx_fast = pavalue
2654 self.assertEqual(len(fx_fast), 0)
2655 continue
2656 if patype == PADATA_PAC_OPTIONS:
2657 self.assertIsNone(pac_options)
2658 pac_options = self.der_decode(
2659 pavalue,
2660 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2661 continue
2662 if patype == PADATA_PW_SALT:
2663 self.assertIsNone(pw_salt)
2664 pw_salt = pavalue
2665 self.assertIsNotNone(pw_salt)
2666 continue
2668 if fast_cookie is not None:
2669 kdc_exchange_dict['fast_cookie'] = fast_cookie
2671 if fast_error is not None:
2672 fast_error = self.der_decode(fast_error,
2673 asn1Spec=krb5_asn1.KRB_ERROR())
2674 self.generic_check_kdc_error(kdc_exchange_dict,
2675 callback_dict,
2676 fast_error,
2677 inner=True)
2679 if pac_options is not None:
2680 self.assertElementEqual(pac_options, 'options', sent_pac_options)
2682 if pw_salt is not None:
2683 self.assertEqual(12, len(pw_salt))
2685 status = int.from_bytes(pw_salt[:4], 'little')
2686 flags = int.from_bytes(pw_salt[8:], 'little')
2688 expected_status = kdc_exchange_dict['expected_status']
2689 self.assertEqual(expected_status, status)
2691 self.assertEqual(3, flags)
2692 else:
2693 self.assertIsNone(kdc_exchange_dict.get('expected_status'))
2695 if enc_challenge is not None:
2696 if not sent_enc_challenge:
2697 self.assertEqual(len(enc_challenge), 0)
2698 else:
2699 armor_key = kdc_exchange_dict['armor_key']
2700 self.assertIsNotNone(armor_key)
2702 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
2704 kdc_challenge_key = self.generate_kdc_challenge_key(
2705 armor_key, preauth_key)
2707 # Ensure that the encrypted challenge FAST factor is supported
2708 # (RFC6113 5.4.6).
2709 if self.strict_checking:
2710 self.assertNotEqual(len(enc_challenge), 0)
2711 if len(enc_challenge) != 0:
2712 encrypted_challenge = self.der_decode(
2713 enc_challenge,
2714 asn1Spec=krb5_asn1.EncryptedData())
2715 self.assertEqual(encrypted_challenge['etype'],
2716 kdc_challenge_key.etype)
2718 challenge = kdc_challenge_key.decrypt(
2719 KU_ENC_CHALLENGE_KDC,
2720 encrypted_challenge['cipher'])
2721 challenge = self.der_decode(
2722 challenge,
2723 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
2725 # Retrieve the returned timestamp.
2726 rep_patime = challenge['patimestamp']
2727 self.assertIn('pausec', challenge)
2729 # Ensure the returned time is within five minutes of the
2730 # current time.
2731 rep_time = self.get_EpochFromKerberosTime(rep_patime)
2732 current_time = time.time()
2734 self.assertLess(current_time - 300, rep_time)
2735 self.assertLess(rep_time, current_time + 300)
2737 if all(etype not in client_as_etypes or etype not in proposed_etypes
2738 for etype in (kcrypto.Enctype.AES256,
2739 kcrypto.Enctype.AES128,
2740 kcrypto.Enctype.RC4)):
2741 self.assertIsNone(etype_info2)
2742 self.assertIsNone(etype_info)
2743 if rep_msg_type == KRB_AS_REP:
2744 if self.strict_checking:
2745 if sent_fast:
2746 self.assertIsNotNone(enc_challenge)
2747 self.assertIsNone(enc_timestamp)
2748 else:
2749 self.assertIsNotNone(enc_timestamp)
2750 self.assertIsNone(enc_challenge)
2751 self.assertIsNotNone(pk_as_req)
2752 self.assertIsNotNone(pk_as_rep19)
2753 else:
2754 self.assertIsNone(enc_timestamp)
2755 self.assertIsNone(enc_challenge)
2756 self.assertIsNone(pk_as_req)
2757 self.assertIsNone(pk_as_rep19)
2758 return None
2760 if error_code != KDC_ERR_GENERIC:
2761 if self.strict_checking:
2762 self.assertIsNotNone(etype_info2)
2763 else:
2764 self.assertIsNone(etype_info2)
2765 if expect_etype_info:
2766 self.assertIsNotNone(etype_info)
2767 else:
2768 if self.strict_checking:
2769 self.assertIsNone(etype_info)
2770 if unexpect_etype_info:
2771 self.assertIsNone(etype_info)
2773 if error_code != KDC_ERR_GENERIC and self.strict_checking:
2774 self.assertGreaterEqual(len(etype_info2), 1)
2775 self.assertEqual(len(etype_info2), len(expect_etype_info2))
2776 for i in range(0, len(etype_info2)):
2777 e = self.getElementValue(etype_info2[i], 'etype')
2778 self.assertEqual(e, expect_etype_info2[i])
2779 salt = self.getElementValue(etype_info2[i], 'salt')
2780 if e == kcrypto.Enctype.RC4:
2781 self.assertIsNone(salt)
2782 else:
2783 self.assertIsNotNone(salt)
2784 expected_salt = kdc_exchange_dict['expected_salt']
2785 if expected_salt is not None:
2786 self.assertEqual(salt, expected_salt)
2787 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
2788 if self.strict_checking:
2789 self.assertIsNone(s2kparams)
2790 if etype_info is not None:
2791 self.assertEqual(len(etype_info), 1)
2792 e = self.getElementValue(etype_info[0], 'etype')
2793 self.assertEqual(e, kcrypto.Enctype.RC4)
2794 self.assertEqual(e, expect_etype_info2[0])
2795 salt = self.getElementValue(etype_info[0], 'salt')
2796 if self.strict_checking:
2797 self.assertIsNotNone(salt)
2798 self.assertEqual(len(salt), 0)
2800 if error_code not in (KDC_ERR_PREAUTH_FAILED,
2801 KDC_ERR_GENERIC):
2802 if sent_fast:
2803 self.assertIsNotNone(enc_challenge)
2804 if self.strict_checking:
2805 self.assertIsNone(enc_timestamp)
2806 else:
2807 self.assertIsNotNone(enc_timestamp)
2808 if self.strict_checking:
2809 self.assertIsNone(enc_challenge)
2810 if not sent_enc_challenge:
2811 if self.strict_checking:
2812 self.assertIsNotNone(pk_as_req)
2813 self.assertIsNotNone(pk_as_rep19)
2814 else:
2815 self.assertIsNone(pk_as_req)
2816 self.assertIsNone(pk_as_rep19)
2817 else:
2818 if self.strict_checking:
2819 self.assertIsNone(enc_timestamp)
2820 self.assertIsNone(enc_challenge)
2821 self.assertIsNone(pk_as_req)
2822 self.assertIsNone(pk_as_rep19)
2824 return etype_info2
2826 def generate_simple_fast(self,
2827 kdc_exchange_dict,
2828 _callback_dict,
2829 req_body,
2830 fast_padata,
2831 fast_armor,
2832 checksum,
2833 fast_options=''):
2834 armor_key = kdc_exchange_dict['armor_key']
2836 fast_req = self.KRB_FAST_REQ_create(fast_options,
2837 fast_padata,
2838 req_body)
2839 fast_req = self.der_encode(fast_req,
2840 asn1Spec=krb5_asn1.KrbFastReq())
2841 fast_req = self.EncryptedData_create(armor_key,
2842 KU_FAST_ENC,
2843 fast_req)
2845 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
2846 checksum,
2847 fast_req)
2849 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
2850 fx_fast_request = self.der_encode(
2851 fx_fast_request,
2852 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
2854 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
2855 fx_fast_request)
2857 return fast_padata
2859 def generate_ap_req(self,
2860 kdc_exchange_dict,
2861 _callback_dict,
2862 req_body,
2863 armor):
2864 if armor:
2865 tgt = kdc_exchange_dict['armor_tgt']
2866 authenticator_subkey = kdc_exchange_dict['armor_subkey']
2868 req_body_checksum = None
2869 else:
2870 tgt = kdc_exchange_dict['tgt']
2871 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2872 body_checksum_type = kdc_exchange_dict['body_checksum_type']
2874 req_body_blob = self.der_encode(req_body,
2875 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2877 req_body_checksum = self.Checksum_create(tgt.session_key,
2878 KU_TGS_REQ_AUTH_CKSUM,
2879 req_body_blob,
2880 ctype=body_checksum_type)
2882 auth_data = kdc_exchange_dict['auth_data']
2884 subkey_obj = None
2885 if authenticator_subkey is not None:
2886 subkey_obj = authenticator_subkey.export_obj()
2887 seq_number = random.randint(0, 0xfffffffe)
2888 (ctime, cusec) = self.get_KerberosTimeWithUsec()
2889 authenticator_obj = self.Authenticator_create(
2890 crealm=tgt.crealm,
2891 cname=tgt.cname,
2892 cksum=req_body_checksum,
2893 cusec=cusec,
2894 ctime=ctime,
2895 subkey=subkey_obj,
2896 seq_number=seq_number,
2897 authorization_data=auth_data)
2898 authenticator_blob = self.der_encode(
2899 authenticator_obj,
2900 asn1Spec=krb5_asn1.Authenticator())
2902 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
2903 authenticator = self.EncryptedData_create(tgt.session_key,
2904 usage,
2905 authenticator_blob)
2907 ap_options = krb5_asn1.APOptions('0')
2908 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2909 ticket=tgt.ticket,
2910 authenticator=authenticator)
2911 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2913 return ap_req
2915 def generate_simple_tgs_padata(self,
2916 kdc_exchange_dict,
2917 callback_dict,
2918 req_body):
2919 ap_req = self.generate_ap_req(kdc_exchange_dict,
2920 callback_dict,
2921 req_body,
2922 armor=False)
2923 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2924 padata = [pa_tgs_req]
2926 return padata, req_body
2928 def get_preauth_key(self, kdc_exchange_dict):
2929 msg_type = kdc_exchange_dict['rep_msg_type']
2931 if msg_type == KRB_AS_REP:
2932 key = kdc_exchange_dict['preauth_key']
2933 usage = KU_AS_REP_ENC_PART
2934 else: # KRB_TGS_REP
2935 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2936 if authenticator_subkey is not None:
2937 key = authenticator_subkey
2938 usage = KU_TGS_REP_ENC_PART_SUB_KEY
2939 else:
2940 tgt = kdc_exchange_dict['tgt']
2941 key = tgt.session_key
2942 usage = KU_TGS_REP_ENC_PART_SESSION
2944 self.assertIsNotNone(key)
2946 return key, usage
2948 def generate_armor_key(self, subkey, session_key):
2949 armor_key = kcrypto.cf2(subkey.key,
2950 session_key.key,
2951 b'subkeyarmor',
2952 b'ticketarmor')
2953 armor_key = Krb5EncryptionKey(armor_key, None)
2955 return armor_key
2957 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
2958 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
2959 reply_key.key,
2960 b'strengthenkey',
2961 b'replykey')
2962 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
2963 reply_key.kvno)
2965 return strengthen_reply_key
2967 def generate_client_challenge_key(self, armor_key, longterm_key):
2968 client_challenge_key = kcrypto.cf2(armor_key.key,
2969 longterm_key.key,
2970 b'clientchallengearmor',
2971 b'challengelongterm')
2972 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
2974 return client_challenge_key
2976 def generate_kdc_challenge_key(self, armor_key, longterm_key):
2977 kdc_challenge_key = kcrypto.cf2(armor_key.key,
2978 longterm_key.key,
2979 b'kdcchallengearmor',
2980 b'challengelongterm')
2981 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
2983 return kdc_challenge_key
2985 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
2986 expected_type = expected_checksum['cksumtype']
2987 self.assertEqual(armor_key.ctype, expected_type)
2989 ticket_blob = self.der_encode(ticket,
2990 asn1Spec=krb5_asn1.Ticket())
2991 checksum = self.Checksum_create(armor_key,
2992 KU_FAST_FINISHED,
2993 ticket_blob)
2994 self.assertEqual(expected_checksum, checksum)
2996 def verify_ticket(self, ticket, krbtgt_key, expect_pac=True):
2997 # Check if the ticket is a TGT.
2998 sname = ticket.ticket['sname']
2999 is_tgt = self.is_tgs(sname)
3001 # Decrypt the ticket.
3003 key = ticket.decryption_key
3004 enc_part = ticket.ticket['enc-part']
3006 self.assertElementEqual(enc_part, 'etype', key.etype)
3007 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3009 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3010 enc_part = self.der_decode(
3011 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3013 # Fetch the authorization data from the ticket.
3014 auth_data = enc_part.get('authorization-data')
3015 if expect_pac:
3016 self.assertIsNotNone(auth_data)
3017 elif auth_data is None:
3018 return
3020 # Get a copy of the authdata with an empty PAC, and the existing PAC
3021 # (if present).
3022 empty_pac = self.get_empty_pac()
3023 auth_data, pac_data = self.replace_pac(auth_data,
3024 empty_pac,
3025 expect_pac=expect_pac)
3026 if not expect_pac:
3027 return
3029 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3030 # raw type to create a new PAC with zeroed signatures for
3031 # verification. This is because on Windows, the resource_groups field
3032 # is added to PAC_LOGON_INFO after the info3 field has been created,
3033 # which results in a different ordering of pointer values than Samba
3034 # (see commit 0e201ecdc53). Using the raw type avoids changing
3035 # PAC_LOGON_INFO, so verification against Windows can work. We still
3036 # need the PAC_DATA type to retrieve the actual checksums, because the
3037 # signatures in the raw type may contain padding bytes.
3038 pac = ndr_unpack(krb5pac.PAC_DATA,
3039 pac_data)
3040 raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
3041 pac_data)
3043 checksums = {}
3045 for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
3046 buffer_type = pac_buffer.type
3047 if buffer_type in self.pac_checksum_types:
3048 self.assertNotIn(buffer_type, checksums,
3049 f'Duplicate checksum type {buffer_type}')
3051 # Fetch the checksum and the checksum type from the PAC buffer.
3052 checksum = pac_buffer.info.signature
3053 ctype = pac_buffer.info.type
3054 if ctype & 1 << 31:
3055 ctype |= -1 << 31
3057 checksums[buffer_type] = checksum, ctype
3059 if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3060 # Zero the checksum field so that we can later verify the
3061 # checksums. The ticket checksum field is not zeroed.
3063 signature = ndr_unpack(
3064 krb5pac.PAC_SIGNATURE_DATA,
3065 raw_pac_buffer.info.remaining)
3066 signature.signature = bytes(len(checksum))
3067 raw_pac_buffer.info.remaining = ndr_pack(
3068 signature)
3070 # Re-encode the PAC.
3071 pac_data = ndr_pack(raw_pac)
3073 # Verify the signatures.
3075 server_checksum, server_ctype = checksums[
3076 krb5pac.PAC_TYPE_SRV_CHECKSUM]
3077 Krb5EncryptionKey.verify_checksum(key,
3078 KU_NON_KERB_CKSUM_SALT,
3079 pac_data,
3080 server_ctype,
3081 server_checksum)
3083 kdc_checksum, kdc_ctype = checksums[
3084 krb5pac.PAC_TYPE_KDC_CHECKSUM]
3085 krbtgt_key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3086 server_checksum,
3087 kdc_ctype,
3088 kdc_checksum)
3090 if is_tgt:
3091 self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
3092 else:
3093 ticket_checksum, ticket_ctype = checksums.get(
3094 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
3095 (None, None))
3096 if self.strict_checking:
3097 self.assertIsNotNone(ticket_checksum)
3098 if ticket_checksum is not None:
3099 enc_part['authorization-data'] = auth_data
3100 enc_part = self.der_encode(enc_part,
3101 asn1Spec=krb5_asn1.EncTicketPart())
3103 krbtgt_key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3104 enc_part,
3105 ticket_ctype,
3106 ticket_checksum)
3108 def modified_ticket(self,
3109 ticket, *,
3110 new_ticket_key=None,
3111 modify_fn=None,
3112 modify_pac_fn=None,
3113 exclude_pac=False,
3114 update_pac_checksums=True,
3115 checksum_keys=None,
3116 include_checksums=None):
3117 if checksum_keys is None:
3118 # A dict containing a key for each checksum type to be created in
3119 # the PAC.
3120 checksum_keys = {}
3122 if include_checksums is None:
3123 # A dict containing a value for each checksum type; True if the
3124 # checksum type is to be included in the PAC, False if it is to be
3125 # excluded, or None/not present if the checksum is to be included
3126 # based on its presence in the original PAC.
3127 include_checksums = {}
3129 # Check that the values passed in by the caller make sense.
3131 self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
3132 self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
3134 if exclude_pac:
3135 self.assertIsNone(modify_pac_fn)
3137 update_pac_checksums = False
3139 if not update_pac_checksums:
3140 self.assertFalse(checksum_keys)
3141 self.assertFalse(include_checksums)
3143 expect_pac = update_pac_checksums or modify_pac_fn is not None
3145 key = ticket.decryption_key
3147 if new_ticket_key is None:
3148 # Use the same key to re-encrypt the ticket.
3149 new_ticket_key = key
3151 if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
3152 # If the server signature key is not present, fall back to the key
3153 # used to encrypt the ticket.
3154 checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
3156 if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
3157 # If the ticket signature key is not present, fall back to the key
3158 # used for the KDC signature.
3159 kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
3160 if kdc_checksum_key is not None:
3161 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
3162 kdc_checksum_key)
3164 # Decrypt the ticket.
3166 enc_part = ticket.ticket['enc-part']
3168 self.assertElementEqual(enc_part, 'etype', key.etype)
3169 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3171 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3172 enc_part = self.der_decode(
3173 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3175 # Modify the ticket here.
3176 if modify_fn is not None:
3177 enc_part = modify_fn(enc_part)
3179 auth_data = enc_part.get('authorization-data')
3180 if expect_pac:
3181 self.assertIsNotNone(auth_data)
3182 if auth_data is not None:
3183 new_pac = None
3184 if not exclude_pac:
3185 # Get a copy of the authdata with an empty PAC, and the
3186 # existing PAC (if present).
3187 empty_pac = self.get_empty_pac()
3188 empty_pac_auth_data, pac_data = self.replace_pac(auth_data,
3189 empty_pac)
3191 if expect_pac:
3192 self.assertIsNotNone(pac_data)
3193 if pac_data is not None:
3194 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3196 # Modify the PAC here.
3197 if modify_pac_fn is not None:
3198 pac = modify_pac_fn(pac)
3200 if update_pac_checksums:
3201 # Get the enc-part with an empty PAC, which is needed
3202 # to create a ticket signature.
3203 enc_part_to_sign = enc_part.copy()
3204 enc_part_to_sign['authorization-data'] = (
3205 empty_pac_auth_data)
3206 enc_part_to_sign = self.der_encode(
3207 enc_part_to_sign,
3208 asn1Spec=krb5_asn1.EncTicketPart())
3210 self.update_pac_checksums(pac,
3211 checksum_keys,
3212 include_checksums,
3213 enc_part_to_sign)
3215 # Re-encode the PAC.
3216 pac_data = ndr_pack(pac)
3217 new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
3218 pac_data)
3220 # Replace the PAC in the authorization data and re-add it to the
3221 # ticket enc-part.
3222 auth_data, _ = self.replace_pac(auth_data, new_pac)
3223 enc_part['authorization-data'] = auth_data
3225 # Re-encrypt the ticket enc-part with the new key.
3226 enc_part_new = self.der_encode(enc_part,
3227 asn1Spec=krb5_asn1.EncTicketPart())
3228 enc_part_new = self.EncryptedData_create(new_ticket_key,
3229 KU_TICKET,
3230 enc_part_new)
3232 # Create a copy of the ticket with the new enc-part.
3233 new_ticket = ticket.ticket.copy()
3234 new_ticket['enc-part'] = enc_part_new
3236 new_ticket_creds = KerberosTicketCreds(
3237 new_ticket,
3238 session_key=ticket.session_key,
3239 crealm=ticket.crealm,
3240 cname=ticket.cname,
3241 srealm=ticket.srealm,
3242 sname=ticket.sname,
3243 decryption_key=new_ticket_key,
3244 ticket_private=enc_part,
3245 encpart_private=ticket.encpart_private)
3247 return new_ticket_creds
3249 def update_pac_checksums(self,
3250 pac,
3251 checksum_keys,
3252 include_checksums,
3253 enc_part=None):
3254 pac_buffers = pac.buffers
3255 checksum_buffers = {}
3257 # Find the relevant PAC checksum buffers.
3258 for pac_buffer in pac_buffers:
3259 buffer_type = pac_buffer.type
3260 if buffer_type in self.pac_checksum_types:
3261 self.assertNotIn(buffer_type, checksum_buffers,
3262 f'Duplicate checksum type {buffer_type}')
3264 checksum_buffers[buffer_type] = pac_buffer
3266 # Create any additional buffers that were requested but not
3267 # present. Conversely, remove any buffers that were requested to be
3268 # removed.
3269 for buffer_type in self.pac_checksum_types:
3270 if buffer_type in checksum_buffers:
3271 if include_checksums.get(buffer_type) is False:
3272 checksum_buffer = checksum_buffers.pop(buffer_type)
3274 pac.num_buffers -= 1
3275 pac_buffers.remove(checksum_buffer)
3277 elif include_checksums.get(buffer_type) is True:
3278 info = krb5pac.PAC_SIGNATURE_DATA()
3280 checksum_buffer = krb5pac.PAC_BUFFER()
3281 checksum_buffer.type = buffer_type
3282 checksum_buffer.info = info
3284 pac_buffers.append(checksum_buffer)
3285 pac.num_buffers += 1
3287 checksum_buffers[buffer_type] = checksum_buffer
3289 # Fill the relevant checksum buffers.
3290 for buffer_type, checksum_buffer in checksum_buffers.items():
3291 checksum_key = checksum_keys[buffer_type]
3292 ctype = checksum_key.ctype & ((1 << 32) - 1)
3294 if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3295 self.assertIsNotNone(enc_part)
3297 signature = checksum_key.make_checksum(
3298 KU_NON_KERB_CKSUM_SALT,
3299 enc_part)
3301 elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
3302 signature = Krb5EncryptionKey.make_zeroed_checksum(
3303 checksum_key)
3305 else:
3306 signature = checksum_key.make_zeroed_checksum()
3308 checksum_buffer.info.signature = signature
3309 checksum_buffer.info.type = ctype
3311 # Add the new checksum buffers to the PAC.
3312 pac.buffers = pac_buffers
3314 # Calculate the server and KDC checksums and insert them into the PAC.
3316 server_checksum_buffer = checksum_buffers.get(
3317 krb5pac.PAC_TYPE_SRV_CHECKSUM)
3318 if server_checksum_buffer is not None:
3319 server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
3321 pac_data = ndr_pack(pac)
3322 server_checksum = Krb5EncryptionKey.make_checksum(
3323 server_checksum_key,
3324 KU_NON_KERB_CKSUM_SALT,
3325 pac_data)
3327 server_checksum_buffer.info.signature = server_checksum
3329 kdc_checksum_buffer = checksum_buffers.get(
3330 krb5pac.PAC_TYPE_KDC_CHECKSUM)
3331 if kdc_checksum_buffer is not None:
3332 self.assertIsNotNone(server_checksum_buffer)
3334 kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
3336 kdc_checksum = kdc_checksum_key.make_checksum(
3337 KU_NON_KERB_CKSUM_SALT,
3338 server_checksum)
3340 kdc_checksum_buffer.info.signature = kdc_checksum
3342 def replace_pac(self, auth_data, new_pac, expect_pac=True):
3343 if new_pac is not None:
3344 self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
3345 self.assertElementPresent(new_pac, 'ad-data')
3347 new_auth_data = []
3349 ad_relevant = None
3350 old_pac = None
3352 for authdata_elem in auth_data:
3353 if authdata_elem['ad-type'] == AD_IF_RELEVANT:
3354 ad_relevant = self.der_decode(
3355 authdata_elem['ad-data'],
3356 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3358 relevant_elems = []
3359 for relevant_elem in ad_relevant:
3360 if relevant_elem['ad-type'] == AD_WIN2K_PAC:
3361 self.assertIsNone(old_pac, 'Multiple PACs detected')
3362 old_pac = relevant_elem['ad-data']
3364 if new_pac is not None:
3365 relevant_elems.append(new_pac)
3366 else:
3367 relevant_elems.append(relevant_elem)
3368 if expect_pac:
3369 self.assertIsNotNone(old_pac, 'Expected PAC')
3371 ad_relevant = self.der_encode(
3372 relevant_elems,
3373 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3375 authdata_elem = self.AuthorizationData_create(AD_IF_RELEVANT,
3376 ad_relevant)
3378 new_auth_data.append(authdata_elem)
3380 if expect_pac:
3381 self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
3383 return new_auth_data, old_pac
3385 def get_krbtgt_checksum_key(self):
3386 krbtgt_creds = self.get_krbtgt_creds()
3387 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
3389 return {
3390 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
3393 def is_tgs(self, principal):
3394 name = principal['name-string'][0]
3395 return name in ('krbtgt', b'krbtgt')
3397 def get_empty_pac(self):
3398 return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1))
3400 def get_outer_pa_dict(self, kdc_exchange_dict):
3401 return self.get_pa_dict(kdc_exchange_dict['req_padata'])
3403 def get_fast_pa_dict(self, kdc_exchange_dict):
3404 req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
3406 if req_pa_dict:
3407 return req_pa_dict
3409 return self.get_outer_pa_dict(kdc_exchange_dict)
3411 def sent_fast(self, kdc_exchange_dict):
3412 outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
3414 return PADATA_FX_FAST in outer_pa_dict
3416 def sent_enc_challenge(self, kdc_exchange_dict):
3417 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3419 return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
3421 def get_sent_pac_options(self, kdc_exchange_dict):
3422 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3424 if PADATA_PAC_OPTIONS not in fast_pa_dict:
3425 return ''
3427 pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
3428 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3429 pac_options = pac_options['options']
3431 # Mask out unsupported bits.
3432 pac_options, remaining = pac_options[:4], pac_options[4:]
3433 pac_options += '0' * len(remaining)
3435 return pac_options
3437 def get_krbtgt_sname(self):
3438 krbtgt_creds = self.get_krbtgt_creds()
3439 krbtgt_username = krbtgt_creds.get_username()
3440 krbtgt_realm = krbtgt_creds.get_realm()
3441 krbtgt_sname = self.PrincipalName_create(
3442 name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
3444 return krbtgt_sname
3446 def _test_as_exchange(self,
3447 cname,
3448 realm,
3449 sname,
3450 till,
3451 client_as_etypes,
3452 expected_error_mode,
3453 expected_crealm,
3454 expected_cname,
3455 expected_srealm,
3456 expected_sname,
3457 expected_salt,
3458 etypes,
3459 padata,
3460 kdc_options,
3461 expected_flags=None,
3462 unexpected_flags=None,
3463 expected_supported_etypes=None,
3464 preauth_key=None,
3465 ticket_decryption_key=None,
3466 pac_request=None,
3467 pac_options=None,
3468 to_rodc=False):
3470 def _generate_padata_copy(_kdc_exchange_dict,
3471 _callback_dict,
3472 req_body):
3473 return padata, req_body
3475 if not expected_error_mode:
3476 check_error_fn = None
3477 check_rep_fn = self.generic_check_kdc_rep
3478 else:
3479 check_error_fn = self.generic_check_kdc_error
3480 check_rep_fn = None
3482 if padata is not None:
3483 generate_padata_fn = _generate_padata_copy
3484 else:
3485 generate_padata_fn = None
3487 kdc_exchange_dict = self.as_exchange_dict(
3488 expected_crealm=expected_crealm,
3489 expected_cname=expected_cname,
3490 expected_srealm=expected_srealm,
3491 expected_sname=expected_sname,
3492 expected_supported_etypes=expected_supported_etypes,
3493 ticket_decryption_key=ticket_decryption_key,
3494 generate_padata_fn=generate_padata_fn,
3495 check_error_fn=check_error_fn,
3496 check_rep_fn=check_rep_fn,
3497 check_kdc_private_fn=self.generic_check_kdc_private,
3498 expected_error_mode=expected_error_mode,
3499 client_as_etypes=client_as_etypes,
3500 expected_salt=expected_salt,
3501 expected_flags=expected_flags,
3502 unexpected_flags=unexpected_flags,
3503 preauth_key=preauth_key,
3504 kdc_options=str(kdc_options),
3505 pac_request=pac_request,
3506 pac_options=pac_options,
3507 to_rodc=to_rodc)
3509 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3510 cname=cname,
3511 realm=realm,
3512 sname=sname,
3513 till_time=till,
3514 etypes=etypes)
3516 return rep, kdc_exchange_dict