tests/krb5: Introduce helper method for creating invalid length checksums
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob6107442409fb662c43b95558c3cba2bef6b6745e
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import krb5pac, security
38 from samba.gensec import FEATURE_SEAL
39 from samba.ndr import ndr_pack, ndr_unpack
41 import samba.tests
42 from samba.tests import TestCaseInTempDir
44 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
45 from samba.tests.krb5.rfc4120_constants import (
46 AD_IF_RELEVANT,
47 AD_WIN2K_PAC,
48 FX_FAST_ARMOR_AP_REQUEST,
49 KDC_ERR_GENERIC,
50 KDC_ERR_PREAUTH_FAILED,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
52 KRB_AP_REQ,
53 KRB_AS_REP,
54 KRB_AS_REQ,
55 KRB_ERROR,
56 KRB_TGS_REP,
57 KRB_TGS_REQ,
58 KU_AP_REQ_AUTH,
59 KU_AS_REP_ENC_PART,
60 KU_ENC_CHALLENGE_KDC,
61 KU_FAST_ENC,
62 KU_FAST_FINISHED,
63 KU_FAST_REP,
64 KU_FAST_REQ_CHKSUM,
65 KU_NON_KERB_CKSUM_SALT,
66 KU_TGS_REP_ENC_PART_SESSION,
67 KU_TGS_REP_ENC_PART_SUB_KEY,
68 KU_TGS_REQ_AUTH,
69 KU_TGS_REQ_AUTH_CKSUM,
70 KU_TGS_REQ_AUTH_DAT_SESSION,
71 KU_TGS_REQ_AUTH_DAT_SUBKEY,
72 KU_TICKET,
73 NT_SRV_INST,
74 NT_WELLKNOWN,
75 PADATA_ENCRYPTED_CHALLENGE,
76 PADATA_ENC_TIMESTAMP,
77 PADATA_ETYPE_INFO,
78 PADATA_ETYPE_INFO2,
79 PADATA_FOR_USER,
80 PADATA_FX_COOKIE,
81 PADATA_FX_ERROR,
82 PADATA_FX_FAST,
83 PADATA_KDC_REQ,
84 PADATA_PAC_OPTIONS,
85 PADATA_PAC_REQUEST,
86 PADATA_PK_AS_REQ,
87 PADATA_PK_AS_REP_19,
88 PADATA_PW_SALT,
89 PADATA_SUPPORTED_ETYPES
91 import samba.tests.krb5.kcrypto as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self, value, asn1Spec, encodeFun, **options):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value = asn1Spec.clone(value)
104 valueLength = len(value)
105 if valueLength % 8:
106 alignedValue = value << (8 - valueLength % 8)
107 else:
108 alignedValue = value
110 substrate = alignedValue.asOctets()
111 length = len(substrate)
112 # We need at least 32-Bit / 4-Bytes
113 if length < 4:
114 padding = 4 - length
115 else:
116 padding = 0
117 ret = b'\x00' + substrate + (b'\x00' * padding)
118 return ret, False, True
121 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self, scope=0):
125 ret = "%s" % self.asBinary()
126 bits = []
127 highest_bit = 32
128 for byte in self.asNumbers():
129 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
130 mask = 1 << bit
131 if byte & mask:
132 val = 1
133 else:
134 val = 0
135 bits.append(val)
136 if len(bits) < highest_bit:
137 for bitPosition in range(len(bits), highest_bit):
138 bits.append(0)
139 indent = " " * scope
140 delim = ": (\n%s " % indent
141 for bitPosition in range(highest_bit):
142 if bitPosition in self.prettyPrintNamedValues:
143 name = self.prettyPrintNamedValues[bitPosition]
144 elif bits[bitPosition] != 0:
145 name = "unknown-bit-%u" % bitPosition
146 else:
147 continue
148 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
149 delim = ",\n%s " % indent
150 ret += "\n%s)" % indent
151 return ret
154 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
155 krb5_asn1.TicketFlagsValues.namedValues
156 krb5_asn1.TicketFlags.namedValues =\
157 krb5_asn1.TicketFlagsValues.namedValues
158 krb5_asn1.TicketFlags.prettyPrint =\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
161 krb5_asn1.KDCOptionsValues.namedValues
162 krb5_asn1.KDCOptions.namedValues =\
163 krb5_asn1.KDCOptionsValues.namedValues
164 krb5_asn1.KDCOptions.prettyPrint =\
165 BitString_NamedValues_prettyPrint
166 krb5_asn1.APOptions.prettyPrintNamedValues =\
167 krb5_asn1.APOptionsValues.namedValues
168 krb5_asn1.APOptions.namedValues =\
169 krb5_asn1.APOptionsValues.namedValues
170 krb5_asn1.APOptions.prettyPrint =\
171 BitString_NamedValues_prettyPrint
172 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
173 krb5_asn1.PACOptionFlagsValues.namedValues
174 krb5_asn1.PACOptionFlags.namedValues =\
175 krb5_asn1.PACOptionFlagsValues.namedValues
176 krb5_asn1.PACOptionFlags.prettyPrint =\
177 BitString_NamedValues_prettyPrint
180 def Integer_NamedValues_prettyPrint(self, scope=0):
181 intval = int(self)
182 if intval in self.prettyPrintNamedValues:
183 name = self.prettyPrintNamedValues[intval]
184 else:
185 name = "<__unknown__>"
186 ret = "%d (0x%x) %s" % (intval, intval, name)
187 return ret
190 krb5_asn1.NameType.prettyPrintNamedValues =\
191 krb5_asn1.NameTypeValues.namedValues
192 krb5_asn1.NameType.prettyPrint =\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
195 krb5_asn1.AuthDataTypeValues.namedValues
196 krb5_asn1.AuthDataType.prettyPrint =\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1.PADataType.prettyPrintNamedValues =\
199 krb5_asn1.PADataTypeValues.namedValues
200 krb5_asn1.PADataType.prettyPrint =\
201 Integer_NamedValues_prettyPrint
202 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
203 krb5_asn1.EncryptionTypeValues.namedValues
204 krb5_asn1.EncryptionType.prettyPrint =\
205 Integer_NamedValues_prettyPrint
206 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
207 krb5_asn1.ChecksumTypeValues.namedValues
208 krb5_asn1.ChecksumType.prettyPrint =\
209 Integer_NamedValues_prettyPrint
210 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
211 krb5_asn1.KerbErrorDataTypeValues.namedValues
212 krb5_asn1.KerbErrorDataType.prettyPrint =\
213 Integer_NamedValues_prettyPrint
216 class Krb5EncryptionKey:
217 def __init__(self, key, kvno):
218 EncTypeChecksum = {
219 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
220 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
221 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
223 self.key = key
224 self.etype = key.enctype
225 self.ctype = EncTypeChecksum[self.etype]
226 self.kvno = kvno
228 def encrypt(self, usage, plaintext):
229 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
230 return ciphertext
232 def decrypt(self, usage, ciphertext):
233 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
234 return plaintext
236 def make_zeroed_checksum(self, ctype=None):
237 if ctype is None:
238 ctype = self.ctype
240 checksum_len = kcrypto.checksum_len(ctype)
241 return bytes(checksum_len)
243 def make_checksum(self, usage, plaintext, ctype=None):
244 if ctype is None:
245 ctype = self.ctype
246 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
247 return cksum
249 def verify_checksum(self, usage, plaintext, ctype, cksum):
250 if self.ctype != ctype:
251 raise AssertionError(f'{self.ctype} != {ctype}')
253 kcrypto.verify_checksum(ctype,
254 self.key,
255 usage,
256 plaintext,
257 cksum)
259 def export_obj(self):
260 EncryptionKey_obj = {
261 'keytype': self.etype,
262 'keyvalue': self.key.contents,
264 return EncryptionKey_obj
267 class RodcPacEncryptionKey(Krb5EncryptionKey):
268 def __init__(self, key, kvno, rodc_id=None):
269 super().__init__(key, kvno)
271 if rodc_id is None:
272 kvno = self.kvno
273 if kvno is not None:
274 kvno >>= 16
275 kvno &= (1 << 16) - 1
277 rodc_id = kvno or None
279 if rodc_id is not None:
280 self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
281 else:
282 self.rodc_id = b''
284 def make_zeroed_checksum(self, ctype=None):
285 checksum = super().make_zeroed_checksum(ctype)
286 return checksum + bytes(len(self.rodc_id))
288 def make_checksum(self, usage, plaintext, ctype=None):
289 checksum = super().make_checksum(usage, plaintext, ctype)
290 return checksum + self.rodc_id
292 def verify_checksum(self, usage, plaintext, ctype, cksum):
293 if self.rodc_id:
294 cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
296 if self.rodc_id != cksum_rodc_id:
297 raise AssertionError(f'{self.rodc_id.hex()} != '
298 f'{cksum_rodc_id.hex()}')
300 super().verify_checksum(usage,
301 plaintext,
302 ctype,
303 cksum)
306 class ZeroedChecksumKey(Krb5EncryptionKey):
307 def make_checksum(self, usage, plaintext, ctype=None):
308 return self.make_zeroed_checksum(ctype)
311 class WrongLengthChecksumKey(Krb5EncryptionKey):
312 def __init__(self, key, kvno, length):
313 super().__init__(key, kvno)
315 self._length = length
317 @classmethod
318 def _adjust_to_length(cls, checksum, length):
319 diff = length - len(checksum)
320 if diff > 0:
321 checksum += bytes(diff)
322 elif diff < 0:
323 checksum = checksum[:length]
325 return checksum
327 def make_checksum(self, usage, plaintext, ctype=None):
328 checksum = super().make_checksum(usage, plaintext, ctype)
329 return self._adjust_to_length(checksum, self._length)
332 class KerberosCredentials(Credentials):
334 fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
335 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
336 security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
338 def __init__(self):
339 super(KerberosCredentials, self).__init__()
340 all_enc_types = 0
341 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
342 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
343 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
345 self.as_supported_enctypes = all_enc_types
346 self.tgs_supported_enctypes = all_enc_types
347 self.ap_supported_enctypes = all_enc_types
349 self.kvno = None
350 self.forced_keys = {}
352 self.forced_salt = None
354 self.dn = None
356 def set_as_supported_enctypes(self, value):
357 self.as_supported_enctypes = int(value)
359 def set_tgs_supported_enctypes(self, value):
360 self.tgs_supported_enctypes = int(value)
362 def set_ap_supported_enctypes(self, value):
363 self.ap_supported_enctypes = int(value)
365 etype_map = collections.OrderedDict([
366 (kcrypto.Enctype.AES256,
367 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
368 (kcrypto.Enctype.AES128,
369 security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
370 (kcrypto.Enctype.RC4,
371 security.KERB_ENCTYPE_RC4_HMAC_MD5),
372 (kcrypto.Enctype.DES_MD5,
373 security.KERB_ENCTYPE_DES_CBC_MD5),
374 (kcrypto.Enctype.DES_CRC,
375 security.KERB_ENCTYPE_DES_CBC_CRC)
378 @classmethod
379 def etypes_to_bits(cls, etypes):
380 bits = 0
381 for etype in etypes:
382 bit = cls.etype_map[etype]
383 if bits & bit:
384 raise ValueError(f'Got duplicate etype: {etype}')
385 bits |= bit
387 return bits
389 @classmethod
390 def bits_to_etypes(cls, bits):
391 etypes = ()
392 for etype, bit in cls.etype_map.items():
393 if bit & bits:
394 bits &= ~bit
395 etypes += (etype,)
397 bits &= ~cls.fast_supported_bits
398 if bits != 0:
399 raise ValueError(f'Unsupported etype bits: {bits}')
401 return etypes
403 def get_as_krb5_etypes(self):
404 return self.bits_to_etypes(self.as_supported_enctypes)
406 def get_tgs_krb5_etypes(self):
407 return self.bits_to_etypes(self.tgs_supported_enctypes)
409 def get_ap_krb5_etypes(self):
410 return self.bits_to_etypes(self.ap_supported_enctypes)
412 def set_kvno(self, kvno):
413 # Sign-extend from 32 bits.
414 if kvno & 1 << 31:
415 kvno |= -1 << 31
416 self.kvno = kvno
418 def get_kvno(self):
419 return self.kvno
421 def set_forced_key(self, etype, hexkey):
422 etype = int(etype)
423 contents = binascii.a2b_hex(hexkey)
424 key = kcrypto.Key(etype, contents)
425 self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
427 def get_forced_key(self, etype):
428 etype = int(etype)
429 return self.forced_keys.get(etype)
431 def set_forced_salt(self, salt):
432 self.forced_salt = bytes(salt)
434 def get_forced_salt(self):
435 return self.forced_salt
437 def get_salt(self):
438 if self.forced_salt is not None:
439 return self.forced_salt
441 if self.get_workstation():
442 salt_string = '%shost%s.%s' % (
443 self.get_realm().upper(),
444 self.get_username().lower().rsplit('$', 1)[0],
445 self.get_realm().lower())
446 else:
447 salt_string = self.get_realm().upper() + self.get_username()
449 return salt_string.encode('utf-8')
451 def set_dn(self, dn):
452 self.dn = dn
454 def get_dn(self):
455 return self.dn
458 class KerberosTicketCreds:
459 def __init__(self, ticket, session_key,
460 crealm=None, cname=None,
461 srealm=None, sname=None,
462 decryption_key=None,
463 ticket_private=None,
464 encpart_private=None):
465 self.ticket = ticket
466 self.session_key = session_key
467 self.crealm = crealm
468 self.cname = cname
469 self.srealm = srealm
470 self.sname = sname
471 self.decryption_key = decryption_key
472 self.ticket_private = ticket_private
473 self.encpart_private = encpart_private
476 class RawKerberosTest(TestCaseInTempDir):
477 """A raw Kerberos Test case."""
479 pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
480 krb5pac.PAC_TYPE_KDC_CHECKSUM,
481 krb5pac.PAC_TYPE_TICKET_CHECKSUM}
483 etypes_to_test = (
484 {"value": -1111, "name": "dummy", },
485 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
486 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
487 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
490 setup_etype_test_permutations_done = False
492 @classmethod
493 def setup_etype_test_permutations(cls):
494 if cls.setup_etype_test_permutations_done:
495 return
497 res = []
499 num_idxs = len(cls.etypes_to_test)
500 permutations = []
501 for num in range(1, num_idxs + 1):
502 chunk = list(itertools.permutations(range(num_idxs), num))
503 for e in chunk:
504 el = list(e)
505 permutations.append(el)
507 for p in permutations:
508 name = None
509 etypes = ()
510 for idx in p:
511 n = cls.etypes_to_test[idx]["name"]
512 if name is None:
513 name = n
514 else:
515 name += "_%s" % n
516 etypes += (cls.etypes_to_test[idx]["value"],)
518 r = {"name": name, "etypes": etypes, }
519 res.append(r)
521 cls.etype_test_permutations = res
522 cls.setup_etype_test_permutations_done = True
524 @classmethod
525 def etype_test_permutation_name_idx(cls):
526 cls.setup_etype_test_permutations()
527 res = []
528 idx = 0
529 for e in cls.etype_test_permutations:
530 r = (e['name'], idx)
531 idx += 1
532 res.append(r)
533 return res
535 def etype_test_permutation_by_idx(self, idx):
536 e = self.etype_test_permutations[idx]
537 return (e['name'], e['etypes'])
539 @classmethod
540 def setUpClass(cls):
541 super().setUpClass()
543 cls.host = samba.tests.env_get_var_value('SERVER')
544 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
546 # A dictionary containing credentials that have already been
547 # obtained.
548 cls.creds_dict = {}
550 cls.kdc_fast_support = False
552 def setUp(self):
553 super().setUp()
554 self.do_asn1_print = False
555 self.do_hexdump = False
557 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
558 allow_missing=True)
559 if strict_checking is None:
560 strict_checking = '1'
561 self.strict_checking = bool(int(strict_checking))
563 self.s = None
565 self.unspecified_kvno = object()
567 def tearDown(self):
568 self._disconnect("tearDown")
569 super().tearDown()
571 def _disconnect(self, reason):
572 if self.s is None:
573 return
574 self.s.close()
575 self.s = None
576 if self.do_hexdump:
577 sys.stderr.write("disconnect[%s]\n" % reason)
579 def _connect_tcp(self, host):
580 tcp_port = 88
581 try:
582 self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
583 socket.SOCK_STREAM, socket.SOL_TCP,
585 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
586 self.s.settimeout(10)
587 self.s.connect(self.a[0][4])
588 except socket.error:
589 self.s.close()
590 raise
591 except IOError:
592 self.s.close()
593 raise
595 def connect(self, host):
596 self.assertNotConnected()
597 self._connect_tcp(host)
598 if self.do_hexdump:
599 sys.stderr.write("connected[%s]\n" % host)
601 def env_get_var(self, varname, prefix,
602 fallback_default=True,
603 allow_missing=False):
604 val = None
605 if prefix is not None:
606 allow_missing_prefix = allow_missing or fallback_default
607 val = samba.tests.env_get_var_value(
608 '%s_%s' % (prefix, varname),
609 allow_missing=allow_missing_prefix)
610 else:
611 fallback_default = True
612 if val is None and fallback_default:
613 val = samba.tests.env_get_var_value(varname,
614 allow_missing=allow_missing)
615 return val
617 def _get_krb5_creds_from_env(self, prefix,
618 default_username=None,
619 allow_missing_password=False,
620 allow_missing_keys=True,
621 require_strongest_key=False):
622 c = KerberosCredentials()
623 c.guess()
625 domain = self.env_get_var('DOMAIN', prefix)
626 realm = self.env_get_var('REALM', prefix)
627 allow_missing_username = default_username is not None
628 username = self.env_get_var('USERNAME', prefix,
629 fallback_default=False,
630 allow_missing=allow_missing_username)
631 if username is None:
632 username = default_username
633 password = self.env_get_var('PASSWORD', prefix,
634 fallback_default=False,
635 allow_missing=allow_missing_password)
636 c.set_domain(domain)
637 c.set_realm(realm)
638 c.set_username(username)
639 if password is not None:
640 c.set_password(password)
641 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
642 prefix, allow_missing=True)
643 if as_supported_enctypes is not None:
644 c.set_as_supported_enctypes(as_supported_enctypes)
645 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
646 prefix, allow_missing=True)
647 if tgs_supported_enctypes is not None:
648 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
649 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
650 prefix, allow_missing=True)
651 if ap_supported_enctypes is not None:
652 c.set_ap_supported_enctypes(ap_supported_enctypes)
654 if require_strongest_key:
655 kvno_allow_missing = False
656 if password is None:
657 aes256_allow_missing = False
658 else:
659 aes256_allow_missing = True
660 else:
661 kvno_allow_missing = allow_missing_keys
662 aes256_allow_missing = allow_missing_keys
663 kvno = self.env_get_var('KVNO', prefix,
664 fallback_default=False,
665 allow_missing=kvno_allow_missing)
666 if kvno is not None:
667 c.set_kvno(kvno)
668 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
669 fallback_default=False,
670 allow_missing=aes256_allow_missing)
671 if aes256_key is not None:
672 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
673 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
674 fallback_default=False,
675 allow_missing=True)
676 if aes128_key is not None:
677 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
678 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
679 fallback_default=False, allow_missing=True)
680 if rc4_key is not None:
681 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
683 if not allow_missing_keys:
684 self.assertTrue(c.forced_keys,
685 'Please supply %s encryption keys '
686 'in environment' % prefix)
688 return c
690 def _get_krb5_creds(self,
691 prefix,
692 default_username=None,
693 allow_missing_password=False,
694 allow_missing_keys=True,
695 require_strongest_key=False,
696 fallback_creds_fn=None):
697 if prefix in self.creds_dict:
698 return self.creds_dict[prefix]
700 # We don't have the credentials already
701 creds = None
702 env_err = None
703 try:
704 # Try to obtain them from the environment
705 creds = self._get_krb5_creds_from_env(
706 prefix,
707 default_username=default_username,
708 allow_missing_password=allow_missing_password,
709 allow_missing_keys=allow_missing_keys,
710 require_strongest_key=require_strongest_key)
711 except Exception as err:
712 # An error occurred, so save it for later
713 env_err = err
714 else:
715 self.assertIsNotNone(creds)
716 # Save the obtained credentials
717 self.creds_dict[prefix] = creds
718 return creds
720 if fallback_creds_fn is not None:
721 try:
722 # Try to use the fallback method
723 creds = fallback_creds_fn()
724 except Exception as err:
725 print("ERROR FROM ENV: %r" % (env_err))
726 print("FALLBACK-FN: %s" % (fallback_creds_fn))
727 print("FALLBACK-ERROR: %r" % (err))
728 else:
729 self.assertIsNotNone(creds)
730 # Save the obtained credentials
731 self.creds_dict[prefix] = creds
732 return creds
734 # Both methods failed, so raise the exception from the
735 # environment method
736 raise env_err
738 def get_user_creds(self,
739 allow_missing_password=False,
740 allow_missing_keys=True):
741 c = self._get_krb5_creds(prefix=None,
742 allow_missing_password=allow_missing_password,
743 allow_missing_keys=allow_missing_keys)
744 return c
746 def get_service_creds(self,
747 allow_missing_password=False,
748 allow_missing_keys=True):
749 c = self._get_krb5_creds(prefix='SERVICE',
750 allow_missing_password=allow_missing_password,
751 allow_missing_keys=allow_missing_keys)
752 return c
754 def get_client_creds(self,
755 allow_missing_password=False,
756 allow_missing_keys=True):
757 c = self._get_krb5_creds(prefix='CLIENT',
758 allow_missing_password=allow_missing_password,
759 allow_missing_keys=allow_missing_keys)
760 return c
762 def get_server_creds(self,
763 allow_missing_password=False,
764 allow_missing_keys=True):
765 c = self._get_krb5_creds(prefix='SERVER',
766 allow_missing_password=allow_missing_password,
767 allow_missing_keys=allow_missing_keys)
768 return c
770 def get_admin_creds(self,
771 allow_missing_password=False,
772 allow_missing_keys=True):
773 c = self._get_krb5_creds(prefix='ADMIN',
774 allow_missing_password=allow_missing_password,
775 allow_missing_keys=allow_missing_keys)
776 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
777 return c
779 def get_rodc_krbtgt_creds(self,
780 require_keys=True,
781 require_strongest_key=False):
782 if require_strongest_key:
783 self.assertTrue(require_keys)
784 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
785 allow_missing_password=True,
786 allow_missing_keys=not require_keys,
787 require_strongest_key=require_strongest_key)
788 return c
790 def get_krbtgt_creds(self,
791 require_keys=True,
792 require_strongest_key=False):
793 if require_strongest_key:
794 self.assertTrue(require_keys)
795 c = self._get_krb5_creds(prefix='KRBTGT',
796 default_username='krbtgt',
797 allow_missing_password=True,
798 allow_missing_keys=not require_keys,
799 require_strongest_key=require_strongest_key)
800 return c
802 def get_anon_creds(self):
803 c = Credentials()
804 c.set_anonymous()
805 return c
807 def asn1_dump(self, name, obj, asn1_print=None):
808 if asn1_print is None:
809 asn1_print = self.do_asn1_print
810 if asn1_print:
811 if name is not None:
812 sys.stderr.write("%s:\n%s" % (name, obj))
813 else:
814 sys.stderr.write("%s" % (obj))
816 def hex_dump(self, name, blob, hexdump=None):
817 if hexdump is None:
818 hexdump = self.do_hexdump
819 if hexdump:
820 sys.stderr.write(
821 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
823 def der_decode(
824 self,
825 blob,
826 asn1Spec=None,
827 native_encode=True,
828 asn1_print=None,
829 hexdump=None):
830 if asn1Spec is not None:
831 class_name = type(asn1Spec).__name__.split(':')[0]
832 else:
833 class_name = "<None-asn1Spec>"
834 self.hex_dump(class_name, blob, hexdump=hexdump)
835 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
836 self.asn1_dump(None, obj, asn1_print=asn1_print)
837 if native_encode:
838 obj = pyasn1_native_encode(obj)
839 return obj
841 def der_encode(
842 self,
843 obj,
844 asn1Spec=None,
845 native_decode=True,
846 asn1_print=None,
847 hexdump=None):
848 if native_decode:
849 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
850 class_name = type(obj).__name__.split(':')[0]
851 if class_name is not None:
852 self.asn1_dump(None, obj, asn1_print=asn1_print)
853 blob = pyasn1_der_encode(obj)
854 if class_name is not None:
855 self.hex_dump(class_name, blob, hexdump=hexdump)
856 return blob
858 def send_pdu(self, req, asn1_print=None, hexdump=None):
859 try:
860 k5_pdu = self.der_encode(
861 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
862 header = struct.pack('>I', len(k5_pdu))
863 req_pdu = header
864 req_pdu += k5_pdu
865 self.hex_dump("send_pdu", header, hexdump=hexdump)
866 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
867 while True:
868 sent = self.s.send(req_pdu, 0)
869 if sent == len(req_pdu):
870 break
871 req_pdu = req_pdu[sent:]
872 except socket.error as e:
873 self._disconnect("send_pdu: %s" % e)
874 raise
875 except IOError as e:
876 self._disconnect("send_pdu: %s" % e)
877 raise
879 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
880 rep_pdu = None
881 try:
882 if timeout is not None:
883 self.s.settimeout(timeout)
884 rep_pdu = self.s.recv(num_recv, 0)
885 self.s.settimeout(10)
886 if len(rep_pdu) == 0:
887 self._disconnect("recv_raw: EOF")
888 return None
889 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
890 except socket.timeout:
891 self.s.settimeout(10)
892 sys.stderr.write("recv_raw: TIMEOUT\n")
893 except socket.error as e:
894 self._disconnect("recv_raw: %s" % e)
895 raise
896 except IOError as e:
897 self._disconnect("recv_raw: %s" % e)
898 raise
899 return rep_pdu
901 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
902 rep_pdu = None
903 rep = None
904 raw_pdu = self.recv_raw(
905 num_recv=4, hexdump=hexdump, timeout=timeout)
906 if raw_pdu is None:
907 return (None, None)
908 header = struct.unpack(">I", raw_pdu[0:4])
909 k5_len = header[0]
910 if k5_len == 0:
911 return (None, "")
912 missing = k5_len
913 rep_pdu = b''
914 while missing > 0:
915 raw_pdu = self.recv_raw(
916 num_recv=missing, hexdump=hexdump, timeout=timeout)
917 self.assertGreaterEqual(len(raw_pdu), 1)
918 rep_pdu += raw_pdu
919 missing = k5_len - len(rep_pdu)
920 k5_raw = self.der_decode(
921 rep_pdu,
922 asn1Spec=None,
923 native_encode=False,
924 asn1_print=False,
925 hexdump=False)
926 pvno = k5_raw['field-0']
927 self.assertEqual(pvno, 5)
928 msg_type = k5_raw['field-1']
929 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
930 if msg_type == KRB_AS_REP:
931 asn1Spec = krb5_asn1.AS_REP()
932 elif msg_type == KRB_TGS_REP:
933 asn1Spec = krb5_asn1.TGS_REP()
934 elif msg_type == KRB_ERROR:
935 asn1Spec = krb5_asn1.KRB_ERROR()
936 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
937 asn1_print=asn1_print, hexdump=False)
938 return (rep, rep_pdu)
940 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
941 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
942 hexdump=hexdump,
943 timeout=timeout)
944 return rep
946 def assertIsConnected(self):
947 self.assertIsNotNone(self.s, msg="Not connected")
949 def assertNotConnected(self):
950 self.assertIsNone(self.s, msg="Is connected")
952 def send_recv_transaction(
953 self,
954 req,
955 asn1_print=None,
956 hexdump=None,
957 timeout=None,
958 to_rodc=False):
959 host = self.host if to_rodc else self.dc_host
960 self.connect(host)
961 try:
962 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
963 rep = self.recv_pdu(
964 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
965 except Exception:
966 self._disconnect("transaction failed")
967 raise
968 self._disconnect("transaction done")
969 return rep
971 def assertNoValue(self, value):
972 self.assertTrue(value.isNoValue)
974 def assertHasValue(self, value):
975 self.assertIsNotNone(value)
977 def getElementValue(self, obj, elem):
978 return obj.get(elem)
980 def assertElementMissing(self, obj, elem):
981 v = self.getElementValue(obj, elem)
982 self.assertIsNone(v)
984 def assertElementPresent(self, obj, elem, expect_empty=False):
985 v = self.getElementValue(obj, elem)
986 self.assertIsNotNone(v)
987 if self.strict_checking:
988 if isinstance(v, collections.abc.Container):
989 if expect_empty:
990 self.assertEqual(0, len(v))
991 else:
992 self.assertNotEqual(0, len(v))
994 def assertElementEqual(self, obj, elem, value):
995 v = self.getElementValue(obj, elem)
996 self.assertIsNotNone(v)
997 self.assertEqual(v, value)
999 def assertElementEqualUTF8(self, obj, elem, value):
1000 v = self.getElementValue(obj, elem)
1001 self.assertIsNotNone(v)
1002 self.assertEqual(v, bytes(value, 'utf8'))
1004 def assertPrincipalEqual(self, princ1, princ2):
1005 self.assertEqual(princ1['name-type'], princ2['name-type'])
1006 self.assertEqual(
1007 len(princ1['name-string']),
1008 len(princ2['name-string']),
1009 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1010 for idx in range(len(princ1['name-string'])):
1011 self.assertEqual(
1012 princ1['name-string'][idx],
1013 princ2['name-string'][idx],
1014 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1016 def assertElementEqualPrincipal(self, obj, elem, value):
1017 v = self.getElementValue(obj, elem)
1018 self.assertIsNotNone(v)
1019 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
1020 self.assertPrincipalEqual(v, value)
1022 def assertElementKVNO(self, obj, elem, value):
1023 v = self.getElementValue(obj, elem)
1024 if value == "autodetect":
1025 value = v
1026 if value is not None:
1027 self.assertIsNotNone(v)
1028 # The value on the wire should never be 0
1029 self.assertNotEqual(v, 0)
1030 # unspecified_kvno means we don't know the kvno,
1031 # but want to enforce its presence
1032 if value is not self.unspecified_kvno:
1033 value = int(value)
1034 self.assertNotEqual(value, 0)
1035 self.assertEqual(v, value)
1036 else:
1037 self.assertIsNone(v)
1039 def assertElementFlags(self, obj, elem, expected, unexpected):
1040 v = self.getElementValue(obj, elem)
1041 self.assertIsNotNone(v)
1042 if expected is not None:
1043 self.assertIsInstance(expected, krb5_asn1.KDCOptions)
1044 for i, flag in enumerate(expected):
1045 if flag == 1:
1046 self.assertEqual('1', v[i],
1047 f"'{expected.namedValues[i]}' "
1048 f"expected in {v}")
1049 if unexpected is not None:
1050 self.assertIsInstance(unexpected, krb5_asn1.KDCOptions)
1051 for i, flag in enumerate(unexpected):
1052 if flag == 1:
1053 self.assertEqual('0', v[i],
1054 f"'{unexpected.namedValues[i]}' "
1055 f"unexpected in {v}")
1057 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1058 if epoch is None:
1059 epoch = time.time()
1060 if offset is not None:
1061 epoch = epoch + int(offset)
1062 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1063 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1065 def get_KerberosTime(self, epoch=None, offset=None):
1066 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1067 return s
1069 def get_EpochFromKerberosTime(self, kerberos_time):
1070 if isinstance(kerberos_time, bytes):
1071 kerberos_time = kerberos_time.decode()
1073 epoch = datetime.datetime.strptime(kerberos_time,
1074 '%Y%m%d%H%M%SZ')
1075 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1076 epoch = int(epoch.timestamp())
1078 return epoch
1080 def get_Nonce(self):
1081 nonce_min = 0x7f000000
1082 nonce_max = 0x7fffffff
1083 v = random.randint(nonce_min, nonce_max)
1084 return v
1086 def get_pa_dict(self, pa_data):
1087 pa_dict = {}
1089 if pa_data is not None:
1090 for pa in pa_data:
1091 pa_type = pa['padata-type']
1092 if pa_type in pa_dict:
1093 raise RuntimeError(f'Duplicate type {pa_type}')
1094 pa_dict[pa_type] = pa['padata-value']
1096 return pa_dict
1098 def SessionKey_create(self, etype, contents, kvno=None):
1099 key = kcrypto.Key(etype, contents)
1100 return RodcPacEncryptionKey(key, kvno)
1102 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
1103 self.assertIsNotNone(pwd)
1104 self.assertIsNotNone(salt)
1105 key = kcrypto.string_to_key(etype, pwd, salt)
1106 return RodcPacEncryptionKey(key, kvno)
1108 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
1109 e = etype_info2['etype']
1111 salt = etype_info2.get('salt')
1113 if e == kcrypto.Enctype.RC4:
1114 nthash = creds.get_nt_hash()
1115 return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1117 password = creds.get_password()
1118 return self.PasswordKey_create(
1119 etype=e, pwd=password, salt=salt, kvno=kvno)
1121 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1123 if etype is None:
1124 etypes = creds.get_tgs_krb5_etypes()
1125 if etypes:
1126 etype = etypes[0]
1127 else:
1128 etype = kcrypto.Enctype.RC4
1130 forced_key = creds.get_forced_key(etype)
1131 if forced_key is not None:
1132 return forced_key
1134 kvno = creds.get_kvno()
1136 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1137 "nor a password specified, " % (
1138 creds.get_username(), etype, kvno))
1140 if etype == kcrypto.Enctype.RC4:
1141 nthash = creds.get_nt_hash()
1142 self.assertIsNotNone(nthash, msg=fail_msg)
1143 return self.SessionKey_create(etype=etype,
1144 contents=nthash,
1145 kvno=kvno)
1147 password = creds.get_password()
1148 self.assertIsNotNone(password, msg=fail_msg)
1149 salt = creds.get_salt()
1150 return self.PasswordKey_create(etype=etype,
1151 pwd=password,
1152 salt=salt,
1153 kvno=kvno)
1155 def RandomKey(self, etype):
1156 e = kcrypto._get_enctype_profile(etype)
1157 contents = samba.generate_random_bytes(e.keysize)
1158 return self.SessionKey_create(etype=etype, contents=contents)
1160 def EncryptionKey_import(self, EncryptionKey_obj):
1161 return self.SessionKey_create(EncryptionKey_obj['keytype'],
1162 EncryptionKey_obj['keyvalue'])
1164 def EncryptedData_create(self, key, usage, plaintext):
1165 # EncryptedData ::= SEQUENCE {
1166 # etype [0] Int32 -- EncryptionType --,
1167 # kvno [1] Int32 OPTIONAL,
1168 # cipher [2] OCTET STRING -- ciphertext
1170 ciphertext = key.encrypt(usage, plaintext)
1171 EncryptedData_obj = {
1172 'etype': key.etype,
1173 'cipher': ciphertext
1175 if key.kvno is not None:
1176 EncryptedData_obj['kvno'] = key.kvno
1177 return EncryptedData_obj
1179 def Checksum_create(self, key, usage, plaintext, ctype=None):
1180 # Checksum ::= SEQUENCE {
1181 # cksumtype [0] Int32,
1182 # checksum [1] OCTET STRING
1184 if ctype is None:
1185 ctype = key.ctype
1186 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1187 Checksum_obj = {
1188 'cksumtype': ctype,
1189 'checksum': checksum,
1191 return Checksum_obj
1193 @classmethod
1194 def PrincipalName_create(cls, name_type, names):
1195 # PrincipalName ::= SEQUENCE {
1196 # name-type [0] Int32,
1197 # name-string [1] SEQUENCE OF KerberosString
1199 PrincipalName_obj = {
1200 'name-type': name_type,
1201 'name-string': names,
1203 return PrincipalName_obj
1205 def AuthorizationData_create(self, ad_type, ad_data):
1206 # AuthorizationData ::= SEQUENCE {
1207 # ad-type [0] Int32,
1208 # ad-data [1] OCTET STRING
1210 AUTH_DATA_obj = {
1211 'ad-type': ad_type,
1212 'ad-data': ad_data
1214 return AUTH_DATA_obj
1216 def PA_DATA_create(self, padata_type, padata_value):
1217 # PA-DATA ::= SEQUENCE {
1218 # -- NOTE: first tag is [1], not [0]
1219 # padata-type [1] Int32,
1220 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1222 PA_DATA_obj = {
1223 'padata-type': padata_type,
1224 'padata-value': padata_value,
1226 return PA_DATA_obj
1228 def PA_ENC_TS_ENC_create(self, ts, usec):
1229 # PA-ENC-TS-ENC ::= SEQUENCE {
1230 # patimestamp[0] KerberosTime, -- client's time
1231 # pausec[1] krb5int32 OPTIONAL
1233 PA_ENC_TS_ENC_obj = {
1234 'patimestamp': ts,
1235 'pausec': usec,
1237 return PA_ENC_TS_ENC_obj
1239 def PA_PAC_OPTIONS_create(self, options):
1240 # PA-PAC-OPTIONS ::= SEQUENCE {
1241 # options [0] PACOptionFlags
1243 PA_PAC_OPTIONS_obj = {
1244 'options': options
1246 return PA_PAC_OPTIONS_obj
1248 def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1249 # KrbFastArmor ::= SEQUENCE {
1250 # armor-type [0] Int32,
1251 # armor-value [1] OCTET STRING,
1252 # ...
1254 KRB_FAST_ARMOR_obj = {
1255 'armor-type': armor_type,
1256 'armor-value': armor_value
1258 return KRB_FAST_ARMOR_obj
1260 def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1261 # KrbFastReq ::= SEQUENCE {
1262 # fast-options [0] FastOptions,
1263 # padata [1] SEQUENCE OF PA-DATA,
1264 # req-body [2] KDC-REQ-BODY,
1265 # ...
1267 KRB_FAST_REQ_obj = {
1268 'fast-options': fast_options,
1269 'padata': padata,
1270 'req-body': req_body
1272 return KRB_FAST_REQ_obj
1274 def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1275 # KrbFastArmoredReq ::= SEQUENCE {
1276 # armor [0] KrbFastArmor OPTIONAL,
1277 # req-checksum [1] Checksum,
1278 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1280 KRB_FAST_ARMORED_REQ_obj = {
1281 'req-checksum': req_checksum,
1282 'enc-fast-req': enc_fast_req
1284 if armor is not None:
1285 KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1286 return KRB_FAST_ARMORED_REQ_obj
1288 def PA_FX_FAST_REQUEST_create(self, armored_data):
1289 # PA-FX-FAST-REQUEST ::= CHOICE {
1290 # armored-data [0] KrbFastArmoredReq,
1291 # ...
1293 PA_FX_FAST_REQUEST_obj = {
1294 'armored-data': armored_data
1296 return PA_FX_FAST_REQUEST_obj
1298 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1299 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1300 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1301 # -- include PAC.
1302 # --If FALSE, and PAC present,
1303 # -- remove PAC.
1305 KERB_PA_PAC_REQUEST_obj = {
1306 'include-pac': include_pac,
1308 if not pa_data_create:
1309 return KERB_PA_PAC_REQUEST_obj
1310 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1311 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1312 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1313 return pa_data
1315 def get_pa_pac_options(self, options):
1316 pac_options = self.PA_PAC_OPTIONS_create(options)
1317 pac_options = self.der_encode(pac_options,
1318 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1319 pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1321 return pac_options
1323 def KDC_REQ_BODY_create(self,
1324 kdc_options,
1325 cname,
1326 realm,
1327 sname,
1328 from_time,
1329 till_time,
1330 renew_time,
1331 nonce,
1332 etypes,
1333 addresses,
1334 additional_tickets,
1335 EncAuthorizationData,
1336 EncAuthorizationData_key,
1337 EncAuthorizationData_usage,
1338 asn1_print=None,
1339 hexdump=None):
1340 # KDC-REQ-BODY ::= SEQUENCE {
1341 # kdc-options [0] KDCOptions,
1342 # cname [1] PrincipalName OPTIONAL
1343 # -- Used only in AS-REQ --,
1344 # realm [2] Realm
1345 # -- Server's realm
1346 # -- Also client's in AS-REQ --,
1347 # sname [3] PrincipalName OPTIONAL,
1348 # from [4] KerberosTime OPTIONAL,
1349 # till [5] KerberosTime,
1350 # rtime [6] KerberosTime OPTIONAL,
1351 # nonce [7] UInt32,
1352 # etype [8] SEQUENCE OF Int32
1353 # -- EncryptionType
1354 # -- in preference order --,
1355 # addresses [9] HostAddresses OPTIONAL,
1356 # enc-authorization-data [10] EncryptedData OPTIONAL
1357 # -- AuthorizationData --,
1358 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1359 # -- NOTE: not empty
1361 if EncAuthorizationData is not None:
1362 enc_ad_plain = self.der_encode(
1363 EncAuthorizationData,
1364 asn1Spec=krb5_asn1.AuthorizationData(),
1365 asn1_print=asn1_print,
1366 hexdump=hexdump)
1367 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1368 EncAuthorizationData_usage,
1369 enc_ad_plain)
1370 else:
1371 enc_ad = None
1372 KDC_REQ_BODY_obj = {
1373 'kdc-options': kdc_options,
1374 'realm': realm,
1375 'till': till_time,
1376 'nonce': nonce,
1377 'etype': etypes,
1379 if cname is not None:
1380 KDC_REQ_BODY_obj['cname'] = cname
1381 if sname is not None:
1382 KDC_REQ_BODY_obj['sname'] = sname
1383 if from_time is not None:
1384 KDC_REQ_BODY_obj['from'] = from_time
1385 if renew_time is not None:
1386 KDC_REQ_BODY_obj['rtime'] = renew_time
1387 if addresses is not None:
1388 KDC_REQ_BODY_obj['addresses'] = addresses
1389 if enc_ad is not None:
1390 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1391 if additional_tickets is not None:
1392 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1393 return KDC_REQ_BODY_obj
1395 def KDC_REQ_create(self,
1396 msg_type,
1397 padata,
1398 req_body,
1399 asn1Spec=None,
1400 asn1_print=None,
1401 hexdump=None):
1402 # KDC-REQ ::= SEQUENCE {
1403 # -- NOTE: first tag is [1], not [0]
1404 # pvno [1] INTEGER (5) ,
1405 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1406 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1407 # -- NOTE: not empty --,
1408 # req-body [4] KDC-REQ-BODY
1411 KDC_REQ_obj = {
1412 'pvno': 5,
1413 'msg-type': msg_type,
1414 'req-body': req_body,
1416 if padata is not None:
1417 KDC_REQ_obj['padata'] = padata
1418 if asn1Spec is not None:
1419 KDC_REQ_decoded = pyasn1_native_decode(
1420 KDC_REQ_obj, asn1Spec=asn1Spec)
1421 else:
1422 KDC_REQ_decoded = None
1423 return KDC_REQ_obj, KDC_REQ_decoded
1425 def AS_REQ_create(self,
1426 padata, # optional
1427 kdc_options, # required
1428 cname, # optional
1429 realm, # required
1430 sname, # optional
1431 from_time, # optional
1432 till_time, # required
1433 renew_time, # optional
1434 nonce, # required
1435 etypes, # required
1436 addresses, # optional
1437 additional_tickets,
1438 native_decoded_only=True,
1439 asn1_print=None,
1440 hexdump=None):
1441 # KDC-REQ ::= SEQUENCE {
1442 # -- NOTE: first tag is [1], not [0]
1443 # pvno [1] INTEGER (5) ,
1444 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1445 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1446 # -- NOTE: not empty --,
1447 # req-body [4] KDC-REQ-BODY
1450 # KDC-REQ-BODY ::= SEQUENCE {
1451 # kdc-options [0] KDCOptions,
1452 # cname [1] PrincipalName OPTIONAL
1453 # -- Used only in AS-REQ --,
1454 # realm [2] Realm
1455 # -- Server's realm
1456 # -- Also client's in AS-REQ --,
1457 # sname [3] PrincipalName OPTIONAL,
1458 # from [4] KerberosTime OPTIONAL,
1459 # till [5] KerberosTime,
1460 # rtime [6] KerberosTime OPTIONAL,
1461 # nonce [7] UInt32,
1462 # etype [8] SEQUENCE OF Int32
1463 # -- EncryptionType
1464 # -- in preference order --,
1465 # addresses [9] HostAddresses OPTIONAL,
1466 # enc-authorization-data [10] EncryptedData OPTIONAL
1467 # -- AuthorizationData --,
1468 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1469 # -- NOTE: not empty
1471 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1472 kdc_options,
1473 cname,
1474 realm,
1475 sname,
1476 from_time,
1477 till_time,
1478 renew_time,
1479 nonce,
1480 etypes,
1481 addresses,
1482 additional_tickets,
1483 EncAuthorizationData=None,
1484 EncAuthorizationData_key=None,
1485 EncAuthorizationData_usage=None,
1486 asn1_print=asn1_print,
1487 hexdump=hexdump)
1488 obj, decoded = self.KDC_REQ_create(
1489 msg_type=KRB_AS_REQ,
1490 padata=padata,
1491 req_body=KDC_REQ_BODY_obj,
1492 asn1Spec=krb5_asn1.AS_REQ(),
1493 asn1_print=asn1_print,
1494 hexdump=hexdump)
1495 if native_decoded_only:
1496 return decoded
1497 return decoded, obj
1499 def AP_REQ_create(self, ap_options, ticket, authenticator):
1500 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1501 # pvno [0] INTEGER (5),
1502 # msg-type [1] INTEGER (14),
1503 # ap-options [2] APOptions,
1504 # ticket [3] Ticket,
1505 # authenticator [4] EncryptedData -- Authenticator
1507 AP_REQ_obj = {
1508 'pvno': 5,
1509 'msg-type': KRB_AP_REQ,
1510 'ap-options': ap_options,
1511 'ticket': ticket,
1512 'authenticator': authenticator,
1514 return AP_REQ_obj
1516 def Authenticator_create(
1517 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1518 authorization_data):
1519 # -- Unencrypted authenticator
1520 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1521 # authenticator-vno [0] INTEGER (5),
1522 # crealm [1] Realm,
1523 # cname [2] PrincipalName,
1524 # cksum [3] Checksum OPTIONAL,
1525 # cusec [4] Microseconds,
1526 # ctime [5] KerberosTime,
1527 # subkey [6] EncryptionKey OPTIONAL,
1528 # seq-number [7] UInt32 OPTIONAL,
1529 # authorization-data [8] AuthorizationData OPTIONAL
1531 Authenticator_obj = {
1532 'authenticator-vno': 5,
1533 'crealm': crealm,
1534 'cname': cname,
1535 'cusec': cusec,
1536 'ctime': ctime,
1538 if cksum is not None:
1539 Authenticator_obj['cksum'] = cksum
1540 if subkey is not None:
1541 Authenticator_obj['subkey'] = subkey
1542 if seq_number is not None:
1543 Authenticator_obj['seq-number'] = seq_number
1544 if authorization_data is not None:
1545 Authenticator_obj['authorization-data'] = authorization_data
1546 return Authenticator_obj
1548 def TGS_REQ_create(self,
1549 padata, # optional
1550 cusec,
1551 ctime,
1552 ticket,
1553 kdc_options, # required
1554 cname, # optional
1555 realm, # required
1556 sname, # optional
1557 from_time, # optional
1558 till_time, # required
1559 renew_time, # optional
1560 nonce, # required
1561 etypes, # required
1562 addresses, # optional
1563 EncAuthorizationData,
1564 EncAuthorizationData_key,
1565 additional_tickets,
1566 ticket_session_key,
1567 authenticator_subkey=None,
1568 body_checksum_type=None,
1569 native_decoded_only=True,
1570 asn1_print=None,
1571 hexdump=None):
1572 # KDC-REQ ::= SEQUENCE {
1573 # -- NOTE: first tag is [1], not [0]
1574 # pvno [1] INTEGER (5) ,
1575 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1576 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1577 # -- NOTE: not empty --,
1578 # req-body [4] KDC-REQ-BODY
1581 # KDC-REQ-BODY ::= SEQUENCE {
1582 # kdc-options [0] KDCOptions,
1583 # cname [1] PrincipalName OPTIONAL
1584 # -- Used only in AS-REQ --,
1585 # realm [2] Realm
1586 # -- Server's realm
1587 # -- Also client's in AS-REQ --,
1588 # sname [3] PrincipalName OPTIONAL,
1589 # from [4] KerberosTime OPTIONAL,
1590 # till [5] KerberosTime,
1591 # rtime [6] KerberosTime OPTIONAL,
1592 # nonce [7] UInt32,
1593 # etype [8] SEQUENCE OF Int32
1594 # -- EncryptionType
1595 # -- in preference order --,
1596 # addresses [9] HostAddresses OPTIONAL,
1597 # enc-authorization-data [10] EncryptedData OPTIONAL
1598 # -- AuthorizationData --,
1599 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1600 # -- NOTE: not empty
1603 if authenticator_subkey is not None:
1604 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1605 else:
1606 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1608 req_body = self.KDC_REQ_BODY_create(
1609 kdc_options=kdc_options,
1610 cname=None,
1611 realm=realm,
1612 sname=sname,
1613 from_time=from_time,
1614 till_time=till_time,
1615 renew_time=renew_time,
1616 nonce=nonce,
1617 etypes=etypes,
1618 addresses=addresses,
1619 additional_tickets=additional_tickets,
1620 EncAuthorizationData=EncAuthorizationData,
1621 EncAuthorizationData_key=EncAuthorizationData_key,
1622 EncAuthorizationData_usage=EncAuthorizationData_usage)
1623 req_body_blob = self.der_encode(req_body,
1624 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1625 asn1_print=asn1_print, hexdump=hexdump)
1627 req_body_checksum = self.Checksum_create(ticket_session_key,
1628 KU_TGS_REQ_AUTH_CKSUM,
1629 req_body_blob,
1630 ctype=body_checksum_type)
1632 subkey_obj = None
1633 if authenticator_subkey is not None:
1634 subkey_obj = authenticator_subkey.export_obj()
1635 seq_number = random.randint(0, 0xfffffffe)
1636 authenticator = self.Authenticator_create(
1637 crealm=realm,
1638 cname=cname,
1639 cksum=req_body_checksum,
1640 cusec=cusec,
1641 ctime=ctime,
1642 subkey=subkey_obj,
1643 seq_number=seq_number,
1644 authorization_data=None)
1645 authenticator = self.der_encode(
1646 authenticator,
1647 asn1Spec=krb5_asn1.Authenticator(),
1648 asn1_print=asn1_print,
1649 hexdump=hexdump)
1651 authenticator = self.EncryptedData_create(
1652 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1654 ap_options = krb5_asn1.APOptions('0')
1655 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1656 ticket=ticket,
1657 authenticator=authenticator)
1658 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1659 asn1_print=asn1_print, hexdump=hexdump)
1660 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1661 if padata is not None:
1662 padata.append(pa_tgs_req)
1663 else:
1664 padata = [pa_tgs_req]
1666 obj, decoded = self.KDC_REQ_create(
1667 msg_type=KRB_TGS_REQ,
1668 padata=padata,
1669 req_body=req_body,
1670 asn1Spec=krb5_asn1.TGS_REQ(),
1671 asn1_print=asn1_print,
1672 hexdump=hexdump)
1673 if native_decoded_only:
1674 return decoded
1675 return decoded, obj
1677 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1678 # PA-S4U2Self ::= SEQUENCE {
1679 # name [0] PrincipalName,
1680 # realm [1] Realm,
1681 # cksum [2] Checksum,
1682 # auth [3] GeneralString
1684 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1685 for n in name['name-string']:
1686 cksum_data += n.encode()
1687 cksum_data += realm.encode()
1688 cksum_data += "Kerberos".encode()
1689 cksum = self.Checksum_create(tgt_session_key,
1690 KU_NON_KERB_CKSUM_SALT,
1691 cksum_data,
1692 ctype)
1694 PA_S4U2Self_obj = {
1695 'name': name,
1696 'realm': realm,
1697 'cksum': cksum,
1698 'auth': "Kerberos",
1700 pa_s4u2self = self.der_encode(
1701 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1702 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1704 def _generic_kdc_exchange(self,
1705 kdc_exchange_dict, # required
1706 cname=None, # optional
1707 realm=None, # required
1708 sname=None, # optional
1709 from_time=None, # optional
1710 till_time=None, # required
1711 renew_time=None, # optional
1712 etypes=None, # required
1713 addresses=None, # optional
1714 additional_tickets=None, # optional
1715 EncAuthorizationData=None, # optional
1716 EncAuthorizationData_key=None, # optional
1717 EncAuthorizationData_usage=None): # optional
1719 check_error_fn = kdc_exchange_dict['check_error_fn']
1720 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1721 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1722 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1723 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1724 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1725 callback_dict = kdc_exchange_dict['callback_dict']
1726 req_msg_type = kdc_exchange_dict['req_msg_type']
1727 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1728 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1730 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1731 kdc_options = kdc_exchange_dict['kdc_options']
1733 pac_request = kdc_exchange_dict['pac_request']
1734 pac_options = kdc_exchange_dict['pac_options']
1736 # Parameters specific to the inner request body
1737 inner_req = kdc_exchange_dict['inner_req']
1739 # Parameters specific to the outer request body
1740 outer_req = kdc_exchange_dict['outer_req']
1742 if till_time is None:
1743 till_time = self.get_KerberosTime(offset=36000)
1745 if 'nonce' in kdc_exchange_dict:
1746 nonce = kdc_exchange_dict['nonce']
1747 else:
1748 nonce = self.get_Nonce()
1749 kdc_exchange_dict['nonce'] = nonce
1751 req_body = self.KDC_REQ_BODY_create(
1752 kdc_options=kdc_options,
1753 cname=cname,
1754 realm=realm,
1755 sname=sname,
1756 from_time=from_time,
1757 till_time=till_time,
1758 renew_time=renew_time,
1759 nonce=nonce,
1760 etypes=etypes,
1761 addresses=addresses,
1762 additional_tickets=additional_tickets,
1763 EncAuthorizationData=EncAuthorizationData,
1764 EncAuthorizationData_key=EncAuthorizationData_key,
1765 EncAuthorizationData_usage=EncAuthorizationData_usage)
1767 inner_req_body = dict(req_body)
1768 if inner_req is not None:
1769 for key, value in inner_req.items():
1770 if value is not None:
1771 inner_req_body[key] = value
1772 else:
1773 del inner_req_body[key]
1774 if outer_req is not None:
1775 for key, value in outer_req.items():
1776 if value is not None:
1777 req_body[key] = value
1778 else:
1779 del req_body[key]
1781 additional_padata = []
1782 if pac_request is not None:
1783 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
1784 additional_padata.append(pa_pac_request)
1785 if pac_options is not None:
1786 pa_pac_options = self.get_pa_pac_options(pac_options)
1787 additional_padata.append(pa_pac_options)
1789 if req_msg_type == KRB_AS_REQ:
1790 tgs_req = None
1791 tgs_req_padata = None
1792 else:
1793 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1795 tgs_req = self.generate_ap_req(kdc_exchange_dict,
1796 callback_dict,
1797 req_body,
1798 armor=False)
1799 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1801 if generate_fast_padata_fn is not None:
1802 self.assertIsNotNone(generate_fast_fn)
1803 # This can alter req_body...
1804 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1805 callback_dict,
1806 req_body)
1807 else:
1808 fast_padata = []
1810 if generate_fast_armor_fn is not None:
1811 self.assertIsNotNone(generate_fast_fn)
1812 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1813 callback_dict,
1814 req_body,
1815 armor=True)
1817 fast_armor_type = kdc_exchange_dict['fast_armor_type']
1818 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1819 fast_ap_req)
1820 else:
1821 fast_armor = None
1823 if generate_padata_fn is not None:
1824 # This can alter req_body...
1825 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1826 callback_dict,
1827 req_body)
1828 self.assertIsNotNone(outer_padata)
1829 self.assertNotIn(PADATA_KDC_REQ,
1830 [pa['padata-type'] for pa in outer_padata],
1831 'Don\'t create TGS-REQ manually')
1832 else:
1833 outer_padata = None
1835 if generate_fast_fn is not None:
1836 armor_key = kdc_exchange_dict['armor_key']
1837 self.assertIsNotNone(armor_key)
1839 if req_msg_type == KRB_AS_REQ:
1840 checksum_blob = self.der_encode(
1841 req_body,
1842 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1843 else:
1844 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1845 checksum_blob = tgs_req
1847 checksum = self.Checksum_create(armor_key,
1848 KU_FAST_REQ_CHKSUM,
1849 checksum_blob)
1851 fast_padata += additional_padata
1852 fast = generate_fast_fn(kdc_exchange_dict,
1853 callback_dict,
1854 inner_req_body,
1855 fast_padata,
1856 fast_armor,
1857 checksum)
1858 else:
1859 fast = None
1861 padata = []
1863 if tgs_req_padata is not None:
1864 padata.append(tgs_req_padata)
1866 if fast is not None:
1867 padata.append(fast)
1869 if outer_padata is not None:
1870 padata += outer_padata
1872 if fast is None:
1873 padata += additional_padata
1875 if not padata:
1876 padata = None
1878 kdc_exchange_dict['req_padata'] = padata
1879 kdc_exchange_dict['fast_padata'] = fast_padata
1880 kdc_exchange_dict['req_body'] = inner_req_body
1882 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1883 padata=padata,
1884 req_body=req_body,
1885 asn1Spec=req_asn1Spec())
1887 to_rodc = kdc_exchange_dict['to_rodc']
1889 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
1890 self.assertIsNotNone(rep)
1892 msg_type = self.getElementValue(rep, 'msg-type')
1893 self.assertIsNotNone(msg_type)
1895 expected_msg_type = None
1896 if check_error_fn is not None:
1897 expected_msg_type = KRB_ERROR
1898 self.assertIsNone(check_rep_fn)
1899 self.assertNotEqual(0, len(expected_error_mode))
1900 self.assertNotIn(0, expected_error_mode)
1901 if check_rep_fn is not None:
1902 expected_msg_type = rep_msg_type
1903 self.assertIsNone(check_error_fn)
1904 self.assertEqual(0, len(expected_error_mode))
1905 self.assertIsNotNone(expected_msg_type)
1906 self.assertEqual(msg_type, expected_msg_type)
1908 if msg_type == KRB_ERROR:
1909 return check_error_fn(kdc_exchange_dict,
1910 callback_dict,
1911 rep)
1913 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1915 def as_exchange_dict(self,
1916 expected_crealm=None,
1917 expected_cname=None,
1918 expected_anon=False,
1919 expected_srealm=None,
1920 expected_sname=None,
1921 expected_supported_etypes=None,
1922 expected_flags=None,
1923 unexpected_flags=None,
1924 ticket_decryption_key=None,
1925 generate_fast_fn=None,
1926 generate_fast_armor_fn=None,
1927 generate_fast_padata_fn=None,
1928 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1929 generate_padata_fn=None,
1930 check_error_fn=None,
1931 check_rep_fn=None,
1932 check_kdc_private_fn=None,
1933 callback_dict=None,
1934 expected_error_mode=0,
1935 expected_status=None,
1936 client_as_etypes=None,
1937 expected_salt=None,
1938 authenticator_subkey=None,
1939 preauth_key=None,
1940 armor_key=None,
1941 armor_tgt=None,
1942 armor_subkey=None,
1943 auth_data=None,
1944 kdc_options='',
1945 inner_req=None,
1946 outer_req=None,
1947 pac_request=None,
1948 pac_options=None,
1949 expect_pac=True,
1950 to_rodc=False):
1951 if expected_error_mode == 0:
1952 expected_error_mode = ()
1953 elif not isinstance(expected_error_mode, collections.abc.Container):
1954 expected_error_mode = (expected_error_mode,)
1956 kdc_exchange_dict = {
1957 'req_msg_type': KRB_AS_REQ,
1958 'req_asn1Spec': krb5_asn1.AS_REQ,
1959 'rep_msg_type': KRB_AS_REP,
1960 'rep_asn1Spec': krb5_asn1.AS_REP,
1961 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
1962 'expected_crealm': expected_crealm,
1963 'expected_cname': expected_cname,
1964 'expected_anon': expected_anon,
1965 'expected_srealm': expected_srealm,
1966 'expected_sname': expected_sname,
1967 'expected_supported_etypes': expected_supported_etypes,
1968 'expected_flags': expected_flags,
1969 'unexpected_flags': unexpected_flags,
1970 'ticket_decryption_key': ticket_decryption_key,
1971 'generate_fast_fn': generate_fast_fn,
1972 'generate_fast_armor_fn': generate_fast_armor_fn,
1973 'generate_fast_padata_fn': generate_fast_padata_fn,
1974 'fast_armor_type': fast_armor_type,
1975 'generate_padata_fn': generate_padata_fn,
1976 'check_error_fn': check_error_fn,
1977 'check_rep_fn': check_rep_fn,
1978 'check_kdc_private_fn': check_kdc_private_fn,
1979 'callback_dict': callback_dict,
1980 'expected_error_mode': expected_error_mode,
1981 'expected_status': expected_status,
1982 'client_as_etypes': client_as_etypes,
1983 'expected_salt': expected_salt,
1984 'authenticator_subkey': authenticator_subkey,
1985 'preauth_key': preauth_key,
1986 'armor_key': armor_key,
1987 'armor_tgt': armor_tgt,
1988 'armor_subkey': armor_subkey,
1989 'auth_data': auth_data,
1990 'kdc_options': kdc_options,
1991 'inner_req': inner_req,
1992 'outer_req': outer_req,
1993 'pac_request': pac_request,
1994 'pac_options': pac_options,
1995 'expect_pac': expect_pac,
1996 'to_rodc': to_rodc
1998 if callback_dict is None:
1999 callback_dict = {}
2001 return kdc_exchange_dict
2003 def tgs_exchange_dict(self,
2004 expected_crealm=None,
2005 expected_cname=None,
2006 expected_anon=False,
2007 expected_srealm=None,
2008 expected_sname=None,
2009 expected_supported_etypes=None,
2010 expected_flags=None,
2011 unexpected_flags=None,
2012 ticket_decryption_key=None,
2013 generate_fast_fn=None,
2014 generate_fast_armor_fn=None,
2015 generate_fast_padata_fn=None,
2016 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2017 generate_padata_fn=None,
2018 check_error_fn=None,
2019 check_rep_fn=None,
2020 check_kdc_private_fn=None,
2021 expected_error_mode=0,
2022 expected_status=None,
2023 callback_dict=None,
2024 tgt=None,
2025 armor_key=None,
2026 armor_tgt=None,
2027 armor_subkey=None,
2028 authenticator_subkey=None,
2029 auth_data=None,
2030 body_checksum_type=None,
2031 kdc_options='',
2032 inner_req=None,
2033 outer_req=None,
2034 pac_request=None,
2035 pac_options=None,
2036 expect_pac=True,
2037 to_rodc=False):
2038 if expected_error_mode == 0:
2039 expected_error_mode = ()
2040 elif not isinstance(expected_error_mode, collections.abc.Container):
2041 expected_error_mode = (expected_error_mode,)
2043 kdc_exchange_dict = {
2044 'req_msg_type': KRB_TGS_REQ,
2045 'req_asn1Spec': krb5_asn1.TGS_REQ,
2046 'rep_msg_type': KRB_TGS_REP,
2047 'rep_asn1Spec': krb5_asn1.TGS_REP,
2048 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2049 'expected_crealm': expected_crealm,
2050 'expected_cname': expected_cname,
2051 'expected_anon': expected_anon,
2052 'expected_srealm': expected_srealm,
2053 'expected_sname': expected_sname,
2054 'expected_supported_etypes': expected_supported_etypes,
2055 'expected_flags': expected_flags,
2056 'unexpected_flags': unexpected_flags,
2057 'ticket_decryption_key': ticket_decryption_key,
2058 'generate_fast_fn': generate_fast_fn,
2059 'generate_fast_armor_fn': generate_fast_armor_fn,
2060 'generate_fast_padata_fn': generate_fast_padata_fn,
2061 'fast_armor_type': fast_armor_type,
2062 'generate_padata_fn': generate_padata_fn,
2063 'check_error_fn': check_error_fn,
2064 'check_rep_fn': check_rep_fn,
2065 'check_kdc_private_fn': check_kdc_private_fn,
2066 'callback_dict': callback_dict,
2067 'expected_error_mode': expected_error_mode,
2068 'expected_status': expected_status,
2069 'tgt': tgt,
2070 'body_checksum_type': body_checksum_type,
2071 'armor_key': armor_key,
2072 'armor_tgt': armor_tgt,
2073 'armor_subkey': armor_subkey,
2074 'auth_data': auth_data,
2075 'authenticator_subkey': authenticator_subkey,
2076 'kdc_options': kdc_options,
2077 'inner_req': inner_req,
2078 'outer_req': outer_req,
2079 'pac_request': pac_request,
2080 'pac_options': pac_options,
2081 'expect_pac': expect_pac,
2082 'to_rodc': to_rodc
2084 if callback_dict is None:
2085 callback_dict = {}
2087 return kdc_exchange_dict
2089 def generic_check_kdc_rep(self,
2090 kdc_exchange_dict,
2091 callback_dict,
2092 rep):
2094 expected_crealm = kdc_exchange_dict['expected_crealm']
2095 expected_anon = kdc_exchange_dict['expected_anon']
2096 expected_srealm = kdc_exchange_dict['expected_srealm']
2097 expected_sname = kdc_exchange_dict['expected_sname']
2098 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2099 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2100 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2101 msg_type = kdc_exchange_dict['rep_msg_type']
2102 armor_key = kdc_exchange_dict['armor_key']
2104 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
2105 padata = self.getElementValue(rep, 'padata')
2106 if self.strict_checking:
2107 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
2108 if expected_anon:
2109 expected_cname = self.PrincipalName_create(
2110 name_type=NT_WELLKNOWN,
2111 names=['WELLKNOWN', 'ANONYMOUS'])
2112 else:
2113 expected_cname = kdc_exchange_dict['expected_cname']
2114 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2115 self.assertElementPresent(rep, 'ticket')
2116 ticket = self.getElementValue(rep, 'ticket')
2117 ticket_encpart = None
2118 ticket_cipher = None
2119 self.assertIsNotNone(ticket)
2120 if ticket is not None: # Never None, but gives indentation
2121 self.assertElementEqual(ticket, 'tkt-vno', 5)
2122 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2123 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2124 self.assertElementPresent(ticket, 'enc-part')
2125 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2126 self.assertIsNotNone(ticket_encpart)
2127 if ticket_encpart is not None: # Never None, but gives indentation
2128 self.assertElementPresent(ticket_encpart, 'etype')
2129 # 'unspecified' means present, with any value != 0
2130 self.assertElementKVNO(ticket_encpart, 'kvno',
2131 self.unspecified_kvno)
2132 self.assertElementPresent(ticket_encpart, 'cipher')
2133 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2134 self.assertElementPresent(rep, 'enc-part')
2135 encpart = self.getElementValue(rep, 'enc-part')
2136 encpart_cipher = None
2137 self.assertIsNotNone(encpart)
2138 if encpart is not None: # Never None, but gives indentation
2139 self.assertElementPresent(encpart, 'etype')
2140 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2141 self.assertElementPresent(encpart, 'cipher')
2142 encpart_cipher = self.getElementValue(encpart, 'cipher')
2144 ticket_checksum = None
2146 # Get the decryption key for the encrypted part
2147 encpart_decryption_key, encpart_decryption_usage = (
2148 self.get_preauth_key(kdc_exchange_dict))
2150 if armor_key is not None:
2151 pa_dict = self.get_pa_dict(padata)
2153 if PADATA_FX_FAST in pa_dict:
2154 fx_fast_data = pa_dict[PADATA_FX_FAST]
2155 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2156 fx_fast_data,
2157 armor_key,
2158 finished=True)
2160 if 'strengthen-key' in fast_response:
2161 strengthen_key = self.EncryptionKey_import(
2162 fast_response['strengthen-key'])
2163 encpart_decryption_key = (
2164 self.generate_strengthen_reply_key(
2165 strengthen_key,
2166 encpart_decryption_key))
2168 fast_finished = fast_response.get('finished')
2169 if fast_finished is not None:
2170 ticket_checksum = fast_finished['ticket-checksum']
2172 self.check_rep_padata(kdc_exchange_dict,
2173 callback_dict,
2174 fast_response['padata'],
2175 error_code=0)
2177 ticket_private = None
2178 if ticket_decryption_key is not None:
2179 self.assertElementEqual(ticket_encpart, 'etype',
2180 ticket_decryption_key.etype)
2181 self.assertElementKVNO(ticket_encpart, 'kvno',
2182 ticket_decryption_key.kvno)
2183 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2184 ticket_cipher)
2185 ticket_private = self.der_decode(
2186 ticket_decpart,
2187 asn1Spec=krb5_asn1.EncTicketPart())
2189 encpart_private = None
2190 self.assertIsNotNone(encpart_decryption_key)
2191 if encpart_decryption_key is not None:
2192 self.assertElementEqual(encpart, 'etype',
2193 encpart_decryption_key.etype)
2194 if self.strict_checking:
2195 self.assertElementKVNO(encpart, 'kvno',
2196 encpart_decryption_key.kvno)
2197 rep_decpart = encpart_decryption_key.decrypt(
2198 encpart_decryption_usage,
2199 encpart_cipher)
2200 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2201 # application tag 26
2202 try:
2203 encpart_private = self.der_decode(
2204 rep_decpart,
2205 asn1Spec=rep_encpart_asn1Spec())
2206 except Exception:
2207 encpart_private = self.der_decode(
2208 rep_decpart,
2209 asn1Spec=krb5_asn1.EncTGSRepPart())
2211 self.assertIsNotNone(check_kdc_private_fn)
2212 if check_kdc_private_fn is not None:
2213 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2214 rep, ticket_private, encpart_private,
2215 ticket_checksum)
2217 return rep
2219 def check_fx_fast_data(self,
2220 kdc_exchange_dict,
2221 fx_fast_data,
2222 armor_key,
2223 finished=False,
2224 expect_strengthen_key=True):
2225 fx_fast_data = self.der_decode(fx_fast_data,
2226 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2228 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2229 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2231 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2233 fast_response = self.der_decode(fast_rep,
2234 asn1Spec=krb5_asn1.KrbFastResponse())
2236 if expect_strengthen_key and self.strict_checking:
2237 self.assertIn('strengthen-key', fast_response)
2239 if finished:
2240 self.assertIn('finished', fast_response)
2242 # Ensure that the nonce matches the nonce in the body of the request
2243 # (RFC6113 5.4.3).
2244 nonce = kdc_exchange_dict['nonce']
2245 self.assertEqual(nonce, fast_response['nonce'])
2247 return fast_response
2249 def generic_check_kdc_private(self,
2250 kdc_exchange_dict,
2251 callback_dict,
2252 rep,
2253 ticket_private,
2254 encpart_private,
2255 ticket_checksum):
2256 kdc_options = kdc_exchange_dict['kdc_options']
2257 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2258 canonicalize = (canon_pos < len(kdc_options)
2259 and kdc_options[canon_pos] == '1')
2260 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2261 renewable = (renewable_pos < len(kdc_options)
2262 and kdc_options[renewable_pos] == '1')
2264 expected_crealm = kdc_exchange_dict['expected_crealm']
2265 expected_cname = kdc_exchange_dict['expected_cname']
2266 expected_srealm = kdc_exchange_dict['expected_srealm']
2267 expected_sname = kdc_exchange_dict['expected_sname']
2268 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2270 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2272 expected_flags = kdc_exchange_dict.get('expected_flags')
2273 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2275 ticket = self.getElementValue(rep, 'ticket')
2277 if ticket_checksum is not None:
2278 armor_key = kdc_exchange_dict['armor_key']
2279 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2281 to_rodc = kdc_exchange_dict['to_rodc']
2282 if to_rodc:
2283 krbtgt_creds = self.get_rodc_krbtgt_creds()
2284 else:
2285 krbtgt_creds = self.get_krbtgt_creds()
2286 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2288 expect_pac = kdc_exchange_dict['expect_pac']
2290 ticket_session_key = None
2291 if ticket_private is not None:
2292 self.assertElementFlags(ticket_private, 'flags',
2293 expected_flags,
2294 unexpected_flags)
2295 self.assertElementPresent(ticket_private, 'key')
2296 ticket_key = self.getElementValue(ticket_private, 'key')
2297 self.assertIsNotNone(ticket_key)
2298 if ticket_key is not None: # Never None, but gives indentation
2299 self.assertElementPresent(ticket_key, 'keytype')
2300 self.assertElementPresent(ticket_key, 'keyvalue')
2301 ticket_session_key = self.EncryptionKey_import(ticket_key)
2302 self.assertElementEqualUTF8(ticket_private, 'crealm',
2303 expected_crealm)
2304 if self.strict_checking:
2305 self.assertElementEqualPrincipal(ticket_private, 'cname',
2306 expected_cname)
2307 self.assertElementPresent(ticket_private, 'transited')
2308 self.assertElementPresent(ticket_private, 'authtime')
2309 if self.strict_checking:
2310 self.assertElementPresent(ticket_private, 'starttime')
2311 self.assertElementPresent(ticket_private, 'endtime')
2312 if renewable:
2313 if self.strict_checking:
2314 self.assertElementPresent(ticket_private, 'renew-till')
2315 else:
2316 self.assertElementMissing(ticket_private, 'renew-till')
2317 if self.strict_checking:
2318 self.assertElementEqual(ticket_private, 'caddr', [])
2319 self.assertElementPresent(ticket_private, 'authorization-data',
2320 expect_empty=not expect_pac)
2322 encpart_session_key = None
2323 if encpart_private is not None:
2324 self.assertElementPresent(encpart_private, 'key')
2325 encpart_key = self.getElementValue(encpart_private, 'key')
2326 self.assertIsNotNone(encpart_key)
2327 if encpart_key is not None: # Never None, but gives indentation
2328 self.assertElementPresent(encpart_key, 'keytype')
2329 self.assertElementPresent(encpart_key, 'keyvalue')
2330 encpart_session_key = self.EncryptionKey_import(encpart_key)
2331 self.assertElementPresent(encpart_private, 'last-req')
2332 self.assertElementEqual(encpart_private, 'nonce',
2333 kdc_exchange_dict['nonce'])
2334 if rep_msg_type == KRB_AS_REP:
2335 if self.strict_checking:
2336 self.assertElementPresent(encpart_private,
2337 'key-expiration')
2338 else:
2339 self.assertElementMissing(encpart_private,
2340 'key-expiration')
2341 self.assertElementFlags(encpart_private, 'flags',
2342 expected_flags,
2343 unexpected_flags)
2344 self.assertElementPresent(encpart_private, 'authtime')
2345 if self.strict_checking:
2346 self.assertElementPresent(encpart_private, 'starttime')
2347 self.assertElementPresent(encpart_private, 'endtime')
2348 if renewable:
2349 if self.strict_checking:
2350 self.assertElementPresent(encpart_private, 'renew-till')
2351 else:
2352 self.assertElementMissing(encpart_private, 'renew-till')
2353 self.assertElementEqualUTF8(encpart_private, 'srealm',
2354 expected_srealm)
2355 self.assertElementEqualPrincipal(encpart_private, 'sname',
2356 expected_sname)
2357 if self.strict_checking:
2358 self.assertElementEqual(encpart_private, 'caddr', [])
2360 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2362 if self.strict_checking:
2363 if canonicalize or '1' in sent_pac_options:
2364 self.assertElementPresent(encpart_private,
2365 'encrypted-pa-data')
2366 enc_pa_dict = self.get_pa_dict(
2367 encpart_private['encrypted-pa-data'])
2368 if canonicalize:
2369 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2371 expected_supported_etypes = kdc_exchange_dict[
2372 'expected_supported_etypes']
2373 expected_supported_etypes |= (
2374 security.KERB_ENCTYPE_DES_CBC_CRC |
2375 security.KERB_ENCTYPE_DES_CBC_MD5 |
2376 security.KERB_ENCTYPE_RC4_HMAC_MD5)
2378 (supported_etypes,) = struct.unpack(
2379 '<L',
2380 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2382 self.assertEqual(supported_etypes,
2383 expected_supported_etypes)
2384 else:
2385 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2387 if '1' in sent_pac_options:
2388 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2390 pac_options = self.der_decode(
2391 enc_pa_dict[PADATA_PAC_OPTIONS],
2392 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2394 self.assertElementEqual(pac_options, 'options',
2395 sent_pac_options)
2396 else:
2397 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2398 else:
2399 self.assertElementEqual(encpart_private,
2400 'encrypted-pa-data',
2403 if ticket_session_key is not None and encpart_session_key is not None:
2404 self.assertEqual(ticket_session_key.etype,
2405 encpart_session_key.etype)
2406 self.assertEqual(ticket_session_key.key.contents,
2407 encpart_session_key.key.contents)
2408 if encpart_session_key is not None:
2409 session_key = encpart_session_key
2410 else:
2411 session_key = ticket_session_key
2412 ticket_creds = KerberosTicketCreds(
2413 ticket,
2414 session_key,
2415 crealm=expected_crealm,
2416 cname=expected_cname,
2417 srealm=expected_srealm,
2418 sname=expected_sname,
2419 decryption_key=ticket_decryption_key,
2420 ticket_private=ticket_private,
2421 encpart_private=encpart_private)
2423 if ticket_decryption_key is not None:
2424 self.verify_ticket(ticket_creds, krbtgt_key, expect_pac=expect_pac)
2426 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2428 def generic_check_kdc_error(self,
2429 kdc_exchange_dict,
2430 callback_dict,
2431 rep,
2432 inner=False):
2434 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2436 expected_anon = kdc_exchange_dict['expected_anon']
2437 expected_srealm = kdc_exchange_dict['expected_srealm']
2438 expected_sname = kdc_exchange_dict['expected_sname']
2439 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2441 sent_fast = self.sent_fast(kdc_exchange_dict)
2443 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2445 self.assertElementEqual(rep, 'pvno', 5)
2446 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2447 error_code = self.getElementValue(rep, 'error-code')
2448 self.assertIn(error_code, expected_error_mode)
2449 if self.strict_checking:
2450 self.assertElementMissing(rep, 'ctime')
2451 self.assertElementMissing(rep, 'cusec')
2452 self.assertElementPresent(rep, 'stime')
2453 self.assertElementPresent(rep, 'susec')
2454 # error-code checked above
2455 if self.strict_checking:
2456 self.assertElementMissing(rep, 'crealm')
2457 if expected_anon and not inner:
2458 expected_cname = self.PrincipalName_create(
2459 name_type=NT_WELLKNOWN,
2460 names=['WELLKNOWN', 'ANONYMOUS'])
2461 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2462 else:
2463 self.assertElementMissing(rep, 'cname')
2464 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2465 if sent_fast and error_code == KDC_ERR_GENERIC:
2466 self.assertElementEqualPrincipal(rep, 'sname',
2467 self.get_krbtgt_sname())
2468 else:
2469 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2470 self.assertElementMissing(rep, 'e-text')
2471 if (error_code == KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2472 or (rep_msg_type == KRB_TGS_REP
2473 and not sent_fast)
2474 or (sent_fast and fast_armor_type is not None
2475 and fast_armor_type != FX_FAST_ARMOR_AP_REQUEST)
2476 or inner):
2477 self.assertElementMissing(rep, 'e-data')
2478 return rep
2479 edata = self.getElementValue(rep, 'e-data')
2480 if self.strict_checking:
2481 if error_code != KDC_ERR_GENERIC:
2482 # Predicting whether an ERR_GENERIC error contains e-data is
2483 # more complicated.
2484 self.assertIsNotNone(edata)
2485 if edata is not None:
2486 if rep_msg_type == KRB_TGS_REP and not sent_fast:
2487 rep_padata = [self.der_decode(edata,
2488 asn1Spec=krb5_asn1.PA_DATA())]
2489 else:
2490 rep_padata = self.der_decode(edata,
2491 asn1Spec=krb5_asn1.METHOD_DATA())
2492 self.assertGreater(len(rep_padata), 0)
2494 if sent_fast:
2495 self.assertEqual(1, len(rep_padata))
2496 rep_pa_dict = self.get_pa_dict(rep_padata)
2497 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
2499 armor_key = kdc_exchange_dict['armor_key']
2500 self.assertIsNotNone(armor_key)
2501 fast_response = self.check_fx_fast_data(
2502 kdc_exchange_dict,
2503 rep_pa_dict[PADATA_FX_FAST],
2504 armor_key,
2505 expect_strengthen_key=False)
2507 rep_padata = fast_response['padata']
2509 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2510 callback_dict,
2511 rep_padata,
2512 error_code)
2514 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2516 return rep
2518 def check_rep_padata(self,
2519 kdc_exchange_dict,
2520 callback_dict,
2521 rep_padata,
2522 error_code):
2523 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2525 req_body = kdc_exchange_dict['req_body']
2526 proposed_etypes = req_body['etype']
2527 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2529 sent_fast = self.sent_fast(kdc_exchange_dict)
2530 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
2532 if rep_msg_type == KRB_TGS_REP:
2533 self.assertTrue(sent_fast)
2535 expect_etype_info2 = ()
2536 expect_etype_info = False
2537 unexpect_etype_info = True
2538 expected_aes_type = 0
2539 expected_rc4_type = 0
2540 if kcrypto.Enctype.RC4 in proposed_etypes:
2541 expect_etype_info = True
2542 for etype in proposed_etypes:
2543 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2544 expect_etype_info = False
2545 if etype not in client_as_etypes:
2546 continue
2547 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2548 if etype > expected_aes_type:
2549 expected_aes_type = etype
2550 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
2551 unexpect_etype_info = False
2552 if etype > expected_rc4_type:
2553 expected_rc4_type = etype
2555 if expected_aes_type != 0:
2556 expect_etype_info2 += (expected_aes_type,)
2557 if expected_rc4_type != 0:
2558 expect_etype_info2 += (expected_rc4_type,)
2560 expected_patypes = ()
2561 if sent_fast and error_code != 0:
2562 expected_patypes += (PADATA_FX_ERROR,)
2563 expected_patypes += (PADATA_FX_COOKIE,)
2565 if rep_msg_type == KRB_TGS_REP:
2566 if not sent_fast and error_code != 0:
2567 expected_patypes += (PADATA_PW_SALT,)
2568 else:
2569 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2570 if ('1' in sent_pac_options
2571 and error_code not in (0, KDC_ERR_GENERIC)):
2572 expected_patypes += (PADATA_PAC_OPTIONS,)
2573 elif error_code != KDC_ERR_GENERIC:
2574 if expect_etype_info:
2575 self.assertGreater(len(expect_etype_info2), 0)
2576 expected_patypes += (PADATA_ETYPE_INFO,)
2577 if len(expect_etype_info2) != 0:
2578 expected_patypes += (PADATA_ETYPE_INFO2,)
2580 if error_code != KDC_ERR_PREAUTH_FAILED:
2581 if sent_fast:
2582 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
2583 else:
2584 expected_patypes += (PADATA_ENC_TIMESTAMP,)
2586 if not sent_enc_challenge:
2587 expected_patypes += (PADATA_PK_AS_REQ,)
2588 expected_patypes += (PADATA_PK_AS_REP_19,)
2590 if (self.kdc_fast_support
2591 and not sent_fast
2592 and not sent_enc_challenge):
2593 expected_patypes += (PADATA_FX_FAST,)
2594 expected_patypes += (PADATA_FX_COOKIE,)
2596 if self.strict_checking:
2597 for i, patype in enumerate(expected_patypes):
2598 self.assertElementEqual(rep_padata[i], 'padata-type', patype)
2599 self.assertEqual(len(rep_padata), len(expected_patypes))
2601 etype_info2 = None
2602 etype_info = None
2603 enc_timestamp = None
2604 enc_challenge = None
2605 pk_as_req = None
2606 pk_as_rep19 = None
2607 fast_cookie = None
2608 fast_error = None
2609 fx_fast = None
2610 pac_options = None
2611 pw_salt = None
2612 for pa in rep_padata:
2613 patype = self.getElementValue(pa, 'padata-type')
2614 pavalue = self.getElementValue(pa, 'padata-value')
2615 if patype == PADATA_ETYPE_INFO2:
2616 self.assertIsNone(etype_info2)
2617 etype_info2 = self.der_decode(pavalue,
2618 asn1Spec=krb5_asn1.ETYPE_INFO2())
2619 continue
2620 if patype == PADATA_ETYPE_INFO:
2621 self.assertIsNone(etype_info)
2622 etype_info = self.der_decode(pavalue,
2623 asn1Spec=krb5_asn1.ETYPE_INFO())
2624 continue
2625 if patype == PADATA_ENC_TIMESTAMP:
2626 self.assertIsNone(enc_timestamp)
2627 enc_timestamp = pavalue
2628 self.assertEqual(len(enc_timestamp), 0)
2629 continue
2630 if patype == PADATA_ENCRYPTED_CHALLENGE:
2631 self.assertIsNone(enc_challenge)
2632 enc_challenge = pavalue
2633 continue
2634 if patype == PADATA_PK_AS_REQ:
2635 self.assertIsNone(pk_as_req)
2636 pk_as_req = pavalue
2637 self.assertEqual(len(pk_as_req), 0)
2638 continue
2639 if patype == PADATA_PK_AS_REP_19:
2640 self.assertIsNone(pk_as_rep19)
2641 pk_as_rep19 = pavalue
2642 self.assertEqual(len(pk_as_rep19), 0)
2643 continue
2644 if patype == PADATA_FX_COOKIE:
2645 self.assertIsNone(fast_cookie)
2646 fast_cookie = pavalue
2647 self.assertIsNotNone(fast_cookie)
2648 continue
2649 if patype == PADATA_FX_ERROR:
2650 self.assertIsNone(fast_error)
2651 fast_error = pavalue
2652 self.assertIsNotNone(fast_error)
2653 continue
2654 if patype == PADATA_FX_FAST:
2655 self.assertIsNone(fx_fast)
2656 fx_fast = pavalue
2657 self.assertEqual(len(fx_fast), 0)
2658 continue
2659 if patype == PADATA_PAC_OPTIONS:
2660 self.assertIsNone(pac_options)
2661 pac_options = self.der_decode(
2662 pavalue,
2663 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2664 continue
2665 if patype == PADATA_PW_SALT:
2666 self.assertIsNone(pw_salt)
2667 pw_salt = pavalue
2668 self.assertIsNotNone(pw_salt)
2669 continue
2671 if fast_cookie is not None:
2672 kdc_exchange_dict['fast_cookie'] = fast_cookie
2674 if fast_error is not None:
2675 fast_error = self.der_decode(fast_error,
2676 asn1Spec=krb5_asn1.KRB_ERROR())
2677 self.generic_check_kdc_error(kdc_exchange_dict,
2678 callback_dict,
2679 fast_error,
2680 inner=True)
2682 if pac_options is not None:
2683 self.assertElementEqual(pac_options, 'options', sent_pac_options)
2685 if pw_salt is not None:
2686 self.assertEqual(12, len(pw_salt))
2688 status = int.from_bytes(pw_salt[:4], 'little')
2689 flags = int.from_bytes(pw_salt[8:], 'little')
2691 expected_status = kdc_exchange_dict['expected_status']
2692 self.assertEqual(expected_status, status)
2694 self.assertEqual(3, flags)
2695 else:
2696 self.assertIsNone(kdc_exchange_dict.get('expected_status'))
2698 if enc_challenge is not None:
2699 if not sent_enc_challenge:
2700 self.assertEqual(len(enc_challenge), 0)
2701 else:
2702 armor_key = kdc_exchange_dict['armor_key']
2703 self.assertIsNotNone(armor_key)
2705 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
2707 kdc_challenge_key = self.generate_kdc_challenge_key(
2708 armor_key, preauth_key)
2710 # Ensure that the encrypted challenge FAST factor is supported
2711 # (RFC6113 5.4.6).
2712 if self.strict_checking:
2713 self.assertNotEqual(len(enc_challenge), 0)
2714 if len(enc_challenge) != 0:
2715 encrypted_challenge = self.der_decode(
2716 enc_challenge,
2717 asn1Spec=krb5_asn1.EncryptedData())
2718 self.assertEqual(encrypted_challenge['etype'],
2719 kdc_challenge_key.etype)
2721 challenge = kdc_challenge_key.decrypt(
2722 KU_ENC_CHALLENGE_KDC,
2723 encrypted_challenge['cipher'])
2724 challenge = self.der_decode(
2725 challenge,
2726 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
2728 # Retrieve the returned timestamp.
2729 rep_patime = challenge['patimestamp']
2730 self.assertIn('pausec', challenge)
2732 # Ensure the returned time is within five minutes of the
2733 # current time.
2734 rep_time = self.get_EpochFromKerberosTime(rep_patime)
2735 current_time = time.time()
2737 self.assertLess(current_time - 300, rep_time)
2738 self.assertLess(rep_time, current_time + 300)
2740 if all(etype not in client_as_etypes or etype not in proposed_etypes
2741 for etype in (kcrypto.Enctype.AES256,
2742 kcrypto.Enctype.AES128,
2743 kcrypto.Enctype.RC4)):
2744 self.assertIsNone(etype_info2)
2745 self.assertIsNone(etype_info)
2746 if rep_msg_type == KRB_AS_REP:
2747 if self.strict_checking:
2748 if sent_fast:
2749 self.assertIsNotNone(enc_challenge)
2750 self.assertIsNone(enc_timestamp)
2751 else:
2752 self.assertIsNotNone(enc_timestamp)
2753 self.assertIsNone(enc_challenge)
2754 self.assertIsNotNone(pk_as_req)
2755 self.assertIsNotNone(pk_as_rep19)
2756 else:
2757 self.assertIsNone(enc_timestamp)
2758 self.assertIsNone(enc_challenge)
2759 self.assertIsNone(pk_as_req)
2760 self.assertIsNone(pk_as_rep19)
2761 return None
2763 if error_code != KDC_ERR_GENERIC:
2764 if self.strict_checking:
2765 self.assertIsNotNone(etype_info2)
2766 else:
2767 self.assertIsNone(etype_info2)
2768 if expect_etype_info:
2769 self.assertIsNotNone(etype_info)
2770 else:
2771 if self.strict_checking:
2772 self.assertIsNone(etype_info)
2773 if unexpect_etype_info:
2774 self.assertIsNone(etype_info)
2776 if error_code != KDC_ERR_GENERIC and self.strict_checking:
2777 self.assertGreaterEqual(len(etype_info2), 1)
2778 self.assertEqual(len(etype_info2), len(expect_etype_info2))
2779 for i in range(0, len(etype_info2)):
2780 e = self.getElementValue(etype_info2[i], 'etype')
2781 self.assertEqual(e, expect_etype_info2[i])
2782 salt = self.getElementValue(etype_info2[i], 'salt')
2783 if e == kcrypto.Enctype.RC4:
2784 self.assertIsNone(salt)
2785 else:
2786 self.assertIsNotNone(salt)
2787 expected_salt = kdc_exchange_dict['expected_salt']
2788 if expected_salt is not None:
2789 self.assertEqual(salt, expected_salt)
2790 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
2791 if self.strict_checking:
2792 self.assertIsNone(s2kparams)
2793 if etype_info is not None:
2794 self.assertEqual(len(etype_info), 1)
2795 e = self.getElementValue(etype_info[0], 'etype')
2796 self.assertEqual(e, kcrypto.Enctype.RC4)
2797 self.assertEqual(e, expect_etype_info2[0])
2798 salt = self.getElementValue(etype_info[0], 'salt')
2799 if self.strict_checking:
2800 self.assertIsNotNone(salt)
2801 self.assertEqual(len(salt), 0)
2803 if error_code not in (KDC_ERR_PREAUTH_FAILED,
2804 KDC_ERR_GENERIC):
2805 if sent_fast:
2806 self.assertIsNotNone(enc_challenge)
2807 if self.strict_checking:
2808 self.assertIsNone(enc_timestamp)
2809 else:
2810 self.assertIsNotNone(enc_timestamp)
2811 if self.strict_checking:
2812 self.assertIsNone(enc_challenge)
2813 if not sent_enc_challenge:
2814 if self.strict_checking:
2815 self.assertIsNotNone(pk_as_req)
2816 self.assertIsNotNone(pk_as_rep19)
2817 else:
2818 self.assertIsNone(pk_as_req)
2819 self.assertIsNone(pk_as_rep19)
2820 else:
2821 if self.strict_checking:
2822 self.assertIsNone(enc_timestamp)
2823 self.assertIsNone(enc_challenge)
2824 self.assertIsNone(pk_as_req)
2825 self.assertIsNone(pk_as_rep19)
2827 return etype_info2
2829 def generate_simple_fast(self,
2830 kdc_exchange_dict,
2831 _callback_dict,
2832 req_body,
2833 fast_padata,
2834 fast_armor,
2835 checksum,
2836 fast_options=''):
2837 armor_key = kdc_exchange_dict['armor_key']
2839 fast_req = self.KRB_FAST_REQ_create(fast_options,
2840 fast_padata,
2841 req_body)
2842 fast_req = self.der_encode(fast_req,
2843 asn1Spec=krb5_asn1.KrbFastReq())
2844 fast_req = self.EncryptedData_create(armor_key,
2845 KU_FAST_ENC,
2846 fast_req)
2848 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
2849 checksum,
2850 fast_req)
2852 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
2853 fx_fast_request = self.der_encode(
2854 fx_fast_request,
2855 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
2857 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
2858 fx_fast_request)
2860 return fast_padata
2862 def generate_ap_req(self,
2863 kdc_exchange_dict,
2864 _callback_dict,
2865 req_body,
2866 armor):
2867 if armor:
2868 tgt = kdc_exchange_dict['armor_tgt']
2869 authenticator_subkey = kdc_exchange_dict['armor_subkey']
2871 req_body_checksum = None
2872 else:
2873 tgt = kdc_exchange_dict['tgt']
2874 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2875 body_checksum_type = kdc_exchange_dict['body_checksum_type']
2877 req_body_blob = self.der_encode(req_body,
2878 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2880 req_body_checksum = self.Checksum_create(tgt.session_key,
2881 KU_TGS_REQ_AUTH_CKSUM,
2882 req_body_blob,
2883 ctype=body_checksum_type)
2885 auth_data = kdc_exchange_dict['auth_data']
2887 subkey_obj = None
2888 if authenticator_subkey is not None:
2889 subkey_obj = authenticator_subkey.export_obj()
2890 seq_number = random.randint(0, 0xfffffffe)
2891 (ctime, cusec) = self.get_KerberosTimeWithUsec()
2892 authenticator_obj = self.Authenticator_create(
2893 crealm=tgt.crealm,
2894 cname=tgt.cname,
2895 cksum=req_body_checksum,
2896 cusec=cusec,
2897 ctime=ctime,
2898 subkey=subkey_obj,
2899 seq_number=seq_number,
2900 authorization_data=auth_data)
2901 authenticator_blob = self.der_encode(
2902 authenticator_obj,
2903 asn1Spec=krb5_asn1.Authenticator())
2905 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
2906 authenticator = self.EncryptedData_create(tgt.session_key,
2907 usage,
2908 authenticator_blob)
2910 ap_options = krb5_asn1.APOptions('0')
2911 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2912 ticket=tgt.ticket,
2913 authenticator=authenticator)
2914 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2916 return ap_req
2918 def generate_simple_tgs_padata(self,
2919 kdc_exchange_dict,
2920 callback_dict,
2921 req_body):
2922 ap_req = self.generate_ap_req(kdc_exchange_dict,
2923 callback_dict,
2924 req_body,
2925 armor=False)
2926 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2927 padata = [pa_tgs_req]
2929 return padata, req_body
2931 def get_preauth_key(self, kdc_exchange_dict):
2932 msg_type = kdc_exchange_dict['rep_msg_type']
2934 if msg_type == KRB_AS_REP:
2935 key = kdc_exchange_dict['preauth_key']
2936 usage = KU_AS_REP_ENC_PART
2937 else: # KRB_TGS_REP
2938 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2939 if authenticator_subkey is not None:
2940 key = authenticator_subkey
2941 usage = KU_TGS_REP_ENC_PART_SUB_KEY
2942 else:
2943 tgt = kdc_exchange_dict['tgt']
2944 key = tgt.session_key
2945 usage = KU_TGS_REP_ENC_PART_SESSION
2947 self.assertIsNotNone(key)
2949 return key, usage
2951 def generate_armor_key(self, subkey, session_key):
2952 armor_key = kcrypto.cf2(subkey.key,
2953 session_key.key,
2954 b'subkeyarmor',
2955 b'ticketarmor')
2956 armor_key = Krb5EncryptionKey(armor_key, None)
2958 return armor_key
2960 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
2961 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
2962 reply_key.key,
2963 b'strengthenkey',
2964 b'replykey')
2965 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
2966 reply_key.kvno)
2968 return strengthen_reply_key
2970 def generate_client_challenge_key(self, armor_key, longterm_key):
2971 client_challenge_key = kcrypto.cf2(armor_key.key,
2972 longterm_key.key,
2973 b'clientchallengearmor',
2974 b'challengelongterm')
2975 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
2977 return client_challenge_key
2979 def generate_kdc_challenge_key(self, armor_key, longterm_key):
2980 kdc_challenge_key = kcrypto.cf2(armor_key.key,
2981 longterm_key.key,
2982 b'kdcchallengearmor',
2983 b'challengelongterm')
2984 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
2986 return kdc_challenge_key
2988 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
2989 expected_type = expected_checksum['cksumtype']
2990 self.assertEqual(armor_key.ctype, expected_type)
2992 ticket_blob = self.der_encode(ticket,
2993 asn1Spec=krb5_asn1.Ticket())
2994 checksum = self.Checksum_create(armor_key,
2995 KU_FAST_FINISHED,
2996 ticket_blob)
2997 self.assertEqual(expected_checksum, checksum)
2999 def verify_ticket(self, ticket, krbtgt_key, expect_pac=True):
3000 # Check if the ticket is a TGT.
3001 sname = ticket.ticket['sname']
3002 is_tgt = self.is_tgs(sname)
3004 # Decrypt the ticket.
3006 key = ticket.decryption_key
3007 enc_part = ticket.ticket['enc-part']
3009 self.assertElementEqual(enc_part, 'etype', key.etype)
3010 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3012 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3013 enc_part = self.der_decode(
3014 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3016 # Fetch the authorization data from the ticket.
3017 auth_data = enc_part.get('authorization-data')
3018 if expect_pac:
3019 self.assertIsNotNone(auth_data)
3020 elif auth_data is None:
3021 return
3023 # Get a copy of the authdata with an empty PAC, and the existing PAC
3024 # (if present).
3025 empty_pac = self.get_empty_pac()
3026 auth_data, pac_data = self.replace_pac(auth_data,
3027 empty_pac,
3028 expect_pac=expect_pac)
3029 if not expect_pac:
3030 return
3032 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3033 # raw type to create a new PAC with zeroed signatures for
3034 # verification. This is because on Windows, the resource_groups field
3035 # is added to PAC_LOGON_INFO after the info3 field has been created,
3036 # which results in a different ordering of pointer values than Samba
3037 # (see commit 0e201ecdc53). Using the raw type avoids changing
3038 # PAC_LOGON_INFO, so verification against Windows can work. We still
3039 # need the PAC_DATA type to retrieve the actual checksums, because the
3040 # signatures in the raw type may contain padding bytes.
3041 pac = ndr_unpack(krb5pac.PAC_DATA,
3042 pac_data)
3043 raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
3044 pac_data)
3046 checksums = {}
3048 for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
3049 buffer_type = pac_buffer.type
3050 if buffer_type in self.pac_checksum_types:
3051 self.assertNotIn(buffer_type, checksums,
3052 f'Duplicate checksum type {buffer_type}')
3054 # Fetch the checksum and the checksum type from the PAC buffer.
3055 checksum = pac_buffer.info.signature
3056 ctype = pac_buffer.info.type
3057 if ctype & 1 << 31:
3058 ctype |= -1 << 31
3060 checksums[buffer_type] = checksum, ctype
3062 if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3063 # Zero the checksum field so that we can later verify the
3064 # checksums. The ticket checksum field is not zeroed.
3066 signature = ndr_unpack(
3067 krb5pac.PAC_SIGNATURE_DATA,
3068 raw_pac_buffer.info.remaining)
3069 signature.signature = bytes(len(checksum))
3070 raw_pac_buffer.info.remaining = ndr_pack(
3071 signature)
3073 # Re-encode the PAC.
3074 pac_data = ndr_pack(raw_pac)
3076 # Verify the signatures.
3078 server_checksum, server_ctype = checksums[
3079 krb5pac.PAC_TYPE_SRV_CHECKSUM]
3080 Krb5EncryptionKey.verify_checksum(key,
3081 KU_NON_KERB_CKSUM_SALT,
3082 pac_data,
3083 server_ctype,
3084 server_checksum)
3086 kdc_checksum, kdc_ctype = checksums[
3087 krb5pac.PAC_TYPE_KDC_CHECKSUM]
3088 krbtgt_key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3089 server_checksum,
3090 kdc_ctype,
3091 kdc_checksum)
3093 if is_tgt:
3094 self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
3095 else:
3096 ticket_checksum, ticket_ctype = checksums.get(
3097 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
3098 (None, None))
3099 if self.strict_checking:
3100 self.assertIsNotNone(ticket_checksum)
3101 if ticket_checksum is not None:
3102 enc_part['authorization-data'] = auth_data
3103 enc_part = self.der_encode(enc_part,
3104 asn1Spec=krb5_asn1.EncTicketPart())
3106 krbtgt_key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3107 enc_part,
3108 ticket_ctype,
3109 ticket_checksum)
3111 def modified_ticket(self,
3112 ticket, *,
3113 new_ticket_key=None,
3114 modify_fn=None,
3115 modify_pac_fn=None,
3116 exclude_pac=False,
3117 update_pac_checksums=True,
3118 checksum_keys=None,
3119 include_checksums=None):
3120 if checksum_keys is None:
3121 # A dict containing a key for each checksum type to be created in
3122 # the PAC.
3123 checksum_keys = {}
3125 if include_checksums is None:
3126 # A dict containing a value for each checksum type; True if the
3127 # checksum type is to be included in the PAC, False if it is to be
3128 # excluded, or None/not present if the checksum is to be included
3129 # based on its presence in the original PAC.
3130 include_checksums = {}
3132 # Check that the values passed in by the caller make sense.
3134 self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
3135 self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
3137 if exclude_pac:
3138 self.assertIsNone(modify_pac_fn)
3140 update_pac_checksums = False
3142 if not update_pac_checksums:
3143 self.assertFalse(checksum_keys)
3144 self.assertFalse(include_checksums)
3146 expect_pac = update_pac_checksums or modify_pac_fn is not None
3148 key = ticket.decryption_key
3150 if new_ticket_key is None:
3151 # Use the same key to re-encrypt the ticket.
3152 new_ticket_key = key
3154 if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
3155 # If the server signature key is not present, fall back to the key
3156 # used to encrypt the ticket.
3157 checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
3159 if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
3160 # If the ticket signature key is not present, fall back to the key
3161 # used for the KDC signature.
3162 kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
3163 if kdc_checksum_key is not None:
3164 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
3165 kdc_checksum_key)
3167 # Decrypt the ticket.
3169 enc_part = ticket.ticket['enc-part']
3171 self.assertElementEqual(enc_part, 'etype', key.etype)
3172 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3174 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3175 enc_part = self.der_decode(
3176 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3178 # Modify the ticket here.
3179 if modify_fn is not None:
3180 enc_part = modify_fn(enc_part)
3182 auth_data = enc_part.get('authorization-data')
3183 if expect_pac:
3184 self.assertIsNotNone(auth_data)
3185 if auth_data is not None:
3186 new_pac = None
3187 if not exclude_pac:
3188 # Get a copy of the authdata with an empty PAC, and the
3189 # existing PAC (if present).
3190 empty_pac = self.get_empty_pac()
3191 empty_pac_auth_data, pac_data = self.replace_pac(auth_data,
3192 empty_pac)
3194 if expect_pac:
3195 self.assertIsNotNone(pac_data)
3196 if pac_data is not None:
3197 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3199 # Modify the PAC here.
3200 if modify_pac_fn is not None:
3201 pac = modify_pac_fn(pac)
3203 if update_pac_checksums:
3204 # Get the enc-part with an empty PAC, which is needed
3205 # to create a ticket signature.
3206 enc_part_to_sign = enc_part.copy()
3207 enc_part_to_sign['authorization-data'] = (
3208 empty_pac_auth_data)
3209 enc_part_to_sign = self.der_encode(
3210 enc_part_to_sign,
3211 asn1Spec=krb5_asn1.EncTicketPart())
3213 self.update_pac_checksums(pac,
3214 checksum_keys,
3215 include_checksums,
3216 enc_part_to_sign)
3218 # Re-encode the PAC.
3219 pac_data = ndr_pack(pac)
3220 new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
3221 pac_data)
3223 # Replace the PAC in the authorization data and re-add it to the
3224 # ticket enc-part.
3225 auth_data, _ = self.replace_pac(auth_data, new_pac)
3226 enc_part['authorization-data'] = auth_data
3228 # Re-encrypt the ticket enc-part with the new key.
3229 enc_part_new = self.der_encode(enc_part,
3230 asn1Spec=krb5_asn1.EncTicketPart())
3231 enc_part_new = self.EncryptedData_create(new_ticket_key,
3232 KU_TICKET,
3233 enc_part_new)
3235 # Create a copy of the ticket with the new enc-part.
3236 new_ticket = ticket.ticket.copy()
3237 new_ticket['enc-part'] = enc_part_new
3239 new_ticket_creds = KerberosTicketCreds(
3240 new_ticket,
3241 session_key=ticket.session_key,
3242 crealm=ticket.crealm,
3243 cname=ticket.cname,
3244 srealm=ticket.srealm,
3245 sname=ticket.sname,
3246 decryption_key=new_ticket_key,
3247 ticket_private=enc_part,
3248 encpart_private=ticket.encpart_private)
3250 return new_ticket_creds
3252 def update_pac_checksums(self,
3253 pac,
3254 checksum_keys,
3255 include_checksums,
3256 enc_part=None):
3257 pac_buffers = pac.buffers
3258 checksum_buffers = {}
3260 # Find the relevant PAC checksum buffers.
3261 for pac_buffer in pac_buffers:
3262 buffer_type = pac_buffer.type
3263 if buffer_type in self.pac_checksum_types:
3264 self.assertNotIn(buffer_type, checksum_buffers,
3265 f'Duplicate checksum type {buffer_type}')
3267 checksum_buffers[buffer_type] = pac_buffer
3269 # Create any additional buffers that were requested but not
3270 # present. Conversely, remove any buffers that were requested to be
3271 # removed.
3272 for buffer_type in self.pac_checksum_types:
3273 if buffer_type in checksum_buffers:
3274 if include_checksums.get(buffer_type) is False:
3275 checksum_buffer = checksum_buffers.pop(buffer_type)
3277 pac.num_buffers -= 1
3278 pac_buffers.remove(checksum_buffer)
3280 elif include_checksums.get(buffer_type) is True:
3281 info = krb5pac.PAC_SIGNATURE_DATA()
3283 checksum_buffer = krb5pac.PAC_BUFFER()
3284 checksum_buffer.type = buffer_type
3285 checksum_buffer.info = info
3287 pac_buffers.append(checksum_buffer)
3288 pac.num_buffers += 1
3290 checksum_buffers[buffer_type] = checksum_buffer
3292 # Fill the relevant checksum buffers.
3293 for buffer_type, checksum_buffer in checksum_buffers.items():
3294 checksum_key = checksum_keys[buffer_type]
3295 ctype = checksum_key.ctype & ((1 << 32) - 1)
3297 if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3298 self.assertIsNotNone(enc_part)
3300 signature = checksum_key.make_checksum(
3301 KU_NON_KERB_CKSUM_SALT,
3302 enc_part)
3304 elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
3305 signature = Krb5EncryptionKey.make_zeroed_checksum(
3306 checksum_key)
3308 else:
3309 signature = checksum_key.make_zeroed_checksum()
3311 checksum_buffer.info.signature = signature
3312 checksum_buffer.info.type = ctype
3314 # Add the new checksum buffers to the PAC.
3315 pac.buffers = pac_buffers
3317 # Calculate the server and KDC checksums and insert them into the PAC.
3319 server_checksum_buffer = checksum_buffers.get(
3320 krb5pac.PAC_TYPE_SRV_CHECKSUM)
3321 if server_checksum_buffer is not None:
3322 server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
3324 pac_data = ndr_pack(pac)
3325 server_checksum = Krb5EncryptionKey.make_checksum(
3326 server_checksum_key,
3327 KU_NON_KERB_CKSUM_SALT,
3328 pac_data)
3330 server_checksum_buffer.info.signature = server_checksum
3332 kdc_checksum_buffer = checksum_buffers.get(
3333 krb5pac.PAC_TYPE_KDC_CHECKSUM)
3334 if kdc_checksum_buffer is not None:
3335 self.assertIsNotNone(server_checksum_buffer)
3337 kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
3339 kdc_checksum = kdc_checksum_key.make_checksum(
3340 KU_NON_KERB_CKSUM_SALT,
3341 server_checksum)
3343 kdc_checksum_buffer.info.signature = kdc_checksum
3345 def replace_pac(self, auth_data, new_pac, expect_pac=True):
3346 if new_pac is not None:
3347 self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
3348 self.assertElementPresent(new_pac, 'ad-data')
3350 new_auth_data = []
3352 ad_relevant = None
3353 old_pac = None
3355 for authdata_elem in auth_data:
3356 if authdata_elem['ad-type'] == AD_IF_RELEVANT:
3357 ad_relevant = self.der_decode(
3358 authdata_elem['ad-data'],
3359 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3361 relevant_elems = []
3362 for relevant_elem in ad_relevant:
3363 if relevant_elem['ad-type'] == AD_WIN2K_PAC:
3364 self.assertIsNone(old_pac, 'Multiple PACs detected')
3365 old_pac = relevant_elem['ad-data']
3367 if new_pac is not None:
3368 relevant_elems.append(new_pac)
3369 else:
3370 relevant_elems.append(relevant_elem)
3371 if expect_pac:
3372 self.assertIsNotNone(old_pac, 'Expected PAC')
3374 ad_relevant = self.der_encode(
3375 relevant_elems,
3376 asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3378 authdata_elem = self.AuthorizationData_create(AD_IF_RELEVANT,
3379 ad_relevant)
3381 new_auth_data.append(authdata_elem)
3383 if expect_pac:
3384 self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
3386 return new_auth_data, old_pac
3388 def get_krbtgt_checksum_key(self):
3389 krbtgt_creds = self.get_krbtgt_creds()
3390 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
3392 return {
3393 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
3396 def is_tgs(self, principal):
3397 name = principal['name-string'][0]
3398 return name in ('krbtgt', b'krbtgt')
3400 def get_empty_pac(self):
3401 return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1))
3403 def get_outer_pa_dict(self, kdc_exchange_dict):
3404 return self.get_pa_dict(kdc_exchange_dict['req_padata'])
3406 def get_fast_pa_dict(self, kdc_exchange_dict):
3407 req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
3409 if req_pa_dict:
3410 return req_pa_dict
3412 return self.get_outer_pa_dict(kdc_exchange_dict)
3414 def sent_fast(self, kdc_exchange_dict):
3415 outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
3417 return PADATA_FX_FAST in outer_pa_dict
3419 def sent_enc_challenge(self, kdc_exchange_dict):
3420 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3422 return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
3424 def get_sent_pac_options(self, kdc_exchange_dict):
3425 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3427 if PADATA_PAC_OPTIONS not in fast_pa_dict:
3428 return ''
3430 pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
3431 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3432 pac_options = pac_options['options']
3434 # Mask out unsupported bits.
3435 pac_options, remaining = pac_options[:4], pac_options[4:]
3436 pac_options += '0' * len(remaining)
3438 return pac_options
3440 def get_krbtgt_sname(self):
3441 krbtgt_creds = self.get_krbtgt_creds()
3442 krbtgt_username = krbtgt_creds.get_username()
3443 krbtgt_realm = krbtgt_creds.get_realm()
3444 krbtgt_sname = self.PrincipalName_create(
3445 name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
3447 return krbtgt_sname
3449 def _test_as_exchange(self,
3450 cname,
3451 realm,
3452 sname,
3453 till,
3454 client_as_etypes,
3455 expected_error_mode,
3456 expected_crealm,
3457 expected_cname,
3458 expected_srealm,
3459 expected_sname,
3460 expected_salt,
3461 etypes,
3462 padata,
3463 kdc_options,
3464 expected_flags=None,
3465 unexpected_flags=None,
3466 expected_supported_etypes=None,
3467 preauth_key=None,
3468 ticket_decryption_key=None,
3469 pac_request=None,
3470 pac_options=None,
3471 to_rodc=False):
3473 def _generate_padata_copy(_kdc_exchange_dict,
3474 _callback_dict,
3475 req_body):
3476 return padata, req_body
3478 if not expected_error_mode:
3479 check_error_fn = None
3480 check_rep_fn = self.generic_check_kdc_rep
3481 else:
3482 check_error_fn = self.generic_check_kdc_error
3483 check_rep_fn = None
3485 if padata is not None:
3486 generate_padata_fn = _generate_padata_copy
3487 else:
3488 generate_padata_fn = None
3490 kdc_exchange_dict = self.as_exchange_dict(
3491 expected_crealm=expected_crealm,
3492 expected_cname=expected_cname,
3493 expected_srealm=expected_srealm,
3494 expected_sname=expected_sname,
3495 expected_supported_etypes=expected_supported_etypes,
3496 ticket_decryption_key=ticket_decryption_key,
3497 generate_padata_fn=generate_padata_fn,
3498 check_error_fn=check_error_fn,
3499 check_rep_fn=check_rep_fn,
3500 check_kdc_private_fn=self.generic_check_kdc_private,
3501 expected_error_mode=expected_error_mode,
3502 client_as_etypes=client_as_etypes,
3503 expected_salt=expected_salt,
3504 expected_flags=expected_flags,
3505 unexpected_flags=unexpected_flags,
3506 preauth_key=preauth_key,
3507 kdc_options=str(kdc_options),
3508 pac_request=pac_request,
3509 pac_options=pac_options,
3510 to_rodc=to_rodc)
3512 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3513 cname=cname,
3514 realm=realm,
3515 sname=sname,
3516 till_time=till,
3517 etypes=etypes)
3519 return rep, kdc_exchange_dict