tests/krb5: Add tests for PK-INIT Freshness Extension (RFC 8070)
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blobb7d2d2a42e54db2e956e6d583849a3701f7a45c8
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
28 import math
30 from enum import Enum
31 from pprint import pprint
33 from cryptography import x509
34 from cryptography.hazmat.primitives import asymmetric, hashes
35 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
36 from cryptography.hazmat.backends import default_backend
38 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
39 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
40 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
41 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
43 from pyasn1.codec.ber.encoder import BitStringEncoder
44 import pyasn1.type.univ
46 from pyasn1.error import PyAsn1Error
48 from samba import unix2nttime
49 from samba.credentials import Credentials
50 from samba.dcerpc import claims, krb5pac, netlogon, samr, security
51 from samba.gensec import FEATURE_SEAL
52 from samba.ndr import ndr_pack, ndr_unpack
53 from samba.dcerpc.misc import (
54 SEC_CHAN_WKSTA,
55 SEC_CHAN_BDC,
58 import samba.tests
59 from samba.tests import TestCase
61 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
62 from samba.tests.krb5.rfc4120_constants import (
63 AD_IF_RELEVANT,
64 AD_WIN2K_PAC,
65 FX_FAST_ARMOR_AP_REQUEST,
66 KDC_ERR_CLIENT_REVOKED,
67 KDC_ERR_GENERIC,
68 KDC_ERR_POLICY,
69 KDC_ERR_PREAUTH_FAILED,
70 KDC_ERR_SKEW,
71 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
72 KERB_ERR_TYPE_EXTENDED,
73 KRB_AP_REP,
74 KRB_AP_REQ,
75 KRB_AS_REP,
76 KRB_AS_REQ,
77 KRB_ERROR,
78 KRB_PRIV,
79 KRB_TGS_REP,
80 KRB_TGS_REQ,
81 KU_AP_REQ_AUTH,
82 KU_AS_FRESHNESS,
83 KU_AS_REP_ENC_PART,
84 KU_AP_REQ_ENC_PART,
85 KU_AS_REQ,
86 KU_ENC_CHALLENGE_KDC,
87 KU_FAST_ENC,
88 KU_FAST_FINISHED,
89 KU_FAST_REP,
90 KU_FAST_REQ_CHKSUM,
91 KU_KRB_PRIV,
92 KU_NON_KERB_CKSUM_SALT,
93 KU_NON_KERB_SALT,
94 KU_PKINIT_AS_REQ,
95 KU_TGS_REP_ENC_PART_SESSION,
96 KU_TGS_REP_ENC_PART_SUB_KEY,
97 KU_TGS_REQ_AUTH,
98 KU_TGS_REQ_AUTH_CKSUM,
99 KU_TGS_REQ_AUTH_DAT_SESSION,
100 KU_TGS_REQ_AUTH_DAT_SUBKEY,
101 KU_TICKET,
102 NT_PRINCIPAL,
103 NT_SRV_INST,
104 NT_WELLKNOWN,
105 PADATA_AS_FRESHNESS,
106 PADATA_ENCRYPTED_CHALLENGE,
107 PADATA_ENC_TIMESTAMP,
108 PADATA_ETYPE_INFO,
109 PADATA_ETYPE_INFO2,
110 PADATA_FOR_USER,
111 PADATA_FX_COOKIE,
112 PADATA_FX_ERROR,
113 PADATA_FX_FAST,
114 PADATA_GSS,
115 PADATA_KDC_REQ,
116 PADATA_PAC_OPTIONS,
117 PADATA_PAC_REQUEST,
118 PADATA_PKINIT_KX,
119 PADATA_PK_AS_REP,
120 PADATA_PK_AS_REQ,
121 PADATA_PK_AS_REP_19,
122 PADATA_SUPPORTED_ETYPES,
123 PADATA_REQ_ENC_PA_REP
125 import samba.tests.krb5.kcrypto as kcrypto
def BitStringEncoder_encodeValue32(
        self, value, asn1Spec, encodeFun, **options):
    """Encode a bit string so that its payload is at least four octets long.

    BitStrings like KDCOptions or TicketFlags should be at least 32 bits
    wide on the wire, so short values are right-padded with zero octets.

    :param value: the bit string to encode; it is cloned through *asn1Spec*
        first when a spec is supplied.
    :param asn1Spec: optional schema object providing ``clone()``.
    :param encodeFun: unused here.
    :return: ``(substrate, False, True)`` where *substrate* is a zero octet
        followed by the (padded) bit string octets.  The two flags are
        presumably pyasn1's "is constructed" / "is octets" markers -- confirm
        against the pyasn1 encoder API.
    """
    if asn1Spec is not None:
        # TODO: try to avoid ASN.1 schema instantiation
        value = asn1Spec.clone(value)

    # Left-shift so the bit count becomes a whole number of octets.
    trailing_bits = len(value) % 8
    aligned = value << (8 - trailing_bits) if trailing_bits else value

    octets = aligned.asOctets()
    # We need at least 32 bits / 4 bytes of payload.
    missing = max(0, 4 - len(octets))
    return b'\x00' + octets + bytes(missing), False, True
# Replace the encodeValue method of the BitStringEncoder class imported from
# pyasn1.codec.ber.encoder with the 32-bit-minimum variant.  This is a
# class-level patch, so it applies to every user of that class in the process.
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
def BitString_NamedValues_prettyPrint(self, scope=0):
    """Render a bit string as its binary form plus one line per flag.

    The first 32 bit positions are considered.  A position is listed when it
    has an entry in ``self.prettyPrintNamedValues`` (shown with its current
    value) or when it is set but has no name (shown as ``unknown-bit-N``).

    :param scope: nesting depth; each level adds one space of indentation.
    :return: the formatted multi-line string.
    """
    width = 32
    rendered = "%s" % self.asBinary()

    # Expand the octets into individual bits, most significant bit first,
    # then zero-fill up to the full width.
    flags = [(octet >> shift) & 1
             for octet in self.asNumbers()
             for shift in range(7, -1, -1)]
    flags.extend([0] * (width - len(flags)))

    pad = " " * scope
    named = self.prettyPrintNamedValues
    entries = []
    for position in range(width):
        if position in named:
            label = named[position]
        elif flags[position]:
            label = "unknown-bit-%u" % position
        else:
            continue
        entries.append("%s:%u" % (label, flags[position]))

    # The opening ": (" is only emitted together with the first entry.
    if entries:
        rendered += ": (\n%s " % pad + (",\n%s " % pad).join(entries)
    rendered += "\n%s)" % pad
    return rendered
# Teach the bit-string ASN.1 types how to pretty-print themselves.  For each
# type: expose the matching *Values.namedValues table both as
# prettyPrintNamedValues (read by BitString_NamedValues_prettyPrint) and as
# namedValues, and install the pretty-printer as the type's prettyPrint method.
krb5_asn1.TicketFlags.prettyPrintNamedValues =\
    krb5_asn1.TicketFlagsValues.namedValues
krb5_asn1.TicketFlags.namedValues =\
    krb5_asn1.TicketFlagsValues.namedValues
krb5_asn1.TicketFlags.prettyPrint =\
    BitString_NamedValues_prettyPrint
krb5_asn1.KDCOptions.prettyPrintNamedValues =\
    krb5_asn1.KDCOptionsValues.namedValues
krb5_asn1.KDCOptions.namedValues =\
    krb5_asn1.KDCOptionsValues.namedValues
krb5_asn1.KDCOptions.prettyPrint =\
    BitString_NamedValues_prettyPrint
krb5_asn1.APOptions.prettyPrintNamedValues =\
    krb5_asn1.APOptionsValues.namedValues
krb5_asn1.APOptions.namedValues =\
    krb5_asn1.APOptionsValues.namedValues
krb5_asn1.APOptions.prettyPrint =\
    BitString_NamedValues_prettyPrint
krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
    krb5_asn1.PACOptionFlagsValues.namedValues
krb5_asn1.PACOptionFlags.namedValues =\
    krb5_asn1.PACOptionFlagsValues.namedValues
krb5_asn1.PACOptionFlags.prettyPrint =\
    BitString_NamedValues_prettyPrint
def Integer_NamedValues_prettyPrint(self, scope=0):
    """Render an integer as ``"<dec> (0x<hex>) <name>"``.

    The name is looked up in ``self.prettyPrintNamedValues``; values without
    an entry are labelled ``<__unknown__>``.

    :param scope: accepted for interface compatibility; not used.
    :return: the formatted string.
    """
    number = int(self)
    table = self.prettyPrintNamedValues
    label = table[number] if number in table else "<__unknown__>"
    return "%d (0x%x) %s" % (number, number, label)
# Teach the integer-valued ASN.1 types how to pretty-print themselves.  For
# each type: expose the matching *Values.namedValues table as
# prettyPrintNamedValues (read by Integer_NamedValues_prettyPrint) and install
# the pretty-printer as the type's prettyPrint method.
krb5_asn1.NameType.prettyPrintNamedValues =\
    krb5_asn1.NameTypeValues.namedValues
krb5_asn1.NameType.prettyPrint =\
    Integer_NamedValues_prettyPrint
krb5_asn1.AuthDataType.prettyPrintNamedValues =\
    krb5_asn1.AuthDataTypeValues.namedValues
krb5_asn1.AuthDataType.prettyPrint =\
    Integer_NamedValues_prettyPrint
krb5_asn1.PADataType.prettyPrintNamedValues =\
    krb5_asn1.PADataTypeValues.namedValues
krb5_asn1.PADataType.prettyPrint =\
    Integer_NamedValues_prettyPrint
krb5_asn1.EncryptionType.prettyPrintNamedValues =\
    krb5_asn1.EncryptionTypeValues.namedValues
krb5_asn1.EncryptionType.prettyPrint =\
    Integer_NamedValues_prettyPrint
krb5_asn1.ChecksumType.prettyPrintNamedValues =\
    krb5_asn1.ChecksumTypeValues.namedValues
krb5_asn1.ChecksumType.prettyPrint =\
    Integer_NamedValues_prettyPrint
krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
    krb5_asn1.KerbErrorDataTypeValues.namedValues
krb5_asn1.KerbErrorDataType.prettyPrint =\
    Integer_NamedValues_prettyPrint
class Krb5EncryptionKey:
    """A kcrypto key bundled with its enctype, checksum type and kvno."""

    def __init__(self, key, kvno):
        """Wrap *key* and derive its default checksum type.

        :param key: a key object exposing ``enctype`` (and ``contents``,
            which export_obj() reads).
        :param kvno: key version number; may be None when unknown.
        :raises KeyError: if the key's enctype is not AES256, AES128 or RC4.
        """
        EncTypeChecksum = {
            kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
            kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
            kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
        }
        self.key = key
        self.etype = key.enctype
        self.ctype = EncTypeChecksum[self.etype]
        self.kvno = kvno

    def __str__(self):
        # kvno is formatted with %s rather than %d because it is allowed to
        # be None; for integers the output is identical.
        return "etype=%d ctype=%d kvno=%s key=%s" % (
            self.etype, self.ctype, self.kvno, self.key)

    def encrypt(self, usage, plaintext):
        """Return *plaintext* encrypted with this key for key usage *usage*."""
        ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
        return ciphertext

    def decrypt(self, usage, ciphertext):
        """Return *ciphertext* decrypted with this key for key usage *usage*."""
        plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
        return plaintext

    def make_zeroed_checksum(self, ctype=None):
        """Return an all-zero checksum of the length used by *ctype*.

        *ctype* defaults to this key's own checksum type.
        """
        if ctype is None:
            ctype = self.ctype

        checksum_len = kcrypto.checksum_len(ctype)
        return bytes(checksum_len)

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the checksum of *plaintext* under this key.

        *ctype* defaults to this key's own checksum type.
        """
        if ctype is None:
            ctype = self.ctype
        cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
        return cksum

    def verify_checksum(self, usage, plaintext, ctype, cksum):
        """Verify *cksum* over *plaintext*.

        :raises AssertionError: if *ctype* differs from this key's checksum
            type.  Any error from kcrypto.verify_checksum() propagates.
        """
        if self.ctype != ctype:
            raise AssertionError(f'key checksum type ({self.ctype}) != '
                                 f'checksum type ({ctype})')

        kcrypto.verify_checksum(ctype,
                                self.key,
                                usage,
                                plaintext,
                                cksum)

    def export_obj(self):
        """Return the key as a dict with 'keytype' and 'keyvalue' entries."""
        EncryptionKey_obj = {
            'keytype': self.etype,
            'keyvalue': self.key.contents,
        }
        return EncryptionKey_obj
class RodcPacEncryptionKey(Krb5EncryptionKey):
    """Encryption key that appends a two-byte RODC identifier to checksums."""

    def __init__(self, key, kvno, rodc_id=None):
        """Wrap *key*; derive the RODC id from *kvno* when not given.

        When *rodc_id* is None the upper 16 bits of the kvno are used, with
        zero (or an unknown kvno) meaning "no RODC id".  The id is stored in
        ``self.rodc_id`` as two little-endian bytes, or ``b''`` when absent.
        """
        super().__init__(key, kvno)

        if rodc_id is None and self.kvno is not None:
            rodc_id = ((self.kvno >> 16) & ((1 << 16) - 1)) or None

        self.rodc_id = (b'' if rodc_id is None
                        else rodc_id.to_bytes(2, byteorder='little'))

    def make_rodc_zeroed_checksum(self, ctype=None):
        """Return a zeroed checksum extended by zeroes for the RODC id."""
        zeroed = super().make_zeroed_checksum(ctype)
        return zeroed + bytes(len(self.rodc_id))

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the checksum of *plaintext* followed by the RODC id."""
        return super().make_checksum(usage, plaintext, ctype) + self.rodc_id

    def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
        """Check the trailing RODC id (if we have one), then the checksum.

        :raises AssertionError: if the trailing two bytes of *cksum* do not
            match this key's RODC id.
        """
        expected_id = self.rodc_id
        if expected_id:
            # Split the two-byte identifier off the end of the checksum.
            cksum, found_id = cksum[:-2], cksum[-2:]
            if found_id != expected_id:
                raise AssertionError(f'{expected_id.hex()} != '
                                     f'{found_id.hex()}')

        super().verify_checksum(usage, plaintext, ctype, cksum)
class ZeroedChecksumKey(RodcPacEncryptionKey):
    """Key whose checksum makers return zeroed checksums, ignoring the data."""

    def make_checksum(self, usage, plaintext, ctype=None):
        # usage and plaintext are deliberately ignored.
        zeroed = self.make_zeroed_checksum(ctype)
        return zeroed

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        # usage and plaintext are deliberately ignored.
        zeroed = self.make_rodc_zeroed_checksum(ctype)
        return zeroed
class WrongLengthChecksumKey(RodcPacEncryptionKey):
    """Key whose checksums are forced to a caller-chosen length."""

    def __init__(self, key, kvno, length):
        """Wrap *key*; every checksum produced will be *length* bytes long."""
        super().__init__(key, kvno)

        self._length = length

    @classmethod
    def _adjust_to_length(cls, checksum, length):
        """Zero-pad or truncate *checksum* so it is *length* bytes long."""
        shortfall = length - len(checksum)
        if shortfall > 0:
            return checksum + bytes(shortfall)
        if shortfall < 0:
            return checksum[:length]
        return checksum

    def make_zeroed_checksum(self, ctype=None):
        """Return ``self._length`` zero bytes; *ctype* is ignored."""
        return bytes(self._length)

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the real checksum, padded or cut to the forced length."""
        real = super().make_checksum(usage, plaintext, ctype)
        return self._adjust_to_length(real, self._length)

    def make_rodc_zeroed_checksum(self, ctype=None):
        """Return ``self._length`` zero bytes; *ctype* is ignored."""
        return bytes(self._length)

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the real RODC checksum, padded or cut to the forced length."""
        real = super().make_rodc_checksum(usage, plaintext, ctype)
        return self._adjust_to_length(real, self._length)
class KerberosCredentials(Credentials):
    """Credentials extended with the Kerberos state the tests need.

    On top of the base class this tracks supported encryption types for the
    AS, TGS and AP exchanges, a key version number, explicitly supplied
    ("forced") keys and salt, some directory attributes of the account, and a
    lazily generated RSA private key.
    """

    # Flag bits that may accompany enctype bits but do not themselves map to
    # an encryption type; bits_to_etypes() ignores them.
    non_etype_bits = (
        security.KERB_ENCTYPE_FAST_SUPPORTED) | (
        security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED) | (
        security.KERB_ENCTYPE_CLAIMS_SUPPORTED) | (
        security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED) | (
        security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK)

    def __init__(self):
        super().__init__()
        all_enc_types = 0
        all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
        all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
        all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96

        # By default RC4, AES128 and AES256 are supported for every exchange.
        self.as_supported_enctypes = all_enc_types
        self.tgs_supported_enctypes = all_enc_types
        self.ap_supported_enctypes = all_enc_types

        self.kvno = None
        # Maps an integer enctype to a RodcPacEncryptionKey.
        self.forced_keys = {}

        self.forced_salt = None

        self.dn = None
        self.upn = None
        self.spn = None
        self.sid = None
        self.account_type = None

        # Generated on first use by get_private_key().
        self._private_key = None

    def set_as_supported_enctypes(self, value):
        """Set the AS enctype bitmask; *value* is converted with int()."""
        self.as_supported_enctypes = int(value)

    def set_tgs_supported_enctypes(self, value):
        """Set the TGS enctype bitmask; *value* is converted with int()."""
        self.tgs_supported_enctypes = int(value)

    def set_ap_supported_enctypes(self, value):
        """Set the AP enctype bitmask; *value* is converted with int()."""
        self.ap_supported_enctypes = int(value)

    # Maps kcrypto enctypes to their bitmask bit.  The order is the order in
    # which bits_to_etypes() reports the enctypes.
    etype_map = collections.OrderedDict([
        (kcrypto.Enctype.AES256,
         security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
        (kcrypto.Enctype.AES128,
         security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
        (kcrypto.Enctype.RC4,
         security.KERB_ENCTYPE_RC4_HMAC_MD5),
        (kcrypto.Enctype.DES_MD5,
         security.KERB_ENCTYPE_DES_CBC_MD5),
        (kcrypto.Enctype.DES_CRC,
         security.KERB_ENCTYPE_DES_CBC_CRC)
    ])

    @classmethod
    def etypes_to_bits(cls, etypes):
        """Return the bitmask for an iterable of enctypes.

        :raises KeyError: for an enctype missing from etype_map.
        :raises ValueError: if an enctype appears more than once.
        """
        bits = 0
        for etype in etypes:
            bit = cls.etype_map[etype]
            if bits & bit:
                raise ValueError(f'Got duplicate etype: {etype}')
            bits |= bit

        return bits

    @classmethod
    def bits_to_etypes(cls, bits):
        """Return the tuple of enctypes named by *bits*, in etype_map order.

        Bits listed in non_etype_bits are ignored.

        :raises ValueError: if any other bit is left over.
        """
        etypes = ()
        for etype, bit in cls.etype_map.items():
            if bit & bits:
                bits &= ~bit
                etypes += (etype,)

        bits &= ~cls.non_etype_bits
        if bits != 0:
            raise ValueError(f'Unsupported etype bits: {bits}')

        return etypes

    def get_as_krb5_etypes(self):
        """Return the AS enctypes as a tuple of kcrypto enctypes."""
        return self.bits_to_etypes(self.as_supported_enctypes)

    def get_tgs_krb5_etypes(self):
        """Return the TGS enctypes as a tuple of kcrypto enctypes."""
        return self.bits_to_etypes(self.tgs_supported_enctypes)

    def get_ap_krb5_etypes(self):
        """Return the AP enctypes as a tuple of kcrypto enctypes."""
        return self.bits_to_etypes(self.ap_supported_enctypes)

    def set_kvno(self, kvno):
        """Store *kvno*, interpreting bit 31 as a sign bit."""
        # Sign-extend from 32 bits.
        if kvno & 1 << 31:
            kvno |= -1 << 31
        self.kvno = kvno

    def get_kvno(self):
        return self.kvno

    def set_forced_key(self, etype, hexkey):
        """Register the hex-encoded key *hexkey* for enctype *etype*.

        The key is stored with the kvno known at the time of the call.
        """
        etype = int(etype)
        contents = binascii.a2b_hex(hexkey)
        key = kcrypto.Key(etype, contents)
        self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)

        # Also set the NT hash of computer accounts for which we don't know
        # the password.
        if etype == kcrypto.Enctype.RC4 and self.get_password() is None:
            nt_hash = samr.Password()
            nt_hash.hash = list(contents)

            self.set_nt_hash(nt_hash)

    def get_forced_key(self, etype):
        """Return the forced key for *etype*, or None if there is none."""
        etype = int(etype)
        return self.forced_keys.get(etype)

    def set_forced_salt(self, salt):
        self.forced_salt = bytes(salt)

    def get_forced_salt(self):
        return self.forced_salt

    def get_salt(self):
        """Return the salt for this account as UTF-8 bytes.

        A forced salt wins.  Otherwise, for accounts whose secure channel
        type is SEC_CHAN_WKSTA or SEC_CHAN_BDC the salt is
        ``<REALM>host<name>.<realm>``, where <name> is the lower-cased
        username without a trailing '$'.  For all other accounts it is the
        upper-cased realm followed by the local part of the UPN with '/'
        removed, or by the username when no UPN is set.
        """
        if self.forced_salt is not None:
            return self.forced_salt

        upn = self.get_upn()
        if upn is not None:
            salt_name = upn.rsplit('@', 1)[0].replace('/', '')
        else:
            salt_name = self.get_username()

        secure_schannel_type = self.get_secure_channel_type()
        if secure_schannel_type in [SEC_CHAN_WKSTA, SEC_CHAN_BDC]:
            salt_name = self.get_username().lower()
            # endswith() rather than indexing, so that an empty username
            # does not raise IndexError.
            if salt_name.endswith('$'):
                salt_name = salt_name[:-1]
            salt_string = '%shost%s.%s' % (
                self.get_realm().upper(),
                salt_name,
                self.get_realm().lower())
        else:
            salt_string = self.get_realm().upper() + salt_name

        return salt_string.encode('utf-8')

    def set_dn(self, dn):
        self.dn = dn

    def get_dn(self):
        return self.dn

    def set_spn(self, spn):
        self.spn = spn

    def get_spn(self):
        return self.spn

    def set_upn(self, upn):
        self.upn = upn

    def get_upn(self):
        return self.upn

    def set_sid(self, sid):
        self.sid = sid

    def get_sid(self):
        return self.sid

    def set_type(self, account_type):
        self.account_type = account_type

    def get_type(self):
        return self.account_type

    def update_password(self, password):
        """Set a new password and bump the kvno by one.

        :raises TypeError: if no kvno has been set yet (it is still None).
        """
        self.set_password(password)
        self.set_kvno(self.get_kvno() + 1)

    def get_private_key(self):
        """Return this account's RSA private key, generating it on first use."""
        if self._private_key is None:
            # Generate a new keypair.
            self._private_key = asymmetric.rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
                backend=default_backend()
            )

        return self._private_key

    def get_public_key(self):
        """Return the public half of get_private_key()."""
        return self.get_private_key().public_key()
class KerberosTicketCreds:
    """A ticket together with its session key and related bookkeeping."""

    def __init__(self, ticket, session_key,
                 crealm=None, cname=None,
                 srealm=None, sname=None,
                 decryption_key=None,
                 ticket_private=None,
                 encpart_private=None):
        """Store every argument unchanged as an attribute of the same name."""
        self.ticket, self.session_key = ticket, session_key
        # Client and server identities.
        self.crealm, self.cname = crealm, cname
        self.srealm, self.sname = srealm, sname
        self.decryption_key = decryption_key
        self.ticket_private, self.encpart_private = (
            ticket_private, encpart_private)

    def set_sname(self, sname):
        """Update the server name both inside the ticket and on this object."""
        self.ticket['sname'] = self.sname = sname
class PkInit(Enum):
    # Each member's value is a fresh object(), so members carry no payload
    # and are only meaningful by identity.
    NOT_USED = object()
    PUBLIC_KEY = object()
    DIFFIE_HELLMAN = object()
class RawKerberosTest(TestCase):
    """A raw Kerberos Test case."""

    # Members carry no payload; they are only meaningful by identity.
    class KpasswdMode(Enum):
        SET = object()
        CHANGE = object()

    # The location of a SID within the PAC
    class SidType(Enum):
        BASE_SID = object()  # in info3.base.groups
        EXTRA_SID = object()  # in info3.sids
        RESOURCE_SID = object()  # in resource_groups
        PRIMARY_GID = object()  # the (sole) primary group

        # Make repr() match str().
        def __repr__(self):
            return self.__str__()

    # The PAC buffer types that hold checksums.
    pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
                          krb5pac.PAC_TYPE_KDC_CHECKSUM,
                          krb5pac.PAC_TYPE_TICKET_CHECKSUM,
                          krb5pac.PAC_TYPE_FULL_CHECKSUM}

    # Enctypes combined by setup_etype_test_permutations(); "name" becomes
    # part of each permutation's name.
    # NOTE(review): the labels for AES256 and AES128 look swapped (AES256 is
    # named "aes128" and vice versa).  The names feed into generated
    # permutation names, so confirm nothing depends on them before changing.
    etypes_to_test = (
        {"value": -1111, "name": "dummy", },
        {"value": kcrypto.Enctype.AES256, "name": "aes128", },
        {"value": kcrypto.Enctype.AES128, "name": "aes256", },
        {"value": kcrypto.Enctype.RC4, "name": "rc4", },
    )

    # Unique sentinel object.
    expect_padata_outer = object()

    # Set to True once setup_etype_test_permutations() has run.
    setup_etype_test_permutations_done = False
@classmethod
def setup_etype_test_permutations(cls):
    """Build, once per class, every ordered selection of etypes_to_test.

    For each length from one up to the number of entries, each permutation
    becomes a dict whose "name" is the entry names joined by '_' and whose
    "etypes" is the tuple of entry values.  The list is stored in
    ``cls.etype_test_permutations``; later calls return immediately.
    """
    if cls.setup_etype_test_permutations_done:
        return

    candidates = cls.etypes_to_test
    positions = range(len(candidates))

    results = []
    for length in range(1, len(candidates) + 1):
        for combo in itertools.permutations(positions, length):
            chosen = [candidates[i] for i in combo]
            results.append({
                "name": "_".join(entry["name"] for entry in chosen),
                "etypes": tuple(entry["value"] for entry in chosen),
            })

    cls.etype_test_permutations = results
    cls.setup_etype_test_permutations_done = True
@classmethod
def etype_test_permutation_name_idx(cls):
    """Return ``(name, index)`` for every etype permutation, in order.

    The permutations are generated first if that has not happened yet.
    """
    cls.setup_etype_test_permutations()
    return [(entry['name'], position)
            for position, entry in enumerate(cls.etype_test_permutations)]
def etype_test_permutation_by_idx(self, idx):
    """Return the ``(name, etypes)`` pair of the permutation at *idx*."""
    entry = self.etype_test_permutations[idx]
    return entry['name'], entry['etypes']
@classmethod
def setUpClass(cls):
    """Read the class-wide test configuration from the environment.

    SERVER and DC_SERVER are required.  Every other variable is optional:
    the boolean switches fall back to the default listed below and are
    parsed with ``bool(int(...))``, while DEFAULT_ETYPES becomes an int or
    None.  An empty credentials cache (``creds_dict``) is also created.
    """
    super().setUpClass()

    env = samba.tests.env_get_var_value

    def env_flag(varname, default):
        # Optional boolean switch, given as an integer string.
        raw = env(varname, allow_missing=True)
        if raw is None:
            raw = default
        return bool(int(raw))

    cls.host = env('SERVER')
    cls.dc_host = env('DC_SERVER')

    # A dictionary containing credentials that have already been
    # obtained.
    cls.creds_dict = {}

    # (attribute, environment variable, default when unset)
    for attr, varname, default in (
            ('kdc_fast_support', 'FAST_SUPPORT', '0'),
            ('kdc_claims_support', 'CLAIMS_SUPPORT', '0'),
            ('kdc_compound_id_support', 'COMPOUND_ID_SUPPORT', '0'),
            ('tkt_sig_support', 'TKT_SIG_SUPPORT', '0'),
            ('full_sig_support', 'FULL_SIG_SUPPORT', '0'),
            ('expect_pac', 'EXPECT_PAC', '1'),
            ('expect_extra_pac_buffers', 'EXPECT_EXTRA_PAC_BUFFERS', '1'),
            ('cname_checking', 'CHECK_CNAME', '1'),
            ('padata_checking', 'CHECK_PADATA', '1'),
            ('kadmin_is_tgs', 'KADMIN_IS_TGS', '0')):
        setattr(cls, attr, env_flag(varname, default))

    # Unlike the switches above this stays None when the variable is unset.
    raw_etypes = env('DEFAULT_ETYPES', allow_missing=True)
    cls.default_etypes = None if raw_etypes is None else int(raw_etypes)

    for attr, varname, default in (
            ('forced_rc4', 'FORCED_RC4', '0'),
            ('expect_nt_hash', 'EXPECT_NT_HASH', '1'),
            ('expect_nt_status', 'EXPECT_NT_STATUS', '1')):
        setattr(cls, attr, env_flag(varname, default))
def setUp(self):
    """Reset per-test state: debug switches, strict checking, socket."""
    super().setUp()

    # Both debug dumps are off unless a test turns them on.
    self.do_asn1_print = self.do_hexdump = False

    raw_strict = samba.tests.env_get_var_value('STRICT_CHECKING',
                                               allow_missing=True)
    # Strict checking is enabled unless the environment says otherwise.
    self.strict_checking = bool(int('1' if raw_strict is None
                                    else raw_strict))

    # No connection yet.
    self.s = None

    # Unique per-test sentinel object.
    self.unspecified_kvno = object()
def tearDown(self):
    """Close any open connection, then run the base class tearDown."""
    # _disconnect() does nothing when no socket is open.
    self._disconnect("tearDown")
    super().tearDown()
def _disconnect(self, reason):
    """Close the current socket, if any, and forget it.

    :param reason: text included in the debug line written to stderr when
        hexdumping is enabled.
    """
    sock = self.s
    if sock is not None:
        sock.close()
        self.s = None
        if self.do_hexdump:
            sys.stderr.write("disconnect[%s]\n" % reason)
def _connect_tcp(self, host, port=None):
    """Open a TCP connection to *host* and store the socket in ``self.s``.

    :param host: host name or address to resolve and connect to.
    :param port: TCP port; defaults to 88.
    :raises OSError: if resolution, socket creation or connecting fails.
        Any socket created so far is closed and ``self.s`` is reset to None
        before the error propagates.

    The address list returned by getaddrinfo() is kept in ``self.a``; only
    its first entry is tried.
    """
    if port is None:
        port = 88
    try:
        self.a = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                    socket.SOCK_STREAM, socket.SOL_TCP,
                                    0)
        self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
        self.s.settimeout(10)
        self.s.connect(self.a[0][4])
    except OSError:
        # socket.error and IOError are both aliases of OSError, so a single
        # handler covers them.  getaddrinfo() or socket() may have failed
        # before any socket existed, in which case there is nothing to
        # close and the original error must not be masked.
        if self.s is not None:
            self.s.close()
            self.s = None
        raise
def connect(self, host, port=None):
    """Connect to *host* over TCP; asserts that no connection is open yet.

    A ``connected[<host>]`` line is written to stderr when hexdumping is
    enabled.
    """
    self.assertNotConnected()
    self._connect_tcp(host, port)
    if not self.do_hexdump:
        return
    sys.stderr.write("connected[%s]\n" % host)
def env_get_var(self, varname, prefix,
                fallback_default=True,
                allow_missing=False):
    """Look up ``<prefix>_<varname>`` in the environment, or plain *varname*.

    :param varname: base variable name.
    :param prefix: if None, only the unprefixed variable is consulted.
    :param fallback_default: when a prefix is given and the prefixed
        variable is unset, fall back to the unprefixed variable.
    :param allow_missing: passed to the final lookup.  The prefixed lookup
        is additionally allowed to be missing whenever a fallback will be
        tried.
    :return: whatever samba.tests.env_get_var_value() returns.
    """
    lookup = samba.tests.env_get_var_value

    if prefix is None:
        # Without a prefix the plain variable is the only candidate.
        return lookup(varname, allow_missing=allow_missing)

    val = lookup('%s_%s' % (prefix, varname),
                 allow_missing=allow_missing or fallback_default)
    if val is None and fallback_default:
        val = lookup(varname, allow_missing=allow_missing)
    return val
def _get_krb5_creds_from_env(self, prefix,
                             default_username=None,
                             allow_missing_password=False,
                             allow_missing_keys=True,
                             require_strongest_key=False):
    """Build a KerberosCredentials object from environment variables.

    DOMAIN and REALM may fall back to their unprefixed variables; USERNAME,
    PASSWORD, KVNO and the *_KEY_HEX variables are only read with *prefix*
    (see env_get_var() for how a None prefix is treated).

    :param prefix: environment variable prefix, or None.
    :param default_username: used when USERNAME is unset; supplying it also
        makes USERNAME optional.
    :param allow_missing_password: whether PASSWORD may be unset.
    :param allow_missing_keys: whether KVNO and AES256_KEY_HEX may be unset.
        When False, at least one forced key must end up configured.
    :param require_strongest_key: makes KVNO mandatory, and AES256_KEY_HEX
        mandatory too when no password is available; overrides
        *allow_missing_keys* for those two lookups.
    :return: the populated credentials.
    """
    creds = KerberosCredentials()
    creds.guess()

    domain = self.env_get_var('DOMAIN', prefix)
    realm = self.env_get_var('REALM', prefix)
    username = self.env_get_var('USERNAME', prefix,
                                fallback_default=False,
                                allow_missing=default_username is not None)
    if username is None:
        username = default_username
    password = self.env_get_var('PASSWORD', prefix,
                                fallback_default=False,
                                allow_missing=allow_missing_password)

    creds.set_domain(domain)
    creds.set_realm(realm)
    creds.set_username(username)
    if password is not None:
        creds.set_password(password)

    # Optional per-exchange overrides of the supported enctypes.
    for varname, apply_enctypes in (
            ('AS_SUPPORTED_ENCTYPES', creds.set_as_supported_enctypes),
            ('TGS_SUPPORTED_ENCTYPES', creds.set_tgs_supported_enctypes),
            ('AP_SUPPORTED_ENCTYPES', creds.set_ap_supported_enctypes)):
        enctypes = self.env_get_var(varname, prefix, allow_missing=True)
        if enctypes is not None:
            apply_enctypes(enctypes)

    if require_strongest_key:
        kvno_optional = False
        # With a password the AES256 key need not be supplied explicitly.
        aes256_optional = password is not None
    else:
        kvno_optional = aes256_optional = allow_missing_keys

    kvno = self.env_get_var('KVNO', prefix,
                            fallback_default=False,
                            allow_missing=kvno_optional)
    if kvno is not None:
        creds.set_kvno(int(kvno))

    # The kvno is set first because each forced key records it.
    for varname, etype, optional in (
            ('AES256_KEY_HEX', kcrypto.Enctype.AES256, aes256_optional),
            ('AES128_KEY_HEX', kcrypto.Enctype.AES128, True),
            ('RC4_KEY_HEX', kcrypto.Enctype.RC4, True)):
        hexkey = self.env_get_var(varname, prefix,
                                  fallback_default=False,
                                  allow_missing=optional)
        if hexkey is not None:
            creds.set_forced_key(etype, hexkey)

    if not allow_missing_keys:
        self.assertTrue(creds.forced_keys,
                        'Please supply %s encryption keys '
                        'in environment' % prefix)

    return creds
def _get_krb5_creds(self,
                    prefix,
                    default_username=None,
                    allow_missing_password=False,
                    allow_missing_keys=True,
                    require_strongest_key=False,
                    fallback_creds_fn=None):
    """Return the credentials for *prefix*, obtaining them on first use.

    Credentials are cached in ``self.creds_dict``.  On a cache miss they are
    read from the environment; if that raises and *fallback_creds_fn* is
    given, it is called with no arguments instead.

    :raises Exception: the error from the environment lookup, when neither
        source produced credentials.  An error from the fallback is only
        printed.
    """
    try:
        return self.creds_dict[prefix]
    except KeyError:
        # We don't have the credentials already
        pass

    env_failure = None
    try:
        # Try to obtain them from the environment
        found = self._get_krb5_creds_from_env(
            prefix,
            default_username=default_username,
            allow_missing_password=allow_missing_password,
            allow_missing_keys=allow_missing_keys,
            require_strongest_key=require_strongest_key)
    except Exception as err:
        # An error occurred, so save it for later
        env_failure = err
    else:
        self.assertIsNotNone(found)
        # Save the obtained credentials
        self.creds_dict[prefix] = found
        return found

    if fallback_creds_fn is not None:
        try:
            # Try to use the fallback method
            found = fallback_creds_fn()
        except Exception as fallback_failure:
            print("ERROR FROM ENV: %r" % (env_failure))
            print("FALLBACK-FN: %s" % (fallback_creds_fn))
            print("FALLBACK-ERROR: %r" % (fallback_failure))
        else:
            self.assertIsNotNone(found)
            # Save the obtained credentials
            self.creds_dict[prefix] = found
            return found

    # Both methods failed, so raise the exception from the
    # environment method
    raise env_failure
def get_user_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the credentials configured without an environment prefix."""
    return self._get_krb5_creds(
        prefix=None,
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_service_creds(self,
                      allow_missing_password=False,
                      allow_missing_keys=True):
    """Return the credentials configured under the SERVICE prefix."""
    return self._get_krb5_creds(
        prefix='SERVICE',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials configured under the CLIENT prefix."""
    return self._get_krb5_creds(
        prefix='CLIENT',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_server_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials configured under the SERVER prefix."""
    return self._get_krb5_creds(
        prefix='SERVER',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_admin_creds(self,
                    allow_missing_password=False,
                    allow_missing_keys=True):
    """Return the ADMIN-prefixed credentials, prepared for sealed use.

    FEATURE_SEAL is added to the gensec features and the workstation name
    is cleared on the returned (cached) credentials object.
    """
    admin = self._get_krb5_creds(
        prefix='ADMIN',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
    features = admin.get_gensec_features() | FEATURE_SEAL
    admin.set_gensec_features(features)
    admin.set_workstation('')
    return admin
def get_rodc_krbtgt_creds(self,
                          require_keys=True,
                          require_strongest_key=False):
    """Return the credentials configured under the RODC_KRBTGT prefix.

    The password is always optional.  Requiring the strongest key only
    makes sense when keys are required at all, which is asserted.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)
    return self._get_krb5_creds(
        prefix='RODC_KRBTGT',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key)
def get_krbtgt_creds(self,
                     require_keys=True,
                     require_strongest_key=False):
    """Return the credentials configured under the KRBTGT prefix.

    The username defaults to 'krbtgt' and the password is always optional.
    Requiring the strongest key only makes sense when keys are required at
    all, which is asserted.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)
    return self._get_krb5_creds(
        prefix='KRBTGT',
        default_username='krbtgt',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key)
def get_anon_creds(self):
    """Return a fresh (uncached) anonymous Credentials object."""
    anon = Credentials()
    anon.set_anonymous()
    return anon
# Overridden by KDCBaseTest. At this level we don't know which enctypes are
# actually supported, so the best we can do is decide by whether NT hashes
# are expected and whether the account has a workstation set. This matches
# the behaviour that tests expect by default.
def get_default_enctypes(self, creds):
    """Return the default enctype list for *creds*.

    AES256 and AES128 are always included; RC4 is appended when NT hashes
    are expected or when ``creds.get_workstation()`` is truthy.
    """
    self.assertIsNotNone(creds)

    enctypes = [kcrypto.Enctype.AES256, kcrypto.Enctype.AES128]
    wants_rc4 = self.expect_nt_hash or creds.get_workstation()
    if wants_rc4:
        enctypes.append(kcrypto.Enctype.RC4)

    return enctypes
def asn1_dump(self, name, obj, asn1_print=None):
    """Write *obj* to stderr when ASN.1 printing is enabled.

    :param name: optional heading printed on its own line before *obj*.
    :param asn1_print: overrides ``self.do_asn1_print`` when not None.
    """
    enabled = self.do_asn1_print if asn1_print is None else asn1_print
    if not enabled:
        return
    if name is None:
        sys.stderr.write("%s" % (obj))
    else:
        sys.stderr.write("%s:\n%s" % (name, obj))
def hex_dump(self, name, blob, hexdump=None):
    """Write a labelled hex dump of *blob* to stderr when enabled.

    The output is ``<name>: <length>`` followed by ``self.hexdump(blob)``.

    :param hexdump: overrides ``self.do_hexdump`` when not None.
    """
    enabled = self.do_hexdump if hexdump is None else hexdump
    if not enabled:
        return
    sys.stderr.write(
        "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
def der_decode(
        self,
        blob,
        asn1Spec=None,
        native_encode=True,
        asn1_print=None,
        hexdump=None):
    """Decode the DER-encoded *blob*, optionally dumping it for debugging.

    :param asn1Spec: schema passed to the pyasn1 DER decoder; its type name
        also labels the hex dump.
    :param native_encode: when true, convert the decoded object with
        pyasn1's native encoder before returning it.
    :param asn1_print: forwarded to asn1_dump().
    :param hexdump: forwarded to hex_dump().
    :return: the decoded object.
    """
    label = ("<None-asn1Spec>" if asn1Spec is None
             else type(asn1Spec).__name__.split(':')[0])
    self.hex_dump(label, blob, hexdump=hexdump)

    # The second element returned by the decoder is discarded.
    decoded, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
    self.asn1_dump(None, decoded, asn1_print=asn1_print)

    if not native_encode:
        return decoded
    return pyasn1_native_encode(decoded)
def der_encode(
        self,
        obj,
        asn1Spec=None,
        native_decode=True,
        asn1_print=None,
        hexdump=None):
    """DER-encode *obj*, optionally dumping it for debugging.

    :param asn1Spec: schema used to build the ASN.1 object when
        *native_decode* is true.
    :param native_decode: when true, *obj* is first converted with pyasn1's
        native decoder; otherwise it is encoded as given.
    :param asn1_print: forwarded to asn1_dump().
    :param hexdump: forwarded to hex_dump().
    :return: the DER encoding.
    """
    if native_decode:
        obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)

    # The type name labels the hex dump; it is always a string.
    label = type(obj).__name__.split(':')[0]
    self.asn1_dump(None, obj, asn1_print=asn1_print)

    encoded = pyasn1_der_encode(obj)
    self.hex_dump(label, encoded, hexdump=hexdump)
    return encoded
def send_pdu(self, req, asn1_print=None, hexdump=None):
    """DER-encode the pyasn1 object *req* and send it with send_msg().

    The encoder's own hex dump is disabled; *hexdump* only controls the
    dump made by send_msg().
    """
    encoded = self.der_encode(req,
                              native_decode=False,
                              asn1_print=asn1_print,
                              hexdump=False)
    self.send_msg(encoded, hexdump=hexdump)
def send_msg(self, msg, hexdump=None):
    """Send *msg* on self.s, prefixed with its 4-byte big-endian length.

    send() is repeated until the whole frame has been written. On a
    socket/IO error the connection is dropped via _disconnect() and the
    exception is re-raised.
    """
    header = struct.pack('>I', len(msg))
    pending = header
    pending += msg
    self.hex_dump("send_msg", header, hexdump=hexdump)
    self.hex_dump("send_msg", msg, hexdump=hexdump)

    try:
        while True:
            written = self.s.send(pending, 0)
            if written == len(pending):
                return
            # Partial write: retry with whatever is still unsent.
            pending = pending[written:]
    except (socket.error, IOError) as e:
        self._disconnect("send_msg: %s" % e)
        raise
def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
    """Perform one recv() of up to *num_recv* bytes on self.s.

    Returns the received bytes, or None on EOF (after disconnecting) or
    on timeout (after writing a notice to stderr). The socket timeout is
    reset to 10 after a successful recv and after a timeout. Other
    socket/IO errors disconnect and are re-raised.
    """
    data = None
    try:
        if timeout is not None:
            self.s.settimeout(timeout)
        data = self.s.recv(num_recv, 0)
        self.s.settimeout(10)
        if len(data) == 0:
            self._disconnect("recv_raw: EOF")
            return None
        self.hex_dump("recv_raw", data, hexdump=hexdump)
    except socket.timeout:
        self.s.settimeout(10)
        sys.stderr.write("recv_raw: TIMEOUT\n")
    except (socket.error, IOError) as e:
        self._disconnect("recv_raw: %s" % e)
        raise
    return data
def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
    """Receive one length-prefixed PDU and return its payload bytes.

    The frame is a 4-byte big-endian length followed by that many bytes.

    Returns None if nothing at all could be read (recv_raw() returned
    None for the first read), "" for a zero-length frame (kept as a str
    for compatibility with existing callers), otherwise the payload.

    recv() may return fewer bytes than requested, so both the length
    prefix and the payload are accumulated in a loop. If the stream ends
    or times out in the middle of a frame the test fails with a clear
    assertion instead of an unrelated struct.error/TypeError.

    *asn1_print* is accepted for signature compatibility and unused.
    """
    header_size = 4
    header = b''
    while len(header) < header_size:
        chunk = self.recv_raw(num_recv=header_size - len(header),
                              hexdump=hexdump,
                              timeout=timeout)
        if chunk is None:
            if not header:
                # Nothing received at all: report "no reply".
                return None
            self.fail("recv_pdu_raw: connection ended inside the "
                      "length prefix")
        header += chunk

    (k5_len,) = struct.unpack(">I", header)
    if k5_len == 0:
        return ""

    rep_pdu = b''
    while len(rep_pdu) < k5_len:
        chunk = self.recv_raw(num_recv=k5_len - len(rep_pdu),
                              hexdump=hexdump,
                              timeout=timeout)
        self.assertIsNotNone(
            chunk,
            msg="recv_pdu_raw: connection ended inside the payload")
        self.assertGreaterEqual(len(chunk), 1)
        rep_pdu += chunk
    return rep_pdu
def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
    """Receive one reply and decode it according to its msg-type.

    Returns (decoded_reply, raw_bytes). When no PDU was received the
    result is (None, <whatever recv_pdu_raw() returned>). Only AS-REP,
    TGS-REP and KRB-ERROR replies with pvno 5 are accepted.
    """
    raw = self.recv_pdu_raw(asn1_print=asn1_print,
                            hexdump=hexdump,
                            timeout=timeout)
    if not raw:
        return None, raw

    # First pass without a schema, just to read pvno and msg-type.
    untyped = self.der_decode(raw,
                              asn1Spec=None,
                              native_encode=False,
                              asn1_print=False,
                              hexdump=False)
    self.assertEqual(untyped['field-0'], 5)
    msg_type = untyped['field-1']
    self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])

    if msg_type == KRB_AS_REP:
        asn1Spec = krb5_asn1.AS_REP()
    elif msg_type == KRB_TGS_REP:
        asn1Spec = krb5_asn1.TGS_REP()
    elif msg_type == KRB_ERROR:
        asn1Spec = krb5_asn1.KRB_ERROR()

    # Second pass with the matching schema.
    reply = self.der_decode(raw, asn1Spec=asn1Spec,
                            asn1_print=asn1_print, hexdump=False)
    return (reply, raw)
def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
    """Like recv_reply(), but return only the decoded reply."""
    reply, _raw = self.recv_reply(asn1_print=asn1_print,
                                  hexdump=hexdump,
                                  timeout=timeout)
    return reply
def assertIsConnected(self):
    """Fail with "Not connected" if self.s is None."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" if self.s is not None."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
def send_recv_transaction(
        self,
        req,
        asn1_print=None,
        hexdump=None,
        timeout=None,
        to_rodc=False):
    """Connect, send *req*, receive one reply, disconnect; return the reply.

    The target is self.host when to_rodc is true, otherwise self.dc_host.
    The connection is dropped whether the exchange succeeds or fails; any
    exception from the exchange is re-raised.
    """
    if to_rodc:
        target = self.host
    else:
        target = self.dc_host
    self.connect(target)

    try:
        self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
        reply = self.recv_pdu(
            asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
    except Exception:
        self._disconnect("transaction failed")
        raise
    else:
        self._disconnect("transaction done")
    return reply
def getElementValue(self, obj, elem):
    """Return obj.get(elem), i.e. None when *elem* is absent."""
    value = obj.get(elem)
    return value
def assertElementMissing(self, obj, elem):
    """Assert that *elem* looks up to None in *obj*."""
    self.assertIsNone(self.getElementValue(obj, elem))
def assertElementPresent(self, obj, elem, expect_empty=False):
    """Assert that *elem* is present (not None) in *obj*.

    Under self.strict_checking, container values are additionally
    required to be empty when expect_empty is true and non-empty
    otherwise.
    """
    value = self.getElementValue(obj, elem)
    self.assertIsNotNone(value)

    if not self.strict_checking:
        return
    if not isinstance(value, collections.abc.Container):
        return

    if expect_empty:
        self.assertEqual(0, len(value))
    else:
        self.assertNotEqual(0, len(value))
def assertElementEqual(self, obj, elem, value):
    """Assert that *elem* is present in *obj* and equal to *value*."""
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    self.assertEqual(actual, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that *elem* in *obj* equals the UTF-8 encoding of *value*."""
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    expected = bytes(value, 'utf8')
    self.assertEqual(actual, expected)
def assertPrincipalEqual(self, princ1, princ2):
    """Assert that two principal names have the same type and components.

    Compares 'name-type', then the number of 'name-string' components,
    then each component in order.
    """
    self.assertEqual(princ1['name-type'], princ2['name-type'])

    names1 = princ1['name-string']
    names2 = princ2['name-string']
    self.assertEqual(
        len(names1),
        len(names2),
        msg="princ1=%s != princ2=%s" % (princ1, princ2))

    # The lengths are known to match here, so zip() covers every index.
    for part1, part2 in zip(names1, names2):
        self.assertEqual(
            part1,
            part2,
            msg="princ1=%s != princ2=%s" % (princ1, princ2))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that *elem* in *obj* is a principal equal to *value*.

    The element is converted to a krb5_asn1.PrincipalName before being
    compared with assertPrincipalEqual().
    """
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    principal = pyasn1_native_decode(found,
                                     asn1Spec=krb5_asn1.PrincipalName())
    self.assertPrincipalEqual(principal, value)
def assertElementKVNO(self, obj, elem, value):
    """Check the kvno stored under *elem* in *obj* against *value*.

    value=None        -> the element must be absent.
    value="autodetect" -> whatever is on the wire is taken as expected
                          (so an absent element is accepted as well).
    self.unspecified_kvno -> the element must be present and non-zero.
    anything else     -> must be present, non-zero and equal to int(value).
    """
    actual = self.getElementValue(obj, elem)
    if value == "autodetect":
        value = actual

    if value is None:
        self.assertIsNone(actual)
        return

    self.assertIsNotNone(actual)
    # The value on the wire should never be 0
    self.assertNotEqual(actual, 0)

    # unspecified_kvno means we don't know the kvno,
    # but want to enforce its presence
    if value is self.unspecified_kvno:
        return

    expected = int(value)
    self.assertNotEqual(expected, 0)
    self.assertEqual(actual, expected)
def assertElementFlags(self, obj, elem, expected, unexpected):
    """Check individual flag positions of the bit string under *elem*.

    Every bit set in *expected* must be '1' in the element, and every bit
    set in *unexpected* must be '0'. Either argument may be None to skip
    that check; otherwise it must be a krb5_asn1.TicketFlags.
    """
    v = self.getElementValue(obj, elem)
    self.assertIsNotNone(v)

    def check(flags, wanted_char, verb):
        # Compare each set bit of `flags` against the same position in v.
        self.assertIsInstance(flags, krb5_asn1.TicketFlags)
        for pos, bit in enumerate(flags):
            if bit == 1:
                self.assertEqual(wanted_char, v[pos],
                                 f"'{flags.namedValues[pos]}' "
                                 f"{verb} in {v}")

    if expected is not None:
        check(expected, '1', 'expected')
    if unexpected is not None:
        check(unexpected, '0', 'unexpected')
def assertSequenceElementsEqual(self, expected, got, *,
                                require_strict=None,
                                unchecked=None,
                                require_ordered=True):
    """Compare two sequences, strictly or leniently.

    When strict checking is on, order is required and nothing is
    unchecked, the sequences must be exactly equal. Otherwise they are
    compared as multisets after dropping ignored elements: everything in
    *unchecked*, plus everything in *require_strict* when strict checking
    is off.
    """
    exact = self.strict_checking and require_ordered and not unchecked
    if exact:
        self.assertEqual(expected, got)
        return

    fail_msg = f'expected: {expected} got: {got}'

    ignored = set()
    if unchecked:
        ignored |= set(unchecked)
    if require_strict and not self.strict_checking:
        ignored |= set(require_strict)

    if ignored:
        fail_msg += f' (ignoring: {ignored})'
        expected = [item for item in expected if item not in ignored]
        got = [item for item in got if item not in ignored]

    self.assertCountEqual(expected, got, fail_msg)
def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
    """Return (KerberosTime string, microseconds) for a UTC instant.

    *epoch* defaults to the current time; *offset*, if given, is
    truncated to an int and added to it.
    """
    when = time.time() if epoch is None else epoch
    if offset is not None:
        when = when + int(offset)
    stamp = datetime.datetime.fromtimestamp(when,
                                            tz=datetime.timezone.utc)
    formatted = stamp.strftime("%Y%m%d%H%M%SZ")
    return (formatted, stamp.microsecond)
def get_KerberosTime(self, epoch=None, offset=None):
    """Return only the string part of get_KerberosTimeWithUsec()."""
    timestamp, _usec = self.get_KerberosTimeWithUsec(epoch=epoch,
                                                     offset=offset)
    return timestamp
def get_EpochFromKerberosTime(self, kerberos_time):
    """Convert a KerberosTime (str or bytes, UTC) to integer epoch seconds."""
    text = kerberos_time
    if isinstance(text, bytes):
        text = text.decode()

    parsed = datetime.datetime.strptime(text, '%Y%m%d%H%M%SZ')
    # strptime() yields a naive datetime; the wire format is UTC.
    aware = parsed.replace(tzinfo=datetime.timezone.utc)
    return int(aware.timestamp())
def get_Nonce(self):
    """Return a random integer in [0x7f000000, 0x7fffffff]."""
    lowest, highest = 0x7f000000, 0x7fffffff
    return random.randint(lowest, highest)
def get_pa_dict(self, pa_data):
    """Map each padata-type in *pa_data* to its padata-value.

    Returns an empty dict when *pa_data* is None. Raises RuntimeError if
    the same padata-type occurs more than once.
    """
    by_type = {}
    if pa_data is None:
        return by_type

    for entry in pa_data:
        kind = entry['padata-type']
        if kind in by_type:
            raise RuntimeError(f'Duplicate type {kind}')
        by_type[kind] = entry['padata-value']

    return by_type
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap raw key *contents* of type *etype* in a RodcPacEncryptionKey."""
    raw_key = kcrypto.Key(etype, contents)
    return RodcPacEncryptionKey(raw_key, kvno)
def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None,
                       params=None):
    """Derive a key from *pwd* and *salt* via kcrypto.string_to_key().

    Both *pwd* and *salt* must be given. The result is wrapped in a
    RodcPacEncryptionKey carrying *kvno*.
    """
    self.assertIsNotNone(pwd)
    self.assertIsNotNone(salt)
    derived = kcrypto.string_to_key(etype, pwd, salt, params=params)
    return RodcPacEncryptionKey(derived, kvno)
def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
    """Build a password key from one ETYPE-INFO2 entry.

    'etype' is required in *etype_info2*; 'salt' and 's2kparams' are
    optional and passed on as None when absent.
    """
    return self.PasswordKey_from_etype(
        creds,
        etype_info2['etype'],
        kvno=kvno,
        salt=etype_info2.get('salt'),
        params=etype_info2.get('s2kparams'))
def PasswordKey_from_creds(self, creds, etype):
    """Build a password key using the kvno and salt stored in *creds*."""
    key_version = creds.get_kvno()
    key_salt = creds.get_salt()
    return self.PasswordKey_from_etype(creds, etype,
                                       kvno=key_version,
                                       salt=key_salt)
def PasswordKey_from_etype(self, creds, etype, kvno=None, salt=None, params=None):
    """Return the long-term key of *creds* for *etype*.

    For RC4 the key is the account's NT hash (salt and params do not
    apply). For every other enctype the key is derived from the UTF-8
    encoded password with *salt* and the string-to-key *params*.

    *params* is forwarded to PasswordKey_create(); previously it was
    accepted but dropped, so s2kparams supplied by
    PasswordKey_from_etype_info2() never reached the key derivation.
    """
    if etype == kcrypto.Enctype.RC4:
        nthash = creds.get_nt_hash()
        return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno)

    password = creds.get_password().encode('utf-8')
    return self.PasswordKey_create(
        etype=etype, pwd=password, salt=salt, kvno=kvno, params=params)
def TicketDecryptionKey_from_creds(self, creds, etype=None):
    """Return the key used to decrypt tickets issued for *creds*.

    With etype=None the first TGS enctype of *creds* is used unless it is
    missing or one of the DES types, in which case RC4 is used. A forced
    key for that enctype takes precedence; otherwise the key comes from
    the NT hash (RC4) or from the password and salt.
    """
    if etype is None:
        des_types = (kcrypto.Enctype.DES_CRC,
                     kcrypto.Enctype.DES_MD5)
        candidates = creds.get_tgs_krb5_etypes()
        if candidates and candidates[0] not in des_types:
            etype = candidates[0]
        else:
            etype = kcrypto.Enctype.RC4

    forced = creds.get_forced_key(etype)
    if forced is not None:
        return forced

    kvno = creds.get_kvno()

    fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
                "nor a password specified, " % (
                    creds.get_username(), etype, kvno))

    if etype == kcrypto.Enctype.RC4:
        nthash = creds.get_nt_hash()
        self.assertIsNotNone(nthash, msg=fail_msg)
        return self.SessionKey_create(etype=etype,
                                      contents=nthash,
                                      kvno=kvno)

    password = creds.get_password()
    self.assertIsNotNone(password, msg=fail_msg)
    return self.PasswordKey_create(etype=etype,
                                   pwd=password,
                                   salt=creds.get_salt(),
                                   kvno=kvno)
def RandomKey(self, etype):
    """Return a session key of *etype* filled with random bytes.

    The key length is taken from the enctype profile's keysize.
    """
    profile = kcrypto._get_enctype_profile(etype)
    random_bytes = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=random_bytes)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Turn a decoded EncryptionKey ('keytype', 'keyvalue') into a key."""
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(keytype, keyvalue)
def EncryptedData_create(self, key, usage, plaintext):
    """Encrypt *plaintext* with *key* and return an EncryptedData dict.

    'kvno' is included only when the key has one.
    """
    # EncryptedData ::= SEQUENCE {
    #     etype  [0] Int32 -- EncryptionType --,
    #     kvno   [1] Int32 OPTIONAL,
    #     cipher [2] OCTET STRING -- ciphertext
    # }
    ciphertext = key.encrypt(usage, plaintext)
    encrypted = {'etype': key.etype, 'cipher': ciphertext}
    if key.kvno is not None:
        encrypted['kvno'] = key.kvno
    return encrypted
def Checksum_create(self, key, usage, plaintext, ctype=None):
    """Checksum *plaintext* with *key* and return a Checksum dict.

    *ctype* defaults to the key's own checksum type.
    """
    # Checksum ::= SEQUENCE {
    #     cksumtype [0] Int32,
    #     checksum  [1] OCTET STRING
    # }
    cksumtype = key.ctype if ctype is None else ctype
    digest = key.make_checksum(usage, plaintext, ctype=cksumtype)
    return {
        'cksumtype': cksumtype,
        'checksum': digest,
    }
@classmethod
def PrincipalName_create(cls, name_type, names):
    """Return a PrincipalName dict from a name type and its components."""
    # PrincipalName ::= SEQUENCE {
    #     name-type   [0] Int32,
    #     name-string [1] SEQUENCE OF KerberosString
    # }
    return {
        'name-type': name_type,
        'name-string': names,
    }
def AuthorizationData_create(self, ad_type, ad_data):
    """Return a single authorization-data element as a dict."""
    # AuthorizationData ::= SEQUENCE {
    #     ad-type [0] Int32,
    #     ad-data [1] OCTET STRING
    # }
    return {
        'ad-type': ad_type,
        'ad-data': ad_data,
    }
def PA_DATA_create(self, padata_type, padata_value):
    """Return a PA-DATA dict with the given type and value."""
    # PA-DATA ::= SEQUENCE {
    #     -- NOTE: first tag is [1], not [0]
    #     padata-type  [1] Int32,
    #     padata-value [2] OCTET STRING -- might be encoded AP-REQ
    # }
    return {
        'padata-type': padata_type,
        'padata-value': padata_value,
    }
def PA_ENC_TS_ENC_create(self, ts, usec):
    """Return a PA-ENC-TS-ENC dict; both fields are always included."""
    # PA-ENC-TS-ENC ::= SEQUENCE {
    #     patimestamp [0] KerberosTime, -- client's time
    #     pausec      [1] krb5int32 OPTIONAL
    # }
    return {
        'patimestamp': ts,
        'pausec': usec,
    }
def PA_PAC_OPTIONS_create(self, options):
    """Return a PA-PAC-OPTIONS dict wrapping *options*."""
    # PA-PAC-OPTIONS ::= SEQUENCE {
    #     options [0] PACOptionFlags
    # }
    return {'options': options}
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor dict."""
    # KrbFastArmor ::= SEQUENCE {
    #     armor-type  [0] Int32,
    #     armor-value [1] OCTET STRING,
    #     ...
    # }
    return {
        'armor-type': armor_type,
        'armor-value': armor_value,
    }
def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
    """Return a KrbFastReq dict."""
    # KrbFastReq ::= SEQUENCE {
    #     fast-options [0] FastOptions,
    #     padata       [1] SEQUENCE OF PA-DATA,
    #     req-body     [2] KDC-REQ-BODY,
    #     ...
    # }
    return {
        'fast-options': fast_options,
        'padata': padata,
        'req-body': req_body,
    }
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return a KrbFastArmoredReq dict; 'armor' is omitted when None."""
    # KrbFastArmoredReq ::= SEQUENCE {
    #     armor        [0] KrbFastArmor OPTIONAL,
    #     req-checksum [1] Checksum,
    #     enc-fast-req [2] EncryptedData -- KrbFastReq --
    # }
    armored_req = {
        'req-checksum': req_checksum,
        'enc-fast-req': enc_fast_req,
    }
    if armor is None:
        return armored_req
    armored_req['armor'] = armor
    return armored_req
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Return a PA-FX-FAST-REQUEST dict selecting 'armored-data'."""
    # PA-FX-FAST-REQUEST ::= CHOICE {
    #     armored-data [0] KrbFastArmoredReq,
    #     ...
    # }
    return {'armored-data': armored_data}
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
    """Build a KERB-PA-PAC-REQUEST.

    With pa_data_create=False the plain dict is returned. Otherwise it is
    DER-encoded and wrapped in a PA-DATA of type PADATA_PAC_REQUEST.
    """
    # KERB-PA-PAC-REQUEST ::= SEQUENCE {
    #     include-pac [0] BOOLEAN --If TRUE, and no pac present,
    #                             --    include PAC.
    #                             --If FALSE, and PAC present,
    #                             --    remove PAC.
    # }
    pac_request = {'include-pac': include_pac}
    if pa_data_create:
        encoded = self.der_encode(
            pac_request, asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
        return self.PA_DATA_create(PADATA_PAC_REQUEST, encoded)
    return pac_request
def get_pa_pac_options(self, options):
    """Return a PA-DATA of type PADATA_PAC_OPTIONS carrying *options*."""
    options_obj = self.PA_PAC_OPTIONS_create(options)
    encoded = self.der_encode(options_obj,
                              asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    return self.PA_DATA_create(PADATA_PAC_OPTIONS, encoded)
def KDC_REQ_BODY_create(self,
                        kdc_options,
                        cname,
                        realm,
                        sname,
                        from_time,
                        till_time,
                        renew_time,
                        nonce,
                        etypes,
                        addresses,
                        additional_tickets,
                        EncAuthorizationData,
                        EncAuthorizationData_key,
                        EncAuthorizationData_usage,
                        asn1_print=None,
                        hexdump=None):
    """Return a KDC-REQ-BODY dict.

    The five mandatory fields are always set; each optional field is
    included only when its argument is not None. If EncAuthorizationData
    is given it is DER-encoded and encrypted with
    EncAuthorizationData_key under EncAuthorizationData_usage.
    """
    # KDC-REQ-BODY ::= SEQUENCE {
    #     kdc-options            [0] KDCOptions,
    #     cname                  [1] PrincipalName OPTIONAL
    #                                -- Used only in AS-REQ --,
    #     realm                  [2] Realm
    #                                -- Server's realm
    #                                -- Also client's in AS-REQ --,
    #     sname                  [3] PrincipalName OPTIONAL,
    #     from                   [4] KerberosTime OPTIONAL,
    #     till                   [5] KerberosTime,
    #     rtime                  [6] KerberosTime OPTIONAL,
    #     nonce                  [7] UInt32,
    #     etype                  [8] SEQUENCE OF Int32
    #                                -- EncryptionType
    #                                -- in preference order --,
    #     addresses              [9] HostAddresses OPTIONAL,
    #     enc-authorization-data [10] EncryptedData OPTIONAL
    #                                -- AuthorizationData --,
    #     additional-tickets     [11] SEQUENCE OF Ticket OPTIONAL
    #                                -- NOTE: not empty
    # }
    enc_ad = None
    if EncAuthorizationData is not None:
        plain_ad = self.der_encode(
            EncAuthorizationData,
            asn1Spec=krb5_asn1.AuthorizationData(),
            asn1_print=asn1_print,
            hexdump=hexdump)
        enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
                                           EncAuthorizationData_usage,
                                           plain_ad)

    body = {
        'kdc-options': kdc_options,
        'realm': realm,
        'till': till_time,
        'nonce': nonce,
        'etype': etypes,
    }

    # Optional fields, in the order they are added to the dict.
    optional_fields = (
        ('cname', cname),
        ('sname', sname),
        ('from', from_time),
        ('rtime', renew_time),
        ('addresses', addresses),
        ('enc-authorization-data', enc_ad),
        ('additional-tickets', additional_tickets),
    )
    for field, field_value in optional_fields:
        if field_value is not None:
            body[field] = field_value
    return body
def KDC_REQ_create(self,
                   msg_type,
                   padata,
                   req_body,
                   asn1Spec=None,
                   asn1_print=None,
                   hexdump=None):
    """Return (KDC-REQ dict, pyasn1 object or None).

    'padata' is included only when not None. The second element is the
    dict converted with *asn1Spec*, or None when no spec is given.
    *asn1_print* and *hexdump* are accepted but not used here.
    """
    # KDC-REQ ::= SEQUENCE {
    #     -- NOTE: first tag is [1], not [0]
    #     pvno     [1] INTEGER (5) ,
    #     msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #     padata   [3] SEQUENCE OF PA-DATA OPTIONAL
    #                  -- NOTE: not empty --,
    #     req-body [4] KDC-REQ-BODY
    # }
    request = {
        'pvno': 5,
        'msg-type': msg_type,
        'req-body': req_body,
    }
    if padata is not None:
        request['padata'] = padata

    if asn1Spec is None:
        return request, None
    return request, pyasn1_native_decode(request, asn1Spec=asn1Spec)
def AS_REQ_create(self,
                  padata,        # optional
                  kdc_options,   # required
                  cname,         # optional
                  realm,         # required
                  sname,         # optional
                  from_time,     # optional
                  till_time,     # required
                  renew_time,    # optional
                  nonce,         # required
                  etypes,        # required
                  addresses,     # optional
                  additional_tickets,
                  native_decoded_only=True,
                  asn1_print=None,
                  hexdump=None):
    """Build an AS-REQ.

    Returns the pyasn1 AS_REQ object, or the pair
    (pyasn1 object, native dict) when native_decoded_only is false.
    No encrypted authorization data is placed in the request body.
    """
    # KDC-REQ ::= SEQUENCE {
    #     -- NOTE: first tag is [1], not [0]
    #     pvno     [1] INTEGER (5) ,
    #     msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #     padata   [3] SEQUENCE OF PA-DATA OPTIONAL
    #                  -- NOTE: not empty --,
    #     req-body [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY ::= SEQUENCE {
    #     kdc-options            [0] KDCOptions,
    #     cname                  [1] PrincipalName OPTIONAL
    #                                -- Used only in AS-REQ --,
    #     realm                  [2] Realm
    #                                -- Server's realm
    #                                -- Also client's in AS-REQ --,
    #     sname                  [3] PrincipalName OPTIONAL,
    #     from                   [4] KerberosTime OPTIONAL,
    #     till                   [5] KerberosTime,
    #     rtime                  [6] KerberosTime OPTIONAL,
    #     nonce                  [7] UInt32,
    #     etype                  [8] SEQUENCE OF Int32
    #                                -- EncryptionType
    #                                -- in preference order --,
    #     addresses              [9] HostAddresses OPTIONAL,
    #     enc-authorization-data [10] EncryptedData OPTIONAL
    #                                -- AuthorizationData --,
    #     additional-tickets     [11] SEQUENCE OF Ticket OPTIONAL
    #                                -- NOTE: not empty
    # }
    req_body = self.KDC_REQ_BODY_create(
        kdc_options,
        cname,
        realm,
        sname,
        from_time,
        till_time,
        renew_time,
        nonce,
        etypes,
        addresses,
        additional_tickets,
        EncAuthorizationData=None,
        EncAuthorizationData_key=None,
        EncAuthorizationData_usage=None,
        asn1_print=asn1_print,
        hexdump=hexdump)

    native, decoded = self.KDC_REQ_create(
        msg_type=KRB_AS_REQ,
        padata=padata,
        req_body=req_body,
        asn1Spec=krb5_asn1.AS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)

    if not native_decoded_only:
        return decoded, native
    return decoded
def AP_REQ_create(self, ap_options, ticket, authenticator):
    """Return an AP-REQ dict with pvno 5 and msg-type KRB_AP_REQ."""
    # AP-REQ ::= [APPLICATION 14] SEQUENCE {
    #     pvno          [0] INTEGER (5),
    #     msg-type      [1] INTEGER (14),
    #     ap-options    [2] APOptions,
    #     ticket        [3] Ticket,
    #     authenticator [4] EncryptedData -- Authenticator
    # }
    return {
        'pvno': 5,
        'msg-type': KRB_AP_REQ,
        'ap-options': ap_options,
        'ticket': ticket,
        'authenticator': authenticator,
    }
def Authenticator_create(
        self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
        authorization_data):
    """Return an (unencrypted) Authenticator dict.

    cksum, subkey, seq_number and authorization_data are included only
    when they are not None.
    """
    # -- Unencrypted authenticator
    # Authenticator ::= [APPLICATION 2] SEQUENCE {
    #     authenticator-vno  [0] INTEGER (5),
    #     crealm             [1] Realm,
    #     cname              [2] PrincipalName,
    #     cksum              [3] Checksum OPTIONAL,
    #     cusec              [4] Microseconds,
    #     ctime              [5] KerberosTime,
    #     subkey             [6] EncryptionKey OPTIONAL,
    #     seq-number         [7] UInt32 OPTIONAL,
    #     authorization-data [8] AuthorizationData OPTIONAL
    # }
    authenticator = {
        'authenticator-vno': 5,
        'crealm': crealm,
        'cname': cname,
        'cusec': cusec,
        'ctime': ctime,
    }
    optional_fields = (
        ('cksum', cksum),
        ('subkey', subkey),
        ('seq-number', seq_number),
        ('authorization-data', authorization_data),
    )
    for field, field_value in optional_fields:
        if field_value is not None:
            authenticator[field] = field_value
    return authenticator
def PKAuthenticator_create(self,
                           cusec,
                           ctime,
                           nonce,
                           *,
                           pa_checksum=None,
                           freshness_token=None):
    """Return a PKAuthenticator dict.

    'paChecksum' and 'freshnessToken' are included only when the
    corresponding keyword argument is not None.
    """
    authenticator = {
        'cusec': cusec,
        'ctime': ctime,
        'nonce': nonce,
    }

    optional_fields = (
        ('paChecksum', pa_checksum),
        ('freshnessToken', freshness_token),
    )
    for field, field_value in optional_fields:
        if field_value is not None:
            authenticator[field] = field_value

    return authenticator
def TGS_REQ_create(self,
                   padata,        # optional
                   cusec,
                   ctime,
                   ticket,
                   kdc_options,   # required
                   cname,         # optional
                   realm,         # required
                   sname,         # optional
                   from_time,     # optional
                   till_time,     # required
                   renew_time,    # optional
                   nonce,         # required
                   etypes,        # required
                   addresses,     # optional
                   EncAuthorizationData,
                   EncAuthorizationData_key,
                   additional_tickets,
                   ticket_session_key,
                   authenticator_subkey=None,
                   body_checksum_type=None,
                   native_decoded_only=True,
                   asn1_print=None,
                   hexdump=None):
    """Build a TGS-REQ authenticated with *ticket*.

    The request body is checksummed with *ticket_session_key*, the
    checksum is placed in an Authenticator encrypted with the same key,
    and the resulting AP-REQ is added as a PADATA_KDC_REQ element.

    NOTE: when *padata* is a list, the AP-REQ element is appended to that
    list in place, so the caller's list is modified.

    Returns the pyasn1 TGS_REQ object, or the pair
    (pyasn1 object, native dict) when native_decoded_only is false.
    """
    # KDC-REQ ::= SEQUENCE {
    #     -- NOTE: first tag is [1], not [0]
    #     pvno     [1] INTEGER (5) ,
    #     msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #     padata   [3] SEQUENCE OF PA-DATA OPTIONAL
    #                  -- NOTE: not empty --,
    #     req-body [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY ::= SEQUENCE {
    #     kdc-options            [0] KDCOptions,
    #     cname                  [1] PrincipalName OPTIONAL
    #                                -- Used only in AS-REQ --,
    #     realm                  [2] Realm
    #                                -- Server's realm
    #                                -- Also client's in AS-REQ --,
    #     sname                  [3] PrincipalName OPTIONAL,
    #     from                   [4] KerberosTime OPTIONAL,
    #     till                   [5] KerberosTime,
    #     rtime                  [6] KerberosTime OPTIONAL,
    #     nonce                  [7] UInt32,
    #     etype                  [8] SEQUENCE OF Int32
    #                                -- EncryptionType
    #                                -- in preference order --,
    #     addresses              [9] HostAddresses OPTIONAL,
    #     enc-authorization-data [10] EncryptedData OPTIONAL
    #                                -- AuthorizationData --,
    #     additional-tickets     [11] SEQUENCE OF Ticket OPTIONAL
    #                                -- NOTE: not empty
    # }
    have_subkey = authenticator_subkey is not None

    # The key usage for enc-authorization-data depends on whether an
    # authenticator subkey is in play.
    if have_subkey:
        enc_ad_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
    else:
        enc_ad_usage = KU_TGS_REQ_AUTH_DAT_SESSION

    # cname is not sent in the body of a TGS-REQ; it only appears in the
    # authenticator below.
    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=None,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=enc_ad_usage)
    body_blob = self.der_encode(req_body,
                                asn1Spec=krb5_asn1.KDC_REQ_BODY(),
                                asn1_print=asn1_print, hexdump=hexdump)

    body_checksum = self.Checksum_create(ticket_session_key,
                                         KU_TGS_REQ_AUTH_CKSUM,
                                         body_blob,
                                         ctype=body_checksum_type)

    subkey_obj = authenticator_subkey.export_obj() if have_subkey else None
    seq_number = random.randint(0, 0xfffffffe)
    authenticator_obj = self.Authenticator_create(
        crealm=realm,
        cname=cname,
        cksum=body_checksum,
        cusec=cusec,
        ctime=ctime,
        subkey=subkey_obj,
        seq_number=seq_number,
        authorization_data=None)
    authenticator_blob = self.der_encode(
        authenticator_obj,
        asn1Spec=krb5_asn1.Authenticator(),
        asn1_print=asn1_print,
        hexdump=hexdump)
    enc_authenticator = self.EncryptedData_create(
        ticket_session_key, KU_TGS_REQ_AUTH, authenticator_blob)

    ap_options = krb5_asn1.APOptions('0')
    ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
                                    ticket=ticket,
                                    authenticator=enc_authenticator)
    ap_req_blob = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ(),
                                  asn1_print=asn1_print, hexdump=hexdump)
    pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req_blob)

    if padata is None:
        padata = [pa_tgs_req]
    else:
        padata.append(pa_tgs_req)

    native, decoded = self.KDC_REQ_create(
        msg_type=KRB_TGS_REQ,
        padata=padata,
        req_body=req_body,
        asn1Spec=krb5_asn1.TGS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)

    if not native_decoded_only:
        return decoded, native
    return decoded
def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
    """Return a PADATA_FOR_USER PA-DATA containing a PA-S4U2Self.

    The checksum covers the name type as 4 little-endian bytes, followed
    by each name component, the realm and the string "Kerberos".
    """
    # PA-S4U2Self ::= SEQUENCE {
    #     name  [0] PrincipalName,
    #     realm [1] Realm,
    #     cksum [2] Checksum,
    #     auth  [3] GeneralString
    # }
    pieces = [name['name-type'].to_bytes(4, byteorder='little')]
    pieces.extend(part.encode() for part in name['name-string'])
    pieces.append(realm.encode())
    pieces.append("Kerberos".encode())
    cksum_data = b''.join(pieces)

    cksum = self.Checksum_create(tgt_session_key,
                                 KU_NON_KERB_CKSUM_SALT,
                                 cksum_data,
                                 ctype)

    s4u2self = {
        'name': name,
        'realm': realm,
        'cksum': cksum,
        'auth': "Kerberos",
    }
    encoded = self.der_encode(
        s4u2self, asn1Spec=krb5_asn1.PA_S4U2Self())
    return self.PA_DATA_create(PADATA_FOR_USER, encoded)
def ChangePasswdDataMS_create(self,
                              new_password,
                              target_princ=None,
                              target_realm=None):
    """Return a DER-encoded ChangePasswdDataMS.

    'targname' and 'targrealm' are included only when given.
    """
    change_data = {'newpasswd': new_password}
    optional_fields = (
        ('targname', target_princ),
        ('targrealm', target_realm),
    )
    for field, field_value in optional_fields:
        if field_value is not None:
            change_data[field] = field_value

    return self.der_encode(
        change_data, asn1Spec=krb5_asn1.ChangePasswdDataMS())
def KRB_PRIV_create(self,
                    subkey,
                    user_data,
                    s_address,
                    timestamp=None,
                    usec=None,
                    seq_number=None,
                    r_address=None):
    """Return a DER-encoded KRB-PRIV message.

    An EncKrbPrivPart is built from the arguments (optional fields only
    when not None), DER-encoded, encrypted with *subkey* under
    KU_KRB_PRIV and wrapped in a KRB-PRIV with pvno 5.
    """
    priv_part = {
        'user-data': user_data,
        's-address': s_address,
    }
    optional_fields = (
        ('timestamp', timestamp),
        ('usec', usec),
        ('seq-number', seq_number),
        ('r-address', r_address),
    )
    for field, field_value in optional_fields:
        if field_value is not None:
            priv_part[field] = field_value

    priv_part_blob = self.der_encode(
        priv_part, asn1Spec=krb5_asn1.EncKrbPrivPart())

    enc_part = self.EncryptedData_create(subkey,
                                         KU_KRB_PRIV,
                                         priv_part_blob)

    message = {
        'pvno': 5,
        'msg-type': KRB_PRIV,
        'enc-part': enc_part,
    }
    return self.der_encode(message, asn1Spec=krb5_asn1.KRB_PRIV())
def ContentInfo_create(self, content_type, content):
    """Return a ContentInfo dict ('contentType', 'content')."""
    return {
        'contentType': content_type,
        'content': content,
    }
def EncapsulatedContentInfo_create(self, content_type, content):
    """Return an EncapsulatedContentInfo dict ('eContentType', 'eContent')."""
    return {
        'eContentType': content_type,
        'eContent': content,
    }
def SignedData_create(self,
                      digest_algorithms,
                      encap_content_info,
                      signer_infos,
                      *,
                      version=None,
                      certificates=None,
                      crls=None):
    """Return a SignedData dict.

    When *version* is None it is chosen from the contents (see the rules
    below). 'certificates' and 'crls' are included only when not None.
    """
    def any_cert_has(choice):
        return certificates is not None and any(
            choice in cert for cert in certificates)

    def any_crl_has(choice):
        return crls is not None and any(
            choice in crl for crl in crls)

    def any_signer_has_version(wanted):
        return signer_infos is not None and any(
            signer_info['version'] == wanted
            for signer_info in signer_infos)

    if version is None:
        # per RFC5652 5.1:
        if any_cert_has('other') or any_crl_has('other'):
            version = 5
        elif any_cert_has('v2AttrCert'):
            version = 4
        elif (any_cert_has('v1AttrCert')
              or any_signer_has_version(3)
              or encap_content_info['eContentType'] != krb5_asn1.id_data):
            version = 3
        else:
            version = 1

    signed_data = {
        'version': version,
        'digestAlgorithms': digest_algorithms,
        'encapContentInfo': encap_content_info,
        'signerInfos': signer_infos,
    }

    if certificates is not None:
        signed_data['certificates'] = certificates
    if crls is not None:
        signed_data['crls'] = crls

    return signed_data
def AuthPack_create(self,
                    pk_authenticator,
                    *,
                    client_public_value=None,
                    supported_cms_types=None,
                    client_dh_nonce=None):
    """Return an AuthPack dict.

    'clientPublicValue', 'supportedCMSTypes' and 'clientDHNonce' are
    included only when the corresponding argument is not None.
    """
    auth_pack = {'pkAuthenticator': pk_authenticator}

    optional_fields = (
        ('clientPublicValue', client_public_value),
        ('supportedCMSTypes', supported_cms_types),
        ('clientDHNonce', client_dh_nonce),
    )
    for field, field_value in optional_fields:
        if field_value is not None:
            auth_pack[field] = field_value

    return auth_pack
def PK_AS_REQ_create(self,
                     signed_auth_pack,
                     *,
                     trusted_certifiers=None,
                     kdc_pk_id=None):
    """Return a DER-encoded PA-PK-AS-REQ.

    *signed_auth_pack* is wrapped in a ContentInfo of type id_signedData
    and DER-encoded to form 'signedAuthPack'. 'trustedCertifiers' and
    'kdcPkId' are included only when given.
    """
    content_info = self.der_encode(
        self.ContentInfo_create(krb5_asn1.id_signedData, signed_auth_pack),
        asn1Spec=krb5_asn1.ContentInfo())

    pk_as_req = {'signedAuthPack': content_info}

    optional_fields = (
        ('trustedCertifiers', trusted_certifiers),
        ('kdcPkId', kdc_pk_id),
    )
    for field, field_value in optional_fields:
        if field_value is not None:
            pk_as_req[field] = field_value

    return self.der_encode(pk_as_req,
                           asn1Spec=krb5_asn1.PA_PK_AS_REQ())
def SignerInfo_create(self,
                      signer_id,
                      digest_algorithm,
                      signature_algorithm,
                      signature,
                      *,
                      version=None,
                      signed_attrs=None,
                      unsigned_attrs=None):
    """Return a SignerInfo dict.

    When *version* is None it is derived from the kind of *signer_id*:
    1 for 'issuerAndSerialNumber', 3 for 'subjectKeyIdentifier'; any
    other identifier fails the test. 'signedAttrs' and 'unsignedAttrs'
    are included only when given.
    """
    if version is None:
        # per RFC5652 5.3:
        version_by_id_kind = (
            ('issuerAndSerialNumber', 1),
            ('subjectKeyIdentifier', 3),
        )
        for id_kind, id_version in version_by_id_kind:
            if id_kind in signer_id:
                version = id_version
                break
        else:
            self.fail(f'unknown signer ID version ({signer_id})')

    signer_info = {
        'version': version,
        'sid': signer_id,
        'digestAlgorithm': digest_algorithm,
        'signatureAlgorithm': signature_algorithm,
        'signature': signature,
    }

    if signed_attrs is not None:
        signer_info['signedAttrs'] = signed_attrs
    if unsigned_attrs is not None:
        signer_info['unsignedAttrs'] = unsigned_attrs

    return signer_info
def SignerIdentifier_create(self, *,
                            issuer_and_serial_number=None,
                            subject_key_id=None):
    """Return a single-entry dict selecting one signer identifier choice.

    The issuer and serial number takes precedence if both are given; the
    test fails if neither is given.
    """
    choices = (
        ('issuerAndSerialNumber', issuer_and_serial_number),
        ('subjectKeyIdentifier', subject_key_id),
    )

    for choice, value in choices:
        if value is not None:
            return {choice: value}

    self.fail('identifier not specified')
def AlgorithmIdentifier_create(self,
                               algorithm,
                               *,
                               parameters=None):
    """Return a dict using the AlgorithmIdentifier field names.

    'parameters' is only included when it is not None.
    """
    if parameters is None:
        return {'algorithm': algorithm}

    return {
        'algorithm': algorithm,
        'parameters': parameters,
    }
def SubjectPublicKeyInfo_create(self,
                                algorithm,
                                public_key):
    """Return a dict using the SubjectPublicKeyInfo field names."""
    public_key_info = dict(algorithm=algorithm,
                           subjectPublicKey=public_key)
    return public_key_info
def ValidationParms_create(self,
                           seed,
                           pgen_counter):
    """Return a dict using the ValidationParms field names."""
    validation_parms = dict(seed=seed,
                            pgenCounter=pgen_counter)
    return validation_parms
def DomainParameters_create(self,
                            p,
                            g,
                            *,
                            q=None,
                            j=None,
                            validation_parms=None):
    """Return a dict using the DomainParameters field names.

    'p' and 'g' are always present; 'q', 'j' and 'validationParms' are
    only included when the corresponding argument is not None.
    """
    optional_fields = (
        ('q', q),
        ('j', j),
        ('validationParms', validation_parms),
    )

    domain_params = {'p': p, 'g': g}
    domain_params.update((field, value)
                         for field, value in optional_fields
                         if value is not None)

    return domain_params
def length_in_bytes(self, value):
    """Return the length in bytes of an integer once it is encoded as
    bytes.

    The value must be a non-negative int. Zero is reported as needing one
    byte.
    """

    # Check the type first, so that a non-integer is reported as such
    # rather than as a failed comparison.
    self.assertIsInstance(value, int)
    self.assertGreaterEqual(value, 0, 'value must be positive')

    # Use exact integer arithmetic. A floating-point logarithm rounds
    # large values (e.g. 2**64 + 1 becomes 2**64) and can therefore come
    # out one byte too short.
    length_in_bits = max(1, value.bit_length())
    return (length_in_bits + 7) // 8
def bytes_from_int(self, value, *, length=None):
    """Encode an integer as big-endian bytes.

    If no length is given, the length reported by length_in_bytes() for
    the value is used.
    """
    byte_count = self.length_in_bytes(value) if length is None else length
    return value.to_bytes(byte_count, byteorder='big')
def int_from_bytes(self, data):
    """Decode big-endian bytes into an integer."""
    value = int.from_bytes(data, byteorder='big')
    return value
def int_from_bit_string(self, string):
    """Decode a string of '0' and '1' characters into an integer."""
    value = int(string, 2)
    return value
def bit_string_from_int(self, value):
    """Return an integer as a string of binary digits, zero-padded on the
    left to a whole number of bytes."""

    bits = format(value, 'b')

    # The bitstring must be padded to a multiple of 8 bits in length, or
    # pyasn1 will interpret it incorrectly (as if the padding bits were
    # present, but on the wrong end).
    leftover = len(bits) % 8
    if leftover:
        bits = '0' * (8 - leftover) + bits

    return bits
def bit_string_from_bytes(self, data):
    """Return a bitstring encoding of bytes in big-endian format.

    The result has eight bits for every byte of input, so leading zero
    bytes are preserved and the result converts back to the same bytes.
    Empty input yields a single zero byte ('00000000'), as it always has.
    """
    value = int.from_bytes(data, 'big')

    # Converting through an integer discards leading zero bytes, so pad
    # back out to the full width of the input. This is also a multiple of
    # 8 bits, which pyasn1 needs in order to interpret the string
    # correctly.
    width = max(8, 8 * len(data))
    return f'{value:b}'.rjust(width, '0')
def bytes_from_bit_string(self, string):
    """Convert a string of binary digits into big-endian bytes.

    The result has one byte for every eight digits, rounding up.
    """
    byte_count = (len(string) + 7) // 8
    return self.int_from_bit_string(string).to_bytes(byte_count, 'big')
def asn1_length(self, data):
    """Return the ASN.1 length octets for some (non-empty) data.

    Lengths below 0x80 are encoded in a single byte. Longer lengths are
    encoded as a byte holding 0x80 | n followed by the length itself in n
    big-endian bytes.
    """

    length = len(data)
    self.assertGreater(length, 0)

    if length >= 0x80:
        encoding_len = self.length_in_bytes(length)
        self.assertLess(encoding_len, 0x80,
                        'item is too long to be ASN.1 encoded')

        encoded_length = self.bytes_from_int(length, length=encoding_len)
        return bytes([0x80 | encoding_len]) + encoded_length

    return bytes([length])
@staticmethod
def octetstring2key(x, enctype):
    """Derive a key of the given encryption type from an octet string.

    This implements the function defined in RFC4556 3.2.3.1 "Using
    Diffie-Hellman Key Exchange". The derived key is returned wrapped in
    a RodcPacEncryptionKey with no kvno.
    """

    seedsize = kcrypto.seedsize(enctype)

    # A counter that cycles through the bytes 0x00-0xff.
    prefixes = itertools.cycle(bytes([value]) for value in range(256))

    # Concatenate SHA-1 digests of (counter byte + x) until there is
    # enough seed material.
    seed = b''
    while len(seed) < seedsize:
        sha1 = hashes.Hash(hashes.SHA1(), default_backend())
        sha1.update(next(prefixes) + x)
        seed += sha1.finalize()

    raw_key = kcrypto.random_to_key(enctype, seed[:seedsize])
    return RodcPacEncryptionKey(raw_key, kvno=None)
def unpad(self, data):
    """Strip trailing padding from data and return the remainder.

    The final byte gives the number of padding bytes, each of which must
    have that same value; otherwise the test fails.
    """
    pad_len = data[-1]
    payload, padding = data[:-pad_len], data[-pad_len:]

    self.assertEqual(bytes([pad_len]) * pad_len, padding,
                     'invalid padding bytes')

    return payload
def try_decode(self, data, module=None):
    """Attempt to decode data of unknown type against every callable
    found in an ASN.1 module, printing each result that decodes
    successfully. For use when debugging.

    If no module is given, a couple of known modules are tried, and the
    test is then deliberately failed so that the output can be examined.
    """

    if module is None:
        # Try a couple of known ASN.1 modules.
        for known_module in (krb5_asn1, pyasn1.type.univ):
            self.try_decode(data, known_module)

        # It's helpful to stop and give the user a chance to examine the
        # results.
        self.fail('decoding done')

    for name in dir(module):
        candidate = getattr(module, name)
        if not callable(candidate):
            continue

        try:
            decoded = self.der_decode(data, asn1Spec=candidate())
        except Exception:
            # Instantiating the schema or decoding the ASN.1 failed for
            # whatever reason; this is expected for most candidates.
            continue

        # Decoding succeeded: print the structure to be examined.
        print(f'\t{name}')
        pprint(decoded)
def cipher_from_algorithm(self, algorithm):
    """Return the cipher algorithm class for an algorithm OID.

    The algorithm may be given as a dotted string or as an
    ObjectIdentifier. The test fails if the algorithm is not recognised.
    """
    # Let someone pass in an ObjectIdentifier, as hash_from_algorithm()
    # does.
    algorithm = str(algorithm)

    if algorithm == str(krb5_asn1.aes256_CBC_PAD):
        return algorithms.AES

    if algorithm == str(krb5_asn1.des_EDE3_CBC):
        return algorithms.TripleDES

    self.fail(f'unknown cipher algorithm {algorithm}')
def hash_from_algorithm(self, algorithm):
    """Return the hash class to use for a digest or signature algorithm
    OID. The test fails if the algorithm is not recognised."""

    # Let someone pass in an ObjectIdentifier.
    algorithm = str(algorithm)

    known_hashes = {
        str(krb5_asn1.id_sha1): hashes.SHA1,
        str(krb5_asn1.sha1WithRSAEncryption): hashes.SHA1,
        str(krb5_asn1.rsaEncryption): hashes.SHA1,
        str(krb5_asn1.id_pkcs1_sha256WithRSAEncryption): hashes.SHA256,
        str(krb5_asn1.id_sha512): hashes.SHA512,
    }

    if algorithm not in known_hashes:
        self.fail(f'unknown hash algorithm {algorithm}')

    return known_hashes[algorithm]
def hash_from_algorithm_id(self, algorithm_id):
    """Return the hash class named by an AlgorithmIdentifier dict.

    Any parameters present must be the two bytes 05 00; under strict
    checking the parameters must be present.
    """
    self.assertIsInstance(algorithm_id, dict)

    hash_cls = self.hash_from_algorithm(algorithm_id['algorithm'])

    parameters = algorithm_id.get('parameters')
    strict = self.strict_checking
    if parameters is None:
        if strict:
            self.assertIsNotNone(parameters)
    else:
        self.assertEqual(b'\x05\x00', parameters)

    return hash_cls
def create_freshness_token(self,
                           epoch=None,
                           *,
                           offset=None,
                           krbtgt_creds=None):
    """Return a freshness token for the given time.

    The token is a DER-encoded EncryptedData holding a PA-ENC-TS-ENC,
    encrypted with the krbtgt key (that of the default krbtgt account
    unless krbtgt_creds is given), preceded by two zero bytes.
    """
    timestamp, usec = self.get_KerberosTimeWithUsec(epoch, offset)

    # Encode the freshness token as PA-ENC-TS-ENC.
    ts_enc_obj = self.PA_ENC_TS_ENC_create(timestamp, usec)
    ts_enc_der = self.der_encode(ts_enc_obj,
                                 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    if krbtgt_creds is None:
        krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    # Encrypt the freshness token.
    encrypted = self.EncryptedData_create(krbtgt_key,
                                          KU_AS_FRESHNESS,
                                          ts_enc_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    # Prepend a couple of zero bytes.
    return b'\x00\x00' + encrypted_der
def kpasswd_create(self,
                   subkey,
                   user_data,
                   version,
                   seq_number,
                   ap_req,
                   local_address,
                   remote_address):
    """Return a kpasswd request message as a bytearray.

    The message is a six-byte header (total size, version and AP-REQ
    length, each as a big-endian 16-bit value) followed by the AP-REQ and
    a KRB-PRIV created from the user data. A connection must already have
    been made.
    """
    self.assertIsNotNone(self.s, 'call self.connect() first')

    timestamp, usec = self.get_KerberosTimeWithUsec()

    krb_priv = self.KRB_PRIV_create(subkey,
                                    user_data,
                                    s_address=local_address,
                                    timestamp=timestamp,
                                    usec=usec,
                                    seq_number=seq_number,
                                    r_address=remote_address)

    ap_req_len = len(ap_req)
    size = 6 + ap_req_len + len(krb_priv)
    self.assertLess(size, 0x10000)

    msg = bytearray()
    for header_field in (size, version, ap_req_len):
        # High byte first, then low byte.
        msg.append(header_field >> 8)
        msg.append(header_field & 0xff)
    # Note: for sets, there could be a little-endian four-byte length here.

    msg.extend(ap_req)
    msg.extend(krb_priv)

    return msg
def get_enc_part(self, obj, key, usage):
    """Check and decrypt the 'enc-part' of a decoded message.

    Asserts that the message has pvno 5 and that the encrypted part's
    etype and kvno match those of the key, then returns the plaintext
    obtained by decrypting the cipher with the key and key usage.
    """
    self.assertElementEqual(obj, 'pvno', 5)

    encrypted = obj['enc-part']
    self.assertElementEqual(encrypted, 'etype', key.etype)
    self.assertElementKVNO(encrypted, 'kvno', key.kvno)

    return key.decrypt(usage, encrypted['cipher'])
def kpasswd_exchange(self,
                     ticket,
                     new_password,
                     expected_code,
                     expected_msg,
                     mode,
                     target_princ=None,
                     target_realm=None,
                     ap_options=None,
                     send_seq_number=True):
    """Perform a kpasswd exchange on port 464 and check the reply.

    :param ticket: the ticket used to build the AP-REQ; its session key
        is used to decrypt the AP-REP.
    :param new_password: the password to set or change to.
    :param expected_code: the expected result code, either a single int
        or a container of acceptable codes.
    :param expected_msg: the expected result message, either bytes or a
        container of acceptable messages.
    :param mode: self.KpasswdMode.SET or self.KpasswdMode.CHANGE.
    :param target_princ: target principal; only valid for password set.
    :param target_realm: target realm; only valid for password set.
    :param ap_options: AP options for the AP-REQ; defaults to '0'.
    :param send_seq_number: whether to send a random sequence number.

    Nothing is returned; any mismatch fails the test.
    """
    # Select the protocol version and user data for the requested mode.
    if mode is self.KpasswdMode.SET:
        version = 0xff80
        user_data = self.ChangePasswdDataMS_create(new_password,
                                                   target_princ,
                                                   target_realm)
    elif mode is self.KpasswdMode.CHANGE:
        self.assertIsNone(target_princ,
                          'target_princ only valid for pw set')
        self.assertIsNone(target_realm,
                          'target_realm only valid for pw set')

        version = 1
        user_data = new_password.encode('utf-8')
    else:
        self.fail(f'invalid mode {mode}')

    # The subkey is sent in the authenticator and later used to decrypt
    # the KRB-PRIV in the reply.
    subkey = self.RandomKey(kcrypto.Enctype.AES256)

    if ap_options is None:
        ap_options = '0'
    ap_options = str(krb5_asn1.APOptions(ap_options))

    kdc_exchange_dict = {
        'tgt': ticket,
        'authenticator_subkey': subkey,
        'auth_data': None,
        'ap_options': ap_options,
    }

    if send_seq_number:
        seq_number = random.randint(0, 0xfffffffe)
    else:
        seq_number = None

    ap_req = self.generate_ap_req(kdc_exchange_dict,
                                  None,
                                  req_body=None,
                                  armor=False,
                                  usage=KU_AP_REQ_AUTH,
                                  seq_number=seq_number)

    self.connect(self.host, port=464)
    self.assertIsNotNone(self.s)

    family = self.s.family

    if family == socket.AF_INET:
        addr_type = 2  # IPv4
    elif family == socket.AF_INET6:
        addr_type = 24  # IPv6
    else:
        self.fail(f'unknown family {family}')

    def create_address(ip):
        # Build a host address dict for an IP of the socket's family.
        return {
            'addr-type': addr_type,
            'address': socket.inet_pton(family, ip),
        }

    local_ip = self.s.getsockname()[0]
    local_address = create_address(local_ip)

    # remote_ip = self.s.getpeername()[0]
    # remote_address = create_address(remote_ip)

    # TODO: due to a bug (?), MIT Kerberos will not accept the request
    # unless r-address is set to our _local_ address. Heimdal, on the other
    # hand, requires the r-address is set to the remote address (as
    # expected). To avoid problems, avoid sending r-address for now.
    remote_address = None

    msg = self.kpasswd_create(subkey,
                              user_data,
                              version,
                              seq_number,
                              ap_req,
                              local_address,
                              remote_address)

    self.send_msg(msg)
    rep_pdu = self.recv_pdu_raw()

    self._disconnect('transaction done')

    self.assertIsNotNone(rep_pdu)

    # The reply begins with a six-byte header of three big-endian 16-bit
    # fields: total length, version and AP-REP length.
    header = rep_pdu[:6]
    reply = rep_pdu[6:]

    reply_len = (header[0] << 8) | header[1]
    reply_version = (header[2] << 8) | header[3]
    ap_rep_len = (header[4] << 8) | header[5]

    self.assertEqual(reply_len, len(rep_pdu))
    self.assertEqual(1, reply_version)  # KRB5_KPASSWD_VERS_CHANGEPW
    self.assertLess(ap_rep_len, reply_len)

    self.assertNotEqual(0x7e, rep_pdu[1])
    self.assertNotEqual(0x5e, rep_pdu[1])

    if ap_rep_len:
        # We received an AP-REP and KRB-PRIV as a response. This may or may
        # not indicate an error, depending on the status code.
        ap_rep = reply[:ap_rep_len]
        krb_priv = reply[ap_rep_len:]

        key = ticket.session_key

        ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP())
        self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP)
        enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART)
        enc_part = self.der_decode(
            enc_part, asn1Spec=krb5_asn1.EncAPRepPart())

        self.assertElementPresent(enc_part, 'ctime')
        self.assertElementPresent(enc_part, 'cusec')
        # self.assertElementMissing(enc_part, 'subkey') # TODO
        # self.assertElementPresent(enc_part, 'seq-number') # TODO

        try:
            krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV())
        except PyAsn1Error:
            self.fail()

        self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV)
        priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV)
        priv_enc_part = self.der_decode(
            priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart())

        self.assertElementMissing(priv_enc_part, 'timestamp')
        self.assertElementMissing(priv_enc_part, 'usec')
        # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO
        # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO
        # self.assertElementMissing(priv_enc_part, 'r-address') # TODO

        result_data = priv_enc_part['user-data']
    else:
        # We received a KRB-ERROR as a response, indicating an error.
        krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR())

        sname = self.PrincipalName_create(
            name_type=NT_PRINCIPAL,
            names=['kadmin', 'changepw'])
        realm = self.get_krbtgt_creds().get_realm().upper()

        self.assertElementEqual(krb_error, 'pvno', 5)
        self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR)
        self.assertElementMissing(krb_error, 'ctime')
        self.assertElementMissing(krb_error, 'usec')
        self.assertElementPresent(krb_error, 'stime')
        self.assertElementPresent(krb_error, 'susec')

        error_code = krb_error['error-code']
        if isinstance(expected_code, int):
            self.assertEqual(error_code, expected_code)
        else:
            self.assertIn(error_code, expected_code)

        self.assertElementMissing(krb_error, 'crealm')
        self.assertElementMissing(krb_error, 'cname')
        self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8'))
        self.assertElementEqualPrincipal(krb_error, 'sname', sname)
        self.assertElementMissing(krb_error, 'e-text')

        result_data = krb_error['e-data']

    # In both cases the result data is a big-endian 16-bit status code
    # followed by a message.
    status = result_data[:2]
    message = result_data[2:]

    status_code = (status[0] << 8) | status[1]
    if isinstance(expected_code, int):
        self.assertEqual(status_code, expected_code)
    else:
        self.assertIn(status_code, expected_code)

    if not message:
        self.assertEqual(0, status_code,
                         'got an error result, but no message')
        return

    # Check the first character of the message.
    if message[0]:
        if isinstance(expected_msg, bytes):
            self.assertEqual(message, expected_msg)
        else:
            self.assertIn(message, expected_msg)
    else:
        # We got AD password policy information.
        self.assertEqual(30, len(message))

        # The unpacked fields are not checked any further here;
        # unpacking only confirms that the message has the expected
        # layout.
        (empty_bytes,
         min_length,
         history_length,
         properties,
         expire_time,
         min_age) = struct.unpack('>HIIIQQ', message)
def _generic_kdc_exchange(self,
                          kdc_exchange_dict,  # required
                          cname=None,  # optional
                          realm=None,  # required
                          sname=None,  # optional
                          from_time=None,  # optional
                          till_time=None,  # required
                          renew_time=None,  # optional
                          etypes=None,  # required
                          addresses=None,  # optional
                          additional_tickets=None,  # optional
                          EncAuthorizationData=None,  # optional
                          EncAuthorizationData_key=None,  # optional
                          EncAuthorizationData_usage=None):  # optional
    """Build a KDC request, send it, and check the reply.

    The request body is built from the keyword arguments, while
    everything else (message types, padata generators, FAST settings,
    expectations and check functions) is taken from kdc_exchange_dict.
    The request padata, FAST padata, inner request body, request object
    and nonce are stored back into kdc_exchange_dict.

    Exactly one of kdc_exchange_dict's 'check_error_fn' and
    'check_rep_fn' must be set; the reply is passed to it and its result
    is returned.
    """

    check_error_fn = kdc_exchange_dict['check_error_fn']
    check_rep_fn = kdc_exchange_dict['check_rep_fn']
    generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
    generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
    generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
    generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
    callback_dict = kdc_exchange_dict['callback_dict']
    req_msg_type = kdc_exchange_dict['req_msg_type']
    req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_error_mode = kdc_exchange_dict['expected_error_mode']
    kdc_options = kdc_exchange_dict['kdc_options']

    pac_request = kdc_exchange_dict['pac_request']
    pac_options = kdc_exchange_dict['pac_options']

    # Parameters specific to the inner request body
    inner_req = kdc_exchange_dict['inner_req']

    # Parameters specific to the outer request body
    outer_req = kdc_exchange_dict['outer_req']

    if till_time is None:
        till_time = self.get_KerberosTime(offset=36000)

    # Reuse a nonce supplied by the caller; otherwise generate one and
    # record it in the exchange dict.
    if 'nonce' in kdc_exchange_dict:
        nonce = kdc_exchange_dict['nonce']
    else:
        nonce = self.get_Nonce()
        kdc_exchange_dict['nonce'] = nonce

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)

    # Apply the per-body overrides: a value of None removes the field,
    # anything else replaces it. The inner body starts out as a copy of
    # the outer one.
    inner_req_body = dict(req_body)
    if inner_req is not None:
        for key, value in inner_req.items():
            if value is not None:
                inner_req_body[key] = value
            else:
                del inner_req_body[key]
    if outer_req is not None:
        for key, value in outer_req.items():
            if value is not None:
                req_body[key] = value
            else:
                del req_body[key]

    additional_padata = []
    if pac_request is not None:
        pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
        additional_padata.append(pa_pac_request)
    if pac_options is not None:
        pa_pac_options = self.get_pa_pac_options(pac_options)
        additional_padata.append(pa_pac_options)

    if req_msg_type == KRB_AS_REQ:
        tgs_req = None
        tgs_req_padata = None
    else:
        self.assertEqual(KRB_TGS_REQ, req_msg_type)

        tgs_req = self.generate_ap_req(kdc_exchange_dict,
                                       callback_dict,
                                       req_body,
                                       armor=False)
        tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)

    if generate_fast_padata_fn is not None:
        self.assertIsNotNone(generate_fast_fn)
        # This can alter req_body...
        fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
                                                        callback_dict,
                                                        req_body)
    else:
        fast_padata = []

    if generate_fast_armor_fn is not None:
        self.assertIsNotNone(generate_fast_fn)
        fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
                                             callback_dict,
                                             None,
                                             armor=True)

        fast_armor_type = kdc_exchange_dict['fast_armor_type']
        fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
                                                fast_ap_req)
    else:
        fast_armor = None

    if generate_padata_fn is not None:
        # This can alter req_body...
        outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
                                                    callback_dict,
                                                    req_body)
        self.assertIsNotNone(outer_padata)
        self.assertNotIn(PADATA_KDC_REQ,
                         [pa['padata-type'] for pa in outer_padata],
                         'Don\'t create TGS-REQ manually')
    else:
        outer_padata = None

    if generate_fast_fn is not None:
        armor_key = kdc_exchange_dict['armor_key']
        self.assertIsNotNone(armor_key)

        # The FAST request checksum covers the encoded outer request body
        # for an AS-REQ, and the AP-REQ for a TGS-REQ.
        if req_msg_type == KRB_AS_REQ:
            checksum_blob = self.der_encode(
                req_body,
                asn1Spec=krb5_asn1.KDC_REQ_BODY())
        else:
            self.assertEqual(KRB_TGS_REQ, req_msg_type)
            checksum_blob = tgs_req

        checksum = self.Checksum_create(armor_key,
                                        KU_FAST_REQ_CHKSUM,
                                        checksum_blob)

        # With FAST, the PAC-related padata goes inside the FAST request.
        # Note that this extends the fast_padata list in place.
        fast_padata += additional_padata
        fast = generate_fast_fn(kdc_exchange_dict,
                                callback_dict,
                                inner_req_body,
                                fast_padata,
                                fast_armor,
                                checksum)
    else:
        fast = None

    # Assemble the outer padata in order: TGS-REQ AP-REQ, FAST, padata
    # from generate_padata_fn, and (only without FAST) the PAC-related
    # padata.
    padata = []

    if tgs_req_padata is not None:
        padata.append(tgs_req_padata)

    if fast is not None:
        padata.append(fast)

    if outer_padata is not None:
        padata += outer_padata

    if fast is None:
        padata += additional_padata

    if not padata:
        padata = None

    kdc_exchange_dict['req_padata'] = padata
    kdc_exchange_dict['fast_padata'] = fast_padata
    kdc_exchange_dict['req_body'] = inner_req_body

    req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
                                               padata=padata,
                                               req_body=req_body,
                                               asn1Spec=req_asn1Spec())

    kdc_exchange_dict['req_obj'] = req_obj

    to_rodc = kdc_exchange_dict['to_rodc']

    rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
    self.assertIsNotNone(rep)

    msg_type = self.getElementValue(rep, 'msg-type')
    self.assertIsNotNone(msg_type)

    # Work out which message type is expected, checking that the check
    # functions and the expected error mode agree with each other.
    expected_msg_type = None
    if check_error_fn is not None:
        expected_msg_type = KRB_ERROR
        self.assertIsNone(check_rep_fn)
        self.assertNotEqual(0, len(expected_error_mode))
        self.assertNotIn(0, expected_error_mode)
    if check_rep_fn is not None:
        expected_msg_type = rep_msg_type
        self.assertIsNone(check_error_fn)
        self.assertEqual(0, len(expected_error_mode))
    self.assertIsNotNone(expected_msg_type)
    if msg_type == KRB_ERROR:
        error_code = self.getElementValue(rep, 'error-code')
        fail_msg = f'Got unexpected error: {error_code}'
    else:
        fail_msg = f'Expected to fail with error: {expected_error_mode}'
    self.assertEqual(msg_type, expected_msg_type, fail_msg)

    if msg_type == KRB_ERROR:
        return check_error_fn(kdc_exchange_dict,
                              callback_dict,
                              rep)

    return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
def as_exchange_dict(self,
                     creds=None,
                     client_cert=None,
                     expected_crealm=None,
                     expected_cname=None,
                     expected_anon=False,
                     expected_srealm=None,
                     expected_sname=None,
                     expected_account_name=None,
                     expected_groups=None,
                     unexpected_groups=None,
                     expected_upn_name=None,
                     expected_sid=None,
                     expected_requester_sid=None,
                     expected_domain_sid=None,
                     expected_supported_etypes=None,
                     expected_flags=None,
                     unexpected_flags=None,
                     ticket_decryption_key=None,
                     expect_ticket_checksum=None,
                     expect_full_checksum=None,
                     generate_fast_fn=None,
                     generate_fast_armor_fn=None,
                     generate_fast_padata_fn=None,
                     fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
                     generate_padata_fn=None,
                     check_error_fn=None,
                     check_rep_fn=None,
                     check_kdc_private_fn=None,
                     check_patypes=True,
                     callback_dict=None,
                     expected_error_mode=0,
                     expect_status=None,
                     expected_status=None,
                     expected_salt=None,
                     authenticator_subkey=None,
                     preauth_key=None,
                     armor_key=None,
                     armor_tgt=None,
                     armor_subkey=None,
                     auth_data=None,
                     kdc_options='',
                     inner_req=None,
                     outer_req=None,
                     pac_request=None,
                     pac_options=None,
                     ap_options=None,
                     fast_ap_options=None,
                     strict_edata_checking=True,
                     using_pkinit=PkInit.NOT_USED,
                     pk_nonce=None,
                     expect_edata=None,
                     expect_pac=True,
                     expect_client_claims=None,
                     expect_device_info=None,
                     expect_device_claims=None,
                     expect_upn_dns_info_ex=None,
                     expect_pac_attrs=None,
                     expect_pac_attrs_pac_request=None,
                     expect_requester_sid=None,
                     rc4_support=True,
                     expected_client_claims=None,
                     unexpected_client_claims=None,
                     expected_device_claims=None,
                     unexpected_device_claims=None,
                     expect_resource_groups_flag=None,
                     expected_device_groups=None,
                     to_rodc=False):
    """Return the exchange dict describing an AS-REQ/AS-REP exchange.

    Each argument is stored in the dict under its own name, alongside the
    fixed AS message types and ASN.1 specs. Two arguments are normalised
    first:

    * expected_error_mode: 0 becomes an empty tuple (no error expected),
      and a single non-container value is wrapped in a one-element tuple.
    * callback_dict: None becomes a new empty dict.
    """
    if expected_error_mode == 0:
        expected_error_mode = ()
    elif not isinstance(expected_error_mode, collections.abc.Container):
        expected_error_mode = (expected_error_mode,)

    # Apply the default before the dict is built; doing so afterwards has
    # no effect on the value stored in the dict.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_AS_REQ,
        'req_asn1Spec': krb5_asn1.AS_REQ,
        'rep_msg_type': KRB_AS_REP,
        'rep_asn1Spec': krb5_asn1.AS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
        'creds': creds,
        'client_cert': client_cert,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_anon': expected_anon,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'expected_account_name': expected_account_name,
        'expected_groups': expected_groups,
        'unexpected_groups': unexpected_groups,
        'expected_upn_name': expected_upn_name,
        'expected_sid': expected_sid,
        'expected_requester_sid': expected_requester_sid,
        'expected_domain_sid': expected_domain_sid,
        'expected_supported_etypes': expected_supported_etypes,
        'expected_flags': expected_flags,
        'unexpected_flags': unexpected_flags,
        'ticket_decryption_key': ticket_decryption_key,
        'expect_ticket_checksum': expect_ticket_checksum,
        'expect_full_checksum': expect_full_checksum,
        'generate_fast_fn': generate_fast_fn,
        'generate_fast_armor_fn': generate_fast_armor_fn,
        'generate_fast_padata_fn': generate_fast_padata_fn,
        'fast_armor_type': fast_armor_type,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'check_patypes': check_patypes,
        'callback_dict': callback_dict,
        'expected_error_mode': expected_error_mode,
        'expect_status': expect_status,
        'expected_status': expected_status,
        'expected_salt': expected_salt,
        'authenticator_subkey': authenticator_subkey,
        'preauth_key': preauth_key,
        'armor_key': armor_key,
        'armor_tgt': armor_tgt,
        'armor_subkey': armor_subkey,
        'auth_data': auth_data,
        'kdc_options': kdc_options,
        'inner_req': inner_req,
        'outer_req': outer_req,
        'pac_request': pac_request,
        'pac_options': pac_options,
        'ap_options': ap_options,
        'fast_ap_options': fast_ap_options,
        'strict_edata_checking': strict_edata_checking,
        'using_pkinit': using_pkinit,
        'pk_nonce': pk_nonce,
        'expect_edata': expect_edata,
        'expect_pac': expect_pac,
        'expect_client_claims': expect_client_claims,
        'expect_device_info': expect_device_info,
        'expect_device_claims': expect_device_claims,
        'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
        'expect_pac_attrs': expect_pac_attrs,
        'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
        'expect_requester_sid': expect_requester_sid,
        'rc4_support': rc4_support,
        'expected_client_claims': expected_client_claims,
        'unexpected_client_claims': unexpected_client_claims,
        'expected_device_claims': expected_device_claims,
        'unexpected_device_claims': unexpected_device_claims,
        'expect_resource_groups_flag': expect_resource_groups_flag,
        'expected_device_groups': expected_device_groups,
        'to_rodc': to_rodc
    }

    return kdc_exchange_dict
def tgs_exchange_dict(self,
                      creds=None,
                      expected_crealm=None,
                      expected_cname=None,
                      expected_anon=False,
                      expected_srealm=None,
                      expected_sname=None,
                      expected_account_name=None,
                      expected_groups=None,
                      unexpected_groups=None,
                      expected_upn_name=None,
                      expected_sid=None,
                      expected_requester_sid=None,
                      expected_domain_sid=None,
                      expected_device_domain_sid=None,
                      expected_supported_etypes=None,
                      expected_flags=None,
                      unexpected_flags=None,
                      ticket_decryption_key=None,
                      expect_ticket_checksum=None,
                      expect_full_checksum=None,
                      generate_fast_fn=None,
                      generate_fast_armor_fn=None,
                      generate_fast_padata_fn=None,
                      fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
                      generate_padata_fn=None,
                      check_error_fn=None,
                      check_rep_fn=None,
                      check_kdc_private_fn=None,
                      check_patypes=True,
                      expected_error_mode=0,
                      expect_status=None,
                      expected_status=None,
                      callback_dict=None,
                      tgt=None,
                      armor_key=None,
                      armor_tgt=None,
                      armor_subkey=None,
                      authenticator_subkey=None,
                      auth_data=None,
                      body_checksum_type=None,
                      kdc_options='',
                      inner_req=None,
                      outer_req=None,
                      pac_request=None,
                      pac_options=None,
                      ap_options=None,
                      fast_ap_options=None,
                      strict_edata_checking=True,
                      expect_edata=None,
                      expect_pac=True,
                      expect_client_claims=None,
                      expect_device_info=None,
                      expect_device_claims=None,
                      expect_upn_dns_info_ex=None,
                      expect_pac_attrs=None,
                      expect_pac_attrs_pac_request=None,
                      expect_requester_sid=None,
                      expected_proxy_target=None,
                      expected_transited_services=None,
                      rc4_support=True,
                      expected_client_claims=None,
                      unexpected_client_claims=None,
                      expected_device_claims=None,
                      unexpected_device_claims=None,
                      expect_resource_groups_flag=None,
                      expected_device_groups=None,
                      to_rodc=False):
    """Return the exchange dict describing a TGS-REQ/TGS-REP exchange.

    Each argument is stored in the dict under its own name, alongside the
    fixed TGS message types and ASN.1 specs. Two arguments are normalised
    first:

    * expected_error_mode: 0 becomes an empty tuple (no error expected),
      and a single non-container value is wrapped in a one-element tuple.
    * callback_dict: None becomes a new empty dict.
    """
    if expected_error_mode == 0:
        expected_error_mode = ()
    elif not isinstance(expected_error_mode, collections.abc.Container):
        expected_error_mode = (expected_error_mode,)

    # Apply the default before the dict is built; doing so afterwards has
    # no effect on the value stored in the dict.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_TGS_REQ,
        'req_asn1Spec': krb5_asn1.TGS_REQ,
        'rep_msg_type': KRB_TGS_REP,
        'rep_asn1Spec': krb5_asn1.TGS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
        'creds': creds,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_anon': expected_anon,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'expected_account_name': expected_account_name,
        'expected_groups': expected_groups,
        'unexpected_groups': unexpected_groups,
        'expected_upn_name': expected_upn_name,
        'expected_sid': expected_sid,
        'expected_requester_sid': expected_requester_sid,
        'expected_domain_sid': expected_domain_sid,
        'expected_device_domain_sid': expected_device_domain_sid,
        'expected_supported_etypes': expected_supported_etypes,
        'expected_flags': expected_flags,
        'unexpected_flags': unexpected_flags,
        'ticket_decryption_key': ticket_decryption_key,
        'expect_ticket_checksum': expect_ticket_checksum,
        'expect_full_checksum': expect_full_checksum,
        'generate_fast_fn': generate_fast_fn,
        'generate_fast_armor_fn': generate_fast_armor_fn,
        'generate_fast_padata_fn': generate_fast_padata_fn,
        'fast_armor_type': fast_armor_type,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'check_patypes': check_patypes,
        'callback_dict': callback_dict,
        'expected_error_mode': expected_error_mode,
        'expect_status': expect_status,
        'expected_status': expected_status,
        'tgt': tgt,
        'body_checksum_type': body_checksum_type,
        'armor_key': armor_key,
        'armor_tgt': armor_tgt,
        'armor_subkey': armor_subkey,
        'auth_data': auth_data,
        'authenticator_subkey': authenticator_subkey,
        'kdc_options': kdc_options,
        'inner_req': inner_req,
        'outer_req': outer_req,
        'pac_request': pac_request,
        'pac_options': pac_options,
        'ap_options': ap_options,
        'fast_ap_options': fast_ap_options,
        'strict_edata_checking': strict_edata_checking,
        'expect_edata': expect_edata,
        'expect_pac': expect_pac,
        'expect_client_claims': expect_client_claims,
        'expect_device_info': expect_device_info,
        'expect_device_claims': expect_device_claims,
        'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
        'expect_pac_attrs': expect_pac_attrs,
        'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
        'expect_requester_sid': expect_requester_sid,
        'expected_proxy_target': expected_proxy_target,
        'expected_transited_services': expected_transited_services,
        'rc4_support': rc4_support,
        'expected_client_claims': expected_client_claims,
        'unexpected_client_claims': unexpected_client_claims,
        'expected_device_claims': expected_device_claims,
        'unexpected_device_claims': unexpected_device_claims,
        'expect_resource_groups_flag': expect_resource_groups_flag,
        'expected_device_groups': expected_device_groups,
        'to_rodc': to_rodc
    }

    return kdc_exchange_dict
def generic_check_kdc_rep(self,
                          kdc_exchange_dict,
                          callback_dict,
                          rep):
    """Check a KDC reply (AS-REP or TGS-REP) and decrypt its encrypted parts.

    The outer reply fields and the ticket are compared against the
    expectations stored in kdc_exchange_dict. The reply key is then
    determined: it starts as the key returned by get_preauth_key(), is
    replaced if the reply carries a PA-PK-AS-REP (public-key or
    Diffie-Hellman PK-INIT), and is further strengthened if armor_key is
    set and the reply carries a PA-FX-FAST element with a strengthen key.

    The ticket enc-part is decrypted only if 'ticket_decryption_key' is
    set; the reply enc-part is always expected to be decryptable. The
    final reply key is stored in kdc_exchange_dict['reply_key'], and the
    decoded parts are handed to the 'check_kdc_private_fn' callback.

    :param kdc_exchange_dict: dict describing the exchange and its
        expectations; 'reply_key' is written to it.
    :param callback_dict: passed through to the padata checks and to
        check_kdc_private_fn.
    :param rep: the decoded reply.
    :return: rep.
    """

    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_anon = kdc_exchange_dict['expected_anon']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
    check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
    rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
    msg_type = kdc_exchange_dict['rep_msg_type']
    armor_key = kdc_exchange_dict['armor_key']

    # Check the unencrypted, outer part of the reply.
    self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
    padata = self.getElementValue(rep, 'padata')
    if self.strict_checking:
        self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
        if self.cname_checking:
            if expected_anon:
                # An anonymous reply is expected to name the well-known
                # anonymous principal rather than the requested client.
                expected_cname = self.PrincipalName_create(
                    name_type=NT_WELLKNOWN,
                    names=['WELLKNOWN', 'ANONYMOUS'])
            else:
                expected_cname = kdc_exchange_dict['expected_cname']
            self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
    self.assertElementPresent(rep, 'ticket')
    ticket = self.getElementValue(rep, 'ticket')
    ticket_encpart = None
    ticket_cipher = None
    self.assertIsNotNone(ticket)
    if ticket is not None:  # Never None, but gives indentation
        self.assertElementEqual(ticket, 'tkt-vno', 5)
        self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
        self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
        self.assertElementPresent(ticket, 'enc-part')
        ticket_encpart = self.getElementValue(ticket, 'enc-part')
        self.assertIsNotNone(ticket_encpart)
        if ticket_encpart is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_encpart, 'etype')

            # A kvno is expected on the ticket unless the request set the
            # 'enc-tkt-in-skey' KDC option. kdc_options is indexed as a
            # string of '0'/'1' characters; an option beyond its end
            # counts as unset.
            kdc_options = kdc_exchange_dict['kdc_options']
            pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1
            expect_kvno = (pos >= len(kdc_options)
                           or kdc_options[pos] != '1')
            if expect_kvno:
                # 'unspecified' means present, with any value != 0
                self.assertElementKVNO(ticket_encpart, 'kvno',
                                       self.unspecified_kvno)
            else:
                # For user-to-user, don't expect a kvno.
                self.assertElementMissing(ticket_encpart, 'kvno')

            self.assertElementPresent(ticket_encpart, 'cipher')
            ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
    self.assertElementPresent(rep, 'enc-part')
    encpart = self.getElementValue(rep, 'enc-part')
    encpart_cipher = None
    self.assertIsNotNone(encpart)
    if encpart is not None:  # Never None, but gives indentation
        self.assertElementPresent(encpart, 'etype')
        # NOTE(review): this checks the kvno of the *ticket* enc-part
        # although the surrounding lines inspect the reply enc-part;
        # possibly 'encpart' was intended -- confirm before changing.
        self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
        self.assertElementPresent(encpart, 'cipher')
        encpart_cipher = self.getElementValue(encpart, 'cipher')

    if self.padata_checking:
        self.check_reply_padata(kdc_exchange_dict,
                                callback_dict,
                                encpart,
                                padata)

    ticket_checksum = None

    # Get the decryption key for the encrypted part
    encpart_decryption_key, encpart_decryption_usage = (
        self.get_preauth_key(kdc_exchange_dict))

    pa_dict = self.get_pa_dict(padata)

    if PADATA_PK_AS_REP in pa_dict:
        # PK-INIT reply. At this point encpart_decryption_key is used as
        # the client's private key (asserted below to be an RSA key for
        # the public-key variant, or a DH key for the Diffie-Hellman
        # variant); it is replaced by the actual reply key further down.
        pk_as_rep = pa_dict[PADATA_PK_AS_REP]
        pk_as_rep = self.der_decode(pk_as_rep,
                                    asn1Spec=krb5_asn1.PA_PK_AS_REP())

        using_pkinit = kdc_exchange_dict['using_pkinit']
        if using_pkinit is PkInit.PUBLIC_KEY:
            # Public-key variant: the reply key arrives inside a CMS
            # EnvelopedData structure.
            content_info = self.der_decode(
                pk_as_rep['encKeyPack'],
                asn1Spec=krb5_asn1.ContentInfo())
            self.assertEqual(str(krb5_asn1.id_envelopedData),
                             content_info['contentType'])

            content = self.der_decode(content_info['content'],
                                      asn1Spec=krb5_asn1.EnvelopedData())

            self.assertEqual(0, content['version'])
            originator_info = content['originatorInfo']
            self.assertFalse(originator_info.get('certs'))
            self.assertFalse(originator_info.get('crls'))
            self.assertFalse(content.get('unprotectedAttrs'))

            encrypted_content_info = content['encryptedContentInfo']
            recipient_infos = content['recipientInfos']

            self.assertEqual(1, len(recipient_infos))
            ktri = recipient_infos[0]['ktri']

            if self.strict_checking:
                self.assertEqual(0, ktri['version'])

            private_key = encpart_decryption_key
            self.assertIsInstance(private_key,
                                  asymmetric.rsa.RSAPrivateKey)

            client_subject_key_id = (
                x509.SubjectKeyIdentifier.from_public_key(
                    private_key.public_key()))

            # Check that the client certificate is named as the recipient.
            ktri_rid = ktri['rid']
            try:
                issuer_and_serial_number = ktri_rid[
                    'issuerAndSerialNumber']
            except KeyError:
                # The recipient is identified by subject key identifier
                # instead of by issuer and serial number.
                subject_key_id = ktri_rid['subjectKeyIdentifier']
                self.assertEqual(subject_key_id,
                                 client_subject_key_id.digest)
            else:
                client_certificate = kdc_exchange_dict['client_cert']

                self.assertIsNotNone(issuer_and_serial_number['issuer'])
                self.assertEqual(issuer_and_serial_number['serialNumber'],
                                 client_certificate.serial_number)

            key_encryption_algorithm = ktri['keyEncryptionAlgorithm']
            self.assertEqual(str(krb5_asn1.rsaEncryption),
                             key_encryption_algorithm['algorithm'])
            if self.strict_checking:
                # b'\x05\x00' is the DER encoding of an ASN.1 NULL.
                self.assertEqual(
                    b'\x05\x00',
                    key_encryption_algorithm.get('parameters'))

            encrypted_key = ktri['encryptedKey']

            # Decrypt the key.
            # The encrypted key is left-padded with zero bytes to 256
            # bytes before decryption.
            # NOTE(review): 256 presumably corresponds to a 2048-bit RSA
            # modulus; a longer encrypted key would make pad_len negative
            # and bytes() raise -- confirm the key size is fixed.
            pad_len = 256 - len(encrypted_key)
            if pad_len:
                encrypted_key = bytes(pad_len) + encrypted_key
            decrypted_key = private_key.decrypt(
                encrypted_key,
                padding=asymmetric.padding.PKCS1v15())

            self.assertEqual(str(krb5_asn1.id_signedData),
                             encrypted_content_info['contentType'])

            encrypted_content = encrypted_content_info['encryptedContent']
            encryption_algorithm = encrypted_content_info[
                'contentEncryptionAlgorithm']

            cipher_algorithm = self.cipher_from_algorithm(encryption_algorithm['algorithm'])

            # This will serve as the IV.
            parameters = self.der_decode(
                encryption_algorithm['parameters'],
                asn1Spec=krb5_asn1.CMSCBCParameter())

            # Decrypt the content.
            cipher = Cipher(cipher_algorithm(decrypted_key),
                            modes.CBC(parameters),
                            default_backend())
            decryptor = cipher.decryptor()
            decrypted_content = decryptor.update(encrypted_content)
            decrypted_content += decryptor.finalize()

            # The padding doesn’t fully comply to PKCS7 with a specified
            # blocksize, so we must unpad the data ourselves.
            decrypted_content = self.unpad(decrypted_content)

            # Exactly one of these two is set below, depending on which
            # encoding of SignedData the KDC used.
            signed_data = None
            signed_data_rfc2315 = None

            first_tag = decrypted_content[0]
            if first_tag == 0x30:  # ASN.1 SEQUENCE tag
                signed_data = decrypted_content
            else:
                # Windows encodes the ASN.1 incorrectly, neglecting to add
                # the SEQUENCE tag. We’ll have to prepend it ourselves in
                # order for the decoding to work.
                encoded_len = self.asn1_length(decrypted_content)
                decrypted_content = bytes([0x30]) + encoded_len + (
                    decrypted_content)

                if first_tag == 0x02:  # ASN.1 INTEGER tag

                    # The INTEGER tag indicates that the data is encoded
                    # with the earlier variant of the SignedData ASN.1
                    # schema specified in RFC2315, as per [MS-PKCA] 2.2.4
                    # (PA-PK-AS-REP).
                    signed_data_rfc2315 = decrypted_content

                elif first_tag == 0x06:  # ASN.1 OBJECT IDENTIFIER tag

                    # The OBJECT IDENTIFIER tag indicates that the data is
                    # encoded as SignedData and wrapped in a ContentInfo
                    # structure, which we shall have to decode first. This
                    # seems to be the case when the supportedCMSTypes field
                    # in the client’s AuthPack is missing or empty.

                    content_info = self.der_decode(
                        decrypted_content,
                        asn1Spec=krb5_asn1.ContentInfo())
                    self.assertEqual(str(krb5_asn1.id_signedData),
                                     content_info['contentType'])
                    signed_data = content_info['content']
                else:
                    self.fail(f'got reply with unknown initial tag '
                              f'({first_tag})')

            # Decode whichever SignedData variant we got, and extract the
            # encapsulated content together with its content type.
            if signed_data is not None:
                signed_data = self.der_decode(
                    signed_data, asn1Spec=krb5_asn1.SignedData())

                encap_content_info = signed_data['encapContentInfo']

                content_type = encap_content_info['eContentType']
                content = encap_content_info['eContent']
            elif signed_data_rfc2315 is not None:
                signed_data = self.der_decode(
                    signed_data_rfc2315,
                    asn1Spec=krb5_asn1.SignedData_RFC2315())

                encap_content_info = signed_data['contentInfo']

                content_type = encap_content_info['contentType']
                content = self.der_decode(
                    encap_content_info['content'],
                    asn1Spec=pyasn1.type.univ.OctetString())
            else:
                self.fail('we must have got SignedData')

            self.assertEqual(str(krb5_asn1.id_pkinit_rkeyData),
                             content_type)
            reply_key_pack = self.der_decode(
                content, asn1Spec=krb5_asn1.ReplyKeyPack())

            as_checksum = reply_key_pack['asChecksum']

            # Re-encode the request that was sent, so that the checksum
            # over it can be verified below.
            req_obj = kdc_exchange_dict['req_obj']
            req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
            req_obj = self.der_encode(req_obj,
                                      asn1Spec=req_asn1Spec())

            reply_key = reply_key_pack['replyKey']

            # Replace the encpart decryption key with the decrypted key
            # from the reply.
            encpart_decryption_key = self.SessionKey_create(
                etype=reply_key['keytype'],
                contents=reply_key['keyvalue'],
                kvno=None)

            # Verify the checksum over the AS request body.
            kcrypto.verify_checksum(as_checksum['cksumtype'],
                                    encpart_decryption_key.key,
                                    KU_PKINIT_AS_REQ,
                                    req_obj,
                                    as_checksum['checksum'])
        elif using_pkinit is PkInit.DIFFIE_HELLMAN:
            # Diffie-Hellman variant: the KDC's DH public value arrives
            # in a SignedData structure, and the reply key is derived
            # from the shared secret.
            content_info = self.der_decode(
                pk_as_rep['dhInfo']['dhSignedData'],
                asn1Spec=krb5_asn1.ContentInfo())
            self.assertEqual(str(krb5_asn1.id_signedData),
                             content_info['contentType'])

            signed_data = self.der_decode(content_info['content'],
                                          asn1Spec=krb5_asn1.SignedData())

            encap_content_info = signed_data['encapContentInfo']
            content = encap_content_info['eContent']

            self.assertEqual(str(krb5_asn1.id_pkinit_DHKeyData),
                             encap_content_info['eContentType'])

            dh_key_info = self.der_decode(
                content, asn1Spec=krb5_asn1.KDCDHKeyInfo())

            self.assertNotIn('dhKeyExpiration', dh_key_info)

            dh_private_key = encpart_decryption_key
            self.assertIsInstance(dh_private_key,
                                  asymmetric.dh.DHPrivateKey)

            self.assertElementEqual(dh_key_info, 'nonce',
                                    kdc_exchange_dict['pk_nonce'])

            dh_public_key_data = self.bytes_from_bit_string(
                dh_key_info['subjectPublicKey'])
            dh_public_key_decoded = self.der_decode(
                dh_public_key_data, asn1Spec=krb5_asn1.DHPublicKey())

            # Build the KDC's public key from its public value and the
            # parameters of our own private key.
            dh_numbers = dh_private_key.parameters().parameter_numbers()

            public_numbers = asymmetric.dh.DHPublicNumbers(
                dh_public_key_decoded, dh_numbers)
            dh_public_key = public_numbers.public_key(default_backend())

            # Perform the Diffie-Hellman key exchange.
            shared_secret = dh_private_key.exchange(dh_public_key)

            # Pad the shared secret out to the length of ‘p’.
            p_len = self.length_in_bytes(dh_numbers.p)
            padding_len = p_len - len(shared_secret)
            self.assertGreaterEqual(padding_len, 0)
            padded_shared_secret = bytes(padding_len) + shared_secret

            reply_key_enc_type = self.expected_etype(kdc_exchange_dict)

            # At the moment, we don’t specify a nonce in the request, so we
            # can assume these are empty.
            client_nonce = b''
            server_nonce = b''

            ciphertext = padded_shared_secret + client_nonce + server_nonce

            # Replace the encpart decryption key with the key derived from
            # the Diffie-Hellman key exchange.
            encpart_decryption_key = self.octetstring2key(
                ciphertext, reply_key_enc_type)
        else:
            self.fail(f'invalid value for using_pkinit: {using_pkinit}')

        # The checks from here to the end of this branch apply to both
        # PK-INIT variants: 'signed_data' and 'content' were set by
        # whichever branch ran above.
        self.assertEqual(3, signed_data['version'])

        digest_algorithms = signed_data['digestAlgorithms']
        self.assertEqual(1, len(digest_algorithms))
        digest_algorithm = digest_algorithms[0]
        # Ensure the hash algorithm is valid.
        _ = self.hash_from_algorithm_id(digest_algorithm)

        self.assertFalse(signed_data.get('crls'))

        signer_infos = signed_data['signerInfos']
        self.assertEqual(1, len(signer_infos))
        signer_info = signer_infos[0]

        self.assertEqual(1, signer_info['version'])

        # Get the certificate presented by the KDC.
        kdc_certificates = signed_data['certificates']
        self.assertEqual(1, len(kdc_certificates))
        kdc_certificate = self.der_encode(
            kdc_certificates[0], asn1Spec=krb5_asn1.CertificateChoices())
        kdc_certificate = x509.load_der_x509_certificate(kdc_certificate,
                                                         default_backend())

        # Verify that the KDC’s certificate is named as the signer.
        sid = signer_info['sid']
        try:
            issuer_and_serial_number = sid['issuerAndSerialNumber']
        except KeyError:
            # The signer is identified by subject key identifier instead.
            extension = kdc_certificate.extensions.get_extension_for_oid(
                x509.oid.ExtensionOID.SUBJECT_KEY_IDENTIFIER)
            cert_subject_key_id = extension.value.digest
            self.assertEqual(sid['subjectKeyIdentifier'], cert_subject_key_id)
        else:
            self.assertIsNotNone(issuer_and_serial_number['issuer'])
            self.assertEqual(issuer_and_serial_number['serialNumber'],
                             kdc_certificate.serial_number)

        digest_algorithm = signer_info['digestAlgorithm']
        digest_hash_fn = self.hash_from_algorithm_id(digest_algorithm)

        # Exactly two signed attributes are expected, in this order: the
        # content type, then the message digest.
        signed_attrs = signer_info['signedAttrs']
        self.assertEqual(2, len(signed_attrs))

        signed_attr0 = signed_attrs[0]
        self.assertEqual(str(krb5_asn1.id_contentType),
                         signed_attr0['attrType'])
        signed_attr0_values = signed_attr0['attrValues']
        self.assertEqual(1, len(signed_attr0_values))
        signed_attr0_value = self.der_decode(
            signed_attr0_values[0],
            asn1Spec=krb5_asn1.ContentType())
        if using_pkinit is PkInit.DIFFIE_HELLMAN:
            self.assertEqual(str(krb5_asn1.id_pkinit_DHKeyData),
                             signed_attr0_value)
        else:
            self.assertEqual(str(krb5_asn1.id_pkinit_rkeyData),
                             signed_attr0_value)

        signed_attr1 = signed_attrs[1]
        self.assertEqual(str(krb5_asn1.id_messageDigest),
                         signed_attr1['attrType'])
        signed_attr1_values = signed_attr1['attrValues']
        self.assertEqual(1, len(signed_attr1_values))
        message_digest = self.der_decode(signed_attr1_values[0],
                                         krb5_asn1.MessageDigest())

        signature_algorithm = signer_info['signatureAlgorithm']
        hash_fn = self.hash_from_algorithm_id(signature_algorithm)

        # Compute the hash of the content to be signed. With the
        # Diffie-Hellman key exchange, this signature is over the type
        # KDCDHKeyInfo; otherwise, it is over the type ReplyKeyPack.
        digest = hashes.Hash(digest_hash_fn(), default_backend())
        digest.update(content)
        digest = digest.finalize()

        # Verify the hash. Note: this is a non–constant time comparison.
        self.assertEqual(digest, message_digest)

        # Re-encode the attributes ready for verifying the signature.
        cms_attrs = self.der_encode(signed_attrs,
                                    asn1Spec=krb5_asn1.CMSAttributes())

        # Verify the signature.
        kdc_public_key = kdc_certificate.public_key()
        kdc_public_key.verify(
            signer_info['signature'],
            cms_attrs,
            asymmetric.padding.PKCS1v15(),
            hash_fn())

        self.assertFalse(signer_info.get('unsignedAttrs'))

    if armor_key is not None:
        if PADATA_FX_FAST in pa_dict:
            fx_fast_data = pa_dict[PADATA_FX_FAST]
            fast_response = self.check_fx_fast_data(kdc_exchange_dict,
                                                    fx_fast_data,
                                                    armor_key,
                                                    finished=True)

            if 'strengthen-key' in fast_response:
                # Combine the strengthen key with the reply key found so
                # far to obtain the key that protects the reply enc-part.
                strengthen_key = self.EncryptionKey_import(
                    fast_response['strengthen-key'])
                encpart_decryption_key = (
                    self.generate_strengthen_reply_key(
                        strengthen_key,
                        encpart_decryption_key))

            fast_finished = fast_response.get('finished')
            if fast_finished is not None:
                ticket_checksum = fast_finished['ticket-checksum']

            self.check_rep_padata(kdc_exchange_dict,
                                  callback_dict,
                                  fast_response['padata'],
                                  error_code=0)

    # Decrypt the ticket, if we were given a key for it.
    ticket_private = None
    if ticket_decryption_key is not None:
        self.assertElementEqual(ticket_encpart, 'etype',
                                ticket_decryption_key.etype)
        self.assertElementKVNO(ticket_encpart, 'kvno',
                               ticket_decryption_key.kvno)
        ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
                                                       ticket_cipher)
        ticket_private = self.der_decode(
            ticket_decpart,
            asn1Spec=krb5_asn1.EncTicketPart())

    # Decrypt the reply enc-part with the reply key.
    encpart_private = None
    self.assertIsNotNone(encpart_decryption_key)
    if encpart_decryption_key is not None:
        self.assertElementEqual(encpart, 'etype',
                                encpart_decryption_key.etype)
        if self.strict_checking:
            self.assertElementKVNO(encpart, 'kvno',
                                   encpart_decryption_key.kvno)
        rep_decpart = encpart_decryption_key.decrypt(
            encpart_decryption_usage,
            encpart_cipher)
        # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
        # application tag 26
        # Hence, if decoding with the expected spec fails, retry with
        # EncTGSRepPart; a second failure propagates.
        try:
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=rep_encpart_asn1Spec())
        except Exception:
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=krb5_asn1.EncTGSRepPart())

    # Make the final reply key available to later checks.
    kdc_exchange_dict['reply_key'] = encpart_decryption_key

    self.assertIsNotNone(check_kdc_private_fn)
    if check_kdc_private_fn is not None:
        check_kdc_private_fn(kdc_exchange_dict, callback_dict,
                             rep, ticket_private, encpart_private,
                             ticket_checksum)

    return rep
def check_fx_fast_data(self,
                       kdc_exchange_dict,
                       fx_fast_data,
                       armor_key,
                       finished=False,
                       expect_strengthen_key=True):
    """Decrypt and decode a PA-FX-FAST reply, returning the FAST response.

    The armored data is decrypted with armor_key, whose etype must match
    the etype of the encrypted FAST reply. The decoded KrbFastResponse
    must contain 'finished' if finished is true, and 'strengthen-key' if
    expect_strengthen_key is true and strict checking is enabled. Its
    nonce must equal kdc_exchange_dict['nonce'].
    """
    reply = self.der_decode(fx_fast_data,
                            asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
    armored = reply['armored-data']['enc-fast-rep']
    self.assertEqual(armored['etype'], armor_key.etype)

    plaintext = armor_key.decrypt(KU_FAST_REP, armored['cipher'])
    response = self.der_decode(plaintext,
                               asn1Spec=krb5_asn1.KrbFastResponse())

    # Collect the fields this response is required to carry, then check
    # them in order.
    required_fields = []
    if expect_strengthen_key and self.strict_checking:
        required_fields.append('strengthen-key')
    if finished:
        required_fields.append('finished')
    for field in required_fields:
        self.assertIn(field, response)

    # Ensure that the nonce matches the nonce in the body of the request
    # (RFC6113 5.4.3).
    self.assertEqual(kdc_exchange_dict['nonce'], response['nonce'])

    return response
def generic_check_kdc_private(self,
                              kdc_exchange_dict,
                              callback_dict,
                              rep,
                              ticket_private,
                              encpart_private,
                              ticket_checksum):
    """Check the decrypted parts of a KDC reply.

    Validates the decrypted ticket (ticket_private) and the decrypted
    reply enc-part (encpart_private) against the expectations held in
    kdc_exchange_dict; either may be None, in which case its checks are
    skipped. If ticket_checksum is not None it is verified against the
    ticket using the armor key. The PAC and the ticket signatures are
    checked where the necessary keys are available.

    A KerberosTicketCreds object describing the ticket is stored in
    kdc_exchange_dict['rep_ticket_creds'].

    callback_dict is accepted for signature compatibility and is not
    used here.
    """
    # Work out which of the relevant KDC options were set in the
    # request. kdc_options is indexed as a string of '0'/'1' characters;
    # an option beyond its end counts as unset.
    kdc_options = kdc_exchange_dict['kdc_options']
    canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
    canonicalize = (canon_pos < len(kdc_options)
                    and kdc_options[canon_pos] == '1')
    renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
    renewable = (renewable_pos < len(kdc_options)
                 and kdc_options[renewable_pos] == '1')
    renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1
    renew = (renew_pos < len(kdc_options)
             and kdc_options[renew_pos] == '1')
    expect_renew_till = renewable or renew

    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_cname = kdc_exchange_dict['expected_cname']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']

    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_flags = kdc_exchange_dict.get('expected_flags')
    unexpected_flags = kdc_exchange_dict.get('unexpected_flags')

    ticket = self.getElementValue(rep, 'ticket')

    if ticket_checksum is not None:
        armor_key = kdc_exchange_dict['armor_key']
        self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)

    # Gather the krbtgt key(s) passed to verify_ticket() below.
    to_rodc = kdc_exchange_dict['to_rodc']
    if to_rodc:
        krbtgt_creds = self.get_rodc_krbtgt_creds()
    else:
        krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    krbtgt_keys = [krbtgt_key]
    if not self.strict_checking:
        # When not checking strictly, also offer the RC4 krbtgt key.
        krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds(
            krbtgt_creds,
            etype=kcrypto.Enctype.RC4)
        krbtgt_keys.append(krbtgt_key_rc4)

    # self.expect_pac forces a PAC to be expected when is_tgs() holds
    # for the expected service name; otherwise the per-exchange setting
    # (which may be None, meaning no expectation) applies.
    if self.expect_pac and self.is_tgs(expected_sname):
        expect_pac = True
    else:
        expect_pac = kdc_exchange_dict['expect_pac']

    # Checks on the decrypted ticket.
    ticket_session_key = None
    if ticket_private is not None:
        self.assertElementFlags(ticket_private, 'flags',
                                expected_flags,
                                unexpected_flags)
        self.assertElementPresent(ticket_private, 'key')
        ticket_key = self.getElementValue(ticket_private, 'key')
        self.assertIsNotNone(ticket_key)
        if ticket_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_key, 'keytype')
            self.assertElementPresent(ticket_key, 'keyvalue')
            ticket_session_key = self.EncryptionKey_import(ticket_key)
        self.assertElementEqualUTF8(ticket_private, 'crealm',
                                    expected_crealm)
        if self.cname_checking:
            self.assertElementEqualPrincipal(ticket_private, 'cname',
                                             expected_cname)
        self.assertElementPresent(ticket_private, 'transited')
        self.assertElementPresent(ticket_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(ticket_private, 'starttime')
        self.assertElementPresent(ticket_private, 'endtime')
        if self.strict_checking:
            if expect_renew_till:
                self.assertElementPresent(ticket_private, 'renew-till')
            else:
                self.assertElementMissing(ticket_private, 'renew-till')
        if self.strict_checking:
            self.assertElementMissing(ticket_private, 'caddr')
        if expect_pac is not None:
            if expect_pac:
                # expect_empty evaluates to False in this branch.
                self.assertElementPresent(ticket_private,
                                          'authorization-data',
                                          expect_empty=not expect_pac)
            else:
                # It is more correct to not have an authorization-data
                # present than an empty one.
                #
                # https://github.com/krb5/krb5/pull/1225#issuecomment-995104193
                v = self.getElementValue(ticket_private,
                                         'authorization-data')
                if v is not None:
                    self.assertElementPresent(ticket_private,
                                              'authorization-data',
                                              expect_empty=True)

    # Checks on the decrypted reply enc-part.
    encpart_session_key = None
    if encpart_private is not None:
        self.assertElementPresent(encpart_private, 'key')
        encpart_key = self.getElementValue(encpart_private, 'key')
        self.assertIsNotNone(encpart_key)
        if encpart_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(encpart_key, 'keytype')
            self.assertElementPresent(encpart_key, 'keyvalue')
            encpart_session_key = self.EncryptionKey_import(encpart_key)
        self.assertElementPresent(encpart_private, 'last-req')
        # Prefer 'pk_nonce' when it is present and truthy; otherwise
        # fall back to the request nonce.
        expected_nonce = kdc_exchange_dict.get('pk_nonce')
        if not expected_nonce:
            expected_nonce = kdc_exchange_dict['nonce']
        self.assertElementEqual(encpart_private, 'nonce',
                                expected_nonce)
        # NOTE(review): the 'else' is taken to pair with the message
        # type test (i.e. a reply that is not an AS-REP must not carry
        # key-expiration) -- confirm against the file's indentation.
        if rep_msg_type == KRB_AS_REP:
            if self.strict_checking:
                self.assertElementPresent(encpart_private,
                                          'key-expiration')
        else:
            self.assertElementMissing(encpart_private,
                                      'key-expiration')
        self.assertElementFlags(encpart_private, 'flags',
                                expected_flags,
                                unexpected_flags)
        self.assertElementPresent(encpart_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(encpart_private, 'starttime')
        self.assertElementPresent(encpart_private, 'endtime')
        if self.strict_checking:
            if expect_renew_till:
                self.assertElementPresent(encpart_private, 'renew-till')
            else:
                self.assertElementMissing(encpart_private, 'renew-till')
        self.assertElementEqualUTF8(encpart_private, 'srealm',
                                    expected_srealm)
        self.assertElementEqualPrincipal(encpart_private, 'sname',
                                         expected_sname)
        if self.strict_checking:
            self.assertElementMissing(encpart_private, 'caddr')

        sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)

        sent_enc_pa_rep = self.sent_enc_pa_rep(kdc_exchange_dict)

        # encrypted-pa-data is expected if the request asked for
        # canonicalization, set any PAC option, or (for an AS exchange)
        # sent a REQ_ENC_PA_REP; otherwise it must be absent or empty.
        enc_padata = self.getElementValue(encpart_private,
                                          'encrypted-pa-data')
        if (canonicalize or '1' in sent_pac_options or (
                rep_msg_type == KRB_AS_REP and sent_enc_pa_rep)):
            if self.strict_checking:
                self.assertIsNotNone(enc_padata)

            if enc_padata is not None:
                enc_pa_dict = self.get_pa_dict(enc_padata)
                # NOTE(review): all three presence checks are taken to
                # be under strict_checking -- confirm against the file's
                # indentation.
                if self.strict_checking:
                    if canonicalize:
                        self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
                    else:
                        self.assertNotIn(PADATA_SUPPORTED_ETYPES,
                                         enc_pa_dict)

                    if '1' in sent_pac_options:
                        self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
                    else:
                        self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)

                    if rep_msg_type == KRB_AS_REP and sent_enc_pa_rep:
                        self.assertIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict)
                    else:
                        self.assertNotIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict)

                if PADATA_SUPPORTED_ETYPES in enc_pa_dict:
                    expected_supported_etypes = kdc_exchange_dict[
                        'expected_supported_etypes']

                    # The value is unpacked as one little-endian
                    # unsigned 32-bit integer.
                    (supported_etypes,) = struct.unpack(
                        '<L',
                        enc_pa_dict[PADATA_SUPPORTED_ETYPES])

                    # The DES bits are excluded from the comparison.
                    ignore_bits = (security.KERB_ENCTYPE_DES_CBC_CRC |
                                   security.KERB_ENCTYPE_DES_CBC_MD5)

                    self.assertEqual(
                        supported_etypes & ~ignore_bits,
                        expected_supported_etypes & ~ignore_bits,
                        f'PADATA_SUPPORTED_ETYPES: got: {supported_etypes} (0x{supported_etypes:X}), '
                        f'expected: {expected_supported_etypes} (0x{expected_supported_etypes:X})')

                if PADATA_PAC_OPTIONS in enc_pa_dict:
                    # The KDC must echo the PAC options that were sent.
                    pac_options = self.der_decode(
                        enc_pa_dict[PADATA_PAC_OPTIONS],
                        asn1Spec=krb5_asn1.PA_PAC_OPTIONS())

                    self.assertElementEqual(pac_options, 'options',
                                            sent_pac_options)

                if PADATA_REQ_ENC_PA_REP in enc_pa_dict:
                    # The element carries a checksum over the encoded
                    # request, made with the reply key.
                    enc_pa_rep = enc_pa_dict[PADATA_REQ_ENC_PA_REP]

                    enc_pa_rep = self.der_decode(
                        enc_pa_rep,
                        asn1Spec=krb5_asn1.Checksum())

                    reply_key = kdc_exchange_dict['reply_key']
                    req_obj = kdc_exchange_dict['req_obj']
                    req_asn1Spec = kdc_exchange_dict['req_asn1Spec']

                    req_obj = self.der_encode(req_obj,
                                              asn1Spec=req_asn1Spec())

                    checksum = enc_pa_rep['checksum']
                    ctype = enc_pa_rep['cksumtype']

                    reply_key.verify_checksum(KU_AS_REQ,
                                              req_obj,
                                              ctype,
                                              checksum)
        else:
            if enc_padata is not None:
                self.assertEqual(enc_padata, [])

    # The session key in the ticket must match the one in the reply.
    if ticket_session_key is not None and encpart_session_key is not None:
        self.assertEqual(ticket_session_key.etype,
                         encpart_session_key.etype)
        self.assertEqual(ticket_session_key.key.contents,
                         encpart_session_key.key.contents)
    if encpart_session_key is not None:
        session_key = encpart_session_key
    else:
        session_key = ticket_session_key
    ticket_creds = KerberosTicketCreds(
        ticket,
        session_key,
        crealm=expected_crealm,
        cname=expected_cname,
        srealm=expected_srealm,
        sname=expected_sname,
        decryption_key=ticket_decryption_key,
        ticket_private=ticket_private,
        encpart_private=encpart_private)

    if ticket_private is not None:
        pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac)
        # expect_pac may be None, in which case neither presence nor
        # absence of the PAC is asserted.
        if expect_pac is True:
            self.assertIsNotNone(pac_data)
        elif expect_pac is False:
            self.assertIsNone(pac_data)

        if pac_data is not None:
            self.check_pac_buffers(pac_data, kdc_exchange_dict)

    # Expecting either checksum requires a key to decrypt the ticket.
    expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
    expect_full_checksum = kdc_exchange_dict['expect_full_checksum']
    if expect_ticket_checksum or expect_full_checksum:
        self.assertIsNotNone(ticket_decryption_key)

    if ticket_decryption_key is not None:
        service_ticket = (rep_msg_type == KRB_TGS_REP
                          and not self.is_tgs_principal(expected_sname))
        self.verify_ticket(ticket_creds, krbtgt_keys,
                           service_ticket=service_ticket,
                           expect_pac=expect_pac,
                           expect_ticket_checksum=expect_ticket_checksum
                           or self.tkt_sig_support,
                           expect_full_checksum=expect_full_checksum
                           or self.full_sig_support)

    kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
4044 # Check the SIDs in a LOGON_INFO PAC buffer.
def check_logon_info_sids(self, logon_info_buffer, kdc_exchange_dict):
    """Compare the SIDs of a LOGON_INFO PAC buffer with expectations.

    The domain SID and user SID are checked first, when the
    corresponding expectations ('expected_domain_sid', 'expected_sid')
    are not None. If neither 'expected_groups' nor 'unexpected_groups'
    is given, nothing further is checked.

    Otherwise every SID in the buffer (primary GID, extra SIDs, base
    RIDs and resource group RIDs) is gathered as a
    (sid, SidType, attributes) tuple, asserting on the way that none is
    unexpected or duplicated and that the EXTRA_SIDS and
    RESOURCE_GROUPS user flags agree with the buffer contents. The
    gathered set is finally compared with 'expected_groups': as a
    superset if that set contains Ellipsis, exactly otherwise.
    """
    validation = logon_info_buffer.info.info
    info3 = validation.info3
    base = info3.base
    resource_groups = validation.resource_groups

    wanted_groups = kdc_exchange_dict['expected_groups']
    forbidden_groups = kdc_exchange_dict['unexpected_groups']
    wanted_domain_sid = kdc_exchange_dict['expected_domain_sid']
    wanted_sid = kdc_exchange_dict['expected_sid']

    domain_sid = base.domain_sid
    if wanted_domain_sid is not None:
        self.assertEqual(wanted_domain_sid, str(domain_sid))

    if wanted_sid is not None:
        self.assertEqual(wanted_sid, f'{domain_sid}-{base.rid}')

    if wanted_groups is None and forbidden_groups is None:
        # No group expectations: nothing more to do.
        return

    # The set of SIDs found in the PAC, seeded with the primary GID.
    seen = {
        (f'{domain_sid}-{base.primary_gid}',
         self.SidType.PRIMARY_GID,
         None),
    }

    def record(sid, sid_type, attributes):
        # Reject unexpected and duplicated SIDs, then remember this one.
        if forbidden_groups is not None:
            self.assertNotIn(sid, forbidden_groups)

        entry = (sid, sid_type, attributes)
        self.assertNotIn(entry, seen, 'got duplicated SID')
        seen.add(entry)

    # Extra SIDs: the user flag must agree with their presence.
    extra_sids_flag = base.user_flags & netlogon.NETLOGON_EXTRA_SIDS
    if info3.sids is None:
        self.assertFalse(extra_sids_flag,
                         'no extra SIDs present, but EXTRA_SIDS flag set')
    else:
        self.assertTrue(extra_sids_flag,
                        'extra SIDs present, but EXTRA_SIDS flag not set')
        self.assertTrue(info3.sids, 'got empty SIDs')

        for sid_attr in info3.sids:
            record(str(sid_attr.sid),
                   self.SidType.EXTRA_SID,
                   sid_attr.attributes)

    # Base RIDs, relative to the logon domain SID.
    base_rids = base.groups.rids
    if base_rids is not None:
        self.assertTrue(base_rids, 'got empty RIDs')

        for group in base_rids:
            record(f'{domain_sid}-{group.rid}',
                   self.SidType.BASE_SID,
                   group.attributes)

    # Resource groups: unless the caller stated an expectation for the
    # flag, it must agree with the presence of resource group RIDs.
    flag_expected = kdc_exchange_dict['expect_resource_groups_flag']
    set_reason = reset_reason = ''
    if flag_expected is None:
        flag_expected = resource_groups.groups.rids is not None
        set_reason = 'resource groups present, but '
        reset_reason = 'no resource groups present, but '

    resource_flag = base.user_flags & netlogon.NETLOGON_RESOURCE_GROUPS
    if flag_expected:
        self.assertTrue(
            resource_flag,
            f'{set_reason}RESOURCE_GROUPS flag unexpectedly reset')
    else:
        self.assertFalse(
            resource_flag,
            f'{reset_reason}RESOURCE_GROUPS flag unexpectedly set')

    resource_rids = resource_groups.groups.rids
    if resource_rids is not None:
        self.assertTrue(resource_rids, 'got empty RIDs')

        resource_domain_sid = resource_groups.domain_sid
        for resource_group in resource_rids:
            record(f'{resource_domain_sid}-{resource_group.rid}',
                   self.SidType.RESOURCE_SID,
                   resource_group.attributes)

    # Compare what was found against the expected set of SIDs.
    if wanted_groups is None:
        return

    if ... in wanted_groups:
        # Only the presence of particular groups is being asserted;
        # additional groups are tolerated. Adding Ellipsis to the found
        # set lets the subset comparison succeed.
        seen.add(...)
        self.assertLessEqual(wanted_groups, seen,
                             'expected groups')
    else:
        # The groups must match exactly.
        self.assertEqual(wanted_groups, seen,
                         'expected != got')
def check_device_info(self, device_info, kdc_exchange_dict):
    """Check a PAC device info structure against the test's expectations.

    The device RID is compared with the RID found in the logon info of
    the armor TGT's PAC, the device domain SID is compared with
    'expected_device_domain_sid' (when given), and the SIDs collected
    from the structure are compared with 'expected_device_groups'
    (when given).

    :param device_info: the device info structure taken from the PAC.
    :param kdc_exchange_dict: the exchange state; the keys read are
        'armor_tgt', 'expected_device_domain_sid',
        'expected_device_groups' and 'expect_device_info'.
    """
    def add_unique(sid_set, entry):
        # Every collected entry must be seen exactly once.
        self.assertNotIn(entry, sid_set, 'got duplicated SID')
        sid_set.add(entry)

    # Locate the logon info of the armor TGT's PAC.
    armor_ticket = kdc_exchange_dict['armor_tgt']
    armor_authdata = armor_ticket.ticket_private.get('authorization-data')
    self.assertIsNotNone(armor_authdata,
                         'missing authdata for armor TGT')
    raw_armor_pac = self.get_pac(armor_authdata)
    unpacked_armor_pac = ndr_unpack(krb5pac.PAC_DATA, raw_armor_pac)
    for candidate in unpacked_armor_pac.buffers:
        if candidate.type == krb5pac.PAC_TYPE_LOGON_INFO:
            armor_logon_info = candidate.info.info.info3
            break
    else:
        self.fail('missing logon info for armor PAC')
    self.assertEqual(armor_logon_info.base.rid, device_info.rid)

    domain_sid = kdc_exchange_dict['expected_device_domain_sid']
    expected_device_groups = kdc_exchange_dict['expected_device_groups']
    if kdc_exchange_dict['expect_device_info']:
        # A test that expects device info must say what it expects.
        self.assertIsNotNone(domain_sid)
        self.assertIsNotNone(expected_device_groups)

    if domain_sid is None:
        # No expectation was given: take the domain SID from the PAC.
        domain_sid = str(device_info.domain_sid)
    else:
        self.assertEqual(domain_sid, str(device_info.domain_sid))

    # Check the device info SIDs.

    # A representation of the device info groups, seeded with the
    # primary group.
    collected = {
        (f'{domain_sid}-{device_info.primary_gid}',
         self.SidType.PRIMARY_GID,
         None),
    }

    # Collect the groups.
    base_rids = device_info.groups.rids
    if base_rids is not None:
        self.assertTrue(base_rids, 'got empty RIDs')

        for base_group in base_rids:
            add_unique(collected,
                       (f'{domain_sid}-{base_group.rid}',
                        self.SidType.BASE_SID,
                        base_group.attributes))

    # Collect the SIDs.
    if device_info.sids is not None:
        self.assertTrue(device_info.sids, 'got empty SIDs')

        for extra in device_info.sids:
            got_sid = str(extra.sid)

            # Domain-relative SIDs belong in domain_groups instead.
            looks_domain_relative = (
                extra.sid.num_auths == 5
                and got_sid.startswith('S-1-5-21-'))
            self.assertFalse(looks_domain_relative,
                             f'got unexpected SID for domain: {got_sid} '
                             f'(should be in device_info.domain_groups)')

            add_unique(collected,
                       (got_sid,
                        self.SidType.EXTRA_SID,
                        extra.attributes))

    # Collect the domain groups.
    if device_info.domain_groups is not None:
        self.assertTrue(device_info.domain_groups, 'got empty domain groups')

        for domain_group in device_info.domain_groups:
            self.assertTrue(domain_group, 'got empty domain group')

            per_domain = set()

            group_domain_sid = domain_group.domain_sid

            is_domain_sid = (
                group_domain_sid.num_auths == 4
                and str(group_domain_sid).startswith('S-1-5-21-'))
            self.assertTrue(
                is_domain_sid,
                f'got unexpected domain SID for non-domain: {group_domain_sid} '
                f'(should be in device_info.sids)')

            for member in domain_group.groups.rids:
                add_unique(per_domain,
                           (f'{group_domain_sid}-{member.rid}',
                            self.SidType.RESOURCE_SID,
                            member.attributes))

            # Each domain's groups are stored as one frozenset element
            # of the overall set.
            per_domain = frozenset(per_domain)
            self.assertNotIn(per_domain, collected)
            collected.add(per_domain)

    # Compare the aggregated device SIDs against the set of expected device
    # SIDs.
    if expected_device_groups is not None:
        self.assertEqual(expected_device_groups, collected,
                         'expected != got')
def check_pac_buffers(self, pac_data, kdc_exchange_dict):
    """Validate the buffers of a PAC against the exchange's expectations.

    The PAC is unpacked, the list of buffer types that should be
    present is derived from kdc_exchange_dict and from the capability
    attributes on self (claims, compound identity, ticket/full
    signature support, extra PAC buffers), and that list is compared
    with the buffer types actually present. Each buffer is then checked
    according to its type.

    :param pac_data: the packed PAC, as accepted by ndr_unpack() for
        krb5pac.PAC_DATA.
    :param kdc_exchange_dict: the exchange state holding the
        expectations ('expect_*' / 'expected_*' / 'unexpected_*' keys)
        and request details ('rep_msg_type', 'armor_tgt',
        'kdc_options', 'pac_request', 'reply_key', 'creds', ...).
    """
    pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)

    rep_msg_type = kdc_exchange_dict['rep_msg_type']
    armor_tgt = kdc_exchange_dict['armor_tgt']

    # Device buffers are only considered for an armored TGS exchange.
    compound_id = rep_msg_type == KRB_TGS_REP and armor_tgt is not None

    expected_sname = kdc_exchange_dict['expected_sname']
    expect_client_claims = kdc_exchange_dict['expect_client_claims']
    expect_device_info = kdc_exchange_dict['expect_device_info']
    expect_device_claims = kdc_exchange_dict['expect_device_claims']

    expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
                      krb5pac.PAC_TYPE_SRV_CHECKSUM,
                      krb5pac.PAC_TYPE_KDC_CHECKSUM,
                      krb5pac.PAC_TYPE_LOGON_NAME,
                      krb5pac.PAC_TYPE_UPN_DNS_INFO]

    # A delegation buffer is expected when the 'cname-in-addl-tkt'
    # option bit was set in the request.
    kdc_options = kdc_exchange_dict['kdc_options']
    pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
    constrained_delegation = (pos < len(kdc_options)
                              and kdc_options[pos] == '1')
    if constrained_delegation:
        expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)

    # Buffer types in require_strict / unchecked are passed on to
    # assertSequenceElementsEqual() below.
    require_strict = set()
    unchecked = set()
    if not self.tkt_sig_support:
        require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
    if not self.full_sig_support:
        require_strict.add(krb5pac.PAC_TYPE_FULL_CHECKSUM)

    expected_client_claims = kdc_exchange_dict['expected_client_claims']
    unexpected_client_claims = kdc_exchange_dict[
        'unexpected_client_claims']

    if self.kdc_claims_support and expect_client_claims:
        expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
    else:
        self.assertFalse(
            expected_client_claims,
            'expected client claims, but client claims not expected in '
            'PAC')
        self.assertFalse(
            unexpected_client_claims,
            'unexpected client claims, but client claims not expected in '
            'PAC')

        if expect_client_claims is None:
            unchecked.add(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)

    expected_device_claims = kdc_exchange_dict['expected_device_claims']
    unexpected_device_claims = kdc_exchange_dict['unexpected_device_claims']

    expected_device_groups = kdc_exchange_dict['expected_device_groups']

    if (self.kdc_claims_support and self.kdc_compound_id_support
            and expect_device_claims and compound_id):
        expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
    else:
        self.assertFalse(
            expect_device_claims,
            'expected device claims buffer, but device claims not '
            'expected in PAC')
        self.assertFalse(
            expected_device_claims,
            'expected device claims, but device claims not expected in '
            'PAC')
        self.assertFalse(
            unexpected_device_claims,
            'unexpected device claims, but device claims not expected in '
            'PAC')

        if expect_device_claims is None and compound_id:
            unchecked.add(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)

    if self.kdc_compound_id_support and compound_id and expect_device_info:
        expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
    else:
        self.assertFalse(expect_device_info,
                         'expected device info with no armor TGT or '
                         'for non-TGS request')
        self.assertFalse(expected_device_groups,
                         'expected device groups, but device info not '
                         'expected in PAC')

        if expect_device_info is None and compound_id:
            unchecked.add(krb5pac.PAC_TYPE_DEVICE_INFO)

    if rep_msg_type == KRB_TGS_REP:
        if not self.is_tgs_principal(expected_sname):
            expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
            expected_types.append(krb5pac.PAC_TYPE_FULL_CHECKSUM)

    expect_extra_pac_buffers = self.is_tgs(expected_sname)

    expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs']

    if expect_pac_attrs:
        expect_pac_attrs_pac_request = kdc_exchange_dict[
            'expect_pac_attrs_pac_request']
    else:
        expect_pac_attrs_pac_request = kdc_exchange_dict[
            'pac_request']

    # With no explicit expectation, fall back on whether the ticket is
    # for the TGS, but only if the KDC is declared to emit the extra
    # buffers at all.
    if expect_pac_attrs is None:
        if self.expect_extra_pac_buffers:
            expect_pac_attrs = expect_extra_pac_buffers
        else:
            require_strict.add(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
    if expect_pac_attrs:
        expected_types.append(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)

    expect_requester_sid = kdc_exchange_dict['expect_requester_sid']
    expected_requester_sid = kdc_exchange_dict['expected_requester_sid']

    if expect_requester_sid is None:
        if self.expect_extra_pac_buffers:
            expect_requester_sid = expect_extra_pac_buffers
        else:
            require_strict.add(krb5pac.PAC_TYPE_REQUESTER_SID)
    if expected_requester_sid is not None:
        # Supplying a value implies the buffer must be present.
        expect_requester_sid = True
    if expect_requester_sid:
        expected_types.append(krb5pac.PAC_TYPE_REQUESTER_SID)

    sent_pk_as_req = self.sent_pk_as_req(kdc_exchange_dict)
    if sent_pk_as_req:
        expected_types.append(krb5pac.PAC_TYPE_CREDENTIAL_INFO)

    buffer_types = [pac_buffer.type
                    for pac_buffer in pac.buffers]
    self.assertSequenceElementsEqual(
        expected_types, buffer_types,
        require_ordered=False,
        require_strict=require_strict,
        unchecked=unchecked)

    expected_account_name = kdc_exchange_dict['expected_account_name']
    expected_sid = kdc_exchange_dict['expected_sid']

    # Expecting an account name or SID implies expecting the extended
    # UPN_DNS_INFO part that carries them.
    expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
    if expect_upn_dns_info_ex is None and (
            expected_account_name is not None
            or expected_sid is not None):
        expect_upn_dns_info_ex = True

    for pac_buffer in pac.buffers:
        if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
            expected_proxy_target = kdc_exchange_dict[
                'expected_proxy_target']
            expected_transited_services = kdc_exchange_dict[
                'expected_transited_services']

            delegation_info = pac_buffer.info.info

            self.assertEqual(expected_proxy_target,
                             str(delegation_info.proxy_target))

            transited_services = list(map(
                str, delegation_info.transited_services))
            self.assertEqual(expected_transited_services,
                             transited_services)

        elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
            expected_cname = kdc_exchange_dict['expected_cname']
            account_name = '/'.join(expected_cname['name-string'])

            self.assertEqual(account_name, pac_buffer.info.account_name)

        elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
            info3 = pac_buffer.info.info.info3
            logon_info = info3.base

            if expected_account_name is not None:
                self.assertEqual(expected_account_name,
                                 str(logon_info.account_name))

            self.check_logon_info_sids(pac_buffer, kdc_exchange_dict)

        elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
            upn_dns_info = pac_buffer.info
            upn_dns_info_ex = upn_dns_info.ex

            expected_realm = kdc_exchange_dict['expected_crealm']
            self.assertEqual(expected_realm,
                             upn_dns_info.dns_domain_name)

            expected_upn_name = kdc_exchange_dict['expected_upn_name']
            if expected_upn_name is not None:
                self.assertEqual(expected_upn_name,
                                 upn_dns_info.upn_name)

            if expect_upn_dns_info_ex:
                self.assertIsNotNone(upn_dns_info_ex)

            if upn_dns_info_ex is not None:
                if expected_account_name is not None:
                    self.assertEqual(expected_account_name,
                                     upn_dns_info_ex.samaccountname)

                if expected_sid is not None:
                    self.assertEqual(expected_sid,
                                     str(upn_dns_info_ex.objectsid))

        elif (pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO
              and expect_pac_attrs):
            attr_info = pac_buffer.info

            self.assertEqual(2, attr_info.flags_length)

            flags = attr_info.flags

            # Bit 0: a PAC was requested; bit 1: a PAC was given
            # without the client expressing a preference.
            requested_pac = bool(flags & 1)
            given_pac = bool(flags & 2)

            self.assertEqual(expect_pac_attrs_pac_request is True,
                             requested_pac)
            self.assertEqual(expect_pac_attrs_pac_request is None,
                             given_pac)

        elif (pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID
              and expect_requester_sid):
            requester_sid = pac_buffer.info.sid

            if expected_requester_sid is None:
                expected_requester_sid = expected_sid
            # NOTE(review): the comparison is gated on expected_sid, so
            # an explicit expected_requester_sid is not checked when
            # expected_sid is None. Confirm whether that is intended.
            if expected_sid is not None:
                self.assertEqual(expected_requester_sid,
                                 str(requester_sid))

        elif pac_buffer.type in {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO,
                                 krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO}:
            remaining = pac_buffer.info.remaining

            if pac_buffer.type == krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO:
                claims_type = 'client claims'
                expected_claims = expected_client_claims
                unexpected_claims = unexpected_client_claims
            else:
                claims_type = 'device claims'
                expected_claims = expected_device_claims
                unexpected_claims = unexpected_device_claims

            if not remaining:
                # Windows may produce an empty claims buffer.
                self.assertFalse(expected_claims,
                                 f'expected {claims_type}, but the PAC '
                                 f'buffer was empty')
                continue

            # These must be f-strings: the result is embedded verbatim
            # in the failure messages below, so a plain string would
            # leave a literal '{claims_type}' in the output.
            if expected_claims:
                empty_msg = f', and {claims_type} were expected'
            else:
                empty_msg = f' for {claims_type} (should be missing)'

            client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR,
                                       remaining)
            client_claims = client_claims.claims.metadata
            self.assertIsNotNone(client_claims,
                                 f'got empty CLAIMS_SET_METADATA_NDR '
                                 f'inner structure {empty_msg}')

            self.assertIsNotNone(client_claims.claims_set,
                                 f'got empty CLAIMS_SET_METADATA '
                                 f'structure {empty_msg}')

            uncompressed_size = client_claims.uncompressed_claims_set_size
            compression_format = client_claims.compression_format

            # Claims below the size threshold must be stored
            # uncompressed, anything else must be compressed.
            if uncompressed_size < claims.CLAIM_MINIMUM_BYTES_TO_COMPRESS:
                self.assertEqual(claims.CLAIMS_COMPRESSION_FORMAT_NONE,
                                 compression_format,
                                 f'{claims_type} unexpectedly '
                                 f'compressed ({uncompressed_size} '
                                 f'bytes uncompressed)')
            else:
                self.assertEqual(
                    claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF,
                    compression_format,
                    f'{claims_type} unexpectedly not compressed '
                    f'({uncompressed_size} bytes uncompressed)')

            claims_set = client_claims.claims_set.claims.claims
            self.assertIsNotNone(claims_set,
                                 f'got empty CLAIMS_SET_NDR inner '
                                 f'structure {empty_msg}')

            claims_arrays = claims_set.claims_arrays
            self.assertIsNotNone(claims_arrays,
                                 f'got empty CLAIMS_SET structure '
                                 f'{empty_msg}')
            self.assertGreater(len(claims_arrays), 0,
                               f'got empty claims array {empty_msg}')
            self.assertEqual(len(claims_arrays),
                             claims_set.claims_array_count,
                             f'{claims_type} arrays size mismatch')

            got_claims = {}

            for claims_array in claims_arrays:
                claim_entries = claims_array.claim_entries
                self.assertIsNotNone(claim_entries,
                                     f'got empty CLAIMS_ARRAY structure '
                                     f'{empty_msg}')
                self.assertGreater(len(claim_entries), 0,
                                   f'got empty claim entries array '
                                   f'{empty_msg}')
                self.assertEqual(len(claim_entries),
                                 claims_array.claims_count,
                                 f'{claims_type} entries array size '
                                 f'mismatch')

                for entry in claim_entries:
                    if unexpected_claims is not None:
                        self.assertNotIn(entry.id, unexpected_claims,
                                         f'got unexpected {claims_type} '
                                         f'in PAC')
                    if expected_claims is None:
                        continue

                    # Claims the test did not list are ignored.
                    expected_claim = expected_claims.get(entry.id)
                    if expected_claim is None:
                        continue

                    self.assertNotIn(entry.id, got_claims,
                                     f'got duplicate {claims_type}')

                    self.assertIsNotNone(entry.values.values,
                                         f'got {claims_type} with no '
                                         f'values')
                    self.assertGreater(len(entry.values.values), 0,
                                       f'got empty {claims_type} values '
                                       f'array')
                    self.assertEqual(len(entry.values.values),
                                     entry.values.value_count,
                                     f'{claims_type} values array size '
                                     f'mismatch')

                    expected_claim_values = expected_claim.get('values')
                    self.assertIsNotNone(expected_claim_values,
                                         f'got expected {claims_type} '
                                         f'with no values')

                    # Convert to the container type the test used so
                    # that the final comparison is like-for-like.
                    values = type(expected_claim_values)(
                        entry.values.values)

                    got_claims[entry.id] = {
                        'source_type': claims_array.claims_source_type,
                        'type': entry.type,
                        'values': values,
                    }

            self.assertEqual(expected_claims, got_claims or None,
                             f'{claims_type} did not match expectations')

        elif pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO:
            device_info = pac_buffer.info.info

            self.check_device_info(device_info, kdc_exchange_dict)

        elif pac_buffer.type == krb5pac.PAC_TYPE_CREDENTIAL_INFO:
            credential_info = pac_buffer.info

            expected_etype = self.expected_etype(kdc_exchange_dict)

            self.assertEqual(0, credential_info.version)
            self.assertEqual(expected_etype,
                             credential_info.encryption_type)

            # The credential data is encrypted with the reply key.
            encrypted_data = credential_info.encrypted_data
            reply_key = kdc_exchange_dict['reply_key']

            data = reply_key.decrypt(KU_NON_KERB_SALT, encrypted_data)

            credential_data_ndr = ndr_unpack(
                krb5pac.PAC_CREDENTIAL_DATA_NDR, data)

            credential_data = credential_data_ndr.ctr.data

            self.assertEqual(1, credential_data.credential_count)
            self.assertEqual(credential_data.credential_count,
                             len(credential_data.credentials))

            package = credential_data.credentials[0]
            self.assertEqual('NTLM', str(package.package_name))

            ntlm_blob = bytes(package.credential)

            ntlm_package = ndr_unpack(krb5pac.PAC_CREDENTIAL_NTLM_SECPKG,
                                      ntlm_blob)

            self.assertEqual(0, ntlm_package.version)
            self.assertEqual(krb5pac.PAC_CREDENTIAL_NTLM_HAS_NT_HASH,
                             ntlm_package.flags)

            creds = kdc_exchange_dict['creds']
            nt_password = bytes(ntlm_package.nt_password.hash)
            self.assertEqual(creds.get_nt_hash(), nt_password)

            # The LM hash is expected to be sixteen zero bytes.
            lm_password = bytes(ntlm_package.lm_password.hash)
            self.assertEqual(bytes(16), lm_password)
def generic_check_kdc_error(self,
                            kdc_exchange_dict,
                            callback_dict,
                            rep,
                            inner=False):
    """Check a KRB-ERROR reply against the expectations of the exchange.

    The fixed fields of the error are checked first. If e-data is
    expected it is then decoded, either as KERB-ERROR-DATA carrying a
    status code or, failing that, as METHOD-DATA whose padata is handed
    to check_rep_padata(); the result of that call is stored in
    kdc_exchange_dict['preauth_etype_info2'].

    :param kdc_exchange_dict: the exchange state holding expectations.
    :param callback_dict: passed through to check_rep_padata().
    :param rep: the decoded KRB-ERROR.
    :param inner: True when rep is an error nested inside a FAST
        response rather than the outer reply.
    :return: rep, unchanged.
    """
    msg_type = kdc_exchange_dict['rep_msg_type']

    anon_expected = kdc_exchange_dict['expected_anon']
    srealm = kdc_exchange_dict['expected_srealm']
    sname = kdc_exchange_dict['expected_sname']
    allowed_errors = kdc_exchange_dict['expected_error_mode']

    fast_was_sent = self.sent_fast(kdc_exchange_dict)

    armor_type = kdc_exchange_dict['fast_armor_type']

    self.assertElementEqual(rep, 'pvno', 5)
    self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
    error_code = self.getElementValue(rep, 'error-code')
    self.assertIn(error_code, allowed_errors)
    if self.strict_checking:
        for absent_field in ('ctime', 'cusec'):
            self.assertElementMissing(rep, absent_field)
    for present_field in ('stime', 'susec'):
        self.assertElementPresent(rep, present_field)
    # error-code checked above
    if anon_expected and not inner:
        anonymous_cname = self.PrincipalName_create(
            name_type=NT_WELLKNOWN,
            names=['WELLKNOWN', 'ANONYMOUS'])
        self.assertElementEqualPrincipal(rep, 'cname', anonymous_cname)
    elif self.strict_checking:
        self.assertElementMissing(rep, 'cname')
    if self.strict_checking:
        self.assertElementMissing(rep, 'crealm')
        self.assertElementEqualUTF8(rep, 'realm', srealm)
        self.assertElementEqualPrincipal(rep, 'sname', sname)
        self.assertElementMissing(rep, 'e-text')

    status_expected = kdc_exchange_dict['expect_status']
    wanted_status = kdc_exchange_dict['expected_status']
    edata_expected = kdc_exchange_dict['expect_edata']
    if edata_expected is None:
        # Default: e-data is expected on an outer error unless the
        # error is about unknown critical FAST options or the request
        # used FAST with an armor type other than AP-REQ.
        armor_permits_edata = (
            not fast_was_sent
            or armor_type is None
            or armor_type == FX_FAST_ARMOR_AP_REQUEST)
        edata_expected = (
            error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
            and armor_permits_edata
            and not inner)
    if inner and edata_expected is self.expect_padata_outer:
        edata_expected = False

    # Guard: nothing more to check when no e-data is expected.
    if not edata_expected:
        self.assertFalse(status_expected)
        if self.strict_checking or status_expected is False:
            self.assertElementMissing(rep, 'e-data')
        return rep

    edata = self.getElementValue(rep, 'e-data')
    if self.strict_checking or status_expected:
        self.assertIsNotNone(edata)
    if edata is None:
        return rep

    try:
        error_data = self.der_decode(
            edata,
            asn1Spec=krb5_asn1.KERB_ERROR_DATA())
    except PyAsn1Error:
        # Not KERB-ERROR-DATA, so the e-data should be METHOD-DATA.
        if status_expected:
            # The test requires that the KDC be declared to support
            # NTSTATUS values in e-data to proceed.
            self.assertTrue(
                self.expect_nt_status,
                'expected status code (which, according to '
                'EXPECT_NT_STATUS=0, the KDC does not support)')

            self.fail('expected to get status code')

        padata = self.der_decode(
            edata, asn1Spec=krb5_asn1.METHOD_DATA())
        self.assertGreater(len(padata), 0)

        if fast_was_sent:
            # With FAST the only outer element is PA-FX-FAST; the real
            # padata is taken from the FAST response.
            self.assertEqual(1, len(padata))
            outer_pa = self.get_pa_dict(padata)
            self.assertIn(PADATA_FX_FAST, outer_pa)

            armor_key = kdc_exchange_dict['armor_key']
            self.assertIsNotNone(armor_key)
            fast_response = self.check_fx_fast_data(
                kdc_exchange_dict,
                outer_pa[PADATA_FX_FAST],
                armor_key,
                expect_strengthen_key=False)

            padata = fast_response['padata']

        kdc_exchange_dict['preauth_etype_info2'] = self.check_rep_padata(
            kdc_exchange_dict,
            callback_dict,
            padata,
            error_code)
    else:
        self.assertTrue(self.expect_nt_status,
                        'got status code, but EXPECT_NT_STATUS=0')

        if status_expected is not None:
            self.assertTrue(status_expected,
                            'got unexpected status code')

        self.assertEqual(KERB_ERR_TYPE_EXTENDED,
                         error_data['data-type'])

        extended_error = error_data['data-value']

        self.assertEqual(12, len(extended_error))

        # Bytes 0-3 hold the status and bytes 8-11 the flags, both
        # little-endian; bytes 4-7 are not examined.
        status = int.from_bytes(extended_error[:4], 'little')
        flags = int.from_bytes(extended_error[8:], 'little')

        self.assertEqual(wanted_status, status)

        wanted_flags = 3 if msg_type == KRB_TGS_REP else 1
        self.assertEqual(wanted_flags, flags)

    return rep
def check_reply_padata(self,
                       kdc_exchange_dict,
                       callback_dict,
                       encpart,
                       rep_padata):
    """Check the padata of a successful KDC reply.

    The expected padata types are derived from whether FAST was sent,
    the reply message type and whether PK-INIT was used, then compared
    with the types actually present. If PA-ETYPE-INFO2 is present its
    single entry is checked against the etype of encpart and against
    'expected_salt'.

    :param kdc_exchange_dict: the exchange state holding expectations.
    :param callback_dict: not used by this method.
    :param encpart: the reply's encrypted part; its 'etype' is read.
    :param rep_padata: the padata sequence of the reply, or None.
    :return: None.
    """
    wanted_patypes = ()

    fast_was_sent = self.sent_fast(kdc_exchange_dict)
    pkinit_mode = kdc_exchange_dict.get('using_pkinit', PkInit.NOT_USED)
    msg_type = kdc_exchange_dict['rep_msg_type']

    if fast_was_sent:
        wanted_patypes = (PADATA_FX_FAST,)
    elif msg_type == KRB_AS_REP:
        if pkinit_mode is not PkInit.NOT_USED:
            wanted_patypes = (PADATA_PK_AS_REP,)
        else:
            chosen_etype = self.getElementValue(encpart, 'etype')
            self.assertIsNotNone(chosen_etype)

            aes_etypes = {kcrypto.Enctype.AES256, kcrypto.Enctype.AES128}
            if chosen_etype in aes_etypes:
                wanted_patypes = (PADATA_ETYPE_INFO2,)

            preauth_key = kdc_exchange_dict['preauth_key']
            self.assertIsInstance(preauth_key, Krb5EncryptionKey)
            # Absent padata is treated as empty for an RC4 key.
            if preauth_key.etype == kcrypto.Enctype.RC4 and rep_padata is None:
                rep_padata = ()
    elif msg_type == KRB_TGS_REP:
        if not wanted_patypes and rep_padata is None:
            rep_padata = ()

    if rep_padata is None and not self.strict_checking:
        rep_padata = ()

    self.assertIsNotNone(rep_padata)
    present_patypes = tuple(item['padata-type'] for item in rep_padata)
    self.assertSequenceElementsEqual(wanted_patypes, present_patypes,
                                     # Windows does not add this.
                                     unchecked={PADATA_PKINIT_KX})

    if not wanted_patypes:
        return None

    by_type = self.get_pa_dict(rep_padata)

    raw_info2 = by_type.get(PADATA_ETYPE_INFO2)
    if raw_info2 is None:
        return None

    info2_entries = self.der_decode(raw_info2,
                                    asn1Spec=krb5_asn1.ETYPE_INFO2())
    self.assertEqual(len(info2_entries), 1)
    entry = info2_entries[0]

    self.assertEqual(self.getElementValue(entry, 'etype'), chosen_etype)
    salt = self.getElementValue(entry, 'salt')
    self.assertIsNotNone(salt)
    wanted_salt = kdc_exchange_dict['expected_salt']
    if wanted_salt is not None:
        self.assertEqual(salt, wanted_salt)
    s2kparams = self.getElementValue(entry, 's2kparams')
    if self.strict_checking:
        self.assertIsNone(s2kparams)
4856 @staticmethod
4857 def greatest_common_etype(etypes, proposed_etypes):
4858 return max(filter(lambda e: e in etypes, proposed_etypes),
4859 default=None)
4861 @staticmethod
4862 def first_common_etype(etypes, proposed_etypes):
4863 return next(filter(lambda e: e in etypes, proposed_etypes), None)
def supported_aes_rc4_etypes(self, kdc_exchange_dict):
    """Split the account's default etypes into AES and RC4 sets.

    :param kdc_exchange_dict: the exchange state; 'creds' and
        'rc4_support' are read.
    :return: a tuple (aes_etypes, rc4_etypes) of sets. RC4 is only
        included when 'rc4_support' is true.
    """
    account_creds = kdc_exchange_dict['creds']
    default_etypes = self.get_default_enctypes(account_creds)

    rc4_allowed = kdc_exchange_dict['rc4_support']

    aes_etypes = {
        etype
        for etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128)
        if etype in default_etypes
    }

    if rc4_allowed and kcrypto.Enctype.RC4 in default_etypes:
        rc4_etypes = {kcrypto.Enctype.RC4}
    else:
        rc4_etypes = set()

    return aes_etypes, rc4_etypes
def greatest_aes_rc4_etypes(self, kdc_exchange_dict):
    """Return the greatest proposed AES etype and RC4 etype.

    The etypes proposed in the request body are matched against the
    sets returned by supported_aes_rc4_etypes().

    :param kdc_exchange_dict: the exchange state; 'req_body' is read.
    :return: a tuple (expected_aes, expected_rc4); either element is
        None when no proposed etype is in the corresponding set.
    """
    proposed = kdc_exchange_dict['req_body']['etype']

    aes_candidates, rc4_candidates = self.supported_aes_rc4_etypes(
        kdc_exchange_dict)

    best_aes = self.greatest_common_etype(aes_candidates, proposed)
    best_rc4 = self.greatest_common_etype(rc4_candidates, proposed)

    return best_aes, best_rc4
def expected_etype(self, kdc_exchange_dict):
    """Return the first proposed etype that the account supports.

    The etypes proposed in the request body are matched, in order,
    against the union of the sets returned by
    supported_aes_rc4_etypes().

    :param kdc_exchange_dict: the exchange state; 'req_body' is read.
    :return: the first matching etype, or None if there is none.
    """
    proposed = kdc_exchange_dict['req_body']['etype']

    aes_candidates, rc4_candidates = self.supported_aes_rc4_etypes(
        kdc_exchange_dict)
    usable = aes_candidates | rc4_candidates

    return self.first_common_etype(usable, proposed)
4904 def check_rep_padata(self,
4905 kdc_exchange_dict,
4906 callback_dict,
4907 rep_padata,
4908 error_code):
4909 rep_msg_type = kdc_exchange_dict['rep_msg_type']
4911 sent_fast = self.sent_fast(kdc_exchange_dict)
4912 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
4914 if rep_msg_type == KRB_TGS_REP:
4915 self.assertTrue(sent_fast)
4917 rc4_support = kdc_exchange_dict['rc4_support']
4919 expected_aes, expected_rc4 = self.greatest_aes_rc4_etypes(
4920 kdc_exchange_dict)
4922 expect_etype_info2 = ()
4923 expect_etype_info = False
4924 if expected_aes is not None:
4925 expect_etype_info2 += (expected_aes,)
4926 if expected_rc4 is not None:
4927 if error_code != 0:
4928 expect_etype_info2 += (expected_rc4,)
4929 if expected_aes is None:
4930 expect_etype_info = True
4932 if expect_etype_info:
4933 self.assertGreater(len(expect_etype_info2), 0)
4935 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
4937 check_patypes = kdc_exchange_dict['check_patypes']
4938 if check_patypes:
4939 expected_patypes = ()
4940 if sent_fast and error_code != 0:
4941 expected_patypes += (PADATA_FX_ERROR,)
4942 expected_patypes += (PADATA_FX_COOKIE,)
4944 if rep_msg_type == KRB_TGS_REP:
4945 if ('1' in sent_pac_options
4946 and error_code not in (0, KDC_ERR_GENERIC)):
4947 expected_patypes += (PADATA_PAC_OPTIONS,)
4948 elif error_code != KDC_ERR_GENERIC:
4949 if expect_etype_info:
4950 expected_patypes += (PADATA_ETYPE_INFO,)
4951 if len(expect_etype_info2) != 0:
4952 expected_patypes += (PADATA_ETYPE_INFO2,)
4954 sent_freshness = self.sent_freshness(kdc_exchange_dict)
4956 if error_code not in (KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW,
4957 KDC_ERR_POLICY, KDC_ERR_CLIENT_REVOKED):
4958 if sent_fast:
4959 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
4960 else:
4961 expected_patypes += (PADATA_ENC_TIMESTAMP,)
4963 if not sent_enc_challenge:
4964 expected_patypes += (PADATA_PK_AS_REQ,)
4965 if not sent_freshness:
4966 expected_patypes += (PADATA_PK_AS_REP_19,)
4968 if sent_freshness:
4969 expected_patypes += PADATA_AS_FRESHNESS,
4971 if (self.kdc_fast_support
4972 and not sent_fast
4973 and not sent_enc_challenge):
4974 expected_patypes += (PADATA_FX_FAST,)
4975 expected_patypes += (PADATA_FX_COOKIE,)
4977 require_strict = {PADATA_FX_COOKIE,
4978 PADATA_FX_FAST,
4979 PADATA_PAC_OPTIONS,
4980 PADATA_PK_AS_REP_19,
4981 PADATA_PK_AS_REQ,
4982 PADATA_PKINIT_KX,
4983 PADATA_GSS}
4984 strict_edata_checking = kdc_exchange_dict['strict_edata_checking']
4985 if not strict_edata_checking:
4986 require_strict.add(PADATA_ETYPE_INFO2)
4987 require_strict.add(PADATA_ENCRYPTED_CHALLENGE)
4989 got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
4990 self.assertSequenceElementsEqual(expected_patypes, got_patypes,
4991 require_strict=require_strict)
4993 if not expected_patypes:
4994 return None
4996 pa_dict = self.get_pa_dict(rep_padata)
4998 enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
4999 if enc_timestamp is not None:
5000 self.assertEqual(len(enc_timestamp), 0)
5002 pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
5003 if pk_as_req is not None:
5004 self.assertEqual(len(pk_as_req), 0)
5006 pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
5007 if pk_as_rep19 is not None:
5008 self.assertEqual(len(pk_as_rep19), 0)
5010 freshness_token = pa_dict.get(PADATA_AS_FRESHNESS)
5011 if freshness_token is not None:
5012 self.assertEqual(bytes(2), freshness_token[:2])
5014 freshness = self.der_decode(freshness_token[2:],
5015 asn1Spec=krb5_asn1.EncryptedData())
5017 krbtgt_creds = self.get_krbtgt_creds()
5018 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
5020 self.assertElementEqual(freshness, 'etype', krbtgt_key.etype)
5021 self.assertElementKVNO(freshness, 'kvno', krbtgt_key.kvno)
5023 # Decrypt the freshness token.
5024 ts_enc = krbtgt_key.decrypt(KU_AS_FRESHNESS,
5025 freshness['cipher'])
5027 # Ensure that we can decode it as PA-ENC-TS-ENC.
5028 ts_enc = self.der_decode(ts_enc,
5029 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
5030 freshness_time = self.get_EpochFromKerberosTime(
5031 ts_enc['patimestamp'])
5032 freshness_time += ts_enc['pausec'] / 1e6
5034 # Ensure that it is reasonably close to the current time (within
5035 # five minutes, to allow for clock skew).
5036 current_time = datetime.datetime.now(
5037 datetime.timezone.utc).timestamp()
5038 self.assertLess(current_time - 5 * 60, freshness_time)
5039 self.assertLess(freshness_time, current_time + 5 * 60)
5041 kdc_exchange_dict['freshness_token'] = freshness_token
5043 fx_fast = pa_dict.get(PADATA_FX_FAST)
5044 if fx_fast is not None:
5045 self.assertEqual(len(fx_fast), 0)
5047 fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
5048 if fast_cookie is not None:
5049 kdc_exchange_dict['fast_cookie'] = fast_cookie
5051 fast_error = pa_dict.get(PADATA_FX_ERROR)
5052 if fast_error is not None:
5053 fast_error = self.der_decode(fast_error,
5054 asn1Spec=krb5_asn1.KRB_ERROR())
5055 self.generic_check_kdc_error(kdc_exchange_dict,
5056 callback_dict,
5057 fast_error,
5058 inner=True)
5060 pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
5061 if pac_options is not None:
5062 pac_options = self.der_decode(
5063 pac_options,
5064 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
5065 self.assertElementEqual(pac_options, 'options', sent_pac_options)
5067 enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
5068 if enc_challenge is not None:
5069 if not sent_enc_challenge:
5070 self.assertEqual(len(enc_challenge), 0)
5071 else:
5072 armor_key = kdc_exchange_dict['armor_key']
5073 self.assertIsNotNone(armor_key)
5075 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
5077 kdc_challenge_key = self.generate_kdc_challenge_key(
5078 armor_key, preauth_key)
5080 # Ensure that the encrypted challenge FAST factor is supported
5081 # (RFC6113 5.4.6).
5082 if self.strict_checking:
5083 self.assertNotEqual(len(enc_challenge), 0)
5084 if len(enc_challenge) != 0:
5085 encrypted_challenge = self.der_decode(
5086 enc_challenge,
5087 asn1Spec=krb5_asn1.EncryptedData())
5088 self.assertEqual(encrypted_challenge['etype'],
5089 kdc_challenge_key.etype)
5091 challenge = kdc_challenge_key.decrypt(
5092 KU_ENC_CHALLENGE_KDC,
5093 encrypted_challenge['cipher'])
5094 challenge = self.der_decode(
5095 challenge,
5096 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
5098 # Retrieve the returned timestamp.
5099 rep_patime = challenge['patimestamp']
5100 self.assertIn('pausec', challenge)
5102 # Ensure the returned time is within five minutes of the
5103 # current time.
5104 rep_time = self.get_EpochFromKerberosTime(rep_patime)
5105 current_time = time.time()
5107 self.assertLess(current_time - 300, rep_time)
5108 self.assertLess(rep_time, current_time + 300)
5110 etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
5111 if etype_info2 is not None:
5112 etype_info2 = self.der_decode(etype_info2,
5113 asn1Spec=krb5_asn1.ETYPE_INFO2())
5114 self.assertGreaterEqual(len(etype_info2), 1)
5115 if self.strict_checking:
5116 self.assertEqual(len(etype_info2), len(expect_etype_info2))
5117 for i in range(0, len(etype_info2)):
5118 e = self.getElementValue(etype_info2[i], 'etype')
5119 if self.strict_checking:
5120 self.assertEqual(e, expect_etype_info2[i])
5121 salt = self.getElementValue(etype_info2[i], 'salt')
5122 if e == kcrypto.Enctype.RC4:
5123 if self.strict_checking:
5124 self.assertIsNone(salt)
5125 else:
5126 self.assertIsNotNone(salt)
5127 expected_salt = kdc_exchange_dict['expected_salt']
5128 if expected_salt is not None:
5129 self.assertEqual(salt, expected_salt)
5130 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
5131 if self.strict_checking:
5132 self.assertIsNone(s2kparams)
5134 etype_info = pa_dict.get(PADATA_ETYPE_INFO)
5135 if etype_info is not None:
5136 etype_info = self.der_decode(etype_info,
5137 asn1Spec=krb5_asn1.ETYPE_INFO())
5138 self.assertEqual(len(etype_info), 1)
5139 e = self.getElementValue(etype_info[0], 'etype')
5140 self.assertEqual(e, kcrypto.Enctype.RC4)
5141 if rc4_support:
5142 self.assertEqual(e, expect_etype_info2[0])
5143 salt = self.getElementValue(etype_info[0], 'salt')
5144 if self.strict_checking:
5145 self.assertIsNotNone(salt)
5146 self.assertEqual(len(salt), 0)
5148 return etype_info2
def generate_simple_fast(self,
                         kdc_exchange_dict,
                         _callback_dict,
                         req_body,
                         fast_padata,
                         fast_armor,
                         checksum,
                         fast_options=''):
    """Wrap a request in a PA-FX-FAST padata element.

    A KrbFastReq is built from fast_options, fast_padata and req_body,
    DER-encoded and encrypted with kdc_exchange_dict['armor_key'] under
    the KU_FAST_ENC key usage. The result is combined with fast_armor and
    checksum into an armored request, which is encoded as a
    PA-FX-FAST-REQUEST.

    Returns the PA-DATA element of type PADATA_FX_FAST.
    """
    key = kdc_exchange_dict['armor_key']

    # Inner FAST request: build, encode, then encrypt with the armor key.
    inner_req = self.KRB_FAST_REQ_create(fast_options, fast_padata, req_body)
    inner_blob = self.der_encode(inner_req, asn1Spec=krb5_asn1.KrbFastReq())
    enc_fast_req = self.EncryptedData_create(key, KU_FAST_ENC, inner_blob)

    # Outer armored request carrying the armor, checksum and encrypted body.
    armored_req = self.KRB_FAST_ARMORED_REQ_create(
        fast_armor, checksum, enc_fast_req)
    fx_fast_blob = self.der_encode(
        self.PA_FX_FAST_REQUEST_create(armored_req),
        asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())

    return self.PA_DATA_create(PADATA_FX_FAST, fx_fast_blob)
def generate_ap_req(self,
                    kdc_exchange_dict,
                    _callback_dict,
                    req_body,
                    armor,
                    usage=None,
                    seq_number=None):
    """Build a DER-encoded AP-REQ from a ticket held in kdc_exchange_dict.

    With armor set, the 'armor_tgt', 'armor_subkey' and 'fast_ap_options'
    entries are used and req_body must be None. Otherwise the 'tgt',
    'authenticator_subkey' and 'ap_options' entries are used. When
    req_body is given, a checksum over its DER encoding (keyed with the
    ticket session key) is placed in the authenticator.

    usage defaults to KU_AP_REQ_AUTH for armor and KU_TGS_REQ_AUTH
    otherwise; seq_number defaults to a random value.

    Returns the encoded AP-REQ.
    """
    if armor:
        # An armor AP-REQ must not carry a request body.
        self.assertIsNone(req_body)

        ticket_creds = kdc_exchange_dict['armor_tgt']
        subkey = kdc_exchange_dict['armor_subkey']
    else:
        ticket_creds = kdc_exchange_dict['tgt']
        subkey = kdc_exchange_dict['authenticator_subkey']

    body_cksum = None
    if req_body is not None:
        cksum_type = kdc_exchange_dict['body_checksum_type']

        body_blob = self.der_encode(
            req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())

        body_cksum = self.Checksum_create(
            ticket_creds.session_key,
            KU_TGS_REQ_AUTH_CKSUM,
            body_blob,
            ctype=cksum_type)

    auth_data = kdc_exchange_dict['auth_data']

    exported_subkey = None if subkey is None else subkey.export_obj()
    if seq_number is None:
        seq_number = random.randint(0, 0xfffffffe)
    ctime, cusec = self.get_KerberosTimeWithUsec()

    # Assemble and encode the authenticator.
    authenticator_blob = self.der_encode(
        self.Authenticator_create(
            crealm=ticket_creds.crealm,
            cname=ticket_creds.cname,
            cksum=body_cksum,
            cusec=cusec,
            ctime=ctime,
            subkey=exported_subkey,
            seq_number=seq_number,
            authorization_data=auth_data),
        asn1Spec=krb5_asn1.Authenticator())

    if usage is None:
        if armor:
            usage = KU_AP_REQ_AUTH
        else:
            usage = KU_TGS_REQ_AUTH
    enc_authenticator = self.EncryptedData_create(ticket_creds.session_key,
                                                  usage,
                                                  authenticator_blob)

    options_key = 'fast_ap_options' if armor else 'ap_options'
    ap_options = kdc_exchange_dict[options_key]
    if ap_options is None:
        ap_options = str(krb5_asn1.APOptions('0'))

    ap_req_obj = self.AP_REQ_create(ap_options=ap_options,
                                    ticket=ticket_creds.ticket,
                                    authenticator=enc_authenticator)
    return self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
def generate_simple_tgs_padata(self,
                               kdc_exchange_dict,
                               callback_dict,
                               req_body):
    """Return ([PA-TGS-REQ padata], req_body) for a TGS request.

    The single padata element wraps a non-armor AP-REQ generated for
    req_body; req_body itself is returned unchanged.
    """
    encoded_ap_req = self.generate_ap_req(kdc_exchange_dict,
                                          callback_dict,
                                          req_body,
                                          armor=False)

    return [self.PA_DATA_create(PADATA_KDC_REQ, encoded_ap_req)], req_body
def get_preauth_key(self, kdc_exchange_dict):
    """Return the (key, key usage) pair selected for the expected reply.

    For an AS-REP this is kdc_exchange_dict['preauth_key'] with
    KU_AS_REP_ENC_PART. For any other reply type (treated as TGS-REP) it
    is the authenticator subkey when one is set, else the TGT session
    key, each with its matching key usage. Fails if the key is None.
    """
    if kdc_exchange_dict['rep_msg_type'] == KRB_AS_REP:
        key = kdc_exchange_dict['preauth_key']
        usage = KU_AS_REP_ENC_PART
    else:  # KRB_TGS_REP
        subkey = kdc_exchange_dict['authenticator_subkey']
        if subkey is None:
            key = kdc_exchange_dict['tgt'].session_key
            usage = KU_TGS_REP_ENC_PART_SESSION
        else:
            key = subkey
            usage = KU_TGS_REP_ENC_PART_SUB_KEY

    self.assertIsNotNone(key)

    return key, usage
def generate_armor_key(self, subkey, session_key):
    """Derive the armor key from a subkey and a ticket session key.

    The two keys are combined with kcrypto.cf2 using the labels
    b'subkeyarmor' and b'ticketarmor'; the result is wrapped in a
    Krb5EncryptionKey whose second argument is None.
    """
    combined = kcrypto.cf2(subkey.key,
                           session_key.key,
                           b'subkeyarmor',
                           b'ticketarmor')

    return Krb5EncryptionKey(combined, None)
def generate_strengthen_reply_key(self, strengthen_key, reply_key):
    """Derive the strengthened reply key.

    The strengthen key and reply key are combined with kcrypto.cf2 using
    the labels b'strengthenkey' and b'replykey'. The resulting
    Krb5EncryptionKey is given reply_key.kvno as its second argument.
    """
    combined = kcrypto.cf2(strengthen_key.key,
                           reply_key.key,
                           b'strengthenkey',
                           b'replykey')

    return Krb5EncryptionKey(combined, reply_key.kvno)
def generate_client_challenge_key(self, armor_key, longterm_key):
    """Derive the key used for the client's encrypted challenge.

    The armor key and long-term key are combined with kcrypto.cf2 using
    the labels b'clientchallengearmor' and b'challengelongterm'.
    """
    combined = kcrypto.cf2(armor_key.key,
                           longterm_key.key,
                           b'clientchallengearmor',
                           b'challengelongterm')

    return Krb5EncryptionKey(combined, None)
def generate_kdc_challenge_key(self, armor_key, longterm_key):
    """Derive the key used for the KDC's encrypted challenge.

    The armor key and long-term key are combined with kcrypto.cf2 using
    the labels b'kdcchallengearmor' and b'challengelongterm'.
    """
    combined = kcrypto.cf2(armor_key.key,
                           longterm_key.key,
                           b'kdcchallengearmor',
                           b'challengelongterm')

    return Krb5EncryptionKey(combined, None)
def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
    """Check that expected_checksum matches a checksum over the ticket.

    The checksum type must equal armor_key.ctype. The ticket is
    DER-encoded and a checksum is created with armor_key under the
    KU_FAST_FINISHED key usage, then compared with expected_checksum.
    """
    self.assertEqual(armor_key.ctype, expected_checksum['cksumtype'])

    encoded_ticket = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
    actual_checksum = self.Checksum_create(armor_key,
                                           KU_FAST_FINISHED,
                                           encoded_ticket)
    self.assertEqual(expected_checksum, actual_checksum)
def verify_ticket(self, ticket, krbtgt_keys, service_ticket,
                  expect_pac=True,
                  expect_ticket_checksum=True,
                  expect_full_checksum=None):
    """Decrypt a ticket and verify the checksums carried in its PAC.

    ticket: credentials object providing .decryption_key and .ticket.
    krbtgt_keys: a single key, or a container of keys. In strict mode the
        first key is used; otherwise the key whose ctype matches the KDC
        checksum type is used.
    service_ticket: if false, the PAC must contain neither a ticket
        checksum nor a full checksum; if true, those checksums are
        verified when present.
    expect_pac: if true, authorization data and a PAC must be present.
        If false, nothing is verified.
    expect_ticket_checksum / expect_full_checksum: True requires the
        checksum, False forbids it, None accepts either.
    """
    # Decrypt the ticket.

    key = ticket.decryption_key
    enc_part = ticket.ticket['enc-part']

    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_part = self.der_decode(
        enc_part, asn1Spec=krb5_asn1.EncTicketPart())

    # Fetch the authorization data from the ticket.
    auth_data = enc_part.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    elif auth_data is None:
        return

    # Get a copy of the authdata with an empty PAC, and the existing PAC
    # (if present).
    empty_pac = self.get_empty_pac()
    auth_data, pac_data = self.replace_pac(auth_data,
                                           empty_pac,
                                           expect_pac=expect_pac)
    if not expect_pac:
        return

    # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
    # raw type to create a new PAC with zeroed signatures for
    # verification. This is because on Windows, the resource_groups field
    # is added to PAC_LOGON_INFO after the info3 field has been created,
    # which results in a different ordering of pointer values than Samba
    # (see commit 0e201ecdc53). Using the raw type avoids changing
    # PAC_LOGON_INFO, so verification against Windows can work. We still
    # need the PAC_DATA type to retrieve the actual checksums, because the
    # signatures in the raw type may contain padding bytes.
    pac = ndr_unpack(krb5pac.PAC_DATA,
                     pac_data)
    raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
                         pac_data)

    checksums = {}

    full_checksum_buffer = None

    for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
        buffer_type = pac_buffer.type
        if buffer_type in self.pac_checksum_types:
            self.assertNotIn(buffer_type, checksums,
                             f'Duplicate checksum type {buffer_type}')

            # Fetch the checksum and the checksum type from the PAC buffer.
            checksum = pac_buffer.info.signature
            ctype = pac_buffer.info.type
            # Sign-extend: a value with bit 31 set becomes negative.
            if ctype & 1 << 31:
                ctype |= -1 << 31

            checksums[buffer_type] = checksum, ctype

            if buffer_type == krb5pac.PAC_TYPE_FULL_CHECKSUM:
                # Zeroed later, after the server/KDC view of the PAC has
                # been encoded.
                full_checksum_buffer = raw_pac_buffer
            elif buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
                # Zero the checksum field so that we can later verify the
                # checksums. The ticket checksum field is not zeroed.

                signature = ndr_unpack(
                    krb5pac.PAC_SIGNATURE_DATA,
                    raw_pac_buffer.info.remaining)
                signature.signature = bytes(len(checksum))
                raw_pac_buffer.info.remaining = ndr_pack(
                    signature)

    # Re-encode the PAC.
    pac_data = ndr_pack(raw_pac)

    if full_checksum_buffer is not None:
        # Size the zeroed field from the full checksum itself. The loop
        # variable 'checksum' holds whichever checksum buffer happened to
        # come last in the PAC, which need not be the full checksum.
        full_checksum_len = len(
            checksums[krb5pac.PAC_TYPE_FULL_CHECKSUM][0])

        signature = ndr_unpack(
            krb5pac.PAC_SIGNATURE_DATA,
            full_checksum_buffer.info.remaining)
        signature.signature = bytes(full_checksum_len)
        full_checksum_buffer.info.remaining = ndr_pack(
            signature)

        # Re-encode the PAC.
        full_pac_data = ndr_pack(raw_pac)

    # Verify the signatures.

    server_checksum, server_ctype = checksums[
        krb5pac.PAC_TYPE_SRV_CHECKSUM]
    key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
                        pac_data,
                        server_ctype,
                        server_checksum)

    kdc_checksum, kdc_ctype = checksums[
        krb5pac.PAC_TYPE_KDC_CHECKSUM]

    if isinstance(krbtgt_keys, collections.abc.Container):
        if self.strict_checking:
            krbtgt_key = krbtgt_keys[0]
        else:
            # Report a missing key as a test failure rather than letting
            # a bare StopIteration escape.
            krbtgt_key = next((candidate for candidate in krbtgt_keys
                               if candidate.ctype == kdc_ctype),
                              None)
            self.assertIsNotNone(
                krbtgt_key,
                f'No krbtgt key matches KDC checksum type {kdc_ctype}')
    else:
        krbtgt_key = krbtgt_keys

    # The KDC checksum is made over the server checksum.
    krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                    server_checksum,
                                    kdc_ctype,
                                    kdc_checksum)

    if not service_ticket:
        self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
        self.assertNotIn(krb5pac.PAC_TYPE_FULL_CHECKSUM, checksums)
    else:
        ticket_checksum, ticket_ctype = checksums.get(
            krb5pac.PAC_TYPE_TICKET_CHECKSUM,
            (None, None))
        if expect_ticket_checksum:
            self.assertIsNotNone(ticket_checksum)
        elif expect_ticket_checksum is False:
            self.assertIsNone(ticket_checksum)
        if ticket_checksum is not None:
            # The ticket checksum covers the enc-part with the PAC
            # replaced by the empty placeholder.
            enc_part['authorization-data'] = auth_data
            enc_part = self.der_encode(enc_part,
                                       asn1Spec=krb5_asn1.EncTicketPart())

            krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                            enc_part,
                                            ticket_ctype,
                                            ticket_checksum)

        full_checksum, full_ctype = checksums.get(
            krb5pac.PAC_TYPE_FULL_CHECKSUM,
            (None, None))
        if expect_full_checksum:
            self.assertIsNotNone(full_checksum)
        elif expect_full_checksum is False:
            self.assertIsNone(full_checksum)
        if full_checksum is not None:
            krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                            full_pac_data,
                                            full_ctype,
                                            full_checksum)
def modified_ticket(self,
                    ticket, *,
                    new_ticket_key=None,
                    modify_fn=None,
                    modify_pac_fn=None,
                    exclude_pac=False,
                    allow_empty_authdata=False,
                    update_pac_checksums=True,
                    checksum_keys=None,
                    include_checksums=None):
    """Return a re-encrypted copy of ticket with optional modifications.

    new_ticket_key: key used to re-encrypt the ticket; defaults to the
        ticket's own decryption key.
    modify_fn: called with the decoded enc-part; its return value
        replaces the enc-part.
    modify_pac_fn: called with the unpacked PAC; its return value
        replaces the PAC. When given, a PAC is required to be present.
    exclude_pac: drop the PAC from the authorization data. Must not be
        combined with modify_pac_fn; disables checksum updating.
    allow_empty_authdata: passed through to replace_pac().
    update_pac_checksums: recompute the PAC checksums after modification.
    checksum_keys: dict mapping PAC checksum buffer types to keys. It is
        not modified; missing server/ticket/full keys are filled in on a
        private copy.
    include_checksums: dict mapping checksum buffer types to True
        (force include), False (force exclude) or None/absent (keep as
        in the original PAC).

    Returns a new KerberosTicketCreds for the modified ticket.
    """
    if checksum_keys is None:
        # A dict containing a key for each checksum type to be created in
        # the PAC.
        checksum_keys = {}
    else:
        # Work on a private copy: fallback keys are inserted below, and
        # writing them into the caller's dict would leak this ticket's
        # key into later calls that reuse the same dict.
        checksum_keys = dict(checksum_keys)

    if include_checksums is None:
        # A dict containing a value for each checksum type; True if the
        # checksum type is to be included in the PAC, False if it is to be
        # excluded, or None/not present if the checksum is to be included
        # based on its presence in the original PAC.
        include_checksums = {}

    # Check that the values passed in by the caller make sense.

    self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
    self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)

    if exclude_pac:
        self.assertIsNone(modify_pac_fn)

        update_pac_checksums = False

    if not update_pac_checksums:
        self.assertFalse(checksum_keys)
        self.assertFalse(include_checksums)

    # A PAC is only required when the caller wants to modify it.
    expect_pac = modify_pac_fn is not None

    key = ticket.decryption_key

    if new_ticket_key is None:
        # Use the same key to re-encrypt the ticket.
        new_ticket_key = key

    if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
        # If the server signature key is not present, fall back to the key
        # used to encrypt the ticket.
        checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key

    if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
        # If the ticket signature key is not present, fall back to the key
        # used for the KDC signature.
        kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
        if kdc_checksum_key is not None:
            checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
                kdc_checksum_key)

    if krb5pac.PAC_TYPE_FULL_CHECKSUM not in checksum_keys:
        # If the full signature key is not present, fall back to the key
        # used for the KDC signature.
        kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
        if kdc_checksum_key is not None:
            checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM] = (
                kdc_checksum_key)

    # Decrypt the ticket.

    enc_part = ticket.ticket['enc-part']

    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_part = self.der_decode(
        enc_part, asn1Spec=krb5_asn1.EncTicketPart())

    # Modify the ticket here.
    if modify_fn is not None:
        enc_part = modify_fn(enc_part)

    auth_data = enc_part.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    if auth_data is not None:
        new_pac = None
        if not exclude_pac:
            # Get a copy of the authdata with an empty PAC, and the
            # existing PAC (if present).
            empty_pac = self.get_empty_pac()
            empty_pac_auth_data, pac_data = self.replace_pac(
                auth_data,
                empty_pac,
                expect_pac=expect_pac)

            if pac_data is not None:
                pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)

                # Modify the PAC here.
                if modify_pac_fn is not None:
                    pac = modify_pac_fn(pac)

                if update_pac_checksums:
                    # Get the enc-part with an empty PAC, which is needed
                    # to create a ticket signature.
                    enc_part_to_sign = enc_part.copy()
                    enc_part_to_sign['authorization-data'] = (
                        empty_pac_auth_data)
                    enc_part_to_sign = self.der_encode(
                        enc_part_to_sign,
                        asn1Spec=krb5_asn1.EncTicketPart())

                    self.update_pac_checksums(pac,
                                              checksum_keys,
                                              include_checksums,
                                              enc_part_to_sign)

                # Re-encode the PAC.
                pac_data = ndr_pack(pac)
                new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
                                                        pac_data)

        # Replace the PAC in the authorization data and re-add it to the
        # ticket enc-part.
        auth_data, _ = self.replace_pac(
            auth_data, new_pac,
            expect_pac=expect_pac,
            allow_empty_authdata=allow_empty_authdata)
        enc_part['authorization-data'] = auth_data

    # Re-encrypt the ticket enc-part with the new key.
    enc_part_new = self.der_encode(enc_part,
                                   asn1Spec=krb5_asn1.EncTicketPart())
    enc_part_new = self.EncryptedData_create(new_ticket_key,
                                             KU_TICKET,
                                             enc_part_new)

    # Create a copy of the ticket with the new enc-part.
    new_ticket = ticket.ticket.copy()
    new_ticket['enc-part'] = enc_part_new

    new_ticket_creds = KerberosTicketCreds(
        new_ticket,
        session_key=ticket.session_key,
        crealm=ticket.crealm,
        cname=ticket.cname,
        srealm=ticket.srealm,
        sname=ticket.sname,
        decryption_key=new_ticket_key,
        ticket_private=enc_part,
        encpart_private=ticket.encpart_private)

    return new_ticket_creds
def update_pac_checksums(self,
                         pac,
                         checksum_keys,
                         include_checksums,
                         enc_part=None):
    """Recompute the checksum buffers of an unpacked PAC in place.

    pac: unpacked PAC whose .buffers and .num_buffers are updated.
    checksum_keys: dict mapping checksum buffer types to keys; a key is
        looked up for every checksum buffer that ends up in the PAC.
    include_checksums: dict mapping checksum buffer types to True (add
        the buffer if absent), False (remove it if present) or
        None/absent (leave as is).
    enc_part: encoded ticket enc-part to sign; must not be None when a
        ticket checksum buffer is present.

    The full checksum is computed first, then the server checksum, then
    the KDC checksum (made over the server checksum).
    """
    pac_buffers = pac.buffers
    checksum_buffers = {}

    # Find the relevant PAC checksum buffers.
    for pac_buffer in pac_buffers:
        buffer_type = pac_buffer.type
        if buffer_type in self.pac_checksum_types:
            self.assertNotIn(buffer_type, checksum_buffers,
                             f'Duplicate checksum type {buffer_type}')

            checksum_buffers[buffer_type] = pac_buffer

    # Create any additional buffers that were requested but not
    # present. Conversely, remove any buffers that were requested to be
    # removed.
    for buffer_type in self.pac_checksum_types:
        if buffer_type in checksum_buffers:
            if include_checksums.get(buffer_type) is False:
                checksum_buffer = checksum_buffers.pop(buffer_type)

                pac.num_buffers -= 1
                pac_buffers.remove(checksum_buffer)

        elif include_checksums.get(buffer_type) is True:
            info = krb5pac.PAC_SIGNATURE_DATA()

            checksum_buffer = krb5pac.PAC_BUFFER()
            checksum_buffer.type = buffer_type
            checksum_buffer.info = info

            pac_buffers.append(checksum_buffer)
            pac.num_buffers += 1

            checksum_buffers[buffer_type] = checksum_buffer

    # Fill the relevant checksum buffers.
    for buffer_type, checksum_buffer in checksum_buffers.items():
        checksum_key = checksum_keys[buffer_type]
        # Keep only the low 32 bits of the checksum type, so that a
        # negative ctype is stored as a non-negative value.
        ctype = checksum_key.ctype & ((1 << 32) - 1)

        if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
            self.assertIsNotNone(enc_part)

            # The ticket checksum is final at this point; the others get
            # zeroed placeholders that are replaced further down.
            signature = checksum_key.make_rodc_checksum(
                KU_NON_KERB_CKSUM_SALT,
                enc_part)

        elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
            signature = checksum_key.make_zeroed_checksum()

        else:
            signature = checksum_key.make_rodc_zeroed_checksum()

        checksum_buffer.info.signature = signature
        checksum_buffer.info.type = ctype

    # Add the new checksum buffers to the PAC.
    pac.buffers = pac_buffers

    # Calculate the full checksum and insert it into the PAC.
    full_checksum_buffer = checksum_buffers.get(
        krb5pac.PAC_TYPE_FULL_CHECKSUM)
    if full_checksum_buffer is not None:
        full_checksum_key = checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM]

        # Packed while the full, server and KDC signatures are still
        # zeroed placeholders.
        pac_data = ndr_pack(pac)
        full_checksum = full_checksum_key.make_rodc_checksum(
            KU_NON_KERB_CKSUM_SALT,
            pac_data)

        full_checksum_buffer.info.signature = full_checksum

    # Calculate the server and KDC checksums and insert them into the PAC.

    server_checksum_buffer = checksum_buffers.get(
        krb5pac.PAC_TYPE_SRV_CHECKSUM)
    if server_checksum_buffer is not None:
        server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]

        # Packed after the full checksum (if any) has been filled in, so
        # the server checksum covers it.
        pac_data = ndr_pack(pac)
        server_checksum = server_checksum_key.make_checksum(
            KU_NON_KERB_CKSUM_SALT,
            pac_data)

        server_checksum_buffer.info.signature = server_checksum

    kdc_checksum_buffer = checksum_buffers.get(
        krb5pac.PAC_TYPE_KDC_CHECKSUM)
    if kdc_checksum_buffer is not None:
        if server_checksum_buffer is None:
            # There's no server signature to make the checksum over, so
            # just make the checksum over an empty bytes object.
            server_checksum = bytes()

        kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]

        kdc_checksum = kdc_checksum_key.make_rodc_checksum(
            KU_NON_KERB_CKSUM_SALT,
            server_checksum)

        kdc_checksum_buffer.info.signature = kdc_checksum
def replace_pac(self, auth_data, new_pac, expect_pac=True,
                allow_empty_authdata=False):
    """Return a copy of auth_data with its PAC replaced, and the old PAC.

    auth_data: iterable of authorization-data elements.
    new_pac: AD_WIN2K_PAC element to put in place of the existing PAC,
        or None to drop the PAC.
    expect_pac: if true, fail unless an AD-IF-RELEVANT element containing
        a PAC is found.
    allow_empty_authdata: if true, an AD-IF-RELEVANT element that ends
        up empty is kept rather than omitted.

    Returns (new_auth_data, old_pac), where old_pac is the 'ad-data' of
    the PAC that was found, or None.
    """
    if new_pac is not None:
        self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
        self.assertElementPresent(new_pac, 'ad-data')

    new_auth_data = []

    ad_relevant = None
    old_pac = None

    for authdata_elem in auth_data:
        if authdata_elem['ad-type'] == AD_IF_RELEVANT:
            ad_relevant = self.der_decode(
                authdata_elem['ad-data'],
                asn1Spec=krb5_asn1.AD_IF_RELEVANT())

            # Rebuild the container, swapping the PAC for new_pac (or
            # dropping it when new_pac is None).
            relevant_elems = []
            for relevant_elem in ad_relevant:
                if relevant_elem['ad-type'] == AD_WIN2K_PAC:
                    self.assertIsNone(old_pac, 'Multiple PACs detected')
                    old_pac = relevant_elem['ad-data']

                    if new_pac is not None:
                        relevant_elems.append(new_pac)
                else:
                    relevant_elems.append(relevant_elem)
            if expect_pac:
                self.assertIsNotNone(old_pac, 'Expected PAC')

            if relevant_elems or allow_empty_authdata:
                ad_relevant = self.der_encode(
                    relevant_elems,
                    asn1Spec=krb5_asn1.AD_IF_RELEVANT())

                authdata_elem = self.AuthorizationData_create(
                    AD_IF_RELEVANT,
                    ad_relevant)
            else:
                # Nothing left in the container: omit it from the result.
                authdata_elem = None

        # Elements of other types are carried over unchanged.
        if authdata_elem is not None or allow_empty_authdata:
            new_auth_data.append(authdata_elem)

    if expect_pac:
        self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')

    return new_auth_data, old_pac
def get_pac(self, auth_data, expect_pac=True):
    """Return the PAC found in auth_data by replace_pac().

    replace_pac() is called with no replacement PAC; only the second
    element of its result (the existing PAC) is returned.
    """
    _unused_auth_data, existing_pac = self.replace_pac(
        auth_data, None, expect_pac)
    return existing_pac
def get_ticket_pac(self, ticket, expect_pac=True):
    """Return the PAC from the ticket's decrypted enc-part.

    The authorization data is read from ticket.ticket_private. If
    expect_pac is true it must be present; otherwise None is returned
    when there is no authorization data at all.
    """
    authdata = ticket.ticket_private.get('authorization-data')
    if not expect_pac:
        if authdata is None:
            return None
    else:
        self.assertIsNotNone(authdata)

    return self.get_pac(authdata, expect_pac=expect_pac)
def get_krbtgt_checksum_key(self):
    """Return a checksum-key dict holding the krbtgt key.

    The dict has a single entry mapping PAC_TYPE_KDC_CHECKSUM to the
    ticket decryption key derived from the krbtgt credentials.
    """
    kdc_key = self.TicketDecryptionKey_from_creds(self.get_krbtgt_creds())

    return {krb5pac.PAC_TYPE_KDC_CHECKSUM: kdc_key}
def is_tgs_principal(self, principal):
    """Return True if principal is to be treated as a TGS principal.

    That is the case when is_tgs() accepts it, or when self.kadmin_is_tgs
    is set and is_kadmin() accepts it.
    """
    return bool(self.is_tgs(principal)
                or (self.kadmin_is_tgs and self.is_kadmin(principal)))
def is_kadmin(self, principal):
    """Return whether the principal's first name component is 'kadmin'.

    Both the str and the bytes spelling are accepted.
    """
    first_component = principal['name-string'][0]
    accepted = ('kadmin', b'kadmin')
    return first_component in accepted
def is_tgs(self, principal):
    """Return whether the principal's first name component is 'krbtgt'.

    Both the str and the bytes spelling are accepted.
    """
    first_component = principal['name-string'][0]
    accepted = ('krbtgt', b'krbtgt')
    return first_component in accepted
def is_tgt(self, ticket):
    """Return whether the ticket's sname is accepted by is_tgs()."""
    return self.is_tgs(ticket.ticket['sname'])
def get_empty_pac(self):
    """Return an AD_WIN2K_PAC element whose data is a single zero byte."""
    placeholder = bytes(1)
    return self.AuthorizationData_create(AD_WIN2K_PAC, placeholder)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return get_pa_dict() applied to the request's 'req_padata'."""
    outer_padata = kdc_exchange_dict['req_padata']
    return self.get_pa_dict(outer_padata)
def get_fast_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict for 'fast_padata', else the outer one.

    The outer padata dict is used whenever the dict built from
    'fast_padata' is empty.
    """
    inner_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])

    return inner_pa_dict or self.get_outer_pa_dict(kdc_exchange_dict)
def sent_fast(self, kdc_exchange_dict):
    """Return whether the outer padata contains PADATA_FX_FAST."""
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Return whether PADATA_ENCRYPTED_CHALLENGE was sent.

    The padata dict returned by get_fast_pa_dict() is consulted.
    """
    return (PADATA_ENCRYPTED_CHALLENGE
            in self.get_fast_pa_dict(kdc_exchange_dict))
def sent_enc_pa_rep(self, kdc_exchange_dict):
    """Return whether PADATA_REQ_ENC_PA_REP was sent.

    The padata dict returned by get_fast_pa_dict() is consulted.
    """
    return PADATA_REQ_ENC_PA_REP in self.get_fast_pa_dict(kdc_exchange_dict)
def sent_pk_as_req(self, kdc_exchange_dict):
    """Return whether PADATA_PK_AS_REQ was sent.

    The padata dict returned by get_fast_pa_dict() is consulted.
    """
    return PADATA_PK_AS_REQ in self.get_fast_pa_dict(kdc_exchange_dict)
def sent_freshness(self, kdc_exchange_dict):
    """Return whether PADATA_AS_FRESHNESS was sent.

    The padata dict returned by get_fast_pa_dict() is consulted.
    """
    return PADATA_AS_FRESHNESS in self.get_fast_pa_dict(kdc_exchange_dict)
def get_sent_pac_options(self, kdc_exchange_dict):
    """Return the PAC options sent in the request, as a bit string.

    Returns '' if no PADATA_PAC_OPTIONS element was sent. Otherwise the
    decoded 'options' value is returned with everything after the first
    four characters replaced by '0'.
    """
    sent_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    if PADATA_PAC_OPTIONS not in sent_pa_dict:
        return ''

    decoded = self.der_decode(sent_pa_dict[PADATA_PAC_OPTIONS],
                              asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    option_bits = decoded['options']

    # Mask out unsupported bits.
    supported = option_bits[:4]
    unsupported = option_bits[4:]

    return supported + '0' * len(unsupported)
def get_krbtgt_sname(self):
    """Return the krbtgt principal name as an NT_SRV_INST PrincipalName.

    The name components are the krbtgt credentials' username and realm.
    """
    creds = self.get_krbtgt_creds()
    username = creds.get_username()
    realm = creds.get_realm()

    return self.PrincipalName_create(name_type=NT_SRV_INST,
                                     names=[username, realm])
def add_requester_sid(self, pac, sid):
    """Append a PAC_REQUESTER_SID buffer for sid to pac and return pac.

    Fails if the PAC already holds a buffer of type
    PAC_TYPE_REQUESTER_SID. pac.num_buffers is incremented to match.
    """
    buffers = pac.buffers

    present_types = [existing.type for existing in buffers]
    self.assertNotIn(krb5pac.PAC_TYPE_REQUESTER_SID, present_types)

    sid_info = krb5pac.PAC_REQUESTER_SID()
    sid_info.sid = security.dom_sid(sid)

    new_buffer = krb5pac.PAC_BUFFER()
    new_buffer.type = krb5pac.PAC_TYPE_REQUESTER_SID
    new_buffer.info = sid_info

    buffers.append(new_buffer)

    # Write the extended list back and keep the count in step.
    pac.buffers = buffers
    pac.num_buffers += 1

    return pac
def modify_lifetime(self, ticket, lifetime, requester_sid=None):
    """Return a copy of ticket with 'lifetime' seconds left to live.

    The ticket's authtime (and starttime, if present) are moved an hour
    into the past, the endtime is set to now + lifetime, and the PAC
    logon time is set to match the new start time. If requester_sid is
    given, a requester SID buffer is added to the PAC. The ticket is
    re-encrypted with the krbtgt key.
    """
    # Get the krbtgt key.
    krbtgt_key = self.TicketDecryptionKey_from_creds(
        self.get_krbtgt_creds())
    checksum_keys = {krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key}

    now = time.time()

    # Set authtime and starttime to an hour in the past, to show that they
    # do not affect ticket rejection.
    start_time = self.get_KerberosTime(epoch=now, offset=-60 * 60)

    # Set the endtime of the ticket relative to our current time, so that
    # the ticket has 'lifetime' seconds remaining to live.
    end_time = self.get_KerberosTime(epoch=now, offset=lifetime)

    def modify_ticket_times(enc_part):
        # Modify the times in the ticket.
        enc_part['authtime'] = start_time
        if 'starttime' in enc_part:
            enc_part['starttime'] = start_time

        enc_part['endtime'] = end_time

        return enc_part

    def modify_pac_time(pac):
        # We have to set the times in both the ticket and the PAC,
        # otherwise Heimdal will complain.
        buffers = pac.buffers

        logon_name_buffer = next(
            (candidate for candidate in buffers
             if candidate.type == krb5pac.PAC_TYPE_LOGON_NAME),
            None)
        if logon_name_buffer is None:
            self.fail('failed to find LOGON_NAME PAC buffer')

        logon_time = self.get_EpochFromKerberosTime(start_time)
        logon_name_buffer.info.logon_time = unix2nttime(logon_time)

        pac.buffers = buffers

        return pac

    def modify_pac_fn(pac):
        if requester_sid is not None:
            # Add a requester SID to show that the KDC will then accept
            # this kpasswd ticket as if it were a TGT.
            pac = self.add_requester_sid(pac, sid=requester_sid)
        return modify_pac_time(pac)

    # Do the actual modification.
    return self.modified_ticket(ticket,
                                new_ticket_key=krbtgt_key,
                                modify_fn=modify_ticket_times,
                                modify_pac_fn=modify_pac_fn,
                                checksum_keys=checksum_keys)
5981 def _test_as_exchange(self,
5982 cname,
5983 realm,
5984 sname,
5985 till,
5986 expected_error_mode,
5987 expected_crealm,
5988 expected_cname,
5989 expected_srealm,
5990 expected_sname,
5991 expected_salt,
5992 etypes,
5993 padata,
5994 kdc_options,
5995 creds=None,
5996 renew_time=None,
5997 expected_account_name=None,
5998 expected_groups=None,
5999 unexpected_groups=None,
6000 expected_upn_name=None,
6001 expected_sid=None,
6002 expected_domain_sid=None,
6003 expected_flags=None,
6004 unexpected_flags=None,
6005 expected_supported_etypes=None,
6006 preauth_key=None,
6007 ticket_decryption_key=None,
6008 pac_request=None,
6009 pac_options=None,
6010 expect_pac=True,
6011 expect_pac_attrs=None,
6012 expect_pac_attrs_pac_request=None,
6013 expect_requester_sid=None,
6014 expect_client_claims=None,
6015 expect_device_claims=None,
6016 expected_client_claims=None,
6017 unexpected_client_claims=None,
6018 expected_device_claims=None,
6019 unexpected_device_claims=None,
6020 expect_edata=None,
6021 expect_status=None,
6022 expected_status=None,
6023 rc4_support=True,
6024 to_rodc=False):
6026 def _generate_padata_copy(_kdc_exchange_dict,
6027 _callback_dict,
6028 req_body):
6029 return padata, req_body
6031 if not expected_error_mode:
6032 check_error_fn = None
6033 check_rep_fn = self.generic_check_kdc_rep
6034 else:
6035 check_error_fn = self.generic_check_kdc_error
6036 check_rep_fn = None
6038 if padata is not None:
6039 generate_padata_fn = _generate_padata_copy
6040 else:
6041 generate_padata_fn = None
6043 kdc_exchange_dict = self.as_exchange_dict(
6044 creds=creds,
6045 expected_crealm=expected_crealm,
6046 expected_cname=expected_cname,
6047 expected_srealm=expected_srealm,
6048 expected_sname=expected_sname,
6049 expected_account_name=expected_account_name,
6050 expected_groups=expected_groups,
6051 unexpected_groups=unexpected_groups,
6052 expected_upn_name=expected_upn_name,
6053 expected_sid=expected_sid,
6054 expected_domain_sid=expected_domain_sid,
6055 expected_supported_etypes=expected_supported_etypes,
6056 ticket_decryption_key=ticket_decryption_key,
6057 generate_padata_fn=generate_padata_fn,
6058 check_error_fn=check_error_fn,
6059 check_rep_fn=check_rep_fn,
6060 check_kdc_private_fn=self.generic_check_kdc_private,
6061 expected_error_mode=expected_error_mode,
6062 expected_salt=expected_salt,
6063 expected_flags=expected_flags,
6064 unexpected_flags=unexpected_flags,
6065 preauth_key=preauth_key,
6066 kdc_options=str(kdc_options),
6067 pac_request=pac_request,
6068 pac_options=pac_options,
6069 expect_pac=expect_pac,
6070 expect_pac_attrs=expect_pac_attrs,
6071 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
6072 expect_requester_sid=expect_requester_sid,
6073 expect_client_claims=expect_client_claims,
6074 expect_device_claims=expect_device_claims,
6075 expected_client_claims=expected_client_claims,
6076 unexpected_client_claims=unexpected_client_claims,
6077 expected_device_claims=expected_device_claims,
6078 unexpected_device_claims=unexpected_device_claims,
6079 expect_edata=expect_edata,
6080 expect_status=expect_status,
6081 expected_status=expected_status,
6082 rc4_support=rc4_support,
6083 to_rodc=to_rodc)
6085 rep = self._generic_kdc_exchange(kdc_exchange_dict,
6086 cname=cname,
6087 realm=realm,
6088 sname=sname,
6089 till_time=till,
6090 renew_time=renew_time,
6091 etypes=etypes)
6093 return rep, kdc_exchange_dict