1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
31 from pprint
import pprint
33 from cryptography
import x509
34 from cryptography
.hazmat
.primitives
import asymmetric
, hashes
35 from cryptography
.hazmat
.primitives
.ciphers
import Cipher
, algorithms
, modes
36 from cryptography
.hazmat
.backends
import default_backend
38 from pyasn1
.codec
.der
.decoder
import decode
as pyasn1_der_decode
39 from pyasn1
.codec
.der
.encoder
import encode
as pyasn1_der_encode
40 from pyasn1
.codec
.native
.decoder
import decode
as pyasn1_native_decode
41 from pyasn1
.codec
.native
.encoder
import encode
as pyasn1_native_encode
43 from pyasn1
.codec
.ber
.encoder
import BitStringEncoder
44 import pyasn1
.type.univ
46 from pyasn1
.error
import PyAsn1Error
48 from samba
import unix2nttime
49 from samba
.credentials
import Credentials
50 from samba
.dcerpc
import claims
, krb5pac
, netlogon
, samr
, security
51 from samba
.gensec
import FEATURE_SEAL
52 from samba
.ndr
import ndr_pack
, ndr_unpack
53 from samba
.dcerpc
.misc
import (
59 from samba
.tests
import TestCase
61 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
62 from samba
.tests
.krb5
.rfc4120_constants
import (
65 FX_FAST_ARMOR_AP_REQUEST
,
66 KDC_ERR_CLIENT_REVOKED
,
69 KDC_ERR_PREAUTH_FAILED
,
71 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
,
72 KERB_ERR_TYPE_EXTENDED
,
92 KU_NON_KERB_CKSUM_SALT
,
95 KU_TGS_REP_ENC_PART_SESSION
,
96 KU_TGS_REP_ENC_PART_SUB_KEY
,
98 KU_TGS_REQ_AUTH_CKSUM
,
99 KU_TGS_REQ_AUTH_DAT_SESSION
,
100 KU_TGS_REQ_AUTH_DAT_SUBKEY
,
106 PADATA_ENCRYPTED_CHALLENGE
,
107 PADATA_ENC_TIMESTAMP
,
122 PADATA_SUPPORTED_ETYPES
,
123 PADATA_REQ_ENC_PA_REP
125 import samba
.tests
.krb5
.kcrypto
as kcrypto
128 def BitStringEncoder_encodeValue32(
129 self
, value
, asn1Spec
, encodeFun
, **options
):
131 # BitStrings like KDCOptions or TicketFlags should at least
132 # be 32-Bit on the wire
134 if asn1Spec
is not None:
135 # TODO: try to avoid ASN.1 schema instantiation
136 value
= asn1Spec
.clone(value
)
138 valueLength
= len(value
)
140 alignedValue
= value
<< (8 - valueLength
% 8)
144 substrate
= alignedValue
.asOctets()
145 length
= len(substrate
)
146 # We need at least 32-Bit / 4-Bytes
151 ret
= b
'\x00' + substrate
+ (b
'\x00' * padding
)
152 return ret
, False, True
155 BitStringEncoder
.encodeValue
= BitStringEncoder_encodeValue32
158 def BitString_NamedValues_prettyPrint(self
, scope
=0):
159 ret
= "%s" % self
.asBinary()
162 for byte
in self
.asNumbers():
163 for bit
in [7, 6, 5, 4, 3, 2, 1, 0]:
170 if len(bits
) < highest_bit
:
171 for bitPosition
in range(len(bits
), highest_bit
):
174 delim
= ": (\n%s " % indent
175 for bitPosition
in range(highest_bit
):
176 if bitPosition
in self
.prettyPrintNamedValues
:
177 name
= self
.prettyPrintNamedValues
[bitPosition
]
178 elif bits
[bitPosition
] != 0:
179 name
= "unknown-bit-%u" % bitPosition
182 ret
+= "%s%s:%u" % (delim
, name
, bits
[bitPosition
])
183 delim
= ",\n%s " % indent
184 ret
+= "\n%s)" % indent
# Give every Kerberos flag (BIT STRING) type its named-value table and the
# shared pretty-printer, which looks bit names up in prettyPrintNamedValues.
for _flags_type, _flags_values in (
        (krb5_asn1.TicketFlags, krb5_asn1.TicketFlagsValues),
        (krb5_asn1.KDCOptions, krb5_asn1.KDCOptionsValues),
        (krb5_asn1.APOptions, krb5_asn1.APOptionsValues),
        (krb5_asn1.PACOptionFlags, krb5_asn1.PACOptionFlagsValues)):
    _flags_type.prettyPrintNamedValues = _flags_values.namedValues
    _flags_type.namedValues = _flags_values.namedValues
    _flags_type.prettyPrint = BitString_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _flags_type, _flags_values
214 def Integer_NamedValues_prettyPrint(self
, scope
=0):
216 if intval
in self
.prettyPrintNamedValues
:
217 name
= self
.prettyPrintNamedValues
[intval
]
219 name
= "<__unknown__>"
220 ret
= "%d (0x%x) %s" % (intval
, intval
, name
)
# Give every Kerberos enumerated INTEGER type its named-value table and the
# shared pretty-printer, which looks values up in prettyPrintNamedValues.
for _int_type, _int_values in (
        (krb5_asn1.NameType, krb5_asn1.NameTypeValues),
        (krb5_asn1.AuthDataType, krb5_asn1.AuthDataTypeValues),
        (krb5_asn1.PADataType, krb5_asn1.PADataTypeValues),
        (krb5_asn1.EncryptionType, krb5_asn1.EncryptionTypeValues),
        (krb5_asn1.ChecksumType, krb5_asn1.ChecksumTypeValues),
        (krb5_asn1.KerbErrorDataType, krb5_asn1.KerbErrorDataTypeValues)):
    _int_type.prettyPrintNamedValues = _int_values.namedValues
    _int_type.prettyPrint = Integer_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _int_type, _int_values
250 class Krb5EncryptionKey
:
251 def __init__(self
, key
, kvno
):
253 kcrypto
.Enctype
.AES256
: kcrypto
.Cksumtype
.SHA1_AES256
,
254 kcrypto
.Enctype
.AES128
: kcrypto
.Cksumtype
.SHA1_AES128
,
255 kcrypto
.Enctype
.RC4
: kcrypto
.Cksumtype
.HMAC_MD5
,
258 self
.etype
= key
.enctype
259 self
.ctype
= EncTypeChecksum
[self
.etype
]
263 return "etype=%d ctype=%d kvno=%d key=%s" % (
264 self
.etype
, self
.ctype
, self
.kvno
, self
.key
)
266 def encrypt(self
, usage
, plaintext
):
267 ciphertext
= kcrypto
.encrypt(self
.key
, usage
, plaintext
)
270 def decrypt(self
, usage
, ciphertext
):
271 plaintext
= kcrypto
.decrypt(self
.key
, usage
, ciphertext
)
274 def make_zeroed_checksum(self
, ctype
=None):
278 checksum_len
= kcrypto
.checksum_len(ctype
)
279 return bytes(checksum_len
)
281 def make_checksum(self
, usage
, plaintext
, ctype
=None):
284 cksum
= kcrypto
.make_checksum(ctype
, self
.key
, usage
, plaintext
)
287 def verify_checksum(self
, usage
, plaintext
, ctype
, cksum
):
288 if self
.ctype
!= ctype
:
289 raise AssertionError(f
'key checksum type ({self.ctype}) != '
290 f
'checksum type ({ctype})')
292 kcrypto
.verify_checksum(ctype
,
298 def export_obj(self
):
299 EncryptionKey_obj
= {
300 'keytype': self
.etype
,
301 'keyvalue': self
.key
.contents
,
303 return EncryptionKey_obj
306 class RodcPacEncryptionKey(Krb5EncryptionKey
):
307 def __init__(self
, key
, kvno
, rodc_id
=None):
308 super().__init
__(key
, kvno
)
314 kvno
&= (1 << 16) - 1
316 rodc_id
= kvno
or None
318 if rodc_id
is not None:
319 self
.rodc_id
= rodc_id
.to_bytes(2, byteorder
='little')
def make_rodc_zeroed_checksum(self, ctype=None):
    """Return a zeroed checksum with zeroed room for the RODC identifier.

    The parent class's zeroed checksum for *ctype* is extended by one
    zero byte for every byte of self.rodc_id.
    """
    zeroed = super().make_zeroed_checksum(ctype)
    rodc_id_padding = bytes(len(self.rodc_id))
    return zeroed + rodc_id_padding
def make_rodc_checksum(self, usage, plaintext, ctype=None):
    """Return the parent class's checksum with self.rodc_id appended."""
    base_checksum = super().make_checksum(usage, plaintext, ctype)
    rodc_suffix = self.rodc_id
    return base_checksum + rodc_suffix
331 def verify_rodc_checksum(self
, usage
, plaintext
, ctype
, cksum
):
333 cksum
, cksum_rodc_id
= cksum
[:-2], cksum
[-2:]
335 if self
.rodc_id
!= cksum_rodc_id
:
336 raise AssertionError(f
'{self.rodc_id.hex()} != '
337 f
'{cksum_rodc_id.hex()}')
339 super().verify_checksum(usage
,
class ZeroedChecksumKey(RodcPacEncryptionKey):
    """Key whose checksum methods return zero-filled placeholders.

    Both overrides ignore *usage* and *plaintext* and hand back the
    matching zeroed checksum instead of computing one over the input.
    """

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the zeroed checksum for *ctype*; the input is unused."""
        zeroed = self.make_zeroed_checksum(ctype)
        return zeroed

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the zeroed RODC checksum for *ctype*; the input is unused."""
        zeroed = self.make_rodc_zeroed_checksum(ctype)
        return zeroed
353 class WrongLengthChecksumKey(RodcPacEncryptionKey
):
def __init__(self, key, kvno, length):
    """Initialise the parent key and remember the forced checksum length.

    *key* and *kvno* are passed unchanged to the parent initialiser;
    *length* is stored in self._length for the checksum methods to use.
    """
    super().__init__(key, kvno)

    forced_length = length
    self._length = forced_length
360 def _adjust_to_length(cls
, checksum
, length
):
361 diff
= length
- len(checksum
)
363 checksum
+= bytes(diff
)
365 checksum
= checksum
[:length
]
def make_zeroed_checksum(self, ctype=None):
    """Return self._length zero bytes; *ctype* is ignored."""
    forced_length = self._length
    return bytes(forced_length)
def make_checksum(self, usage, plaintext, ctype=None):
    """Return the parent's checksum adjusted to self._length bytes.

    The adjustment itself is delegated to self._adjust_to_length().
    """
    real_checksum = super().make_checksum(usage, plaintext, ctype)
    forced_length = self._length
    return self._adjust_to_length(real_checksum, forced_length)
def make_rodc_zeroed_checksum(self, ctype=None):
    """Return self._length zero bytes; *ctype* is ignored."""
    forced_length = self._length
    return bytes(forced_length)
def make_rodc_checksum(self, usage, plaintext, ctype=None):
    """Return the parent's RODC checksum adjusted to self._length bytes.

    The adjustment itself is delegated to self._adjust_to_length().
    """
    real_checksum = super().make_rodc_checksum(usage, plaintext, ctype)
    forced_length = self._length
    return self._adjust_to_length(real_checksum, forced_length)
384 class KerberosCredentials(Credentials
):
387 security
.KERB_ENCTYPE_FAST_SUPPORTED
) |
(
388 security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
) |
(
389 security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
) |
(
390 security
.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED
) |
(
391 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
)
394 super(KerberosCredentials
, self
).__init
__()
396 all_enc_types |
= security
.KERB_ENCTYPE_RC4_HMAC_MD5
397 all_enc_types |
= security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
398 all_enc_types |
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
400 self
.as_supported_enctypes
= all_enc_types
401 self
.tgs_supported_enctypes
= all_enc_types
402 self
.ap_supported_enctypes
= all_enc_types
405 self
.forced_keys
= {}
407 self
.forced_salt
= None
413 self
.account_type
= None
415 self
._private
_key
= None
def set_as_supported_enctypes(self, value):
    """Store *value*, coerced with int(), as the AS supported-enctypes bits."""
    enctype_bits = int(value)
    self.as_supported_enctypes = enctype_bits
def set_tgs_supported_enctypes(self, value):
    """Store *value*, coerced with int(), as the TGS supported-enctypes bits."""
    enctype_bits = int(value)
    self.tgs_supported_enctypes = enctype_bits
def set_ap_supported_enctypes(self, value):
    """Store *value*, coerced with int(), as the AP supported-enctypes bits."""
    enctype_bits = int(value)
    self.ap_supported_enctypes = enctype_bits
426 etype_map
= collections
.OrderedDict([
427 (kcrypto
.Enctype
.AES256
,
428 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
),
429 (kcrypto
.Enctype
.AES128
,
430 security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
),
431 (kcrypto
.Enctype
.RC4
,
432 security
.KERB_ENCTYPE_RC4_HMAC_MD5
),
433 (kcrypto
.Enctype
.DES_MD5
,
434 security
.KERB_ENCTYPE_DES_CBC_MD5
),
435 (kcrypto
.Enctype
.DES_CRC
,
436 security
.KERB_ENCTYPE_DES_CBC_CRC
)
440 def etypes_to_bits(cls
, etypes
):
443 bit
= cls
.etype_map
[etype
]
445 raise ValueError(f
'Got duplicate etype: {etype}')
451 def bits_to_etypes(cls
, bits
):
453 for etype
, bit
in cls
.etype_map
.items():
458 bits
&= ~cls
.non_etype_bits
460 raise ValueError(f
'Unsupported etype bits: {bits}')
def get_as_krb5_etypes(self):
    """Return bits_to_etypes() applied to the AS supported-enctypes bits."""
    as_bits = self.as_supported_enctypes
    return self.bits_to_etypes(as_bits)
def get_tgs_krb5_etypes(self):
    """Return bits_to_etypes() applied to the TGS supported-enctypes bits."""
    tgs_bits = self.tgs_supported_enctypes
    return self.bits_to_etypes(tgs_bits)
def get_ap_krb5_etypes(self):
    """Return bits_to_etypes() applied to the AP supported-enctypes bits."""
    ap_bits = self.ap_supported_enctypes
    return self.bits_to_etypes(ap_bits)
473 def set_kvno(self
, kvno
):
474 # Sign-extend from 32 bits.
482 def set_forced_key(self
, etype
, hexkey
):
484 contents
= binascii
.a2b_hex(hexkey
)
485 key
= kcrypto
.Key(etype
, contents
)
486 self
.forced_keys
[etype
] = RodcPacEncryptionKey(key
, self
.kvno
)
488 # Also set the NT hash of computer accounts for which we don’t know the
490 if etype
== kcrypto
.Enctype
.RC4
and self
.get_password() is None:
491 nt_hash
= samr
.Password()
492 nt_hash
.hash = list(contents
)
494 self
.set_nt_hash(nt_hash
)
496 def get_forced_key(self
, etype
):
498 return self
.forced_keys
.get(etype
)
def set_forced_salt(self, salt):
    """Store a bytes() copy of *salt* as the forced salt."""
    salt_bytes = bytes(salt)
    self.forced_salt = salt_bytes
def get_forced_salt(self):
    """Return the stored forced salt exactly as held in self.forced_salt."""
    forced = self.forced_salt
    return forced
507 if self
.forced_salt
is not None:
508 return self
.forced_salt
512 salt_name
= upn
.rsplit('@', 1)[0].replace('/', '')
514 salt_name
= self
.get_username()
516 secure_schannel_type
= self
.get_secure_channel_type()
517 if secure_schannel_type
in [SEC_CHAN_WKSTA
,SEC_CHAN_BDC
]:
518 salt_name
= self
.get_username().lower()
519 if salt_name
[-1] == '$':
520 salt_name
= salt_name
[:-1]
521 salt_string
= '%shost%s.%s' % (
522 self
.get_realm().upper(),
524 self
.get_realm().lower())
526 salt_string
= self
.get_realm().upper() + salt_name
528 return salt_string
.encode('utf-8')
530 def set_dn(self
, dn
):
536 def set_spn(self
, spn
):
542 def set_upn(self
, upn
):
548 def set_sid(self
, sid
):
559 _
, rid
= sid
.rsplit('-', 1)
def set_type(self, account_type):
    """Record *account_type* in self.account_type without any conversion."""
    setattr(self, 'account_type', account_type)
566 return self
.account_type
def update_password(self, password):
    """Set a new password and advance the key version number by one.

    The password is stored first; the current kvno is read afterwards and
    written back incremented.
    """
    self.set_password(password)
    next_kvno = self.get_kvno() + 1
    self.set_kvno(next_kvno)
572 def get_private_key(self
):
573 if self
._private
_key
is None:
574 # Generate a new keypair.
575 self
._private
_key
= asymmetric
.rsa
.generate_private_key(
576 public_exponent
=65537,
578 backend
=default_backend()
581 return self
._private
_key
def get_public_key(self):
    """Return the public half of the key given by get_private_key()."""
    private_key = self.get_private_key()
    return private_key.public_key()
587 class KerberosTicketCreds
:
588 def __init__(self
, ticket
, session_key
,
589 crealm
=None, cname
=None,
590 srealm
=None, sname
=None,
593 encpart_private
=None):
595 self
.session_key
= session_key
600 self
.decryption_key
= decryption_key
601 self
.ticket_private
= ticket_private
602 self
.encpart_private
= encpart_private
604 def set_sname(self
, sname
):
605 self
.ticket
['sname'] = sname
611 PUBLIC_KEY
= object()
612 DIFFIE_HELLMAN
= object()
615 class RawKerberosTest(TestCase
):
616 """A raw Kerberos Test case."""
618 class KpasswdMode(Enum
):
622 # The location of a SID within the PAC
624 BASE_SID
= object() # in info3.base.groups
625 EXTRA_SID
= object() # in info3.sids
626 RESOURCE_SID
= object() # in resource_groups
627 PRIMARY_GID
= object() # the (sole) primary group
630 return self
.__str
__()
632 pac_checksum_types
= {krb5pac
.PAC_TYPE_SRV_CHECKSUM
,
633 krb5pac
.PAC_TYPE_KDC_CHECKSUM
,
634 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
,
635 krb5pac
.PAC_TYPE_FULL_CHECKSUM
}
638 {"value": -1111, "name": "dummy", },
639 {"value": kcrypto
.Enctype
.AES256
, "name": "aes128", },
640 {"value": kcrypto
.Enctype
.AES128
, "name": "aes256", },
641 {"value": kcrypto
.Enctype
.RC4
, "name": "rc4", },
644 expect_padata_outer
= object()
646 setup_etype_test_permutations_done
= False
649 def setup_etype_test_permutations(cls
):
650 if cls
.setup_etype_test_permutations_done
:
655 num_idxs
= len(cls
.etypes_to_test
)
657 for num
in range(1, num_idxs
+ 1):
658 chunk
= list(itertools
.permutations(range(num_idxs
), num
))
661 permutations
.append(el
)
663 for p
in permutations
:
667 n
= cls
.etypes_to_test
[idx
]["name"]
672 etypes
+= (cls
.etypes_to_test
[idx
]["value"],)
674 r
= {"name": name
, "etypes": etypes
, }
677 cls
.etype_test_permutations
= res
678 cls
.setup_etype_test_permutations_done
= True
681 def etype_test_permutation_name_idx(cls
):
682 cls
.setup_etype_test_permutations()
685 for e
in cls
.etype_test_permutations
:
def etype_test_permutation_by_idx(self, idx):
    """Return (name, etypes) for the permutation stored at index *idx*."""
    entry = self.etype_test_permutations[idx]
    name = entry['name']
    etypes = entry['etypes']
    return (name, etypes)
699 cls
.host
= samba
.tests
.env_get_var_value('SERVER')
700 cls
.dc_host
= samba
.tests
.env_get_var_value('DC_SERVER')
702 # A dictionary containing credentials that have already been
706 kdc_fast_support
= samba
.tests
.env_get_var_value('FAST_SUPPORT',
708 if kdc_fast_support
is None:
709 kdc_fast_support
= '0'
710 cls
.kdc_fast_support
= bool(int(kdc_fast_support
))
712 kdc_claims_support
= samba
.tests
.env_get_var_value('CLAIMS_SUPPORT',
714 if kdc_claims_support
is None:
715 kdc_claims_support
= '0'
716 cls
.kdc_claims_support
= bool(int(kdc_claims_support
))
718 kdc_compound_id_support
= samba
.tests
.env_get_var_value(
719 'COMPOUND_ID_SUPPORT',
721 if kdc_compound_id_support
is None:
722 kdc_compound_id_support
= '0'
723 cls
.kdc_compound_id_support
= bool(int(kdc_compound_id_support
))
725 tkt_sig_support
= samba
.tests
.env_get_var_value('TKT_SIG_SUPPORT',
727 if tkt_sig_support
is None:
728 tkt_sig_support
= '1'
729 cls
.tkt_sig_support
= bool(int(tkt_sig_support
))
731 full_sig_support
= samba
.tests
.env_get_var_value('FULL_SIG_SUPPORT',
733 if full_sig_support
is None:
734 full_sig_support
= '1'
735 cls
.full_sig_support
= bool(int(full_sig_support
))
737 expect_pac
= samba
.tests
.env_get_var_value('EXPECT_PAC',
739 if expect_pac
is None:
741 cls
.expect_pac
= bool(int(expect_pac
))
743 expect_extra_pac_buffers
= samba
.tests
.env_get_var_value(
744 'EXPECT_EXTRA_PAC_BUFFERS',
746 if expect_extra_pac_buffers
is None:
747 expect_extra_pac_buffers
= '1'
748 cls
.expect_extra_pac_buffers
= bool(int(expect_extra_pac_buffers
))
750 cname_checking
= samba
.tests
.env_get_var_value('CHECK_CNAME',
752 if cname_checking
is None:
754 cls
.cname_checking
= bool(int(cname_checking
))
756 padata_checking
= samba
.tests
.env_get_var_value('CHECK_PADATA',
758 if padata_checking
is None:
759 padata_checking
= '1'
760 cls
.padata_checking
= bool(int(padata_checking
))
762 kadmin_is_tgs
= samba
.tests
.env_get_var_value('KADMIN_IS_TGS',
764 if kadmin_is_tgs
is None:
766 cls
.kadmin_is_tgs
= bool(int(kadmin_is_tgs
))
768 default_etypes
= samba
.tests
.env_get_var_value('DEFAULT_ETYPES',
770 if default_etypes
is not None:
771 default_etypes
= int(default_etypes
)
772 cls
.default_etypes
= default_etypes
774 forced_rc4
= samba
.tests
.env_get_var_value('FORCED_RC4',
776 if forced_rc4
is None:
778 cls
.forced_rc4
= bool(int(forced_rc4
))
780 expect_nt_hash
= samba
.tests
.env_get_var_value('EXPECT_NT_HASH',
782 if expect_nt_hash
is None:
784 cls
.expect_nt_hash
= bool(int(expect_nt_hash
))
786 expect_nt_status
= samba
.tests
.env_get_var_value('EXPECT_NT_STATUS',
788 if expect_nt_status
is None:
789 expect_nt_status
= '1'
790 cls
.expect_nt_status
= bool(int(expect_nt_status
))
794 self
.do_asn1_print
= False
795 self
.do_hexdump
= False
797 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING',
799 if strict_checking
is None:
800 strict_checking
= '1'
801 self
.strict_checking
= bool(int(strict_checking
))
805 self
.unspecified_kvno
= object()
808 self
._disconnect
("tearDown")
811 def _disconnect(self
, reason
):
817 sys
.stderr
.write("disconnect[%s]\n" % reason
)
819 def _connect_tcp(self
, host
, port
=None):
823 self
.a
= socket
.getaddrinfo(host
, port
, socket
.AF_UNSPEC
,
824 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
826 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
827 self
.s
.settimeout(10)
828 self
.s
.connect(self
.a
[0][4])
836 def connect(self
, host
, port
=None):
837 self
.assertNotConnected()
838 self
._connect
_tcp
(host
, port
)
840 sys
.stderr
.write("connected[%s]\n" % host
)
842 def env_get_var(self
, varname
, prefix
,
843 fallback_default
=True,
844 allow_missing
=False):
846 if prefix
is not None:
847 allow_missing_prefix
= allow_missing
or fallback_default
848 val
= samba
.tests
.env_get_var_value(
849 '%s_%s' % (prefix
, varname
),
850 allow_missing
=allow_missing_prefix
)
852 fallback_default
= True
853 if val
is None and fallback_default
:
854 val
= samba
.tests
.env_get_var_value(varname
,
855 allow_missing
=allow_missing
)
858 def _get_krb5_creds_from_env(self
, prefix
,
859 default_username
=None,
860 allow_missing_password
=False,
861 allow_missing_keys
=True,
862 require_strongest_key
=False):
863 c
= KerberosCredentials()
866 domain
= self
.env_get_var('DOMAIN', prefix
)
867 realm
= self
.env_get_var('REALM', prefix
)
868 allow_missing_username
= default_username
is not None
869 username
= self
.env_get_var('USERNAME', prefix
,
870 fallback_default
=False,
871 allow_missing
=allow_missing_username
)
873 username
= default_username
874 password
= self
.env_get_var('PASSWORD', prefix
,
875 fallback_default
=False,
876 allow_missing
=allow_missing_password
)
879 c
.set_username(username
)
880 if password
is not None:
881 c
.set_password(password
)
882 as_supported_enctypes
= self
.env_get_var('AS_SUPPORTED_ENCTYPES',
883 prefix
, allow_missing
=True)
884 if as_supported_enctypes
is not None:
885 c
.set_as_supported_enctypes(as_supported_enctypes
)
886 tgs_supported_enctypes
= self
.env_get_var('TGS_SUPPORTED_ENCTYPES',
887 prefix
, allow_missing
=True)
888 if tgs_supported_enctypes
is not None:
889 c
.set_tgs_supported_enctypes(tgs_supported_enctypes
)
890 ap_supported_enctypes
= self
.env_get_var('AP_SUPPORTED_ENCTYPES',
891 prefix
, allow_missing
=True)
892 if ap_supported_enctypes
is not None:
893 c
.set_ap_supported_enctypes(ap_supported_enctypes
)
895 if require_strongest_key
:
896 kvno_allow_missing
= False
898 aes256_allow_missing
= False
900 aes256_allow_missing
= True
902 kvno_allow_missing
= allow_missing_keys
903 aes256_allow_missing
= allow_missing_keys
904 kvno
= self
.env_get_var('KVNO', prefix
,
905 fallback_default
=False,
906 allow_missing
=kvno_allow_missing
)
908 c
.set_kvno(int(kvno
))
909 aes256_key
= self
.env_get_var('AES256_KEY_HEX', prefix
,
910 fallback_default
=False,
911 allow_missing
=aes256_allow_missing
)
912 if aes256_key
is not None:
913 c
.set_forced_key(kcrypto
.Enctype
.AES256
, aes256_key
)
914 aes128_key
= self
.env_get_var('AES128_KEY_HEX', prefix
,
915 fallback_default
=False,
917 if aes128_key
is not None:
918 c
.set_forced_key(kcrypto
.Enctype
.AES128
, aes128_key
)
919 rc4_key
= self
.env_get_var('RC4_KEY_HEX', prefix
,
920 fallback_default
=False, allow_missing
=True)
921 if rc4_key
is not None:
922 c
.set_forced_key(kcrypto
.Enctype
.RC4
, rc4_key
)
924 if not allow_missing_keys
:
925 self
.assertTrue(c
.forced_keys
,
926 'Please supply %s encryption keys '
927 'in environment' % prefix
)
931 def _get_krb5_creds(self
,
933 default_username
=None,
934 allow_missing_password
=False,
935 allow_missing_keys
=True,
936 require_strongest_key
=False,
937 fallback_creds_fn
=None):
938 if prefix
in self
.creds_dict
:
939 return self
.creds_dict
[prefix
]
941 # We don't have the credentials already
945 # Try to obtain them from the environment
946 creds
= self
._get
_krb
5_creds
_from
_env
(
948 default_username
=default_username
,
949 allow_missing_password
=allow_missing_password
,
950 allow_missing_keys
=allow_missing_keys
,
951 require_strongest_key
=require_strongest_key
)
952 except Exception as err
:
953 # An error occurred, so save it for later
956 self
.assertIsNotNone(creds
)
957 # Save the obtained credentials
958 self
.creds_dict
[prefix
] = creds
961 if fallback_creds_fn
is not None:
963 # Try to use the fallback method
964 creds
= fallback_creds_fn()
965 except Exception as err
:
966 print("ERROR FROM ENV: %r" % (env_err
))
967 print("FALLBACK-FN: %s" % (fallback_creds_fn
))
968 print("FALLBACK-ERROR: %r" % (err
))
970 self
.assertIsNotNone(creds
)
971 # Save the obtained credentials
972 self
.creds_dict
[prefix
] = creds
975 # Both methods failed, so raise the exception from the
979 def get_user_creds(self
,
980 allow_missing_password
=False,
981 allow_missing_keys
=True):
982 c
= self
._get
_krb
5_creds
(prefix
=None,
983 allow_missing_password
=allow_missing_password
,
984 allow_missing_keys
=allow_missing_keys
)
987 def get_service_creds(self
,
988 allow_missing_password
=False,
989 allow_missing_keys
=True):
990 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
991 allow_missing_password
=allow_missing_password
,
992 allow_missing_keys
=allow_missing_keys
)
995 def get_client_creds(self
,
996 allow_missing_password
=False,
997 allow_missing_keys
=True):
998 c
= self
._get
_krb
5_creds
(prefix
='CLIENT',
999 allow_missing_password
=allow_missing_password
,
1000 allow_missing_keys
=allow_missing_keys
)
1003 def get_server_creds(self
,
1004 allow_missing_password
=False,
1005 allow_missing_keys
=True):
1006 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
1007 allow_missing_password
=allow_missing_password
,
1008 allow_missing_keys
=allow_missing_keys
)
1011 def get_admin_creds(self
,
1012 allow_missing_password
=False,
1013 allow_missing_keys
=True):
1014 c
= self
._get
_krb
5_creds
(prefix
='ADMIN',
1015 allow_missing_password
=allow_missing_password
,
1016 allow_missing_keys
=allow_missing_keys
)
1017 c
.set_gensec_features(c
.get_gensec_features() | FEATURE_SEAL
)
1018 c
.set_workstation('')
1021 def get_rodc_krbtgt_creds(self
,
1023 require_strongest_key
=False):
1024 if require_strongest_key
:
1025 self
.assertTrue(require_keys
)
1026 c
= self
._get
_krb
5_creds
(prefix
='RODC_KRBTGT',
1027 allow_missing_password
=True,
1028 allow_missing_keys
=not require_keys
,
1029 require_strongest_key
=require_strongest_key
)
1032 def get_krbtgt_creds(self
,
1034 require_strongest_key
=False):
1035 if require_strongest_key
:
1036 self
.assertTrue(require_keys
)
1037 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
1038 default_username
='krbtgt',
1039 allow_missing_password
=True,
1040 allow_missing_keys
=not require_keys
,
1041 require_strongest_key
=require_strongest_key
)
1044 def get_anon_creds(self
):
1049 # Overridden by KDCBaseTest. At this level we don't know what actual
1050 # enctypes are supported, so the best we can do is go by whether NT hashes
1051 # are expected and whether the account is a workstation or not. This
1052 # matches the behaviour that tests expect by default.
1053 def get_default_enctypes(self
, creds
):
1054 self
.assertIsNotNone(creds
)
1056 default_enctypes
= [
1057 kcrypto
.Enctype
.AES256
,
1058 kcrypto
.Enctype
.AES128
,
1061 if self
.expect_nt_hash
or creds
.get_workstation():
1062 default_enctypes
.append(kcrypto
.Enctype
.RC4
)
1064 return default_enctypes
1066 def asn1_dump(self
, name
, obj
, asn1_print
=None):
1067 if asn1_print
is None:
1068 asn1_print
= self
.do_asn1_print
1070 if name
is not None:
1071 sys
.stderr
.write("%s:\n%s" % (name
, obj
))
1073 sys
.stderr
.write("%s" % (obj
))
1075 def hex_dump(self
, name
, blob
, hexdump
=None):
1077 hexdump
= self
.do_hexdump
1080 "%s: %d\n%s" % (name
, len(blob
), self
.hexdump(blob
)))
1089 if asn1Spec
is not None:
1090 class_name
= type(asn1Spec
).__name
__.split(':')[0]
1092 class_name
= "<None-asn1Spec>"
1093 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
1094 obj
, _
= pyasn1_der_decode(blob
, asn1Spec
=asn1Spec
)
1095 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
1097 obj
= pyasn1_native_encode(obj
)
1108 obj
= pyasn1_native_decode(obj
, asn1Spec
=asn1Spec
)
1109 class_name
= type(obj
).__name
__.split(':')[0]
1110 if class_name
is not None:
1111 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
1112 blob
= pyasn1_der_encode(obj
)
1113 if class_name
is not None:
1114 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
def send_pdu(self, req, asn1_print=None, hexdump=None):
    """DER-encode *req* and hand the result to send_msg().

    der_encode() is always called with hexdump=False and
    native_decode=False; the caller's *hexdump* setting is applied only
    by send_msg().
    """
    encode_options = {
        'native_decode': False,
        'asn1_print': asn1_print,
        'hexdump': False,
    }
    wire_pdu = self.der_encode(req, **encode_options)
    self.send_msg(wire_pdu, hexdump=hexdump)
1122 def send_msg(self
, msg
, hexdump
=None):
1123 header
= struct
.pack('>I', len(msg
))
1126 self
.hex_dump("send_msg", header
, hexdump
=hexdump
)
1127 self
.hex_dump("send_msg", msg
, hexdump
=hexdump
)
1131 sent
= self
.s
.send(req_pdu
, 0)
1132 if sent
== len(req_pdu
):
1134 req_pdu
= req_pdu
[sent
:]
1135 except socket
.error
as e
:
1136 self
._disconnect
("send_msg: %s" % e
)
1138 except IOError as e
:
1139 self
._disconnect
("send_msg: %s" % e
)
1142 def recv_raw(self
, num_recv
=0xffff, hexdump
=None, timeout
=None):
1145 if timeout
is not None:
1146 self
.s
.settimeout(timeout
)
1147 rep_pdu
= self
.s
.recv(num_recv
, 0)
1148 self
.s
.settimeout(10)
1149 if len(rep_pdu
) == 0:
1150 self
._disconnect
("recv_raw: EOF")
1152 self
.hex_dump("recv_raw", rep_pdu
, hexdump
=hexdump
)
1153 except socket
.timeout
:
1154 self
.s
.settimeout(10)
1155 sys
.stderr
.write("recv_raw: TIMEOUT\n")
1156 except socket
.error
as e
:
1157 self
._disconnect
("recv_raw: %s" % e
)
1159 except IOError as e
:
1160 self
._disconnect
("recv_raw: %s" % e
)
1164 def recv_pdu_raw(self
, asn1_print
=None, hexdump
=None, timeout
=None):
1165 raw_pdu
= self
.recv_raw(
1166 num_recv
=4, hexdump
=hexdump
, timeout
=timeout
)
1169 header
= struct
.unpack(">I", raw_pdu
[0:4])
1176 raw_pdu
= self
.recv_raw(
1177 num_recv
=missing
, hexdump
=hexdump
, timeout
=timeout
)
1178 self
.assertGreaterEqual(len(raw_pdu
), 1)
1180 missing
= k5_len
- len(rep_pdu
)
1183 def recv_reply(self
, asn1_print
=None, hexdump
=None, timeout
=None):
1184 rep_pdu
= self
.recv_pdu_raw(asn1_print
=asn1_print
,
1188 return None, rep_pdu
1189 k5_raw
= self
.der_decode(
1192 native_encode
=False,
1195 pvno
= k5_raw
['field-0']
1196 self
.assertEqual(pvno
, 5)
1197 msg_type
= k5_raw
['field-1']
1198 self
.assertIn(msg_type
, [KRB_AS_REP
, KRB_TGS_REP
, KRB_ERROR
])
1199 if msg_type
== KRB_AS_REP
:
1200 asn1Spec
= krb5_asn1
.AS_REP()
1201 elif msg_type
== KRB_TGS_REP
:
1202 asn1Spec
= krb5_asn1
.TGS_REP()
1203 elif msg_type
== KRB_ERROR
:
1204 asn1Spec
= krb5_asn1
.KRB_ERROR()
1205 rep
= self
.der_decode(rep_pdu
, asn1Spec
=asn1Spec
,
1206 asn1_print
=asn1_print
, hexdump
=False)
1207 return (rep
, rep_pdu
)
1209 def recv_pdu(self
, asn1_print
=None, hexdump
=None, timeout
=None):
1210 (rep
, rep_pdu
) = self
.recv_reply(asn1_print
=asn1_print
,
def assertIsConnected(self):
    """Fail with "Not connected" when self.s is None."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" when self.s is not None."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
1221 def send_recv_transaction(
1228 host
= self
.host
if to_rodc
else self
.dc_host
1231 self
.send_pdu(req
, asn1_print
=asn1_print
, hexdump
=hexdump
)
1232 rep
= self
.recv_pdu(
1233 asn1_print
=asn1_print
, hexdump
=hexdump
, timeout
=timeout
)
1235 self
._disconnect
("transaction failed")
1237 self
._disconnect
("transaction done")
def getElementValue(self, obj, elem):
    """Return obj.get(elem): the stored value, or None when absent."""
    lookup = obj.get
    return lookup(elem)
def assertElementMissing(self, obj, elem):
    """Assert that getElementValue(obj, elem) yields None."""
    found = self.getElementValue(obj, elem)
    self.assertIsNone(found)
1247 def assertElementPresent(self
, obj
, elem
, expect_empty
=False):
1248 v
= self
.getElementValue(obj
, elem
)
1249 self
.assertIsNotNone(v
)
1250 if self
.strict_checking
:
1251 if isinstance(v
, collections
.abc
.Container
):
1253 self
.assertEqual(0, len(v
))
1255 self
.assertNotEqual(0, len(v
))
def assertElementEqual(self, obj, elem, value):
    """Assert that element *elem* of *obj* is present and equals *value*."""
    found = self.getElementValue(obj, elem)
    # Check presence first, then compare against the expected value.
    self.assertIsNotNone(found)
    self.assertEqual(found, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that element *elem* equals *value* encoded as UTF-8 bytes."""
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    # The expected string is encoded only after the presence check.
    expected_bytes = bytes(value, 'utf8')
    self.assertEqual(found, expected_bytes)
1267 def assertPrincipalEqual(self
, princ1
, princ2
):
1268 self
.assertEqual(princ1
['name-type'], princ2
['name-type'])
1270 len(princ1
['name-string']),
1271 len(princ2
['name-string']),
1272 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
1273 for idx
in range(len(princ1
['name-string'])):
1275 princ1
['name-string'][idx
],
1276 princ2
['name-string'][idx
],
1277 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that element *elem* is present and matches principal *value*.

    The element is decoded against a krb5_asn1.PrincipalName() spec before
    being compared with assertPrincipalEqual().
    """
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    principal_spec = krb5_asn1.PrincipalName()
    decoded = pyasn1_native_decode(found, asn1Spec=principal_spec)
    self.assertPrincipalEqual(decoded, value)
1285 def assertElementKVNO(self
, obj
, elem
, value
):
1286 v
= self
.getElementValue(obj
, elem
)
1287 if value
== "autodetect":
1289 if value
is not None:
1290 self
.assertIsNotNone(v
)
1291 # The value on the wire should never be 0
1292 self
.assertNotEqual(v
, 0)
1293 # unspecified_kvno means we don't know the kvno,
1294 # but want to enforce its presence
1295 if value
is not self
.unspecified_kvno
:
1297 self
.assertNotEqual(value
, 0)
1298 self
.assertEqual(v
, value
)
1300 self
.assertIsNone(v
)
1302 def assertElementFlags(self
, obj
, elem
, expected
, unexpected
):
1303 v
= self
.getElementValue(obj
, elem
)
1304 self
.assertIsNotNone(v
)
1305 if expected
is not None:
1306 self
.assertIsInstance(expected
, krb5_asn1
.TicketFlags
)
1307 for i
, flag
in enumerate(expected
):
1309 self
.assertEqual('1', v
[i
],
1310 f
"'{expected.namedValues[i]}' "
1312 if unexpected
is not None:
1313 self
.assertIsInstance(unexpected
, krb5_asn1
.TicketFlags
)
1314 for i
, flag
in enumerate(unexpected
):
1316 self
.assertEqual('0', v
[i
],
1317 f
"'{unexpected.namedValues[i]}' "
1318 f
"unexpected in {v}")
1320 def assertSequenceElementsEqual(self
, expected
, got
, *,
1321 require_strict
=None,
1323 require_ordered
=True):
1324 if self
.strict_checking
and require_ordered
and not unchecked
:
1325 self
.assertEqual(expected
, got
)
1327 fail_msg
= f
'expected: {expected} got: {got}'
1331 ignored
.update(unchecked
)
1332 if require_strict
and not self
.strict_checking
:
1333 ignored
.update(require_strict
)
1336 fail_msg
+= f
' (ignoring: {ignored})'
1337 expected
= (x
for x
in expected
if x
not in ignored
)
1338 got
= (x
for x
in got
if x
not in ignored
)
1340 self
.assertCountEqual(expected
, got
, fail_msg
)
1342 def get_KerberosTimeWithUsec(self
, epoch
=None, offset
=None):
1345 if offset
is not None:
1346 epoch
= epoch
+ int(offset
)
1347 dt
= datetime
.datetime
.fromtimestamp(epoch
, tz
=datetime
.timezone
.utc
)
1348 return (dt
.strftime("%Y%m%d%H%M%SZ"), dt
.microsecond
)
1350 def get_KerberosTime(self
, epoch
=None, offset
=None):
1351 (s
, _
) = self
.get_KerberosTimeWithUsec(epoch
=epoch
, offset
=offset
)
1354 def get_EpochFromKerberosTime(self
, kerberos_time
):
1355 if isinstance(kerberos_time
, bytes
):
1356 kerberos_time
= kerberos_time
.decode()
1358 epoch
= datetime
.datetime
.strptime(kerberos_time
,
1360 epoch
= epoch
.replace(tzinfo
=datetime
.timezone
.utc
)
1361 epoch
= int(epoch
.timestamp())
1365 def get_Nonce(self
):
1366 nonce_min
= 0x7f000000
1367 nonce_max
= 0x7fffffff
1368 v
= random
.randint(nonce_min
, nonce_max
)
1371 def get_pa_dict(self
, pa_data
):
1374 if pa_data
is not None:
1376 pa_type
= pa
['padata-type']
1377 if pa_type
in pa_dict
:
1378 raise RuntimeError(f
'Duplicate type {pa_type}')
1379 pa_dict
[pa_type
] = pa
['padata-value']
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap key material in a RodcPacEncryptionKey.

    A kcrypto.Key is built from etype and contents, and returned together
    with the optional kvno.
    """
    return RodcPacEncryptionKey(kcrypto.Key(etype, contents), kvno)
1387 def PasswordKey_create(self
, etype
=None, pwd
=None, salt
=None, kvno
=None,
1389 self
.assertIsNotNone(pwd
)
1390 self
.assertIsNotNone(salt
)
1391 key
= kcrypto
.string_to_key(etype
, pwd
, salt
, params
=params
)
1392 return RodcPacEncryptionKey(key
, kvno
)
1394 def PasswordKey_from_etype_info2(self
, creds
, etype_info2
, kvno
=None):
1395 e
= etype_info2
['etype']
1396 salt
= etype_info2
.get('salt')
1397 params
= etype_info2
.get('s2kparams')
1398 return self
.PasswordKey_from_etype(creds
, e
,
1403 def PasswordKey_from_creds(self
, creds
, etype
):
1404 kvno
= creds
.get_kvno()
1405 salt
= creds
.get_salt()
1406 return self
.PasswordKey_from_etype(creds
, etype
,
def PasswordKey_from_etype(self, creds, etype, kvno=None, salt=None,
                           params=None):
    """Build the key of the given encryption type for creds.

    For RC4 the key contents are taken from creds.get_nt_hash(); salt and
    params are not used on that path. For every other etype the key is
    derived by PasswordKey_create() from the UTF-8 encoded password.

    :param creds: credentials providing get_nt_hash() and get_password()
    :param etype: encryption type of the key to produce
    :param kvno: key version number to attach to the key, if any
    :param salt: salt handed to PasswordKey_create()
    :param params: string-to-key parameters handed to PasswordKey_create()
    :return: the key object produced by SessionKey_create() or
        PasswordKey_create()
    """
    if etype == kcrypto.Enctype.RC4:
        nthash = creds.get_nt_hash()
        return self.SessionKey_create(etype=etype, contents=nthash,
                                      kvno=kvno)

    password = creds.get_password().encode('utf-8')
    # Forward params as well: previously it was accepted here but dropped,
    # so callers supplying s2kparams got a key derived without them.
    return self.PasswordKey_create(
        etype=etype, pwd=password, salt=salt, kvno=kvno, params=params)
1419 def TicketDecryptionKey_from_creds(self
, creds
, etype
=None):
1422 etypes
= creds
.get_tgs_krb5_etypes()
1423 if etypes
and etypes
[0] not in (kcrypto
.Enctype
.DES_CRC
,
1424 kcrypto
.Enctype
.DES_MD5
):
1427 etype
= kcrypto
.Enctype
.RC4
1429 forced_key
= creds
.get_forced_key(etype
)
1430 if forced_key
is not None:
1433 kvno
= creds
.get_kvno()
1435 fail_msg
= ("%s has no fixed key for etype[%s] kvno[%s] "
1436 "nor a password specified, " % (
1437 creds
.get_username(), etype
, kvno
))
1439 if etype
== kcrypto
.Enctype
.RC4
:
1440 nthash
= creds
.get_nt_hash()
1441 self
.assertIsNotNone(nthash
, msg
=fail_msg
)
1442 return self
.SessionKey_create(etype
=etype
,
1446 password
= creds
.get_password()
1447 self
.assertIsNotNone(password
, msg
=fail_msg
)
1448 salt
= creds
.get_salt()
1449 return self
.PasswordKey_create(etype
=etype
,
def RandomKey(self, etype):
    """Create a key of the given etype from freshly generated bytes.

    The number of random bytes is the keysize of the enctype profile
    looked up in kcrypto.
    """
    profile = kcrypto._get_enctype_profile(etype)
    key_bytes = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=key_bytes)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Turn an EncryptionKey mapping into a key via SessionKey_create().

    The mapping must carry the 'keytype' and 'keyvalue' entries.
    """
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(keytype, keyvalue)
1463 def EncryptedData_create(self
, key
, usage
, plaintext
):
1464 # EncryptedData ::= SEQUENCE {
1465 # etype [0] Int32 -- EncryptionType --,
1466 # kvno [1] Int32 OPTIONAL,
1467 # cipher [2] OCTET STRING -- ciphertext
1469 ciphertext
= key
.encrypt(usage
, plaintext
)
1470 EncryptedData_obj
= {
1472 'cipher': ciphertext
1474 if key
.kvno
is not None:
1475 EncryptedData_obj
['kvno'] = key
.kvno
1476 return EncryptedData_obj
1478 def Checksum_create(self
, key
, usage
, plaintext
, ctype
=None):
1479 # Checksum ::= SEQUENCE {
1480 # cksumtype [0] Int32,
1481 # checksum [1] OCTET STRING
1485 checksum
= key
.make_checksum(usage
, plaintext
, ctype
=ctype
)
1488 'checksum': checksum
,
def PrincipalName_create(cls, name_type, names):
    """Return a PrincipalName as a plain dict.

    PrincipalName ::= SEQUENCE {
        name-type   [0] Int32,
        name-string [1] SEQUENCE OF KerberosString
    }
    """
    return {
        'name-type': name_type,
        'name-string': names,
    }
1504 def AuthorizationData_create(self
, ad_type
, ad_data
):
1505 # AuthorizationData ::= SEQUENCE {
1506 # ad-type [0] Int32,
1507 # ad-data [1] OCTET STRING
1513 return AUTH_DATA_obj
1515 def PA_DATA_create(self
, padata_type
, padata_value
):
1516 # PA-DATA ::= SEQUENCE {
1517 # -- NOTE: first tag is [1], not [0]
1518 # padata-type [1] Int32,
1519 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1522 'padata-type': padata_type
,
1523 'padata-value': padata_value
,
1527 def PA_ENC_TS_ENC_create(self
, ts
, usec
):
1528 # PA-ENC-TS-ENC ::= SEQUENCE {
1529 # patimestamp[0] KerberosTime, -- client's time
1530 # pausec[1] krb5int32 OPTIONAL
1532 PA_ENC_TS_ENC_obj
= {
1536 return PA_ENC_TS_ENC_obj
1538 def PA_PAC_OPTIONS_create(self
, options
):
1539 # PA-PAC-OPTIONS ::= SEQUENCE {
1540 # options [0] PACOptionFlags
1542 PA_PAC_OPTIONS_obj
= {
1545 return PA_PAC_OPTIONS_obj
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor as a plain dict.

    The dict carries the two fields set here:
        armor-type  [0] Int32
        armor-value [1] OCTET STRING
    """
    return {
        'armor-type': armor_type,
        'armor-value': armor_value,
    }
1559 def KRB_FAST_REQ_create(self
, fast_options
, padata
, req_body
):
1560 # KrbFastReq ::= SEQUENCE {
1561 # fast-options [0] FastOptions,
1562 # padata [1] SEQUENCE OF PA-DATA,
1563 # req-body [2] KDC-REQ-BODY,
1566 KRB_FAST_REQ_obj
= {
1567 'fast-options': fast_options
,
1569 'req-body': req_body
1571 return KRB_FAST_REQ_obj
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return a KrbFastArmoredReq as a plain dict.

    KrbFastArmoredReq ::= SEQUENCE {
        armor        [0] KrbFastArmor OPTIONAL,
        req-checksum [1] Checksum,
        enc-fast-req [2] EncryptedData -- KrbFastReq --
    }

    The optional 'armor' entry is only added when armor is not None.
    """
    armored_req = {
        'req-checksum': req_checksum,
        'enc-fast-req': enc_fast_req,
    }
    if armor is None:
        return armored_req
    armored_req['armor'] = armor
    return armored_req
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Return a PA-FX-FAST-REQUEST choice as a plain dict.

    Only the 'armored-data' [0] KrbFastArmoredReq alternative is produced.
    """
    return {'armored-data': armored_data}
1597 def KERB_PA_PAC_REQUEST_create(self
, include_pac
, pa_data_create
=True):
1598 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1599 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1601 # --If FALSE, and PAC present,
1604 KERB_PA_PAC_REQUEST_obj
= {
1605 'include-pac': include_pac
,
1607 if not pa_data_create
:
1608 return KERB_PA_PAC_REQUEST_obj
1609 pa_pac
= self
.der_encode(KERB_PA_PAC_REQUEST_obj
,
1610 asn1Spec
=krb5_asn1
.KERB_PA_PAC_REQUEST())
1611 pa_data
= self
.PA_DATA_create(PADATA_PAC_REQUEST
, pa_pac
)
1614 def get_pa_pac_options(self
, options
):
1615 pac_options
= self
.PA_PAC_OPTIONS_create(options
)
1616 pac_options
= self
.der_encode(pac_options
,
1617 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
1618 pac_options
= self
.PA_DATA_create(PADATA_PAC_OPTIONS
, pac_options
)
1622 def KDC_REQ_BODY_create(self
,
1634 EncAuthorizationData
,
1635 EncAuthorizationData_key
,
1636 EncAuthorizationData_usage
,
1639 # KDC-REQ-BODY ::= SEQUENCE {
1640 # kdc-options [0] KDCOptions,
1641 # cname [1] PrincipalName OPTIONAL
1642 # -- Used only in AS-REQ --,
1645 # -- Also client's in AS-REQ --,
1646 # sname [3] PrincipalName OPTIONAL,
1647 # from [4] KerberosTime OPTIONAL,
1648 # till [5] KerberosTime,
1649 # rtime [6] KerberosTime OPTIONAL,
1651 # etype [8] SEQUENCE OF Int32
1653 # -- in preference order --,
1654 # addresses [9] HostAddresses OPTIONAL,
1655 # enc-authorization-data [10] EncryptedData OPTIONAL
1656 # -- AuthorizationData --,
1657 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1658 # -- NOTE: not empty
1660 if EncAuthorizationData
is not None:
1661 enc_ad_plain
= self
.der_encode(
1662 EncAuthorizationData
,
1663 asn1Spec
=krb5_asn1
.AuthorizationData(),
1664 asn1_print
=asn1_print
,
1666 enc_ad
= self
.EncryptedData_create(EncAuthorizationData_key
,
1667 EncAuthorizationData_usage
,
1671 KDC_REQ_BODY_obj
= {
1672 'kdc-options': kdc_options
,
1678 if cname
is not None:
1679 KDC_REQ_BODY_obj
['cname'] = cname
1680 if sname
is not None:
1681 KDC_REQ_BODY_obj
['sname'] = sname
1682 if from_time
is not None:
1683 KDC_REQ_BODY_obj
['from'] = from_time
1684 if renew_time
is not None:
1685 KDC_REQ_BODY_obj
['rtime'] = renew_time
1686 if addresses
is not None:
1687 KDC_REQ_BODY_obj
['addresses'] = addresses
1688 if enc_ad
is not None:
1689 KDC_REQ_BODY_obj
['enc-authorization-data'] = enc_ad
1690 if additional_tickets
is not None:
1691 KDC_REQ_BODY_obj
['additional-tickets'] = additional_tickets
1692 return KDC_REQ_BODY_obj
1694 def KDC_REQ_create(self
,
1701 # KDC-REQ ::= SEQUENCE {
1702 # -- NOTE: first tag is [1], not [0]
1703 # pvno [1] INTEGER (5) ,
1704 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1705 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1706 # -- NOTE: not empty --,
1707 # req-body [4] KDC-REQ-BODY
1712 'msg-type': msg_type
,
1713 'req-body': req_body
,
1715 if padata
is not None:
1716 KDC_REQ_obj
['padata'] = padata
1717 if asn1Spec
is not None:
1718 KDC_REQ_decoded
= pyasn1_native_decode(
1719 KDC_REQ_obj
, asn1Spec
=asn1Spec
)
1721 KDC_REQ_decoded
= None
1722 return KDC_REQ_obj
, KDC_REQ_decoded
1724 def AS_REQ_create(self
,
1726 kdc_options
, # required
1730 from_time
, # optional
1731 till_time
, # required
1732 renew_time
, # optional
1735 addresses
, # optional
1737 native_decoded_only
=True,
1740 # KDC-REQ ::= SEQUENCE {
1741 # -- NOTE: first tag is [1], not [0]
1742 # pvno [1] INTEGER (5) ,
1743 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1744 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1745 # -- NOTE: not empty --,
1746 # req-body [4] KDC-REQ-BODY
1749 # KDC-REQ-BODY ::= SEQUENCE {
1750 # kdc-options [0] KDCOptions,
1751 # cname [1] PrincipalName OPTIONAL
1752 # -- Used only in AS-REQ --,
1755 # -- Also client's in AS-REQ --,
1756 # sname [3] PrincipalName OPTIONAL,
1757 # from [4] KerberosTime OPTIONAL,
1758 # till [5] KerberosTime,
1759 # rtime [6] KerberosTime OPTIONAL,
1761 # etype [8] SEQUENCE OF Int32
1763 # -- in preference order --,
1764 # addresses [9] HostAddresses OPTIONAL,
1765 # enc-authorization-data [10] EncryptedData OPTIONAL
1766 # -- AuthorizationData --,
1767 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1768 # -- NOTE: not empty
1770 KDC_REQ_BODY_obj
= self
.KDC_REQ_BODY_create(
1782 EncAuthorizationData
=None,
1783 EncAuthorizationData_key
=None,
1784 EncAuthorizationData_usage
=None,
1785 asn1_print
=asn1_print
,
1787 obj
, decoded
= self
.KDC_REQ_create(
1788 msg_type
=KRB_AS_REQ
,
1790 req_body
=KDC_REQ_BODY_obj
,
1791 asn1Spec
=krb5_asn1
.AS_REQ(),
1792 asn1_print
=asn1_print
,
1794 if native_decoded_only
:
1798 def AP_REQ_create(self
, ap_options
, ticket
, authenticator
):
1799 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1800 # pvno [0] INTEGER (5),
1801 # msg-type [1] INTEGER (14),
1802 # ap-options [2] APOptions,
1803 # ticket [3] Ticket,
1804 # authenticator [4] EncryptedData -- Authenticator
1808 'msg-type': KRB_AP_REQ
,
1809 'ap-options': ap_options
,
1811 'authenticator': authenticator
,
1815 def Authenticator_create(
1816 self
, crealm
, cname
, cksum
, cusec
, ctime
, subkey
, seq_number
,
1817 authorization_data
):
1818 # -- Unencrypted authenticator
1819 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1820 # authenticator-vno [0] INTEGER (5),
1822 # cname [2] PrincipalName,
1823 # cksum [3] Checksum OPTIONAL,
1824 # cusec [4] Microseconds,
1825 # ctime [5] KerberosTime,
1826 # subkey [6] EncryptionKey OPTIONAL,
1827 # seq-number [7] UInt32 OPTIONAL,
1828 # authorization-data [8] AuthorizationData OPTIONAL
1830 Authenticator_obj
= {
1831 'authenticator-vno': 5,
1837 if cksum
is not None:
1838 Authenticator_obj
['cksum'] = cksum
1839 if subkey
is not None:
1840 Authenticator_obj
['subkey'] = subkey
1841 if seq_number
is not None:
1842 Authenticator_obj
['seq-number'] = seq_number
1843 if authorization_data
is not None:
1844 Authenticator_obj
['authorization-data'] = authorization_data
1845 return Authenticator_obj
1847 def PKAuthenticator_create(self
,
1853 freshness_token
=None,
1856 win2k_variant
=False):
1858 self
.assertIsNone(pa_checksum
)
1859 self
.assertIsNone(freshness_token
)
1860 self
.assertIsNotNone(kdc_name
)
1861 self
.assertIsNotNone(kdc_realm
)
1863 self
.assertIsNone(kdc_name
)
1864 self
.assertIsNone(kdc_realm
)
1866 pk_authenticator_obj
= {
1871 if pa_checksum
is not None:
1872 pk_authenticator_obj
['paChecksum'] = pa_checksum
1873 if freshness_token
is not None:
1874 pk_authenticator_obj
['freshnessToken'] = freshness_token
1875 if kdc_name
is not None:
1876 pk_authenticator_obj
['kdcName'] = kdc_name
1877 if kdc_realm
is not None:
1878 pk_authenticator_obj
['kdcRealm'] = kdc_realm
1880 return pk_authenticator_obj
1882 def TGS_REQ_create(self
,
1887 kdc_options
, # required
1891 from_time
, # optional
1892 till_time
, # required
1893 renew_time
, # optional
1896 addresses
, # optional
1897 EncAuthorizationData
,
1898 EncAuthorizationData_key
,
1901 authenticator_subkey
=None,
1902 body_checksum_type
=None,
1903 native_decoded_only
=True,
1906 # KDC-REQ ::= SEQUENCE {
1907 # -- NOTE: first tag is [1], not [0]
1908 # pvno [1] INTEGER (5) ,
1909 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1910 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1911 # -- NOTE: not empty --,
1912 # req-body [4] KDC-REQ-BODY
1915 # KDC-REQ-BODY ::= SEQUENCE {
1916 # kdc-options [0] KDCOptions,
1917 # cname [1] PrincipalName OPTIONAL
1918 # -- Used only in AS-REQ --,
1921 # -- Also client's in AS-REQ --,
1922 # sname [3] PrincipalName OPTIONAL,
1923 # from [4] KerberosTime OPTIONAL,
1924 # till [5] KerberosTime,
1925 # rtime [6] KerberosTime OPTIONAL,
1927 # etype [8] SEQUENCE OF Int32
1929 # -- in preference order --,
1930 # addresses [9] HostAddresses OPTIONAL,
1931 # enc-authorization-data [10] EncryptedData OPTIONAL
1932 # -- AuthorizationData --,
1933 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1934 # -- NOTE: not empty
1937 if authenticator_subkey
is not None:
1938 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SUBKEY
1940 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SESSION
1942 req_body
= self
.KDC_REQ_BODY_create(
1943 kdc_options
=kdc_options
,
1947 from_time
=from_time
,
1948 till_time
=till_time
,
1949 renew_time
=renew_time
,
1952 addresses
=addresses
,
1953 additional_tickets
=additional_tickets
,
1954 EncAuthorizationData
=EncAuthorizationData
,
1955 EncAuthorizationData_key
=EncAuthorizationData_key
,
1956 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1957 req_body_blob
= self
.der_encode(req_body
,
1958 asn1Spec
=krb5_asn1
.KDC_REQ_BODY(),
1959 asn1_print
=asn1_print
, hexdump
=hexdump
)
1961 req_body_checksum
= self
.Checksum_create(ticket_session_key
,
1962 KU_TGS_REQ_AUTH_CKSUM
,
1964 ctype
=body_checksum_type
)
1967 if authenticator_subkey
is not None:
1968 subkey_obj
= authenticator_subkey
.export_obj()
1969 seq_number
= random
.randint(0, 0xfffffffe)
1970 authenticator
= self
.Authenticator_create(
1973 cksum
=req_body_checksum
,
1977 seq_number
=seq_number
,
1978 authorization_data
=None)
1979 authenticator
= self
.der_encode(
1981 asn1Spec
=krb5_asn1
.Authenticator(),
1982 asn1_print
=asn1_print
,
1985 authenticator
= self
.EncryptedData_create(
1986 ticket_session_key
, KU_TGS_REQ_AUTH
, authenticator
)
1988 ap_options
= krb5_asn1
.APOptions('0')
1989 ap_req
= self
.AP_REQ_create(ap_options
=str(ap_options
),
1991 authenticator
=authenticator
)
1992 ap_req
= self
.der_encode(ap_req
, asn1Spec
=krb5_asn1
.AP_REQ(),
1993 asn1_print
=asn1_print
, hexdump
=hexdump
)
1994 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
1995 if padata
is not None:
1996 padata
.append(pa_tgs_req
)
1998 padata
= [pa_tgs_req
]
2000 obj
, decoded
= self
.KDC_REQ_create(
2001 msg_type
=KRB_TGS_REQ
,
2004 asn1Spec
=krb5_asn1
.TGS_REQ(),
2005 asn1_print
=asn1_print
,
2007 if native_decoded_only
:
2011 def PA_S4U2Self_create(self
, name
, realm
, tgt_session_key
, ctype
=None):
2012 # PA-S4U2Self ::= SEQUENCE {
2013 # name [0] PrincipalName,
2015 # cksum [2] Checksum,
2016 # auth [3] GeneralString
2018 cksum_data
= name
['name-type'].to_bytes(4, byteorder
='little')
2019 for n
in name
['name-string']:
2020 cksum_data
+= n
.encode()
2021 cksum_data
+= realm
.encode()
2022 cksum_data
+= "Kerberos".encode()
2023 cksum
= self
.Checksum_create(tgt_session_key
,
2024 KU_NON_KERB_CKSUM_SALT
,
2034 pa_s4u2self
= self
.der_encode(
2035 PA_S4U2Self_obj
, asn1Spec
=krb5_asn1
.PA_S4U2Self())
2036 return self
.PA_DATA_create(PADATA_FOR_USER
, pa_s4u2self
)
2038 def ChangePasswdDataMS_create(self
,
2042 ChangePasswdDataMS_obj
= {
2043 'newpasswd': new_password
,
2045 if target_princ
is not None:
2046 ChangePasswdDataMS_obj
['targname'] = target_princ
2047 if target_realm
is not None:
2048 ChangePasswdDataMS_obj
['targrealm'] = target_realm
2050 change_password_data
= self
.der_encode(
2051 ChangePasswdDataMS_obj
, asn1Spec
=krb5_asn1
.ChangePasswdDataMS())
2053 return change_password_data
2055 def KRB_PRIV_create(self
,
2063 EncKrbPrivPart_obj
= {
2064 'user-data': user_data
,
2065 's-address': s_address
,
2067 if timestamp
is not None:
2068 EncKrbPrivPart_obj
['timestamp'] = timestamp
2069 if usec
is not None:
2070 EncKrbPrivPart_obj
['usec'] = usec
2071 if seq_number
is not None:
2072 EncKrbPrivPart_obj
['seq-number'] = seq_number
2073 if r_address
is not None:
2074 EncKrbPrivPart_obj
['r-address'] = r_address
2076 enc_krb_priv_part
= self
.der_encode(
2077 EncKrbPrivPart_obj
, asn1Spec
=krb5_asn1
.EncKrbPrivPart())
2079 enc_data
= self
.EncryptedData_create(subkey
,
2085 'msg-type': KRB_PRIV
,
2086 'enc-part': enc_data
,
2089 krb_priv
= self
.der_encode(
2090 KRB_PRIV_obj
, asn1Spec
=krb5_asn1
.KRB_PRIV())
2094 def ContentInfo_create(self
, content_type
, content
):
2095 content_info_obj
= {
2096 'contentType': content_type
,
2100 return content_info_obj
2102 def EncapsulatedContentInfo_create(self
, content_type
, content
):
2103 encapsulated_content_info_obj
= {
2104 'eContentType': content_type
,
2105 'eContent': content
,
2108 return encapsulated_content_info_obj
2110 def SignedData_create(self
,
2118 def is_cert_version_present(version
):
2119 return certificates
is not None and any(
2120 version
in cert
for cert
in certificates
)
2122 def is_crl_version_present(version
):
2123 return crls
is not None and any(
2124 version
in crl
for crl
in crls
)
2126 def is_signer_info_version_present(version
):
2127 return signer_infos
is not None and any(
2128 signer_info
['version'] == version
2129 for signer_info
in signer_infos
)
2133 if is_cert_version_present('other') or (
2134 is_crl_version_present('other')):
2137 if is_cert_version_present('v2AttrCert'):
2140 if is_cert_version_present('v1AttrCert') or (
2141 is_signer_info_version_present(3)) or (
2142 encap_content_info
['eContentType'] != krb5_asn1
.id_data
2149 version
= data_version()
2153 'digestAlgorithms': digest_algorithms
,
2154 'encapContentInfo': encap_content_info
,
2155 'signerInfos': signer_infos
,
2158 if certificates
is not None:
2159 signed_data_obj
['certificates'] = certificates
2160 if crls
is not None:
2161 signed_data_obj
['crls'] = crls
2163 return signed_data_obj
2165 def AuthPack_create(self
,
2168 client_public_value
=None,
2169 supported_cms_types
=None,
2170 client_dh_nonce
=None,
2171 win2k_variant
=False):
2173 self
.assertIsNone(supported_cms_types
)
2174 self
.assertIsNone(client_dh_nonce
)
2177 'pkAuthenticator': pk_authenticator
,
2180 if client_public_value
is not None:
2181 auth_pack_obj
['clientPublicValue'] = client_public_value
2182 if supported_cms_types
is not None:
2183 auth_pack_obj
['supportedCMSTypes'] = supported_cms_types
2184 if client_dh_nonce
is not None:
2185 auth_pack_obj
['clientDHNonce'] = client_dh_nonce
2187 return auth_pack_obj
2189 def PK_AS_REQ_create(self
,
2192 trusted_certifiers
=None,
2195 encryption_cert
=None,
2196 win2k_variant
=False):
2198 self
.assertIsNone(kdc_pk_id
)
2199 asn1_spec
= krb5_asn1
.PA_PK_AS_REQ_Win2k
2201 self
.assertIsNone(kdc_cert
)
2202 self
.assertIsNone(encryption_cert
)
2203 asn1_spec
= krb5_asn1
.PA_PK_AS_REQ
2205 content_info_obj
= self
.ContentInfo_create(
2206 krb5_asn1
.id_signedData
, signed_auth_pack
)
2207 content_info
= self
.der_encode(content_info_obj
,
2208 asn1Spec
=krb5_asn1
.ContentInfo())
2211 'signedAuthPack': content_info
,
2214 if trusted_certifiers
is not None:
2215 pk_as_req_obj
['trustedCertifiers'] = trusted_certifiers
2216 if kdc_pk_id
is not None:
2217 pk_as_req_obj
['kdcPkId'] = kdc_pk_id
2218 if kdc_cert
is not None:
2219 pk_as_req_obj
['kdcCert'] = kdc_cert
2220 if encryption_cert
is not None:
2221 pk_as_req_obj
['encryptionCert'] = encryption_cert
2223 return self
.der_encode(pk_as_req_obj
, asn1Spec
=asn1_spec())
2225 def SignerInfo_create(self
,
2228 signature_algorithm
,
2233 unsigned_attrs
=None):
2236 if 'issuerAndSerialNumber' in signer_id
:
2238 elif 'subjectKeyIdentifier' in signer_id
:
2241 self
.fail(f
'unknown signer ID version ({signer_id})')
2246 'digestAlgorithm': digest_algorithm
,
2247 'signatureAlgorithm': signature_algorithm
,
2248 'signature': signature
,
2251 if signed_attrs
is not None:
2252 signer_info_obj
['signedAttrs'] = signed_attrs
2253 if unsigned_attrs
is not None:
2254 signer_info_obj
['unsignedAttrs'] = unsigned_attrs
2256 return signer_info_obj
def SignerIdentifier_create(self, *,
                            issuer_and_serial_number=None,
                            subject_key_id=None):
    """Return a single-entry dict identifying a signer.

    issuer_and_serial_number takes precedence over subject_key_id when
    both are given; the test fails if neither is specified.
    """
    candidates = (
        ('issuerAndSerialNumber', issuer_and_serial_number),
        ('subjectKeyIdentifier', subject_key_id),
    )
    for field, identifier in candidates:
        if identifier is not None:
            return {field: identifier}

    self.fail('identifier not specified')
2269 def AlgorithmIdentifier_create(self
,
2273 algorithm_id_obj
= {
2274 'algorithm': algorithm
,
2277 if parameters
is not None:
2278 algorithm_id_obj
['parameters'] = parameters
2280 return algorithm_id_obj
2282 def SubjectPublicKeyInfo_create(self
,
2286 'algorithm': algorithm
,
2287 'subjectPublicKey': public_key
,
2290 def ValidationParms_create(self
,
2295 'pgenCounter': pgen_counter
,
2298 def DomainParameters_create(self
,
2304 validation_parms
=None):
2305 domain_params_obj
= {
2311 domain_params_obj
['q'] = q
2313 domain_params_obj
['j'] = j
2314 if validation_parms
is not None:
2315 domain_params_obj
['validationParms'] = validation_parms
2317 return domain_params_obj
def length_in_bytes(self, value):
    """Return the number of bytes needed to encode a non-negative integer.

    Zero is treated as needing one bit, so the result is always at least
    one byte.

    :param value: the non-negative int to measure
    :return: the minimal byte count as an int
    """
    # Check the type first so that a non-integer fails with a clear
    # assertion rather than an error from the comparison below.
    self.assertIsInstance(value, int)
    self.assertGreaterEqual(value, 0, 'value must be positive')

    # int.bit_length() is exact for arbitrarily large integers, whereas a
    # floating-point log2 rounds (e.g. it reports 64 bits for 2**64).
    length_in_bits = max(1, value.bit_length())
    return (length_in_bits + 7) // 8
2330 def bytes_from_int(self
, value
, *, length
=None):
2331 """Return an integer encoded big-endian into bytes of an optionally
2335 length
= self
.length_in_bytes(value
)
2336 return value
.to_bytes(length
, 'big')
def int_from_bytes(self, data):
    """Decode big-endian bytes into an integer."""
    return int.from_bytes(data, byteorder='big')
def int_from_bit_string(self, string):
    """Decode a string of binary digits into an integer."""
    return int(string, 2)
def bit_string_from_int(self, value):
    """Encode an integer as a string of binary digits.

    The result is left-padded with zeros to a multiple of 8 bits.
    """
    bits = format(value, 'b')

    # pyasn1 misinterprets a bitstring whose length is not a multiple of
    # 8 bits (as though the padding bits were present, but on the wrong
    # end), so pad on the left up to the next byte boundary.
    missing = -len(bits) % 8
    return '0' * missing + bits
def bit_string_from_bytes(self, data):
    """Encode big-endian bytes as a string of binary digits."""
    return self.bit_string_from_int(self.int_from_bytes(data))
def bytes_from_bit_string(self, string):
    """Encode a string of binary digits as big-endian bytes.

    The output holds one byte for every 8 digits, rounded up.
    """
    byte_count = math.ceil(len(string) / 8)
    number = self.int_from_bit_string(string)
    return number.to_bytes(byte_count, 'big')
2369 def asn1_length(self
, data
):
2370 """Return the ASN.1 encoding of the length of some data."""
2374 self
.assertGreater(length
, 0)
2376 return bytes([length
])
2378 encoding_len
= self
.length_in_bytes(length
)
2379 self
.assertLess(encoding_len
, 0x80,
2380 'item is too long to be ASN.1 encoded')
2382 data
= self
.bytes_from_int(length
, length
=encoding_len
)
2383 return bytes([0x80 | encoding_len
]) + data
2386 def octetstring2key(x
, enctype
):
2387 """This implements the function defined in RFC4556 3.2.3.1 “Using
2388 Diffie-Hellman Key Exchange”."""
2390 seedsize
= kcrypto
.seedsize(enctype
)
2393 # A counter that cycles through the bytes 0x00–0xff.
2394 counter
= itertools
.cycle(map(lambda x
: bytes([x
]),
2397 while len(seed
) < seedsize
:
2398 digest
= hashes
.Hash(hashes
.SHA1(), default_backend())
2399 digest
.update(next(counter
) + x
)
2400 seed
+= digest
.finalize()
2402 key
= kcrypto
.random_to_key(enctype
, seed
[:seedsize
])
2403 return RodcPacEncryptionKey(key
, kvno
=None)
def unpad(self, data):
    """Return data with its trailing padding removed.

    The last byte gives the padding length N, and the final N bytes must
    all have the value N. The test fails if data is empty or if the
    padding bytes do not match (which includes N == 0 and N longer than
    data).

    :param data: the padded bytes
    :return: data without the trailing padding bytes
    """
    # Fail with a clear message instead of an IndexError on data[-1].
    self.assertTrue(data, 'cannot unpad empty data')

    padding_len = data[-1]
    expected_padding = bytes([padding_len]) * padding_len
    self.assertEqual(expected_padding, data[-padding_len:],
                     'invalid padding bytes')

    return data[:-padding_len]
2414 def try_decode(self
, data
, module
=None):
2415 """Try to decode some data of unknown type with various known ASN.1
2416 schemata (optionally restricted to those from a particular module) and
2417 print any results that seem promising. For use when debugging.
2421 # Try a couple of known ASN.1 modules.
2422 self
.try_decode(data
, krb5_asn1
)
2423 self
.try_decode(data
, pyasn1
.type.univ
)
2425 # It’s helpful to stop and give the user a chance to examine the
2427 self
.fail('decoding done')
2431 item
= getattr(module
, name
)
2432 if not callable(item
):
2436 decoded
= self
.der_decode(data
, asn1Spec
=item())
2438 # Initiating the schema or decoding the ASN.1 failed for
2442 # Decoding succeeded: print the structure to be examined.
def cipher_from_algorithm(self, algorithm):
    """Map a cipher algorithm identifier string to its cipher class.

    Recognises the aes256_CBC_PAD and des_EDE3_CBC identifiers from
    krb5_asn1; any other value fails the test.
    """
    if algorithm == str(krb5_asn1.aes256_CBC_PAD):
        cipher = algorithms.AES
    elif algorithm == str(krb5_asn1.des_EDE3_CBC):
        cipher = algorithms.TripleDES
    else:
        self.fail(f'unknown cipher algorithm {algorithm}')

    return cipher
2455 def hash_from_algorithm(self
, algorithm
):
2456 # Let someone pass in an ObjectIdentifier.
2457 algorithm
= str(algorithm
)
2459 if algorithm
== str(krb5_asn1
.id_sha1
):
2462 if algorithm
== str(krb5_asn1
.sha1WithRSAEncryption
):
2465 if algorithm
== str(krb5_asn1
.rsaEncryption
):
2468 if algorithm
== str(krb5_asn1
.id_pkcs1_sha256WithRSAEncryption
):
2469 return hashes
.SHA256
2471 if algorithm
== str(krb5_asn1
.id_sha512
):
2472 return hashes
.SHA512
2474 self
.fail(f
'unknown hash algorithm {algorithm}')
2476 def hash_from_algorithm_id(self
, algorithm_id
):
2477 self
.assertIsInstance(algorithm_id
, dict)
2479 hash = self
.hash_from_algorithm(algorithm_id
['algorithm'])
2481 parameters
= algorithm_id
.get('parameters')
2482 if self
.strict_checking
:
2483 self
.assertIsNotNone(parameters
)
2484 if parameters
is not None:
2485 self
.assertEqual(b
'\x05\x00', parameters
)
2489 def create_freshness_token(self
,
2494 timestamp
, usec
= self
.get_KerberosTimeWithUsec(epoch
, offset
)
2496 # Encode the freshness token as PA-ENC-TS-ENC.
2497 ts_enc
= self
.PA_ENC_TS_ENC_create(timestamp
, usec
)
2498 ts_enc
= self
.der_encode(ts_enc
, asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
2500 if krbtgt_creds
is None:
2501 krbtgt_creds
= self
.get_krbtgt_creds()
2502 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
2504 # Encrypt the freshness token.
2505 freshness
= self
.EncryptedData_create(krbtgt_key
, KU_AS_FRESHNESS
, ts_enc
)
2507 freshness_token
= self
.der_encode(freshness
,
2508 asn1Spec
=krb5_asn1
.EncryptedData())
2510 # Prepend a couple of zero bytes.
2511 freshness_token
= bytes(2) + freshness_token
2513 return freshness_token
2515 def kpasswd_create(self
,
2523 self
.assertIsNotNone(self
.s
, 'call self.connect() first')
2525 timestamp
, usec
= self
.get_KerberosTimeWithUsec()
2527 krb_priv
= self
.KRB_PRIV_create(subkey
,
2529 s_address
=local_address
,
2530 timestamp
=timestamp
,
2532 seq_number
=seq_number
,
2533 r_address
=remote_address
)
2535 size
= 6 + len(ap_req
) + len(krb_priv
)
2536 self
.assertLess(size
, 0x10000)
2539 msg
.append(size
>> 8)
2540 msg
.append(size
& 0xff)
2541 msg
.append(version
>> 8)
2542 msg
.append(version
& 0xff)
2543 msg
.append(len(ap_req
) >> 8)
2544 msg
.append(len(ap_req
) & 0xff)
2545 # Note: for sets, there could be a little-endian four-byte length here.
2548 msg
.extend(krb_priv
)
2552 def get_enc_part(self
, obj
, key
, usage
):
2553 self
.assertElementEqual(obj
, 'pvno', 5)
2555 enc_part
= obj
['enc-part']
2556 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
2557 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
2559 enc_part
= key
.decrypt(usage
, enc_part
['cipher'])
2563 def kpasswd_exchange(self
,
2572 send_seq_number
=True):
2573 if mode
is self
.KpasswdMode
.SET
:
2575 user_data
= self
.ChangePasswdDataMS_create(new_password
,
2578 elif mode
is self
.KpasswdMode
.CHANGE
:
2579 self
.assertIsNone(target_princ
,
2580 'target_princ only valid for pw set')
2581 self
.assertIsNone(target_realm
,
2582 'target_realm only valid for pw set')
2585 user_data
= new_password
.encode('utf-8')
2587 self
.fail(f
'invalid mode {mode}')
2589 subkey
= self
.RandomKey(kcrypto
.Enctype
.AES256
)
2591 if ap_options
is None:
2593 ap_options
= str(krb5_asn1
.APOptions(ap_options
))
2595 kdc_exchange_dict
= {
2597 'authenticator_subkey': subkey
,
2599 'ap_options': ap_options
,
2603 seq_number
= random
.randint(0, 0xfffffffe)
2607 ap_req
= self
.generate_ap_req(kdc_exchange_dict
,
2611 usage
=KU_AP_REQ_AUTH
,
2612 seq_number
=seq_number
)
2614 self
.connect(self
.host
, port
=464)
2615 self
.assertIsNotNone(self
.s
)
2617 family
= self
.s
.family
2619 if family
== socket
.AF_INET
:
2620 addr_type
= 2 # IPv4
2621 elif family
== socket
.AF_INET6
:
2622 addr_type
= 24 # IPv6
2624 self
.fail(f
'unknown family {family}')
2626 def create_address(ip
):
2628 'addr-type': addr_type
,
2629 'address': socket
.inet_pton(family
, ip
),
2632 local_ip
= self
.s
.getsockname()[0]
2633 local_address
= create_address(local_ip
)
2635 # remote_ip = self.s.getpeername()[0]
2636 # remote_address = create_address(remote_ip)
2638 # TODO: due to a bug (?), MIT Kerberos will not accept the request
2639 # unless r-address is set to our _local_ address. Heimdal, on the other
2640 # hand, requires the r-address is set to the remote address (as
2641 # expected). To avoid problems, avoid sending r-address for now.
2642 remote_address
= None
2644 msg
= self
.kpasswd_create(subkey
,
2653 rep_pdu
= self
.recv_pdu_raw()
2655 self
._disconnect
('transaction done')
2657 self
.assertIsNotNone(rep_pdu
)
2659 header
= rep_pdu
[:6]
2662 reply_len
= (header
[0] << 8) | header
[1]
2663 reply_version
= (header
[2] << 8) | header
[3]
2664 ap_rep_len
= (header
[4] << 8) | header
[5]
2666 self
.assertEqual(reply_len
, len(rep_pdu
))
2667 self
.assertEqual(1, reply_version
) # KRB5_KPASSWD_VERS_CHANGEPW
2668 self
.assertLess(ap_rep_len
, reply_len
)
2670 self
.assertNotEqual(0x7e, rep_pdu
[1])
2671 self
.assertNotEqual(0x5e, rep_pdu
[1])
2674 # We received an AP-REP and KRB-PRIV as a response. This may or may
2675 # not indicate an error, depending on the status code.
2676 ap_rep
= reply
[:ap_rep_len
]
2677 krb_priv
= reply
[ap_rep_len
:]
2679 key
= ticket
.session_key
2681 ap_rep
= self
.der_decode(ap_rep
, asn1Spec
=krb5_asn1
.AP_REP())
2682 self
.assertElementEqual(ap_rep
, 'msg-type', KRB_AP_REP
)
2683 enc_part
= self
.get_enc_part(ap_rep
, key
, KU_AP_REQ_ENC_PART
)
2684 enc_part
= self
.der_decode(
2685 enc_part
, asn1Spec
=krb5_asn1
.EncAPRepPart())
2687 self
.assertElementPresent(enc_part
, 'ctime')
2688 self
.assertElementPresent(enc_part
, 'cusec')
2689 # self.assertElementMissing(enc_part, 'subkey') # TODO
2690 # self.assertElementPresent(enc_part, 'seq-number') # TODO
2693 krb_priv
= self
.der_decode(krb_priv
, asn1Spec
=krb5_asn1
.KRB_PRIV())
2697 self
.assertElementEqual(krb_priv
, 'msg-type', KRB_PRIV
)
2698 priv_enc_part
= self
.get_enc_part(krb_priv
, subkey
, KU_KRB_PRIV
)
2699 priv_enc_part
= self
.der_decode(
2700 priv_enc_part
, asn1Spec
=krb5_asn1
.EncKrbPrivPart())
2702 self
.assertElementMissing(priv_enc_part
, 'timestamp')
2703 self
.assertElementMissing(priv_enc_part
, 'usec')
2704 # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO
2705 # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO
2706 # self.assertElementMissing(priv_enc_part, 'r-address') # TODO
2708 result_data
= priv_enc_part
['user-data']
2710 # We received a KRB-ERROR as a response, indicating an error.
2711 krb_error
= self
.der_decode(reply
, asn1Spec
=krb5_asn1
.KRB_ERROR())
2713 sname
= self
.PrincipalName_create(
2714 name_type
=NT_PRINCIPAL
,
2715 names
=['kadmin', 'changepw'])
2716 realm
= self
.get_krbtgt_creds().get_realm().upper()
2718 self
.assertElementEqual(krb_error
, 'pvno', 5)
2719 self
.assertElementEqual(krb_error
, 'msg-type', KRB_ERROR
)
2720 self
.assertElementMissing(krb_error
, 'ctime')
2721 self
.assertElementMissing(krb_error
, 'usec')
2722 self
.assertElementPresent(krb_error
, 'stime')
2723 self
.assertElementPresent(krb_error
, 'susec')
2725 error_code
= krb_error
['error-code']
2726 if isinstance(expected_code
, int):
2727 self
.assertEqual(error_code
, expected_code
)
2729 self
.assertIn(error_code
, expected_code
)
2731 self
.assertElementMissing(krb_error
, 'crealm')
2732 self
.assertElementMissing(krb_error
, 'cname')
2733 self
.assertElementEqual(krb_error
, 'realm', realm
.encode('utf-8'))
2734 self
.assertElementEqualPrincipal(krb_error
, 'sname', sname
)
2735 self
.assertElementMissing(krb_error
, 'e-text')
2737 result_data
= krb_error
['e-data']
2739 status
= result_data
[:2]
2740 message
= result_data
[2:]
2742 status_code
= (status
[0] << 8) | status
[1]
2743 if isinstance(expected_code
, int):
2744 self
.assertEqual(status_code
, expected_code
)
2746 self
.assertIn(status_code
, expected_code
)
2749 self
.assertEqual(0, status_code
,
2750 'got an error result, but no message')
2753 # Check the first character of the message.
2755 if isinstance(expected_msg
, bytes
):
2756 self
.assertEqual(message
, expected_msg
)
2758 self
.assertIn(message
, expected_msg
)
2760 # We got AD password policy information.
2761 self
.assertEqual(30, len(message
))
2768 min_age
) = struct
.unpack('>HIIIQQ', message
)
2770 def _generic_kdc_exchange(self
,
2771 kdc_exchange_dict
, # required
2772 cname
=None, # optional
2773 realm
=None, # required
2774 sname
=None, # optional
2775 from_time
=None, # optional
2776 till_time
=None, # required
2777 renew_time
=None, # optional
2778 etypes
=None, # required
2779 addresses
=None, # optional
2780 additional_tickets
=None, # optional
2781 EncAuthorizationData
=None, # optional
2782 EncAuthorizationData_key
=None, # optional
2783 EncAuthorizationData_usage
=None): # optional
2785 check_error_fn
= kdc_exchange_dict
['check_error_fn']
2786 check_rep_fn
= kdc_exchange_dict
['check_rep_fn']
2787 generate_fast_fn
= kdc_exchange_dict
['generate_fast_fn']
2788 generate_fast_armor_fn
= kdc_exchange_dict
['generate_fast_armor_fn']
2789 generate_fast_padata_fn
= kdc_exchange_dict
['generate_fast_padata_fn']
2790 generate_padata_fn
= kdc_exchange_dict
['generate_padata_fn']
2791 callback_dict
= kdc_exchange_dict
['callback_dict']
2792 req_msg_type
= kdc_exchange_dict
['req_msg_type']
2793 req_asn1Spec
= kdc_exchange_dict
['req_asn1Spec']
2794 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2796 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
2797 kdc_options
= kdc_exchange_dict
['kdc_options']
2799 pac_request
= kdc_exchange_dict
['pac_request']
2800 pac_options
= kdc_exchange_dict
['pac_options']
2802 # Parameters specific to the inner request body
2803 inner_req
= kdc_exchange_dict
['inner_req']
2805 # Parameters specific to the outer request body
2806 outer_req
= kdc_exchange_dict
['outer_req']
2808 if till_time
is None:
2809 till_time
= self
.get_KerberosTime(offset
=36000)
2811 if 'nonce' in kdc_exchange_dict
:
2812 nonce
= kdc_exchange_dict
['nonce']
2814 nonce
= self
.get_Nonce()
2815 kdc_exchange_dict
['nonce'] = nonce
2817 req_body
= self
.KDC_REQ_BODY_create(
2818 kdc_options
=kdc_options
,
2822 from_time
=from_time
,
2823 till_time
=till_time
,
2824 renew_time
=renew_time
,
2827 addresses
=addresses
,
2828 additional_tickets
=additional_tickets
,
2829 EncAuthorizationData
=EncAuthorizationData
,
2830 EncAuthorizationData_key
=EncAuthorizationData_key
,
2831 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
2833 inner_req_body
= dict(req_body
)
2834 if inner_req
is not None:
2835 for key
, value
in inner_req
.items():
2836 if value
is not None:
2837 inner_req_body
[key
] = value
2839 del inner_req_body
[key
]
2840 if outer_req
is not None:
2841 for key
, value
in outer_req
.items():
2842 if value
is not None:
2843 req_body
[key
] = value
2847 additional_padata
= []
2848 if pac_request
is not None:
2849 pa_pac_request
= self
.KERB_PA_PAC_REQUEST_create(pac_request
)
2850 additional_padata
.append(pa_pac_request
)
2851 if pac_options
is not None:
2852 pa_pac_options
= self
.get_pa_pac_options(pac_options
)
2853 additional_padata
.append(pa_pac_options
)
2855 if req_msg_type
== KRB_AS_REQ
:
2857 tgs_req_padata
= None
2859 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
2861 tgs_req
= self
.generate_ap_req(kdc_exchange_dict
,
2865 tgs_req_padata
= self
.PA_DATA_create(PADATA_KDC_REQ
, tgs_req
)
2867 if generate_fast_padata_fn
is not None:
2868 self
.assertIsNotNone(generate_fast_fn
)
2869 # This can alter req_body...
2870 fast_padata
, req_body
= generate_fast_padata_fn(kdc_exchange_dict
,
2876 if generate_fast_armor_fn
is not None:
2877 self
.assertIsNotNone(generate_fast_fn
)
2878 fast_ap_req
= generate_fast_armor_fn(kdc_exchange_dict
,
2883 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
2884 fast_armor
= self
.KRB_FAST_ARMOR_create(fast_armor_type
,
2889 if generate_padata_fn
is not None:
2890 # This can alter req_body...
2891 outer_padata
, req_body
= generate_padata_fn(kdc_exchange_dict
,
2894 self
.assertIsNotNone(outer_padata
)
2895 self
.assertNotIn(PADATA_KDC_REQ
,
2896 [pa
['padata-type'] for pa
in outer_padata
],
2897 'Don\'t create TGS-REQ manually')
2901 if generate_fast_fn
is not None:
2902 armor_key
= kdc_exchange_dict
['armor_key']
2903 self
.assertIsNotNone(armor_key
)
2905 if req_msg_type
== KRB_AS_REQ
:
2906 checksum_blob
= self
.der_encode(
2908 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
2910 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
2911 checksum_blob
= tgs_req
2913 checksum
= self
.Checksum_create(armor_key
,
2917 fast_padata
+= additional_padata
2918 fast
= generate_fast_fn(kdc_exchange_dict
,
2929 if tgs_req_padata
is not None:
2930 padata
.append(tgs_req_padata
)
2932 if fast
is not None:
2935 if outer_padata
is not None:
2936 padata
+= outer_padata
2939 padata
+= additional_padata
2944 kdc_exchange_dict
['req_padata'] = padata
2945 kdc_exchange_dict
['fast_padata'] = fast_padata
2946 kdc_exchange_dict
['req_body'] = inner_req_body
2948 req_obj
, req_decoded
= self
.KDC_REQ_create(msg_type
=req_msg_type
,
2951 asn1Spec
=req_asn1Spec())
2953 kdc_exchange_dict
['req_obj'] = req_obj
2955 to_rodc
= kdc_exchange_dict
['to_rodc']
2957 rep
= self
.send_recv_transaction(req_decoded
, to_rodc
=to_rodc
)
2958 self
.assertIsNotNone(rep
)
2960 msg_type
= self
.getElementValue(rep
, 'msg-type')
2961 self
.assertIsNotNone(msg_type
)
2963 expected_msg_type
= None
2964 if check_error_fn
is not None:
2965 expected_msg_type
= KRB_ERROR
2966 self
.assertIsNone(check_rep_fn
)
2967 self
.assertNotEqual(0, len(expected_error_mode
))
2968 self
.assertNotIn(0, expected_error_mode
)
2969 if check_rep_fn
is not None:
2970 expected_msg_type
= rep_msg_type
2971 self
.assertIsNone(check_error_fn
)
2972 self
.assertEqual(0, len(expected_error_mode
))
2973 self
.assertIsNotNone(expected_msg_type
)
2974 if msg_type
== KRB_ERROR
:
2975 error_code
= self
.getElementValue(rep
, 'error-code')
2976 fail_msg
= f
'Got unexpected error: {error_code}'
2978 fail_msg
= f
'Expected to fail with error: {expected_error_mode}'
2979 self
.assertEqual(msg_type
, expected_msg_type
, fail_msg
)
2981 if msg_type
== KRB_ERROR
:
2982 return check_error_fn(kdc_exchange_dict
,
2986 return check_rep_fn(kdc_exchange_dict
, callback_dict
, rep
)
2988 def as_exchange_dict(self
,
2991 expected_crealm
=None,
2992 expected_cname
=None,
2993 expected_anon
=False,
2994 expected_srealm
=None,
2995 expected_sname
=None,
2996 expected_account_name
=None,
2997 expected_groups
=None,
2998 unexpected_groups
=None,
2999 expected_upn_name
=None,
3001 expected_requester_sid
=None,
3002 expected_domain_sid
=None,
3003 expected_supported_etypes
=None,
3004 expected_flags
=None,
3005 unexpected_flags
=None,
3006 ticket_decryption_key
=None,
3007 expect_ticket_checksum
=None,
3008 expect_full_checksum
=None,
3009 generate_fast_fn
=None,
3010 generate_fast_armor_fn
=None,
3011 generate_fast_padata_fn
=None,
3012 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
3013 generate_padata_fn
=None,
3014 check_error_fn
=None,
3016 check_kdc_private_fn
=None,
3019 expected_error_mode
=0,
3021 expected_status
=None,
3023 authenticator_subkey
=None,
3035 fast_ap_options
=None,
3036 strict_edata_checking
=True,
3037 using_pkinit
=PkInit
.NOT_USED
,
3041 expect_client_claims
=None,
3042 expect_device_info
=None,
3043 expect_device_claims
=None,
3044 expect_upn_dns_info_ex
=None,
3045 expect_pac_attrs
=None,
3046 expect_pac_attrs_pac_request
=None,
3047 expect_requester_sid
=None,
3049 expected_client_claims
=None,
3050 unexpected_client_claims
=None,
3051 expected_device_claims
=None,
3052 unexpected_device_claims
=None,
3053 expect_resource_groups_flag
=None,
3054 expected_device_groups
=None,
3056 if expected_error_mode
== 0:
3057 expected_error_mode
= ()
3058 elif not isinstance(expected_error_mode
, collections
.abc
.Container
):
3059 expected_error_mode
= (expected_error_mode
,)
3061 kdc_exchange_dict
= {
3062 'req_msg_type': KRB_AS_REQ
,
3063 'req_asn1Spec': krb5_asn1
.AS_REQ
,
3064 'rep_msg_type': KRB_AS_REP
,
3065 'rep_asn1Spec': krb5_asn1
.AS_REP
,
3066 'rep_encpart_asn1Spec': krb5_asn1
.EncASRepPart
,
3068 'client_cert': client_cert
,
3069 'expected_crealm': expected_crealm
,
3070 'expected_cname': expected_cname
,
3071 'expected_anon': expected_anon
,
3072 'expected_srealm': expected_srealm
,
3073 'expected_sname': expected_sname
,
3074 'expected_account_name': expected_account_name
,
3075 'expected_groups': expected_groups
,
3076 'unexpected_groups': unexpected_groups
,
3077 'expected_upn_name': expected_upn_name
,
3078 'expected_sid': expected_sid
,
3079 'expected_requester_sid': expected_requester_sid
,
3080 'expected_domain_sid': expected_domain_sid
,
3081 'expected_supported_etypes': expected_supported_etypes
,
3082 'expected_flags': expected_flags
,
3083 'unexpected_flags': unexpected_flags
,
3084 'ticket_decryption_key': ticket_decryption_key
,
3085 'expect_ticket_checksum': expect_ticket_checksum
,
3086 'expect_full_checksum': expect_full_checksum
,
3087 'generate_fast_fn': generate_fast_fn
,
3088 'generate_fast_armor_fn': generate_fast_armor_fn
,
3089 'generate_fast_padata_fn': generate_fast_padata_fn
,
3090 'fast_armor_type': fast_armor_type
,
3091 'generate_padata_fn': generate_padata_fn
,
3092 'check_error_fn': check_error_fn
,
3093 'check_rep_fn': check_rep_fn
,
3094 'check_kdc_private_fn': check_kdc_private_fn
,
3095 'check_patypes': check_patypes
,
3096 'callback_dict': callback_dict
,
3097 'expected_error_mode': expected_error_mode
,
3098 'expect_status': expect_status
,
3099 'expected_status': expected_status
,
3100 'expected_salt': expected_salt
,
3101 'authenticator_subkey': authenticator_subkey
,
3102 'preauth_key': preauth_key
,
3103 'armor_key': armor_key
,
3104 'armor_tgt': armor_tgt
,
3105 'armor_subkey': armor_subkey
,
3106 'auth_data': auth_data
,
3107 'kdc_options': kdc_options
,
3108 'inner_req': inner_req
,
3109 'outer_req': outer_req
,
3110 'pac_request': pac_request
,
3111 'pac_options': pac_options
,
3112 'ap_options': ap_options
,
3113 'fast_ap_options': fast_ap_options
,
3114 'strict_edata_checking': strict_edata_checking
,
3115 'using_pkinit': using_pkinit
,
3116 'pk_nonce': pk_nonce
,
3117 'expect_edata': expect_edata
,
3118 'expect_pac': expect_pac
,
3119 'expect_client_claims': expect_client_claims
,
3120 'expect_device_info': expect_device_info
,
3121 'expect_device_claims': expect_device_claims
,
3122 'expect_upn_dns_info_ex': expect_upn_dns_info_ex
,
3123 'expect_pac_attrs': expect_pac_attrs
,
3124 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request
,
3125 'expect_requester_sid': expect_requester_sid
,
3126 'rc4_support': rc4_support
,
3127 'expected_client_claims': expected_client_claims
,
3128 'unexpected_client_claims': unexpected_client_claims
,
3129 'expected_device_claims': expected_device_claims
,
3130 'unexpected_device_claims': unexpected_device_claims
,
3131 'expect_resource_groups_flag': expect_resource_groups_flag
,
3132 'expected_device_groups': expected_device_groups
,
3135 if callback_dict
is None:
3138 return kdc_exchange_dict
3140 def tgs_exchange_dict(self
,
3142 expected_crealm
=None,
3143 expected_cname
=None,
3144 expected_anon
=False,
3145 expected_srealm
=None,
3146 expected_sname
=None,
3147 expected_account_name
=None,
3148 expected_groups
=None,
3149 unexpected_groups
=None,
3150 expected_upn_name
=None,
3152 expected_requester_sid
=None,
3153 expected_domain_sid
=None,
3154 expected_device_domain_sid
=None,
3155 expected_supported_etypes
=None,
3156 expected_flags
=None,
3157 unexpected_flags
=None,
3158 ticket_decryption_key
=None,
3159 expect_ticket_checksum
=None,
3160 expect_full_checksum
=None,
3161 generate_fast_fn
=None,
3162 generate_fast_armor_fn
=None,
3163 generate_fast_padata_fn
=None,
3164 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
3165 generate_padata_fn
=None,
3166 check_error_fn
=None,
3168 check_kdc_private_fn
=None,
3170 expected_error_mode
=0,
3172 expected_status
=None,
3178 authenticator_subkey
=None,
3180 body_checksum_type
=None,
3187 fast_ap_options
=None,
3188 strict_edata_checking
=True,
3191 expect_client_claims
=None,
3192 expect_device_info
=None,
3193 expect_device_claims
=None,
3194 expect_upn_dns_info_ex
=None,
3195 expect_pac_attrs
=None,
3196 expect_pac_attrs_pac_request
=None,
3197 expect_requester_sid
=None,
3198 expected_proxy_target
=None,
3199 expected_transited_services
=None,
3201 expected_client_claims
=None,
3202 unexpected_client_claims
=None,
3203 expected_device_claims
=None,
3204 unexpected_device_claims
=None,
3205 expect_resource_groups_flag
=None,
3206 expected_device_groups
=None,
3208 if expected_error_mode
== 0:
3209 expected_error_mode
= ()
3210 elif not isinstance(expected_error_mode
, collections
.abc
.Container
):
3211 expected_error_mode
= (expected_error_mode
,)
3213 kdc_exchange_dict
= {
3214 'req_msg_type': KRB_TGS_REQ
,
3215 'req_asn1Spec': krb5_asn1
.TGS_REQ
,
3216 'rep_msg_type': KRB_TGS_REP
,
3217 'rep_asn1Spec': krb5_asn1
.TGS_REP
,
3218 'rep_encpart_asn1Spec': krb5_asn1
.EncTGSRepPart
,
3220 'expected_crealm': expected_crealm
,
3221 'expected_cname': expected_cname
,
3222 'expected_anon': expected_anon
,
3223 'expected_srealm': expected_srealm
,
3224 'expected_sname': expected_sname
,
3225 'expected_account_name': expected_account_name
,
3226 'expected_groups': expected_groups
,
3227 'unexpected_groups': unexpected_groups
,
3228 'expected_upn_name': expected_upn_name
,
3229 'expected_sid': expected_sid
,
3230 'expected_requester_sid': expected_requester_sid
,
3231 'expected_domain_sid': expected_domain_sid
,
3232 'expected_device_domain_sid': expected_device_domain_sid
,
3233 'expected_supported_etypes': expected_supported_etypes
,
3234 'expected_flags': expected_flags
,
3235 'unexpected_flags': unexpected_flags
,
3236 'ticket_decryption_key': ticket_decryption_key
,
3237 'expect_ticket_checksum': expect_ticket_checksum
,
3238 'expect_full_checksum': expect_full_checksum
,
3239 'generate_fast_fn': generate_fast_fn
,
3240 'generate_fast_armor_fn': generate_fast_armor_fn
,
3241 'generate_fast_padata_fn': generate_fast_padata_fn
,
3242 'fast_armor_type': fast_armor_type
,
3243 'generate_padata_fn': generate_padata_fn
,
3244 'check_error_fn': check_error_fn
,
3245 'check_rep_fn': check_rep_fn
,
3246 'check_kdc_private_fn': check_kdc_private_fn
,
3247 'check_patypes': check_patypes
,
3248 'callback_dict': callback_dict
,
3249 'expected_error_mode': expected_error_mode
,
3250 'expect_status': expect_status
,
3251 'expected_status': expected_status
,
3253 'body_checksum_type': body_checksum_type
,
3254 'armor_key': armor_key
,
3255 'armor_tgt': armor_tgt
,
3256 'armor_subkey': armor_subkey
,
3257 'auth_data': auth_data
,
3258 'authenticator_subkey': authenticator_subkey
,
3259 'kdc_options': kdc_options
,
3260 'inner_req': inner_req
,
3261 'outer_req': outer_req
,
3262 'pac_request': pac_request
,
3263 'pac_options': pac_options
,
3264 'ap_options': ap_options
,
3265 'fast_ap_options': fast_ap_options
,
3266 'strict_edata_checking': strict_edata_checking
,
3267 'expect_edata': expect_edata
,
3268 'expect_pac': expect_pac
,
3269 'expect_client_claims': expect_client_claims
,
3270 'expect_device_info': expect_device_info
,
3271 'expect_device_claims': expect_device_claims
,
3272 'expect_upn_dns_info_ex': expect_upn_dns_info_ex
,
3273 'expect_pac_attrs': expect_pac_attrs
,
3274 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request
,
3275 'expect_requester_sid': expect_requester_sid
,
3276 'expected_proxy_target': expected_proxy_target
,
3277 'expected_transited_services': expected_transited_services
,
3278 'rc4_support': rc4_support
,
3279 'expected_client_claims': expected_client_claims
,
3280 'unexpected_client_claims': unexpected_client_claims
,
3281 'expected_device_claims': expected_device_claims
,
3282 'unexpected_device_claims': unexpected_device_claims
,
3283 'expect_resource_groups_flag': expect_resource_groups_flag
,
3284 'expected_device_groups': expected_device_groups
,
3287 if callback_dict
is None:
3290 return kdc_exchange_dict
3292 def generic_check_kdc_rep(self
,
3297 expected_crealm
= kdc_exchange_dict
['expected_crealm']
3298 expected_anon
= kdc_exchange_dict
['expected_anon']
3299 expected_srealm
= kdc_exchange_dict
['expected_srealm']
3300 expected_sname
= kdc_exchange_dict
['expected_sname']
3301 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
3302 check_kdc_private_fn
= kdc_exchange_dict
['check_kdc_private_fn']
3303 rep_encpart_asn1Spec
= kdc_exchange_dict
['rep_encpart_asn1Spec']
3304 msg_type
= kdc_exchange_dict
['rep_msg_type']
3305 armor_key
= kdc_exchange_dict
['armor_key']
3307 self
.assertElementEqual(rep
, 'msg-type', msg_type
) # AS-REP | TGS-REP
3308 padata
= self
.getElementValue(rep
, 'padata')
3309 if self
.strict_checking
:
3310 self
.assertElementEqualUTF8(rep
, 'crealm', expected_crealm
)
3311 if self
.cname_checking
:
3313 expected_cname
= self
.PrincipalName_create(
3314 name_type
=NT_WELLKNOWN
,
3315 names
=['WELLKNOWN', 'ANONYMOUS'])
3317 expected_cname
= kdc_exchange_dict
['expected_cname']
3318 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
3319 self
.assertElementPresent(rep
, 'ticket')
3320 ticket
= self
.getElementValue(rep
, 'ticket')
3321 ticket_encpart
= None
3322 ticket_cipher
= None
3323 self
.assertIsNotNone(ticket
)
3324 if ticket
is not None: # Never None, but gives indentation
3325 self
.assertElementEqual(ticket
, 'tkt-vno', 5)
3326 self
.assertElementEqualUTF8(ticket
, 'realm', expected_srealm
)
3327 self
.assertElementEqualPrincipal(ticket
, 'sname', expected_sname
)
3328 self
.assertElementPresent(ticket
, 'enc-part')
3329 ticket_encpart
= self
.getElementValue(ticket
, 'enc-part')
3330 self
.assertIsNotNone(ticket_encpart
)
3331 if ticket_encpart
is not None: # Never None, but gives indentation
3332 self
.assertElementPresent(ticket_encpart
, 'etype')
3334 kdc_options
= kdc_exchange_dict
['kdc_options']
3335 pos
= len(tuple(krb5_asn1
.KDCOptions('enc-tkt-in-skey'))) - 1
3336 expect_kvno
= (pos
>= len(kdc_options
)
3337 or kdc_options
[pos
] != '1')
3339 # 'unspecified' means present, with any value != 0
3340 self
.assertElementKVNO(ticket_encpart
, 'kvno',
3341 self
.unspecified_kvno
)
3343 # For user-to-user, don't expect a kvno.
3344 self
.assertElementMissing(ticket_encpart
, 'kvno')
3346 self
.assertElementPresent(ticket_encpart
, 'cipher')
3347 ticket_cipher
= self
.getElementValue(ticket_encpart
, 'cipher')
3348 self
.assertElementPresent(rep
, 'enc-part')
3349 encpart
= self
.getElementValue(rep
, 'enc-part')
3350 encpart_cipher
= None
3351 self
.assertIsNotNone(encpart
)
3352 if encpart
is not None: # Never None, but gives indentation
3353 self
.assertElementPresent(encpart
, 'etype')
3354 self
.assertElementKVNO(ticket_encpart
, 'kvno', 'autodetect')
3355 self
.assertElementPresent(encpart
, 'cipher')
3356 encpart_cipher
= self
.getElementValue(encpart
, 'cipher')
3358 if self
.padata_checking
:
3359 self
.check_reply_padata(kdc_exchange_dict
,
3364 ticket_checksum
= None
3366 # Get the decryption key for the encrypted part
3367 encpart_decryption_key
, encpart_decryption_usage
= (
3368 self
.get_preauth_key(kdc_exchange_dict
))
3370 pa_dict
= self
.get_pa_dict(padata
)
3372 pk_as_rep
= pa_dict
.get(PADATA_PK_AS_REP
)
3373 if pk_as_rep
is not None:
3374 pk_as_rep_asn1_spec
= krb5_asn1
.PA_PK_AS_REP
3375 reply_key_pack_asn1_spec
= krb5_asn1
.ReplyKeyPack
3378 pk_as_rep
= pa_dict
.get(PADATA_PK_AS_REP_19
)
3379 pk_as_rep_asn1_spec
= krb5_asn1
.PA_PK_AS_REP_Win2k
3380 reply_key_pack_asn1_spec
= krb5_asn1
.ReplyKeyPack_Win2k
3382 if pk_as_rep
is not None:
3383 pk_as_rep
= self
.der_decode(pk_as_rep
,
3384 asn1Spec
=pk_as_rep_asn1_spec())
3386 using_pkinit
= kdc_exchange_dict
['using_pkinit']
3387 if using_pkinit
is PkInit
.PUBLIC_KEY
:
3388 content_info
= self
.der_decode(
3389 pk_as_rep
['encKeyPack'],
3390 asn1Spec
=krb5_asn1
.ContentInfo())
3391 self
.assertEqual(str(krb5_asn1
.id_envelopedData
),
3392 content_info
['contentType'])
3394 content
= self
.der_decode(content_info
['content'],
3395 asn1Spec
=krb5_asn1
.EnvelopedData())
3397 self
.assertEqual(0, content
['version'])
3398 originator_info
= content
['originatorInfo']
3399 self
.assertFalse(originator_info
.get('certs'))
3400 self
.assertFalse(originator_info
.get('crls'))
3401 self
.assertFalse(content
.get('unprotectedAttrs'))
3403 encrypted_content_info
= content
['encryptedContentInfo']
3404 recipient_infos
= content
['recipientInfos']
3406 self
.assertEqual(1, len(recipient_infos
))
3407 ktri
= recipient_infos
[0]['ktri']
3409 if self
.strict_checking
:
3410 self
.assertEqual(0, ktri
['version'])
3412 private_key
= encpart_decryption_key
3413 self
.assertIsInstance(private_key
,
3414 asymmetric
.rsa
.RSAPrivateKey
)
3416 client_subject_key_id
= (
3417 x509
.SubjectKeyIdentifier
.from_public_key(
3418 private_key
.public_key()))
3420 # Check that the client certificate is named as the recipient.
3421 ktri_rid
= ktri
['rid']
3423 issuer_and_serial_number
= ktri_rid
[
3424 'issuerAndSerialNumber']
3426 subject_key_id
= ktri_rid
['subjectKeyIdentifier']
3427 self
.assertEqual(subject_key_id
,
3428 client_subject_key_id
.digest
)
3430 client_certificate
= kdc_exchange_dict
['client_cert']
3432 self
.assertIsNotNone(issuer_and_serial_number
['issuer'])
3433 self
.assertEqual(issuer_and_serial_number
['serialNumber'],
3434 client_certificate
.serial_number
)
3436 key_encryption_algorithm
= ktri
['keyEncryptionAlgorithm']
3437 self
.assertEqual(str(krb5_asn1
.rsaEncryption
),
3438 key_encryption_algorithm
['algorithm'])
3439 if self
.strict_checking
:
3442 key_encryption_algorithm
.get('parameters'))
3444 encrypted_key
= ktri
['encryptedKey']
3447 pad_len
= 256 - len(encrypted_key
)
3449 encrypted_key
= bytes(pad_len
) + encrypted_key
3450 decrypted_key
= private_key
.decrypt(
3452 padding
=asymmetric
.padding
.PKCS1v15())
3454 self
.assertEqual(str(krb5_asn1
.id_signedData
),
3455 encrypted_content_info
['contentType'])
3457 encrypted_content
= encrypted_content_info
['encryptedContent']
3458 encryption_algorithm
= encrypted_content_info
[
3459 'contentEncryptionAlgorithm']
3461 cipher_algorithm
= self
.cipher_from_algorithm(encryption_algorithm
['algorithm'])
3463 # This will serve as the IV.
3464 parameters
= self
.der_decode(
3465 encryption_algorithm
['parameters'],
3466 asn1Spec
=krb5_asn1
.CMSCBCParameter())
3468 # Decrypt the content.
3469 cipher
= Cipher(cipher_algorithm(decrypted_key
),
3470 modes
.CBC(parameters
),
3472 decryptor
= cipher
.decryptor()
3473 decrypted_content
= decryptor
.update(encrypted_content
)
3474 decrypted_content
+= decryptor
.finalize()
3476 # The padding doesn’t fully comply with PKCS7 with a specified
3477 # blocksize, so we must unpad the data ourselves.
3478 decrypted_content
= self
.unpad(decrypted_content
)
3481 signed_data_rfc2315
= None
3483 first_tag
= decrypted_content
[0]
3484 if first_tag
== 0x30: # ASN.1 SEQUENCE tag
3485 signed_data
= decrypted_content
3487 # Windows encodes the ASN.1 incorrectly, neglecting to add
3488 # the SEQUENCE tag. We’ll have to prepend it ourselves in
3489 # order for the decoding to work.
3490 encoded_len
= self
.asn1_length(decrypted_content
)
3491 decrypted_content
= bytes([0x30]) + encoded_len
+ (
3494 if first_tag
== 0x02: # ASN.1 INTEGER tag
3496 # The INTEGER tag indicates that the data is encoded
3497 # with the earlier variant of the SignedData ASN.1
3498 # schema specified in RFC2315, as per [MS-PKCA] 2.2.4
3500 signed_data_rfc2315
= decrypted_content
3502 elif first_tag
== 0x06: # ASN.1 OBJECT IDENTIFIER tag
3504 # The OBJECT IDENTIFIER tag indicates that the data is
3505 # encoded as SignedData and wrapped in a ContentInfo
3506 # structure, which we shall have to decode first. This
3507 # seems to be the case when the supportedCMSTypes field
3508 # in the client’s AuthPack is missing or empty.
3510 content_info
= self
.der_decode(
3512 asn1Spec
=krb5_asn1
.ContentInfo())
3513 self
.assertEqual(str(krb5_asn1
.id_signedData
),
3514 content_info
['contentType'])
3515 signed_data
= content_info
['content']
3517 self
.fail(f
'got reply with unknown initial tag '
3520 if signed_data
is not None:
3521 signed_data
= self
.der_decode(
3522 signed_data
, asn1Spec
=krb5_asn1
.SignedData())
3524 encap_content_info
= signed_data
['encapContentInfo']
3526 content_type
= encap_content_info
['eContentType']
3527 content
= encap_content_info
['eContent']
3528 elif signed_data_rfc2315
is not None:
3529 signed_data
= self
.der_decode(
3530 signed_data_rfc2315
,
3531 asn1Spec
=krb5_asn1
.SignedData_RFC2315())
3533 encap_content_info
= signed_data
['contentInfo']
3535 content_type
= encap_content_info
['contentType']
3536 content
= self
.der_decode(
3537 encap_content_info
['content'],
3538 asn1Spec
=pyasn1
.type.univ
.OctetString())
3540 self
.fail('we must have got SignedData')
3542 self
.assertEqual(str(krb5_asn1
.id_pkinit_rkeyData
),
3544 reply_key_pack
= self
.der_decode(
3545 content
, asn1Spec
=reply_key_pack_asn1_spec())
3547 req_obj
= kdc_exchange_dict
['req_obj']
3548 req_asn1Spec
= kdc_exchange_dict
['req_asn1Spec']
3549 req_obj
= self
.der_encode(req_obj
,
3550 asn1Spec
=req_asn1Spec())
3552 reply_key
= reply_key_pack
['replyKey']
3554 # Replace the encpart decryption key with the decrypted key from
3556 encpart_decryption_key
= self
.SessionKey_create(
3557 etype
=reply_key
['keytype'],
3558 contents
=reply_key
['keyvalue'],
3562 as_checksum
= reply_key_pack
['asChecksum']
3564 # Verify the checksum over the AS request body.
3565 kcrypto
.verify_checksum(as_checksum
['cksumtype'],
3566 encpart_decryption_key
.key
,
3569 as_checksum
['checksum'])
3570 elif using_pkinit
is PkInit
.DIFFIE_HELLMAN
:
3571 content_info
= self
.der_decode(
3572 pk_as_rep
['dhInfo']['dhSignedData'],
3573 asn1Spec
=krb5_asn1
.ContentInfo())
3574 self
.assertEqual(str(krb5_asn1
.id_signedData
),
3575 content_info
['contentType'])
3577 signed_data
= self
.der_decode(content_info
['content'],
3578 asn1Spec
=krb5_asn1
.SignedData())
3580 encap_content_info
= signed_data
['encapContentInfo']
3581 content
= encap_content_info
['eContent']
3583 self
.assertEqual(str(krb5_asn1
.id_pkinit_DHKeyData
),
3584 encap_content_info
['eContentType'])
3586 dh_key_info
= self
.der_decode(
3587 content
, asn1Spec
=krb5_asn1
.KDCDHKeyInfo())
3589 self
.assertNotIn('dhKeyExpiration', dh_key_info
)
3591 dh_private_key
= encpart_decryption_key
3592 self
.assertIsInstance(dh_private_key
,
3593 asymmetric
.dh
.DHPrivateKey
)
3595 self
.assertElementEqual(dh_key_info
, 'nonce',
3596 kdc_exchange_dict
['pk_nonce'])
3598 dh_public_key_data
= self
.bytes_from_bit_string(
3599 dh_key_info
['subjectPublicKey'])
3600 dh_public_key_decoded
= self
.der_decode(
3601 dh_public_key_data
, asn1Spec
=krb5_asn1
.DHPublicKey())
3603 dh_numbers
= dh_private_key
.parameters().parameter_numbers()
3605 public_numbers
= asymmetric
.dh
.DHPublicNumbers(
3606 dh_public_key_decoded
, dh_numbers
)
3607 dh_public_key
= public_numbers
.public_key(default_backend())
3609 # Perform the Diffie-Hellman key exchange.
3610 shared_secret
= dh_private_key
.exchange(dh_public_key
)
3612 # Pad the shared secret out to the length of ‘p’.
3613 p_len
= self
.length_in_bytes(dh_numbers
.p
)
3614 padding_len
= p_len
- len(shared_secret
)
3615 self
.assertGreaterEqual(padding_len
, 0)
3616 padded_shared_secret
= bytes(padding_len
) + shared_secret
3618 reply_key_enc_type
= self
.expected_etype(kdc_exchange_dict
)
3620 # At the moment, we don’t specify a nonce in the request, so we
3621 # can assume these are empty.
3625 ciphertext
= padded_shared_secret
+ client_nonce
+ server_nonce
3627 # Replace the encpart decryption key with the key derived from
3628 # the Diffie-Hellman key exchange.
3629 encpart_decryption_key
= self
.octetstring2key(
3630 ciphertext
, reply_key_enc_type
)
3632 self
.fail(f
'invalid value for using_pkinit: {using_pkinit}')
3634 self
.assertEqual(3, signed_data
['version'])
3636 digest_algorithms
= signed_data
['digestAlgorithms']
3637 self
.assertEqual(1, len(digest_algorithms
))
3638 digest_algorithm
= digest_algorithms
[0]
3639 # Ensure the hash algorithm is valid.
3640 _
= self
.hash_from_algorithm_id(digest_algorithm
)
3642 self
.assertFalse(signed_data
.get('crls'))
3644 signer_infos
= signed_data
['signerInfos']
3645 self
.assertEqual(1, len(signer_infos
))
3646 signer_info
= signer_infos
[0]
3648 self
.assertEqual(1, signer_info
['version'])
3650 # Get the certificate presented by the KDC.
3651 kdc_certificates
= signed_data
['certificates']
3652 self
.assertEqual(1, len(kdc_certificates
))
3653 kdc_certificate
= self
.der_encode(
3654 kdc_certificates
[0], asn1Spec
=krb5_asn1
.CertificateChoices())
3655 kdc_certificate
= x509
.load_der_x509_certificate(kdc_certificate
,
3658 # Verify that the KDC’s certificate is named as the signer.
3659 sid
= signer_info
['sid']
3661 issuer_and_serial_number
= sid
['issuerAndSerialNumber']
3663 extension
= kdc_certificate
.extensions
.get_extension_for_oid(
3664 x509
.oid
.ExtensionOID
.SUBJECT_KEY_IDENTIFIER
)
3665 cert_subject_key_id
= extension
.value
.digest
3666 self
.assertEqual(sid
['subjectKeyIdentifier'], cert_subject_key_id
)
3668 self
.assertIsNotNone(issuer_and_serial_number
['issuer'])
3669 self
.assertEqual(issuer_and_serial_number
['serialNumber'],
3670 kdc_certificate
.serial_number
)
3672 digest_algorithm
= signer_info
['digestAlgorithm']
3673 digest_hash_fn
= self
.hash_from_algorithm_id(digest_algorithm
)
3675 signed_attrs
= signer_info
['signedAttrs']
3676 self
.assertEqual(2, len(signed_attrs
))
3678 signed_attr0
= signed_attrs
[0]
3679 self
.assertEqual(str(krb5_asn1
.id_contentType
),
3680 signed_attr0
['attrType'])
3681 signed_attr0_values
= signed_attr0
['attrValues']
3682 self
.assertEqual(1, len(signed_attr0_values
))
3683 signed_attr0_value
= self
.der_decode(
3684 signed_attr0_values
[0],
3685 asn1Spec
=krb5_asn1
.ContentType())
3686 if using_pkinit
is PkInit
.DIFFIE_HELLMAN
:
3687 self
.assertEqual(str(krb5_asn1
.id_pkinit_DHKeyData
),
3690 self
.assertEqual(str(krb5_asn1
.id_pkinit_rkeyData
),
3693 signed_attr1
= signed_attrs
[1]
3694 self
.assertEqual(str(krb5_asn1
.id_messageDigest
),
3695 signed_attr1
['attrType'])
3696 signed_attr1_values
= signed_attr1
['attrValues']
3697 self
.assertEqual(1, len(signed_attr1_values
))
3698 message_digest
= self
.der_decode(signed_attr1_values
[0],
3699 krb5_asn1
.MessageDigest())
3701 signature_algorithm
= signer_info
['signatureAlgorithm']
3702 hash_fn
= self
.hash_from_algorithm_id(signature_algorithm
)
3704 # Compute the hash of the content to be signed. With the
3705 # Diffie-Hellman key exchange, this signature is over the type
3706 # KDCDHKeyInfo; otherwise, it is over the type ReplyKeyPack.
3707 digest
= hashes
.Hash(digest_hash_fn(), default_backend())
3708 digest
.update(content
)
3709 digest
= digest
.finalize()
3711 # Verify the hash. Note: this is a non–constant time comparison.
3712 self
.assertEqual(digest
, message_digest
)
3714 # Re-encode the attributes ready for verifying the signature.
3715 cms_attrs
= self
.der_encode(signed_attrs
,
3716 asn1Spec
=krb5_asn1
.CMSAttributes())
3718 # Verify the signature.
3719 kdc_public_key
= kdc_certificate
.public_key()
3720 kdc_public_key
.verify(
3721 signer_info
['signature'],
3723 asymmetric
.padding
.PKCS1v15(),
3726 self
.assertFalse(signer_info
.get('unsignedAttrs'))
3728 if armor_key
is not None:
3729 if PADATA_FX_FAST
in pa_dict
:
3730 fx_fast_data
= pa_dict
[PADATA_FX_FAST
]
3731 fast_response
= self
.check_fx_fast_data(kdc_exchange_dict
,
3736 if 'strengthen-key' in fast_response
:
3737 strengthen_key
= self
.EncryptionKey_import(
3738 fast_response
['strengthen-key'])
3739 encpart_decryption_key
= (
3740 self
.generate_strengthen_reply_key(
3742 encpart_decryption_key
))
3744 fast_finished
= fast_response
.get('finished')
3745 if fast_finished
is not None:
3746 ticket_checksum
= fast_finished
['ticket-checksum']
3748 self
.check_rep_padata(kdc_exchange_dict
,
3750 fast_response
['padata'],
3753 ticket_private
= None
3754 if ticket_decryption_key
is not None:
3755 self
.assertElementEqual(ticket_encpart
, 'etype',
3756 ticket_decryption_key
.etype
)
3757 self
.assertElementKVNO(ticket_encpart
, 'kvno',
3758 ticket_decryption_key
.kvno
)
3759 ticket_decpart
= ticket_decryption_key
.decrypt(KU_TICKET
,
3761 ticket_private
= self
.der_decode(
3763 asn1Spec
=krb5_asn1
.EncTicketPart())
3765 encpart_private
= None
3766 self
.assertIsNotNone(encpart_decryption_key
)
3767 if encpart_decryption_key
is not None:
3768 self
.assertElementEqual(encpart
, 'etype',
3769 encpart_decryption_key
.etype
)
3770 if self
.strict_checking
:
3771 self
.assertElementKVNO(encpart
, 'kvno',
3772 encpart_decryption_key
.kvno
)
3773 rep_decpart
= encpart_decryption_key
.decrypt(
3774 encpart_decryption_usage
,
3776 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
3777 # application tag 26
3779 encpart_private
= self
.der_decode(
3781 asn1Spec
=rep_encpart_asn1Spec())
3783 encpart_private
= self
.der_decode(
3785 asn1Spec
=krb5_asn1
.EncTGSRepPart())
3787 kdc_exchange_dict
['reply_key'] = encpart_decryption_key
3789 self
.assertIsNotNone(check_kdc_private_fn
)
3790 if check_kdc_private_fn
is not None:
3791 check_kdc_private_fn(kdc_exchange_dict
, callback_dict
,
3792 rep
, ticket_private
, encpart_private
,
# Decode and validate the PA-FX-FAST reply padata returned by the KDC.
#
# The blob in fx_fast_data is DER-decoded as a PA_FX_FAST_REPLY, its
# 'enc-fast-rep' is decrypted with armor_key under key usage KU_FAST_REP,
# and the plaintext is DER-decoded as a KrbFastResponse, which is returned.
#
# expect_strengthen_key (default True): when set, and strict checking is
# enabled, the response must contain a 'strengthen-key' element.
# NOTE(review): kdc_exchange_dict, fx_fast_data and armor_key are used in
# the body; confirm their order in the signature before relying on it.
3797 def check_fx_fast_data(self
,
3802 expect_strengthen_key
=True):
3803 fx_fast_data
= self
.der_decode(fx_fast_data
,
3804 asn1Spec
=krb5_asn1
.PA_FX_FAST_REPLY())
3806 enc_fast_rep
= fx_fast_data
['armored-data']['enc-fast-rep']
# The encrypted FAST reply must use the same etype as the armor key.
3807 self
.assertEqual(enc_fast_rep
['etype'], armor_key
.etype
)
3809 fast_rep
= armor_key
.decrypt(KU_FAST_REP
, enc_fast_rep
['cipher'])
3811 fast_response
= self
.der_decode(fast_rep
,
3812 asn1Spec
=krb5_asn1
.KrbFastResponse())
# The strengthen-key is only insisted upon under strict checking.
3814 if expect_strengthen_key
and self
.strict_checking
:
3815 self
.assertIn('strengthen-key', fast_response
)
# NOTE(review): presumably the 'finished' check below is conditional on a
# caller-supplied flag; confirm against the full signature.
3818 self
.assertIn('finished', fast_response
)
3820 # Ensure that the nonce matches the nonce in the body of the request
3822 nonce
= kdc_exchange_dict
['nonce']
3823 self
.assertEqual(nonce
, fast_response
['nonce'])
3825 return fast_response
# Check the decrypted parts of a KDC reply and record the ticket credentials.
#
# Expectations (realm and principal names, flags, KDC options, PAC and
# checksum expectations) are read from kdc_exchange_dict and asserted
# against ticket_private and encpart_private whenever those are not None.
# A KerberosTicketCreds object is then built and stored in
# kdc_exchange_dict['rep_ticket_creds'].
3827 def generic_check_kdc_private(self
,
# Locate each KDC option of interest: its position is the length of the
# option's named-bit tuple minus one, and the option counts as set when
# that position of kdc_options holds the character '1'.
3834 kdc_options
= kdc_exchange_dict
['kdc_options']
3835 canon_pos
= len(tuple(krb5_asn1
.KDCOptions('canonicalize'))) - 1
3836 canonicalize
= (canon_pos
< len(kdc_options
)
3837 and kdc_options
[canon_pos
] == '1')
3838 renewable_pos
= len(tuple(krb5_asn1
.KDCOptions('renewable'))) - 1
3839 renewable
= (renewable_pos
< len(kdc_options
)
3840 and kdc_options
[renewable_pos
] == '1')
3841 renew_pos
= len(tuple(krb5_asn1
.KDCOptions('renew'))) - 1
3842 renew
= (renew_pos
< len(kdc_options
)
3843 and kdc_options
[renew_pos
] == '1')
# A renew-till time is expected if either option was requested.
3844 expect_renew_till
= renewable
or renew
3846 expected_crealm
= kdc_exchange_dict
['expected_crealm']
3847 expected_cname
= kdc_exchange_dict
['expected_cname']
3848 expected_srealm
= kdc_exchange_dict
['expected_srealm']
3849 expected_sname
= kdc_exchange_dict
['expected_sname']
3850 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
3852 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
3854 expected_flags
= kdc_exchange_dict
.get('expected_flags')
3855 unexpected_flags
= kdc_exchange_dict
.get('unexpected_flags')
3857 ticket
= self
.getElementValue(rep
, 'ticket')
# When a ticket checksum was supplied, verify it over the ticket with the
# armor key.
3859 if ticket_checksum
is not None:
3860 armor_key
= kdc_exchange_dict
['armor_key']
3861 self
.verify_ticket_checksum(ticket
, ticket_checksum
, armor_key
)
# NOTE(review): presumably to_rodc selects between the RODC krbtgt
# credentials and the regular ones; confirm.
3863 to_rodc
= kdc_exchange_dict
['to_rodc']
3865 krbtgt_creds
= self
.get_rodc_krbtgt_creds()
3867 krbtgt_creds
= self
.get_krbtgt_creds()
3868 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
# Outside strict checking an RC4 krbtgt key is added to the candidate keys.
3870 krbtgt_keys
= [krbtgt_key
]
3871 if not self
.strict_checking
:
3872 krbtgt_key_rc4
= self
.TicketDecryptionKey_from_creds(
3874 etype
=kcrypto
.Enctype
.RC4
)
3875 krbtgt_keys
.append(krbtgt_key_rc4
)
3877 if self
.expect_pac
and self
.is_tgs(expected_sname
):
3880 expect_pac
= kdc_exchange_dict
['expect_pac']
# Checks on the decrypted ticket, when it is available.
3882 ticket_session_key
= None
3883 if ticket_private
is not None:
3884 self
.assertElementFlags(ticket_private
, 'flags',
3887 self
.assertElementPresent(ticket_private
, 'key')
3888 ticket_key
= self
.getElementValue(ticket_private
, 'key')
3889 self
.assertIsNotNone(ticket_key
)
3890 if ticket_key
is not None: # Never None, but gives indentation
3891 self
.assertElementPresent(ticket_key
, 'keytype')
3892 self
.assertElementPresent(ticket_key
, 'keyvalue')
3893 ticket_session_key
= self
.EncryptionKey_import(ticket_key
)
3894 self
.assertElementEqualUTF8(ticket_private
, 'crealm',
3896 if self
.cname_checking
:
3897 self
.assertElementEqualPrincipal(ticket_private
, 'cname',
3899 self
.assertElementPresent(ticket_private
, 'transited')
3900 self
.assertElementPresent(ticket_private
, 'authtime')
3901 if self
.strict_checking
:
3902 self
.assertElementPresent(ticket_private
, 'starttime')
3903 self
.assertElementPresent(ticket_private
, 'endtime')
3904 if self
.strict_checking
:
3905 if expect_renew_till
:
3906 self
.assertElementPresent(ticket_private
, 'renew-till')
3908 self
.assertElementMissing(ticket_private
, 'renew-till')
3909 if self
.strict_checking
:
3910 self
.assertElementMissing(ticket_private
, 'caddr')
# expect_pac of None means the authorization-data is not checked here.
3911 if expect_pac
is not None:
3913 self
.assertElementPresent(ticket_private
,
3914 'authorization-data',
3915 expect_empty
=not expect_pac
)
3917 # It is more correct to not have an authorization-data
3918 # present than an empty one.
3920 # https://github.com/krb5/krb5/pull/1225#issuecomment-995104193
3921 v
= self
.getElementValue(ticket_private
,
3922 'authorization-data')
3924 self
.assertElementPresent(ticket_private
,
3925 'authorization-data',
# Checks on the decrypted reply enc-part, when it is available.
3928 encpart_session_key
= None
3929 if encpart_private
is not None:
3930 self
.assertElementPresent(encpart_private
, 'key')
3931 encpart_key
= self
.getElementValue(encpart_private
, 'key')
3932 self
.assertIsNotNone(encpart_key
)
3933 if encpart_key
is not None: # Never None, but gives indentation
3934 self
.assertElementPresent(encpart_key
, 'keytype')
3935 self
.assertElementPresent(encpart_key
, 'keyvalue')
3936 encpart_session_key
= self
.EncryptionKey_import(encpart_key
)
3937 self
.assertElementPresent(encpart_private
, 'last-req')
# Prefer the 'pk_nonce' entry when it is present and truthy; otherwise
# fall back to the request nonce.
3938 expected_nonce
= kdc_exchange_dict
.get('pk_nonce')
3939 if not expected_nonce
:
3940 expected_nonce
= kdc_exchange_dict
['nonce']
3941 self
.assertElementEqual(encpart_private
, 'nonce',
3943 if rep_msg_type
== KRB_AS_REP
:
3944 if self
.strict_checking
:
3945 self
.assertElementPresent(encpart_private
,
3948 self
.assertElementMissing(encpart_private
,
3950 self
.assertElementFlags(encpart_private
, 'flags',
3953 self
.assertElementPresent(encpart_private
, 'authtime')
3954 if self
.strict_checking
:
3955 self
.assertElementPresent(encpart_private
, 'starttime')
3956 self
.assertElementPresent(encpart_private
, 'endtime')
3957 if self
.strict_checking
:
3958 if expect_renew_till
:
3959 self
.assertElementPresent(encpart_private
, 'renew-till')
3961 self
.assertElementMissing(encpart_private
, 'renew-till')
3962 self
.assertElementEqualUTF8(encpart_private
, 'srealm',
3964 self
.assertElementEqualPrincipal(encpart_private
, 'sname',
3966 if self
.strict_checking
:
3967 self
.assertElementMissing(encpart_private
, 'caddr')
3969 sent_pac_options
= self
.get_sent_pac_options(kdc_exchange_dict
)
3971 sent_enc_pa_rep
= self
.sent_enc_pa_rep(kdc_exchange_dict
)
# encrypted-pa-data is examined when canonicalization was requested, a
# PAC option bit was sent, or (for AS replies) an enc-pa-rep was requested.
3973 enc_padata
= self
.getElementValue(encpart_private
,
3974 'encrypted-pa-data')
3975 if (canonicalize
or '1' in sent_pac_options
or (
3976 rep_msg_type
== KRB_AS_REP
and sent_enc_pa_rep
)):
3977 if self
.strict_checking
:
3978 self
.assertIsNotNone(enc_padata
)
3980 if enc_padata
is not None:
3981 enc_pa_dict
= self
.get_pa_dict(enc_padata
)
3982 if self
.strict_checking
:
3984 self
.assertIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
3986 self
.assertNotIn(PADATA_SUPPORTED_ETYPES
,
3989 if '1' in sent_pac_options
:
3990 self
.assertIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
3992 self
.assertNotIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
3994 if rep_msg_type
== KRB_AS_REP
and sent_enc_pa_rep
:
3995 self
.assertIn(PADATA_REQ_ENC_PA_REP
, enc_pa_dict
)
3997 self
.assertNotIn(PADATA_REQ_ENC_PA_REP
, enc_pa_dict
)
3999 if PADATA_SUPPORTED_ETYPES
in enc_pa_dict
:
4000 expected_supported_etypes
= kdc_exchange_dict
[
4001 'expected_supported_etypes']
4003 (supported_etypes
,) = struct
.unpack(
4005 enc_pa_dict
[PADATA_SUPPORTED_ETYPES
])
# The two DES enctype bits are masked out of both sides of the comparison.
4007 ignore_bits
= (security
.KERB_ENCTYPE_DES_CBC_CRC |
4008 security
.KERB_ENCTYPE_DES_CBC_MD5
)
4011 supported_etypes
& ~ignore_bits
,
4012 expected_supported_etypes
& ~ignore_bits
,
4013 f
'PADATA_SUPPORTED_ETYPES: got: {supported_etypes} (0x{supported_etypes:X}), '
4014 f
'expected: {expected_supported_etypes} (0x{expected_supported_etypes:X})')
4016 if PADATA_PAC_OPTIONS
in enc_pa_dict
:
4017 pac_options
= self
.der_decode(
4018 enc_pa_dict
[PADATA_PAC_OPTIONS
],
4019 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
4021 self
.assertElementEqual(pac_options
, 'options',
# The REQ_ENC_PA_REP padata is decoded as a Checksum and verified with
# the reply key under usage KU_AS_REQ; the request object is DER-encoded
# with its own ASN.1 spec beforehand.
4024 if PADATA_REQ_ENC_PA_REP
in enc_pa_dict
:
4025 enc_pa_rep
= enc_pa_dict
[PADATA_REQ_ENC_PA_REP
]
4027 enc_pa_rep
= self
.der_decode(
4029 asn1Spec
=krb5_asn1
.Checksum())
4031 reply_key
= kdc_exchange_dict
['reply_key']
4032 req_obj
= kdc_exchange_dict
['req_obj']
4033 req_asn1Spec
= kdc_exchange_dict
['req_asn1Spec']
4035 req_obj
= self
.der_encode(req_obj
,
4036 asn1Spec
=req_asn1Spec())
4038 checksum
= enc_pa_rep
['checksum']
4039 ctype
= enc_pa_rep
['cksumtype']
4041 reply_key
.verify_checksum(KU_AS_REQ
,
4046 if enc_padata
is not None:
4047 self
.assertEqual(enc_padata
, [])
# When both session keys are known they must agree in etype and contents.
4049 if ticket_session_key
is not None and encpart_session_key
is not None:
4050 self
.assertEqual(ticket_session_key
.etype
,
4051 encpart_session_key
.etype
)
4052 self
.assertEqual(ticket_session_key
.key
.contents
,
4053 encpart_session_key
.key
.contents
)
4054 if encpart_session_key
is not None:
4055 session_key
= encpart_session_key
4057 session_key
= ticket_session_key
4058 ticket_creds
= KerberosTicketCreds(
4061 crealm
=expected_crealm
,
4062 cname
=expected_cname
,
4063 srealm
=expected_srealm
,
4064 sname
=expected_sname
,
4065 decryption_key
=ticket_decryption_key
,
4066 ticket_private
=ticket_private
,
4067 encpart_private
=encpart_private
)
# With a decrypted ticket, fetch the PAC and check its presence against
# expect_pac (True: required, False: forbidden, None: either).
4069 if ticket_private
is not None:
4070 pac_data
= self
.get_ticket_pac(ticket_creds
, expect_pac
=expect_pac
)
4071 if expect_pac
is True:
4072 self
.assertIsNotNone(pac_data
)
4073 elif expect_pac
is False:
4074 self
.assertIsNone(pac_data
)
4076 if pac_data
is not None:
4077 self
.check_pac_buffers(pac_data
, kdc_exchange_dict
)
# Expecting either checksum requires the ticket decryption key to be known.
4079 expect_ticket_checksum
= kdc_exchange_dict
['expect_ticket_checksum']
4080 expect_full_checksum
= kdc_exchange_dict
['expect_full_checksum']
4081 if expect_ticket_checksum
or expect_full_checksum
:
4082 self
.assertIsNotNone(ticket_decryption_key
)
# A service ticket is a TGS-REP ticket whose sname is not the TGS
# principal. Each checksum expectation falls back to the corresponding
# tkt_sig_support / full_sig_support setting when it is falsy.
4084 if ticket_decryption_key
is not None:
4085 service_ticket
= (rep_msg_type
== KRB_TGS_REP
4086 and not self
.is_tgs_principal(expected_sname
))
4087 self
.verify_ticket(ticket_creds
, krbtgt_keys
,
4088 service_ticket
=service_ticket
,
4089 expect_pac
=expect_pac
,
4090 expect_ticket_checksum
=expect_ticket_checksum
4091 or self
.tkt_sig_support
,
4092 expect_full_checksum
=expect_full_checksum
4093 or self
.full_sig_support
)
# Hand the ticket credentials back to the caller via the exchange dict.
4095 kdc_exchange_dict
['rep_ticket_creds'] = ticket_creds
4097 # Check the SIDs in a LOGON_INFO PAC buffer.
# Expectations are read from kdc_exchange_dict: 'expected_groups',
# 'unexpected_groups', 'expected_domain_sid', 'expected_sid' and
# 'expect_resource_groups_flag'.
#
# SIDs found in the buffer are gathered into pac_sids as
# (sid string, SidType, attributes) tuples, starting with the primary
# group, then the extra SIDs, the base RIDs and the resource groups.
# Duplicates and SIDs listed in unexpected_groups fail the test. The
# collected set is finally compared with expected_groups.
4098 def check_logon_info_sids(self
, logon_info_buffer
, kdc_exchange_dict
):
4099 info3
= logon_info_buffer
.info
.info
.info3
4100 logon_info
= info3
.base
4101 resource_groups
= logon_info_buffer
.info
.info
.resource_groups
4103 expected_groups
= kdc_exchange_dict
['expected_groups']
4104 unexpected_groups
= kdc_exchange_dict
['unexpected_groups']
4105 expected_domain_sid
= kdc_exchange_dict
['expected_domain_sid']
4106 expected_sid
= kdc_exchange_dict
['expected_sid']
# The domain SID and the user's own SID are only checked when the caller
# supplied expected values; the user's SID is the domain SID plus the RID.
4108 domain_sid
= logon_info
.domain_sid
4109 if expected_domain_sid
is not None:
4110 self
.assertEqual(expected_domain_sid
, str(domain_sid
))
4112 if expected_sid
is not None:
4113 got_sid
= f
'{domain_sid}-{logon_info.rid}'
4114 self
.assertEqual(expected_sid
, got_sid
)
4116 if expected_groups
is None and unexpected_groups
is None:
4117 # Nothing more to do.
4120 # Check the SIDs in the PAC.
4122 # Form a representation of the PAC, containing at first the primary
4124 primary_sid
= f
'{domain_sid}-{logon_info.primary_gid}'
4126 (primary_sid
, self
.SidType
.PRIMARY_GID
, None),
# The NETLOGON_EXTRA_SIDS user flag must be set exactly when extra SIDs
# are present, and a present list must not be empty.
4129 # Collect the Extra SIDs.
4130 if info3
.sids
is not None:
4131 self
.assertTrue(logon_info
.user_flags
& (
4132 netlogon
.NETLOGON_EXTRA_SIDS
),
4133 'extra SIDs present, but EXTRA_SIDS flag not set')
4134 self
.assertTrue(info3
.sids
, 'got empty SIDs')
4136 for sid_attr
in info3
.sids
:
4137 got_sid
= str(sid_attr
.sid
)
4138 if unexpected_groups
is not None:
4139 self
.assertNotIn(got_sid
, unexpected_groups
)
4142 self
.SidType
.EXTRA_SID
,
4143 sid_attr
.attributes
)
4144 self
.assertNotIn(pac_sid
, pac_sids
, 'got duplicated SID')
4145 pac_sids
.add(pac_sid
)
4147 self
.assertFalse(logon_info
.user_flags
& (
4148 netlogon
.NETLOGON_EXTRA_SIDS
),
4149 'no extra SIDs present, but EXTRA_SIDS flag set')
# Base group SIDs are formed from the logon domain SID and each group RID.
4151 # Collect the Base RIDs.
4152 if logon_info
.groups
.rids
is not None:
4153 self
.assertTrue(logon_info
.groups
.rids
, 'got empty RIDs')
4155 for group
in logon_info
.groups
.rids
:
4156 got_sid
= f
'{domain_sid}-{group.rid}'
4157 if unexpected_groups
is not None:
4158 self
.assertNotIn(got_sid
, unexpected_groups
)
4160 pac_sid
= (got_sid
, self
.SidType
.BASE_SID
, group
.attributes
)
4161 self
.assertNotIn(pac_sid
, pac_sids
, 'got duplicated SID')
4162 pac_sids
.add(pac_sid
)
# When the caller gave no explicit expectation, the RESOURCE_GROUPS flag
# is expected exactly when resource group RIDs are present; the reason
# prefixes are only filled in for that derived case.
4164 # Collect the Resource SIDs.
4165 expect_resource_groups_flag
= kdc_exchange_dict
[
4166 'expect_resource_groups_flag']
4167 expect_set_reason
= ''
4168 expect_reset_reason
= ''
4169 if expect_resource_groups_flag
is None:
4170 expect_resource_groups_flag
= (
4171 resource_groups
.groups
.rids
is not None)
4172 expect_set_reason
= 'resource groups present, but '
4173 expect_reset_reason
= 'no resource groups present, but '
4175 if expect_resource_groups_flag
:
4177 logon_info
.user_flags
& netlogon
.NETLOGON_RESOURCE_GROUPS
,
4178 f
'{expect_set_reason}RESOURCE_GROUPS flag unexpectedly reset')
4181 logon_info
.user_flags
& netlogon
.NETLOGON_RESOURCE_GROUPS
,
4182 f
'{expect_reset_reason}RESOURCE_GROUPS flag unexpectedly set')
# Resource group SIDs are formed from the resource groups' own domain SID.
4184 if resource_groups
.groups
.rids
is not None:
4185 self
.assertTrue(resource_groups
.groups
.rids
, 'got empty RIDs')
4187 resource_group_sid
= resource_groups
.domain_sid
4188 for resource_group
in resource_groups
.groups
.rids
:
4189 got_sid
= f
'{resource_group_sid}-{resource_group.rid}'
4190 if unexpected_groups
is not None:
4191 self
.assertNotIn(got_sid
, unexpected_groups
)
4194 self
.SidType
.RESOURCE_SID
,
4195 resource_group
.attributes
)
4196 self
.assertNotIn(pac_sid
, pac_sids
, 'got duplicated SID')
4197 pac_sids
.add(pac_sid
)
# An Ellipsis member in expected_groups selects a subset comparison
# (assertLessEqual) rather than an equality comparison.
4199 # Compare the aggregated SIDs against the set of expected SIDs.
4200 if expected_groups
is not None:
4201 if ... in expected_groups
:
4202 # The caller is only interested in asserting the
4203 # presence of particular groups, and doesn't mind if
4204 # other groups are present as well.
4206 self
.assertLessEqual(expected_groups
, pac_sids
,
4209 # The caller wants to make sure the groups match
4211 self
.assertEqual(expected_groups
, pac_sids
,
# Check a device info PAC buffer against the armor TGT and the caller's
# expectations.
#
# The device RID must equal the RID found in the LOGON_INFO buffer of the
# armor TGT's PAC. The device's groups, extra SIDs and domain groups are
# then collected into got_sids and compared for equality with
# kdc_exchange_dict['expected_device_groups'] when that is not None.
4214 def check_device_info(self
, device_info
, kdc_exchange_dict
):
# Pull the PAC out of the armor TGT's authorization data and look for
# its LOGON_INFO buffer.
4215 armor_tgt
= kdc_exchange_dict
['armor_tgt']
4216 armor_auth_data
= armor_tgt
.ticket_private
.get(
4217 'authorization-data')
4218 self
.assertIsNotNone(armor_auth_data
,
4219 'missing authdata for armor TGT')
4220 armor_pac_data
= self
.get_pac(armor_auth_data
)
4221 armor_pac
= ndr_unpack(krb5pac
.PAC_DATA
, armor_pac_data
)
4222 for armor_pac_buffer
in armor_pac
.buffers
:
4223 if armor_pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_INFO
:
4224 armor_info
= armor_pac_buffer
.info
.info
.info3
4227 self
.fail('missing logon info for armor PAC')
4228 self
.assertEqual(armor_info
.base
.rid
, device_info
.rid
)
# When device info is expected, the caller must have supplied both the
# device domain SID and the expected device groups.
4230 device_domain_sid
= kdc_exchange_dict
['expected_device_domain_sid']
4231 expected_device_groups
= kdc_exchange_dict
['expected_device_groups']
4232 if kdc_exchange_dict
['expect_device_info']:
4233 self
.assertIsNotNone(device_domain_sid
)
4234 self
.assertIsNotNone(expected_device_groups
)
4236 if device_domain_sid
is not None:
4237 self
.assertEqual(device_domain_sid
, str(device_info
.domain_sid
))
4239 device_domain_sid
= str(device_info
.domain_sid
)
4241 # Check the device info SIDs.
4243 # A representation of the device info groups.
4244 primary_sid
= f
'{device_domain_sid}-{device_info.primary_gid}'
4246 (primary_sid
, self
.SidType
.PRIMARY_GID
, None),
# Group SIDs are formed from the device domain SID and each group RID;
# each is recorded as a (sid string, SidType, attributes) tuple.
4249 # Collect the groups.
4250 if device_info
.groups
.rids
is not None:
4251 self
.assertTrue(device_info
.groups
.rids
, 'got empty RIDs')
4253 for group
in device_info
.groups
.rids
:
4254 got_sid
= f
'{device_domain_sid}-{group.rid}'
4256 device_sid
= (got_sid
, self
.SidType
.BASE_SID
, group
.attributes
)
4257 self
.assertNotIn(device_sid
, got_sids
, 'got duplicated SID')
4258 got_sids
.add(device_sid
)
4261 if device_info
.sids
is not None:
4262 self
.assertTrue(device_info
.sids
, 'got empty SIDs')
4264 for sid_attr
in device_info
.sids
:
4265 got_sid
= str(sid_attr
.sid
)
# An extra SID must not be a SID with five sub-authorities under
# S-1-5-21-; per the failure message those belong in domain_groups.
4267 in_a_domain
= sid_attr
.sid
.num_auths
== 5 and (
4268 str(sid_attr
.sid
).startswith('S-1-5-21-'))
4269 self
.assertFalse(in_a_domain
,
4270 f
'got unexpected SID for domain: {got_sid} '
4271 f
'(should be in device_info.domain_groups)')
4273 device_sid
= (got_sid
,
4274 self
.SidType
.EXTRA_SID
,
4275 sid_attr
.attributes
)
4276 self
.assertNotIn(device_sid
, got_sids
, 'got duplicated SID')
4277 got_sids
.add(device_sid
)
4279 # Collect the domain groups.
4280 if device_info
.domain_groups
is not None:
4281 self
.assertTrue(device_info
.domain_groups
, 'got empty domain groups')
4283 for domain_group
in device_info
.domain_groups
:
4284 self
.assertTrue(domain_group
, 'got empty domain group')
4286 got_domain_sids
= set()
4288 resource_group_sid
= domain_group
.domain_sid
# NOTE(review): judging by the failure message, a domain group's SID is
# presumably required to have four sub-authorities under S-1-5-21-;
# confirm the assertion used.
4290 in_a_domain
= resource_group_sid
.num_auths
== 4 and (
4291 str(resource_group_sid
).startswith('S-1-5-21-'))
4294 f
'got unexpected domain SID for non-domain: {resource_group_sid} '
4295 f
'(should be in device_info.sids)')
4297 for resource_group
in domain_group
.groups
.rids
:
4298 got_sid
= f
'{resource_group_sid}-{resource_group.rid}'
4300 device_sid
= (got_sid
,
4301 self
.SidType
.RESOURCE_SID
,
4302 resource_group
.attributes
)
4303 self
.assertNotIn(device_sid
, got_domain_sids
, 'got duplicated SID')
4304 got_domain_sids
.add(device_sid
)
# Each domain's groups go into got_sids as one frozenset element, so the
# expected value must group them the same way.
4306 got_domain_sids
= frozenset(got_domain_sids
)
4307 self
.assertNotIn(got_domain_sids
, got_sids
)
4308 got_sids
.add(got_domain_sids
)
4310 # Compare the aggregated device SIDs against the set of expected device
4312 if expected_device_groups
is not None:
4313 self
.assertEqual(expected_device_groups
, got_sids
,
4316 def check_pac_buffers(self
, pac_data
, kdc_exchange_dict
):
4317 pac
= ndr_unpack(krb5pac
.PAC_DATA
, pac_data
)
4319 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
4320 armor_tgt
= kdc_exchange_dict
['armor_tgt']
4322 compound_id
= rep_msg_type
== KRB_TGS_REP
and armor_tgt
is not None
4324 expected_sname
= kdc_exchange_dict
['expected_sname']
4325 expect_client_claims
= kdc_exchange_dict
['expect_client_claims']
4326 expect_device_info
= kdc_exchange_dict
['expect_device_info']
4327 expect_device_claims
= kdc_exchange_dict
['expect_device_claims']
4329 expected_types
= [krb5pac
.PAC_TYPE_LOGON_INFO
,
4330 krb5pac
.PAC_TYPE_SRV_CHECKSUM
,
4331 krb5pac
.PAC_TYPE_KDC_CHECKSUM
,
4332 krb5pac
.PAC_TYPE_LOGON_NAME
,
4333 krb5pac
.PAC_TYPE_UPN_DNS_INFO
]
4335 kdc_options
= kdc_exchange_dict
['kdc_options']
4336 pos
= len(tuple(krb5_asn1
.KDCOptions('cname-in-addl-tkt'))) - 1
4337 constrained_delegation
= (pos
< len(kdc_options
)
4338 and kdc_options
[pos
] == '1')
4339 if constrained_delegation
:
4340 expected_types
.append(krb5pac
.PAC_TYPE_CONSTRAINED_DELEGATION
)
4342 require_strict
= set()
4344 if not self
.tkt_sig_support
:
4345 require_strict
.add(krb5pac
.PAC_TYPE_TICKET_CHECKSUM
)
4346 if not self
.full_sig_support
:
4347 require_strict
.add(krb5pac
.PAC_TYPE_FULL_CHECKSUM
)
4349 expected_client_claims
= kdc_exchange_dict
['expected_client_claims']
4350 unexpected_client_claims
= kdc_exchange_dict
[
4351 'unexpected_client_claims']
4353 if self
.kdc_claims_support
and expect_client_claims
:
4354 expected_types
.append(krb5pac
.PAC_TYPE_CLIENT_CLAIMS_INFO
)
4357 expected_client_claims
,
4358 'expected client claims, but client claims not expected in '
4361 unexpected_client_claims
,
4362 'unexpected client claims, but client claims not expected in '
4365 if expect_client_claims
is None:
4366 unchecked
.add(krb5pac
.PAC_TYPE_CLIENT_CLAIMS_INFO
)
4368 expected_device_claims
= kdc_exchange_dict
['expected_device_claims']
4369 unexpected_device_claims
= kdc_exchange_dict
['unexpected_device_claims']
4371 expected_device_groups
= kdc_exchange_dict
['expected_device_groups']
4373 if (self
.kdc_claims_support
and self
.kdc_compound_id_support
4374 and expect_device_claims
and compound_id
):
4375 expected_types
.append(krb5pac
.PAC_TYPE_DEVICE_CLAIMS_INFO
)
4378 expect_device_claims
,
4379 'expected device claims buffer, but device claims not '
4382 expected_device_claims
,
4383 'expected device claims, but device claims not expected in '
4386 unexpected_device_claims
,
4387 'unexpected device claims, but device claims not expected in '
4390 if expect_device_claims
is None and compound_id
:
4391 unchecked
.add(krb5pac
.PAC_TYPE_DEVICE_CLAIMS_INFO
)
4393 if self
.kdc_compound_id_support
and compound_id
and expect_device_info
:
4394 expected_types
.append(krb5pac
.PAC_TYPE_DEVICE_INFO
)
4396 self
.assertFalse(expect_device_info
,
4397 'expected device info with no armor TGT or '
4398 'for non-TGS request')
4399 self
.assertFalse(expected_device_groups
,
4400 'expected device groups, but device info not '
4403 if expect_device_info
is None and compound_id
:
4404 unchecked
.add(krb5pac
.PAC_TYPE_DEVICE_INFO
)
4406 if rep_msg_type
== KRB_TGS_REP
:
4407 if not self
.is_tgs_principal(expected_sname
):
4408 expected_types
.append(krb5pac
.PAC_TYPE_TICKET_CHECKSUM
)
4409 expected_types
.append(krb5pac
.PAC_TYPE_FULL_CHECKSUM
)
4411 expect_extra_pac_buffers
= self
.is_tgs(expected_sname
)
4413 expect_pac_attrs
= kdc_exchange_dict
['expect_pac_attrs']
4415 if expect_pac_attrs
:
4416 expect_pac_attrs_pac_request
= kdc_exchange_dict
[
4417 'expect_pac_attrs_pac_request']
4419 expect_pac_attrs_pac_request
= kdc_exchange_dict
[
4422 if expect_pac_attrs
is None:
4423 if self
.expect_extra_pac_buffers
:
4424 expect_pac_attrs
= expect_extra_pac_buffers
4426 require_strict
.add(krb5pac
.PAC_TYPE_ATTRIBUTES_INFO
)
4427 if expect_pac_attrs
:
4428 expected_types
.append(krb5pac
.PAC_TYPE_ATTRIBUTES_INFO
)
4430 expect_requester_sid
= kdc_exchange_dict
['expect_requester_sid']
4431 expected_requester_sid
= kdc_exchange_dict
['expected_requester_sid']
4433 if expect_requester_sid
is None:
4434 if self
.expect_extra_pac_buffers
:
4435 expect_requester_sid
= expect_extra_pac_buffers
4437 require_strict
.add(krb5pac
.PAC_TYPE_REQUESTER_SID
)
4438 if expected_requester_sid
is not None:
4439 expect_requester_sid
= True
4440 if expect_requester_sid
:
4441 expected_types
.append(krb5pac
.PAC_TYPE_REQUESTER_SID
)
4443 sent_pk_as_req
= self
.sent_pk_as_req(kdc_exchange_dict
) or (
4444 self
.sent_pk_as_req_win2k(kdc_exchange_dict
))
4446 expected_types
.append(krb5pac
.PAC_TYPE_CREDENTIAL_INFO
)
4448 buffer_types
= [pac_buffer
.type
4449 for pac_buffer
in pac
.buffers
]
4450 self
.assertSequenceElementsEqual(
4451 expected_types
, buffer_types
,
4452 require_ordered
=False,
4453 require_strict
=require_strict
,
4454 unchecked
=unchecked
)
4456 expected_account_name
= kdc_exchange_dict
['expected_account_name']
4457 expected_sid
= kdc_exchange_dict
['expected_sid']
4459 expect_upn_dns_info_ex
= kdc_exchange_dict
['expect_upn_dns_info_ex']
4460 if expect_upn_dns_info_ex
is None and (
4461 expected_account_name
is not None
4462 or expected_sid
is not None):
4463 expect_upn_dns_info_ex
= True
4465 for pac_buffer
in pac
.buffers
:
4466 if pac_buffer
.type == krb5pac
.PAC_TYPE_CONSTRAINED_DELEGATION
:
4467 expected_proxy_target
= kdc_exchange_dict
[
4468 'expected_proxy_target']
4469 expected_transited_services
= kdc_exchange_dict
[
4470 'expected_transited_services']
4472 delegation_info
= pac_buffer
.info
.info
4474 self
.assertEqual(expected_proxy_target
,
4475 str(delegation_info
.proxy_target
))
4477 transited_services
= list(map(
4478 str, delegation_info
.transited_services
))
4479 self
.assertEqual(expected_transited_services
,
4482 elif pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_NAME
:
4483 expected_cname
= kdc_exchange_dict
['expected_cname']
4484 account_name
= '/'.join(expected_cname
['name-string'])
4486 self
.assertEqual(account_name
, pac_buffer
.info
.account_name
)
4488 elif pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_INFO
:
4489 info3
= pac_buffer
.info
.info
.info3
4490 logon_info
= info3
.base
4492 if expected_account_name
is not None:
4493 self
.assertEqual(expected_account_name
,
4494 str(logon_info
.account_name
))
4496 self
.check_logon_info_sids(pac_buffer
, kdc_exchange_dict
)
4498 elif pac_buffer
.type == krb5pac
.PAC_TYPE_UPN_DNS_INFO
:
4499 upn_dns_info
= pac_buffer
.info
4500 upn_dns_info_ex
= upn_dns_info
.ex
4502 expected_realm
= kdc_exchange_dict
['expected_crealm']
4503 self
.assertEqual(expected_realm
,
4504 upn_dns_info
.dns_domain_name
)
4506 expected_upn_name
= kdc_exchange_dict
['expected_upn_name']
4507 if expected_upn_name
is not None:
4508 self
.assertEqual(expected_upn_name
,
4509 upn_dns_info
.upn_name
)
4511 if expect_upn_dns_info_ex
:
4512 self
.assertIsNotNone(upn_dns_info_ex
)
4514 if upn_dns_info_ex
is not None:
4515 if expected_account_name
is not None:
4516 self
.assertEqual(expected_account_name
,
4517 upn_dns_info_ex
.samaccountname
)
4519 if expected_sid
is not None:
4520 self
.assertEqual(expected_sid
,
4521 str(upn_dns_info_ex
.objectsid
))
4523 elif (pac_buffer
.type == krb5pac
.PAC_TYPE_ATTRIBUTES_INFO
4524 and expect_pac_attrs
):
4525 attr_info
= pac_buffer
.info
4527 self
.assertEqual(2, attr_info
.flags_length
)
4529 flags
= attr_info
.flags
4531 requested_pac
= bool(flags
& 1)
4532 given_pac
= bool(flags
& 2)
4534 self
.assertEqual(expect_pac_attrs_pac_request
is True,
4536 self
.assertEqual(expect_pac_attrs_pac_request
is None,
4539 elif (pac_buffer
.type == krb5pac
.PAC_TYPE_REQUESTER_SID
4540 and expect_requester_sid
):
4541 requester_sid
= pac_buffer
.info
.sid
4543 if expected_requester_sid
is None:
4544 expected_requester_sid
= expected_sid
4545 if expected_sid
is not None:
4546 self
.assertEqual(expected_requester_sid
,
4549 elif pac_buffer
.type in {krb5pac
.PAC_TYPE_CLIENT_CLAIMS_INFO
,
4550 krb5pac
.PAC_TYPE_DEVICE_CLAIMS_INFO
}:
4551 remaining
= pac_buffer
.info
.remaining
4553 if pac_buffer
.type == krb5pac
.PAC_TYPE_CLIENT_CLAIMS_INFO
:
4554 claims_type
= 'client claims'
4555 expected_claims
= expected_client_claims
4556 unexpected_claims
= unexpected_client_claims
4558 claims_type
= 'device claims'
4559 expected_claims
= expected_device_claims
4560 unexpected_claims
= unexpected_device_claims
4563 # Windows may produce an empty claims buffer.
4564 self
.assertFalse(expected_claims
,
4565 f
'expected {claims_type}, but the PAC '
4566 f
'buffer was empty')
4570 empty_msg
= ', and {claims_type} were expected'
4572 empty_msg
= ' for {claims_type} (should be missing)'
4574 client_claims
= ndr_unpack(claims
.CLAIMS_SET_METADATA_NDR
,
4576 client_claims
= client_claims
.claims
.metadata
4577 self
.assertIsNotNone(client_claims
,
4578 f
'got empty CLAIMS_SET_METADATA_NDR '
4579 f
'inner structure {empty_msg}')
4581 self
.assertIsNotNone(client_claims
.claims_set
,
4582 f
'got empty CLAIMS_SET_METADATA '
4583 f
'structure {empty_msg}')
4585 uncompressed_size
= client_claims
.uncompressed_claims_set_size
4586 compression_format
= client_claims
.compression_format
4588 if uncompressed_size
< claims
.CLAIM_MINIMUM_BYTES_TO_COMPRESS
:
4589 self
.assertEqual(claims
.CLAIMS_COMPRESSION_FORMAT_NONE
,
4591 f
'{claims_type} unexpectedly '
4592 f
'compressed ({uncompressed_size} '
4593 f
'bytes uncompressed)')
4596 claims
.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF
,
4598 f
'{claims_type} unexpectedly not compressed '
4599 f
'({uncompressed_size} bytes uncompressed)')
4601 claims_set
= client_claims
.claims_set
.claims
.claims
4602 self
.assertIsNotNone(claims_set
,
4603 f
'got empty CLAIMS_SET_NDR inner '
4604 f
'structure {empty_msg}')
4606 claims_arrays
= claims_set
.claims_arrays
4607 self
.assertIsNotNone(claims_arrays
,
4608 f
'got empty CLAIMS_SET structure '
4610 self
.assertGreater(len(claims_arrays
), 0,
4611 f
'got empty claims array {empty_msg}')
4612 self
.assertEqual(len(claims_arrays
),
4613 claims_set
.claims_array_count
,
4614 f
'{claims_type} arrays size mismatch')
4618 for claims_array
in claims_arrays
:
4619 claim_entries
= claims_array
.claim_entries
4620 self
.assertIsNotNone(claim_entries
,
4621 f
'got empty CLAIMS_ARRAY structure '
4623 self
.assertGreater(len(claim_entries
), 0,
4624 f
'got empty claim entries array '
4626 self
.assertEqual(len(claim_entries
),
4627 claims_array
.claims_count
,
4628 f
'{claims_type} entries array size '
4631 for entry
in claim_entries
:
4632 if unexpected_claims
is not None:
4633 self
.assertNotIn(entry
.id, unexpected_claims
,
4634 f
'got unexpected {claims_type} '
4636 if expected_claims
is None:
4639 expected_claim
= expected_claims
.get(entry
.id)
4640 if expected_claim
is None:
4643 self
.assertNotIn(entry
.id, got_claims
,
4644 f
'got duplicate {claims_type}')
4646 self
.assertIsNotNone(entry
.values
.values
,
4647 f
'got {claims_type} with no '
4649 self
.assertGreater(len(entry
.values
.values
), 0,
4650 f
'got empty {claims_type} values '
4652 self
.assertEqual(len(entry
.values
.values
),
4653 entry
.values
.value_count
,
4654 f
'{claims_type} values array size '
4657 expected_claim_values
= expected_claim
.get('values')
4658 self
.assertIsNotNone(expected_claim_values
,
4659 f
'got expected {claims_type} '
4662 values
= type(expected_claim_values
)(
4663 entry
.values
.values
)
4665 got_claims
[entry
.id] = {
4666 'source_type': claims_array
.claims_source_type
,
4671 self
.assertEqual(expected_claims
, got_claims
or None,
4672 f
'{claims_type} did not match expectations')
4674 elif pac_buffer
.type == krb5pac
.PAC_TYPE_DEVICE_INFO
:
4675 device_info
= pac_buffer
.info
.info
4677 self
.check_device_info(device_info
, kdc_exchange_dict
)
4679 elif pac_buffer
.type == krb5pac
.PAC_TYPE_CREDENTIAL_INFO
:
4680 credential_info
= pac_buffer
.info
4682 expected_etype
= self
.expected_etype(kdc_exchange_dict
)
4684 self
.assertEqual(0, credential_info
.version
)
4685 self
.assertEqual(expected_etype
,
4686 credential_info
.encryption_type
)
4688 encrypted_data
= credential_info
.encrypted_data
4689 reply_key
= kdc_exchange_dict
['reply_key']
4691 data
= reply_key
.decrypt(KU_NON_KERB_SALT
, encrypted_data
)
4693 credential_data_ndr
= ndr_unpack(
4694 krb5pac
.PAC_CREDENTIAL_DATA_NDR
, data
)
4696 credential_data
= credential_data_ndr
.ctr
.data
4698 self
.assertEqual(1, credential_data
.credential_count
)
4699 self
.assertEqual(credential_data
.credential_count
,
4700 len(credential_data
.credentials
))
4702 package
= credential_data
.credentials
[0]
4703 self
.assertEqual('NTLM', str(package
.package_name
))
4705 ntlm_blob
= bytes(package
.credential
)
4707 ntlm_package
= ndr_unpack(krb5pac
.PAC_CREDENTIAL_NTLM_SECPKG
,
4710 self
.assertEqual(0, ntlm_package
.version
)
4711 self
.assertEqual(krb5pac
.PAC_CREDENTIAL_NTLM_HAS_NT_HASH
,
4714 creds
= kdc_exchange_dict
['creds']
4715 nt_password
= bytes(ntlm_package
.nt_password
.hash)
4716 self
.assertEqual(creds
.get_nt_hash(), nt_password
)
4718 lm_password
= bytes(ntlm_package
.lm_password
.hash)
4719 self
.assertEqual(bytes(16), lm_password
)
4721 def generic_check_kdc_error(self
,
4727 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
4729 expected_anon
= kdc_exchange_dict
['expected_anon']
4730 expected_srealm
= kdc_exchange_dict
['expected_srealm']
4731 expected_sname
= kdc_exchange_dict
['expected_sname']
4732 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
4734 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
4736 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
4738 self
.assertElementEqual(rep
, 'pvno', 5)
4739 self
.assertElementEqual(rep
, 'msg-type', KRB_ERROR
)
4740 error_code
= self
.getElementValue(rep
, 'error-code')
4741 self
.assertIn(error_code
, expected_error_mode
)
4742 if self
.strict_checking
:
4743 self
.assertElementMissing(rep
, 'ctime')
4744 self
.assertElementMissing(rep
, 'cusec')
4745 self
.assertElementPresent(rep
, 'stime')
4746 self
.assertElementPresent(rep
, 'susec')
4747 # error-code checked above
4748 if expected_anon
and not inner
:
4749 expected_cname
= self
.PrincipalName_create(
4750 name_type
=NT_WELLKNOWN
,
4751 names
=['WELLKNOWN', 'ANONYMOUS'])
4752 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
4753 elif self
.strict_checking
:
4754 self
.assertElementMissing(rep
, 'cname')
4755 if self
.strict_checking
:
4756 self
.assertElementMissing(rep
, 'crealm')
4757 self
.assertElementEqualUTF8(rep
, 'realm', expected_srealm
)
4758 self
.assertElementEqualPrincipal(rep
, 'sname', expected_sname
)
4759 self
.assertElementMissing(rep
, 'e-text')
4760 expect_status
= kdc_exchange_dict
['expect_status']
4761 expected_status
= kdc_exchange_dict
['expected_status']
4762 expect_edata
= kdc_exchange_dict
['expect_edata']
4763 if expect_edata
is None:
4764 expect_edata
= (error_code
!= KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
4765 and (not sent_fast
or fast_armor_type
is None
4766 or fast_armor_type
== FX_FAST_ARMOR_AP_REQUEST
)
4768 if inner
and expect_edata
is self
.expect_padata_outer
:
4769 expect_edata
= False
4770 if not expect_edata
:
4771 self
.assertFalse(expect_status
)
4772 if self
.strict_checking
or expect_status
is False:
4773 self
.assertElementMissing(rep
, 'e-data')
4775 edata
= self
.getElementValue(rep
, 'e-data')
4776 if self
.strict_checking
or expect_status
:
4777 self
.assertIsNotNone(edata
)
4778 if edata
is not None:
4780 error_data
= self
.der_decode(
4782 asn1Spec
=krb5_asn1
.KERB_ERROR_DATA())
4785 # The test requires that the KDC be declared to support
4786 # NTSTATUS values in e-data to proceed.
4788 self
.expect_nt_status
,
4789 'expected status code (which, according to '
4790 'EXPECT_NT_STATUS=0, the KDC does not support)')
4792 self
.fail('expected to get status code')
4794 rep_padata
= self
.der_decode(
4795 edata
, asn1Spec
=krb5_asn1
.METHOD_DATA())
4796 self
.assertGreater(len(rep_padata
), 0)
4799 self
.assertEqual(1, len(rep_padata
))
4800 rep_pa_dict
= self
.get_pa_dict(rep_padata
)
4801 self
.assertIn(PADATA_FX_FAST
, rep_pa_dict
)
4803 armor_key
= kdc_exchange_dict
['armor_key']
4804 self
.assertIsNotNone(armor_key
)
4805 fast_response
= self
.check_fx_fast_data(
4807 rep_pa_dict
[PADATA_FX_FAST
],
4809 expect_strengthen_key
=False)
4811 rep_padata
= fast_response
['padata']
4813 etype_info2
= self
.check_rep_padata(kdc_exchange_dict
,
4818 kdc_exchange_dict
['preauth_etype_info2'] = etype_info2
4820 self
.assertTrue(self
.expect_nt_status
,
4821 'got status code, but EXPECT_NT_STATUS=0')
4823 if expect_status
is not None:
4824 self
.assertTrue(expect_status
,
4825 'got unexpected status code')
4827 self
.assertEqual(KERB_ERR_TYPE_EXTENDED
,
4828 error_data
['data-type'])
4830 extended_error
= error_data
['data-value']
4832 self
.assertEqual(12, len(extended_error
))
4834 status
= int.from_bytes(extended_error
[:4], 'little')
4835 flags
= int.from_bytes(extended_error
[8:], 'little')
4837 self
.assertEqual(expected_status
, status
)
4839 if rep_msg_type
== KRB_TGS_REP
:
4840 self
.assertEqual(3, flags
)
4842 self
.assertEqual(1, flags
)
4846 def check_reply_padata(self
,
4851 expected_patypes
= ()
4853 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
4854 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
4857 expected_patypes
+= (PADATA_FX_FAST
,)
4858 elif rep_msg_type
== KRB_AS_REP
:
4859 if self
.sent_pk_as_req(kdc_exchange_dict
):
4860 expected_patypes
+= PADATA_PK_AS_REP
,
4861 elif self
.sent_pk_as_req_win2k(kdc_exchange_dict
):
4862 expected_patypes
+= PADATA_PK_AS_REP_19
,
4864 chosen_etype
= self
.getElementValue(encpart
, 'etype')
4865 self
.assertIsNotNone(chosen_etype
)
4867 if chosen_etype
in {kcrypto
.Enctype
.AES256
,
4868 kcrypto
.Enctype
.AES128
}:
4869 expected_patypes
+= (PADATA_ETYPE_INFO2
,)
4871 preauth_key
= kdc_exchange_dict
['preauth_key']
4872 self
.assertIsInstance(preauth_key
, Krb5EncryptionKey
)
4873 if preauth_key
.etype
== kcrypto
.Enctype
.RC4
and rep_padata
is None:
4875 elif rep_msg_type
== KRB_TGS_REP
:
4876 if expected_patypes
== () and rep_padata
is None:
4879 if not self
.strict_checking
and rep_padata
is None:
4882 self
.assertIsNotNone(rep_padata
)
4883 got_patypes
= tuple(pa
['padata-type'] for pa
in rep_padata
)
4884 self
.assertSequenceElementsEqual(expected_patypes
, got_patypes
,
4885 # Windows does not add this.
4886 unchecked
={PADATA_PKINIT_KX}
)
4888 if len(expected_patypes
) == 0:
4891 pa_dict
= self
.get_pa_dict(rep_padata
)
4893 etype_info2
= pa_dict
.get(PADATA_ETYPE_INFO2
)
4894 if etype_info2
is not None:
4895 etype_info2
= self
.der_decode(etype_info2
,
4896 asn1Spec
=krb5_asn1
.ETYPE_INFO2())
4897 self
.assertEqual(len(etype_info2
), 1)
4898 elem
= etype_info2
[0]
4900 e
= self
.getElementValue(elem
, 'etype')
4901 self
.assertEqual(e
, chosen_etype
)
4902 salt
= self
.getElementValue(elem
, 'salt')
4903 self
.assertIsNotNone(salt
)
4904 expected_salt
= kdc_exchange_dict
['expected_salt']
4905 if expected_salt
is not None:
4906 self
.assertEqual(salt
, expected_salt
)
4907 s2kparams
= self
.getElementValue(elem
, 's2kparams')
4908 if self
.strict_checking
:
4909 self
.assertIsNone(s2kparams
)
def greatest_common_etype(etypes, proposed_etypes):
    """Return the largest proposed encryption type that is also in *etypes*.

    :param etypes: container of acceptable encryption types.
    :param proposed_etypes: iterable of encryption types proposed in the
        request.
    :return: the maximum of the common encryption types, or None when the
        two have nothing in common.
    """
    # NOTE(review): this is invoked as self.greatest_common_etype(a, b), so
    # it presumably carries a @staticmethod decorator on the preceding
    # line -- confirm.
    #
    # The default stops max() from raising ValueError on an empty
    # intersection; callers compare the result against None.
    return max((etype for etype in proposed_etypes if etype in etypes),
               default=None)
def first_common_etype(etypes, proposed_etypes):
    """Return the first proposed encryption type that is also in *etypes*.

    :param etypes: container of acceptable encryption types.
    :param proposed_etypes: iterable of encryption types, examined in order.
    :return: the first match, or None if no proposed type is acceptable.
    """
    # NOTE(review): this is invoked as self.first_common_etype(a, b), so it
    # presumably carries a @staticmethod decorator on the preceding line --
    # confirm.
    for candidate in proposed_etypes:
        if candidate in etypes:
            return candidate
    return None
def supported_aes_rc4_etypes(self, kdc_exchange_dict):
    """Split the default encryption types of the exchange's creds by family.

    :param kdc_exchange_dict: exchange state; the 'creds' and 'rc4_support'
        entries are read.
    :return: an (aes_etypes, rc4_etypes) tuple of sets. RC4 is only included
        when 'rc4_support' is truthy.
    """
    creds = kdc_exchange_dict['creds']
    supported_etypes = self.get_default_enctypes(creds)

    rc4_support = kdc_exchange_dict['rc4_support']

    # Both results must start out as (possibly empty) sets: they are filled
    # with .add() below and callers combine them with the | operator.
    aes_etypes = set()
    if kcrypto.Enctype.AES256 in supported_etypes:
        aes_etypes.add(kcrypto.Enctype.AES256)
    if kcrypto.Enctype.AES128 in supported_etypes:
        aes_etypes.add(kcrypto.Enctype.AES128)

    rc4_etypes = set()
    if rc4_support and kcrypto.Enctype.RC4 in supported_etypes:
        rc4_etypes.add(kcrypto.Enctype.RC4)

    return aes_etypes, rc4_etypes
def greatest_aes_rc4_etypes(self, kdc_exchange_dict):
    """Pick the greatest proposed AES and RC4 encryption types.

    The proposed types come from the 'etype' field of the request body
    stored in *kdc_exchange_dict*; the candidate sets come from
    supported_aes_rc4_etypes().

    :param kdc_exchange_dict: exchange state; the 'req_body' entry is read.
    :return: an (expected_aes, expected_rc4) tuple holding the result of
        greatest_common_etype() for the AES and the RC4 set respectively.
    """
    proposed = kdc_exchange_dict['req_body']['etype']

    aes_candidates, rc4_candidates = self.supported_aes_rc4_etypes(
        kdc_exchange_dict)

    return (self.greatest_common_etype(aes_candidates, proposed),
            self.greatest_common_etype(rc4_candidates, proposed))
def expected_etype(self, kdc_exchange_dict):
    """Return the first proposed encryption type found in the supported sets.

    The proposed types come from the 'etype' field of the request body
    stored in *kdc_exchange_dict*. They are matched, in request order,
    against the union of the AES and RC4 sets returned by
    supported_aes_rc4_etypes().

    :param kdc_exchange_dict: exchange state; the 'req_body' entry is read.
    :return: whatever first_common_etype() yields for that union.
    """
    req_body = kdc_exchange_dict['req_body']
    proposed_etypes = req_body['etype']

    aes_etypes, rc4_etypes = self.supported_aes_rc4_etypes(
        kdc_exchange_dict)

    return self.first_common_etype(aes_etypes | rc4_etypes,
                                   proposed_etypes)
4959 def check_rep_padata(self
,
4964 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
4966 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
4967 sent_enc_challenge
= self
.sent_enc_challenge(kdc_exchange_dict
)
4969 if rep_msg_type
== KRB_TGS_REP
:
4970 self
.assertTrue(sent_fast
)
4972 rc4_support
= kdc_exchange_dict
['rc4_support']
4974 expected_aes
, expected_rc4
= self
.greatest_aes_rc4_etypes(
4977 expect_etype_info2
= ()
4978 expect_etype_info
= False
4979 if expected_aes
is not None:
4980 expect_etype_info2
+= (expected_aes
,)
4981 if expected_rc4
is not None:
4983 expect_etype_info2
+= (expected_rc4
,)
4984 if expected_aes
is None:
4985 expect_etype_info
= True
4987 if expect_etype_info
:
4988 self
.assertGreater(len(expect_etype_info2
), 0)
4990 sent_pac_options
= self
.get_sent_pac_options(kdc_exchange_dict
)
4992 check_patypes
= kdc_exchange_dict
['check_patypes']
4994 expected_patypes
= ()
4995 if sent_fast
and error_code
!= 0:
4996 expected_patypes
+= (PADATA_FX_ERROR
,)
4997 expected_patypes
+= (PADATA_FX_COOKIE
,)
4999 if rep_msg_type
== KRB_TGS_REP
:
5000 if ('1' in sent_pac_options
5001 and error_code
not in (0, KDC_ERR_GENERIC
)):
5002 expected_patypes
+= (PADATA_PAC_OPTIONS
,)
5003 elif error_code
!= KDC_ERR_GENERIC
:
5004 if expect_etype_info
:
5005 expected_patypes
+= (PADATA_ETYPE_INFO
,)
5006 if len(expect_etype_info2
) != 0:
5007 expected_patypes
+= (PADATA_ETYPE_INFO2
,)
5009 sent_freshness
= self
.sent_freshness(kdc_exchange_dict
)
5011 if error_code
not in (KDC_ERR_PREAUTH_FAILED
, KDC_ERR_SKEW
,
5012 KDC_ERR_POLICY
, KDC_ERR_CLIENT_REVOKED
):
5014 expected_patypes
+= (PADATA_ENCRYPTED_CHALLENGE
,)
5016 expected_patypes
+= (PADATA_ENC_TIMESTAMP
,)
5018 if not sent_enc_challenge
:
5019 expected_patypes
+= (PADATA_PK_AS_REQ
,)
5020 if not sent_freshness
:
5021 expected_patypes
+= (PADATA_PK_AS_REP_19
,)
5024 expected_patypes
+= PADATA_AS_FRESHNESS
,
5026 if (self
.kdc_fast_support
5028 and not sent_enc_challenge
):
5029 expected_patypes
+= (PADATA_FX_FAST
,)
5030 expected_patypes
+= (PADATA_FX_COOKIE
,)
5032 require_strict
= {PADATA_FX_COOKIE
,
5035 PADATA_PK_AS_REP_19
,
5039 strict_edata_checking
= kdc_exchange_dict
['strict_edata_checking']
5040 if not strict_edata_checking
:
5041 require_strict
.add(PADATA_ETYPE_INFO2
)
5042 require_strict
.add(PADATA_ENCRYPTED_CHALLENGE
)
5044 got_patypes
= tuple(pa
['padata-type'] for pa
in rep_padata
)
5045 self
.assertSequenceElementsEqual(expected_patypes
, got_patypes
,
5046 require_strict
=require_strict
)
5048 if not expected_patypes
:
5051 pa_dict
= self
.get_pa_dict(rep_padata
)
5053 enc_timestamp
= pa_dict
.get(PADATA_ENC_TIMESTAMP
)
5054 if enc_timestamp
is not None:
5055 self
.assertEqual(len(enc_timestamp
), 0)
5057 pk_as_req
= pa_dict
.get(PADATA_PK_AS_REQ
)
5058 if pk_as_req
is not None:
5059 self
.assertEqual(len(pk_as_req
), 0)
5061 pk_as_rep19
= pa_dict
.get(PADATA_PK_AS_REP_19
)
5062 if pk_as_rep19
is not None:
5063 self
.assertEqual(len(pk_as_rep19
), 0)
5065 freshness_token
= pa_dict
.get(PADATA_AS_FRESHNESS
)
5066 if freshness_token
is not None:
5067 self
.assertEqual(bytes(2), freshness_token
[:2])
5069 freshness
= self
.der_decode(freshness_token
[2:],
5070 asn1Spec
=krb5_asn1
.EncryptedData())
5072 krbtgt_creds
= self
.get_krbtgt_creds()
5073 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
5075 self
.assertElementEqual(freshness
, 'etype', krbtgt_key
.etype
)
5076 self
.assertElementKVNO(freshness
, 'kvno', krbtgt_key
.kvno
)
5078 # Decrypt the freshness token.
5079 ts_enc
= krbtgt_key
.decrypt(KU_AS_FRESHNESS
,
5080 freshness
['cipher'])
5082 # Ensure that we can decode it as PA-ENC-TS-ENC.
5083 ts_enc
= self
.der_decode(ts_enc
,
5084 asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
5085 freshness_time
= self
.get_EpochFromKerberosTime(
5086 ts_enc
['patimestamp'])
5087 freshness_time
+= ts_enc
['pausec'] / 1e6
5089 # Ensure that it is reasonably close to the current time (within
5090 # five minutes, to allow for clock skew).
5091 current_time
= datetime
.datetime
.now(
5092 datetime
.timezone
.utc
).timestamp()
5093 self
.assertLess(current_time
- 5 * 60, freshness_time
)
5094 self
.assertLess(freshness_time
, current_time
+ 5 * 60)
5096 kdc_exchange_dict
['freshness_token'] = freshness_token
5098 fx_fast
= pa_dict
.get(PADATA_FX_FAST
)
5099 if fx_fast
is not None:
5100 self
.assertEqual(len(fx_fast
), 0)
5102 fast_cookie
= pa_dict
.get(PADATA_FX_COOKIE
)
5103 if fast_cookie
is not None:
5104 kdc_exchange_dict
['fast_cookie'] = fast_cookie
5106 fast_error
= pa_dict
.get(PADATA_FX_ERROR
)
5107 if fast_error
is not None:
5108 fast_error
= self
.der_decode(fast_error
,
5109 asn1Spec
=krb5_asn1
.KRB_ERROR())
5110 self
.generic_check_kdc_error(kdc_exchange_dict
,
5115 pac_options
= pa_dict
.get(PADATA_PAC_OPTIONS
)
5116 if pac_options
is not None:
5117 pac_options
= self
.der_decode(
5119 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
5120 self
.assertElementEqual(pac_options
, 'options', sent_pac_options
)
5122 enc_challenge
= pa_dict
.get(PADATA_ENCRYPTED_CHALLENGE
)
5123 if enc_challenge
is not None:
5124 if not sent_enc_challenge
:
5125 self
.assertEqual(len(enc_challenge
), 0)
5127 armor_key
= kdc_exchange_dict
['armor_key']
5128 self
.assertIsNotNone(armor_key
)
5130 preauth_key
, _
= self
.get_preauth_key(kdc_exchange_dict
)
5132 kdc_challenge_key
= self
.generate_kdc_challenge_key(
5133 armor_key
, preauth_key
)
5135 # Ensure that the encrypted challenge FAST factor is supported
5137 if self
.strict_checking
:
5138 self
.assertNotEqual(len(enc_challenge
), 0)
5139 if len(enc_challenge
) != 0:
5140 encrypted_challenge
= self
.der_decode(
5142 asn1Spec
=krb5_asn1
.EncryptedData())
5143 self
.assertEqual(encrypted_challenge
['etype'],
5144 kdc_challenge_key
.etype
)
5146 challenge
= kdc_challenge_key
.decrypt(
5147 KU_ENC_CHALLENGE_KDC
,
5148 encrypted_challenge
['cipher'])
5149 challenge
= self
.der_decode(
5151 asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
5153 # Retrieve the returned timestamp.
5154 rep_patime
= challenge
['patimestamp']
5155 self
.assertIn('pausec', challenge
)
5157 # Ensure the returned time is within five minutes of the
5159 rep_time
= self
.get_EpochFromKerberosTime(rep_patime
)
5160 current_time
= time
.time()
5162 self
.assertLess(current_time
- 300, rep_time
)
5163 self
.assertLess(rep_time
, current_time
+ 300)
5165 etype_info2
= pa_dict
.get(PADATA_ETYPE_INFO2
)
5166 if etype_info2
is not None:
5167 etype_info2
= self
.der_decode(etype_info2
,
5168 asn1Spec
=krb5_asn1
.ETYPE_INFO2())
5169 self
.assertGreaterEqual(len(etype_info2
), 1)
5170 if self
.strict_checking
:
5171 self
.assertEqual(len(etype_info2
), len(expect_etype_info2
))
5172 for i
in range(0, len(etype_info2
)):
5173 e
= self
.getElementValue(etype_info2
[i
], 'etype')
5174 if self
.strict_checking
:
5175 self
.assertEqual(e
, expect_etype_info2
[i
])
5176 salt
= self
.getElementValue(etype_info2
[i
], 'salt')
5177 if e
== kcrypto
.Enctype
.RC4
:
5178 if self
.strict_checking
:
5179 self
.assertIsNone(salt
)
5181 self
.assertIsNotNone(salt
)
5182 expected_salt
= kdc_exchange_dict
['expected_salt']
5183 if expected_salt
is not None:
5184 self
.assertEqual(salt
, expected_salt
)
5185 s2kparams
= self
.getElementValue(etype_info2
[i
], 's2kparams')
5186 if self
.strict_checking
:
5187 self
.assertIsNone(s2kparams
)
5189 etype_info
= pa_dict
.get(PADATA_ETYPE_INFO
)
5190 if etype_info
is not None:
5191 etype_info
= self
.der_decode(etype_info
,
5192 asn1Spec
=krb5_asn1
.ETYPE_INFO())
5193 self
.assertEqual(len(etype_info
), 1)
5194 e
= self
.getElementValue(etype_info
[0], 'etype')
5195 self
.assertEqual(e
, kcrypto
.Enctype
.RC4
)
5197 self
.assertEqual(e
, expect_etype_info2
[0])
5198 salt
= self
.getElementValue(etype_info
[0], 'salt')
5199 if self
.strict_checking
:
5200 self
.assertIsNotNone(salt
)
5201 self
.assertEqual(len(salt
), 0)
5205 def generate_simple_fast(self
,
5213 armor_key
= kdc_exchange_dict
['armor_key']
5215 fast_req
= self
.KRB_FAST_REQ_create(fast_options
,
5218 fast_req
= self
.der_encode(fast_req
,
5219 asn1Spec
=krb5_asn1
.KrbFastReq())
5220 fast_req
= self
.EncryptedData_create(armor_key
,
5224 fast_armored_req
= self
.KRB_FAST_ARMORED_REQ_create(fast_armor
,
5228 fx_fast_request
= self
.PA_FX_FAST_REQUEST_create(fast_armored_req
)
5229 fx_fast_request
= self
.der_encode(
5231 asn1Spec
=krb5_asn1
.PA_FX_FAST_REQUEST())
5233 fast_padata
= self
.PA_DATA_create(PADATA_FX_FAST
,
5238 def generate_ap_req(self
,
5245 req_body_checksum
= None
5248 self
.assertIsNone(req_body
)
5250 tgt
= kdc_exchange_dict
['armor_tgt']
5251 authenticator_subkey
= kdc_exchange_dict
['armor_subkey']
5253 tgt
= kdc_exchange_dict
['tgt']
5254 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
5256 if req_body
is not None:
5257 body_checksum_type
= kdc_exchange_dict
['body_checksum_type']
5259 req_body_blob
= self
.der_encode(
5260 req_body
, asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
5262 req_body_checksum
= self
.Checksum_create(
5264 KU_TGS_REQ_AUTH_CKSUM
,
5266 ctype
=body_checksum_type
)
5268 auth_data
= kdc_exchange_dict
['auth_data']
5271 if authenticator_subkey
is not None:
5272 subkey_obj
= authenticator_subkey
.export_obj()
5273 if seq_number
is None:
5274 seq_number
= random
.randint(0, 0xfffffffe)
5275 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
5276 authenticator_obj
= self
.Authenticator_create(
5279 cksum
=req_body_checksum
,
5283 seq_number
=seq_number
,
5284 authorization_data
=auth_data
)
5285 authenticator_blob
= self
.der_encode(
5287 asn1Spec
=krb5_asn1
.Authenticator())
5290 usage
= KU_AP_REQ_AUTH
if armor
else KU_TGS_REQ_AUTH
5291 authenticator
= self
.EncryptedData_create(tgt
.session_key
,
5296 ap_options
= kdc_exchange_dict
['fast_ap_options']
5298 ap_options
= kdc_exchange_dict
['ap_options']
5299 if ap_options
is None:
5300 ap_options
= str(krb5_asn1
.APOptions('0'))
5301 ap_req_obj
= self
.AP_REQ_create(ap_options
=ap_options
,
5303 authenticator
=authenticator
)
5304 ap_req
= self
.der_encode(ap_req_obj
, asn1Spec
=krb5_asn1
.AP_REQ())
5308 def generate_simple_tgs_padata(self
,
5312 ap_req
= self
.generate_ap_req(kdc_exchange_dict
,
5316 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
5317 padata
= [pa_tgs_req
]
5319 return padata
, req_body
def get_preauth_key(self, kdc_exchange_dict):
    """Select the key, and its key usage, matching the reply message type.

    For an AS-REP the 'preauth_key' entry is used. Otherwise the
    'authenticator_subkey' entry is used when it is set, falling back to
    the session key of the 'tgt' entry.

    :param kdc_exchange_dict: exchange state; 'rep_msg_type' plus the
        entries named above are read.
    :return: a (key, usage) tuple.
    :raises AssertionError: via assertIsNotNone() if the selected key is
        None.
    """
    msg_type = kdc_exchange_dict['rep_msg_type']

    if msg_type == KRB_AS_REP:
        key = kdc_exchange_dict['preauth_key']
        usage = KU_AS_REP_ENC_PART
    else:
        authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
        if authenticator_subkey is not None:
            key = authenticator_subkey
            usage = KU_TGS_REP_ENC_PART_SUB_KEY
        else:
            # No subkey was sent, so only the TGT session key remains.
            tgt = kdc_exchange_dict['tgt']
            key = tgt.session_key
            usage = KU_TGS_REP_ENC_PART_SESSION

    self.assertIsNotNone(key)

    return key, usage
5341 def generate_armor_key(self
, subkey
, session_key
):
5342 armor_key
= kcrypto
.cf2(subkey
.key
,
5346 armor_key
= Krb5EncryptionKey(armor_key
, None)
5350 def generate_strengthen_reply_key(self
, strengthen_key
, reply_key
):
5351 strengthen_reply_key
= kcrypto
.cf2(strengthen_key
.key
,
5355 strengthen_reply_key
= Krb5EncryptionKey(strengthen_reply_key
,
5358 return strengthen_reply_key
def generate_client_challenge_key(self, armor_key, longterm_key):
    """Derive the client-side encrypted-challenge key.

    Combines the key material of *armor_key* and *longterm_key* with
    kcrypto.cf2() using the 'clientchallengearmor' / 'challengelongterm'
    peppers, and wraps the result in a Krb5EncryptionKey.

    :param armor_key: key object whose .key attribute is the first input.
    :param longterm_key: key object whose .key attribute is the second
        input.
    :return: the derived Krb5EncryptionKey (created with a kvno of None).
    """
    # NOTE(review): assumes kcrypto.cf2() takes (key1, key2, pepper1,
    # pepper2) as in RFC 6113 KRB-FX-CF2 -- confirm against kcrypto.
    client_challenge_key = kcrypto.cf2(armor_key.key,
                                       longterm_key.key,
                                       b'clientchallengearmor',
                                       b'challengelongterm')
    client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)

    return client_challenge_key
def generate_kdc_challenge_key(self, armor_key, longterm_key):
    """Derive the KDC-side encrypted-challenge key.

    Combines the key material of *armor_key* and *longterm_key* with
    kcrypto.cf2() using the 'kdcchallengearmor' / 'challengelongterm'
    peppers, and wraps the result in a Krb5EncryptionKey.

    :param armor_key: key object whose .key attribute is the first input.
    :param longterm_key: key object whose .key attribute is the second
        input.
    :return: the derived Krb5EncryptionKey (created with a kvno of None).
    """
    # NOTE(review): assumes kcrypto.cf2() takes (key1, key2, pepper1,
    # pepper2) as in RFC 6113 KRB-FX-CF2 -- confirm against kcrypto.
    kdc_challenge_key = kcrypto.cf2(armor_key.key,
                                    longterm_key.key,
                                    b'kdcchallengearmor',
                                    b'challengelongterm')
    kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)

    return kdc_challenge_key
5378 def verify_ticket_checksum(self
, ticket
, expected_checksum
, armor_key
):
5379 expected_type
= expected_checksum
['cksumtype']
5380 self
.assertEqual(armor_key
.ctype
, expected_type
)
5382 ticket_blob
= self
.der_encode(ticket
,
5383 asn1Spec
=krb5_asn1
.Ticket())
5384 checksum
= self
.Checksum_create(armor_key
,
5387 self
.assertEqual(expected_checksum
, checksum
)
5389 def verify_ticket(self
, ticket
, krbtgt_keys
, service_ticket
,
5391 expect_ticket_checksum
=True,
5392 expect_full_checksum
=None):
5393 # Decrypt the ticket.
5395 key
= ticket
.decryption_key
5396 enc_part
= ticket
.ticket
['enc-part']
5398 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
5399 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
5401 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
5402 enc_part
= self
.der_decode(
5403 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
5405 # Fetch the authorization data from the ticket.
5406 auth_data
= enc_part
.get('authorization-data')
5408 self
.assertIsNotNone(auth_data
)
5409 elif auth_data
is None:
5412 # Get a copy of the authdata with an empty PAC, and the existing PAC
5414 empty_pac
= self
.get_empty_pac()
5415 auth_data
, pac_data
= self
.replace_pac(auth_data
,
5417 expect_pac
=expect_pac
)
5421 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
5422 # raw type to create a new PAC with zeroed signatures for
5423 # verification. This is because on Windows, the resource_groups field
5424 # is added to PAC_LOGON_INFO after the info3 field has been created,
5425 # which results in a different ordering of pointer values than Samba
5426 # (see commit 0e201ecdc53). Using the raw type avoids changing
5427 # PAC_LOGON_INFO, so verification against Windows can work. We still
5428 # need the PAC_DATA type to retrieve the actual checksums, because the
5429 # signatures in the raw type may contain padding bytes.
5430 pac
= ndr_unpack(krb5pac
.PAC_DATA
,
5432 raw_pac
= ndr_unpack(krb5pac
.PAC_DATA_RAW
,
5437 full_checksum_buffer
= None
5439 for pac_buffer
, raw_pac_buffer
in zip(pac
.buffers
, raw_pac
.buffers
):
5440 buffer_type
= pac_buffer
.type
5441 if buffer_type
in self
.pac_checksum_types
:
5442 self
.assertNotIn(buffer_type
, checksums
,
5443 f
'Duplicate checksum type {buffer_type}')
5445 # Fetch the checksum and the checksum type from the PAC buffer.
5446 checksum
= pac_buffer
.info
.signature
5447 ctype
= pac_buffer
.info
.type
5451 checksums
[buffer_type
] = checksum
, ctype
5453 if buffer_type
== krb5pac
.PAC_TYPE_FULL_CHECKSUM
:
5454 full_checksum_buffer
= raw_pac_buffer
5455 elif buffer_type
!= krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
5456 # Zero the checksum field so that we can later verify the
5457 # checksums. The ticket checksum field is not zeroed.
5459 signature
= ndr_unpack(
5460 krb5pac
.PAC_SIGNATURE_DATA
,
5461 raw_pac_buffer
.info
.remaining
)
5462 signature
.signature
= bytes(len(checksum
))
5463 raw_pac_buffer
.info
.remaining
= ndr_pack(
5466 # Re-encode the PAC.
5467 pac_data
= ndr_pack(raw_pac
)
5469 if full_checksum_buffer
is not None:
5470 signature
= ndr_unpack(
5471 krb5pac
.PAC_SIGNATURE_DATA
,
5472 full_checksum_buffer
.info
.remaining
)
5473 signature
.signature
= bytes(len(checksum
))
5474 full_checksum_buffer
.info
.remaining
= ndr_pack(
5477 # Re-encode the PAC.
5478 full_pac_data
= ndr_pack(raw_pac
)
5480 # Verify the signatures.
5482 server_checksum
, server_ctype
= checksums
[
5483 krb5pac
.PAC_TYPE_SRV_CHECKSUM
]
5484 key
.verify_checksum(KU_NON_KERB_CKSUM_SALT
,
5489 kdc_checksum
, kdc_ctype
= checksums
[
5490 krb5pac
.PAC_TYPE_KDC_CHECKSUM
]
5492 if isinstance(krbtgt_keys
, collections
.abc
.Container
):
5493 if self
.strict_checking
:
5494 krbtgt_key
= krbtgt_keys
[0]
5496 krbtgt_key
= next(key
for key
in krbtgt_keys
5497 if key
.ctype
== kdc_ctype
)
5499 krbtgt_key
= krbtgt_keys
5501 krbtgt_key
.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT
,
5506 if not service_ticket
:
5507 self
.assertNotIn(krb5pac
.PAC_TYPE_TICKET_CHECKSUM
, checksums
)
5508 self
.assertNotIn(krb5pac
.PAC_TYPE_FULL_CHECKSUM
, checksums
)
5510 ticket_checksum
, ticket_ctype
= checksums
.get(
5511 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
,
5513 if expect_ticket_checksum
:
5514 self
.assertIsNotNone(ticket_checksum
)
5515 elif expect_ticket_checksum
is False:
5516 self
.assertIsNone(ticket_checksum
)
5517 if ticket_checksum
is not None:
5518 enc_part
['authorization-data'] = auth_data
5519 enc_part
= self
.der_encode(enc_part
,
5520 asn1Spec
=krb5_asn1
.EncTicketPart())
5522 krbtgt_key
.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT
,
5527 full_checksum
, full_ctype
= checksums
.get(
5528 krb5pac
.PAC_TYPE_FULL_CHECKSUM
,
5530 if expect_full_checksum
:
5531 self
.assertIsNotNone(full_checksum
)
5532 elif expect_full_checksum
is False:
5533 self
.assertIsNone(full_checksum
)
5534 if full_checksum
is not None:
5535 krbtgt_key
.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT
,
5540 def modified_ticket(self
,
5542 new_ticket_key
=None,
5546 allow_empty_authdata
=False,
5547 update_pac_checksums
=True,
5549 include_checksums
=None):
5550 if checksum_keys
is None:
5551 # A dict containing a key for each checksum type to be created in
5555 if include_checksums
is None:
5556 # A dict containing a value for each checksum type; True if the
5557 # checksum type is to be included in the PAC, False if it is to be
5558 # excluded, or None/not present if the checksum is to be included
5559 # based on its presence in the original PAC.
5560 include_checksums
= {}
5562 # Check that the values passed in by the caller make sense.
5564 self
.assertLessEqual(checksum_keys
.keys(), self
.pac_checksum_types
)
5565 self
.assertLessEqual(include_checksums
.keys(), self
.pac_checksum_types
)
5568 self
.assertIsNone(modify_pac_fn
)
5570 update_pac_checksums
= False
5572 if not update_pac_checksums
:
5573 self
.assertFalse(checksum_keys
)
5574 self
.assertFalse(include_checksums
)
5576 expect_pac
= modify_pac_fn
is not None
5578 key
= ticket
.decryption_key
5580 if new_ticket_key
is None:
5581 # Use the same key to re-encrypt the ticket.
5582 new_ticket_key
= key
5584 if krb5pac
.PAC_TYPE_SRV_CHECKSUM
not in checksum_keys
:
5585 # If the server signature key is not present, fall back to the key
5586 # used to encrypt the ticket.
5587 checksum_keys
[krb5pac
.PAC_TYPE_SRV_CHECKSUM
] = new_ticket_key
5589 if krb5pac
.PAC_TYPE_TICKET_CHECKSUM
not in checksum_keys
:
5590 # If the ticket signature key is not present, fall back to the key
5591 # used for the KDC signature.
5592 kdc_checksum_key
= checksum_keys
.get(krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
5593 if kdc_checksum_key
is not None:
5594 checksum_keys
[krb5pac
.PAC_TYPE_TICKET_CHECKSUM
] = (
5597 if krb5pac
.PAC_TYPE_FULL_CHECKSUM
not in checksum_keys
:
5598 # If the full signature key is not present, fall back to the key
5599 # used for the KDC signature.
5600 kdc_checksum_key
= checksum_keys
.get(krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
5601 if kdc_checksum_key
is not None:
5602 checksum_keys
[krb5pac
.PAC_TYPE_FULL_CHECKSUM
] = (
5605 # Decrypt the ticket.
5607 enc_part
= ticket
.ticket
['enc-part']
5609 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
5610 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
5612 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
5613 enc_part
= self
.der_decode(
5614 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
5616 # Modify the ticket here.
5617 if modify_fn
is not None:
5618 enc_part
= modify_fn(enc_part
)
5620 auth_data
= enc_part
.get('authorization-data')
5622 self
.assertIsNotNone(auth_data
)
5623 if auth_data
is not None:
5626 # Get a copy of the authdata with an empty PAC, and the
5627 # existing PAC (if present).
5628 empty_pac
= self
.get_empty_pac()
5629 empty_pac_auth_data
, pac_data
= self
.replace_pac(
5632 expect_pac
=expect_pac
)
5634 if pac_data
is not None:
5635 pac
= ndr_unpack(krb5pac
.PAC_DATA
, pac_data
)
5637 # Modify the PAC here.
5638 if modify_pac_fn
is not None:
5639 pac
= modify_pac_fn(pac
)
5641 if update_pac_checksums
:
5642 # Get the enc-part with an empty PAC, which is needed
5643 # to create a ticket signature.
5644 enc_part_to_sign
= enc_part
.copy()
5645 enc_part_to_sign
['authorization-data'] = (
5646 empty_pac_auth_data
)
5647 enc_part_to_sign
= self
.der_encode(
5649 asn1Spec
=krb5_asn1
.EncTicketPart())
5651 self
.update_pac_checksums(pac
,
5656 # Re-encode the PAC.
5657 pac_data
= ndr_pack(pac
)
5658 new_pac
= self
.AuthorizationData_create(AD_WIN2K_PAC
,
5661 # Replace the PAC in the authorization data and re-add it to the
5663 auth_data
, _
= self
.replace_pac(
5665 expect_pac
=expect_pac
,
5666 allow_empty_authdata
=allow_empty_authdata
)
5667 enc_part
['authorization-data'] = auth_data
5669 # Re-encrypt the ticket enc-part with the new key.
5670 enc_part_new
= self
.der_encode(enc_part
,
5671 asn1Spec
=krb5_asn1
.EncTicketPart())
5672 enc_part_new
= self
.EncryptedData_create(new_ticket_key
,
5676 # Create a copy of the ticket with the new enc-part.
5677 new_ticket
= ticket
.ticket
.copy()
5678 new_ticket
['enc-part'] = enc_part_new
5680 new_ticket_creds
= KerberosTicketCreds(
5682 session_key
=ticket
.session_key
,
5683 crealm
=ticket
.crealm
,
5685 srealm
=ticket
.srealm
,
5687 decryption_key
=new_ticket_key
,
5688 ticket_private
=enc_part
,
5689 encpart_private
=ticket
.encpart_private
)
5691 return new_ticket_creds
5693 def update_pac_checksums(self
,
5698 pac_buffers
= pac
.buffers
5699 checksum_buffers
= {}
5701 # Find the relevant PAC checksum buffers.
5702 for pac_buffer
in pac_buffers
:
5703 buffer_type
= pac_buffer
.type
5704 if buffer_type
in self
.pac_checksum_types
:
5705 self
.assertNotIn(buffer_type
, checksum_buffers
,
5706 f
'Duplicate checksum type {buffer_type}')
5708 checksum_buffers
[buffer_type
] = pac_buffer
5710 # Create any additional buffers that were requested but not
5711 # present. Conversely, remove any buffers that were requested to be
5713 for buffer_type
in self
.pac_checksum_types
:
5714 if buffer_type
in checksum_buffers
:
5715 if include_checksums
.get(buffer_type
) is False:
5716 checksum_buffer
= checksum_buffers
.pop(buffer_type
)
5718 pac
.num_buffers
-= 1
5719 pac_buffers
.remove(checksum_buffer
)
5721 elif include_checksums
.get(buffer_type
) is True:
5722 info
= krb5pac
.PAC_SIGNATURE_DATA()
5724 checksum_buffer
= krb5pac
.PAC_BUFFER()
5725 checksum_buffer
.type = buffer_type
5726 checksum_buffer
.info
= info
5728 pac_buffers
.append(checksum_buffer
)
5729 pac
.num_buffers
+= 1
5731 checksum_buffers
[buffer_type
] = checksum_buffer
5733 # Fill the relevant checksum buffers.
5734 for buffer_type
, checksum_buffer
in checksum_buffers
.items():
5735 checksum_key
= checksum_keys
[buffer_type
]
5736 ctype
= checksum_key
.ctype
& ((1 << 32) - 1)
5738 if buffer_type
== krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
5739 self
.assertIsNotNone(enc_part
)
5741 signature
= checksum_key
.make_rodc_checksum(
5742 KU_NON_KERB_CKSUM_SALT
,
5745 elif buffer_type
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
5746 signature
= checksum_key
.make_zeroed_checksum()
5749 signature
= checksum_key
.make_rodc_zeroed_checksum()
5751 checksum_buffer
.info
.signature
= signature
5752 checksum_buffer
.info
.type = ctype
5754 # Add the new checksum buffers to the PAC.
5755 pac
.buffers
= pac_buffers
5757 # Calculate the full checksum and insert it into the PAC.
5758 full_checksum_buffer
= checksum_buffers
.get(
5759 krb5pac
.PAC_TYPE_FULL_CHECKSUM
)
5760 if full_checksum_buffer
is not None:
5761 full_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_FULL_CHECKSUM
]
5763 pac_data
= ndr_pack(pac
)
5764 full_checksum
= full_checksum_key
.make_rodc_checksum(
5765 KU_NON_KERB_CKSUM_SALT
,
5768 full_checksum_buffer
.info
.signature
= full_checksum
5770 # Calculate the server and KDC checksums and insert them into the PAC.
5772 server_checksum_buffer
= checksum_buffers
.get(
5773 krb5pac
.PAC_TYPE_SRV_CHECKSUM
)
5774 if server_checksum_buffer
is not None:
5775 server_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_SRV_CHECKSUM
]
5777 pac_data
= ndr_pack(pac
)
5778 server_checksum
= server_checksum_key
.make_checksum(
5779 KU_NON_KERB_CKSUM_SALT
,
5782 server_checksum_buffer
.info
.signature
= server_checksum
5784 kdc_checksum_buffer
= checksum_buffers
.get(
5785 krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
5786 if kdc_checksum_buffer
is not None:
5787 if server_checksum_buffer
is None:
5788 # There's no server signature to make the checksum over, so
5789 # just make the checksum over an empty bytes object.
5790 server_checksum
= bytes()
5792 kdc_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_KDC_CHECKSUM
]
5794 kdc_checksum
= kdc_checksum_key
.make_rodc_checksum(
5795 KU_NON_KERB_CKSUM_SALT
,
5798 kdc_checksum_buffer
.info
.signature
= kdc_checksum
5800 def replace_pac(self
, auth_data
, new_pac
, expect_pac
=True,
5801 allow_empty_authdata
=False):
5802 if new_pac
is not None:
5803 self
.assertElementEqual(new_pac
, 'ad-type', AD_WIN2K_PAC
)
5804 self
.assertElementPresent(new_pac
, 'ad-data')
5811 for authdata_elem
in auth_data
:
5812 if authdata_elem
['ad-type'] == AD_IF_RELEVANT
:
5813 ad_relevant
= self
.der_decode(
5814 authdata_elem
['ad-data'],
5815 asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
5818 for relevant_elem
in ad_relevant
:
5819 if relevant_elem
['ad-type'] == AD_WIN2K_PAC
:
5820 self
.assertIsNone(old_pac
, 'Multiple PACs detected')
5821 old_pac
= relevant_elem
['ad-data']
5823 if new_pac
is not None:
5824 relevant_elems
.append(new_pac
)
5826 relevant_elems
.append(relevant_elem
)
5828 self
.assertIsNotNone(old_pac
, 'Expected PAC')
5830 if relevant_elems
or allow_empty_authdata
:
5831 ad_relevant
= self
.der_encode(
5833 asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
5835 authdata_elem
= self
.AuthorizationData_create(
5839 authdata_elem
= None
5841 if authdata_elem
is not None or allow_empty_authdata
:
5842 new_auth_data
.append(authdata_elem
)
5845 self
.assertIsNotNone(ad_relevant
, 'Expected AD-RELEVANT')
5847 return new_auth_data
, old_pac
5849 def get_pac(self
, auth_data
, expect_pac
=True):
5850 _
, pac
= self
.replace_pac(auth_data
, None, expect_pac
)
5853 def get_ticket_pac(self
, ticket
, expect_pac
=True):
5854 auth_data
= ticket
.ticket_private
.get('authorization-data')
5856 self
.assertIsNotNone(auth_data
)
5857 elif auth_data
is None:
5860 return self
.get_pac(auth_data
, expect_pac
=expect_pac
)
5862 def get_krbtgt_checksum_key(self
):
5863 krbtgt_creds
= self
.get_krbtgt_creds()
5864 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
5867 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
5870 def is_tgs_principal(self
, principal
):
5871 if self
.is_tgs(principal
):
5874 if self
.kadmin_is_tgs
and self
.is_kadmin(principal
):
def is_kadmin(self, principal):
    """Return whether a principal's first name component is 'kadmin'.

    :param principal: mapping with a 'name-string' sequence; only its
        first element is examined.
    :return: True if that element equals 'kadmin' as either str or
        bytes, False otherwise.
    """
    # The component is compared against both text and bytes spellings.
    kadmin_spellings = ('kadmin', b'kadmin')
    return principal['name-string'][0] in kadmin_spellings
def is_tgs(self, principal):
    """Return whether a principal's first name component is 'krbtgt'.

    :param principal: mapping with a 'name-string' sequence; only its
        first element is examined.
    :return: True if that element equals 'krbtgt' as either str or
        bytes, False otherwise.
    """
    # The component is compared against both text and bytes spellings.
    krbtgt_spellings = ('krbtgt', b'krbtgt')
    return principal['name-string'][0] in krbtgt_spellings
def is_tgt(self, ticket):
    """Return whether a ticket's service name passes is_tgs().

    :param ticket: object whose ``ticket`` attribute is a mapping
        containing an 'sname' entry.
    :return: the result of is_tgs() applied to that 'sname'.
    """
    return self.is_tgs(ticket.ticket['sname'])
def get_empty_pac(self):
    """Build an AD_WIN2K_PAC authorization-data element with dummy data.

    :return: whatever AuthorizationData_create() returns for type
        AD_WIN2K_PAC and a one-byte, all-zero payload.
    """
    # bytes(1) is a single zero byte, used as the placeholder payload.
    placeholder_data = bytes(1)
    return self.AuthorizationData_create(AD_WIN2K_PAC, placeholder_data)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return get_pa_dict() applied to the exchange's 'req_padata'.

    :param kdc_exchange_dict: mapping that must contain a 'req_padata'
        entry.
    :return: the value produced by get_pa_dict() for that entry.
    """
    outer_padata = kdc_exchange_dict['req_padata']
    return self.get_pa_dict(outer_padata)
5897 def get_fast_pa_dict(self
, kdc_exchange_dict
):
5898 req_pa_dict
= self
.get_pa_dict(kdc_exchange_dict
['fast_padata'])
5903 return self
.get_outer_pa_dict(kdc_exchange_dict
)
def sent_fast(self, kdc_exchange_dict):
    """Return whether PADATA_FX_FAST is among the outer request padata.

    :param kdc_exchange_dict: exchange state passed through to
        get_outer_pa_dict().
    :return: True if PADATA_FX_FAST is a key of the dict returned by
        get_outer_pa_dict(), False otherwise.
    """
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Return whether PADATA_ENCRYPTED_CHALLENGE was among the padata.

    :param kdc_exchange_dict: exchange state passed through to
        get_fast_pa_dict().
    :return: True if PADATA_ENCRYPTED_CHALLENGE is a key of the dict
        returned by get_fast_pa_dict(), False otherwise.
    """
    return PADATA_ENCRYPTED_CHALLENGE in self.get_fast_pa_dict(
        kdc_exchange_dict)
def sent_enc_pa_rep(self, kdc_exchange_dict):
    """Return whether PADATA_REQ_ENC_PA_REP was among the padata.

    :param kdc_exchange_dict: exchange state passed through to
        get_fast_pa_dict().
    :return: True if PADATA_REQ_ENC_PA_REP is a key of the dict
        returned by get_fast_pa_dict(), False otherwise.
    """
    return PADATA_REQ_ENC_PA_REP in self.get_fast_pa_dict(
        kdc_exchange_dict)
def sent_pk_as_req(self, kdc_exchange_dict):
    """Return whether PADATA_PK_AS_REQ was among the padata.

    :param kdc_exchange_dict: exchange state passed through to
        get_fast_pa_dict().
    :return: True if PADATA_PK_AS_REQ is a key of the dict returned by
        get_fast_pa_dict(), False otherwise.
    """
    return PADATA_PK_AS_REQ in self.get_fast_pa_dict(kdc_exchange_dict)
def sent_pk_as_req_win2k(self, kdc_exchange_dict):
    """Return whether PADATA_PK_AS_REP_19 was among the padata.

    :param kdc_exchange_dict: exchange state passed through to
        get_fast_pa_dict().
    :return: True if PADATA_PK_AS_REP_19 is a key of the dict returned
        by get_fast_pa_dict(), False otherwise.
    """
    # NOTE(review): the constant is named ..._AS_REP_19 although this
    # checks a request; presumably the Windows 2000 PKINIT variant
    # carries its request under that padata type -- confirm.
    return PADATA_PK_AS_REP_19 in self.get_fast_pa_dict(kdc_exchange_dict)
def sent_freshness(self, kdc_exchange_dict):
    """Return whether PADATA_AS_FRESHNESS was among the padata.

    :param kdc_exchange_dict: exchange state passed through to
        get_fast_pa_dict().
    :return: True if PADATA_AS_FRESHNESS is a key of the dict returned
        by get_fast_pa_dict(), False otherwise.
    """
    return PADATA_AS_FRESHNESS in self.get_fast_pa_dict(kdc_exchange_dict)
5935 def get_sent_pac_options(self
, kdc_exchange_dict
):
5936 fast_pa_dict
= self
.get_fast_pa_dict(kdc_exchange_dict
)
5938 if PADATA_PAC_OPTIONS
not in fast_pa_dict
:
5941 pac_options
= self
.der_decode(fast_pa_dict
[PADATA_PAC_OPTIONS
],
5942 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
5943 pac_options
= pac_options
['options']
5945 # Mask out unsupported bits.
5946 pac_options
, remaining
= pac_options
[:4], pac_options
[4:]
5947 pac_options
+= '0' * len(remaining
)
5951 def get_krbtgt_sname(self
):
5952 krbtgt_creds
= self
.get_krbtgt_creds()
5953 krbtgt_username
= krbtgt_creds
.get_username()
5954 krbtgt_realm
= krbtgt_creds
.get_realm()
5955 krbtgt_sname
= self
.PrincipalName_create(
5956 name_type
=NT_SRV_INST
, names
=[krbtgt_username
, krbtgt_realm
])
5960 def add_requester_sid(self
, pac
, sid
):
5961 pac_buffers
= pac
.buffers
5963 buffer_types
= [pac_buffer
.type for pac_buffer
in pac_buffers
]
5964 self
.assertNotIn(krb5pac
.PAC_TYPE_REQUESTER_SID
, buffer_types
)
5966 requester_sid
= krb5pac
.PAC_REQUESTER_SID()
5967 requester_sid
.sid
= security
.dom_sid(sid
)
5969 requester_sid_buffer
= krb5pac
.PAC_BUFFER()
5970 requester_sid_buffer
.type = krb5pac
.PAC_TYPE_REQUESTER_SID
5971 requester_sid_buffer
.info
= requester_sid
5973 pac_buffers
.append(requester_sid_buffer
)
5975 pac
.buffers
= pac_buffers
5976 pac
.num_buffers
+= 1
5980 def modify_lifetime(self
, ticket
, lifetime
, requester_sid
=None):
5981 # Get the krbtgt key.
5982 krbtgt_creds
= self
.get_krbtgt_creds()
5984 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
5986 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
,
5989 current_time
= time
.time()
5991 # Set authtime and starttime to an hour in the past, to show that they
5992 # do not affect ticket rejection.
5993 start_time
= self
.get_KerberosTime(epoch
=current_time
, offset
=-60 * 60)
5995 # Set the endtime of the ticket relative to our current time, so that
5996 # the ticket has 'lifetime' seconds remaining to live.
5997 end_time
= self
.get_KerberosTime(epoch
=current_time
, offset
=lifetime
)
5999 # Modify the times in the ticket.
6000 def modify_ticket_times(enc_part
):
6001 enc_part
['authtime'] = start_time
6002 if 'starttime' in enc_part
:
6003 enc_part
['starttime'] = start_time
6005 enc_part
['endtime'] = end_time
6009 # We have to set the times in both the ticket and the PAC, otherwise
6010 # Heimdal will complain.
6011 def modify_pac_time(pac
):
6012 pac_buffers
= pac
.buffers
6014 for pac_buffer
in pac_buffers
:
6015 if pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_NAME
:
6016 logon_time
= self
.get_EpochFromKerberosTime(start_time
)
6017 pac_buffer
.info
.logon_time
= unix2nttime(logon_time
)
6020 self
.fail('failed to find LOGON_NAME PAC buffer')
6022 pac
.buffers
= pac_buffers
6026 def modify_pac_fn(pac
):
6027 if requester_sid
is not None:
6028 # Add a requester SID to show that the KDC will then accept
6029 # this kpasswd ticket as if it were a TGT.
6030 pac
= self
.add_requester_sid(pac
, sid
=requester_sid
)
6031 pac
= modify_pac_time(pac
)
6034 # Do the actual modification.
6035 return self
.modified_ticket(ticket
,
6036 new_ticket_key
=krbtgt_key
,
6037 modify_fn
=modify_ticket_times
,
6038 modify_pac_fn
=modify_pac_fn
,
6039 checksum_keys
=checksum_keys
)
6041 def _test_as_exchange(self
,
6046 expected_error_mode
,
6057 expected_account_name
=None,
6058 expected_groups
=None,
6059 unexpected_groups
=None,
6060 expected_upn_name
=None,
6062 expected_domain_sid
=None,
6063 expected_flags
=None,
6064 unexpected_flags
=None,
6065 expected_supported_etypes
=None,
6067 ticket_decryption_key
=None,
6071 expect_pac_attrs
=None,
6072 expect_pac_attrs_pac_request
=None,
6073 expect_requester_sid
=None,
6074 expect_client_claims
=None,
6075 expect_device_claims
=None,
6076 expected_client_claims
=None,
6077 unexpected_client_claims
=None,
6078 expected_device_claims
=None,
6079 unexpected_device_claims
=None,
6082 expected_status
=None,
6086 def _generate_padata_copy(_kdc_exchange_dict
,
6089 return padata
, req_body
6091 if not expected_error_mode
:
6092 check_error_fn
= None
6093 check_rep_fn
= self
.generic_check_kdc_rep
6095 check_error_fn
= self
.generic_check_kdc_error
6098 if padata
is not None:
6099 generate_padata_fn
= _generate_padata_copy
6101 generate_padata_fn
= None
6103 kdc_exchange_dict
= self
.as_exchange_dict(
6105 expected_crealm
=expected_crealm
,
6106 expected_cname
=expected_cname
,
6107 expected_srealm
=expected_srealm
,
6108 expected_sname
=expected_sname
,
6109 expected_account_name
=expected_account_name
,
6110 expected_groups
=expected_groups
,
6111 unexpected_groups
=unexpected_groups
,
6112 expected_upn_name
=expected_upn_name
,
6113 expected_sid
=expected_sid
,
6114 expected_domain_sid
=expected_domain_sid
,
6115 expected_supported_etypes
=expected_supported_etypes
,
6116 ticket_decryption_key
=ticket_decryption_key
,
6117 generate_padata_fn
=generate_padata_fn
,
6118 check_error_fn
=check_error_fn
,
6119 check_rep_fn
=check_rep_fn
,
6120 check_kdc_private_fn
=self
.generic_check_kdc_private
,
6121 expected_error_mode
=expected_error_mode
,
6122 expected_salt
=expected_salt
,
6123 expected_flags
=expected_flags
,
6124 unexpected_flags
=unexpected_flags
,
6125 preauth_key
=preauth_key
,
6126 kdc_options
=str(kdc_options
),
6127 pac_request
=pac_request
,
6128 pac_options
=pac_options
,
6129 expect_pac
=expect_pac
,
6130 expect_pac_attrs
=expect_pac_attrs
,
6131 expect_pac_attrs_pac_request
=expect_pac_attrs_pac_request
,
6132 expect_requester_sid
=expect_requester_sid
,
6133 expect_client_claims
=expect_client_claims
,
6134 expect_device_claims
=expect_device_claims
,
6135 expected_client_claims
=expected_client_claims
,
6136 unexpected_client_claims
=unexpected_client_claims
,
6137 expected_device_claims
=expected_device_claims
,
6138 unexpected_device_claims
=unexpected_device_claims
,
6139 expect_edata
=expect_edata
,
6140 expect_status
=expect_status
,
6141 expected_status
=expected_status
,
6142 rc4_support
=rc4_support
,
6145 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
6150 renew_time
=renew_time
,
6153 return rep
, kdc_exchange_dict