tests/krb5: Add KerberosCredentials.get_rid()
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob8d578bb0b899775ebeaa0f94acec306ed1e1be32
# Unix SMB/CIFS implementation.
# Copyright (C) Isaac Boukris 2020
# Copyright (C) Stefan Metzmacher 2020
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
28 import math
30 from enum import Enum
31 from pprint import pprint
33 from cryptography import x509
34 from cryptography.hazmat.primitives import asymmetric, hashes
35 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
36 from cryptography.hazmat.backends import default_backend
38 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
39 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
40 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
41 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
43 from pyasn1.codec.ber.encoder import BitStringEncoder
44 import pyasn1.type.univ
46 from pyasn1.error import PyAsn1Error
48 from samba import unix2nttime
49 from samba.credentials import Credentials
50 from samba.dcerpc import claims, krb5pac, netlogon, samr, security
51 from samba.gensec import FEATURE_SEAL
52 from samba.ndr import ndr_pack, ndr_unpack
53 from samba.dcerpc.misc import (
54 SEC_CHAN_WKSTA,
55 SEC_CHAN_BDC,
58 import samba.tests
59 from samba.tests import TestCase
61 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
62 from samba.tests.krb5.rfc4120_constants import (
63 AD_IF_RELEVANT,
64 AD_WIN2K_PAC,
65 FX_FAST_ARMOR_AP_REQUEST,
66 KDC_ERR_CLIENT_REVOKED,
67 KDC_ERR_GENERIC,
68 KDC_ERR_POLICY,
69 KDC_ERR_PREAUTH_FAILED,
70 KDC_ERR_SKEW,
71 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
72 KERB_ERR_TYPE_EXTENDED,
73 KRB_AP_REP,
74 KRB_AP_REQ,
75 KRB_AS_REP,
76 KRB_AS_REQ,
77 KRB_ERROR,
78 KRB_PRIV,
79 KRB_TGS_REP,
80 KRB_TGS_REQ,
81 KU_AP_REQ_AUTH,
82 KU_AS_FRESHNESS,
83 KU_AS_REP_ENC_PART,
84 KU_AP_REQ_ENC_PART,
85 KU_AS_REQ,
86 KU_ENC_CHALLENGE_KDC,
87 KU_FAST_ENC,
88 KU_FAST_FINISHED,
89 KU_FAST_REP,
90 KU_FAST_REQ_CHKSUM,
91 KU_KRB_PRIV,
92 KU_NON_KERB_CKSUM_SALT,
93 KU_NON_KERB_SALT,
94 KU_PKINIT_AS_REQ,
95 KU_TGS_REP_ENC_PART_SESSION,
96 KU_TGS_REP_ENC_PART_SUB_KEY,
97 KU_TGS_REQ_AUTH,
98 KU_TGS_REQ_AUTH_CKSUM,
99 KU_TGS_REQ_AUTH_DAT_SESSION,
100 KU_TGS_REQ_AUTH_DAT_SUBKEY,
101 KU_TICKET,
102 NT_PRINCIPAL,
103 NT_SRV_INST,
104 NT_WELLKNOWN,
105 PADATA_AS_FRESHNESS,
106 PADATA_ENCRYPTED_CHALLENGE,
107 PADATA_ENC_TIMESTAMP,
108 PADATA_ETYPE_INFO,
109 PADATA_ETYPE_INFO2,
110 PADATA_FOR_USER,
111 PADATA_FX_COOKIE,
112 PADATA_FX_ERROR,
113 PADATA_FX_FAST,
114 PADATA_GSS,
115 PADATA_KDC_REQ,
116 PADATA_PAC_OPTIONS,
117 PADATA_PAC_REQUEST,
118 PADATA_PKINIT_KX,
119 PADATA_PK_AS_REP,
120 PADATA_PK_AS_REQ,
121 PADATA_PK_AS_REP_19,
122 PADATA_SUPPORTED_ETYPES,
123 PADATA_REQ_ENC_PA_REP
125 import samba.tests.krb5.kcrypto as kcrypto
def BitStringEncoder_encodeValue32(
        self, value, asn1Spec, encodeFun, **options):
    """Encode a BIT STRING value padded to at least 32 bits.

    Written as a replacement for ``BitStringEncoder.encodeValue``.

    :param value: the bit string; it must support ``len()``, ``<<`` and
        ``asOctets()``.
    :param asn1Spec: optional schema object; when given, *value* is first
        converted with ``asn1Spec.clone(value)``.
    :param encodeFun: unused here.
    :param options: unused here.
    :return: the tuple ``(substrate, False, True)`` where *substrate* is a
        leading zero octet, the left-aligned octets of the value, and
        enough trailing zero octets to bring the value up to four octets.
    """
    #
    # BitStrings like KDCOptions or TicketFlags should at least
    # be 32-Bit on the wire
    #
    if asn1Spec is not None:
        # TODO: try to avoid ASN.1 schema instantiation
        value = asn1Spec.clone(value)

    # Shift the bits left so that they fill a whole number of octets.
    trailing_bits = len(value) % 8
    if trailing_bits:
        aligned = value << (8 - trailing_bits)
    else:
        aligned = value

    octets = aligned.asOctets()
    # We need at least 32-Bit / 4-Bytes
    missing = max(0, 4 - len(octets))
    return b'\x00' + octets + b'\x00' * missing, False, True
# Replace pyasn1's BIT STRING value encoder with the variant above, so that
# every BIT STRING encoded from now on is padded to at least 32 bits.
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
def BitString_NamedValues_prettyPrint(self, scope=0):
    """Render a bit string as its binary form plus a list of named bits.

    Only the first 32 bit positions are listed.  A position is printed
    when it has an entry in ``self.prettyPrintNamedValues`` (with its
    current 0/1 value) or when it is set but unnamed (as
    ``unknown-bit-<position>``).

    :param scope: nesting depth; one space of indentation per level.
    :return: the formatted multi-line string.
    """
    width = 32
    out = "%s" % self.asBinary()

    # Expand every octet into its bits, most significant bit first.
    flags = [1 if octet & (1 << shift) else 0
             for octet in self.asNumbers()
             for shift in range(7, -1, -1)]
    # Pad with zero bits so that positions 0..31 always exist.
    flags.extend([0] * (width - len(flags)))

    pad = " " * scope
    sep = ": (\n%s " % pad
    for pos in range(width):
        if pos in self.prettyPrintNamedValues:
            label = self.prettyPrintNamedValues[pos]
        elif flags[pos] != 0:
            label = "unknown-bit-%u" % pos
        else:
            # Unnamed and unset: nothing worth printing.
            continue
        out += "%s%s:%u" % (sep, label, flags[pos])
        sep = ",\n%s " % pad
    out += "\n%s)" % pad
    return out
# Give each BIT STRING flag type the bit names of its companion *Values
# type and install the named-bit pretty printer on it.
for _flags_type, _values_type in (
        (krb5_asn1.TicketFlags, krb5_asn1.TicketFlagsValues),
        (krb5_asn1.KDCOptions, krb5_asn1.KDCOptionsValues),
        (krb5_asn1.APOptions, krb5_asn1.APOptionsValues),
        (krb5_asn1.PACOptionFlags, krb5_asn1.PACOptionFlagsValues)):
    _flags_type.prettyPrintNamedValues = _values_type.namedValues
    _flags_type.namedValues = _values_type.namedValues
    _flags_type.prettyPrint = BitString_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _flags_type, _values_type
def Integer_NamedValues_prettyPrint(self, scope=0):
    """Render an integer as ``<decimal> (0x<hex>) <name>``.

    The name is looked up in ``self.prettyPrintNamedValues``; values
    without an entry are labelled ``<__unknown__>``.

    :param scope: accepted for signature compatibility; unused.
    :return: the formatted single-line string.
    """
    number = int(self)
    known = self.prettyPrintNamedValues
    label = known[number] if number in known else "<__unknown__>"
    return "%d (0x%x) %s" % (number, number, label)
# Give each enumerated INTEGER type the value names of its companion
# *Values type and install the named-value pretty printer on it.
for _int_type, _values_type in (
        (krb5_asn1.NameType, krb5_asn1.NameTypeValues),
        (krb5_asn1.AuthDataType, krb5_asn1.AuthDataTypeValues),
        (krb5_asn1.PADataType, krb5_asn1.PADataTypeValues),
        (krb5_asn1.EncryptionType, krb5_asn1.EncryptionTypeValues),
        (krb5_asn1.ChecksumType, krb5_asn1.ChecksumTypeValues),
        (krb5_asn1.KerbErrorDataType, krb5_asn1.KerbErrorDataTypeValues)):
    _int_type.prettyPrintNamedValues = _values_type.namedValues
    _int_type.prettyPrint = Integer_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _int_type, _values_type
class Krb5EncryptionKey:
    """A kcrypto key together with its enctype, checksum type and kvno."""

    def __init__(self, key, kvno):
        """Wrap *key*.

        :param key: a kcrypto key object; its ``enctype`` selects the
            default checksum type.
        :param kvno: key version number, or None when it is not known.
        :raises KeyError: if the key's enctype is not AES256, AES128 or
            RC4.
        """
        EncTypeChecksum = {
            kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
            kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
            kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
        }
        self.key = key
        self.etype = key.enctype
        self.ctype = EncTypeChecksum[self.etype]
        self.kvno = kvno

    def __str__(self):
        # kvno is formatted with %s rather than %d: it may be None (keys
        # are created from credentials whose kvno has not been set), and
        # '%d' % None raises TypeError.  Integers render identically.
        return "etype=%d ctype=%d kvno=%s key=%s" % (
            self.etype, self.ctype, self.kvno, self.key)

    def encrypt(self, usage, plaintext):
        """Return *plaintext* encrypted with this key for key usage *usage*."""
        ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
        return ciphertext

    def decrypt(self, usage, ciphertext):
        """Return *ciphertext* decrypted with this key for key usage *usage*."""
        plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
        return plaintext

    def make_zeroed_checksum(self, ctype=None):
        """Return an all-zero checksum of the length used by *ctype*.

        *ctype* defaults to this key's own checksum type.
        """
        if ctype is None:
            ctype = self.ctype

        checksum_len = kcrypto.checksum_len(ctype)
        return bytes(checksum_len)

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the checksum of *plaintext* for key usage *usage*.

        *ctype* defaults to this key's own checksum type.
        """
        if ctype is None:
            ctype = self.ctype
        cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
        return cksum

    def verify_checksum(self, usage, plaintext, ctype, cksum):
        """Verify *cksum* over *plaintext*.

        :raises AssertionError: if *ctype* is not this key's checksum
            type.  Any failure reported by ``kcrypto.verify_checksum``
            propagates unchanged.
        """
        if self.ctype != ctype:
            raise AssertionError(f'key checksum type ({self.ctype}) != '
                                 f'checksum type ({ctype})')

        kcrypto.verify_checksum(ctype,
                                self.key,
                                usage,
                                plaintext,
                                cksum)

    def export_obj(self):
        """Return the key as a dict with 'keytype' and 'keyvalue' entries."""
        EncryptionKey_obj = {
            'keytype': self.etype,
            'keyvalue': self.key.contents,
        }
        return EncryptionKey_obj
class RodcPacEncryptionKey(Krb5EncryptionKey):
    """An encryption key whose checksums can carry a two-byte RODC id.

    ``rodc_id`` is stored as two little-endian bytes, or as ``b''`` when
    there is no id.
    """

    def __init__(self, key, kvno, rodc_id=None):
        """Wrap *key*.

        When *rodc_id* is not given it is taken from bits 16..31 of the
        kvno; a missing kvno or a zero result means "no id".
        """
        super().__init__(key, kvno)

        if rodc_id is None and self.kvno is not None:
            rodc_id = ((self.kvno >> 16) & ((1 << 16) - 1)) or None

        if rodc_id is None:
            self.rodc_id = b''
        else:
            self.rodc_id = rodc_id.to_bytes(2, byteorder='little')

    def make_rodc_zeroed_checksum(self, ctype=None):
        """Return a zeroed checksum extended by zero bytes for the RODC id."""
        zeroed = super().make_zeroed_checksum(ctype)
        return zeroed + bytes(len(self.rodc_id))

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the checksum of *plaintext* followed by the RODC id."""
        plain_checksum = super().make_checksum(usage, plaintext, ctype)
        return plain_checksum + self.rodc_id

    def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
        """Verify a checksum produced by make_rodc_checksum().

        :raises AssertionError: if this key has an RODC id and the last
            two bytes of *cksum* differ from it.
        """
        if self.rodc_id:
            # Split off the trailing id before checking the checksum proper.
            cksum_rodc_id = cksum[-2:]
            cksum = cksum[:-2]

            if self.rodc_id != cksum_rodc_id:
                raise AssertionError(f'{self.rodc_id.hex()} != '
                                     f'{cksum_rodc_id.hex()}')

        super().verify_checksum(usage,
                                plaintext,
                                ctype,
                                cksum)
class ZeroedChecksumKey(RodcPacEncryptionKey):
    """A key that produces all-zero checksums instead of real ones."""

    def make_checksum(self, usage, plaintext, ctype=None):
        """Ignore *usage* and *plaintext*; return a zeroed checksum."""
        return self.make_zeroed_checksum(ctype)

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Ignore *usage* and *plaintext*; return a zeroed RODC checksum."""
        return self.make_rodc_zeroed_checksum(ctype)
class WrongLengthChecksumKey(RodcPacEncryptionKey):
    """A key whose checksums are all forced to a caller-chosen length."""

    def __init__(self, key, kvno, length):
        """Wrap *key*; every checksum it produces will be *length* bytes."""
        super().__init__(key, kvno)

        self._length = length

    @classmethod
    def _adjust_to_length(cls, checksum, length):
        """Zero-pad or truncate *checksum* so it is *length* bytes long."""
        shortfall = length - len(checksum)
        if shortfall > 0:
            return checksum + bytes(shortfall)
        if shortfall < 0:
            return checksum[:length]
        return checksum

    def make_zeroed_checksum(self, ctype=None):
        """Return ``self._length`` zero bytes; *ctype* is ignored."""
        return bytes(self._length)

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the real checksum, padded or cut to the forced length."""
        real = super().make_checksum(usage, plaintext, ctype)
        return self._adjust_to_length(real, self._length)

    def make_rodc_zeroed_checksum(self, ctype=None):
        """Return ``self._length`` zero bytes; *ctype* is ignored."""
        return bytes(self._length)

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the real RODC checksum, padded or cut to the forced length."""
        real = super().make_rodc_checksum(usage, plaintext, ctype)
        return self._adjust_to_length(real, self._length)
class KerberosCredentials(Credentials):
    """Credentials extended with the Kerberos state used by the krb5 tests.

    On top of the base class this tracks the supported-enctype bit masks
    for AS, TGS and AP exchanges, the kvno, explicitly supplied keys and
    salt, a few directory attributes (DN, UPN, SPN, SID, account type)
    and a lazily generated RSA key pair.
    """

    # Bits of a supported-enctypes mask that do not stand for an
    # encryption type; bits_to_etypes() ignores them.
    non_etype_bits = (
        security.KERB_ENCTYPE_FAST_SUPPORTED
        | security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
        | security.KERB_ENCTYPE_CLAIMS_SUPPORTED
        | security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED
        | security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK)

    def __init__(self):
        super().__init__()
        all_enc_types = 0
        all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
        all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
        all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96

        self.as_supported_enctypes = all_enc_types
        self.tgs_supported_enctypes = all_enc_types
        self.ap_supported_enctypes = all_enc_types

        self.kvno = None
        # Maps an integer etype to its RodcPacEncryptionKey.
        self.forced_keys = {}

        self.forced_salt = None

        self.dn = None
        self.upn = None
        self.spn = None
        self.sid = None
        self.account_type = None

        # Created on first use by get_private_key().
        self._private_key = None

    def set_as_supported_enctypes(self, value):
        """Set the AS supported-enctypes bit mask (converted with int())."""
        self.as_supported_enctypes = int(value)

    def set_tgs_supported_enctypes(self, value):
        """Set the TGS supported-enctypes bit mask (converted with int())."""
        self.tgs_supported_enctypes = int(value)

    def set_ap_supported_enctypes(self, value):
        """Set the AP supported-enctypes bit mask (converted with int())."""
        self.ap_supported_enctypes = int(value)

    # Ordered mapping from kcrypto enctype to its supported-enctypes bit.
    # bits_to_etypes() returns etypes in this order.
    etype_map = collections.OrderedDict([
        (kcrypto.Enctype.AES256,
         security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
        (kcrypto.Enctype.AES128,
         security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
        (kcrypto.Enctype.RC4,
         security.KERB_ENCTYPE_RC4_HMAC_MD5),
        (kcrypto.Enctype.DES_MD5,
         security.KERB_ENCTYPE_DES_CBC_MD5),
        (kcrypto.Enctype.DES_CRC,
         security.KERB_ENCTYPE_DES_CBC_CRC)
    ])

    @classmethod
    def etypes_to_bits(cls, etypes):
        """Return the bit mask that corresponds to the iterable *etypes*.

        :raises KeyError: if an etype is not in ``etype_map``.
        :raises ValueError: if an etype occurs more than once.
        """
        bits = 0
        for etype in etypes:
            bit = cls.etype_map[etype]
            if bits & bit:
                raise ValueError(f'Got duplicate etype: {etype}')
            bits |= bit

        return bits

    @classmethod
    def bits_to_etypes(cls, bits):
        """Return the tuple of etypes set in *bits*, in ``etype_map`` order.

        :raises ValueError: if bits remain that are neither a known etype
            nor part of ``non_etype_bits``.
        """
        etypes = ()
        for etype, bit in cls.etype_map.items():
            if bit & bits:
                bits &= ~bit
                etypes += (etype,)

        bits &= ~cls.non_etype_bits
        if bits != 0:
            raise ValueError(f'Unsupported etype bits: {bits}')

        return etypes

    def get_as_krb5_etypes(self):
        """Return the AS supported enctypes as a tuple of etypes."""
        return self.bits_to_etypes(self.as_supported_enctypes)

    def get_tgs_krb5_etypes(self):
        """Return the TGS supported enctypes as a tuple of etypes."""
        return self.bits_to_etypes(self.tgs_supported_enctypes)

    def get_ap_krb5_etypes(self):
        """Return the AP supported enctypes as a tuple of etypes."""
        return self.bits_to_etypes(self.ap_supported_enctypes)

    def set_kvno(self, kvno):
        """Store *kvno*, interpreting it as a signed 32-bit value."""
        # Sign-extend from 32 bits.
        if kvno & 1 << 31:
            kvno |= -1 << 31
        self.kvno = kvno

    def get_kvno(self):
        return self.kvno

    def set_forced_key(self, etype, hexkey):
        """Register the key given as hex string *hexkey* for *etype*.

        The key is stored with the kvno known at the time of the call.
        """
        etype = int(etype)
        contents = binascii.a2b_hex(hexkey)
        key = kcrypto.Key(etype, contents)
        self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)

        # Also set the NT hash of computer accounts for which we don't know
        # the password.
        if etype == kcrypto.Enctype.RC4 and self.get_password() is None:
            nt_hash = samr.Password()
            nt_hash.hash = list(contents)

            self.set_nt_hash(nt_hash)

    def get_forced_key(self, etype):
        """Return the forced key for *etype*, or None if there is none."""
        etype = int(etype)
        return self.forced_keys.get(etype)

    def set_forced_salt(self, salt):
        self.forced_salt = bytes(salt)

    def get_forced_salt(self):
        return self.forced_salt

    def get_salt(self):
        """Return the key salt for this account as UTF-8 bytes.

        A forced salt wins.  Otherwise, for workstation and BDC secure
        channel types the salt is ``<REALM>host<name>.<realm>`` where
        *name* is the lower-cased username without a trailing '$'.  For
        all other accounts it is ``<REALM>`` followed by the UPN's part
        before the last '@' with '/' removed, or by the username when no
        UPN is set.
        """
        if self.forced_salt is not None:
            return self.forced_salt

        upn = self.get_upn()
        if upn is not None:
            salt_name = upn.rsplit('@', 1)[0].replace('/', '')
        else:
            salt_name = self.get_username()

        secure_schannel_type = self.get_secure_channel_type()
        if secure_schannel_type in [SEC_CHAN_WKSTA, SEC_CHAN_BDC]:
            salt_name = self.get_username().lower()
            # endswith() rather than salt_name[-1], so that an empty
            # username does not raise IndexError.
            if salt_name.endswith('$'):
                salt_name = salt_name[:-1]
            salt_string = '%shost%s.%s' % (
                self.get_realm().upper(),
                salt_name,
                self.get_realm().lower())
        else:
            salt_string = self.get_realm().upper() + salt_name

        return salt_string.encode('utf-8')

    def set_dn(self, dn):
        self.dn = dn

    def get_dn(self):
        return self.dn

    def set_spn(self, spn):
        self.spn = spn

    def get_spn(self):
        return self.spn

    def set_upn(self, upn):
        self.upn = upn

    def get_upn(self):
        return self.upn

    def set_sid(self, sid):
        self.sid = sid

    def get_sid(self):
        return self.sid

    def get_rid(self):
        """Return the RID (the last '-'-separated component of the SID).

        Returns None when no SID has been set.
        """
        sid = self.get_sid()
        if sid is None:
            return None

        # str() is a no-op for string SIDs and lets objects that render
        # as their SID string be used as well.
        _, rid = str(sid).rsplit('-', 1)
        return int(rid)

    def set_type(self, account_type):
        self.account_type = account_type

    def get_type(self):
        return self.account_type

    def update_password(self, password):
        """Set a new password and increment the kvno by one."""
        self.set_password(password)
        self.set_kvno(self.get_kvno() + 1)

    def get_private_key(self):
        """Return this account's RSA private key, generating it on first use."""
        if self._private_key is None:
            # Generate a new keypair.
            self._private_key = asymmetric.rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
                backend=default_backend()
            )

        return self._private_key

    def get_public_key(self):
        """Return the public half of get_private_key()."""
        return self.get_private_key().public_key()
class KerberosTicketCreds:
    """A ticket together with its session key and related metadata.

    All constructor arguments are stored unchanged as attributes of the
    same name.
    """

    def __init__(self, ticket, session_key,
                 crealm=None, cname=None,
                 srealm=None, sname=None,
                 decryption_key=None,
                 ticket_private=None,
                 encpart_private=None):
        self.ticket, self.session_key = ticket, session_key
        self.crealm, self.cname = crealm, cname
        self.srealm, self.sname = srealm, sname
        self.decryption_key = decryption_key
        self.ticket_private = ticket_private
        self.encpart_private = encpart_private

    def set_sname(self, sname):
        """Set *sname* both inside the ticket and on this object."""
        self.ticket['sname'] = sname
        self.sname = sname
class PkInit(Enum):
    """PKINIT mode selector.

    Each member has its own opaque ``object()`` value, so members are
    only meaningful by identity.
    """

    NOT_USED = object()
    PUBLIC_KEY = object()
    DIFFIE_HELLMAN = object()
class RawKerberosTest(TestCase):
    """A raw Kerberos Test case."""

    # Operation selector for kpasswd requests.
    class KpasswdMode(Enum):
        SET = object()
        CHANGE = object()

    # The location of a SID within the PAC
    class SidType(Enum):
        BASE_SID = object()  # in info3.base.groups
        EXTRA_SID = object()  # in info3.sids
        RESOURCE_SID = object()  # in resource_groups
        PRIMARY_GID = object()  # the (sole) primary group

        def __repr__(self):
            # Show members by name instead of by their object() value.
            return str(self)

    # The PAC buffer types that hold checksums.
    pac_checksum_types = {
        krb5pac.PAC_TYPE_SRV_CHECKSUM,
        krb5pac.PAC_TYPE_KDC_CHECKSUM,
        krb5pac.PAC_TYPE_TICKET_CHECKSUM,
        krb5pac.PAC_TYPE_FULL_CHECKSUM,
    }

    # The etypes from which setup_etype_test_permutations() builds its
    # permutations.
    # NOTE(review): the "aes128"/"aes256" names look swapped relative to
    # their values.  They are kept as they are because the names end up
    # in the generated permutation names.
    etypes_to_test = (
        {"value": -1111, "name": "dummy", },
        {"value": kcrypto.Enctype.AES256, "name": "aes128", },
        {"value": kcrypto.Enctype.AES128, "name": "aes256", },
        {"value": kcrypto.Enctype.RC4, "name": "rc4", },
    )

    # Unique sentinel object.
    expect_padata_outer = object()

    # Set to True once setup_etype_test_permutations() has run.
    setup_etype_test_permutations_done = False
@classmethod
def setup_etype_test_permutations(cls):
    """Build ``cls.etype_test_permutations`` once.

    Every ordered selection of one or more entries of
    ``cls.etypes_to_test`` becomes a dict with the keys "name" (the
    entry names joined by '_') and "etypes" (the tuple of entry values).
    Later calls return immediately.
    """
    if cls.setup_etype_test_permutations_done:
        return

    candidates = cls.etypes_to_test
    result = []
    # Shortest selections first, each length in itertools order.
    for length in range(1, len(candidates) + 1):
        for selection in itertools.permutations(candidates, length):
            result.append({
                "name": "_".join(entry["name"] for entry in selection),
                "etypes": tuple(entry["value"] for entry in selection),
            })

    cls.etype_test_permutations = result
    cls.setup_etype_test_permutations_done = True
@classmethod
def etype_test_permutation_name_idx(cls):
    """Return ``(name, index)`` for every etype permutation.

    The permutations are built first if that has not happened yet.
    """
    cls.setup_etype_test_permutations()
    return [(perm['name'], position)
            for position, perm in enumerate(cls.etype_test_permutations)]
def etype_test_permutation_by_idx(self, idx):
    """Return ``(name, etypes)`` of the etype permutation at index *idx*."""
    permutation = self.etype_test_permutations[idx]
    return permutation['name'], permutation['etypes']
@classmethod
def setUpClass(cls):
    """Read the class-wide test configuration from the environment.

    SERVER and DC_SERVER are read without ``allow_missing``.  All other
    settings are optional: the flags are integers converted to bool,
    with the default noted next to each one below, and DEFAULT_ETYPES
    is an integer or None.
    """
    super().setUpClass()

    def env_flag(name, default):
        # An unset variable falls back to *default* ('0' or '1').
        raw = samba.tests.env_get_var_value(name, allow_missing=True)
        if raw is None:
            raw = default
        return bool(int(raw))

    cls.host = samba.tests.env_get_var_value('SERVER')
    cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')

    # A dictionary containing credentials that have already been
    # obtained.
    cls.creds_dict = {}

    # Off unless enabled.
    cls.kdc_fast_support = env_flag('FAST_SUPPORT', '0')
    cls.kdc_claims_support = env_flag('CLAIMS_SUPPORT', '0')
    cls.kdc_compound_id_support = env_flag('COMPOUND_ID_SUPPORT', '0')

    # On unless disabled.
    cls.tkt_sig_support = env_flag('TKT_SIG_SUPPORT', '1')
    cls.full_sig_support = env_flag('FULL_SIG_SUPPORT', '1')
    cls.expect_pac = env_flag('EXPECT_PAC', '1')
    cls.expect_extra_pac_buffers = env_flag('EXPECT_EXTRA_PAC_BUFFERS',
                                            '1')
    cls.cname_checking = env_flag('CHECK_CNAME', '1')
    cls.padata_checking = env_flag('CHECK_PADATA', '1')

    # Off unless enabled.
    cls.kadmin_is_tgs = env_flag('KADMIN_IS_TGS', '0')

    default_etypes = samba.tests.env_get_var_value('DEFAULT_ETYPES',
                                                   allow_missing=True)
    if default_etypes is not None:
        default_etypes = int(default_etypes)
    cls.default_etypes = default_etypes

    # Off unless enabled.
    cls.forced_rc4 = env_flag('FORCED_RC4', '0')

    # On unless disabled.
    cls.expect_nt_hash = env_flag('EXPECT_NT_HASH', '1')
    cls.expect_nt_status = env_flag('EXPECT_NT_STATUS', '1')
def setUp(self):
    """Reset the per-test state: debug switches, strictness, connection."""
    super().setUp()
    self.do_asn1_print = False
    self.do_hexdump = False

    # STRICT_CHECKING is optional and counts as enabled when unset.
    raw_strict = samba.tests.env_get_var_value('STRICT_CHECKING',
                                               allow_missing=True)
    self.strict_checking = bool(int('1' if raw_strict is None
                                    else raw_strict))

    # No connection yet.
    self.s = None

    # Unique per-test sentinel object.
    self.unspecified_kvno = object()
def tearDown(self):
    """Drop any open connection before the base class tears down."""
    self._disconnect("tearDown")
    super().tearDown()
811 def _disconnect(self, reason):
812 if self.s is None:
813 return
814 self.s.close()
815 self.s = None
816 if self.do_hexdump:
817 sys.stderr.write("disconnect[%s]\n" % reason)
819 def _connect_tcp(self, host, port=None):
820 if port is None:
821 port = 88
822 try:
823 self.a = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
824 socket.SOCK_STREAM, socket.SOL_TCP,
826 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
827 self.s.settimeout(10)
828 self.s.connect(self.a[0][4])
829 except socket.error:
830 self.s.close()
831 raise
832 except IOError:
833 self.s.close()
834 raise
def connect(self, host, port=None):
    """Connect to *host* over TCP after checking there is no connection.

    When hexdumps are enabled, the connection is reported on stderr.
    """
    self.assertNotConnected()
    self._connect_tcp(host, port)
    if not self.do_hexdump:
        return
    sys.stderr.write("connected[%s]\n" % host)
def env_get_var(self, varname, prefix,
                fallback_default=True,
                allow_missing=False):
    """Look up a test setting, preferring its prefixed form.

    With a *prefix*, ``<prefix>_<varname>`` is tried first; if it yields
    None and *fallback_default* is set, the plain *varname* is tried
    next.  Without a prefix only the plain *varname* is looked up.

    *allow_missing* is passed on to the final lookup.  The prefixed
    lookup is also allowed to come up empty whenever a fallback will
    follow.
    """
    if prefix is None:
        # There is no prefixed name to try, so always use the plain one.
        return samba.tests.env_get_var_value(varname,
                                             allow_missing=allow_missing)

    val = samba.tests.env_get_var_value(
        '%s_%s' % (prefix, varname),
        allow_missing=allow_missing or fallback_default)
    if val is None and fallback_default:
        val = samba.tests.env_get_var_value(varname,
                                            allow_missing=allow_missing)
    return val
def _get_krb5_creds_from_env(self, prefix,
                             default_username=None,
                             allow_missing_password=False,
                             allow_missing_keys=True,
                             require_strongest_key=False):
    """Build a KerberosCredentials object from environment settings.

    :param prefix: settings prefix (e.g. 'CLIENT'), or None.
    :param default_username: used when no prefixed USERNAME is set;
        giving it also makes USERNAME optional.
    :param allow_missing_password: whether PASSWORD may be absent.
    :param allow_missing_keys: whether KVNO and the AES256 key may be
        absent; when False, at least one key must have been supplied.
    :param require_strongest_key: makes KVNO mandatory, and the AES256
        key mandatory when there is no password.
    :return: the populated credentials.
    """
    c = KerberosCredentials()
    c.guess()

    domain = self.env_get_var('DOMAIN', prefix)
    realm = self.env_get_var('REALM', prefix)
    username = self.env_get_var(
        'USERNAME', prefix,
        fallback_default=False,
        allow_missing=default_username is not None)
    if username is None:
        username = default_username
    password = self.env_get_var('PASSWORD', prefix,
                                fallback_default=False,
                                allow_missing=allow_missing_password)
    c.set_domain(domain)
    c.set_realm(realm)
    c.set_username(username)
    if password is not None:
        c.set_password(password)

    # Optional overrides of the three supported-enctypes masks.
    for env_name, setter in (
            ('AS_SUPPORTED_ENCTYPES', c.set_as_supported_enctypes),
            ('TGS_SUPPORTED_ENCTYPES', c.set_tgs_supported_enctypes),
            ('AP_SUPPORTED_ENCTYPES', c.set_ap_supported_enctypes)):
        enctypes = self.env_get_var(env_name, prefix, allow_missing=True)
        if enctypes is not None:
            setter(enctypes)

    if require_strongest_key:
        kvno_allow_missing = False
        # With a password the AES256 key need not be supplied.
        aes256_allow_missing = password is not None
    else:
        kvno_allow_missing = allow_missing_keys
        aes256_allow_missing = allow_missing_keys

    kvno = self.env_get_var('KVNO', prefix,
                            fallback_default=False,
                            allow_missing=kvno_allow_missing)
    if kvno is not None:
        c.set_kvno(int(kvno))

    # The kvno has to be set before the keys: each forced key records
    # the kvno known at the time it is added.
    for env_name, etype, key_allow_missing in (
            ('AES256_KEY_HEX', kcrypto.Enctype.AES256,
             aes256_allow_missing),
            ('AES128_KEY_HEX', kcrypto.Enctype.AES128, True),
            ('RC4_KEY_HEX', kcrypto.Enctype.RC4, True)):
        hexkey = self.env_get_var(env_name, prefix,
                                  fallback_default=False,
                                  allow_missing=key_allow_missing)
        if hexkey is not None:
            c.set_forced_key(etype, hexkey)

    if not allow_missing_keys:
        self.assertTrue(c.forced_keys,
                        'Please supply %s encryption keys '
                        'in environment' % prefix)

    return c
931 def _get_krb5_creds(self,
932 prefix,
933 default_username=None,
934 allow_missing_password=False,
935 allow_missing_keys=True,
936 require_strongest_key=False,
937 fallback_creds_fn=None):
938 if prefix in self.creds_dict:
939 return self.creds_dict[prefix]
941 # We don't have the credentials already
942 creds = None
943 env_err = None
944 try:
945 # Try to obtain them from the environment
946 creds = self._get_krb5_creds_from_env(
947 prefix,
948 default_username=default_username,
949 allow_missing_password=allow_missing_password,
950 allow_missing_keys=allow_missing_keys,
951 require_strongest_key=require_strongest_key)
952 except Exception as err:
953 # An error occurred, so save it for later
954 env_err = err
955 else:
956 self.assertIsNotNone(creds)
957 # Save the obtained credentials
958 self.creds_dict[prefix] = creds
959 return creds
961 if fallback_creds_fn is not None:
962 try:
963 # Try to use the fallback method
964 creds = fallback_creds_fn()
965 except Exception as err:
966 print("ERROR FROM ENV: %r" % (env_err))
967 print("FALLBACK-FN: %s" % (fallback_creds_fn))
968 print("FALLBACK-ERROR: %r" % (err))
969 else:
970 self.assertIsNotNone(creds)
971 # Save the obtained credentials
972 self.creds_dict[prefix] = creds
973 return creds
975 # Both methods failed, so raise the exception from the
976 # environment method
977 raise env_err
def get_user_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the credentials taken from the unprefixed settings."""
    return self._get_krb5_creds(
        prefix=None,
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_service_creds(self,
                      allow_missing_password=False,
                      allow_missing_keys=True):
    """Return the credentials taken from the SERVICE_* settings."""
    return self._get_krb5_creds(
        prefix='SERVICE',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials taken from the CLIENT_* settings."""
    return self._get_krb5_creds(
        prefix='CLIENT',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_server_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials taken from the SERVER_* settings."""
    return self._get_krb5_creds(
        prefix='SERVER',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_admin_creds(self,
                    allow_missing_password=False,
                    allow_missing_keys=True):
    """Return the credentials taken from the ADMIN_* settings.

    On every call the returned object additionally gets FEATURE_SEAL
    added to its gensec features and its workstation set to ''.
    """
    creds = self._get_krb5_creds(
        prefix='ADMIN',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
    features = creds.get_gensec_features() | FEATURE_SEAL
    creds.set_gensec_features(features)
    creds.set_workstation('')
    return creds
def get_rodc_krbtgt_creds(self,
                          require_keys=True,
                          require_strongest_key=False):
    """Return the credentials taken from the RODC_KRBTGT_* settings.

    The password is optional.  *require_strongest_key* may only be used
    together with *require_keys*.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)
    return self._get_krb5_creds(
        prefix='RODC_KRBTGT',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key)
def get_krbtgt_creds(self,
                     require_keys=True,
                     require_strongest_key=False):
    """Return the credentials taken from the KRBTGT_* settings.

    The username defaults to 'krbtgt' and the password is optional.
    *require_strongest_key* may only be used together with
    *require_keys*.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)
    return self._get_krb5_creds(
        prefix='KRBTGT',
        default_username='krbtgt',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key)
def get_anon_creds(self):
    """Return a new plain Credentials object set to anonymous."""
    anon = Credentials()
    anon.set_anonymous()
    return anon
# Overridden by KDCBaseTest. At this level we don't know what actual
# enctypes are supported, so the best we can do is go by whether NT hashes
# are expected and whether the account is a workstation or not. This
# matches the behaviour that tests expect by default.
def get_default_enctypes(self, creds):
    """Return the list of enctypes assumed to be available for *creds*.

    AES256 and AES128 are always included.  RC4 is added when NT hashes
    are expected or when *creds* has a workstation set.
    """
    self.assertIsNotNone(creds)

    include_rc4 = self.expect_nt_hash or creds.get_workstation()

    enctypes = [kcrypto.Enctype.AES256, kcrypto.Enctype.AES128]
    if include_rc4:
        enctypes.append(kcrypto.Enctype.RC4)
    return enctypes
def asn1_dump(self, name, obj, asn1_print=None):
    """Write *obj* to stderr when ASN.1 printing is enabled.

    :param name: optional label, written on a line of its own first.
    :param asn1_print: overrides ``self.do_asn1_print`` unless None.
    """
    enabled = self.do_asn1_print if asn1_print is None else asn1_print
    if not enabled:
        return
    if name is None:
        sys.stderr.write("%s" % (obj))
    else:
        sys.stderr.write("%s:\n%s" % (name, obj))
def hex_dump(self, name, blob, hexdump=None):
    """Write a labelled hexdump of *blob* to stderr when enabled.

    The output is ``<name>: <length>`` followed by ``self.hexdump(blob)``.
    *hexdump* overrides ``self.do_hexdump`` unless it is None.
    """
    enabled = self.do_hexdump if hexdump is None else hexdump
    if not enabled:
        return
    sys.stderr.write(
        "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
def der_decode(
        self,
        blob,
        asn1Spec=None,
        native_encode=True,
        asn1_print=None,
        hexdump=None):
    """Decode the DER *blob*, optionally according to *asn1Spec*.

    :param native_encode: when true, the decoded pyasn1 object is passed
        through the pyasn1 native encoder before it is returned;
        otherwise the pyasn1 object itself is returned.
    :param asn1_print: passed on to asn1_dump().
    :param hexdump: passed on to hex_dump().
    """
    if asn1Spec is None:
        label = "<None-asn1Spec>"
    else:
        label = type(asn1Spec).__name__.split(':')[0]
    self.hex_dump(label, blob, hexdump=hexdump)

    # The second element of the decoder's result is not used here.
    decoded, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
    self.asn1_dump(None, decoded, asn1_print=asn1_print)
    if not native_encode:
        return decoded
    return pyasn1_native_encode(decoded)
def der_encode(
        self,
        obj,
        asn1Spec=None,
        native_decode=True,
        asn1_print=None,
        hexdump=None):
    """Encode *obj* as DER and return the resulting bytes.

    :param native_decode: when true, *obj* is first converted with the
        pyasn1 native decoder according to *asn1Spec*; otherwise *obj*
        must already be a pyasn1 object.
    :param asn1_print: passed on to asn1_dump().
    :param hexdump: passed on to hex_dump().
    """
    if native_decode:
        obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
    # The label is always a string, so both dumps are always attempted.
    label = type(obj).__name__.split(':')[0]
    self.asn1_dump(None, obj, asn1_print=asn1_print)
    encoded = pyasn1_der_encode(obj)
    self.hex_dump(label, encoded, hexdump=hexdump)
    return encoded
def send_pdu(self, req, asn1_print=None, hexdump=None):
    """DER-encode *req* (taken as is, without native decoding) and send it.

    The hex dump is suppressed for the encoding step; *hexdump* is passed
    to ``send_msg`` instead.
    """
    encoded_req = self.der_encode(req,
                                  native_decode=False,
                                  asn1_print=asn1_print,
                                  hexdump=False)
    self.send_msg(encoded_req, hexdump=hexdump)
def send_msg(self, msg, hexdump=None):
    """Send *msg* on ``self.s`` prefixed with its 4-byte big-endian length.

    ``send()`` is repeated until the whole buffer has been written.  On a
    socket or I/O error the connection is dropped via ``_disconnect`` and
    the exception is re-raised.
    """
    header = struct.pack('>I', len(msg))
    pending = header + msg
    self.hex_dump("send_msg", header, hexdump=hexdump)
    self.hex_dump("send_msg", msg, hexdump=hexdump)

    try:
        sent = self.s.send(pending, 0)
        while sent != len(pending):
            # Partial write: drop what went out and retry with the rest
            pending = pending[sent:]
            sent = self.s.send(pending, 0)
    except (socket.error, IOError) as e:
        self._disconnect("send_msg: %s" % e)
        raise
def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
    """Receive up to *num_recv* bytes from ``self.s``.

    Returns the received bytes, or None on a timeout or when the peer
    closed the connection (in which case ``_disconnect`` is called).  When
    *timeout* is given it is applied for this call; the socket timeout is
    set back to 10 afterwards.  Other socket or I/O errors disconnect and
    are re-raised.
    """
    data = None
    try:
        if timeout is not None:
            self.s.settimeout(timeout)
        data = self.s.recv(num_recv, 0)
        self.s.settimeout(10)
        if len(data) == 0:
            self._disconnect("recv_raw: EOF")
            return None
        self.hex_dump("recv_raw", data, hexdump=hexdump)
    except socket.timeout:
        # Not fatal: restore the timeout and fall through returning None
        self.s.settimeout(10)
        sys.stderr.write("recv_raw: TIMEOUT\n")
    except (socket.error, IOError) as e:
        self._disconnect("recv_raw: %s" % e)
        raise
    return data
def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
    """Receive one length-prefixed message and return its payload.

    The message is a 4-byte big-endian length followed by that many bytes.

    Returns None when nothing could be read at all (``recv_raw`` returned
    None on the first read), ``""`` for a zero-length message (kept as is
    for existing callers, which only test truthiness), otherwise the
    payload bytes.

    Fails the test if the stream ends in the middle of the header or the
    payload.

    asn1_print is accepted for signature symmetry and is not used here.
    """
    header_len = 4

    # A stream socket may deliver fewer bytes than requested, so keep
    # reading until the full length header has arrived.
    header = b''
    while len(header) < header_len:
        chunk = self.recv_raw(num_recv=header_len - len(header),
                              hexdump=hexdump,
                              timeout=timeout)
        if chunk is None:
            if not header:
                return None
            self.fail("recv_pdu_raw: truncated length header")
        header += chunk

    (k5_len,) = struct.unpack(">I", header)
    if k5_len == 0:
        return ""

    rep_pdu = b''
    while len(rep_pdu) < k5_len:
        chunk = self.recv_raw(num_recv=k5_len - len(rep_pdu),
                              hexdump=hexdump,
                              timeout=timeout)
        # recv_raw() returns None on timeout/EOF; report that as a test
        # failure rather than a TypeError from len(None).
        self.assertIsNotNone(chunk, msg="recv_pdu_raw: truncated message")
        self.assertGreaterEqual(len(chunk), 1)
        rep_pdu += chunk
    return rep_pdu
def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
    """Receive one reply and decode it according to its message type.

    Returns a ``(decoded_reply, raw_bytes)`` pair.  When nothing (or an
    empty message) was received the pair is ``(None, <falsy raw value>)``.
    Only AS-REP, TGS-REP and KRB-ERROR messages with pvno 5 are accepted.
    """
    raw_reply = self.recv_pdu_raw(asn1_print=asn1_print,
                                  hexdump=hexdump,
                                  timeout=timeout)
    if not raw_reply:
        return None, raw_reply

    # First pass without a specification, just to read pvno and msg-type
    generic = self.der_decode(raw_reply,
                              asn1Spec=None,
                              native_encode=False,
                              asn1_print=False,
                              hexdump=False)
    pvno = generic['field-0']
    self.assertEqual(pvno, 5)
    msg_type = generic['field-1']
    self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])

    if msg_type == KRB_AS_REP:
        reply_spec = krb5_asn1.AS_REP()
    elif msg_type == KRB_TGS_REP:
        reply_spec = krb5_asn1.TGS_REP()
    elif msg_type == KRB_ERROR:
        reply_spec = krb5_asn1.KRB_ERROR()

    # Second pass with the matching specification
    reply = self.der_decode(raw_reply,
                            asn1Spec=reply_spec,
                            asn1_print=asn1_print,
                            hexdump=False)
    return (reply, raw_reply)
def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
    """Like ``recv_reply`` but return only the decoded reply."""
    reply_and_raw = self.recv_reply(asn1_print=asn1_print,
                                    hexdump=hexdump,
                                    timeout=timeout)
    decoded, _raw = reply_and_raw
    return decoded
def assertIsConnected(self):
    """Fail with "Not connected" if ``self.s`` is None."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" if ``self.s`` is not None."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
def send_recv_transaction(
        self,
        req,
        asn1_print=None,
        hexdump=None,
        timeout=None,
        to_rodc=False):
    """Connect, send *req*, receive one reply, disconnect, return the reply.

    The target is ``self.host`` when *to_rodc* is true and
    ``self.dc_host`` otherwise.  If sending or receiving raises, the
    connection is dropped and the exception propagates.
    """
    if to_rodc:
        target = self.host
    else:
        target = self.dc_host
    self.connect(target)

    try:
        self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
        reply = self.recv_pdu(asn1_print=asn1_print,
                              hexdump=hexdump,
                              timeout=timeout)
    except Exception:
        self._disconnect("transaction failed")
        raise
    else:
        self._disconnect("transaction done")
    return reply
def getElementValue(self, obj, elem):
    """Return ``obj.get(elem)`` (None when a dict lacks the key)."""
    value = obj.get(elem)
    return value
def assertElementMissing(self, obj, elem):
    """Assert that element *elem* of *obj* is absent (its value is None)."""
    self.assertIsNone(self.getElementValue(obj, elem))
def assertElementPresent(self, obj, elem, expect_empty=False):
    """Assert that element *elem* of *obj* is present (not None).

    Under ``self.strict_checking`` a container value is additionally
    required to be empty when *expect_empty* is true and non-empty
    otherwise.
    """
    value = self.getElementValue(obj, elem)
    self.assertIsNotNone(value)

    check_size = (self.strict_checking
                  and isinstance(value, collections.abc.Container))
    if not check_size:
        return

    if expect_empty:
        self.assertEqual(0, len(value))
    else:
        self.assertNotEqual(0, len(value))
def assertElementEqual(self, obj, elem, value):
    """Assert that element *elem* of *obj* is present and equals *value*."""
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    self.assertEqual(actual, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that element *elem* equals the UTF-8 encoding of *value*."""
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    expected_bytes = bytes(value, 'utf8')
    self.assertEqual(actual, expected_bytes)
def assertPrincipalEqual(self, princ1, princ2):
    """Assert that two principal names are equal.

    Compares the 'name-type' values, the number of 'name-string'
    components and then each component pairwise.
    """
    self.assertEqual(princ1['name-type'], princ2['name-type'])

    names1 = princ1['name-string']
    names2 = princ2['name-string']
    self.assertEqual(
        len(names1),
        len(names2),
        msg="princ1=%s != princ2=%s" % (princ1, princ2))

    # The lengths are known to match here, so zip() drops nothing
    for component1, component2 in zip(names1, names2):
        self.assertEqual(
            component1,
            component2,
            msg="princ1=%s != princ2=%s" % (princ1, princ2))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that element *elem* of *obj* is a principal equal to *value*.

    The element is converted with ``pyasn1_native_decode`` against the
    PrincipalName specification before the comparison.
    """
    raw_principal = self.getElementValue(obj, elem)
    self.assertIsNotNone(raw_principal)
    principal = pyasn1_native_decode(raw_principal,
                                     asn1Spec=krb5_asn1.PrincipalName())
    self.assertPrincipalEqual(principal, value)
def assertElementKVNO(self, obj, elem, value):
    """Check the kvno stored in element *elem* of *obj*.

    value: the expected kvno.  None means the element must be absent;
        "autodetect" accepts whatever is present (including absence);
        ``self.unspecified_kvno`` only requires a non-zero kvno to be
        present; anything else is converted with ``int()`` and compared.
    """
    actual = self.getElementValue(obj, elem)
    if value == "autodetect":
        value = actual

    if value is None:
        self.assertIsNone(actual)
        return

    self.assertIsNotNone(actual)
    # The value on the wire should never be 0
    self.assertNotEqual(actual, 0)
    # unspecified_kvno means we don't know the kvno,
    # but want to enforce its presence
    if value is self.unspecified_kvno:
        return
    value = int(value)
    self.assertNotEqual(value, 0)
    self.assertEqual(actual, value)
def assertElementFlags(self, obj, elem, expected, unexpected):
    """Check individual flag bits of element *elem* of *obj*.

    expected: None, or a ``krb5_asn1.TicketFlags`` value; every bit set in
        it must be '1' at the same index of the element.
    unexpected: None, or a ``krb5_asn1.TicketFlags`` value; every bit set
        in it must be '0' at the same index of the element.
    """
    bits = self.getElementValue(obj, elem)
    self.assertIsNotNone(bits)

    if expected is not None:
        self.assertIsInstance(expected, krb5_asn1.TicketFlags)
        for i, flag in enumerate(expected):
            if flag != 1:
                continue
            self.assertEqual('1', bits[i],
                             f"'{expected.namedValues[i]}' "
                             f"expected in {bits}")

    if unexpected is not None:
        self.assertIsInstance(unexpected, krb5_asn1.TicketFlags)
        for i, flag in enumerate(unexpected):
            if flag != 1:
                continue
            self.assertEqual('0', bits[i],
                             f"'{unexpected.namedValues[i]}' "
                             f"unexpected in {bits}")
def assertSequenceElementsEqual(self, expected, got, *,
                                require_strict=None,
                                unchecked=None,
                                require_ordered=True):
    """Compare two sequences, optionally ignoring order and some elements.

    With strict checking on, ordering required and nothing unchecked this
    is a plain equality assertion.  Otherwise the elements are compared as
    multisets after dropping the ignored ones.

    require_strict: elements that are ignored unless strict checking is on.
    unchecked: elements that are always ignored.
    require_ordered: whether order matters (only honoured in the plain
        equality case described above).
    """
    if self.strict_checking and require_ordered and not unchecked:
        self.assertEqual(expected, got)
        return

    fail_msg = f'expected: {expected} got: {got}'

    ignored = set()
    if unchecked:
        ignored.update(unchecked)
    if require_strict and not self.strict_checking:
        ignored.update(require_strict)

    if ignored:
        fail_msg += f' (ignoring: {ignored})'
        expected = (item for item in expected if item not in ignored)
        got = (item for item in got if item not in ignored)

    self.assertCountEqual(expected, got, fail_msg)
def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
    """Return ``(timestamp, microseconds)`` for a point in time, in UTC.

    epoch: seconds since the Unix epoch; None means the current time.
    offset: optional number of seconds to add (truncated with ``int()``).

    The timestamp is a string formatted as YYYYMMDDHHMMSSZ.
    """
    when = time.time() if epoch is None else epoch
    if offset is not None:
        when = when + int(offset)
    utc_time = datetime.datetime.fromtimestamp(when,
                                               tz=datetime.timezone.utc)
    stamp = utc_time.strftime("%Y%m%d%H%M%SZ")
    return (stamp, utc_time.microsecond)
def get_KerberosTime(self, epoch=None, offset=None):
    """Return only the timestamp part of ``get_KerberosTimeWithUsec``."""
    stamp_and_usec = self.get_KerberosTimeWithUsec(epoch=epoch,
                                                   offset=offset)
    stamp, _usec = stamp_and_usec
    return stamp
def get_EpochFromKerberosTime(self, kerberos_time):
    """Convert a YYYYMMDDHHMMSSZ timestamp into integer epoch seconds.

    kerberos_time: the timestamp as str or bytes; it is interpreted as
        UTC.
    """
    text = kerberos_time
    if isinstance(text, bytes):
        text = text.decode()

    parsed = datetime.datetime.strptime(text, '%Y%m%d%H%M%SZ')
    # strptime() yields a naive datetime; mark it as UTC before converting
    aware = parsed.replace(tzinfo=datetime.timezone.utc)
    return int(aware.timestamp())
def get_Nonce(self):
    """Return a random integer in the range 0x7f000000..0x7fffffff."""
    lowest, highest = 0x7f000000, 0x7fffffff
    return random.randint(lowest, highest)
def get_pa_dict(self, pa_data):
    """Index a sequence of PA-DATA entries by their 'padata-type'.

    Returns a dict mapping each 'padata-type' to its 'padata-value'; an
    empty dict when *pa_data* is None.

    Raises RuntimeError if the same type occurs more than once.
    """
    by_type = {}
    if pa_data is None:
        return by_type

    for entry in pa_data:
        pa_type = entry['padata-type']
        if pa_type in by_type:
            raise RuntimeError(f'Duplicate type {pa_type}')
        by_type[pa_type] = entry['padata-value']

    return by_type
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap raw key material in a ``RodcPacEncryptionKey``.

    etype: encryption type of the key.
    contents: the raw key bytes handed to ``kcrypto.Key``.
    kvno: optional key version number stored alongside the key.
    """
    return RodcPacEncryptionKey(kcrypto.Key(etype, contents), kvno)
def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None,
                       params=None):
    """Derive a key from a password via ``kcrypto.string_to_key``.

    pwd and salt are required (asserted not None); params is passed
    through to ``string_to_key``.  The result is wrapped in a
    ``RodcPacEncryptionKey`` together with *kvno*.
    """
    for required in (pwd, salt):
        self.assertIsNotNone(required)
    derived = kcrypto.string_to_key(etype, pwd, salt, params=params)
    return RodcPacEncryptionKey(derived, kvno)
def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
    """Build a password key from one ETYPE-INFO2 entry.

    Reads 'etype' (required) plus the optional 'salt' and 's2kparams'
    members of *etype_info2* and forwards them to
    ``PasswordKey_from_etype``.
    """
    return self.PasswordKey_from_etype(
        creds,
        etype_info2['etype'],
        kvno=kvno,
        salt=etype_info2.get('salt'),
        params=etype_info2.get('s2kparams'))
def PasswordKey_from_creds(self, creds, etype):
    """Build a password key for *etype* using the kvno and salt of *creds*."""
    key_version = creds.get_kvno()
    key_salt = creds.get_salt()
    return self.PasswordKey_from_etype(
        creds, etype, kvno=key_version, salt=key_salt)
def PasswordKey_from_etype(self, creds, etype, kvno=None, salt=None, params=None):
    """Build the long-term key of *creds* for encryption type *etype*.

    creds: credentials providing ``get_nt_hash()`` / ``get_password()``.
    etype: the encryption type.  For RC4 the NT hash is used directly as
        key material; for every other type the key is derived from the
        UTF-8 encoded password.
    kvno: key version number stored with the key.
    salt: salt for the password-based derivation (unused for RC4).
    params: string-to-key parameters for the password-based derivation
        (unused for RC4).
    """
    if etype == kcrypto.Enctype.RC4:
        nthash = creds.get_nt_hash()
        return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno)

    password = creds.get_password().encode('utf-8')
    # Forward params: PasswordKey_from_etype_info2() passes the KDC-supplied
    # s2kparams here, and silently dropping them would derive the key with
    # default parameters instead.
    return self.PasswordKey_create(
        etype=etype, pwd=password, salt=salt, kvno=kvno, params=params)
def TicketDecryptionKey_from_creds(self, creds, etype=None):
    """Return the key for decrypting tickets issued to *creds*.

    etype: encryption type to use.  When None, the first entry of
        ``creds.get_tgs_krb5_etypes()`` is used unless it is missing or a
        DES type, in which case RC4 is used.

    A key forced on the credentials for that etype takes precedence.
    Otherwise the key comes from the NT hash (RC4) or is derived from the
    password and salt; the respective secret is asserted to be present.
    """
    if etype is None:
        tgs_etypes = creds.get_tgs_krb5_etypes()
        des_etypes = (kcrypto.Enctype.DES_CRC, kcrypto.Enctype.DES_MD5)
        if tgs_etypes and tgs_etypes[0] not in des_etypes:
            etype = tgs_etypes[0]
        else:
            etype = kcrypto.Enctype.RC4

    forced_key = creds.get_forced_key(etype)
    if forced_key is not None:
        return forced_key

    kvno = creds.get_kvno()

    fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
                "nor a password specified, " % (
                    creds.get_username(), etype, kvno))

    if etype != kcrypto.Enctype.RC4:
        password = creds.get_password()
        self.assertIsNotNone(password, msg=fail_msg)
        salt = creds.get_salt()
        return self.PasswordKey_create(etype=etype,
                                       pwd=password,
                                       salt=salt,
                                       kvno=kvno)

    nthash = creds.get_nt_hash()
    self.assertIsNotNone(nthash, msg=fail_msg)
    return self.SessionKey_create(etype=etype,
                                  contents=nthash,
                                  kvno=kvno)
def RandomKey(self, etype):
    """Create a session key for *etype* from freshly generated random bytes.

    The number of bytes is the ``keysize`` of the enctype profile.
    """
    profile = kcrypto._get_enctype_profile(etype)
    key_bytes = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=key_bytes)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Turn a decoded EncryptionKey structure into a session key object.

    Uses the 'keytype' and 'keyvalue' members of *EncryptionKey_obj*.
    """
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(keytype, keyvalue)
def EncryptedData_create(self, key, usage, plaintext):
    """Encrypt *plaintext* with *key* and return an EncryptedData dict.

    The 'kvno' member is only included when the key carries one.
    """
    # EncryptedData   ::= SEQUENCE {
    #         etype   [0] Int32 -- EncryptionType --,
    #         kvno    [1] Int32 OPTIONAL,
    #         cipher  [2] OCTET STRING -- ciphertext
    # }
    encrypted = {
        'etype': key.etype,
        'cipher': key.encrypt(usage, plaintext),
    }
    if key.kvno is not None:
        encrypted['kvno'] = key.kvno
    return encrypted
def Checksum_create(self, key, usage, plaintext, ctype=None):
    """Compute a checksum over *plaintext* and return a Checksum dict.

    ctype: checksum type; None selects the key's own ``ctype``.
    """
    # Checksum        ::= SEQUENCE {
    #        cksumtype       [0] Int32,
    #        checksum        [1] OCTET STRING
    # }
    cksumtype = key.ctype if ctype is None else ctype
    return {
        'cksumtype': cksumtype,
        'checksum': key.make_checksum(usage, plaintext, ctype=cksumtype),
    }
@classmethod
def PrincipalName_create(cls, name_type, names):
    """Return a PrincipalName dict from a name type and its components."""
    # PrincipalName   ::= SEQUENCE {
    #         name-type       [0] Int32,
    #         name-string     [1] SEQUENCE OF KerberosString
    # }
    return {
        'name-type': name_type,
        'name-string': names,
    }
def AuthorizationData_create(self, ad_type, ad_data):
    """Return one authorization-data element as a dict."""
    # AuthorizationData ::= SEQUENCE {
    #         ad-type         [0] Int32,
    #         ad-data         [1] OCTET STRING
    # }
    return {
        'ad-type': ad_type,
        'ad-data': ad_data,
    }
def PA_DATA_create(self, padata_type, padata_value):
    """Return a PA-DATA dict with the given type and value."""
    # PA-DATA         ::= SEQUENCE {
    #         -- NOTE: first tag is [1], not [0]
    #         padata-type     [1] Int32,
    #         padata-value    [2] OCTET STRING -- might be encoded AP-REQ
    # }
    return {
        'padata-type': padata_type,
        'padata-value': padata_value,
    }
def PA_ENC_TS_ENC_create(self, ts, usec):
    """Return a PA-ENC-TS-ENC dict; both members are always included."""
    # PA-ENC-TS-ENC ::= SEQUENCE {
    #        patimestamp[0]          KerberosTime, -- client's time
    #        pausec[1]               krb5int32 OPTIONAL
    # }
    return {
        'patimestamp': ts,
        'pausec': usec,
    }
def PA_PAC_OPTIONS_create(self, options):
    """Return a PA-PAC-OPTIONS dict wrapping *options*."""
    # PA-PAC-OPTIONS  ::= SEQUENCE {
    #         options         [0] PACOptionFlags
    # }
    return {'options': options}
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor dict."""
    # KrbFastArmor ::= SEQUENCE {
    #     armor-type   [0] Int32,
    #     armor-value  [1] OCTET STRING,
    #     ...
    # }
    return {
        'armor-type': armor_type,
        'armor-value': armor_value,
    }
def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
    """Return a KrbFastReq dict."""
    # KrbFastReq ::= SEQUENCE {
    #     fast-options [0] FastOptions,
    #     padata       [1] SEQUENCE OF PA-DATA,
    #     req-body     [2] KDC-REQ-BODY,
    #     ...
    # }
    return {
        'fast-options': fast_options,
        'padata': padata,
        'req-body': req_body,
    }
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return a KrbFastArmoredReq dict; 'armor' is omitted when None."""
    # KrbFastArmoredReq ::= SEQUENCE {
    #     armor        [0] KrbFastArmor OPTIONAL,
    #     req-checksum [1] Checksum,
    #     enc-fast-req [2] EncryptedData -- KrbFastReq --
    # }
    armored_req = {
        'req-checksum': req_checksum,
        'enc-fast-req': enc_fast_req,
    }
    if armor is None:
        return armored_req
    armored_req['armor'] = armor
    return armored_req
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Return a PA-FX-FAST-REQUEST dict selecting the armored-data choice."""
    # PA-FX-FAST-REQUEST ::= CHOICE {
    #     armored-data [0] KrbFastArmoredReq,
    #     ...
    # }
    return {'armored-data': armored_data}
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
    """Build a KERB-PA-PAC-REQUEST.

    include_pac: value of the 'include-pac' member.
    pa_data_create: when true (default) the request is DER-encoded and
        returned wrapped in a PA-DATA of type PADATA_PAC_REQUEST; when
        false the plain dict is returned.
    """
    # KERB-PA-PAC-REQUEST ::= SEQUENCE {
    #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
    #                                --    include PAC.
    #                                --If FALSE, and PAC present,
    #                                --    remove PAC.
    # }
    pac_request = {'include-pac': include_pac}
    if pa_data_create:
        encoded = self.der_encode(pac_request,
                                  asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
        return self.PA_DATA_create(PADATA_PAC_REQUEST, encoded)
    return pac_request
def get_pa_pac_options(self, options):
    """Return a PA-DATA of type PADATA_PAC_OPTIONS carrying *options*.

    The PA-PAC-OPTIONS structure is DER-encoded before being wrapped.
    """
    options_obj = self.PA_PAC_OPTIONS_create(options)
    options_blob = self.der_encode(options_obj,
                                   asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    return self.PA_DATA_create(PADATA_PAC_OPTIONS, options_blob)
def KDC_REQ_BODY_create(self,
                        kdc_options,
                        cname,
                        realm,
                        sname,
                        from_time,
                        till_time,
                        renew_time,
                        nonce,
                        etypes,
                        addresses,
                        additional_tickets,
                        EncAuthorizationData,
                        EncAuthorizationData_key,
                        EncAuthorizationData_usage,
                        asn1_print=None,
                        hexdump=None):
    """Return a KDC-REQ-BODY dict.

    kdc_options, realm, till_time, nonce and etypes are always included;
    every other member is only added when its argument is not None.

    When EncAuthorizationData is given it is DER-encoded as
    AuthorizationData and encrypted with EncAuthorizationData_key under
    EncAuthorizationData_usage to form 'enc-authorization-data'.
    asn1_print and hexdump only affect that encoding step.
    """
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                        -- NOTE: not empty
    # }
    enc_ad = None
    if EncAuthorizationData is not None:
        enc_ad_plain = self.der_encode(
            EncAuthorizationData,
            asn1Spec=krb5_asn1.AuthorizationData(),
            asn1_print=asn1_print,
            hexdump=hexdump)
        enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
                                           EncAuthorizationData_usage,
                                           enc_ad_plain)

    body = {
        'kdc-options': kdc_options,
        'realm': realm,
        'till': till_time,
        'nonce': nonce,
        'etype': etypes,
    }

    optional_members = (
        ('cname', cname),
        ('sname', sname),
        ('from', from_time),
        ('rtime', renew_time),
        ('addresses', addresses),
        ('enc-authorization-data', enc_ad),
        ('additional-tickets', additional_tickets),
    )
    for member, member_value in optional_members:
        if member_value is not None:
            body[member] = member_value
    return body
def KDC_REQ_create(self,
                   msg_type,
                   padata,
                   req_body,
                   asn1Spec=None,
                   asn1_print=None,
                   hexdump=None):
    """Return a KDC-REQ as a ``(dict, pyasn1_object)`` pair.

    'padata' is only included when not None.  The second element is the
    dict converted with ``pyasn1_native_decode`` against *asn1Spec*, or
    None when no specification was given.  asn1_print and hexdump are
    accepted but not used here.
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    request = {
        'pvno': 5,
        'msg-type': msg_type,
        'req-body': req_body,
    }
    if padata is not None:
        request['padata'] = padata

    decoded = None
    if asn1Spec is not None:
        decoded = pyasn1_native_decode(request, asn1Spec=asn1Spec)
    return request, decoded
def AS_REQ_create(self,
                  padata,        # optional
                  kdc_options,   # required
                  cname,         # optional
                  realm,         # required
                  sname,         # optional
                  from_time,     # optional
                  till_time,     # required
                  renew_time,    # optional
                  nonce,         # required
                  etypes,        # required
                  addresses,     # optional
                  additional_tickets,
                  native_decoded_only=True,
                  asn1_print=None,
                  hexdump=None):
    """Build an AS-REQ.

    Returns the pyasn1 object when *native_decoded_only* is true
    (default), otherwise a ``(pyasn1_object, dict)`` pair.  No
    enc-authorization-data is included in the request body.
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                        -- NOTE: not empty
    # }
    req_body = self.KDC_REQ_BODY_create(
        kdc_options,
        cname,
        realm,
        sname,
        from_time,
        till_time,
        renew_time,
        nonce,
        etypes,
        addresses,
        additional_tickets,
        EncAuthorizationData=None,
        EncAuthorizationData_key=None,
        EncAuthorizationData_usage=None,
        asn1_print=asn1_print,
        hexdump=hexdump)

    request_pair = self.KDC_REQ_create(
        msg_type=KRB_AS_REQ,
        padata=padata,
        req_body=req_body,
        asn1Spec=krb5_asn1.AS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)
    obj, decoded = request_pair

    if not native_decoded_only:
        return decoded, obj
    return decoded
def AP_REQ_create(self, ap_options, ticket, authenticator):
    """Return an AP-REQ dict with pvno 5 and message type KRB_AP_REQ."""
    # AP-REQ          ::= [APPLICATION 14] SEQUENCE {
    #         pvno            [0] INTEGER (5),
    #         msg-type        [1] INTEGER (14),
    #         ap-options      [2] APOptions,
    #         ticket          [3] Ticket,
    #         authenticator   [4] EncryptedData -- Authenticator
    # }
    return {
        'pvno': 5,
        'msg-type': KRB_AP_REQ,
        'ap-options': ap_options,
        'ticket': ticket,
        'authenticator': authenticator,
    }
def Authenticator_create(
        self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
        authorization_data):
    """Return an (unencrypted) Authenticator dict.

    crealm, cname, cusec and ctime are always included; cksum, subkey,
    seq_number and authorization_data only when not None.
    """
    # -- Unencrypted authenticator
    # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
    #        authenticator-vno       [0] INTEGER (5),
    #        crealm                  [1] Realm,
    #        cname                   [2] PrincipalName,
    #        cksum                   [3] Checksum OPTIONAL,
    #        cusec                   [4] Microseconds,
    #        ctime                   [5] KerberosTime,
    #        subkey                  [6] EncryptionKey OPTIONAL,
    #        seq-number              [7] UInt32 OPTIONAL,
    #        authorization-data      [8] AuthorizationData OPTIONAL
    # }
    authenticator = {
        'authenticator-vno': 5,
        'crealm': crealm,
        'cname': cname,
        'cusec': cusec,
        'ctime': ctime,
    }

    optional_members = (
        ('cksum', cksum),
        ('subkey', subkey),
        ('seq-number', seq_number),
        ('authorization-data', authorization_data),
    )
    for member, member_value in optional_members:
        if member_value is not None:
            authenticator[member] = member_value
    return authenticator
def PKAuthenticator_create(self,
                           cusec,
                           ctime,
                           nonce,
                           *,
                           pa_checksum=None,
                           freshness_token=None,
                           kdc_name=None,
                           kdc_realm=None,
                           win2k_variant=False):
    """Return a PKAuthenticator dict.

    With *win2k_variant* set, kdc_name and kdc_realm are required and
    pa_checksum and freshness_token must be None; otherwise kdc_name and
    kdc_realm must be None.  Optional members are only included when not
    None.
    """
    if not win2k_variant:
        self.assertIsNone(kdc_name)
        self.assertIsNone(kdc_realm)
    else:
        self.assertIsNone(pa_checksum)
        self.assertIsNone(freshness_token)
        self.assertIsNotNone(kdc_name)
        self.assertIsNotNone(kdc_realm)

    pk_authenticator = {
        'cusec': cusec,
        'ctime': ctime,
        'nonce': nonce,
    }

    optional_members = (
        ('paChecksum', pa_checksum),
        ('freshnessToken', freshness_token),
        ('kdcName', kdc_name),
        ('kdcRealm', kdc_realm),
    )
    for member, member_value in optional_members:
        if member_value is not None:
            pk_authenticator[member] = member_value

    return pk_authenticator
def TGS_REQ_create(self,
                   padata,       # optional
                   cusec,
                   ctime,
                   ticket,
                   kdc_options,  # required
                   cname,        # optional
                   realm,        # required
                   sname,        # optional
                   from_time,    # optional
                   till_time,    # required
                   renew_time,   # optional
                   nonce,        # required
                   etypes,       # required
                   addresses,    # optional
                   EncAuthorizationData,
                   EncAuthorizationData_key,
                   additional_tickets,
                   ticket_session_key,
                   authenticator_subkey=None,
                   body_checksum_type=None,
                   native_decoded_only=True,
                   asn1_print=None,
                   hexdump=None):
    """Build a TGS-REQ including its PA-TGS-REQ (AP-REQ) padata.

    padata: optional sequence of PA-DATA to send; the generated
        PA-TGS-REQ is placed after these entries.  The caller's sequence
        is not modified.
    cusec, ctime, cname: values for the authenticator (cname is not put
        into the request body).
    ticket: the ticket placed in the AP-REQ.
    ticket_session_key: key used for the request body checksum and for
        encrypting the authenticator.
    authenticator_subkey: optional subkey exported into the
        authenticator; its presence also selects the key usage for the
        encrypted authorization data.
    body_checksum_type: checksum type for the request body checksum
        (None selects the session key's default).
    The remaining arguments are passed to ``KDC_REQ_BODY_create``.

    Returns the pyasn1 object when *native_decoded_only* is true
    (default), otherwise a ``(pyasn1_object, dict)`` pair.
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                        -- NOTE: not empty
    # }

    if authenticator_subkey is not None:
        EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
    else:
        EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=None,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)
    req_body_blob = self.der_encode(req_body,
                                    asn1Spec=krb5_asn1.KDC_REQ_BODY(),
                                    asn1_print=asn1_print, hexdump=hexdump)

    # The authenticator carries a checksum over the encoded request body
    req_body_checksum = self.Checksum_create(ticket_session_key,
                                             KU_TGS_REQ_AUTH_CKSUM,
                                             req_body_blob,
                                             ctype=body_checksum_type)

    subkey_obj = None
    if authenticator_subkey is not None:
        subkey_obj = authenticator_subkey.export_obj()
    seq_number = random.randint(0, 0xfffffffe)
    authenticator = self.Authenticator_create(
        crealm=realm,
        cname=cname,
        cksum=req_body_checksum,
        cusec=cusec,
        ctime=ctime,
        subkey=subkey_obj,
        seq_number=seq_number,
        authorization_data=None)
    authenticator = self.der_encode(
        authenticator,
        asn1Spec=krb5_asn1.Authenticator(),
        asn1_print=asn1_print,
        hexdump=hexdump)

    authenticator = self.EncryptedData_create(
        ticket_session_key, KU_TGS_REQ_AUTH, authenticator)

    ap_options = krb5_asn1.APOptions('0')
    ap_req = self.AP_REQ_create(ap_options=str(ap_options),
                                ticket=ticket,
                                authenticator=authenticator)
    ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
                             asn1_print=asn1_print, hexdump=hexdump)
    pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)

    # Build a new list instead of appending to the caller's: reusing one
    # padata list for several requests must not accumulate PA-TGS-REQs.
    if padata is not None:
        padata = [*padata, pa_tgs_req]
    else:
        padata = [pa_tgs_req]

    obj, decoded = self.KDC_REQ_create(
        msg_type=KRB_TGS_REQ,
        padata=padata,
        req_body=req_body,
        asn1Spec=krb5_asn1.TGS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)
    if native_decoded_only:
        return decoded
    return decoded, obj
def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
    """Return a PA-DATA of type PADATA_FOR_USER carrying a PA-S4U2Self.

    The checksum covers the little-endian 4-byte name type, the encoded
    name components, the encoded realm and the string "Kerberos", and is
    made with *tgt_session_key* (checksum type *ctype*).
    """
    # PA-S4U2Self     ::= SEQUENCE {
    #        name            [0] PrincipalName,
    #        realm           [1] Realm,
    #        cksum           [2] Checksum,
    #        auth            [3] GeneralString
    # }
    pieces = [name['name-type'].to_bytes(4, byteorder='little')]
    pieces.extend(component.encode() for component in name['name-string'])
    pieces.append(realm.encode())
    pieces.append("Kerberos".encode())
    cksum_data = b''.join(pieces)

    cksum = self.Checksum_create(tgt_session_key,
                                 KU_NON_KERB_CKSUM_SALT,
                                 cksum_data,
                                 ctype)

    s4u2self = {
        'name': name,
        'realm': realm,
        'cksum': cksum,
        'auth': "Kerberos",
    }
    encoded = self.der_encode(s4u2self, asn1Spec=krb5_asn1.PA_S4U2Self())
    return self.PA_DATA_create(PADATA_FOR_USER, encoded)
def ChangePasswdDataMS_create(self,
                              new_password,
                              target_princ=None,
                              target_realm=None):
    """Return a DER-encoded ChangePasswdDataMS structure.

    'targname' and 'targrealm' are only included when the corresponding
    argument is not None.
    """
    change_data = {'newpasswd': new_password}

    optional_members = (
        ('targname', target_princ),
        ('targrealm', target_realm),
    )
    for member, member_value in optional_members:
        if member_value is not None:
            change_data[member] = member_value

    return self.der_encode(change_data,
                           asn1Spec=krb5_asn1.ChangePasswdDataMS())
def KRB_PRIV_create(self,
                    subkey,
                    user_data,
                    s_address,
                    timestamp=None,
                    usec=None,
                    seq_number=None,
                    r_address=None):
    """Return a DER-encoded KRB-PRIV message.

    The EncKrbPrivPart (user data, sender address and any optional
    members that are not None) is DER-encoded and encrypted with *subkey*
    under key usage KU_KRB_PRIV.
    """
    priv_part = {
        'user-data': user_data,
        's-address': s_address,
    }

    optional_members = (
        ('timestamp', timestamp),
        ('usec', usec),
        ('seq-number', seq_number),
        ('r-address', r_address),
    )
    for member, member_value in optional_members:
        if member_value is not None:
            priv_part[member] = member_value

    priv_part_blob = self.der_encode(
        priv_part, asn1Spec=krb5_asn1.EncKrbPrivPart())

    enc_part = self.EncryptedData_create(subkey,
                                         KU_KRB_PRIV,
                                         priv_part_blob)

    message = {
        'pvno': 5,
        'msg-type': KRB_PRIV,
        'enc-part': enc_part,
    }
    return self.der_encode(message, asn1Spec=krb5_asn1.KRB_PRIV())
def ContentInfo_create(self, content_type, content):
    """Return a ContentInfo dict ('contentType' and 'content')."""
    return {
        'contentType': content_type,
        'content': content,
    }
def EncapsulatedContentInfo_create(self, content_type, content):
    """Return an EncapsulatedContentInfo dict ('eContentType', 'eContent')."""
    return {
        'eContentType': content_type,
        'eContent': content,
    }
def SignedData_create(self,
                      digest_algorithms,
                      encap_content_info,
                      signer_infos,
                      *,
                      version=None,
                      certificates=None,
                      crls=None):
    """Return a CMS SignedData dict.

    version: explicit version number; when None it is chosen from the
        certificates, CRLs, signer infos and content type (see the rules
        in the code below).
    certificates / crls: only included when not None.
    """
    def any_cert_has(kind):
        return certificates is not None and any(
            kind in cert for cert in certificates)

    def any_crl_has(kind):
        return crls is not None and any(
            kind in crl for crl in crls)

    def any_signer_has_version(number):
        return signer_infos is not None and any(
            signer_info['version'] == number
            for signer_info in signer_infos)

    def default_version():
        # per RFC5652 5.1:
        if any_cert_has('other') or any_crl_has('other'):
            return 5

        if any_cert_has('v2AttrCert'):
            return 4

        if (any_cert_has('v1AttrCert')
                or any_signer_has_version(3)
                or encap_content_info['eContentType'] != krb5_asn1.id_data):
            return 3

        return 1

    if version is None:
        version = default_version()

    signed_data = {
        'version': version,
        'digestAlgorithms': digest_algorithms,
        'encapContentInfo': encap_content_info,
        'signerInfos': signer_infos,
    }

    for member, member_value in (('certificates', certificates),
                                 ('crls', crls)):
        if member_value is not None:
            signed_data[member] = member_value

    return signed_data
def AuthPack_create(self,
                    pk_authenticator,
                    *,
                    client_public_value=None,
                    supported_cms_types=None,
                    client_dh_nonce=None,
                    win2k_variant=False):
    """Return an AuthPack dict.

    With *win2k_variant* set, supported_cms_types and client_dh_nonce
    must be None.  Optional members are only included when not None.
    """
    if win2k_variant:
        self.assertIsNone(supported_cms_types)
        self.assertIsNone(client_dh_nonce)

    auth_pack = {'pkAuthenticator': pk_authenticator}

    optional_members = (
        ('clientPublicValue', client_public_value),
        ('supportedCMSTypes', supported_cms_types),
        ('clientDHNonce', client_dh_nonce),
    )
    for member, member_value in optional_members:
        if member_value is not None:
            auth_pack[member] = member_value

    return auth_pack
def PK_AS_REQ_create(self,
                     signed_auth_pack,
                     *,
                     trusted_certifiers=None,
                     kdc_pk_id=None,
                     kdc_cert=None,
                     encryption_cert=None,
                     win2k_variant=False):
    """Return a DER-encoded PA-PK-AS-REQ (or its Win2k variant).

    *signed_auth_pack* is wrapped in a ContentInfo of type id_signedData
    and DER-encoded to form 'signedAuthPack'.  With *win2k_variant* set,
    kdc_pk_id must be None; otherwise kdc_cert and encryption_cert must be
    None.  Optional members are only included when not None.
    """
    if not win2k_variant:
        self.assertIsNone(kdc_cert)
        self.assertIsNone(encryption_cert)
        asn1_spec = krb5_asn1.PA_PK_AS_REQ
    else:
        self.assertIsNone(kdc_pk_id)
        asn1_spec = krb5_asn1.PA_PK_AS_REQ_Win2k

    content_info_obj = self.ContentInfo_create(
        krb5_asn1.id_signedData, signed_auth_pack)
    content_info = self.der_encode(content_info_obj,
                                   asn1Spec=krb5_asn1.ContentInfo())

    pk_as_req = {'signedAuthPack': content_info}

    optional_members = (
        ('trustedCertifiers', trusted_certifiers),
        ('kdcPkId', kdc_pk_id),
        ('kdcCert', kdc_cert),
        ('encryptionCert', encryption_cert),
    )
    for member, member_value in optional_members:
        if member_value is not None:
            pk_as_req[member] = member_value

    return self.der_encode(pk_as_req, asn1Spec=asn1_spec())
def SignerInfo_create(self,
                      signer_id,
                      digest_algorithm,
                      signature_algorithm,
                      signature,
                      *,
                      version=None,
                      signed_attrs=None,
                      unsigned_attrs=None):
    """Build a CMS SignerInfo as a dict keyed by ASN.1 field name.

    If no version is given it is derived from the kind of signer
    identifier, per RFC5652 5.3: 1 for issuerAndSerialNumber, 3 for
    subjectKeyIdentifier. Any other identifier fails the test.
    """
    if version is None:
        versions_by_id_kind = (
            ('issuerAndSerialNumber', 1),
            ('subjectKeyIdentifier', 3),
        )
        for id_kind, id_version in versions_by_id_kind:
            if id_kind in signer_id:
                version = id_version
                break
        else:
            self.fail(f'unknown signer ID version ({signer_id})')

    signer_info = {
        'version': version,
        'sid': signer_id,
        'digestAlgorithm': digest_algorithm,
        'signatureAlgorithm': signature_algorithm,
        'signature': signature,
    }

    for field_name, field_value in (('signedAttrs', signed_attrs),
                                    ('unsignedAttrs', unsigned_attrs)):
        if field_value is not None:
            signer_info[field_name] = field_value

    return signer_info
def SignerIdentifier_create(self, *,
                            issuer_and_serial_number=None,
                            subject_key_id=None):
    """Build a CMS SignerIdentifier choice as a single-entry dict.

    The issuer and serial number takes precedence if both identifiers are
    given. The test fails if neither is specified.
    """
    if issuer_and_serial_number is None and subject_key_id is None:
        self.fail('identifier not specified')

    if issuer_and_serial_number is not None:
        return {'issuerAndSerialNumber': issuer_and_serial_number}

    return {'subjectKeyIdentifier': subject_key_id}
def AlgorithmIdentifier_create(self,
                               algorithm,
                               *,
                               parameters=None):
    """Build an AlgorithmIdentifier dict; 'parameters' is only included
    when it is not None."""
    if parameters is None:
        return {'algorithm': algorithm}

    return {
        'algorithm': algorithm,
        'parameters': parameters,
    }
def SubjectPublicKeyInfo_create(self,
                                algorithm,
                                public_key):
    """Build a SubjectPublicKeyInfo dict from an algorithm identifier and
    a public key."""
    public_key_info = dict(algorithm=algorithm,
                           subjectPublicKey=public_key)
    return public_key_info
def ValidationParms_create(self,
                           seed,
                           pgen_counter):
    """Build a ValidationParms dict from a seed and a pgenCounter."""
    validation_parms = dict(seed=seed,
                            pgenCounter=pgen_counter)
    return validation_parms
def DomainParameters_create(self,
                            p,
                            g,
                            *,
                            q=None,
                            j=None,
                            validation_parms=None):
    """Build a DomainParameters dict.

    'p' and 'g' are always present; 'q', 'j' and 'validationParms' are
    only added when a value other than None is supplied.
    """
    domain_params = {
        'p': p,
        'g': g,
    }

    optional_fields = (
        ('q', q),
        ('j', j),
        ('validationParms', validation_parms),
    )
    domain_params.update((field_name, field_value)
                         for field_name, field_value in optional_fields
                         if field_value is not None)

    return domain_params
def length_in_bytes(self, value):
    """Return the length in bytes of an integer once it is encoded as
    bytes.

    The value must be a non-negative int; zero still occupies one byte.
    """

    # Check the type first so that a non-integer fails the assertion
    # rather than raising from the comparison below.
    self.assertIsInstance(value, int)
    self.assertGreaterEqual(value, 0, 'value must be positive')

    # int.bit_length() is exact for integers of any size. Going through
    # math.log2() converts to a float first, which rounds large values
    # (for example 2**64 + 1 becomes 2**64) and so under-counts the length.
    length_in_bits = max(1, value.bit_length())
    return (length_in_bits + 7) // 8
def bytes_from_int(self, value, *, length=None):
    """Return an integer encoded big-endian into bytes of an optionally
    specified length.

    When no length is given, the length is obtained from
    length_in_bytes().
    """
    byte_count = self.length_in_bytes(value) if length is None else length
    return value.to_bytes(byte_count, 'big')
def int_from_bytes(self, data):
    """Return the integer that the given bytes represent in big-endian
    order."""
    return int.from_bytes(data, byteorder='big')
def int_from_bit_string(self, string):
    """Return the integer that a string of binary digits represents."""
    return int(string, 2)
def bit_string_from_int(self, value):
    """Return an integer as a string of binary digits, zero-padded on the
    left to a whole number of octets."""

    bits = format(value, 'b')

    # The bitstring must be padded to a multiple of 8 bits in length, or
    # pyasn1 will interpret it incorrectly (as if the padding bits were
    # present, but on the wrong end).
    missing_bits = -len(bits) % 8
    return '0' * missing_bits + bits
def bit_string_from_bytes(self, data):
    """Return a bitstring encoding of bytes in big-endian format.

    The bytes are first decoded to an integer, which is then rendered by
    bit_string_from_int().
    """
    return self.bit_string_from_int(self.int_from_bytes(data))
def bytes_from_bit_string(self, string):
    """Return big-endian format bytes encoded from a bitstring.

    The result is long enough to hold every bit of the string, rounding
    up to a whole byte.
    """
    byte_count = math.ceil(len(string) / 8)
    return self.int_from_bit_string(string).to_bytes(byte_count, 'big')
def asn1_length(self, data):
    """Return the ASN.1 encoding of the length of some data.

    Lengths below 0x80 are encoded in a single byte. Longer lengths are
    encoded as a byte with the top bit set that holds the number of
    length bytes, followed by the length itself in big-endian order. The
    data is asserted to be non-empty.
    """

    data_len = len(data)
    self.assertGreater(data_len, 0)

    if data_len >= 0x80:
        # Long form.
        octet_count = self.length_in_bytes(data_len)
        self.assertLess(octet_count, 0x80,
                        'item is too long to be ASN.1 encoded')

        length_octets = self.bytes_from_int(data_len, length=octet_count)
        return bytes([0x80 | octet_count]) + length_octets

    # Short form.
    return bytes([data_len])
@staticmethod
def octetstring2key(x, enctype):
    """This implements the function defined in RFC4556 3.2.3.1 “Using
    Diffie-Hellman Key Exchange”.

    SHA-1 digests of a one-byte counter followed by x are concatenated
    until enough seed material for the encryption type is available; the
    truncated seed is then turned into a key.
    """

    seedsize = kcrypto.seedsize(enctype)
    seed = b''

    # The counter cycles through the byte values 0x00–0xff.
    for counter in itertools.cycle(range(256)):
        if len(seed) >= seedsize:
            break

        digest = hashes.Hash(hashes.SHA1(), default_backend())
        digest.update(bytes([counter]) + x)
        seed += digest.finalize()

    key = kcrypto.random_to_key(enctype, seed[:seedsize])
    return RodcPacEncryptionKey(key, kvno=None)
def unpad(self, data):
    """Return the data with its trailing padding removed.

    The final byte gives the padding length, and every padding byte is
    asserted to hold that same value.
    """
    pad_len = data[-1]
    payload, padding = data[:-pad_len], data[-pad_len:]

    self.assertEqual(bytes([pad_len]) * pad_len, padding,
                     'invalid padding bytes')

    return payload
def try_decode(self, data, module=None):
    """Try to decode some data of unknown type with various known ASN.1
    schemata (optionally restricted to those from a particular module) and
    print any results that seem promising. For use when debugging.

    When no module is given, the known modules are each tried in turn and
    the test is then deliberately failed.
    """

    if module is None:
        # Try a couple of known ASN.1 modules.
        for known_module in (krb5_asn1, pyasn1.type.univ):
            self.try_decode(data, known_module)

        # It’s helpful to stop and give the user a chance to examine the
        # results.
        self.fail('decoding done')

    for name in dir(module):
        candidate = getattr(module, name)
        if callable(candidate):
            try:
                decoded = self.der_decode(data, asn1Spec=candidate())
            except Exception:
                # Initiating the schema or decoding the ASN.1 failed for
                # whatever reason; this is expected for most candidates.
                continue

            # Decoding succeeded: print the structure to be examined.
            print(f'\t{name}')
            pprint(decoded)
def cipher_from_algorithm(self, algorithm):
    """Return the cipher algorithm class for a cipher OID.

    AES is returned for aes256-CBC-PAD and TripleDES for des-EDE3-CBC;
    any other algorithm fails the test.
    """
    # Let someone pass in an ObjectIdentifier, as hash_from_algorithm()
    # does; a str argument is unaffected by the conversion.
    algorithm = str(algorithm)

    if algorithm == str(krb5_asn1.aes256_CBC_PAD):
        return algorithms.AES

    if algorithm == str(krb5_asn1.des_EDE3_CBC):
        return algorithms.TripleDES

    self.fail(f'unknown cipher algorithm {algorithm}')
def hash_from_algorithm(self, algorithm):
    """Return the hash class to use for a digest or signature algorithm
    OID, failing the test if the algorithm is not recognised."""
    # Let someone pass in an ObjectIdentifier.
    algorithm = str(algorithm)

    known_hashes = (
        (krb5_asn1.id_sha1, hashes.SHA1),
        (krb5_asn1.sha1WithRSAEncryption, hashes.SHA1),
        (krb5_asn1.rsaEncryption, hashes.SHA1),
        (krb5_asn1.id_pkcs1_sha256WithRSAEncryption, hashes.SHA256),
        (krb5_asn1.id_sha512, hashes.SHA512),
    )

    for oid, hash_class in known_hashes:
        if algorithm == str(oid):
            return hash_class

    self.fail(f'unknown hash algorithm {algorithm}')
def hash_from_algorithm_id(self, algorithm_id):
    """Return the hash class named by an AlgorithmIdentifier dict.

    If parameters are present they must be b'\\x05\\x00'; under strict
    checking they must be present.
    """
    self.assertIsInstance(algorithm_id, dict)

    hash_class = self.hash_from_algorithm(algorithm_id['algorithm'])

    params = algorithm_id.get('parameters')
    if self.strict_checking:
        self.assertIsNotNone(params)
    if params is not None:
        self.assertEqual(b'\x05\x00', params)

    return hash_class
def create_freshness_token(self,
                           epoch=None,
                           *,
                           offset=None,
                           krbtgt_creds=None):
    """Return a freshness token for the given time.

    The token is a DER-encoded EncryptedData, holding a PA-ENC-TS-ENC
    encrypted with the krbtgt key, preceded by two zero bytes. The
    default krbtgt credentials are used if none are supplied.
    """
    timestamp, usec = self.get_KerberosTimeWithUsec(epoch, offset)

    # Encode the freshness token as PA-ENC-TS-ENC.
    ts_enc_obj = self.PA_ENC_TS_ENC_create(timestamp, usec)
    ts_enc_der = self.der_encode(ts_enc_obj,
                                 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    if krbtgt_creds is None:
        krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    # Encrypt the freshness token.
    encrypted = self.EncryptedData_create(krbtgt_key,
                                          KU_AS_FRESHNESS,
                                          ts_enc_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    # Prepend a couple of zero bytes.
    return bytes(2) + encrypted_der
def kpasswd_create(self,
                   subkey,
                   user_data,
                   version,
                   seq_number,
                   ap_req,
                   local_address,
                   remote_address):
    """Assemble a kpasswd request message.

    The message is a six-byte header of three big-endian 16-bit fields
    (total size, protocol version, AP-REQ length), followed by the AP-REQ
    and a KRB-PRIV that carries the user data. A connection must already
    have been established.
    """
    self.assertIsNotNone(self.s, 'call self.connect() first')

    timestamp, usec = self.get_KerberosTimeWithUsec()

    krb_priv = self.KRB_PRIV_create(subkey,
                                    user_data,
                                    s_address=local_address,
                                    timestamp=timestamp,
                                    usec=usec,
                                    seq_number=seq_number,
                                    r_address=remote_address)

    ap_req_len = len(ap_req)
    size = 6 + ap_req_len + len(krb_priv)
    self.assertLess(size, 0x10000)

    msg = bytearray()
    for header_field in (size, version, ap_req_len):
        # High byte first, then low byte.
        msg.append(header_field >> 8)
        msg.append(header_field & 0xff)
    # Note: for sets, there could be a little-endian four-byte length here.

    msg.extend(ap_req)
    msg.extend(krb_priv)

    return msg
def get_enc_part(self, obj, key, usage):
    """Check and decrypt the 'enc-part' of a decoded message.

    The message must have pvno 5, and the etype and kvno of its encrypted
    part are checked against the key before the cipher text is decrypted
    with the given key usage. The decrypted bytes are returned.
    """
    self.assertElementEqual(obj, 'pvno', 5)

    encrypted = obj['enc-part']
    self.assertElementEqual(encrypted, 'etype', key.etype)
    self.assertElementKVNO(encrypted, 'kvno', key.kvno)

    return key.decrypt(usage, encrypted['cipher'])
def kpasswd_exchange(self,
                     ticket,
                     new_password,
                     expected_code,
                     expected_msg,
                     mode,
                     target_princ=None,
                     target_realm=None,
                     ap_options=None,
                     send_seq_number=True):
    """Perform a kpasswd exchange on port 464 and check the response.

    mode selects between setting a password (optionally for a target
    principal and realm) and changing one. expected_code is either a
    single integer or a collection of acceptable codes; expected_msg is
    either a bytes message or a collection of acceptable messages.
    """
    if mode is self.KpasswdMode.SET:
        version = 0xff80
        user_data = self.ChangePasswdDataMS_create(new_password,
                                                   target_princ,
                                                   target_realm)
    elif mode is self.KpasswdMode.CHANGE:
        self.assertIsNone(target_princ,
                          'target_princ only valid for pw set')
        self.assertIsNone(target_realm,
                          'target_realm only valid for pw set')

        version = 1
        user_data = new_password.encode('utf-8')
    else:
        self.fail(f'invalid mode {mode}')

    subkey = self.RandomKey(kcrypto.Enctype.AES256)

    ap_options = str(krb5_asn1.APOptions(
        '0' if ap_options is None else ap_options))

    kdc_exchange_dict = {
        'tgt': ticket,
        'authenticator_subkey': subkey,
        'auth_data': None,
        'ap_options': ap_options,
    }

    seq_number = random.randint(0, 0xfffffffe) if send_seq_number else None

    ap_req = self.generate_ap_req(kdc_exchange_dict,
                                  None,
                                  req_body=None,
                                  armor=False,
                                  usage=KU_AP_REQ_AUTH,
                                  seq_number=seq_number)

    self.connect(self.host, port=464)
    self.assertIsNotNone(self.s)

    family = self.s.family

    if family == socket.AF_INET:
        addr_type = 2  # IPv4
    elif family == socket.AF_INET6:
        addr_type = 24  # IPv6
    else:
        self.fail(f'unknown family {family}')

    def create_address(ip):
        return {
            'addr-type': addr_type,
            'address': socket.inet_pton(family, ip),
        }

    local_address = create_address(self.s.getsockname()[0])

    # TODO: due to a bug (?), MIT Kerberos will not accept the request
    # unless r-address is set to our _local_ address. Heimdal, on the other
    # hand, requires the r-address is set to the remote address (as
    # expected). To avoid problems, avoid sending r-address for now; it
    # would otherwise be created from self.s.getpeername()[0].
    remote_address = None

    request = self.kpasswd_create(subkey,
                                  user_data,
                                  version,
                                  seq_number,
                                  ap_req,
                                  local_address,
                                  remote_address)

    self.send_msg(request)
    rep_pdu = self.recv_pdu_raw()

    self._disconnect('transaction done')

    self.assertIsNotNone(rep_pdu)

    # The reply starts with three big-endian 16-bit header fields.
    header, reply = rep_pdu[:6], rep_pdu[6:]

    reply_len = (header[0] << 8) | header[1]
    reply_version = (header[2] << 8) | header[3]
    ap_rep_len = (header[4] << 8) | header[5]

    self.assertEqual(reply_len, len(rep_pdu))
    self.assertEqual(1, reply_version)  # KRB5_KPASSWD_VERS_CHANGEPW
    self.assertLess(ap_rep_len, reply_len)

    self.assertNotEqual(0x7e, rep_pdu[1])
    self.assertNotEqual(0x5e, rep_pdu[1])

    if ap_rep_len:
        # We received an AP-REP and KRB-PRIV as a response. This may or may
        # not indicate an error, depending on the status code.
        ap_rep_data = reply[:ap_rep_len]
        krb_priv_data = reply[ap_rep_len:]

        session_key = ticket.session_key

        ap_rep = self.der_decode(ap_rep_data, asn1Spec=krb5_asn1.AP_REP())
        self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP)
        ap_enc_part = self.get_enc_part(ap_rep,
                                        session_key,
                                        KU_AP_REQ_ENC_PART)
        ap_enc_part = self.der_decode(
            ap_enc_part, asn1Spec=krb5_asn1.EncAPRepPart())

        self.assertElementPresent(ap_enc_part, 'ctime')
        self.assertElementPresent(ap_enc_part, 'cusec')
        # TODO: check that 'subkey' is missing and that 'seq-number' is
        # present.

        try:
            krb_priv = self.der_decode(krb_priv_data,
                                       asn1Spec=krb5_asn1.KRB_PRIV())
        except PyAsn1Error:
            self.fail()

        self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV)
        priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV)
        priv_enc_part = self.der_decode(
            priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart())

        self.assertElementMissing(priv_enc_part, 'timestamp')
        self.assertElementMissing(priv_enc_part, 'usec')
        # TODO: check that 'seq-number' is present, that 's-address'
        # equals the remote address, and that 'r-address' is missing.

        result_data = priv_enc_part['user-data']
    else:
        # We received a KRB-ERROR as a response, indicating an error.
        krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR())

        sname = self.PrincipalName_create(
            name_type=NT_PRINCIPAL,
            names=['kadmin', 'changepw'])
        realm = self.get_krbtgt_creds().get_realm().upper()

        self.assertElementEqual(krb_error, 'pvno', 5)
        self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR)
        self.assertElementMissing(krb_error, 'ctime')
        self.assertElementMissing(krb_error, 'usec')
        self.assertElementPresent(krb_error, 'stime')
        self.assertElementPresent(krb_error, 'susec')

        error_code = krb_error['error-code']
        if isinstance(expected_code, int):
            self.assertEqual(error_code, expected_code)
        else:
            self.assertIn(error_code, expected_code)

        self.assertElementMissing(krb_error, 'crealm')
        self.assertElementMissing(krb_error, 'cname')
        self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8'))
        self.assertElementEqualPrincipal(krb_error, 'sname', sname)
        self.assertElementMissing(krb_error, 'e-text')

        result_data = krb_error['e-data']

    # The result is a big-endian 16-bit status code followed by an
    # optional message.
    status, message = result_data[:2], result_data[2:]

    status_code = (status[0] << 8) | status[1]
    if isinstance(expected_code, int):
        self.assertEqual(status_code, expected_code)
    else:
        self.assertIn(status_code, expected_code)

    if not message:
        self.assertEqual(0, status_code,
                         'got an error result, but no message')
        return

    # Check the first character of the message.
    if message[0]:
        if isinstance(expected_msg, bytes):
            self.assertEqual(message, expected_msg)
        else:
            self.assertIn(message, expected_msg)
    else:
        # We got AD password policy information.
        self.assertEqual(30, len(message))

        # The fields are unpacked only to confirm that the message has
        # the expected layout; their values are not checked further.
        (empty_bytes,
         min_length,
         history_length,
         properties,
         expire_time,
         min_age) = struct.unpack('>HIIIQQ', message)
def _generic_kdc_exchange(self,
                          kdc_exchange_dict,  # required
                          cname=None,  # optional
                          realm=None,  # required
                          sname=None,  # optional
                          from_time=None,  # optional
                          till_time=None,  # required
                          renew_time=None,  # optional
                          etypes=None,  # required
                          addresses=None,  # optional
                          additional_tickets=None,  # optional
                          EncAuthorizationData=None,  # optional
                          EncAuthorizationData_key=None,  # optional
                          EncAuthorizationData_usage=None):  # optional
    """Build a KDC request, send it, and check the reply.

    The request is assembled from the arguments and from the settings in
    kdc_exchange_dict, into which the nonce, the request padata, the FAST
    padata, the inner request body and the request object are written
    back. The reply is handed to the error checking function if it is a
    KRB-ERROR and to the reply checking function otherwise, and that
    function's result is returned.
    """

    check_error_fn = kdc_exchange_dict['check_error_fn']
    check_rep_fn = kdc_exchange_dict['check_rep_fn']
    generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
    generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
    generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
    generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
    callback_dict = kdc_exchange_dict['callback_dict']
    req_msg_type = kdc_exchange_dict['req_msg_type']
    req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_error_mode = kdc_exchange_dict['expected_error_mode']
    kdc_options = kdc_exchange_dict['kdc_options']

    pac_request = kdc_exchange_dict['pac_request']
    pac_options = kdc_exchange_dict['pac_options']

    # Parameters specific to the inner request body
    inner_req = kdc_exchange_dict['inner_req']

    # Parameters specific to the outer request body
    outer_req = kdc_exchange_dict['outer_req']

    if till_time is None:
        till_time = self.get_KerberosTime(offset=36000)

    # Use the caller's nonce if there is one; otherwise generate a nonce
    # and record it for the checking functions.
    try:
        nonce = kdc_exchange_dict['nonce']
    except KeyError:
        nonce = self.get_Nonce()
        kdc_exchange_dict['nonce'] = nonce

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)

    def apply_overrides(body, overrides):
        # A value of None removes the element from the body; any other
        # value replaces it.
        for key, value in overrides.items():
            if value is not None:
                body[key] = value
            else:
                del body[key]

    inner_req_body = dict(req_body)
    if inner_req is not None:
        apply_overrides(inner_req_body, inner_req)
    if outer_req is not None:
        apply_overrides(req_body, outer_req)

    additional_padata = []
    if pac_request is not None:
        additional_padata.append(
            self.KERB_PA_PAC_REQUEST_create(pac_request))
    if pac_options is not None:
        additional_padata.append(self.get_pa_pac_options(pac_options))

    if req_msg_type == KRB_AS_REQ:
        tgs_req = None
        tgs_req_padata = None
    else:
        self.assertEqual(KRB_TGS_REQ, req_msg_type)

        tgs_req = self.generate_ap_req(kdc_exchange_dict,
                                       callback_dict,
                                       req_body,
                                       armor=False)
        tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)

    if generate_fast_padata_fn is None:
        fast_padata = []
    else:
        self.assertIsNotNone(generate_fast_fn)
        # This can alter req_body...
        fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
                                                        callback_dict,
                                                        req_body)

    if generate_fast_armor_fn is None:
        fast_armor = None
    else:
        self.assertIsNotNone(generate_fast_fn)
        fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
                                             callback_dict,
                                             None,
                                             armor=True)

        fast_armor_type = kdc_exchange_dict['fast_armor_type']
        fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
                                                fast_ap_req)

    if generate_padata_fn is None:
        outer_padata = None
    else:
        # This can alter req_body...
        outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
                                                    callback_dict,
                                                    req_body)
        self.assertIsNotNone(outer_padata)
        self.assertNotIn(PADATA_KDC_REQ,
                         [pa['padata-type'] for pa in outer_padata],
                         'Don\'t create TGS-REQ manually')

    if generate_fast_fn is None:
        fast = None
    else:
        armor_key = kdc_exchange_dict['armor_key']
        self.assertIsNotNone(armor_key)

        # The checksum covers the request body for an AS-REQ, and the
        # AP-REQ for a TGS-REQ.
        if req_msg_type == KRB_AS_REQ:
            checksum_blob = self.der_encode(
                req_body,
                asn1Spec=krb5_asn1.KDC_REQ_BODY())
        else:
            self.assertEqual(KRB_TGS_REQ, req_msg_type)
            checksum_blob = tgs_req

        checksum = self.Checksum_create(armor_key,
                                        KU_FAST_REQ_CHKSUM,
                                        checksum_blob)

        # With FAST, the PAC-related padata travels inside the FAST
        # request.
        fast_padata += additional_padata
        fast = generate_fast_fn(kdc_exchange_dict,
                                callback_dict,
                                inner_req_body,
                                fast_padata,
                                fast_armor,
                                checksum)

    padata = []

    if tgs_req_padata is not None:
        padata.append(tgs_req_padata)

    if fast is not None:
        padata.append(fast)

    if outer_padata is not None:
        padata += outer_padata

    if fast is None:
        padata += additional_padata

    if not padata:
        padata = None

    kdc_exchange_dict['req_padata'] = padata
    kdc_exchange_dict['fast_padata'] = fast_padata
    kdc_exchange_dict['req_body'] = inner_req_body

    req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
                                               padata=padata,
                                               req_body=req_body,
                                               asn1Spec=req_asn1Spec())

    kdc_exchange_dict['req_obj'] = req_obj

    to_rodc = kdc_exchange_dict['to_rodc']

    rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
    self.assertIsNotNone(rep)

    msg_type = self.getElementValue(rep, 'msg-type')
    self.assertIsNotNone(msg_type)

    # Exactly one of the two checking functions must have been supplied,
    # and the expected error mode must agree with it.
    expected_msg_type = None
    if check_error_fn is not None:
        expected_msg_type = KRB_ERROR
        self.assertIsNone(check_rep_fn)
        self.assertNotEqual(0, len(expected_error_mode))
        self.assertNotIn(0, expected_error_mode)
    if check_rep_fn is not None:
        expected_msg_type = rep_msg_type
        self.assertIsNone(check_error_fn)
        self.assertEqual(0, len(expected_error_mode))
    self.assertIsNotNone(expected_msg_type)

    got_error = msg_type == KRB_ERROR
    if got_error:
        error_code = self.getElementValue(rep, 'error-code')
        fail_msg = f'Got unexpected error: {error_code}'
    else:
        fail_msg = f'Expected to fail with error: {expected_error_mode}'
    self.assertEqual(msg_type, expected_msg_type, fail_msg)

    check_fn = check_error_fn if got_error else check_rep_fn
    return check_fn(kdc_exchange_dict, callback_dict, rep)
def as_exchange_dict(self,
                     creds=None,
                     client_cert=None,
                     expected_crealm=None,
                     expected_cname=None,
                     expected_anon=False,
                     expected_srealm=None,
                     expected_sname=None,
                     expected_account_name=None,
                     expected_groups=None,
                     unexpected_groups=None,
                     expected_upn_name=None,
                     expected_sid=None,
                     expected_requester_sid=None,
                     expected_domain_sid=None,
                     expected_supported_etypes=None,
                     expected_flags=None,
                     unexpected_flags=None,
                     ticket_decryption_key=None,
                     expect_ticket_checksum=None,
                     expect_full_checksum=None,
                     generate_fast_fn=None,
                     generate_fast_armor_fn=None,
                     generate_fast_padata_fn=None,
                     fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
                     generate_padata_fn=None,
                     check_error_fn=None,
                     check_rep_fn=None,
                     check_kdc_private_fn=None,
                     check_patypes=True,
                     callback_dict=None,
                     expected_error_mode=0,
                     expect_status=None,
                     expected_status=None,
                     expected_salt=None,
                     authenticator_subkey=None,
                     preauth_key=None,
                     armor_key=None,
                     armor_tgt=None,
                     armor_subkey=None,
                     auth_data=None,
                     kdc_options='',
                     inner_req=None,
                     outer_req=None,
                     pac_request=None,
                     pac_options=None,
                     ap_options=None,
                     fast_ap_options=None,
                     strict_edata_checking=True,
                     using_pkinit=PkInit.NOT_USED,
                     pk_nonce=None,
                     expect_edata=None,
                     expect_pac=True,
                     expect_client_claims=None,
                     expect_device_info=None,
                     expect_device_claims=None,
                     expect_upn_dns_info_ex=None,
                     expect_pac_attrs=None,
                     expect_pac_attrs_pac_request=None,
                     expect_requester_sid=None,
                     rc4_support=True,
                     expected_client_claims=None,
                     unexpected_client_claims=None,
                     expected_device_claims=None,
                     unexpected_device_claims=None,
                     expect_resource_groups_flag=None,
                     expected_device_groups=None,
                     to_rodc=False):
    """Return the dict of settings that describes an AS exchange.

    Every argument is stored under a key of the same name, alongside the
    message types and ASN.1 specifications of an AS-REQ and AS-REP.
    expected_error_mode is normalised to a container: 0 becomes an empty
    tuple, and any other non-container value a one-element tuple.
    callback_dict defaults to a new empty dict.
    """
    if expected_error_mode == 0:
        expected_error_mode = ()
    elif not isinstance(expected_error_mode, collections.abc.Container):
        expected_error_mode = (expected_error_mode,)

    # Apply the default before the dict is built; applying it afterwards
    # would leave None stored in the result.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_AS_REQ,
        'req_asn1Spec': krb5_asn1.AS_REQ,
        'rep_msg_type': KRB_AS_REP,
        'rep_asn1Spec': krb5_asn1.AS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
        'creds': creds,
        'client_cert': client_cert,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_anon': expected_anon,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'expected_account_name': expected_account_name,
        'expected_groups': expected_groups,
        'unexpected_groups': unexpected_groups,
        'expected_upn_name': expected_upn_name,
        'expected_sid': expected_sid,
        'expected_requester_sid': expected_requester_sid,
        'expected_domain_sid': expected_domain_sid,
        'expected_supported_etypes': expected_supported_etypes,
        'expected_flags': expected_flags,
        'unexpected_flags': unexpected_flags,
        'ticket_decryption_key': ticket_decryption_key,
        'expect_ticket_checksum': expect_ticket_checksum,
        'expect_full_checksum': expect_full_checksum,
        'generate_fast_fn': generate_fast_fn,
        'generate_fast_armor_fn': generate_fast_armor_fn,
        'generate_fast_padata_fn': generate_fast_padata_fn,
        'fast_armor_type': fast_armor_type,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'check_patypes': check_patypes,
        'callback_dict': callback_dict,
        'expected_error_mode': expected_error_mode,
        'expect_status': expect_status,
        'expected_status': expected_status,
        'expected_salt': expected_salt,
        'authenticator_subkey': authenticator_subkey,
        'preauth_key': preauth_key,
        'armor_key': armor_key,
        'armor_tgt': armor_tgt,
        'armor_subkey': armor_subkey,
        'auth_data': auth_data,
        'kdc_options': kdc_options,
        'inner_req': inner_req,
        'outer_req': outer_req,
        'pac_request': pac_request,
        'pac_options': pac_options,
        'ap_options': ap_options,
        'fast_ap_options': fast_ap_options,
        'strict_edata_checking': strict_edata_checking,
        'using_pkinit': using_pkinit,
        'pk_nonce': pk_nonce,
        'expect_edata': expect_edata,
        'expect_pac': expect_pac,
        'expect_client_claims': expect_client_claims,
        'expect_device_info': expect_device_info,
        'expect_device_claims': expect_device_claims,
        'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
        'expect_pac_attrs': expect_pac_attrs,
        'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
        'expect_requester_sid': expect_requester_sid,
        'rc4_support': rc4_support,
        'expected_client_claims': expected_client_claims,
        'unexpected_client_claims': unexpected_client_claims,
        'expected_device_claims': expected_device_claims,
        'unexpected_device_claims': unexpected_device_claims,
        'expect_resource_groups_flag': expect_resource_groups_flag,
        'expected_device_groups': expected_device_groups,
        'to_rodc': to_rodc,
    }

    return kdc_exchange_dict
def tgs_exchange_dict(self,
                      creds=None,
                      expected_crealm=None,
                      expected_cname=None,
                      expected_anon=False,
                      expected_srealm=None,
                      expected_sname=None,
                      expected_account_name=None,
                      expected_groups=None,
                      unexpected_groups=None,
                      expected_upn_name=None,
                      expected_sid=None,
                      expected_requester_sid=None,
                      expected_domain_sid=None,
                      expected_device_domain_sid=None,
                      expected_supported_etypes=None,
                      expected_flags=None,
                      unexpected_flags=None,
                      ticket_decryption_key=None,
                      expect_ticket_checksum=None,
                      expect_full_checksum=None,
                      generate_fast_fn=None,
                      generate_fast_armor_fn=None,
                      generate_fast_padata_fn=None,
                      fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
                      generate_padata_fn=None,
                      check_error_fn=None,
                      check_rep_fn=None,
                      check_kdc_private_fn=None,
                      check_patypes=True,
                      expected_error_mode=0,
                      expect_status=None,
                      expected_status=None,
                      callback_dict=None,
                      tgt=None,
                      armor_key=None,
                      armor_tgt=None,
                      armor_subkey=None,
                      authenticator_subkey=None,
                      auth_data=None,
                      body_checksum_type=None,
                      kdc_options='',
                      inner_req=None,
                      outer_req=None,
                      pac_request=None,
                      pac_options=None,
                      ap_options=None,
                      fast_ap_options=None,
                      strict_edata_checking=True,
                      expect_edata=None,
                      expect_pac=True,
                      expect_client_claims=None,
                      expect_device_info=None,
                      expect_device_claims=None,
                      expect_upn_dns_info_ex=None,
                      expect_pac_attrs=None,
                      expect_pac_attrs_pac_request=None,
                      expect_requester_sid=None,
                      expected_proxy_target=None,
                      expected_transited_services=None,
                      rc4_support=True,
                      expected_client_claims=None,
                      unexpected_client_claims=None,
                      expected_device_claims=None,
                      unexpected_device_claims=None,
                      expect_resource_groups_flag=None,
                      expected_device_groups=None,
                      to_rodc=False):
    """Return the dict of settings that describes a TGS exchange.

    Every argument is stored under a key of the same name, alongside the
    message types and ASN.1 specifications of a TGS-REQ and TGS-REP.
    expected_error_mode is normalised to a container: 0 becomes an empty
    tuple, and any other non-container value a one-element tuple.
    callback_dict defaults to a new empty dict.
    """
    if expected_error_mode == 0:
        expected_error_mode = ()
    elif not isinstance(expected_error_mode, collections.abc.Container):
        expected_error_mode = (expected_error_mode,)

    # Apply the default before the dict is built; applying it afterwards
    # would leave None stored in the result.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_TGS_REQ,
        'req_asn1Spec': krb5_asn1.TGS_REQ,
        'rep_msg_type': KRB_TGS_REP,
        'rep_asn1Spec': krb5_asn1.TGS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
        'creds': creds,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_anon': expected_anon,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'expected_account_name': expected_account_name,
        'expected_groups': expected_groups,
        'unexpected_groups': unexpected_groups,
        'expected_upn_name': expected_upn_name,
        'expected_sid': expected_sid,
        'expected_requester_sid': expected_requester_sid,
        'expected_domain_sid': expected_domain_sid,
        'expected_device_domain_sid': expected_device_domain_sid,
        'expected_supported_etypes': expected_supported_etypes,
        'expected_flags': expected_flags,
        'unexpected_flags': unexpected_flags,
        'ticket_decryption_key': ticket_decryption_key,
        'expect_ticket_checksum': expect_ticket_checksum,
        'expect_full_checksum': expect_full_checksum,
        'generate_fast_fn': generate_fast_fn,
        'generate_fast_armor_fn': generate_fast_armor_fn,
        'generate_fast_padata_fn': generate_fast_padata_fn,
        'fast_armor_type': fast_armor_type,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'check_patypes': check_patypes,
        'callback_dict': callback_dict,
        'expected_error_mode': expected_error_mode,
        'expect_status': expect_status,
        'expected_status': expected_status,
        'tgt': tgt,
        'body_checksum_type': body_checksum_type,
        'armor_key': armor_key,
        'armor_tgt': armor_tgt,
        'armor_subkey': armor_subkey,
        'auth_data': auth_data,
        'authenticator_subkey': authenticator_subkey,
        'kdc_options': kdc_options,
        'inner_req': inner_req,
        'outer_req': outer_req,
        'pac_request': pac_request,
        'pac_options': pac_options,
        'ap_options': ap_options,
        'fast_ap_options': fast_ap_options,
        'strict_edata_checking': strict_edata_checking,
        'expect_edata': expect_edata,
        'expect_pac': expect_pac,
        'expect_client_claims': expect_client_claims,
        'expect_device_info': expect_device_info,
        'expect_device_claims': expect_device_claims,
        'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
        'expect_pac_attrs': expect_pac_attrs,
        'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
        'expect_requester_sid': expect_requester_sid,
        'expected_proxy_target': expected_proxy_target,
        'expected_transited_services': expected_transited_services,
        'rc4_support': rc4_support,
        'expected_client_claims': expected_client_claims,
        'unexpected_client_claims': unexpected_client_claims,
        'expected_device_claims': expected_device_claims,
        'unexpected_device_claims': unexpected_device_claims,
        'expect_resource_groups_flag': expect_resource_groups_flag,
        'expected_device_groups': expected_device_groups,
        'to_rodc': to_rodc,
    }

    return kdc_exchange_dict
def generic_check_kdc_rep(self,
                          kdc_exchange_dict,
                          callback_dict,
                          rep):
    """Check a (non-error) KDC reply and decrypt its encrypted parts.

    The outer fields of the reply and of the enclosed ticket are checked
    against the expectations stored in kdc_exchange_dict. The key for the
    reply's enc-part is obtained from get_preauth_key() and is then, where
    applicable, replaced by a key recovered from a PKINIT reply
    (PA-PK-AS-REP padata) and/or combined with a FAST strengthen key. The
    ticket (only when 'ticket_decryption_key' is set) and the reply
    enc-part are decrypted and handed to the 'check_kdc_private_fn'
    callback taken from kdc_exchange_dict.

    :param kdc_exchange_dict: dict describing the exchange. It is read for
        expectations, keys and callbacks; 'reply_key' is written into it.
    :param callback_dict: passed through to check_reply_padata(),
        check_rep_padata() and the 'check_kdc_private_fn' callback.
    :param rep: the reply to check; accessed through getElementValue() and
        the assertElement*() helpers.
    :return: rep.
    """

    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_anon = kdc_exchange_dict['expected_anon']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
    check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
    rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
    msg_type = kdc_exchange_dict['rep_msg_type']
    armor_key = kdc_exchange_dict['armor_key']

    # --- Outer (unencrypted) fields of the reply ---
    self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
    padata = self.getElementValue(rep, 'padata')
    if self.strict_checking:
        self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
        if self.cname_checking:
            if expected_anon:
                # An anonymous reply is expected to carry the well-known
                # anonymous principal instead of the configured cname.
                expected_cname = self.PrincipalName_create(
                    name_type=NT_WELLKNOWN,
                    names=['WELLKNOWN', 'ANONYMOUS'])
            else:
                expected_cname = kdc_exchange_dict['expected_cname']
            self.assertElementEqualPrincipal(rep, 'cname', expected_cname)

    # --- Outer fields of the ticket ---
    self.assertElementPresent(rep, 'ticket')
    ticket = self.getElementValue(rep, 'ticket')
    ticket_encpart = None
    ticket_cipher = None
    self.assertIsNotNone(ticket)
    if ticket is not None:  # Never None, but gives indentation
        self.assertElementEqual(ticket, 'tkt-vno', 5)
        self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
        self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
        self.assertElementPresent(ticket, 'enc-part')
        ticket_encpart = self.getElementValue(ticket, 'enc-part')
        self.assertIsNotNone(ticket_encpart)
        if ticket_encpart is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_encpart, 'etype')

            # Work out whether the enc-tkt-in-skey option was requested:
            # 'pos' is the index of that flag, and kdc_options is indexed
            # here as a sequence of '0'/'1' characters.
            kdc_options = kdc_exchange_dict['kdc_options']
            pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1
            expect_kvno = (pos >= len(kdc_options)
                           or kdc_options[pos] != '1')
            if expect_kvno:
                # 'unspecified' means present, with any value != 0
                self.assertElementKVNO(ticket_encpart, 'kvno',
                                       self.unspecified_kvno)
            else:
                # For user-to-user, don't expect a kvno.
                self.assertElementMissing(ticket_encpart, 'kvno')

            self.assertElementPresent(ticket_encpart, 'cipher')
            ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')

    # --- Outer fields of the reply's own enc-part ---
    self.assertElementPresent(rep, 'enc-part')
    encpart = self.getElementValue(rep, 'enc-part')
    encpart_cipher = None
    self.assertIsNotNone(encpart)
    if encpart is not None:  # Never None, but gives indentation
        self.assertElementPresent(encpart, 'etype')
        # NOTE(review): this inspects ticket_encpart rather than encpart,
        # unlike the neighbouring checks; possibly a copy/paste slip.
        # Confirm the intent before changing it.
        self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
        self.assertElementPresent(encpart, 'cipher')
        encpart_cipher = self.getElementValue(encpart, 'cipher')

    if self.padata_checking:
        self.check_reply_padata(kdc_exchange_dict,
                                callback_dict,
                                encpart,
                                padata)

    # Stays None unless a FAST 'finished' element supplies a checksum.
    ticket_checksum = None

    # Get the decryption key for the encrypted part
    encpart_decryption_key, encpart_decryption_usage = (
        self.get_preauth_key(kdc_exchange_dict))

    pa_dict = self.get_pa_dict(padata)

    # --- PKINIT ---
    # Look for a PA-PK-AS-REP first and fall back to the padata type used
    # with the Win2k variants of the ASN.1 structures.
    pk_as_rep = pa_dict.get(PADATA_PK_AS_REP)
    if pk_as_rep is not None:
        pk_as_rep_asn1_spec = krb5_asn1.PA_PK_AS_REP
        reply_key_pack_asn1_spec = krb5_asn1.ReplyKeyPack
        pk_win2k = False
    else:
        pk_as_rep = pa_dict.get(PADATA_PK_AS_REP_19)
        pk_as_rep_asn1_spec = krb5_asn1.PA_PK_AS_REP_Win2k
        reply_key_pack_asn1_spec = krb5_asn1.ReplyKeyPack_Win2k
        pk_win2k = True
    if pk_as_rep is not None:
        pk_as_rep = self.der_decode(pk_as_rep,
                                    asn1Spec=pk_as_rep_asn1_spec())

        using_pkinit = kdc_exchange_dict['using_pkinit']
        if using_pkinit is PkInit.PUBLIC_KEY:
            # Public-key variant: the reply key arrives in an EnvelopedData
            # encrypted to the client's RSA key. At this point
            # encpart_decryption_key holds that RSA private key (asserted
            # below).
            content_info = self.der_decode(
                pk_as_rep['encKeyPack'],
                asn1Spec=krb5_asn1.ContentInfo())
            self.assertEqual(str(krb5_asn1.id_envelopedData),
                             content_info['contentType'])

            content = self.der_decode(content_info['content'],
                                      asn1Spec=krb5_asn1.EnvelopedData())

            self.assertEqual(0, content['version'])
            originator_info = content['originatorInfo']
            self.assertFalse(originator_info.get('certs'))
            self.assertFalse(originator_info.get('crls'))
            self.assertFalse(content.get('unprotectedAttrs'))

            encrypted_content_info = content['encryptedContentInfo']
            recipient_infos = content['recipientInfos']

            self.assertEqual(1, len(recipient_infos))
            ktri = recipient_infos[0]['ktri']

            if self.strict_checking:
                self.assertEqual(0, ktri['version'])

            private_key = encpart_decryption_key
            self.assertIsInstance(private_key,
                                  asymmetric.rsa.RSAPrivateKey)

            client_subject_key_id = (
                x509.SubjectKeyIdentifier.from_public_key(
                    private_key.public_key()))

            # Check that the client certificate is named as the recipient.
            ktri_rid = ktri['rid']
            try:
                issuer_and_serial_number = ktri_rid[
                    'issuerAndSerialNumber']
            except KeyError:
                # The recipient is identified by subject key identifier
                # instead of by issuer and serial number.
                subject_key_id = ktri_rid['subjectKeyIdentifier']
                self.assertEqual(subject_key_id,
                                 client_subject_key_id.digest)
            else:
                client_certificate = kdc_exchange_dict['client_cert']

                self.assertIsNotNone(issuer_and_serial_number['issuer'])
                self.assertEqual(issuer_and_serial_number['serialNumber'],
                                 client_certificate.serial_number)

            key_encryption_algorithm = ktri['keyEncryptionAlgorithm']
            self.assertEqual(str(krb5_asn1.rsaEncryption),
                             key_encryption_algorithm['algorithm'])
            if self.strict_checking:
                # b'\x05\x00' is the DER encoding of an ASN.1 NULL.
                self.assertEqual(
                    b'\x05\x00',
                    key_encryption_algorithm.get('parameters'))

            encrypted_key = ktri['encryptedKey']

            # Decrypt the key.
            # NOTE(review): the ciphertext is left-padded with zero bytes
            # to 256 bytes, which presumably assumes a 2048-bit RSA key;
            # confirm against the keys the tests use.
            pad_len = 256 - len(encrypted_key)
            if pad_len:
                encrypted_key = bytes(pad_len) + encrypted_key
            decrypted_key = private_key.decrypt(
                encrypted_key,
                padding=asymmetric.padding.PKCS1v15())

            self.assertEqual(str(krb5_asn1.id_signedData),
                             encrypted_content_info['contentType'])

            encrypted_content = encrypted_content_info['encryptedContent']
            encryption_algorithm = encrypted_content_info[
                'contentEncryptionAlgorithm']

            cipher_algorithm = self.cipher_from_algorithm(encryption_algorithm['algorithm'])

            # This will serve as the IV.
            parameters = self.der_decode(
                encryption_algorithm['parameters'],
                asn1Spec=krb5_asn1.CMSCBCParameter())

            # Decrypt the content.
            cipher = Cipher(cipher_algorithm(decrypted_key),
                            modes.CBC(parameters),
                            default_backend())
            decryptor = cipher.decryptor()
            decrypted_content = decryptor.update(encrypted_content)
            decrypted_content += decryptor.finalize()

            # The padding doesn’t fully comply to PKCS7 with a specified
            # blocksize, so we must unpad the data ourselves.
            decrypted_content = self.unpad(decrypted_content)

            # Exactly one of these two is set below, depending on which
            # SignedData schema the decrypted content uses.
            signed_data = None
            signed_data_rfc2315 = None

            first_tag = decrypted_content[0]
            if first_tag == 0x30:  # ASN.1 SEQUENCE tag
                signed_data = decrypted_content
            else:
                # Windows encodes the ASN.1 incorrectly, neglecting to add
                # the SEQUENCE tag. We’ll have to prepend it ourselves in
                # order for the decoding to work.
                encoded_len = self.asn1_length(decrypted_content)
                decrypted_content = bytes([0x30]) + encoded_len + (
                    decrypted_content)

                if first_tag == 0x02:  # ASN.1 INTEGER tag

                    # The INTEGER tag indicates that the data is encoded
                    # with the earlier variant of the SignedData ASN.1
                    # schema specified in RFC2315, as per [MS-PKCA] 2.2.4
                    # (PA-PK-AS-REP).

                    signed_data_rfc2315 = decrypted_content

                elif first_tag == 0x06:  # ASN.1 OBJECT IDENTIFIER tag

                    # The OBJECT IDENTIFIER tag indicates that the data is
                    # encoded as SignedData and wrapped in a ContentInfo
                    # structure, which we shall have to decode first. This
                    # seems to be the case when the supportedCMSTypes field
                    # in the client’s AuthPack is missing or empty.

                    content_info = self.der_decode(
                        decrypted_content,
                        asn1Spec=krb5_asn1.ContentInfo())
                    self.assertEqual(str(krb5_asn1.id_signedData),
                                     content_info['contentType'])
                    signed_data = content_info['content']
                else:
                    self.fail(f'got reply with unknown initial tag '
                              f'({first_tag})')

            # Decode with whichever schema was detected and pull out the
            # encapsulated content together with its content type.
            if signed_data is not None:
                signed_data = self.der_decode(
                    signed_data, asn1Spec=krb5_asn1.SignedData())

                encap_content_info = signed_data['encapContentInfo']

                content_type = encap_content_info['eContentType']
                content = encap_content_info['eContent']
            elif signed_data_rfc2315 is not None:
                signed_data = self.der_decode(
                    signed_data_rfc2315,
                    asn1Spec=krb5_asn1.SignedData_RFC2315())

                encap_content_info = signed_data['contentInfo']

                content_type = encap_content_info['contentType']
                content = self.der_decode(
                    encap_content_info['content'],
                    asn1Spec=pyasn1.type.univ.OctetString())
            else:
                self.fail('we must have got SignedData')

            self.assertEqual(str(krb5_asn1.id_pkinit_rkeyData),
                             content_type)
            reply_key_pack = self.der_decode(
                content, asn1Spec=reply_key_pack_asn1_spec())

            # Re-encode the request; it is the data covered by asChecksum
            # below.
            req_obj = kdc_exchange_dict['req_obj']
            req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
            req_obj = self.der_encode(req_obj,
                                      asn1Spec=req_asn1Spec())

            reply_key = reply_key_pack['replyKey']

            # Reply the encpart decryption key with the decrypted key from
            # the reply.
            encpart_decryption_key = self.SessionKey_create(
                etype=reply_key['keytype'],
                contents=reply_key['keyvalue'],
                kvno=None)

            # The checksum is only read for the non-Win2k ReplyKeyPack.
            if not pk_win2k:
                as_checksum = reply_key_pack['asChecksum']

                # Verify the checksum over the AS request body.
                kcrypto.verify_checksum(as_checksum['cksumtype'],
                                        encpart_decryption_key.key,
                                        KU_PKINIT_AS_REQ,
                                        req_obj,
                                        as_checksum['checksum'])
        elif using_pkinit is PkInit.DIFFIE_HELLMAN:
            # Diffie-Hellman variant: the KDC's DH public value arrives in
            # a SignedData. At this point encpart_decryption_key holds the
            # client's DH private key (asserted below).
            content_info = self.der_decode(
                pk_as_rep['dhInfo']['dhSignedData'],
                asn1Spec=krb5_asn1.ContentInfo())
            self.assertEqual(str(krb5_asn1.id_signedData),
                             content_info['contentType'])

            signed_data = self.der_decode(content_info['content'],
                                          asn1Spec=krb5_asn1.SignedData())

            encap_content_info = signed_data['encapContentInfo']
            content = encap_content_info['eContent']

            self.assertEqual(str(krb5_asn1.id_pkinit_DHKeyData),
                             encap_content_info['eContentType'])

            dh_key_info = self.der_decode(
                content, asn1Spec=krb5_asn1.KDCDHKeyInfo())

            self.assertNotIn('dhKeyExpiration', dh_key_info)

            dh_private_key = encpart_decryption_key
            self.assertIsInstance(dh_private_key,
                                  asymmetric.dh.DHPrivateKey)

            self.assertElementEqual(dh_key_info, 'nonce',
                                    kdc_exchange_dict['pk_nonce'])

            dh_public_key_data = self.bytes_from_bit_string(
                dh_key_info['subjectPublicKey'])
            dh_public_key_decoded = self.der_decode(
                dh_public_key_data, asn1Spec=krb5_asn1.DHPublicKey())

            # Rebuild the KDC's public key using the parameters of our own
            # private key.
            dh_numbers = dh_private_key.parameters().parameter_numbers()

            public_numbers = asymmetric.dh.DHPublicNumbers(
                dh_public_key_decoded, dh_numbers)
            dh_public_key = public_numbers.public_key(default_backend())

            # Perform the Diffie-Hellman key exchange.
            shared_secret = dh_private_key.exchange(dh_public_key)

            # Pad the shared secret out to the length of ‘p’.
            p_len = self.length_in_bytes(dh_numbers.p)
            padding_len = p_len - len(shared_secret)
            self.assertGreaterEqual(padding_len, 0)
            padded_shared_secret = bytes(padding_len) + shared_secret

            reply_key_enc_type = self.expected_etype(kdc_exchange_dict)

            # At the moment, we don’t specify a nonce in the request, so we
            # can assume these are empty.
            client_nonce = b''
            server_nonce = b''

            ciphertext = padded_shared_secret + client_nonce + server_nonce

            # Replace the encpart decryption key with the key derived from
            # the Diffie-Hellman key exchange.
            encpart_decryption_key = self.octetstring2key(
                ciphertext, reply_key_enc_type)
        else:
            self.fail(f'invalid value for using_pkinit: {using_pkinit}')

        # --- Checks on the SignedData, common to both PKINIT variants ---
        # 'signed_data' and 'content' were set by whichever branch ran.
        self.assertEqual(3, signed_data['version'])

        digest_algorithms = signed_data['digestAlgorithms']
        self.assertEqual(1, len(digest_algorithms))
        digest_algorithm = digest_algorithms[0]
        # Ensure the hash algorithm is valid.
        _ = self.hash_from_algorithm_id(digest_algorithm)

        self.assertFalse(signed_data.get('crls'))

        signer_infos = signed_data['signerInfos']
        self.assertEqual(1, len(signer_infos))
        signer_info = signer_infos[0]

        self.assertEqual(1, signer_info['version'])

        # Get the certificate presented by the KDC.
        kdc_certificates = signed_data['certificates']
        self.assertEqual(1, len(kdc_certificates))
        kdc_certificate = self.der_encode(
            kdc_certificates[0], asn1Spec=krb5_asn1.CertificateChoices())
        kdc_certificate = x509.load_der_x509_certificate(kdc_certificate,
                                                         default_backend())

        # Verify that the KDC’s certificate is named as the signer.
        sid = signer_info['sid']
        try:
            issuer_and_serial_number = sid['issuerAndSerialNumber']
        except KeyError:
            # The signer is identified by subject key identifier; compare
            # it with the corresponding certificate extension.
            extension = kdc_certificate.extensions.get_extension_for_oid(
                x509.oid.ExtensionOID.SUBJECT_KEY_IDENTIFIER)
            cert_subject_key_id = extension.value.digest
            self.assertEqual(sid['subjectKeyIdentifier'], cert_subject_key_id)
        else:
            self.assertIsNotNone(issuer_and_serial_number['issuer'])
            self.assertEqual(issuer_and_serial_number['serialNumber'],
                             kdc_certificate.serial_number)

        digest_algorithm = signer_info['digestAlgorithm']
        digest_hash_fn = self.hash_from_algorithm_id(digest_algorithm)

        # Exactly two signed attributes are expected, in this order: the
        # content type, then the message digest.
        signed_attrs = signer_info['signedAttrs']
        self.assertEqual(2, len(signed_attrs))

        signed_attr0 = signed_attrs[0]
        self.assertEqual(str(krb5_asn1.id_contentType),
                         signed_attr0['attrType'])
        signed_attr0_values = signed_attr0['attrValues']
        self.assertEqual(1, len(signed_attr0_values))
        signed_attr0_value = self.der_decode(
            signed_attr0_values[0],
            asn1Spec=krb5_asn1.ContentType())
        if using_pkinit is PkInit.DIFFIE_HELLMAN:
            self.assertEqual(str(krb5_asn1.id_pkinit_DHKeyData),
                             signed_attr0_value)
        else:
            self.assertEqual(str(krb5_asn1.id_pkinit_rkeyData),
                             signed_attr0_value)

        signed_attr1 = signed_attrs[1]
        self.assertEqual(str(krb5_asn1.id_messageDigest),
                         signed_attr1['attrType'])
        signed_attr1_values = signed_attr1['attrValues']
        self.assertEqual(1, len(signed_attr1_values))
        message_digest = self.der_decode(signed_attr1_values[0],
                                         krb5_asn1.MessageDigest())

        signature_algorithm = signer_info['signatureAlgorithm']
        hash_fn = self.hash_from_algorithm_id(signature_algorithm)

        # Compute the hash of the content to be signed. With the
        # Diffie-Hellman key exchange, this signature is over the type
        # KDCDHKeyInfo; otherwise, it is over the type ReplyKeyPack.
        digest = hashes.Hash(digest_hash_fn(), default_backend())
        digest.update(content)
        digest = digest.finalize()

        # Verify the hash. Note: this is a non–constant time comparison.
        self.assertEqual(digest, message_digest)

        # Re-encode the attributes ready for verifying the signature.
        cms_attrs = self.der_encode(signed_attrs,
                                    asn1Spec=krb5_asn1.CMSAttributes())

        # Verify the signature.
        kdc_public_key = kdc_certificate.public_key()
        kdc_public_key.verify(
            signer_info['signature'],
            cms_attrs,
            asymmetric.padding.PKCS1v15(),
            hash_fn())

        self.assertFalse(signer_info.get('unsignedAttrs'))

    # --- FAST ---
    # Only looked at when the exchange was armored and the reply actually
    # carries PA-FX-FAST padata.
    if armor_key is not None:
        if PADATA_FX_FAST in pa_dict:
            fx_fast_data = pa_dict[PADATA_FX_FAST]
            fast_response = self.check_fx_fast_data(kdc_exchange_dict,
                                                    fx_fast_data,
                                                    armor_key,
                                                    finished=True)

            if 'strengthen-key' in fast_response:
                # Combine the strengthen key with the reply key obtained
                # so far to get the key that decrypts the enc-part.
                strengthen_key = self.EncryptionKey_import(
                    fast_response['strengthen-key'])
                encpart_decryption_key = (
                    self.generate_strengthen_reply_key(
                        strengthen_key,
                        encpart_decryption_key))

            fast_finished = fast_response.get('finished')
            if fast_finished is not None:
                ticket_checksum = fast_finished['ticket-checksum']

            self.check_rep_padata(kdc_exchange_dict,
                                  callback_dict,
                                  fast_response['padata'],
                                  error_code=0)

    # --- Decrypt the ticket, if we were given a key for it ---
    ticket_private = None
    if ticket_decryption_key is not None:
        self.assertElementEqual(ticket_encpart, 'etype',
                                ticket_decryption_key.etype)
        self.assertElementKVNO(ticket_encpart, 'kvno',
                               ticket_decryption_key.kvno)
        ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
                                                       ticket_cipher)
        ticket_private = self.der_decode(
            ticket_decpart,
            asn1Spec=krb5_asn1.EncTicketPart())

    # --- Decrypt the reply's enc-part ---
    encpart_private = None
    self.assertIsNotNone(encpart_decryption_key)
    if encpart_decryption_key is not None:
        self.assertElementEqual(encpart, 'etype',
                                encpart_decryption_key.etype)
        if self.strict_checking:
            self.assertElementKVNO(encpart, 'kvno',
                                   encpart_decryption_key.kvno)
        rep_decpart = encpart_decryption_key.decrypt(
            encpart_decryption_usage,
            encpart_cipher)
        # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
        # application tag 26
        try:
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=rep_encpart_asn1Spec())
        except Exception:
            # Fall back to the TGS-REP spec if the expected one fails.
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=krb5_asn1.EncTGSRepPart())

    # Publish the final reply key for later checks of this exchange.
    kdc_exchange_dict['reply_key'] = encpart_decryption_key

    self.assertIsNotNone(check_kdc_private_fn)
    if check_kdc_private_fn is not None:
        check_kdc_private_fn(kdc_exchange_dict, callback_dict,
                             rep, ticket_private, encpart_private,
                             ticket_checksum)

    return rep
def check_fx_fast_data(self,
                       kdc_exchange_dict,
                       fx_fast_data,
                       armor_key,
                       finished=False,
                       expect_strengthen_key=True):
    """Decrypt a PA-FX-FAST reply and return the decoded KrbFastResponse.

    :param kdc_exchange_dict: exchange state; only 'nonce' is read here.
    :param fx_fast_data: encoded PA-FX-FAST-REPLY padata value.
    :param armor_key: key used to decrypt the armored reply; its etype
        must match the etype of the encrypted reply.
    :param finished: if true, require a 'finished' element in the response.
    :param expect_strengthen_key: if true (and strict checking is on),
        require a 'strengthen-key' element in the response.
    :return: the decoded fast response.
    """
    fast_reply = self.der_decode(fx_fast_data,
                                 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
    encrypted_rep = fast_reply['armored-data']['enc-fast-rep']

    # The reply has to use the same encryption type as the armor key.
    self.assertEqual(encrypted_rep['etype'], armor_key.etype)

    plaintext = armor_key.decrypt(KU_FAST_REP, encrypted_rep['cipher'])
    response = self.der_decode(plaintext,
                               asn1Spec=krb5_asn1.KrbFastResponse())

    if expect_strengthen_key:
        if self.strict_checking:
            self.assertIn('strengthen-key', response)

    if finished:
        self.assertIn('finished', response)

    # The nonce in the response must be the one sent in the body of the
    # request (RFC6113 5.4.3).
    self.assertEqual(kdc_exchange_dict['nonce'], response['nonce'])

    return response
def generic_check_kdc_private(self,
                              kdc_exchange_dict,
                              callback_dict,
                              rep,
                              ticket_private,
                              encpart_private,
                              ticket_checksum):
    """Check the decrypted parts of a KDC reply and record the ticket.

    Validates the decrypted ticket (ticket_private) and the decrypted
    reply enc-part (encpart_private) against the expectations held in
    kdc_exchange_dict, checks the encrypted padata, builds a
    KerberosTicketCreds object for the ticket, checks its PAC and
    signatures where possible, and stores the credentials in
    kdc_exchange_dict['rep_ticket_creds'].

    :param kdc_exchange_dict: dict describing the exchange; read for
        expectations and written with 'rep_ticket_creds'.
    :param callback_dict: not used by this method.
    :param rep: the reply; only its 'ticket' element is read here.
    :param ticket_private: decrypted ticket enc-part, or None to skip the
        ticket checks.
    :param encpart_private: decrypted reply enc-part, or None to skip the
        enc-part checks.
    :param ticket_checksum: checksum over the ticket to verify with the
        armor key, or None to skip that verification.
    """
    # Extract the requested KDC options that influence what we expect.
    # Each position is the index of the named flag; kdc_options is indexed
    # here as a sequence of '0'/'1' characters.
    kdc_options = kdc_exchange_dict['kdc_options']
    canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
    canonicalize = (canon_pos < len(kdc_options)
                    and kdc_options[canon_pos] == '1')
    renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
    renewable = (renewable_pos < len(kdc_options)
                 and kdc_options[renewable_pos] == '1')
    renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1
    renew = (renew_pos < len(kdc_options)
             and kdc_options[renew_pos] == '1')
    expect_renew_till = renewable or renew

    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_cname = kdc_exchange_dict['expected_cname']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']

    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    # These two are optional keys, hence .get().
    expected_flags = kdc_exchange_dict.get('expected_flags')
    unexpected_flags = kdc_exchange_dict.get('unexpected_flags')

    ticket = self.getElementValue(rep, 'ticket')

    if ticket_checksum is not None:
        armor_key = kdc_exchange_dict['armor_key']
        self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)

    # Pick the krbtgt account whose keys are passed to verify_ticket().
    to_rodc = kdc_exchange_dict['to_rodc']
    if to_rodc:
        krbtgt_creds = self.get_rodc_krbtgt_creds()
    else:
        krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    krbtgt_keys = [krbtgt_key]
    if not self.strict_checking:
        # Outside strict checking, also offer the RC4 krbtgt key.
        krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds(
            krbtgt_creds,
            etype=kcrypto.Enctype.RC4)
        krbtgt_keys.append(krbtgt_key_rc4)

    # self.expect_pac combined with a TGS service name forces the PAC
    # expectation to True; otherwise the per-exchange setting (True, False
    # or None) is used.
    if self.expect_pac and self.is_tgs(expected_sname):
        expect_pac = True
    else:
        expect_pac = kdc_exchange_dict['expect_pac']

    # --- Checks on the decrypted ticket ---
    ticket_session_key = None
    if ticket_private is not None:
        self.assertElementFlags(ticket_private, 'flags',
                                expected_flags,
                                unexpected_flags)
        self.assertElementPresent(ticket_private, 'key')
        ticket_key = self.getElementValue(ticket_private, 'key')
        self.assertIsNotNone(ticket_key)
        if ticket_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_key, 'keytype')
            self.assertElementPresent(ticket_key, 'keyvalue')
            ticket_session_key = self.EncryptionKey_import(ticket_key)
        self.assertElementEqualUTF8(ticket_private, 'crealm',
                                    expected_crealm)
        if self.cname_checking:
            self.assertElementEqualPrincipal(ticket_private, 'cname',
                                             expected_cname)
        self.assertElementPresent(ticket_private, 'transited')
        self.assertElementPresent(ticket_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(ticket_private, 'starttime')
        self.assertElementPresent(ticket_private, 'endtime')
        if self.strict_checking:
            if expect_renew_till:
                self.assertElementPresent(ticket_private, 'renew-till')
            else:
                self.assertElementMissing(ticket_private, 'renew-till')
        if self.strict_checking:
            self.assertElementMissing(ticket_private, 'caddr')
        # expect_pac of None means no expectation: skip the check.
        if expect_pac is not None:
            if expect_pac:
                # expect_pac is truthy here, so expect_empty is False.
                self.assertElementPresent(ticket_private,
                                          'authorization-data',
                                          expect_empty=not expect_pac)
            else:
                # It is more correct to not have an authorization-data
                # present than an empty one.
                #
                # https://github.com/krb5/krb5/pull/1225#issuecomment-995104193
                v = self.getElementValue(ticket_private,
                                         'authorization-data')
                if v is not None:
                    self.assertElementPresent(ticket_private,
                                              'authorization-data',
                                              expect_empty=True)

    # --- Checks on the decrypted reply enc-part ---
    encpart_session_key = None
    if encpart_private is not None:
        self.assertElementPresent(encpart_private, 'key')
        encpart_key = self.getElementValue(encpart_private, 'key')
        self.assertIsNotNone(encpart_key)
        if encpart_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(encpart_key, 'keytype')
            self.assertElementPresent(encpart_key, 'keyvalue')
            encpart_session_key = self.EncryptionKey_import(encpart_key)
        self.assertElementPresent(encpart_private, 'last-req')
        # Prefer 'pk_nonce' when it is set and truthy; otherwise use the
        # regular request nonce.
        expected_nonce = kdc_exchange_dict.get('pk_nonce')
        if not expected_nonce:
            expected_nonce = kdc_exchange_dict['nonce']
        self.assertElementEqual(encpart_private, 'nonce',
                                expected_nonce)
        # key-expiration is required only for an AS-REP under strict
        # checking, and must be absent from any other reply type.
        if rep_msg_type == KRB_AS_REP:
            if self.strict_checking:
                self.assertElementPresent(encpart_private,
                                          'key-expiration')
        else:
            self.assertElementMissing(encpart_private,
                                      'key-expiration')
        self.assertElementFlags(encpart_private, 'flags',
                                expected_flags,
                                unexpected_flags)
        self.assertElementPresent(encpart_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(encpart_private, 'starttime')
        self.assertElementPresent(encpart_private, 'endtime')
        if self.strict_checking:
            if expect_renew_till:
                self.assertElementPresent(encpart_private, 'renew-till')
            else:
                self.assertElementMissing(encpart_private, 'renew-till')
        self.assertElementEqualUTF8(encpart_private, 'srealm',
                                    expected_srealm)
        self.assertElementEqualPrincipal(encpart_private, 'sname',
                                         expected_sname)
        if self.strict_checking:
            self.assertElementMissing(encpart_private, 'caddr')

        sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)

        sent_enc_pa_rep = self.sent_enc_pa_rep(kdc_exchange_dict)

        # Encrypted padata is expected if the request asked for
        # canonicalization, set any PAC option bit, or (AS only) sent
        # REQ_ENC_PA_REP.
        enc_padata = self.getElementValue(encpart_private,
                                          'encrypted-pa-data')
        if (canonicalize or '1' in sent_pac_options or (
                rep_msg_type == KRB_AS_REP and sent_enc_pa_rep)):
            if self.strict_checking:
                self.assertIsNotNone(enc_padata)

            if enc_padata is not None:
                enc_pa_dict = self.get_pa_dict(enc_padata)
                if self.strict_checking:
                    # Each padata type must be present exactly when the
                    # request gave the KDC a reason to include it.
                    if canonicalize:
                        self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
                    else:
                        self.assertNotIn(PADATA_SUPPORTED_ETYPES,
                                         enc_pa_dict)

                    if '1' in sent_pac_options:
                        self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
                    else:
                        self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)

                    if rep_msg_type == KRB_AS_REP and sent_enc_pa_rep:
                        self.assertIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict)
                    else:
                        self.assertNotIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict)

                if PADATA_SUPPORTED_ETYPES in enc_pa_dict:
                    expected_supported_etypes = kdc_exchange_dict[
                        'expected_supported_etypes']

                    # The value is unpacked as one little-endian unsigned
                    # 32-bit integer.
                    (supported_etypes,) = struct.unpack(
                        '<L',
                        enc_pa_dict[PADATA_SUPPORTED_ETYPES])

                    # The DES bits are masked out of the comparison.
                    ignore_bits = (security.KERB_ENCTYPE_DES_CBC_CRC |
                                   security.KERB_ENCTYPE_DES_CBC_MD5)

                    self.assertEqual(
                        supported_etypes & ~ignore_bits,
                        expected_supported_etypes & ~ignore_bits,
                        f'PADATA_SUPPORTED_ETYPES: got: {supported_etypes} (0x{supported_etypes:X}), '
                        f'expected: {expected_supported_etypes} (0x{expected_supported_etypes:X})')

                if PADATA_PAC_OPTIONS in enc_pa_dict:
                    # The KDC must echo the PAC options that were sent.
                    pac_options = self.der_decode(
                        enc_pa_dict[PADATA_PAC_OPTIONS],
                        asn1Spec=krb5_asn1.PA_PAC_OPTIONS())

                    self.assertElementEqual(pac_options, 'options',
                                            sent_pac_options)

                if PADATA_REQ_ENC_PA_REP in enc_pa_dict:
                    # The padata holds a checksum over the re-encoded
                    # request, verified with the reply key.
                    enc_pa_rep = enc_pa_dict[PADATA_REQ_ENC_PA_REP]

                    enc_pa_rep = self.der_decode(
                        enc_pa_rep,
                        asn1Spec=krb5_asn1.Checksum())

                    reply_key = kdc_exchange_dict['reply_key']
                    req_obj = kdc_exchange_dict['req_obj']
                    req_asn1Spec = kdc_exchange_dict['req_asn1Spec']

                    req_obj = self.der_encode(req_obj,
                                              asn1Spec=req_asn1Spec())

                    checksum = enc_pa_rep['checksum']
                    ctype = enc_pa_rep['cksumtype']

                    reply_key.verify_checksum(KU_AS_REQ,
                                              req_obj,
                                              ctype,
                                              checksum)
        else:
            # Nothing was requested: any encrypted padata must be empty.
            if enc_padata is not None:
                self.assertEqual(enc_padata, [])

    # The session key in the ticket and the one in the reply must agree.
    if ticket_session_key is not None and encpart_session_key is not None:
        self.assertEqual(ticket_session_key.etype,
                         encpart_session_key.etype)
        self.assertEqual(ticket_session_key.key.contents,
                         encpart_session_key.key.contents)
    # Prefer the key from the reply enc-part; fall back to the ticket's.
    if encpart_session_key is not None:
        session_key = encpart_session_key
    else:
        session_key = ticket_session_key
    ticket_creds = KerberosTicketCreds(
        ticket,
        session_key,
        crealm=expected_crealm,
        cname=expected_cname,
        srealm=expected_srealm,
        sname=expected_sname,
        decryption_key=ticket_decryption_key,
        ticket_private=ticket_private,
        encpart_private=encpart_private)

    # The PAC can only be examined if the ticket was decrypted.
    if ticket_private is not None:
        pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac)
        # Identity tests: an expect_pac of None asserts nothing here.
        if expect_pac is True:
            self.assertIsNotNone(pac_data)
        elif expect_pac is False:
            self.assertIsNone(pac_data)

        if pac_data is not None:
            self.check_pac_buffers(pac_data, kdc_exchange_dict)

    expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
    expect_full_checksum = kdc_exchange_dict['expect_full_checksum']
    if expect_ticket_checksum or expect_full_checksum:
        # Expecting a checksum requires a ticket decryption key.
        self.assertIsNotNone(ticket_decryption_key)

    if ticket_decryption_key is not None:
        # A "service ticket" here is a TGS-REP ticket whose service is
        # not the TGS principal itself.
        service_ticket = (rep_msg_type == KRB_TGS_REP
                          and not self.is_tgs_principal(expected_sname))
        self.verify_ticket(ticket_creds, krbtgt_keys,
                           service_ticket=service_ticket,
                           expect_pac=expect_pac,
                           expect_ticket_checksum=expect_ticket_checksum
                           or self.tkt_sig_support,
                           expect_full_checksum=expect_full_checksum
                           or self.full_sig_support)

    # Make the resulting ticket available to the caller of the exchange.
    kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
4097 # Check the SIDs in a LOGON_INFO PAC buffer.
def check_logon_info_sids(self, logon_info_buffer, kdc_exchange_dict):
    """Verify the SIDs carried by a LOGON_INFO PAC buffer.

    The domain SID and the user SID (domain SID plus RID) are compared
    with kdc_exchange_dict['expected_domain_sid'] and
    kdc_exchange_dict['expected_sid'] when those are not None.

    If either 'expected_groups' or 'unexpected_groups' is given, the
    primary GID, the extra SIDs, the base RIDs and the resource group RIDs
    are gathered into a set of (sid, SidType, attributes) tuples.  Every
    extra, base and resource SID is checked against 'unexpected_groups'
    and must not occur twice.  The resulting set is finally compared with
    'expected_groups'; if that set contains Ellipsis, only a subset match
    is required.
    """
    validation = logon_info_buffer.info.info
    info3 = validation.info3
    base = info3.base
    resource_groups = validation.resource_groups

    expected_groups = kdc_exchange_dict['expected_groups']
    unexpected_groups = kdc_exchange_dict['unexpected_groups']
    expected_domain_sid = kdc_exchange_dict['expected_domain_sid']
    expected_sid = kdc_exchange_dict['expected_sid']

    domain_sid = base.domain_sid
    if expected_domain_sid is not None:
        self.assertEqual(expected_domain_sid, str(domain_sid))

    if expected_sid is not None:
        self.assertEqual(expected_sid, f'{domain_sid}-{base.rid}')

    if expected_groups is None and unexpected_groups is None:
        # The caller has no expectations about group membership.
        return

    # The PAC is represented as a set of (sid, type, attributes) tuples,
    # seeded with the primary GID.
    pac_sids = {
        (f'{domain_sid}-{base.primary_gid}',
         self.SidType.PRIMARY_GID,
         None),
    }

    def collect(sid, sid_type, attributes):
        # Reject SIDs the caller has ruled out, reject duplicates, and
        # then remember the SID.
        if unexpected_groups is not None:
            self.assertNotIn(sid, unexpected_groups)

        entry = (sid, sid_type, attributes)
        self.assertNotIn(entry, pac_sids, 'got duplicated SID')
        pac_sids.add(entry)

    # Extra SIDs: their presence must agree with the EXTRA_SIDS flag.
    extra_sids_flag = base.user_flags & netlogon.NETLOGON_EXTRA_SIDS
    if info3.sids is None:
        self.assertFalse(extra_sids_flag,
                         'no extra SIDs present, but EXTRA_SIDS flag set')
    else:
        self.assertTrue(extra_sids_flag,
                        'extra SIDs present, but EXTRA_SIDS flag not set')
        self.assertTrue(info3.sids, 'got empty SIDs')

        for sid_attr in info3.sids:
            collect(str(sid_attr.sid),
                    self.SidType.EXTRA_SID,
                    sid_attr.attributes)

    # Base RIDs, relative to the account's domain SID.
    base_rids = base.groups.rids
    if base_rids is not None:
        self.assertTrue(base_rids, 'got empty RIDs')

        for group in base_rids:
            collect(f'{domain_sid}-{group.rid}',
                    self.SidType.BASE_SID,
                    group.attributes)

    # Resource groups: unless the caller states an expectation, the
    # RESOURCE_GROUPS flag must agree with the presence of resource RIDs.
    resource_rids_present = resource_groups.groups.rids is not None
    expect_resource_groups_flag = kdc_exchange_dict[
        'expect_resource_groups_flag']
    if expect_resource_groups_flag is None:
        expect_resource_groups_flag = resource_rids_present
        expect_set_reason = 'resource groups present, but '
        expect_reset_reason = 'no resource groups present, but '
    else:
        expect_set_reason = ''
        expect_reset_reason = ''

    resource_groups_flag = (
        base.user_flags & netlogon.NETLOGON_RESOURCE_GROUPS)
    if expect_resource_groups_flag:
        self.assertTrue(
            resource_groups_flag,
            f'{expect_set_reason}RESOURCE_GROUPS flag unexpectedly reset')
    else:
        self.assertFalse(
            resource_groups_flag,
            f'{expect_reset_reason}RESOURCE_GROUPS flag unexpectedly set')

    if resource_groups.groups.rids is not None:
        self.assertTrue(resource_groups.groups.rids, 'got empty RIDs')

        resource_domain_sid = resource_groups.domain_sid
        for resource_group in resource_groups.groups.rids:
            collect(f'{resource_domain_sid}-{resource_group.rid}',
                    self.SidType.RESOURCE_SID,
                    resource_group.attributes)

    # Finally compare what was gathered with what the caller expects.
    if expected_groups is None:
        return

    if ... in expected_groups:
        # Ellipsis marks a "these groups at least" expectation: add it to
        # our side so that a plain subset comparison works.
        pac_sids.add(...)
        self.assertLessEqual(expected_groups, pac_sids,
                             'expected groups')
    else:
        # Otherwise the two sets must be identical.
        self.assertEqual(expected_groups, pac_sids,
                         'expected != got')
def check_device_info(self, device_info, kdc_exchange_dict):
    """Verify the contents of a DEVICE_INFO PAC buffer.

    The RID in device_info must equal the RID found in the LOGON_INFO
    buffer of the armor TGT's PAC.  The device's domain SID is compared
    with kdc_exchange_dict['expected_device_domain_sid'] if that is
    given.  The primary GID, group RIDs, extra SIDs and domain groups are
    then gathered into a set and compared with
    kdc_exchange_dict['expected_device_groups'] if that is given.  Each
    domain group contributes one frozenset of its members to that set.
    """
    # Locate the logon info of the armor ticket's PAC.
    armor_tgt = kdc_exchange_dict['armor_tgt']
    armor_auth_data = armor_tgt.ticket_private.get(
        'authorization-data')
    self.assertIsNotNone(armor_auth_data,
                         'missing authdata for armor TGT')
    armor_pac = ndr_unpack(krb5pac.PAC_DATA,
                           self.get_pac(armor_auth_data))
    logon_buffer = next(
        (buffer for buffer in armor_pac.buffers
         if buffer.type == krb5pac.PAC_TYPE_LOGON_INFO),
        None)
    if logon_buffer is None:
        self.fail('missing logon info for armor PAC')
    armor_info = logon_buffer.info.info.info3
    self.assertEqual(armor_info.base.rid, device_info.rid)

    device_domain_sid = kdc_exchange_dict['expected_device_domain_sid']
    expected_device_groups = kdc_exchange_dict['expected_device_groups']
    if kdc_exchange_dict['expect_device_info']:
        # A test that expects device info must say what it should hold.
        self.assertIsNotNone(device_domain_sid)
        self.assertIsNotNone(expected_device_groups)

    if device_domain_sid is None:
        # No expectation: build the group SIDs from whatever we got.
        device_domain_sid = str(device_info.domain_sid)
    else:
        self.assertEqual(device_domain_sid, str(device_info.domain_sid))

    def add_unique(collection, entry):
        # Remember an entry, insisting that it was not seen before.
        self.assertNotIn(entry, collection, 'got duplicated SID')
        collection.add(entry)

    # The device's groups are represented as a set, seeded with the
    # primary GID.
    got_sids = {
        (f'{device_domain_sid}-{device_info.primary_gid}',
         self.SidType.PRIMARY_GID,
         None),
    }

    # Group RIDs relative to the device's domain.
    if device_info.groups.rids is not None:
        self.assertTrue(device_info.groups.rids, 'got empty RIDs')

        for group in device_info.groups.rids:
            add_unique(got_sids,
                       (f'{device_domain_sid}-{group.rid}',
                        self.SidType.BASE_SID,
                        group.attributes))

    # Extra SIDs.  A domain-relative SID (S-1-5-21-x-y-z-RID, i.e. five
    # sub-authorities) does not belong here.
    if device_info.sids is not None:
        self.assertTrue(device_info.sids, 'got empty SIDs')

        for sid_attr in device_info.sids:
            got_sid = str(sid_attr.sid)

            in_a_domain = sid_attr.sid.num_auths == 5 and (
                got_sid.startswith('S-1-5-21-'))
            self.assertFalse(in_a_domain,
                             f'got unexpected SID for domain: {got_sid} '
                             f'(should be in device_info.domain_groups)')

            add_unique(got_sids,
                       (got_sid,
                        self.SidType.EXTRA_SID,
                        sid_attr.attributes))

    # Domain groups.  Each one must be headed by a domain SID
    # (S-1-5-21-x-y-z, i.e. four sub-authorities).
    if device_info.domain_groups is not None:
        self.assertTrue(device_info.domain_groups,
                        'got empty domain groups')

        for domain_group in device_info.domain_groups:
            self.assertTrue(domain_group, 'got empty domain group')

            members = set()
            group_domain_sid = domain_group.domain_sid

            in_a_domain = group_domain_sid.num_auths == 4 and (
                str(group_domain_sid).startswith('S-1-5-21-'))
            self.assertTrue(
                in_a_domain,
                f'got unexpected domain SID for non-domain: {group_domain_sid} '
                f'(should be in device_info.sids)')

            for resource_group in domain_group.groups.rids:
                add_unique(members,
                           (f'{group_domain_sid}-{resource_group.rid}',
                            self.SidType.RESOURCE_SID,
                            resource_group.attributes))

            # Freeze the members so that the group can itself be an
            # element of got_sids.
            members = frozenset(members)
            self.assertNotIn(members, got_sids)
            got_sids.add(members)

    # Compare the aggregated device SIDs with the expected ones.
    if expected_device_groups is not None:
        self.assertEqual(expected_device_groups, got_sids,
                         'expected != got')
def check_pac_buffers(self, pac_data, kdc_exchange_dict):
    """Check which buffers a PAC contains, and the contents of each.

    pac_data is the encoded PAC; it is unpacked as krb5pac.PAC_DATA.
    The list of buffer types that should be present is derived from the
    expectations stored in kdc_exchange_dict and from the capabilities
    declared on the test case (claims, compound identity and checksum
    support), and is compared with the types actually present.  Each
    recognised buffer is then checked against the matching expectations
    in kdc_exchange_dict.
    """
    pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)

    rep_msg_type = kdc_exchange_dict['rep_msg_type']
    armor_tgt = kdc_exchange_dict['armor_tgt']

    # Device buffers can only appear in a TGS reply to an armored request.
    compound_id = rep_msg_type == KRB_TGS_REP and armor_tgt is not None

    expected_sname = kdc_exchange_dict['expected_sname']
    expect_client_claims = kdc_exchange_dict['expect_client_claims']
    expect_device_info = kdc_exchange_dict['expect_device_info']
    expect_device_claims = kdc_exchange_dict['expect_device_claims']

    # These buffers are always expected.
    expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
                      krb5pac.PAC_TYPE_SRV_CHECKSUM,
                      krb5pac.PAC_TYPE_KDC_CHECKSUM,
                      krb5pac.PAC_TYPE_LOGON_NAME,
                      krb5pac.PAC_TYPE_UPN_DNS_INFO]

    # kdc_options is a string of '0'/'1' characters; find the position
    # of the cname-in-addl-tkt bit to detect constrained delegation.
    kdc_options = kdc_exchange_dict['kdc_options']
    pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
    constrained_delegation = (pos < len(kdc_options)
                              and kdc_options[pos] == '1')
    if constrained_delegation:
        expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)

    # require_strict and unchecked are passed through to
    # assertSequenceElementsEqual() below.
    require_strict = set()
    unchecked = set()
    if not self.tkt_sig_support:
        require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
    if not self.full_sig_support:
        require_strict.add(krb5pac.PAC_TYPE_FULL_CHECKSUM)

    expected_client_claims = kdc_exchange_dict['expected_client_claims']
    unexpected_client_claims = kdc_exchange_dict[
        'unexpected_client_claims']

    if self.kdc_claims_support and expect_client_claims:
        expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
    else:
        self.assertFalse(
            expected_client_claims,
            'expected client claims, but client claims not expected in '
            'PAC')
        self.assertFalse(
            unexpected_client_claims,
            'unexpected client claims, but client claims not expected in '
            'PAC')

        if expect_client_claims is None:
            unchecked.add(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)

    expected_device_claims = kdc_exchange_dict['expected_device_claims']
    unexpected_device_claims = kdc_exchange_dict['unexpected_device_claims']

    expected_device_groups = kdc_exchange_dict['expected_device_groups']

    if (self.kdc_claims_support and self.kdc_compound_id_support
            and expect_device_claims and compound_id):
        expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
    else:
        self.assertFalse(
            expect_device_claims,
            'expected device claims buffer, but device claims not '
            'expected in PAC')
        self.assertFalse(
            expected_device_claims,
            'expected device claims, but device claims not expected in '
            'PAC')
        self.assertFalse(
            unexpected_device_claims,
            'unexpected device claims, but device claims not expected in '
            'PAC')

        if expect_device_claims is None and compound_id:
            unchecked.add(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)

    if self.kdc_compound_id_support and compound_id and expect_device_info:
        expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
    else:
        self.assertFalse(expect_device_info,
                         'expected device info with no armor TGT or '
                         'for non-TGS request')
        self.assertFalse(expected_device_groups,
                         'expected device groups, but device info not '
                         'expected in PAC')

        if expect_device_info is None and compound_id:
            unchecked.add(krb5pac.PAC_TYPE_DEVICE_INFO)

    # Ticket and full checksums are only expected on tickets returned by
    # the TGS whose service is not itself a TGS principal.
    if rep_msg_type == KRB_TGS_REP:
        if not self.is_tgs_principal(expected_sname):
            expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
            expected_types.append(krb5pac.PAC_TYPE_FULL_CHECKSUM)

    expect_extra_pac_buffers = self.is_tgs(expected_sname)

    expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs']

    if expect_pac_attrs:
        expect_pac_attrs_pac_request = kdc_exchange_dict[
            'expect_pac_attrs_pac_request']
    else:
        expect_pac_attrs_pac_request = kdc_exchange_dict[
            'pac_request']

    if expect_pac_attrs is None:
        if self.expect_extra_pac_buffers:
            expect_pac_attrs = expect_extra_pac_buffers
        else:
            require_strict.add(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
    if expect_pac_attrs:
        expected_types.append(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)

    expect_requester_sid = kdc_exchange_dict['expect_requester_sid']
    expected_requester_sid = kdc_exchange_dict['expected_requester_sid']

    if expect_requester_sid is None:
        if self.expect_extra_pac_buffers:
            expect_requester_sid = expect_extra_pac_buffers
        else:
            require_strict.add(krb5pac.PAC_TYPE_REQUESTER_SID)
    if expected_requester_sid is not None:
        # Naming a particular requester SID implies expecting the buffer.
        expect_requester_sid = True
    if expect_requester_sid:
        expected_types.append(krb5pac.PAC_TYPE_REQUESTER_SID)

    sent_pk_as_req = self.sent_pk_as_req(kdc_exchange_dict) or (
        self.sent_pk_as_req_win2k(kdc_exchange_dict))
    if sent_pk_as_req:
        expected_types.append(krb5pac.PAC_TYPE_CREDENTIAL_INFO)

    buffer_types = [pac_buffer.type
                    for pac_buffer in pac.buffers]
    self.assertSequenceElementsEqual(
        expected_types, buffer_types,
        require_ordered=False,
        require_strict=require_strict,
        unchecked=unchecked)

    expected_account_name = kdc_exchange_dict['expected_account_name']
    expected_sid = kdc_exchange_dict['expected_sid']

    # Expecting an account name or SID implies expecting the extended
    # UPN_DNS_INFO fields that carry them, unless the caller said
    # otherwise.
    expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
    if expect_upn_dns_info_ex is None and (
            expected_account_name is not None
            or expected_sid is not None):
        expect_upn_dns_info_ex = True

    for pac_buffer in pac.buffers:
        if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
            expected_proxy_target = kdc_exchange_dict[
                'expected_proxy_target']
            expected_transited_services = kdc_exchange_dict[
                'expected_transited_services']

            delegation_info = pac_buffer.info.info

            self.assertEqual(expected_proxy_target,
                             str(delegation_info.proxy_target))

            transited_services = list(map(
                str, delegation_info.transited_services))
            self.assertEqual(expected_transited_services,
                             transited_services)

        elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
            expected_cname = kdc_exchange_dict['expected_cname']
            account_name = '/'.join(expected_cname['name-string'])

            self.assertEqual(account_name, pac_buffer.info.account_name)

        elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
            info3 = pac_buffer.info.info.info3
            logon_info = info3.base

            if expected_account_name is not None:
                self.assertEqual(expected_account_name,
                                 str(logon_info.account_name))

            self.check_logon_info_sids(pac_buffer, kdc_exchange_dict)

        elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
            upn_dns_info = pac_buffer.info
            upn_dns_info_ex = upn_dns_info.ex

            expected_realm = kdc_exchange_dict['expected_crealm']
            self.assertEqual(expected_realm,
                             upn_dns_info.dns_domain_name)

            expected_upn_name = kdc_exchange_dict['expected_upn_name']
            if expected_upn_name is not None:
                self.assertEqual(expected_upn_name,
                                 upn_dns_info.upn_name)

            if expect_upn_dns_info_ex:
                self.assertIsNotNone(upn_dns_info_ex)

            if upn_dns_info_ex is not None:
                if expected_account_name is not None:
                    self.assertEqual(expected_account_name,
                                     upn_dns_info_ex.samaccountname)

                if expected_sid is not None:
                    self.assertEqual(expected_sid,
                                     str(upn_dns_info_ex.objectsid))

        elif (pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO
              and expect_pac_attrs):
            attr_info = pac_buffer.info

            self.assertEqual(2, attr_info.flags_length)

            flags = attr_info.flags

            requested_pac = bool(flags & 1)
            given_pac = bool(flags & 2)

            # Bit 0 must be set exactly when the PAC request is expected
            # to be True, and bit 1 exactly when it is expected to be
            # None.
            self.assertEqual(expect_pac_attrs_pac_request is True,
                             requested_pac)
            self.assertEqual(expect_pac_attrs_pac_request is None,
                             given_pac)

        elif (pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID
              and expect_requester_sid):
            requester_sid = pac_buffer.info.sid

            # Fall back to the account's SID if no particular requester
            # SID was named.
            if expected_requester_sid is None:
                expected_requester_sid = expected_sid
            # Gate on the resolved value, so that an explicitly supplied
            # requester SID is still checked when expected_sid is None.
            if expected_requester_sid is not None:
                self.assertEqual(expected_requester_sid,
                                 str(requester_sid))

        elif pac_buffer.type in {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO,
                                 krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO}:
            remaining = pac_buffer.info.remaining

            if pac_buffer.type == krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO:
                claims_type = 'client claims'
                expected_claims = expected_client_claims
                unexpected_claims = unexpected_client_claims
            else:
                claims_type = 'device claims'
                expected_claims = expected_device_claims
                unexpected_claims = unexpected_device_claims

            if not remaining:
                # Windows may produce an empty claims buffer.
                self.assertFalse(expected_claims,
                                 f'expected {claims_type}, but the PAC '
                                 f'buffer was empty')
                continue

            # These must be f-strings so that the kind of claims is
            # actually named in the failure messages below.
            if expected_claims:
                empty_msg = f', and {claims_type} were expected'
            else:
                empty_msg = f' for {claims_type} (should be missing)'

            client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR,
                                       remaining)
            client_claims = client_claims.claims.metadata
            self.assertIsNotNone(client_claims,
                                 f'got empty CLAIMS_SET_METADATA_NDR '
                                 f'inner structure {empty_msg}')

            self.assertIsNotNone(client_claims.claims_set,
                                 f'got empty CLAIMS_SET_METADATA '
                                 f'structure {empty_msg}')

            uncompressed_size = client_claims.uncompressed_claims_set_size
            compression_format = client_claims.compression_format

            # Claims sets below the minimum size must not be compressed;
            # larger ones must be.
            if uncompressed_size < claims.CLAIM_MINIMUM_BYTES_TO_COMPRESS:
                self.assertEqual(claims.CLAIMS_COMPRESSION_FORMAT_NONE,
                                 compression_format,
                                 f'{claims_type} unexpectedly '
                                 f'compressed ({uncompressed_size} '
                                 f'bytes uncompressed)')
            else:
                self.assertEqual(
                    claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF,
                    compression_format,
                    f'{claims_type} unexpectedly not compressed '
                    f'({uncompressed_size} bytes uncompressed)')

            claims_set = client_claims.claims_set.claims.claims
            self.assertIsNotNone(claims_set,
                                 f'got empty CLAIMS_SET_NDR inner '
                                 f'structure {empty_msg}')

            claims_arrays = claims_set.claims_arrays
            self.assertIsNotNone(claims_arrays,
                                 f'got empty CLAIMS_SET structure '
                                 f'{empty_msg}')
            self.assertGreater(len(claims_arrays), 0,
                               f'got empty claims array {empty_msg}')
            self.assertEqual(len(claims_arrays),
                             claims_set.claims_array_count,
                             f'{claims_type} arrays size mismatch')

            # Maps claim ID -> description, for the expected claims that
            # were found in the PAC.
            got_claims = {}

            for claims_array in claims_arrays:
                claim_entries = claims_array.claim_entries
                self.assertIsNotNone(claim_entries,
                                     f'got empty CLAIMS_ARRAY structure '
                                     f'{empty_msg}')
                self.assertGreater(len(claim_entries), 0,
                                   f'got empty claim entries array '
                                   f'{empty_msg}')
                self.assertEqual(len(claim_entries),
                                 claims_array.claims_count,
                                 f'{claims_type} entries array size '
                                 f'mismatch')

                for entry in claim_entries:
                    if unexpected_claims is not None:
                        self.assertNotIn(entry.id, unexpected_claims,
                                         f'got unexpected {claims_type} '
                                         f'in PAC')
                    if expected_claims is None:
                        continue

                    # Claims the caller did not mention are ignored.
                    expected_claim = expected_claims.get(entry.id)
                    if expected_claim is None:
                        continue

                    self.assertNotIn(entry.id, got_claims,
                                     f'got duplicate {claims_type}')

                    self.assertIsNotNone(entry.values.values,
                                         f'got {claims_type} with no '
                                         f'values')
                    self.assertGreater(len(entry.values.values), 0,
                                       f'got empty {claims_type} values '
                                       f'array')
                    self.assertEqual(len(entry.values.values),
                                     entry.values.value_count,
                                     f'{claims_type} values array size '
                                     f'mismatch')

                    expected_claim_values = expected_claim.get('values')
                    self.assertIsNotNone(expected_claim_values,
                                         f'got expected {claims_type} '
                                         f'with no values')

                    # Convert to the container type used by the
                    # expectation so that the two compare equal.
                    values = type(expected_claim_values)(
                        entry.values.values)

                    got_claims[entry.id] = {
                        'source_type': claims_array.claims_source_type,
                        'type': entry.type,
                        'values': values,
                    }

            self.assertEqual(expected_claims, got_claims or None,
                             f'{claims_type} did not match expectations')

        elif pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO:
            device_info = pac_buffer.info.info

            self.check_device_info(device_info, kdc_exchange_dict)

        elif pac_buffer.type == krb5pac.PAC_TYPE_CREDENTIAL_INFO:
            credential_info = pac_buffer.info

            expected_etype = self.expected_etype(kdc_exchange_dict)

            self.assertEqual(0, credential_info.version)
            self.assertEqual(expected_etype,
                             credential_info.encryption_type)

            # The credential data is encrypted with the reply key.
            encrypted_data = credential_info.encrypted_data
            reply_key = kdc_exchange_dict['reply_key']

            data = reply_key.decrypt(KU_NON_KERB_SALT, encrypted_data)

            credential_data_ndr = ndr_unpack(
                krb5pac.PAC_CREDENTIAL_DATA_NDR, data)

            credential_data = credential_data_ndr.ctr.data

            self.assertEqual(1, credential_data.credential_count)
            self.assertEqual(credential_data.credential_count,
                             len(credential_data.credentials))

            package = credential_data.credentials[0]
            self.assertEqual('NTLM', str(package.package_name))

            ntlm_blob = bytes(package.credential)

            ntlm_package = ndr_unpack(krb5pac.PAC_CREDENTIAL_NTLM_SECPKG,
                                      ntlm_blob)

            self.assertEqual(0, ntlm_package.version)
            self.assertEqual(krb5pac.PAC_CREDENTIAL_NTLM_HAS_NT_HASH,
                             ntlm_package.flags)

            # The NT hash must be the account's; the LM hash must be
            # all zeroes.
            creds = kdc_exchange_dict['creds']
            nt_password = bytes(ntlm_package.nt_password.hash)
            self.assertEqual(creds.get_nt_hash(), nt_password)

            lm_password = bytes(ntlm_package.lm_password.hash)
            self.assertEqual(bytes(16), lm_password)
def generic_check_kdc_error(self,
                            kdc_exchange_dict,
                            callback_dict,
                            rep,
                            inner=False):
    """Check a KRB-ERROR reply against the expectations of an exchange.

    The fixed fields of the error are checked first.  If e-data is
    expected, it is then decoded: either as KERB_ERROR_DATA carrying a
    status code, which is compared with
    kdc_exchange_dict['expected_status'], or, failing that, as
    METHOD_DATA, whose padata is handed to check_rep_padata() and whose
    result is stored in kdc_exchange_dict['preauth_etype_info2'].

    inner is passed as True when checking an error nested inside a
    PADATA_FX_ERROR.  rep is returned unchanged.
    """
    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_anon = kdc_exchange_dict['expected_anon']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    expected_error_mode = kdc_exchange_dict['expected_error_mode']

    sent_fast = self.sent_fast(kdc_exchange_dict)

    fast_armor_type = kdc_exchange_dict['fast_armor_type']

    # Fixed fields of the KRB-ERROR.
    self.assertElementEqual(rep, 'pvno', 5)
    self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
    error_code = self.getElementValue(rep, 'error-code')
    self.assertIn(error_code, expected_error_mode)
    if self.strict_checking:
        self.assertElementMissing(rep, 'ctime')
        self.assertElementMissing(rep, 'cusec')
    self.assertElementPresent(rep, 'stime')
    self.assertElementPresent(rep, 'susec')

    # The client name is only expected for anonymous requests, and then
    # only in the outer error.
    if expected_anon and not inner:
        anonymous_cname = self.PrincipalName_create(
            name_type=NT_WELLKNOWN,
            names=['WELLKNOWN', 'ANONYMOUS'])
        self.assertElementEqualPrincipal(rep, 'cname', anonymous_cname)
    elif self.strict_checking:
        self.assertElementMissing(rep, 'cname')

    if self.strict_checking:
        self.assertElementMissing(rep, 'crealm')
        self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
        self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
        self.assertElementMissing(rep, 'e-text')

    expect_status = kdc_exchange_dict['expect_status']
    expected_status = kdc_exchange_dict['expected_status']
    expect_edata = kdc_exchange_dict['expect_edata']
    if expect_edata is None:
        # Work out a default when the caller gave no expectation.
        armor_permits_edata = (
            not sent_fast
            or fast_armor_type is None
            or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
        expect_edata = (
            error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
            and armor_permits_edata
            and not inner)
    if inner and expect_edata is self.expect_padata_outer:
        expect_edata = False

    if not expect_edata:
        # No e-data expected, so no status code can be expected either.
        self.assertFalse(expect_status)
        if self.strict_checking or expect_status is False:
            self.assertElementMissing(rep, 'e-data')
        return rep

    edata = self.getElementValue(rep, 'e-data')
    if self.strict_checking or expect_status:
        self.assertIsNotNone(edata)
    if edata is None:
        return rep

    try:
        error_data = self.der_decode(
            edata,
            asn1Spec=krb5_asn1.KERB_ERROR_DATA())
    except PyAsn1Error:
        # The e-data is not KERB_ERROR_DATA, so it carries no status
        # code.
        if expect_status:
            # The test requires that the KDC be declared to support
            # NTSTATUS values in e-data to proceed.
            self.assertTrue(
                self.expect_nt_status,
                'expected status code (which, according to '
                'EXPECT_NT_STATUS=0, the KDC does not support)')

            self.fail('expected to get status code')

        # It must be METHOD-DATA instead.
        rep_padata = self.der_decode(
            edata, asn1Spec=krb5_asn1.METHOD_DATA())
        self.assertGreater(len(rep_padata), 0)

        if sent_fast:
            # With FAST, the only outer padata is the FAST response,
            # which wraps the real padata.
            self.assertEqual(1, len(rep_padata))
            rep_pa_dict = self.get_pa_dict(rep_padata)
            self.assertIn(PADATA_FX_FAST, rep_pa_dict)

            armor_key = kdc_exchange_dict['armor_key']
            self.assertIsNotNone(armor_key)
            fast_response = self.check_fx_fast_data(
                kdc_exchange_dict,
                rep_pa_dict[PADATA_FX_FAST],
                armor_key,
                expect_strengthen_key=False)

            rep_padata = fast_response['padata']

        kdc_exchange_dict['preauth_etype_info2'] = self.check_rep_padata(
            kdc_exchange_dict,
            callback_dict,
            rep_padata,
            error_code)
    else:
        # The e-data decoded as KERB_ERROR_DATA: check the status code.
        self.assertTrue(self.expect_nt_status,
                        'got status code, but EXPECT_NT_STATUS=0')

        if expect_status is not None:
            self.assertTrue(expect_status,
                            'got unexpected status code')

        self.assertEqual(KERB_ERR_TYPE_EXTENDED,
                         error_data['data-type'])

        extended_error = error_data['data-value']

        # Twelve bytes are expected: the status is read from the first
        # four and the flags from the last four, both little-endian.
        self.assertEqual(12, len(extended_error))

        status = int.from_bytes(extended_error[:4], 'little')
        flags = int.from_bytes(extended_error[8:], 'little')

        self.assertEqual(expected_status, status)

        expected_flags = 3 if rep_msg_type == KRB_TGS_REP else 1
        self.assertEqual(expected_flags, flags)

    return rep
def check_reply_padata(self,
                       kdc_exchange_dict,
                       callback_dict,
                       encpart,
                       rep_padata):
    """Check the padata of a successful KDC reply.

    The padata types present in rep_padata are compared with those
    implied by the request.  If a PADATA_ETYPE_INFO2 element is present,
    it must hold a single entry whose etype is the one chosen in encpart
    and whose salt is present; the salt is compared with
    kdc_exchange_dict['expected_salt'] if that is given.

    Returns None.
    """
    sent_fast = self.sent_fast(kdc_exchange_dict)
    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    # NOTE: chosen_etype is only bound on the password-based AS-REP path
    # below; the ETYPE-INFO2 check at the end relies on that.
    expected_patypes = ()
    if sent_fast:
        expected_patypes = (PADATA_FX_FAST,)
    elif rep_msg_type == KRB_AS_REP:
        if self.sent_pk_as_req(kdc_exchange_dict):
            expected_patypes = (PADATA_PK_AS_REP,)
        elif self.sent_pk_as_req_win2k(kdc_exchange_dict):
            expected_patypes = (PADATA_PK_AS_REP_19,)
        else:
            chosen_etype = self.getElementValue(encpart, 'etype')
            self.assertIsNotNone(chosen_etype)

            # ETYPE-INFO2 is expected when an AES key was chosen.
            if chosen_etype in {kcrypto.Enctype.AES256,
                                kcrypto.Enctype.AES128}:
                expected_patypes = (PADATA_ETYPE_INFO2,)

            preauth_key = kdc_exchange_dict['preauth_key']
            self.assertIsInstance(preauth_key, Krb5EncryptionKey)
            # With an RC4 pre-authentication key, missing padata is
            # treated as empty.
            if (rep_padata is None
                    and preauth_key.etype == kcrypto.Enctype.RC4):
                rep_padata = ()
    elif rep_msg_type == KRB_TGS_REP:
        # Nothing is expected here, so missing padata is treated as
        # empty.
        if not expected_patypes and rep_padata is None:
            rep_padata = ()

    if rep_padata is None and not self.strict_checking:
        rep_padata = ()

    self.assertIsNotNone(rep_padata)
    got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
    self.assertSequenceElementsEqual(expected_patypes, got_patypes,
                                     # Windows does not add this.
                                     unchecked={PADATA_PKINIT_KX})

    if not expected_patypes:
        return None

    pa_dict = self.get_pa_dict(rep_padata)

    encoded_etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
    if encoded_etype_info2 is None:
        return None

    etype_info2 = self.der_decode(encoded_etype_info2,
                                  asn1Spec=krb5_asn1.ETYPE_INFO2())
    self.assertEqual(len(etype_info2), 1)
    entry = etype_info2[0]

    self.assertEqual(self.getElementValue(entry, 'etype'), chosen_etype)

    salt = self.getElementValue(entry, 'salt')
    self.assertIsNotNone(salt)
    expected_salt = kdc_exchange_dict['expected_salt']
    if expected_salt is not None:
        self.assertEqual(salt, expected_salt)

    s2kparams = self.getElementValue(entry, 's2kparams')
    if self.strict_checking:
        self.assertIsNone(s2kparams)
4911 @staticmethod
4912 def greatest_common_etype(etypes, proposed_etypes):
4913 return max(filter(lambda e: e in etypes, proposed_etypes),
4914 default=None)
4916 @staticmethod
4917 def first_common_etype(etypes, proposed_etypes):
4918 return next(filter(lambda e: e in etypes, proposed_etypes), None)
def supported_aes_rc4_etypes(self, kdc_exchange_dict):
    """Return the AES and RC4 etypes supported for this exchange.

    The result is a pair of sets (aes_etypes, rc4_etypes), drawn from the
    default enctypes of kdc_exchange_dict['creds'].  RC4 is only included
    if kdc_exchange_dict['rc4_support'] is true.
    """
    creds = kdc_exchange_dict['creds']
    supported_etypes = self.get_default_enctypes(creds)

    rc4_support = kdc_exchange_dict['rc4_support']

    aes_etypes = {
        etype
        for etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128)
        if etype in supported_etypes
    }

    if rc4_support and kcrypto.Enctype.RC4 in supported_etypes:
        rc4_etypes = {kcrypto.Enctype.RC4}
    else:
        rc4_etypes = set()

    return aes_etypes, rc4_etypes
def greatest_aes_rc4_etypes(self, kdc_exchange_dict):
    """Return the greatest supported AES and RC4 etypes that were proposed.

    The result is a pair (expected_aes, expected_rc4).  Either element is
    None if the request body's 'etype' list names no supported etype of
    that family.
    """
    proposed_etypes = kdc_exchange_dict['req_body']['etype']

    supported_aes, supported_rc4 = self.supported_aes_rc4_etypes(
        kdc_exchange_dict)

    return (self.greatest_common_etype(supported_aes, proposed_etypes),
            self.greatest_common_etype(supported_rc4, proposed_etypes))
def expected_etype(self, kdc_exchange_dict):
    """Return the first proposed etype that is supported.

    The request body's 'etype' list is searched in order for an etype
    among the supported AES and RC4 etypes.  Returns None if there is no
    such etype.
    """
    proposed_etypes = kdc_exchange_dict['req_body']['etype']

    supported_aes, supported_rc4 = self.supported_aes_rc4_etypes(
        kdc_exchange_dict)
    supported = supported_aes | supported_rc4

    return self.first_common_etype(supported, proposed_etypes)
4959 def check_rep_padata(self,
4960 kdc_exchange_dict,
4961 callback_dict,
4962 rep_padata,
4963 error_code):
4964 rep_msg_type = kdc_exchange_dict['rep_msg_type']
4966 sent_fast = self.sent_fast(kdc_exchange_dict)
4967 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
4969 if rep_msg_type == KRB_TGS_REP:
4970 self.assertTrue(sent_fast)
4972 rc4_support = kdc_exchange_dict['rc4_support']
4974 expected_aes, expected_rc4 = self.greatest_aes_rc4_etypes(
4975 kdc_exchange_dict)
4977 expect_etype_info2 = ()
4978 expect_etype_info = False
4979 if expected_aes is not None:
4980 expect_etype_info2 += (expected_aes,)
4981 if expected_rc4 is not None:
4982 if error_code != 0:
4983 expect_etype_info2 += (expected_rc4,)
4984 if expected_aes is None:
4985 expect_etype_info = True
4987 if expect_etype_info:
4988 self.assertGreater(len(expect_etype_info2), 0)
4990 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
4992 check_patypes = kdc_exchange_dict['check_patypes']
4993 if check_patypes:
4994 expected_patypes = ()
4995 if sent_fast and error_code != 0:
4996 expected_patypes += (PADATA_FX_ERROR,)
4997 expected_patypes += (PADATA_FX_COOKIE,)
4999 if rep_msg_type == KRB_TGS_REP:
5000 if ('1' in sent_pac_options
5001 and error_code not in (0, KDC_ERR_GENERIC)):
5002 expected_patypes += (PADATA_PAC_OPTIONS,)
5003 elif error_code != KDC_ERR_GENERIC:
5004 if expect_etype_info:
5005 expected_patypes += (PADATA_ETYPE_INFO,)
5006 if len(expect_etype_info2) != 0:
5007 expected_patypes += (PADATA_ETYPE_INFO2,)
5009 sent_freshness = self.sent_freshness(kdc_exchange_dict)
5011 if error_code not in (KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW,
5012 KDC_ERR_POLICY, KDC_ERR_CLIENT_REVOKED):
5013 if sent_fast:
5014 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
5015 else:
5016 expected_patypes += (PADATA_ENC_TIMESTAMP,)
5018 if not sent_enc_challenge:
5019 expected_patypes += (PADATA_PK_AS_REQ,)
5020 if not sent_freshness:
5021 expected_patypes += (PADATA_PK_AS_REP_19,)
5023 if sent_freshness:
5024 expected_patypes += PADATA_AS_FRESHNESS,
5026 if (self.kdc_fast_support
5027 and not sent_fast
5028 and not sent_enc_challenge):
5029 expected_patypes += (PADATA_FX_FAST,)
5030 expected_patypes += (PADATA_FX_COOKIE,)
5032 require_strict = {PADATA_FX_COOKIE,
5033 PADATA_FX_FAST,
5034 PADATA_PAC_OPTIONS,
5035 PADATA_PK_AS_REP_19,
5036 PADATA_PK_AS_REQ,
5037 PADATA_PKINIT_KX,
5038 PADATA_GSS}
5039 strict_edata_checking = kdc_exchange_dict['strict_edata_checking']
5040 if not strict_edata_checking:
5041 require_strict.add(PADATA_ETYPE_INFO2)
5042 require_strict.add(PADATA_ENCRYPTED_CHALLENGE)
5044 got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
5045 self.assertSequenceElementsEqual(expected_patypes, got_patypes,
5046 require_strict=require_strict)
5048 if not expected_patypes:
5049 return None
5051 pa_dict = self.get_pa_dict(rep_padata)
5053 enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
5054 if enc_timestamp is not None:
5055 self.assertEqual(len(enc_timestamp), 0)
5057 pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
5058 if pk_as_req is not None:
5059 self.assertEqual(len(pk_as_req), 0)
5061 pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
5062 if pk_as_rep19 is not None:
5063 self.assertEqual(len(pk_as_rep19), 0)
5065 freshness_token = pa_dict.get(PADATA_AS_FRESHNESS)
5066 if freshness_token is not None:
5067 self.assertEqual(bytes(2), freshness_token[:2])
5069 freshness = self.der_decode(freshness_token[2:],
5070 asn1Spec=krb5_asn1.EncryptedData())
5072 krbtgt_creds = self.get_krbtgt_creds()
5073 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
5075 self.assertElementEqual(freshness, 'etype', krbtgt_key.etype)
5076 self.assertElementKVNO(freshness, 'kvno', krbtgt_key.kvno)
5078 # Decrypt the freshness token.
5079 ts_enc = krbtgt_key.decrypt(KU_AS_FRESHNESS,
5080 freshness['cipher'])
5082 # Ensure that we can decode it as PA-ENC-TS-ENC.
5083 ts_enc = self.der_decode(ts_enc,
5084 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
5085 freshness_time = self.get_EpochFromKerberosTime(
5086 ts_enc['patimestamp'])
5087 freshness_time += ts_enc['pausec'] / 1e6
5089 # Ensure that it is reasonably close to the current time (within
5090 # five minutes, to allow for clock skew).
5091 current_time = datetime.datetime.now(
5092 datetime.timezone.utc).timestamp()
5093 self.assertLess(current_time - 5 * 60, freshness_time)
5094 self.assertLess(freshness_time, current_time + 5 * 60)
5096 kdc_exchange_dict['freshness_token'] = freshness_token
5098 fx_fast = pa_dict.get(PADATA_FX_FAST)
5099 if fx_fast is not None:
5100 self.assertEqual(len(fx_fast), 0)
5102 fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
5103 if fast_cookie is not None:
5104 kdc_exchange_dict['fast_cookie'] = fast_cookie
5106 fast_error = pa_dict.get(PADATA_FX_ERROR)
5107 if fast_error is not None:
5108 fast_error = self.der_decode(fast_error,
5109 asn1Spec=krb5_asn1.KRB_ERROR())
5110 self.generic_check_kdc_error(kdc_exchange_dict,
5111 callback_dict,
5112 fast_error,
5113 inner=True)
5115 pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
5116 if pac_options is not None:
5117 pac_options = self.der_decode(
5118 pac_options,
5119 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
5120 self.assertElementEqual(pac_options, 'options', sent_pac_options)
5122 enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
5123 if enc_challenge is not None:
5124 if not sent_enc_challenge:
5125 self.assertEqual(len(enc_challenge), 0)
5126 else:
5127 armor_key = kdc_exchange_dict['armor_key']
5128 self.assertIsNotNone(armor_key)
5130 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
5132 kdc_challenge_key = self.generate_kdc_challenge_key(
5133 armor_key, preauth_key)
5135 # Ensure that the encrypted challenge FAST factor is supported
5136 # (RFC6113 5.4.6).
5137 if self.strict_checking:
5138 self.assertNotEqual(len(enc_challenge), 0)
5139 if len(enc_challenge) != 0:
5140 encrypted_challenge = self.der_decode(
5141 enc_challenge,
5142 asn1Spec=krb5_asn1.EncryptedData())
5143 self.assertEqual(encrypted_challenge['etype'],
5144 kdc_challenge_key.etype)
5146 challenge = kdc_challenge_key.decrypt(
5147 KU_ENC_CHALLENGE_KDC,
5148 encrypted_challenge['cipher'])
5149 challenge = self.der_decode(
5150 challenge,
5151 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
5153 # Retrieve the returned timestamp.
5154 rep_patime = challenge['patimestamp']
5155 self.assertIn('pausec', challenge)
5157 # Ensure the returned time is within five minutes of the
5158 # current time.
5159 rep_time = self.get_EpochFromKerberosTime(rep_patime)
5160 current_time = time.time()
5162 self.assertLess(current_time - 300, rep_time)
5163 self.assertLess(rep_time, current_time + 300)
5165 etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
5166 if etype_info2 is not None:
5167 etype_info2 = self.der_decode(etype_info2,
5168 asn1Spec=krb5_asn1.ETYPE_INFO2())
5169 self.assertGreaterEqual(len(etype_info2), 1)
5170 if self.strict_checking:
5171 self.assertEqual(len(etype_info2), len(expect_etype_info2))
5172 for i in range(0, len(etype_info2)):
5173 e = self.getElementValue(etype_info2[i], 'etype')
5174 if self.strict_checking:
5175 self.assertEqual(e, expect_etype_info2[i])
5176 salt = self.getElementValue(etype_info2[i], 'salt')
5177 if e == kcrypto.Enctype.RC4:
5178 if self.strict_checking:
5179 self.assertIsNone(salt)
5180 else:
5181 self.assertIsNotNone(salt)
5182 expected_salt = kdc_exchange_dict['expected_salt']
5183 if expected_salt is not None:
5184 self.assertEqual(salt, expected_salt)
5185 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
5186 if self.strict_checking:
5187 self.assertIsNone(s2kparams)
5189 etype_info = pa_dict.get(PADATA_ETYPE_INFO)
5190 if etype_info is not None:
5191 etype_info = self.der_decode(etype_info,
5192 asn1Spec=krb5_asn1.ETYPE_INFO())
5193 self.assertEqual(len(etype_info), 1)
5194 e = self.getElementValue(etype_info[0], 'etype')
5195 self.assertEqual(e, kcrypto.Enctype.RC4)
5196 if rc4_support:
5197 self.assertEqual(e, expect_etype_info2[0])
5198 salt = self.getElementValue(etype_info[0], 'salt')
5199 if self.strict_checking:
5200 self.assertIsNotNone(salt)
5201 self.assertEqual(len(salt), 0)
5203 return etype_info2
def generate_simple_fast(self,
                         kdc_exchange_dict,
                         _callback_dict,
                         req_body,
                         fast_padata,
                         fast_armor,
                         checksum,
                         fast_options=''):
    """Wrap a request body and its padata in a PA-FX-FAST padata element.

    A KrbFastReq is built from fast_options, fast_padata and req_body,
    DER-encoded and encrypted with kdc_exchange_dict['armor_key'] under
    key usage KU_FAST_ENC.  The result is combined with fast_armor and
    checksum into an armored request, which is encoded as a
    PA-FX-FAST-REQUEST and returned as PA-DATA of type PADATA_FX_FAST.

    _callback_dict is accepted but not used.
    """
    armor_key = kdc_exchange_dict['armor_key']

    # Inner (to-be-encrypted) FAST request.
    inner_req = self.KRB_FAST_REQ_create(fast_options,
                                         fast_padata,
                                         req_body)
    inner_req_blob = self.der_encode(inner_req,
                                     asn1Spec=krb5_asn1.KrbFastReq())
    encrypted_req = self.EncryptedData_create(armor_key,
                                              KU_FAST_ENC,
                                              inner_req_blob)

    # Outer armored request carrying the armor and the checksum.
    armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
                                                   checksum,
                                                   encrypted_req)

    fx_fast_request = self.PA_FX_FAST_REQUEST_create(armored_req)
    fx_fast_blob = self.der_encode(
        fx_fast_request,
        asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())

    return self.PA_DATA_create(PADATA_FX_FAST, fx_fast_blob)
def generate_ap_req(self,
                    kdc_exchange_dict,
                    _callback_dict,
                    req_body,
                    armor,
                    usage=None,
                    seq_number=None):
    """Build a DER-encoded AP-REQ for a TGS request or for FAST armor.

    When armor is true, the armor TGT, armor subkey and FAST AP options
    are read from kdc_exchange_dict and req_body must be None.
    Otherwise the regular TGT, authenticator subkey and AP options are
    used, and a checksum over the encoded req_body (when one is given)
    is placed in the authenticator.

    usage defaults to KU_AP_REQ_AUTH for armor and KU_TGS_REQ_AUTH
    otherwise.  seq_number defaults to a random value in
    [0, 0xfffffffe].  _callback_dict is accepted but not used.
    """
    body_cksum = None

    if armor:
        self.assertIsNone(req_body)

        tgt = kdc_exchange_dict['armor_tgt']
        subkey = kdc_exchange_dict['armor_subkey']
    else:
        tgt = kdc_exchange_dict['tgt']
        subkey = kdc_exchange_dict['authenticator_subkey']

        if req_body is not None:
            cksum_type = kdc_exchange_dict['body_checksum_type']

            body_blob = self.der_encode(
                req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())

            body_cksum = self.Checksum_create(
                tgt.session_key,
                KU_TGS_REQ_AUTH_CKSUM,
                body_blob,
                ctype=cksum_type)

    auth_data = kdc_exchange_dict['auth_data']

    exported_subkey = None if subkey is None else subkey.export_obj()
    if seq_number is None:
        seq_number = random.randint(0, 0xfffffffe)
    ctime, cusec = self.get_KerberosTimeWithUsec()

    authenticator_blob = self.der_encode(
        self.Authenticator_create(
            crealm=tgt.crealm,
            cname=tgt.cname,
            cksum=body_cksum,
            cusec=cusec,
            ctime=ctime,
            subkey=exported_subkey,
            seq_number=seq_number,
            authorization_data=auth_data),
        asn1Spec=krb5_asn1.Authenticator())

    if usage is None:
        if armor:
            usage = KU_AP_REQ_AUTH
        else:
            usage = KU_TGS_REQ_AUTH
    encrypted_authenticator = self.EncryptedData_create(tgt.session_key,
                                                        usage,
                                                        authenticator_blob)

    options_key = 'fast_ap_options' if armor else 'ap_options'
    ap_options = kdc_exchange_dict[options_key]
    if ap_options is None:
        # No options requested: send an all-zero APOptions bit string.
        ap_options = str(krb5_asn1.APOptions('0'))

    ap_req_obj = self.AP_REQ_create(ap_options=ap_options,
                                    ticket=tgt.ticket,
                                    authenticator=encrypted_authenticator)
    return self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
def generate_simple_tgs_padata(self,
                               kdc_exchange_dict,
                               callback_dict,
                               req_body):
    """Return ([PA-TGS-REQ padata], req_body) for a plain TGS request.

    The single padata element is of type PADATA_KDC_REQ and carries a
    non-armor AP-REQ produced by generate_ap_req() for req_body.  The
    request body is passed back unchanged.
    """
    encoded_ap_req = self.generate_ap_req(kdc_exchange_dict,
                                          callback_dict,
                                          req_body,
                                          armor=False)
    tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, encoded_ap_req)

    return [tgs_req_padata], req_body
def get_preauth_key(self, kdc_exchange_dict):
    """Return the (key, key usage) pair selected for this exchange.

    For an expected KRB_AS_REP this is the 'preauth_key' entry with
    KU_AS_REP_ENC_PART.  Otherwise (TGS) it is the authenticator subkey
    with KU_TGS_REP_ENC_PART_SUB_KEY when one is set, falling back to
    the TGT session key with KU_TGS_REP_ENC_PART_SESSION.

    Asserts that the selected key is not None.
    """
    if kdc_exchange_dict['rep_msg_type'] == KRB_AS_REP:
        key = kdc_exchange_dict['preauth_key']
        usage = KU_AS_REP_ENC_PART
    else:  # KRB_TGS_REP
        subkey = kdc_exchange_dict['authenticator_subkey']
        if subkey is None:
            key = kdc_exchange_dict['tgt'].session_key
            usage = KU_TGS_REP_ENC_PART_SESSION
        else:
            key = subkey
            usage = KU_TGS_REP_ENC_PART_SUB_KEY

    self.assertIsNotNone(key)

    return key, usage
def generate_armor_key(self, subkey, session_key):
    """Derive an armor key from a subkey and a session key.

    The raw keys are combined with kcrypto.cf2() using the pepper
    strings b'subkeyarmor' and b'ticketarmor'.  The result is wrapped in
    a Krb5EncryptionKey constructed with None as its second argument.
    """
    combined = kcrypto.cf2(subkey.key,
                           session_key.key,
                           b'subkeyarmor',
                           b'ticketarmor')

    return Krb5EncryptionKey(combined, None)
def generate_strengthen_reply_key(self, strengthen_key, reply_key):
    """Combine a strengthen key with a reply key.

    The raw keys are combined with kcrypto.cf2() using the pepper
    strings b'strengthenkey' and b'replykey'.  The resulting
    Krb5EncryptionKey is constructed with reply_key.kvno as its second
    argument.
    """
    combined = kcrypto.cf2(strengthen_key.key,
                           reply_key.key,
                           b'strengthenkey',
                           b'replykey')

    return Krb5EncryptionKey(combined, reply_key.kvno)
def generate_client_challenge_key(self, armor_key, longterm_key):
    """Derive the client-side encrypted-challenge key.

    The raw keys are combined with kcrypto.cf2() using the pepper
    strings b'clientchallengearmor' and b'challengelongterm'.  The
    result is wrapped in a Krb5EncryptionKey constructed with None as
    its second argument.
    """
    combined = kcrypto.cf2(armor_key.key,
                           longterm_key.key,
                           b'clientchallengearmor',
                           b'challengelongterm')

    return Krb5EncryptionKey(combined, None)
def generate_kdc_challenge_key(self, armor_key, longterm_key):
    """Derive the KDC-side encrypted-challenge key.

    The raw keys are combined with kcrypto.cf2() using the pepper
    strings b'kdcchallengearmor' and b'challengelongterm'.  The result
    is wrapped in a Krb5EncryptionKey constructed with None as its
    second argument.
    """
    combined = kcrypto.cf2(armor_key.key,
                           longterm_key.key,
                           b'kdcchallengearmor',
                           b'challengelongterm')

    return Krb5EncryptionKey(combined, None)
def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
    """Assert that expected_checksum is the armor-key checksum of ticket.

    The checksum type must equal armor_key.ctype.  The ticket is
    DER-encoded, a checksum over it is created with armor_key under key
    usage KU_FAST_FINISHED, and the result must equal expected_checksum.
    """
    self.assertEqual(armor_key.ctype, expected_checksum['cksumtype'])

    encoded_ticket = self.der_encode(ticket,
                                     asn1Spec=krb5_asn1.Ticket())
    computed = self.Checksum_create(armor_key,
                                    KU_FAST_FINISHED,
                                    encoded_ticket)
    self.assertEqual(expected_checksum, computed)
def verify_ticket(self, ticket, krbtgt_keys, service_ticket,
                  expect_pac=True,
                  expect_ticket_checksum=True,
                  expect_full_checksum=None):
    """Decrypt a ticket and verify the signatures in its PAC.

    ticket supplies the encoded ticket and its decryption key.
    krbtgt_keys is either a single key or a container of keys; with a
    container, the first key is used under strict checking, otherwise
    the first key whose ctype matches the KDC checksum type.

    The server and KDC checksums must always be present.  For a
    non-service ticket the ticket and full checksums must be absent.
    For a service ticket, expect_ticket_checksum and
    expect_full_checksum each mean: truthy - must be present; False -
    must be absent; None - optional.  Any checksum that is present is
    verified.

    Returns None.  If expect_pac is false the method returns early
    without verifying anything in the PAC.
    """
    # Decrypt the ticket.

    key = ticket.decryption_key
    enc_part = ticket.ticket['enc-part']

    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_part = self.der_decode(
        enc_part, asn1Spec=krb5_asn1.EncTicketPart())

    # Fetch the authorization data from the ticket.
    auth_data = enc_part.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    elif auth_data is None:
        return

    # Get a copy of the authdata with an empty PAC, and the existing PAC
    # (if present).
    empty_pac = self.get_empty_pac()
    auth_data, pac_data = self.replace_pac(auth_data,
                                           empty_pac,
                                           expect_pac=expect_pac)
    if not expect_pac:
        return

    # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
    # raw type to create a new PAC with zeroed signatures for
    # verification. This is because on Windows, the resource_groups field
    # is added to PAC_LOGON_INFO after the info3 field has been created,
    # which results in a different ordering of pointer values than Samba
    # (see commit 0e201ecdc53). Using the raw type avoids changing
    # PAC_LOGON_INFO, so verification against Windows can work. We still
    # need the PAC_DATA type to retrieve the actual checksums, because the
    # signatures in the raw type may contain padding bytes.
    pac = ndr_unpack(krb5pac.PAC_DATA,
                     pac_data)
    raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
                         pac_data)

    # Maps checksum buffer type -> (signature bytes, signed ctype).
    checksums = {}

    full_checksum_buffer = None

    for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
        buffer_type = pac_buffer.type
        if buffer_type not in self.pac_checksum_types:
            continue

        self.assertNotIn(buffer_type, checksums,
                         f'Duplicate checksum type {buffer_type}')

        # Fetch the checksum and the checksum type from the PAC buffer.
        checksum = pac_buffer.info.signature
        ctype = pac_buffer.info.type
        if ctype & 1 << 31:
            # Sign-extend the 32-bit value into a negative integer.
            ctype |= -1 << 31

        checksums[buffer_type] = checksum, ctype

        if buffer_type == krb5pac.PAC_TYPE_FULL_CHECKSUM:
            # Zeroed later, after the PAC has been encoded for the
            # server checksum.
            full_checksum_buffer = raw_pac_buffer
        elif buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
            # Zero the checksum field so that we can later verify the
            # checksums. The ticket checksum field is not zeroed.

            signature = ndr_unpack(
                krb5pac.PAC_SIGNATURE_DATA,
                raw_pac_buffer.info.remaining)
            signature.signature = bytes(len(checksum))
            raw_pac_buffer.info.remaining = ndr_pack(
                signature)

    # Re-encode the PAC.
    pac_data = ndr_pack(raw_pac)

    if full_checksum_buffer is not None:
        # Use the full checksum's own length rather than whatever
        # signature the loop above happened to process last.
        full_checksum_len = len(
            checksums[krb5pac.PAC_TYPE_FULL_CHECKSUM][0])

        signature = ndr_unpack(
            krb5pac.PAC_SIGNATURE_DATA,
            full_checksum_buffer.info.remaining)
        signature.signature = bytes(full_checksum_len)
        full_checksum_buffer.info.remaining = ndr_pack(
            signature)

    # Re-encode the PAC.
    full_pac_data = ndr_pack(raw_pac)

    # Verify the signatures.

    server_checksum, server_ctype = checksums[
        krb5pac.PAC_TYPE_SRV_CHECKSUM]
    key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
                        pac_data,
                        server_ctype,
                        server_checksum)

    kdc_checksum, kdc_ctype = checksums[
        krb5pac.PAC_TYPE_KDC_CHECKSUM]

    if isinstance(krbtgt_keys, collections.abc.Container):
        if self.strict_checking:
            krbtgt_key = krbtgt_keys[0]
        else:
            krbtgt_key = next(candidate for candidate in krbtgt_keys
                              if candidate.ctype == kdc_ctype)
    else:
        krbtgt_key = krbtgt_keys

    # The KDC checksum is made over the server checksum.
    krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                    server_checksum,
                                    kdc_ctype,
                                    kdc_checksum)

    if not service_ticket:
        self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
        self.assertNotIn(krb5pac.PAC_TYPE_FULL_CHECKSUM, checksums)
    else:
        ticket_checksum, ticket_ctype = checksums.get(
            krb5pac.PAC_TYPE_TICKET_CHECKSUM,
            (None, None))
        if expect_ticket_checksum:
            self.assertIsNotNone(ticket_checksum)
        elif expect_ticket_checksum is False:
            self.assertIsNone(ticket_checksum)
        if ticket_checksum is not None:
            # The ticket checksum covers the enc-part with an empty PAC.
            enc_part['authorization-data'] = auth_data
            enc_part = self.der_encode(enc_part,
                                       asn1Spec=krb5_asn1.EncTicketPart())

            krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                            enc_part,
                                            ticket_ctype,
                                            ticket_checksum)

        full_checksum, full_ctype = checksums.get(
            krb5pac.PAC_TYPE_FULL_CHECKSUM,
            (None, None))
        if expect_full_checksum:
            self.assertIsNotNone(full_checksum)
        elif expect_full_checksum is False:
            self.assertIsNone(full_checksum)
        if full_checksum is not None:
            krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                            full_pac_data,
                                            full_ctype,
                                            full_checksum)
def modified_ticket(self,
                    ticket, *,
                    new_ticket_key=None,
                    modify_fn=None,
                    modify_pac_fn=None,
                    exclude_pac=False,
                    allow_empty_authdata=False,
                    update_pac_checksums=True,
                    checksum_keys=None,
                    include_checksums=None):
    """Return a re-encrypted copy of a ticket with optional modifications.

    ticket: the ticket credentials to copy; its decryption_key is used
        to decrypt the existing enc-part.
    new_ticket_key: key used to re-encrypt the ticket (defaults to the
        original decryption key).
    modify_fn: optional callable taking and returning the decoded
        enc-part.
    modify_pac_fn: optional callable taking and returning the unpacked
        PAC; when given, a PAC is required to be present.
    exclude_pac: drop the PAC; implies no checksum update and is
        incompatible with modify_pac_fn.
    allow_empty_authdata: passed through to replace_pac().
    update_pac_checksums: recompute the PAC checksums after
        modification.
    checksum_keys: dict mapping PAC checksum type to signing key.  The
        caller's dict is never modified.
    include_checksums: dict mapping PAC checksum type to True (force
        present), False (force absent) or None/missing (keep as in the
        original PAC).

    Returns a new KerberosTicketCreds for the modified ticket.
    """
    # A dict containing a key for each checksum type to be created in
    # the PAC. Work on a private copy so that the fallback keys filled in
    # below never leak into a dict that the caller may reuse.
    checksum_keys = {} if checksum_keys is None else dict(checksum_keys)

    if include_checksums is None:
        # A dict containing a value for each checksum type; True if the
        # checksum type is to be included in the PAC, False if it is to be
        # excluded, or None/not present if the checksum is to be included
        # based on its presence in the original PAC.
        include_checksums = {}

    # Check that the values passed in by the caller make sense.

    self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
    self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)

    if exclude_pac:
        self.assertIsNone(modify_pac_fn)

        update_pac_checksums = False

    if not update_pac_checksums:
        self.assertFalse(checksum_keys)
        self.assertFalse(include_checksums)

    expect_pac = modify_pac_fn is not None

    key = ticket.decryption_key

    if new_ticket_key is None:
        # Use the same key to re-encrypt the ticket.
        new_ticket_key = key

    if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
        # If the server signature key is not present, fall back to the key
        # used to encrypt the ticket.
        checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key

    # If the ticket or full signature key is not present, fall back to
    # the key used for the KDC signature.
    for fallback_type in (krb5pac.PAC_TYPE_TICKET_CHECKSUM,
                          krb5pac.PAC_TYPE_FULL_CHECKSUM):
        if fallback_type not in checksum_keys:
            kdc_checksum_key = checksum_keys.get(
                krb5pac.PAC_TYPE_KDC_CHECKSUM)
            if kdc_checksum_key is not None:
                checksum_keys[fallback_type] = kdc_checksum_key

    # Decrypt the ticket.

    enc_part = ticket.ticket['enc-part']

    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_part = self.der_decode(
        enc_part, asn1Spec=krb5_asn1.EncTicketPart())

    # Modify the ticket here.
    if modify_fn is not None:
        enc_part = modify_fn(enc_part)

    auth_data = enc_part.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    if auth_data is not None:
        new_pac = None
        if not exclude_pac:
            # Get a copy of the authdata with an empty PAC, and the
            # existing PAC (if present).
            empty_pac = self.get_empty_pac()
            empty_pac_auth_data, pac_data = self.replace_pac(
                auth_data,
                empty_pac,
                expect_pac=expect_pac)

            if pac_data is not None:
                pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)

                # Modify the PAC here.
                if modify_pac_fn is not None:
                    pac = modify_pac_fn(pac)

                if update_pac_checksums:
                    # Get the enc-part with an empty PAC, which is needed
                    # to create a ticket signature.
                    enc_part_to_sign = enc_part.copy()
                    enc_part_to_sign['authorization-data'] = (
                        empty_pac_auth_data)
                    enc_part_to_sign = self.der_encode(
                        enc_part_to_sign,
                        asn1Spec=krb5_asn1.EncTicketPart())

                    self.update_pac_checksums(pac,
                                              checksum_keys,
                                              include_checksums,
                                              enc_part_to_sign)

                # Re-encode the PAC.
                pac_data = ndr_pack(pac)
                new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
                                                        pac_data)

        # Replace the PAC in the authorization data and re-add it to the
        # ticket enc-part.
        auth_data, _ = self.replace_pac(
            auth_data, new_pac,
            expect_pac=expect_pac,
            allow_empty_authdata=allow_empty_authdata)
        enc_part['authorization-data'] = auth_data

    # Re-encrypt the ticket enc-part with the new key.
    enc_part_new = self.der_encode(enc_part,
                                   asn1Spec=krb5_asn1.EncTicketPart())
    enc_part_new = self.EncryptedData_create(new_ticket_key,
                                             KU_TICKET,
                                             enc_part_new)

    # Create a copy of the ticket with the new enc-part.
    new_ticket = ticket.ticket.copy()
    new_ticket['enc-part'] = enc_part_new

    new_ticket_creds = KerberosTicketCreds(
        new_ticket,
        session_key=ticket.session_key,
        crealm=ticket.crealm,
        cname=ticket.cname,
        srealm=ticket.srealm,
        sname=ticket.sname,
        decryption_key=new_ticket_key,
        ticket_private=enc_part,
        encpart_private=ticket.encpart_private)

    return new_ticket_creds
def update_pac_checksums(self,
                         pac,
                         checksum_keys,
                         include_checksums,
                         enc_part=None):
    """Recompute the checksum buffers of an unpacked PAC in place.

    pac: the unpacked PAC whose buffers are to be updated.
    checksum_keys: dict mapping checksum buffer type to the signing key;
        it must hold a key for every checksum buffer that ends up in
        the PAC.
    include_checksums: dict mapping checksum buffer type to True (add
        the buffer if missing), False (remove it if present) or
        None/missing (leave presence unchanged).
    enc_part: encoded ticket enc-part; required when a ticket checksum
        buffer is present.

    Order matters: every signature is first set to a placeholder (the
    ticket checksum is computed straight away), then the full checksum
    is computed, then the server checksum, and finally the KDC checksum
    over the server checksum.
    """
    buffers = pac.buffers
    found = {}

    # Find the relevant PAC checksum buffers.
    for candidate in buffers:
        candidate_type = candidate.type
        if candidate_type in self.pac_checksum_types:
            self.assertNotIn(candidate_type, found,
                             f'Duplicate checksum type {candidate_type}')

            found[candidate_type] = candidate

    # Create any additional buffers that were requested but not
    # present. Conversely, remove any buffers that were requested to be
    # removed.
    for wanted_type in self.pac_checksum_types:
        if wanted_type in found:
            if include_checksums.get(wanted_type) is False:
                dropped = found.pop(wanted_type)

                pac.num_buffers -= 1
                buffers.remove(dropped)

        elif include_checksums.get(wanted_type) is True:
            created = krb5pac.PAC_BUFFER()
            created.type = wanted_type
            created.info = krb5pac.PAC_SIGNATURE_DATA()

            buffers.append(created)
            pac.num_buffers += 1

            found[wanted_type] = created

    # Fill the relevant checksum buffers.
    for found_type, found_buffer in found.items():
        signing_key = checksum_keys[found_type]
        # Reduce the ctype to an unsigned 32-bit value.
        unsigned_ctype = signing_key.ctype & ((1 << 32) - 1)

        if found_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
            self.assertIsNotNone(enc_part)

            placeholder = signing_key.make_rodc_checksum(
                KU_NON_KERB_CKSUM_SALT,
                enc_part)

        elif found_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
            placeholder = signing_key.make_zeroed_checksum()

        else:
            placeholder = signing_key.make_rodc_zeroed_checksum()

        found_buffer.info.signature = placeholder
        found_buffer.info.type = unsigned_ctype

    # Add the new checksum buffers to the PAC.
    pac.buffers = buffers

    # Calculate the full checksum and insert it into the PAC.
    full_buffer = found.get(krb5pac.PAC_TYPE_FULL_CHECKSUM)
    if full_buffer is not None:
        full_key = checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM]

        packed_pac = ndr_pack(pac)
        full_buffer.info.signature = full_key.make_rodc_checksum(
            KU_NON_KERB_CKSUM_SALT,
            packed_pac)

    # Calculate the server and KDC checksums and insert them into the PAC.

    server_buffer = found.get(krb5pac.PAC_TYPE_SRV_CHECKSUM)
    if server_buffer is not None:
        server_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]

        packed_pac = ndr_pack(pac)
        server_checksum = server_key.make_checksum(
            KU_NON_KERB_CKSUM_SALT,
            packed_pac)

        server_buffer.info.signature = server_checksum

    kdc_buffer = found.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
    if kdc_buffer is not None:
        if server_buffer is None:
            # There's no server signature to make the checksum over, so
            # just make the checksum over an empty bytes object.
            server_checksum = bytes()

        kdc_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]

        kdc_buffer.info.signature = kdc_key.make_rodc_checksum(
            KU_NON_KERB_CKSUM_SALT,
            server_checksum)
def replace_pac(self, auth_data, new_pac, expect_pac=True,
                allow_empty_authdata=False):
    """Return (new authorization data, old PAC) with the PAC swapped.

    Every AD-IF-RELEVANT element of auth_data is decoded; each
    AD_WIN2K_PAC entry inside it is replaced by new_pac, or dropped when
    new_pac is None, and the container is re-encoded.  Other elements
    are copied through unchanged.  A container left empty is omitted
    unless allow_empty_authdata is set.

    The second return value is the 'ad-data' of the PAC that was found,
    or None.  Asserts that at most one PAC exists and, when expect_pac
    is true, that an AD-IF-RELEVANT container holding a PAC was found.
    """
    if new_pac is not None:
        self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
        self.assertElementPresent(new_pac, 'ad-data')

    rebuilt_auth_data = []

    if_relevant = None
    found_pac = None

    for element in auth_data:
        if element['ad-type'] == AD_IF_RELEVANT:
            if_relevant = self.der_decode(
                element['ad-data'],
                asn1Spec=krb5_asn1.AD_IF_RELEVANT())

            kept = []
            for inner in if_relevant:
                if inner['ad-type'] != AD_WIN2K_PAC:
                    kept.append(inner)
                    continue

                self.assertIsNone(found_pac, 'Multiple PACs detected')
                found_pac = inner['ad-data']

                if new_pac is not None:
                    kept.append(new_pac)
            if expect_pac:
                self.assertIsNotNone(found_pac, 'Expected PAC')

            if kept or allow_empty_authdata:
                if_relevant = self.der_encode(
                    kept,
                    asn1Spec=krb5_asn1.AD_IF_RELEVANT())

                element = self.AuthorizationData_create(
                    AD_IF_RELEVANT,
                    if_relevant)
            else:
                # Nothing left inside the container: drop it entirely.
                element = None

        if element is not None or allow_empty_authdata:
            rebuilt_auth_data.append(element)

    if expect_pac:
        self.assertIsNotNone(if_relevant, 'Expected AD-RELEVANT')

    return rebuilt_auth_data, found_pac
def get_pac(self, auth_data, expect_pac=True):
    """Return the PAC found in auth_data by replace_pac().

    replace_pac() is called with no replacement PAC, and the rebuilt
    authorization data it returns is discarded.
    """
    _rebuilt, existing_pac = self.replace_pac(auth_data, None, expect_pac)

    return existing_pac
def get_ticket_pac(self, ticket, expect_pac=True):
    """Return the PAC from a ticket's decrypted enc-part, if any.

    The authorization data is read from ticket.ticket_private.  When
    expect_pac is true it must be present; otherwise None is returned
    if the ticket carries no authorization data at all.
    """
    authdata = ticket.ticket_private.get('authorization-data')

    if expect_pac:
        self.assertIsNotNone(authdata)
    elif authdata is None:
        return None

    return self.get_pac(authdata, expect_pac=expect_pac)
def get_krbtgt_checksum_key(self):
    """Return a fresh checksum-key dict holding only the krbtgt key.

    The dict maps krb5pac.PAC_TYPE_KDC_CHECKSUM to the ticket decryption
    key derived from the krbtgt credentials.
    """
    krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    return {
        krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
    }
def is_tgs_principal(self, principal):
    """Return True if principal is to be treated as a TGS principal.

    That is the case when is_tgs() accepts it, or when
    self.kadmin_is_tgs is set and is_kadmin() accepts it.
    """
    if self.is_tgs(principal):
        return True

    return bool(self.kadmin_is_tgs and self.is_kadmin(principal))
def is_kadmin(self, principal):
    """Return True if the first name component is 'kadmin' (str or bytes)."""
    return principal['name-string'][0] in ('kadmin', b'kadmin')
def is_tgs(self, principal):
    """Return True if the first name component is 'krbtgt' (str or bytes)."""
    return principal['name-string'][0] in ('krbtgt', b'krbtgt')
def is_tgt(self, ticket):
    """Return whether is_tgs() accepts the ticket's 'sname' principal."""
    return self.is_tgs(ticket.ticket['sname'])
def get_empty_pac(self):
    """Return an AD_WIN2K_PAC authdata element holding one zero byte."""
    placeholder = bytes(1)

    return self.AuthorizationData_create(AD_WIN2K_PAC, placeholder)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return get_pa_dict() applied to the exchange's 'req_padata'."""
    outer_padata = kdc_exchange_dict['req_padata']

    return self.get_pa_dict(outer_padata)
def get_fast_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict built from the exchange's 'fast_padata'.

    Falls back to the outer padata dict when that one is empty.
    """
    inner_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])

    if not inner_pa_dict:
        return self.get_outer_pa_dict(kdc_exchange_dict)

    return inner_pa_dict
def sent_fast(self, kdc_exchange_dict):
    """Return True if the outer request padata contains PADATA_FX_FAST."""
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Return True if PADATA_ENCRYPTED_CHALLENGE is in the FAST padata dict.

    The dict is the one returned by get_fast_pa_dict().
    """
    sent_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    return PADATA_ENCRYPTED_CHALLENGE in sent_pa_dict
def sent_enc_pa_rep(self, kdc_exchange_dict):
    """Return True if PADATA_REQ_ENC_PA_REP is in the FAST padata dict.

    The dict is the one returned by get_fast_pa_dict().
    """
    sent_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    return PADATA_REQ_ENC_PA_REP in sent_pa_dict
def sent_pk_as_req(self, kdc_exchange_dict):
    """Return True if PADATA_PK_AS_REQ is in the FAST padata dict.

    The dict is the one returned by get_fast_pa_dict().
    """
    sent_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    return PADATA_PK_AS_REQ in sent_pa_dict
def sent_pk_as_req_win2k(self, kdc_exchange_dict):
    """Return True if PADATA_PK_AS_REP_19 is in the FAST padata dict.

    The dict is the one returned by get_fast_pa_dict().
    """
    sent_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    return PADATA_PK_AS_REP_19 in sent_pa_dict
def sent_freshness(self, kdc_exchange_dict):
    """Return True if PADATA_AS_FRESHNESS is in the FAST padata dict.

    The dict is the one returned by get_fast_pa_dict().
    """
    sent_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    return PADATA_AS_FRESHNESS in sent_pa_dict
def get_sent_pac_options(self, kdc_exchange_dict):
    """Return the PAC options sent in the request, masked to four bits.

    Returns '' when no PADATA_PAC_OPTIONS element is in the FAST padata
    dict.  Otherwise the decoded 'options' value is returned with every
    position after the first four replaced by '0', keeping its length.
    """
    sent_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    if PADATA_PAC_OPTIONS not in sent_pa_dict:
        return ''

    decoded = self.der_decode(sent_pa_dict[PADATA_PAC_OPTIONS],
                              asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    sent_bits = decoded['options']

    # Mask out unsupported bits.
    supported_bits = sent_bits[:4]
    unsupported_bits = sent_bits[4:]
    supported_bits += '0' * len(unsupported_bits)

    return supported_bits
def get_krbtgt_sname(self):
    """Return an NT_SRV_INST principal name for the krbtgt account.

    The name components are the username and realm of the krbtgt
    credentials, in that order.
    """
    creds = self.get_krbtgt_creds()
    name_components = [creds.get_username(), creds.get_realm()]

    return self.PrincipalName_create(name_type=NT_SRV_INST,
                                     names=name_components)
def add_requester_sid(self, pac, sid):
    """Append a PAC_TYPE_REQUESTER_SID buffer for sid and return the PAC.

    Asserts that the PAC does not already hold a requester SID buffer.
    The PAC's buffers and num_buffers are updated on the object passed
    in.
    """
    buffers = pac.buffers

    present_types = [existing.type for existing in buffers]
    self.assertNotIn(krb5pac.PAC_TYPE_REQUESTER_SID, present_types)

    sid_info = krb5pac.PAC_REQUESTER_SID()
    sid_info.sid = security.dom_sid(sid)

    sid_buffer = krb5pac.PAC_BUFFER()
    sid_buffer.type = krb5pac.PAC_TYPE_REQUESTER_SID
    sid_buffer.info = sid_info

    buffers.append(sid_buffer)

    # Write the extended list back to the PAC and bump its count.
    pac.buffers = buffers
    pac.num_buffers += 1

    return pac
def modify_lifetime(self, ticket, lifetime, requester_sid=None):
    """Return a copy of ticket with adjusted times, encrypted to krbtgt.

    ticket: the ticket credentials to modify.
    lifetime: seconds from now at which the new ticket's endtime is set.
    requester_sid: if given, a requester SID buffer for this SID is
        added to the PAC.

    authtime (and starttime, if present) are set to one hour in the
    past, and the PAC logon time is set to match.  The ticket is
    re-encrypted with the krbtgt key, which is also supplied as the KDC
    checksum key.
    """
    # Get the krbtgt key.
    krbtgt_creds = self.get_krbtgt_creds()

    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
    checksum_keys = {
        krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
    }

    current_time = time.time()

    # Set authtime and starttime to an hour in the past, to show that they
    # do not affect ticket rejection.
    start_time = self.get_KerberosTime(epoch=current_time, offset=-60 * 60)

    # Set the endtime of the ticket relative to our current time, so that
    # the ticket has 'lifetime' seconds remaining to live.
    end_time = self.get_KerberosTime(epoch=current_time, offset=lifetime)

    # Modify the times in the ticket.
    def modify_ticket_times(enc_part):
        enc_part['authtime'] = start_time
        if 'starttime' in enc_part:
            enc_part['starttime'] = start_time

        enc_part['endtime'] = end_time

        return enc_part

    # We have to set the times in both the ticket and the PAC, otherwise
    # Heimdal will complain.
    def modify_pac_time(pac):
        pac_buffers = pac.buffers

        for pac_buffer in pac_buffers:
            if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
                logon_time = self.get_EpochFromKerberosTime(start_time)
                pac_buffer.info.logon_time = unix2nttime(logon_time)
                break
        else:
            self.fail('failed to find LOGON_NAME PAC buffer')

        pac.buffers = pac_buffers

        return pac

    def modify_pac_fn(pac):
        if requester_sid is not None:
            # Add a requester SID to show that the KDC will then accept
            # this kpasswd ticket as if it were a TGT.
            pac = self.add_requester_sid(pac, sid=requester_sid)
        pac = modify_pac_time(pac)
        return pac

    # Do the actual modification.
    return self.modified_ticket(ticket,
                                new_ticket_key=krbtgt_key,
                                modify_fn=modify_ticket_times,
                                modify_pac_fn=modify_pac_fn,
                                checksum_keys=checksum_keys)
6041 def _test_as_exchange(self,
6042 cname,
6043 realm,
6044 sname,
6045 till,
6046 expected_error_mode,
6047 expected_crealm,
6048 expected_cname,
6049 expected_srealm,
6050 expected_sname,
6051 expected_salt,
6052 etypes,
6053 padata,
6054 kdc_options,
6055 creds=None,
6056 renew_time=None,
6057 expected_account_name=None,
6058 expected_groups=None,
6059 unexpected_groups=None,
6060 expected_upn_name=None,
6061 expected_sid=None,
6062 expected_domain_sid=None,
6063 expected_flags=None,
6064 unexpected_flags=None,
6065 expected_supported_etypes=None,
6066 preauth_key=None,
6067 ticket_decryption_key=None,
6068 pac_request=None,
6069 pac_options=None,
6070 expect_pac=True,
6071 expect_pac_attrs=None,
6072 expect_pac_attrs_pac_request=None,
6073 expect_requester_sid=None,
6074 expect_client_claims=None,
6075 expect_device_claims=None,
6076 expected_client_claims=None,
6077 unexpected_client_claims=None,
6078 expected_device_claims=None,
6079 unexpected_device_claims=None,
6080 expect_edata=None,
6081 expect_status=None,
6082 expected_status=None,
6083 rc4_support=True,
6084 to_rodc=False):
6086 def _generate_padata_copy(_kdc_exchange_dict,
6087 _callback_dict,
6088 req_body):
6089 return padata, req_body
6091 if not expected_error_mode:
6092 check_error_fn = None
6093 check_rep_fn = self.generic_check_kdc_rep
6094 else:
6095 check_error_fn = self.generic_check_kdc_error
6096 check_rep_fn = None
6098 if padata is not None:
6099 generate_padata_fn = _generate_padata_copy
6100 else:
6101 generate_padata_fn = None
6103 kdc_exchange_dict = self.as_exchange_dict(
6104 creds=creds,
6105 expected_crealm=expected_crealm,
6106 expected_cname=expected_cname,
6107 expected_srealm=expected_srealm,
6108 expected_sname=expected_sname,
6109 expected_account_name=expected_account_name,
6110 expected_groups=expected_groups,
6111 unexpected_groups=unexpected_groups,
6112 expected_upn_name=expected_upn_name,
6113 expected_sid=expected_sid,
6114 expected_domain_sid=expected_domain_sid,
6115 expected_supported_etypes=expected_supported_etypes,
6116 ticket_decryption_key=ticket_decryption_key,
6117 generate_padata_fn=generate_padata_fn,
6118 check_error_fn=check_error_fn,
6119 check_rep_fn=check_rep_fn,
6120 check_kdc_private_fn=self.generic_check_kdc_private,
6121 expected_error_mode=expected_error_mode,
6122 expected_salt=expected_salt,
6123 expected_flags=expected_flags,
6124 unexpected_flags=unexpected_flags,
6125 preauth_key=preauth_key,
6126 kdc_options=str(kdc_options),
6127 pac_request=pac_request,
6128 pac_options=pac_options,
6129 expect_pac=expect_pac,
6130 expect_pac_attrs=expect_pac_attrs,
6131 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
6132 expect_requester_sid=expect_requester_sid,
6133 expect_client_claims=expect_client_claims,
6134 expect_device_claims=expect_device_claims,
6135 expected_client_claims=expected_client_claims,
6136 unexpected_client_claims=unexpected_client_claims,
6137 expected_device_claims=expected_device_claims,
6138 unexpected_device_claims=unexpected_device_claims,
6139 expect_edata=expect_edata,
6140 expect_status=expect_status,
6141 expected_status=expected_status,
6142 rc4_support=rc4_support,
6143 to_rodc=to_rodc)
6145 rep = self._generic_kdc_exchange(kdc_exchange_dict,
6146 cname=cname,
6147 realm=realm,
6148 sname=sname,
6149 till_time=till,
6150 renew_time=renew_time,
6151 etypes=etypes)
6153 return rep, kdc_exchange_dict