1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 from pyasn1
.codec
.der
.decoder
import decode
as pyasn1_der_decode
30 from pyasn1
.codec
.der
.encoder
import encode
as pyasn1_der_encode
31 from pyasn1
.codec
.native
.decoder
import decode
as pyasn1_native_decode
32 from pyasn1
.codec
.native
.encoder
import encode
as pyasn1_native_encode
34 from pyasn1
.codec
.ber
.encoder
import BitStringEncoder
36 from samba
.credentials
import Credentials
37 from samba
.dcerpc
import krb5pac
, security
38 from samba
.gensec
import FEATURE_SEAL
39 from samba
.ndr
import ndr_pack
, ndr_unpack
42 from samba
.tests
import TestCaseInTempDir
44 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
45 from samba
.tests
.krb5
.rfc4120_constants
import (
48 FX_FAST_ARMOR_AP_REQUEST
,
50 KDC_ERR_PREAUTH_FAILED
,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
,
65 KU_NON_KERB_CKSUM_SALT
,
66 KU_TGS_REP_ENC_PART_SESSION
,
67 KU_TGS_REP_ENC_PART_SUB_KEY
,
69 KU_TGS_REQ_AUTH_CKSUM
,
70 KU_TGS_REQ_AUTH_DAT_SESSION
,
71 KU_TGS_REQ_AUTH_DAT_SUBKEY
,
75 PADATA_ENCRYPTED_CHALLENGE
,
89 PADATA_SUPPORTED_ETYPES
91 import samba
.tests
.krb5
.kcrypto
as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self
, value
, asn1Spec
, encodeFun
, **options
):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec
is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value
= asn1Spec
.clone(value
)
104 valueLength
= len(value
)
106 alignedValue
= value
<< (8 - valueLength
% 8)
110 substrate
= alignedValue
.asOctets()
111 length
= len(substrate
)
112 # We need at least 32-Bit / 4-Bytes
117 ret
= b
'\x00' + substrate
+ (b
'\x00' * padding
)
118 return ret
, False, True
# Replace the encodeValue method of pyasn1's BER BitStringEncoder with
# BitStringEncoder_encodeValue32, so that BitStrings like KDCOptions or
# TicketFlags are at least 32-Bit on the wire.
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self
, scope
=0):
125 ret
= "%s" % self
.asBinary()
128 for byte
in self
.asNumbers():
129 for bit
in [7, 6, 5, 4, 3, 2, 1, 0]:
136 if len(bits
) < highest_bit
:
137 for bitPosition
in range(len(bits
), highest_bit
):
140 delim
= ": (\n%s " % indent
141 for bitPosition
in range(highest_bit
):
142 if bitPosition
in self
.prettyPrintNamedValues
:
143 name
= self
.prettyPrintNamedValues
[bitPosition
]
144 elif bits
[bitPosition
] != 0:
145 name
= "unknown-bit-%u" % bitPosition
148 ret
+= "%s%s:%u" % (delim
, name
, bits
[bitPosition
])
149 delim
= ",\n%s " % indent
150 ret
+= "\n%s)" % indent
# Patch the BIT STRING based Kerberos types: each one gets the named
# values of its companion "...Values" type (both as namedValues and as
# the prettyPrintNamedValues lookup table) and uses
# BitString_NamedValues_prettyPrint as its prettyPrint method.
for _flags_type, _flags_values in (
        (krb5_asn1.TicketFlags, krb5_asn1.TicketFlagsValues),
        (krb5_asn1.KDCOptions, krb5_asn1.KDCOptionsValues),
        (krb5_asn1.APOptions, krb5_asn1.APOptionsValues),
        (krb5_asn1.PACOptionFlags, krb5_asn1.PACOptionFlagsValues)):
    _flags_type.prettyPrintNamedValues = _flags_values.namedValues
    _flags_type.namedValues = _flags_values.namedValues
    _flags_type.prettyPrint = BitString_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _flags_type, _flags_values
180 def Integer_NamedValues_prettyPrint(self
, scope
=0):
182 if intval
in self
.prettyPrintNamedValues
:
183 name
= self
.prettyPrintNamedValues
[intval
]
185 name
= "<__unknown__>"
186 ret
= "%d (0x%x) %s" % (intval
, intval
, name
)
# Patch the INTEGER based Kerberos types: each one gets the named values
# of its companion "...Values" type as the prettyPrintNamedValues lookup
# table and uses Integer_NamedValues_prettyPrint as its prettyPrint
# method.
for _int_type, _int_values in (
        (krb5_asn1.NameType, krb5_asn1.NameTypeValues),
        (krb5_asn1.AuthDataType, krb5_asn1.AuthDataTypeValues),
        (krb5_asn1.PADataType, krb5_asn1.PADataTypeValues),
        (krb5_asn1.EncryptionType, krb5_asn1.EncryptionTypeValues),
        (krb5_asn1.ChecksumType, krb5_asn1.ChecksumTypeValues),
        (krb5_asn1.KerbErrorDataType, krb5_asn1.KerbErrorDataTypeValues)):
    _int_type.prettyPrintNamedValues = _int_values.namedValues
    _int_type.prettyPrint = Integer_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _int_type, _int_values
216 class Krb5EncryptionKey
:
217 def __init__(self
, key
, kvno
):
219 kcrypto
.Enctype
.AES256
: kcrypto
.Cksumtype
.SHA1_AES256
,
220 kcrypto
.Enctype
.AES128
: kcrypto
.Cksumtype
.SHA1_AES128
,
221 kcrypto
.Enctype
.RC4
: kcrypto
.Cksumtype
.HMAC_MD5
,
224 self
.etype
= key
.enctype
225 self
.ctype
= EncTypeChecksum
[self
.etype
]
228 def encrypt(self
, usage
, plaintext
):
229 ciphertext
= kcrypto
.encrypt(self
.key
, usage
, plaintext
)
232 def decrypt(self
, usage
, ciphertext
):
233 plaintext
= kcrypto
.decrypt(self
.key
, usage
, ciphertext
)
236 def make_zeroed_checksum(self
, ctype
=None):
240 checksum_len
= kcrypto
.checksum_len(ctype
)
241 return bytes(checksum_len
)
243 def make_checksum(self
, usage
, plaintext
, ctype
=None):
246 cksum
= kcrypto
.make_checksum(ctype
, self
.key
, usage
, plaintext
)
249 def verify_checksum(self
, usage
, plaintext
, ctype
, cksum
):
250 if self
.ctype
!= ctype
:
251 raise AssertionError(f
'{self.ctype} != {ctype}')
253 kcrypto
.verify_checksum(ctype
,
def export_obj(self):
    """Return this key as a dict with 'keytype' and 'keyvalue' entries.

    'keytype' is taken from self.etype and 'keyvalue' from
    self.key.contents.
    """
    return {
        'keytype': self.etype,
        'keyvalue': self.key.contents,
    }
267 class RodcPacEncryptionKey(Krb5EncryptionKey
):
268 def __init__(self
, key
, kvno
, rodc_id
=None):
269 super().__init
__(key
, kvno
)
275 kvno
&= (1 << 16) - 1
277 rodc_id
= kvno
or None
279 if rodc_id
is not None:
280 self
.rodc_id
= rodc_id
.to_bytes(2, byteorder
='little')
def make_zeroed_checksum(self, ctype=None):
    """Return the parent's zeroed checksum followed by zero padding.

    The padding has the same length as self.rodc_id.
    """
    zeroed = super().make_zeroed_checksum(ctype)
    rodc_id_padding = bytes(len(self.rodc_id))
    return zeroed + rodc_id_padding
def make_checksum(self, usage, plaintext, ctype=None):
    """Return the parent's checksum with self.rodc_id appended."""
    return super().make_checksum(usage, plaintext, ctype) + self.rodc_id
292 def verify_checksum(self
, usage
, plaintext
, ctype
, cksum
):
294 cksum
, cksum_rodc_id
= cksum
[:-2], cksum
[-2:]
296 if self
.rodc_id
!= cksum_rodc_id
:
297 raise AssertionError(f
'{self.rodc_id.hex()} != '
298 f
'{cksum_rodc_id.hex()}')
300 super().verify_checksum(usage
,
class ZeroedChecksumKey(Krb5EncryptionKey):
    """Key whose make_checksum() always yields a zeroed checksum."""

    def make_checksum(self, usage, plaintext, ctype=None):
        """Ignore usage and plaintext; return make_zeroed_checksum(ctype)."""
        zeroed = self.make_zeroed_checksum(ctype)
        return zeroed
311 class WrongLengthChecksumKey(Krb5EncryptionKey
):
def __init__(self, key, kvno, length):
    """Initialise the base key and remember the requested length.

    key and kvno are passed on to the parent constructor; length is
    stored as self._length.
    """
    super().__init__(key, kvno)
    self._length = length
318 def _adjust_to_length(cls
, checksum
, length
):
319 diff
= length
- len(checksum
)
321 checksum
+= bytes(diff
)
323 checksum
= checksum
[:length
]
def make_checksum(self, usage, plaintext, ctype=None):
    """Return the parent's checksum run through _adjust_to_length().

    The target length is the self._length stored at construction.
    """
    real_checksum = super().make_checksum(usage, plaintext, ctype)
    wanted_length = self._length
    return self._adjust_to_length(real_checksum, wanted_length)
332 class KerberosCredentials(Credentials
):
334 fast_supported_bits
= (security
.KERB_ENCTYPE_FAST_SUPPORTED |
335 security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
336 security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
)
339 super(KerberosCredentials
, self
).__init
__()
341 all_enc_types |
= security
.KERB_ENCTYPE_RC4_HMAC_MD5
342 all_enc_types |
= security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
343 all_enc_types |
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
345 self
.as_supported_enctypes
= all_enc_types
346 self
.tgs_supported_enctypes
= all_enc_types
347 self
.ap_supported_enctypes
= all_enc_types
350 self
.forced_keys
= {}
352 self
.forced_salt
= None
def set_as_supported_enctypes(self, value):
    """Store value, converted with int(), as self.as_supported_enctypes."""
    enctype_bits = int(value)
    self.as_supported_enctypes = enctype_bits
def set_tgs_supported_enctypes(self, value):
    """Store value, converted with int(), as self.tgs_supported_enctypes."""
    enctype_bits = int(value)
    self.tgs_supported_enctypes = enctype_bits
def set_ap_supported_enctypes(self, value):
    """Store value, converted with int(), as self.ap_supported_enctypes."""
    enctype_bits = int(value)
    self.ap_supported_enctypes = enctype_bits
365 etype_map
= collections
.OrderedDict([
366 (kcrypto
.Enctype
.AES256
,
367 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
),
368 (kcrypto
.Enctype
.AES128
,
369 security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
),
370 (kcrypto
.Enctype
.RC4
,
371 security
.KERB_ENCTYPE_RC4_HMAC_MD5
),
372 (kcrypto
.Enctype
.DES_MD5
,
373 security
.KERB_ENCTYPE_DES_CBC_MD5
),
374 (kcrypto
.Enctype
.DES_CRC
,
375 security
.KERB_ENCTYPE_DES_CBC_CRC
)
379 def etypes_to_bits(cls
, etypes
):
382 bit
= cls
.etype_map
[etype
]
384 raise ValueError(f
'Got duplicate etype: {etype}')
390 def bits_to_etypes(cls
, bits
):
392 for etype
, bit
in cls
.etype_map
.items():
397 bits
&= ~cls
.fast_supported_bits
399 raise ValueError(f
'Unsupported etype bits: {bits}')
def get_as_krb5_etypes(self):
    """Return bits_to_etypes() applied to self.as_supported_enctypes."""
    supported_bits = self.as_supported_enctypes
    return self.bits_to_etypes(supported_bits)
def get_tgs_krb5_etypes(self):
    """Return bits_to_etypes() applied to self.tgs_supported_enctypes."""
    supported_bits = self.tgs_supported_enctypes
    return self.bits_to_etypes(supported_bits)
def get_ap_krb5_etypes(self):
    """Return bits_to_etypes() applied to self.ap_supported_enctypes."""
    supported_bits = self.ap_supported_enctypes
    return self.bits_to_etypes(supported_bits)
412 def set_kvno(self
, kvno
):
413 # Sign-extend from 32 bits.
421 def set_forced_key(self
, etype
, hexkey
):
423 contents
= binascii
.a2b_hex(hexkey
)
424 key
= kcrypto
.Key(etype
, contents
)
425 self
.forced_keys
[etype
] = RodcPacEncryptionKey(key
, self
.kvno
)
427 def get_forced_key(self
, etype
):
429 return self
.forced_keys
.get(etype
)
def set_forced_salt(self, salt):
    """Store salt, converted with bytes(), as self.forced_salt."""
    salt_bytes = bytes(salt)
    self.forced_salt = salt_bytes
def get_forced_salt(self):
    """Return self.forced_salt unchanged."""
    forced = self.forced_salt
    return forced
438 if self
.forced_salt
is not None:
439 return self
.forced_salt
441 if self
.get_workstation():
442 salt_string
= '%shost%s.%s' % (
443 self
.get_realm().upper(),
444 self
.get_username().lower().rsplit('$', 1)[0],
445 self
.get_realm().lower())
447 salt_string
= self
.get_realm().upper() + self
.get_username()
449 return salt_string
.encode('utf-8')
451 def set_dn(self
, dn
):
458 class KerberosTicketCreds
:
459 def __init__(self
, ticket
, session_key
,
460 crealm
=None, cname
=None,
461 srealm
=None, sname
=None,
464 encpart_private
=None):
466 self
.session_key
= session_key
471 self
.decryption_key
= decryption_key
472 self
.ticket_private
= ticket_private
473 self
.encpart_private
= encpart_private
476 class RawKerberosTest(TestCaseInTempDir
):
477 """A raw Kerberos Test case."""
479 pac_checksum_types
= {krb5pac
.PAC_TYPE_SRV_CHECKSUM
,
480 krb5pac
.PAC_TYPE_KDC_CHECKSUM
,
481 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
}
484 {"value": -1111, "name": "dummy", },
485 {"value": kcrypto
.Enctype
.AES256
, "name": "aes128", },
486 {"value": kcrypto
.Enctype
.AES128
, "name": "aes256", },
487 {"value": kcrypto
.Enctype
.RC4
, "name": "rc4", },
490 setup_etype_test_permutations_done
= False
493 def setup_etype_test_permutations(cls
):
494 if cls
.setup_etype_test_permutations_done
:
499 num_idxs
= len(cls
.etypes_to_test
)
501 for num
in range(1, num_idxs
+ 1):
502 chunk
= list(itertools
.permutations(range(num_idxs
), num
))
505 permutations
.append(el
)
507 for p
in permutations
:
511 n
= cls
.etypes_to_test
[idx
]["name"]
516 etypes
+= (cls
.etypes_to_test
[idx
]["value"],)
518 r
= {"name": name
, "etypes": etypes
, }
521 cls
.etype_test_permutations
= res
522 cls
.setup_etype_test_permutations_done
= True
525 def etype_test_permutation_name_idx(cls
):
526 cls
.setup_etype_test_permutations()
529 for e
in cls
.etype_test_permutations
:
def etype_test_permutation_by_idx(self, idx):
    """Return the (name, etypes) pair of permutation number idx.

    The entry is looked up in self.etype_test_permutations and its
    'name' and 'etypes' items are returned as a tuple.
    """
    permutation = self.etype_test_permutations[idx]
    name, etypes = permutation['name'], permutation['etypes']
    return (name, etypes)
543 cls
.host
= samba
.tests
.env_get_var_value('SERVER')
544 cls
.dc_host
= samba
.tests
.env_get_var_value('DC_SERVER')
546 # A dictionary containing credentials that have already been
550 cls
.kdc_fast_support
= False
554 self
.do_asn1_print
= False
555 self
.do_hexdump
= False
557 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING',
559 if strict_checking
is None:
560 strict_checking
= '1'
561 self
.strict_checking
= bool(int(strict_checking
))
565 self
.unspecified_kvno
= object()
568 self
._disconnect
("tearDown")
571 def _disconnect(self
, reason
):
577 sys
.stderr
.write("disconnect[%s]\n" % reason
)
579 def _connect_tcp(self
, host
):
582 self
.a
= socket
.getaddrinfo(host
, tcp_port
, socket
.AF_UNSPEC
,
583 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
585 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
586 self
.s
.settimeout(10)
587 self
.s
.connect(self
.a
[0][4])
595 def connect(self
, host
):
596 self
.assertNotConnected()
597 self
._connect
_tcp
(host
)
599 sys
.stderr
.write("connected[%s]\n" % host
)
601 def env_get_var(self
, varname
, prefix
,
602 fallback_default
=True,
603 allow_missing
=False):
605 if prefix
is not None:
606 allow_missing_prefix
= allow_missing
or fallback_default
607 val
= samba
.tests
.env_get_var_value(
608 '%s_%s' % (prefix
, varname
),
609 allow_missing
=allow_missing_prefix
)
611 fallback_default
= True
612 if val
is None and fallback_default
:
613 val
= samba
.tests
.env_get_var_value(varname
,
614 allow_missing
=allow_missing
)
617 def _get_krb5_creds_from_env(self
, prefix
,
618 default_username
=None,
619 allow_missing_password
=False,
620 allow_missing_keys
=True,
621 require_strongest_key
=False):
622 c
= KerberosCredentials()
625 domain
= self
.env_get_var('DOMAIN', prefix
)
626 realm
= self
.env_get_var('REALM', prefix
)
627 allow_missing_username
= default_username
is not None
628 username
= self
.env_get_var('USERNAME', prefix
,
629 fallback_default
=False,
630 allow_missing
=allow_missing_username
)
632 username
= default_username
633 password
= self
.env_get_var('PASSWORD', prefix
,
634 fallback_default
=False,
635 allow_missing
=allow_missing_password
)
638 c
.set_username(username
)
639 if password
is not None:
640 c
.set_password(password
)
641 as_supported_enctypes
= self
.env_get_var('AS_SUPPORTED_ENCTYPES',
642 prefix
, allow_missing
=True)
643 if as_supported_enctypes
is not None:
644 c
.set_as_supported_enctypes(as_supported_enctypes
)
645 tgs_supported_enctypes
= self
.env_get_var('TGS_SUPPORTED_ENCTYPES',
646 prefix
, allow_missing
=True)
647 if tgs_supported_enctypes
is not None:
648 c
.set_tgs_supported_enctypes(tgs_supported_enctypes
)
649 ap_supported_enctypes
= self
.env_get_var('AP_SUPPORTED_ENCTYPES',
650 prefix
, allow_missing
=True)
651 if ap_supported_enctypes
is not None:
652 c
.set_ap_supported_enctypes(ap_supported_enctypes
)
654 if require_strongest_key
:
655 kvno_allow_missing
= False
657 aes256_allow_missing
= False
659 aes256_allow_missing
= True
661 kvno_allow_missing
= allow_missing_keys
662 aes256_allow_missing
= allow_missing_keys
663 kvno
= self
.env_get_var('KVNO', prefix
,
664 fallback_default
=False,
665 allow_missing
=kvno_allow_missing
)
668 aes256_key
= self
.env_get_var('AES256_KEY_HEX', prefix
,
669 fallback_default
=False,
670 allow_missing
=aes256_allow_missing
)
671 if aes256_key
is not None:
672 c
.set_forced_key(kcrypto
.Enctype
.AES256
, aes256_key
)
673 aes128_key
= self
.env_get_var('AES128_KEY_HEX', prefix
,
674 fallback_default
=False,
676 if aes128_key
is not None:
677 c
.set_forced_key(kcrypto
.Enctype
.AES128
, aes128_key
)
678 rc4_key
= self
.env_get_var('RC4_KEY_HEX', prefix
,
679 fallback_default
=False, allow_missing
=True)
680 if rc4_key
is not None:
681 c
.set_forced_key(kcrypto
.Enctype
.RC4
, rc4_key
)
683 if not allow_missing_keys
:
684 self
.assertTrue(c
.forced_keys
,
685 'Please supply %s encryption keys '
686 'in environment' % prefix
)
690 def _get_krb5_creds(self
,
692 default_username
=None,
693 allow_missing_password
=False,
694 allow_missing_keys
=True,
695 require_strongest_key
=False,
696 fallback_creds_fn
=None):
697 if prefix
in self
.creds_dict
:
698 return self
.creds_dict
[prefix
]
700 # We don't have the credentials already
704 # Try to obtain them from the environment
705 creds
= self
._get
_krb
5_creds
_from
_env
(
707 default_username
=default_username
,
708 allow_missing_password
=allow_missing_password
,
709 allow_missing_keys
=allow_missing_keys
,
710 require_strongest_key
=require_strongest_key
)
711 except Exception as err
:
712 # An error occurred, so save it for later
715 self
.assertIsNotNone(creds
)
716 # Save the obtained credentials
717 self
.creds_dict
[prefix
] = creds
720 if fallback_creds_fn
is not None:
722 # Try to use the fallback method
723 creds
= fallback_creds_fn()
724 except Exception as err
:
725 print("ERROR FROM ENV: %r" % (env_err
))
726 print("FALLBACK-FN: %s" % (fallback_creds_fn
))
727 print("FALLBACK-ERROR: %r" % (err
))
729 self
.assertIsNotNone(creds
)
730 # Save the obtained credentials
731 self
.creds_dict
[prefix
] = creds
734 # Both methods failed, so raise the exception from the
738 def get_user_creds(self
,
739 allow_missing_password
=False,
740 allow_missing_keys
=True):
741 c
= self
._get
_krb
5_creds
(prefix
=None,
742 allow_missing_password
=allow_missing_password
,
743 allow_missing_keys
=allow_missing_keys
)
746 def get_service_creds(self
,
747 allow_missing_password
=False,
748 allow_missing_keys
=True):
749 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
750 allow_missing_password
=allow_missing_password
,
751 allow_missing_keys
=allow_missing_keys
)
754 def get_client_creds(self
,
755 allow_missing_password
=False,
756 allow_missing_keys
=True):
757 c
= self
._get
_krb
5_creds
(prefix
='CLIENT',
758 allow_missing_password
=allow_missing_password
,
759 allow_missing_keys
=allow_missing_keys
)
762 def get_server_creds(self
,
763 allow_missing_password
=False,
764 allow_missing_keys
=True):
765 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
766 allow_missing_password
=allow_missing_password
,
767 allow_missing_keys
=allow_missing_keys
)
770 def get_admin_creds(self
,
771 allow_missing_password
=False,
772 allow_missing_keys
=True):
773 c
= self
._get
_krb
5_creds
(prefix
='ADMIN',
774 allow_missing_password
=allow_missing_password
,
775 allow_missing_keys
=allow_missing_keys
)
776 c
.set_gensec_features(c
.get_gensec_features() | FEATURE_SEAL
)
779 def get_rodc_krbtgt_creds(self
,
781 require_strongest_key
=False):
782 if require_strongest_key
:
783 self
.assertTrue(require_keys
)
784 c
= self
._get
_krb
5_creds
(prefix
='RODC_KRBTGT',
785 allow_missing_password
=True,
786 allow_missing_keys
=not require_keys
,
787 require_strongest_key
=require_strongest_key
)
790 def get_krbtgt_creds(self
,
792 require_strongest_key
=False):
793 if require_strongest_key
:
794 self
.assertTrue(require_keys
)
795 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
796 default_username
='krbtgt',
797 allow_missing_password
=True,
798 allow_missing_keys
=not require_keys
,
799 require_strongest_key
=require_strongest_key
)
802 def get_anon_creds(self
):
807 def asn1_dump(self
, name
, obj
, asn1_print
=None):
808 if asn1_print
is None:
809 asn1_print
= self
.do_asn1_print
812 sys
.stderr
.write("%s:\n%s" % (name
, obj
))
814 sys
.stderr
.write("%s" % (obj
))
816 def hex_dump(self
, name
, blob
, hexdump
=None):
818 hexdump
= self
.do_hexdump
821 "%s: %d\n%s" % (name
, len(blob
), self
.hexdump(blob
)))
830 if asn1Spec
is not None:
831 class_name
= type(asn1Spec
).__name
__.split(':')[0]
833 class_name
= "<None-asn1Spec>"
834 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
835 obj
, _
= pyasn1_der_decode(blob
, asn1Spec
=asn1Spec
)
836 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
838 obj
= pyasn1_native_encode(obj
)
849 obj
= pyasn1_native_decode(obj
, asn1Spec
=asn1Spec
)
850 class_name
= type(obj
).__name
__.split(':')[0]
851 if class_name
is not None:
852 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
853 blob
= pyasn1_der_encode(obj
)
854 if class_name
is not None:
855 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
858 def send_pdu(self
, req
, asn1_print
=None, hexdump
=None):
860 k5_pdu
= self
.der_encode(
861 req
, native_decode
=False, asn1_print
=asn1_print
, hexdump
=False)
862 header
= struct
.pack('>I', len(k5_pdu
))
865 self
.hex_dump("send_pdu", header
, hexdump
=hexdump
)
866 self
.hex_dump("send_pdu", k5_pdu
, hexdump
=hexdump
)
868 sent
= self
.s
.send(req_pdu
, 0)
869 if sent
== len(req_pdu
):
871 req_pdu
= req_pdu
[sent
:]
872 except socket
.error
as e
:
873 self
._disconnect
("send_pdu: %s" % e
)
876 self
._disconnect
("send_pdu: %s" % e
)
879 def recv_raw(self
, num_recv
=0xffff, hexdump
=None, timeout
=None):
882 if timeout
is not None:
883 self
.s
.settimeout(timeout
)
884 rep_pdu
= self
.s
.recv(num_recv
, 0)
885 self
.s
.settimeout(10)
886 if len(rep_pdu
) == 0:
887 self
._disconnect
("recv_raw: EOF")
889 self
.hex_dump("recv_raw", rep_pdu
, hexdump
=hexdump
)
890 except socket
.timeout
:
891 self
.s
.settimeout(10)
892 sys
.stderr
.write("recv_raw: TIMEOUT\n")
893 except socket
.error
as e
:
894 self
._disconnect
("recv_raw: %s" % e
)
897 self
._disconnect
("recv_raw: %s" % e
)
901 def recv_pdu_raw(self
, asn1_print
=None, hexdump
=None, timeout
=None):
904 raw_pdu
= self
.recv_raw(
905 num_recv
=4, hexdump
=hexdump
, timeout
=timeout
)
908 header
= struct
.unpack(">I", raw_pdu
[0:4])
915 raw_pdu
= self
.recv_raw(
916 num_recv
=missing
, hexdump
=hexdump
, timeout
=timeout
)
917 self
.assertGreaterEqual(len(raw_pdu
), 1)
919 missing
= k5_len
- len(rep_pdu
)
920 k5_raw
= self
.der_decode(
926 pvno
= k5_raw
['field-0']
927 self
.assertEqual(pvno
, 5)
928 msg_type
= k5_raw
['field-1']
929 self
.assertIn(msg_type
, [KRB_AS_REP
, KRB_TGS_REP
, KRB_ERROR
])
930 if msg_type
== KRB_AS_REP
:
931 asn1Spec
= krb5_asn1
.AS_REP()
932 elif msg_type
== KRB_TGS_REP
:
933 asn1Spec
= krb5_asn1
.TGS_REP()
934 elif msg_type
== KRB_ERROR
:
935 asn1Spec
= krb5_asn1
.KRB_ERROR()
936 rep
= self
.der_decode(rep_pdu
, asn1Spec
=asn1Spec
,
937 asn1_print
=asn1_print
, hexdump
=False)
938 return (rep
, rep_pdu
)
940 def recv_pdu(self
, asn1_print
=None, hexdump
=None, timeout
=None):
941 (rep
, rep_pdu
) = self
.recv_pdu_raw(asn1_print
=asn1_print
,
def assertIsConnected(self):
    """Assert that self.s is not None (message: "Not connected")."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Assert that self.s is None (message: "Is connected")."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
952 def send_recv_transaction(
959 host
= self
.host
if to_rodc
else self
.dc_host
962 self
.send_pdu(req
, asn1_print
=asn1_print
, hexdump
=hexdump
)
964 asn1_print
=asn1_print
, hexdump
=hexdump
, timeout
=timeout
)
966 self
._disconnect
("transaction failed")
968 self
._disconnect
("transaction done")
def assertNoValue(self, value):
    """Assert that value.isNoValue is true."""
    has_no_value = value.isNoValue
    self.assertTrue(has_no_value)
def assertHasValue(self, value):
    """Assert that value is not None."""
    check_not_none = self.assertIsNotNone
    check_not_none(value)
977 def getElementValue(self
, obj
, elem
):
980 def assertElementMissing(self
, obj
, elem
):
981 v
= self
.getElementValue(obj
, elem
)
984 def assertElementPresent(self
, obj
, elem
, expect_empty
=False):
985 v
= self
.getElementValue(obj
, elem
)
986 self
.assertIsNotNone(v
)
987 if self
.strict_checking
:
988 if isinstance(v
, collections
.abc
.Container
):
990 self
.assertEqual(0, len(v
))
992 self
.assertNotEqual(0, len(v
))
def assertElementEqual(self, obj, elem, value):
    """Assert that element elem of obj is present and equal to value.

    The element is fetched with getElementValue(), must not be None,
    and is then compared with value.
    """
    actual = self.getElementValue(obj, elem)
    for check, args in ((self.assertIsNotNone, (actual,)),
                        (self.assertEqual, (actual, value))):
        check(*args)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that element elem of obj equals value encoded as UTF-8.

    The element is fetched with getElementValue(), must not be None,
    and is compared with bytes(value, 'utf8').
    """
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    expected = bytes(value, 'utf8')
    self.assertEqual(actual, expected)
1004 def assertPrincipalEqual(self
, princ1
, princ2
):
1005 self
.assertEqual(princ1
['name-type'], princ2
['name-type'])
1007 len(princ1
['name-string']),
1008 len(princ2
['name-string']),
1009 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
1010 for idx
in range(len(princ1
['name-string'])):
1012 princ1
['name-string'][idx
],
1013 princ2
['name-string'][idx
],
1014 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that element elem of obj is the principal name value.

    The element is fetched with getElementValue(), must not be None, is
    decoded with the native decoder against the PrincipalName spec, and
    is then compared using assertPrincipalEqual().
    """
    raw_principal = self.getElementValue(obj, elem)
    self.assertIsNotNone(raw_principal)
    principal_spec = krb5_asn1.PrincipalName()
    decoded = pyasn1_native_decode(raw_principal, asn1Spec=principal_spec)
    self.assertPrincipalEqual(decoded, value)
1022 def assertElementKVNO(self
, obj
, elem
, value
):
1023 v
= self
.getElementValue(obj
, elem
)
1024 if value
== "autodetect":
1026 if value
is not None:
1027 self
.assertIsNotNone(v
)
1028 # The value on the wire should never be 0
1029 self
.assertNotEqual(v
, 0)
1030 # unspecified_kvno means we don't know the kvno,
1031 # but want to enforce its presence
1032 if value
is not self
.unspecified_kvno
:
1034 self
.assertNotEqual(value
, 0)
1035 self
.assertEqual(v
, value
)
1037 self
.assertIsNone(v
)
1039 def assertElementFlags(self
, obj
, elem
, expected
, unexpected
):
1040 v
= self
.getElementValue(obj
, elem
)
1041 self
.assertIsNotNone(v
)
1042 if expected
is not None:
1043 self
.assertIsInstance(expected
, krb5_asn1
.KDCOptions
)
1044 for i
, flag
in enumerate(expected
):
1046 self
.assertEqual('1', v
[i
],
1047 f
"'{expected.namedValues[i]}' "
1049 if unexpected
is not None:
1050 self
.assertIsInstance(unexpected
, krb5_asn1
.KDCOptions
)
1051 for i
, flag
in enumerate(unexpected
):
1053 self
.assertEqual('0', v
[i
],
1054 f
"'{unexpected.namedValues[i]}' "
1055 f
"unexpected in {v}")
1057 def get_KerberosTimeWithUsec(self
, epoch
=None, offset
=None):
1060 if offset
is not None:
1061 epoch
= epoch
+ int(offset
)
1062 dt
= datetime
.datetime
.fromtimestamp(epoch
, tz
=datetime
.timezone
.utc
)
1063 return (dt
.strftime("%Y%m%d%H%M%SZ"), dt
.microsecond
)
1065 def get_KerberosTime(self
, epoch
=None, offset
=None):
1066 (s
, _
) = self
.get_KerberosTimeWithUsec(epoch
=epoch
, offset
=offset
)
1069 def get_EpochFromKerberosTime(self
, kerberos_time
):
1070 if isinstance(kerberos_time
, bytes
):
1071 kerberos_time
= kerberos_time
.decode()
1073 epoch
= datetime
.datetime
.strptime(kerberos_time
,
1075 epoch
= epoch
.replace(tzinfo
=datetime
.timezone
.utc
)
1076 epoch
= int(epoch
.timestamp())
1080 def get_Nonce(self
):
1081 nonce_min
= 0x7f000000
1082 nonce_max
= 0x7fffffff
1083 v
= random
.randint(nonce_min
, nonce_max
)
1086 def get_pa_dict(self
, pa_data
):
1089 if pa_data
is not None:
1091 pa_type
= pa
['padata-type']
1092 if pa_type
in pa_dict
:
1093 raise RuntimeError(f
'Duplicate type {pa_type}')
1094 pa_dict
[pa_type
] = pa
['padata-value']
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap raw key contents in a RodcPacEncryptionKey.

    A kcrypto.Key is built from etype and contents and returned inside
    a RodcPacEncryptionKey together with kvno.
    """
    return RodcPacEncryptionKey(kcrypto.Key(etype, contents), kvno)
def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
    """Derive a key from a password and wrap it in RodcPacEncryptionKey.

    Both pwd and salt must not be None. The key is derived with
    kcrypto.string_to_key(etype, pwd, salt) and returned together with
    kvno.
    """
    for required in (pwd, salt):
        self.assertIsNotNone(required)
    derived_key = kcrypto.string_to_key(etype, pwd, salt)
    return RodcPacEncryptionKey(derived_key, kvno)
def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
    """Build the key matching an etype-info2 entry for the given creds.

    For the RC4 enctype the key contents are creds.get_nt_hash() and the
    key is built with SessionKey_create(). For every other enctype the
    key is derived by PasswordKey_create() from creds.get_password() and
    the entry's 'salt' (None when the entry has no salt).
    """
    enctype = etype_info2['etype']
    salt = etype_info2.get('salt')

    if enctype == kcrypto.Enctype.RC4:
        return self.SessionKey_create(etype=enctype,
                                      contents=creds.get_nt_hash(),
                                      kvno=kvno)

    return self.PasswordKey_create(etype=enctype,
                                   pwd=creds.get_password(),
                                   salt=salt,
                                   kvno=kvno)
1121 def TicketDecryptionKey_from_creds(self
, creds
, etype
=None):
1124 etypes
= creds
.get_tgs_krb5_etypes()
1128 etype
= kcrypto
.Enctype
.RC4
1130 forced_key
= creds
.get_forced_key(etype
)
1131 if forced_key
is not None:
1134 kvno
= creds
.get_kvno()
1136 fail_msg
= ("%s has no fixed key for etype[%s] kvno[%s] "
1137 "nor a password specified, " % (
1138 creds
.get_username(), etype
, kvno
))
1140 if etype
== kcrypto
.Enctype
.RC4
:
1141 nthash
= creds
.get_nt_hash()
1142 self
.assertIsNotNone(nthash
, msg
=fail_msg
)
1143 return self
.SessionKey_create(etype
=etype
,
1147 password
= creds
.get_password()
1148 self
.assertIsNotNone(password
, msg
=fail_msg
)
1149 salt
= creds
.get_salt()
1150 return self
.PasswordKey_create(etype
=etype
,
def RandomKey(self, etype):
    """Return a key of enctype etype with random contents.

    The contents are samba.generate_random_bytes() of the keysize given
    by the kcrypto profile for etype; the key is built with
    SessionKey_create().
    """
    profile = kcrypto._get_enctype_profile(etype)
    random_contents = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=random_contents)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Build a key from a mapping with 'keytype' and 'keyvalue' items.

    Both items are passed positionally to SessionKey_create().
    """
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(keytype, keyvalue)
1164 def EncryptedData_create(self
, key
, usage
, plaintext
):
1165 # EncryptedData ::= SEQUENCE {
1166 # etype [0] Int32 -- EncryptionType --,
1167 # kvno [1] Int32 OPTIONAL,
1168 # cipher [2] OCTET STRING -- ciphertext
1170 ciphertext
= key
.encrypt(usage
, plaintext
)
1171 EncryptedData_obj
= {
1173 'cipher': ciphertext
1175 if key
.kvno
is not None:
1176 EncryptedData_obj
['kvno'] = key
.kvno
1177 return EncryptedData_obj
1179 def Checksum_create(self
, key
, usage
, plaintext
, ctype
=None):
1180 # Checksum ::= SEQUENCE {
1181 # cksumtype [0] Int32,
1182 # checksum [1] OCTET STRING
1186 checksum
= key
.make_checksum(usage
, plaintext
, ctype
=ctype
)
1189 'checksum': checksum
,
def PrincipalName_create(cls, name_type, names):
    """Return a PrincipalName as a dict.

    The result has the keys 'name-type' (name_type) and 'name-string'
    (names).
    """
    # PrincipalName ::= SEQUENCE {
    #         name-type [0] Int32,
    #         name-string [1] SEQUENCE OF KerberosString
    # }
    return {
        'name-type': name_type,
        'name-string': names,
    }
1205 def AuthorizationData_create(self
, ad_type
, ad_data
):
1206 # AuthorizationData ::= SEQUENCE {
1207 # ad-type [0] Int32,
1208 # ad-data [1] OCTET STRING
1214 return AUTH_DATA_obj
1216 def PA_DATA_create(self
, padata_type
, padata_value
):
1217 # PA-DATA ::= SEQUENCE {
1218 # -- NOTE: first tag is [1], not [0]
1219 # padata-type [1] Int32,
1220 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1223 'padata-type': padata_type
,
1224 'padata-value': padata_value
,
1228 def PA_ENC_TS_ENC_create(self
, ts
, usec
):
1229 # PA-ENC-TS-ENC ::= SEQUENCE {
1230 # patimestamp[0] KerberosTime, -- client's time
1231 # pausec[1] krb5int32 OPTIONAL
1233 PA_ENC_TS_ENC_obj
= {
1237 return PA_ENC_TS_ENC_obj
1239 def PA_PAC_OPTIONS_create(self
, options
):
1240 # PA-PAC-OPTIONS ::= SEQUENCE {
1241 # options [0] PACOptionFlags
1243 PA_PAC_OPTIONS_obj
= {
1246 return PA_PAC_OPTIONS_obj
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor as a dict.

    The result has the keys 'armor-type' (armor_type) and 'armor-value'
    (armor_value).
    """
    # KrbFastArmor ::= SEQUENCE {
    #         armor-type [0] Int32,
    #         armor-value [1] OCTET STRING,
    #         ...
    # }
    return {
        'armor-type': armor_type,
        'armor-value': armor_value,
    }
1260 def KRB_FAST_REQ_create(self
, fast_options
, padata
, req_body
):
1261 # KrbFastReq ::= SEQUENCE {
1262 # fast-options [0] FastOptions,
1263 # padata [1] SEQUENCE OF PA-DATA,
1264 # req-body [2] KDC-REQ-BODY,
1267 KRB_FAST_REQ_obj
= {
1268 'fast-options': fast_options
,
1270 'req-body': req_body
1272 return KRB_FAST_REQ_obj
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return a KrbFastArmoredReq as a dict.

    'req-checksum' and 'enc-fast-req' are always present; 'armor' is
    only added when armor is not None.
    """
    # KrbFastArmoredReq ::= SEQUENCE {
    #         armor [0] KrbFastArmor OPTIONAL,
    #         req-checksum [1] Checksum,
    #         enc-fast-req [2] EncryptedData -- KrbFastReq --
    # }
    armored_req = {
        'req-checksum': req_checksum,
        'enc-fast-req': enc_fast_req,
    }
    if armor is None:
        return armored_req
    armored_req['armor'] = armor
    return armored_req
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Build the native (dict) form of a PA-FX-FAST-REQUEST choice.

    :param armored_data: value stored under 'armored-data'.
    :return: a new dict holding that single entry.
    """
    # PA-FX-FAST-REQUEST ::= CHOICE {
    #     armored-data [0] KrbFastArmoredReq,
    #     ...
    # }
    PA_FX_FAST_REQUEST_obj = {
        'armored-data': armored_data,
    }
    return PA_FX_FAST_REQUEST_obj
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
    """Build a KERB-PA-PAC-REQUEST, optionally wrapped as PA-DATA.

    :param include_pac: value stored under 'include-pac'.
    :param pa_data_create: when false, return the bare native dict;
        otherwise DER-encode it and wrap it in a PA-DATA element of
        type PADATA_PAC_REQUEST.
    :return: the native dict, or the PA-DATA built from its encoding.
    """
    # KERB-PA-PAC-REQUEST ::= SEQUENCE {
    #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
    #                                --    include PAC.
    #                                --If FALSE, and PAC present,
    #                                --    remove PAC.
    # }
    KERB_PA_PAC_REQUEST_obj = {
        'include-pac': include_pac,
    }
    if not pa_data_create:
        return KERB_PA_PAC_REQUEST_obj
    pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
                             asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
    pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
    return pa_data
def get_pa_pac_options(self, options):
    """Return a PA-DATA element carrying DER-encoded PA-PAC-OPTIONS.

    :param options: passed through to PA_PAC_OPTIONS_create().
    :return: the result of PA_DATA_create() for type
        PADATA_PAC_OPTIONS and the encoded structure.
    """
    # Three stages: native dict -> DER blob -> PA-DATA wrapper.
    pac_options = self.PA_PAC_OPTIONS_create(options)
    pac_options = self.der_encode(pac_options,
                                  asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)

    return pac_options
1323 def KDC_REQ_BODY_create(self
,
1335 EncAuthorizationData
,
1336 EncAuthorizationData_key
,
1337 EncAuthorizationData_usage
,
1340 # KDC-REQ-BODY ::= SEQUENCE {
1341 # kdc-options [0] KDCOptions,
1342 # cname [1] PrincipalName OPTIONAL
1343 # -- Used only in AS-REQ --,
1346 # -- Also client's in AS-REQ --,
1347 # sname [3] PrincipalName OPTIONAL,
1348 # from [4] KerberosTime OPTIONAL,
1349 # till [5] KerberosTime,
1350 # rtime [6] KerberosTime OPTIONAL,
1352 # etype [8] SEQUENCE OF Int32
1354 # -- in preference order --,
1355 # addresses [9] HostAddresses OPTIONAL,
1356 # enc-authorization-data [10] EncryptedData OPTIONAL
1357 # -- AuthorizationData --,
1358 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1359 # -- NOTE: not empty
1361 if EncAuthorizationData
is not None:
1362 enc_ad_plain
= self
.der_encode(
1363 EncAuthorizationData
,
1364 asn1Spec
=krb5_asn1
.AuthorizationData(),
1365 asn1_print
=asn1_print
,
1367 enc_ad
= self
.EncryptedData_create(EncAuthorizationData_key
,
1368 EncAuthorizationData_usage
,
1372 KDC_REQ_BODY_obj
= {
1373 'kdc-options': kdc_options
,
1379 if cname
is not None:
1380 KDC_REQ_BODY_obj
['cname'] = cname
1381 if sname
is not None:
1382 KDC_REQ_BODY_obj
['sname'] = sname
1383 if from_time
is not None:
1384 KDC_REQ_BODY_obj
['from'] = from_time
1385 if renew_time
is not None:
1386 KDC_REQ_BODY_obj
['rtime'] = renew_time
1387 if addresses
is not None:
1388 KDC_REQ_BODY_obj
['addresses'] = addresses
1389 if enc_ad
is not None:
1390 KDC_REQ_BODY_obj
['enc-authorization-data'] = enc_ad
1391 if additional_tickets
is not None:
1392 KDC_REQ_BODY_obj
['additional-tickets'] = additional_tickets
1393 return KDC_REQ_BODY_obj
1395 def KDC_REQ_create(self
,
1402 # KDC-REQ ::= SEQUENCE {
1403 # -- NOTE: first tag is [1], not [0]
1404 # pvno [1] INTEGER (5) ,
1405 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1406 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1407 # -- NOTE: not empty --,
1408 # req-body [4] KDC-REQ-BODY
1413 'msg-type': msg_type
,
1414 'req-body': req_body
,
1416 if padata
is not None:
1417 KDC_REQ_obj
['padata'] = padata
1418 if asn1Spec
is not None:
1419 KDC_REQ_decoded
= pyasn1_native_decode(
1420 KDC_REQ_obj
, asn1Spec
=asn1Spec
)
1422 KDC_REQ_decoded
= None
1423 return KDC_REQ_obj
, KDC_REQ_decoded
1425 def AS_REQ_create(self
,
1427 kdc_options
, # required
1431 from_time
, # optional
1432 till_time
, # required
1433 renew_time
, # optional
1436 addresses
, # optional
1438 native_decoded_only
=True,
1441 # KDC-REQ ::= SEQUENCE {
1442 # -- NOTE: first tag is [1], not [0]
1443 # pvno [1] INTEGER (5) ,
1444 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1445 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1446 # -- NOTE: not empty --,
1447 # req-body [4] KDC-REQ-BODY
1450 # KDC-REQ-BODY ::= SEQUENCE {
1451 # kdc-options [0] KDCOptions,
1452 # cname [1] PrincipalName OPTIONAL
1453 # -- Used only in AS-REQ --,
1456 # -- Also client's in AS-REQ --,
1457 # sname [3] PrincipalName OPTIONAL,
1458 # from [4] KerberosTime OPTIONAL,
1459 # till [5] KerberosTime,
1460 # rtime [6] KerberosTime OPTIONAL,
1462 # etype [8] SEQUENCE OF Int32
1464 # -- in preference order --,
1465 # addresses [9] HostAddresses OPTIONAL,
1466 # enc-authorization-data [10] EncryptedData OPTIONAL
1467 # -- AuthorizationData --,
1468 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1469 # -- NOTE: not empty
1471 KDC_REQ_BODY_obj
= self
.KDC_REQ_BODY_create(
1483 EncAuthorizationData
=None,
1484 EncAuthorizationData_key
=None,
1485 EncAuthorizationData_usage
=None,
1486 asn1_print
=asn1_print
,
1488 obj
, decoded
= self
.KDC_REQ_create(
1489 msg_type
=KRB_AS_REQ
,
1491 req_body
=KDC_REQ_BODY_obj
,
1492 asn1Spec
=krb5_asn1
.AS_REQ(),
1493 asn1_print
=asn1_print
,
1495 if native_decoded_only
:
def AP_REQ_create(self, ap_options, ticket, authenticator):
    """Build the native (dict) form of an AP-REQ message.

    :param ap_options: value stored under 'ap-options'.
    :param ticket: value stored under 'ticket'.
    :param authenticator: value stored under 'authenticator'.
    :return: a new dict; 'pvno' is fixed at 5 and 'msg-type' at
        KRB_AP_REQ.
    """
    # AP-REQ          ::= [APPLICATION 14] SEQUENCE {
    #         pvno            [0] INTEGER (5),
    #         msg-type        [1] INTEGER (14),
    #         ap-options      [2] APOptions,
    #         ticket          [3] Ticket,
    #         authenticator   [4] EncryptedData -- Authenticator
    # }
    AP_REQ_obj = {
        'pvno': 5,
        'msg-type': KRB_AP_REQ,
        'ap-options': ap_options,
        'ticket': ticket,
        'authenticator': authenticator,
    }
    return AP_REQ_obj
def Authenticator_create(
        self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
        authorization_data):
    """Build the native (dict) form of an unencrypted Authenticator.

    :param crealm: value stored under 'crealm'.
    :param cname: value stored under 'cname'.
    :param cksum: optional; stored under 'cksum' when not None.
    :param cusec: value stored under 'cusec'.
    :param ctime: value stored under 'ctime'.
    :param subkey: optional; stored under 'subkey' when not None.
    :param seq_number: optional; stored under 'seq-number' when not
        None.
    :param authorization_data: optional; stored under
        'authorization-data' when not None.
    :return: a new dict; 'authenticator-vno' is fixed at 5.
    """
    # -- Unencrypted authenticator
    # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
    #        authenticator-vno       [0] INTEGER (5),
    #        crealm                  [1] Realm,
    #        cname                   [2] PrincipalName,
    #        cksum                   [3] Checksum OPTIONAL,
    #        cusec                   [4] Microseconds,
    #        ctime                   [5] KerberosTime,
    #        subkey                  [6] EncryptionKey OPTIONAL,
    #        seq-number              [7] UInt32 OPTIONAL,
    #        authorization-data      [8] AuthorizationData OPTIONAL
    # }
    Authenticator_obj = {
        'authenticator-vno': 5,
        'crealm': crealm,
        'cname': cname,
        'cusec': cusec,
        'ctime': ctime,
    }
    # OPTIONAL members are left out entirely when the caller passes
    # None, rather than being stored as None.
    if cksum is not None:
        Authenticator_obj['cksum'] = cksum
    if subkey is not None:
        Authenticator_obj['subkey'] = subkey
    if seq_number is not None:
        Authenticator_obj['seq-number'] = seq_number
    if authorization_data is not None:
        Authenticator_obj['authorization-data'] = authorization_data
    return Authenticator_obj
1548 def TGS_REQ_create(self
,
1553 kdc_options
, # required
1557 from_time
, # optional
1558 till_time
, # required
1559 renew_time
, # optional
1562 addresses
, # optional
1563 EncAuthorizationData
,
1564 EncAuthorizationData_key
,
1567 authenticator_subkey
=None,
1568 body_checksum_type
=None,
1569 native_decoded_only
=True,
1572 # KDC-REQ ::= SEQUENCE {
1573 # -- NOTE: first tag is [1], not [0]
1574 # pvno [1] INTEGER (5) ,
1575 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1576 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1577 # -- NOTE: not empty --,
1578 # req-body [4] KDC-REQ-BODY
1581 # KDC-REQ-BODY ::= SEQUENCE {
1582 # kdc-options [0] KDCOptions,
1583 # cname [1] PrincipalName OPTIONAL
1584 # -- Used only in AS-REQ --,
1587 # -- Also client's in AS-REQ --,
1588 # sname [3] PrincipalName OPTIONAL,
1589 # from [4] KerberosTime OPTIONAL,
1590 # till [5] KerberosTime,
1591 # rtime [6] KerberosTime OPTIONAL,
1593 # etype [8] SEQUENCE OF Int32
1595 # -- in preference order --,
1596 # addresses [9] HostAddresses OPTIONAL,
1597 # enc-authorization-data [10] EncryptedData OPTIONAL
1598 # -- AuthorizationData --,
1599 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1600 # -- NOTE: not empty
1603 if authenticator_subkey
is not None:
1604 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SUBKEY
1606 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SESSION
1608 req_body
= self
.KDC_REQ_BODY_create(
1609 kdc_options
=kdc_options
,
1613 from_time
=from_time
,
1614 till_time
=till_time
,
1615 renew_time
=renew_time
,
1618 addresses
=addresses
,
1619 additional_tickets
=additional_tickets
,
1620 EncAuthorizationData
=EncAuthorizationData
,
1621 EncAuthorizationData_key
=EncAuthorizationData_key
,
1622 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1623 req_body_blob
= self
.der_encode(req_body
,
1624 asn1Spec
=krb5_asn1
.KDC_REQ_BODY(),
1625 asn1_print
=asn1_print
, hexdump
=hexdump
)
1627 req_body_checksum
= self
.Checksum_create(ticket_session_key
,
1628 KU_TGS_REQ_AUTH_CKSUM
,
1630 ctype
=body_checksum_type
)
1633 if authenticator_subkey
is not None:
1634 subkey_obj
= authenticator_subkey
.export_obj()
1635 seq_number
= random
.randint(0, 0xfffffffe)
1636 authenticator
= self
.Authenticator_create(
1639 cksum
=req_body_checksum
,
1643 seq_number
=seq_number
,
1644 authorization_data
=None)
1645 authenticator
= self
.der_encode(
1647 asn1Spec
=krb5_asn1
.Authenticator(),
1648 asn1_print
=asn1_print
,
1651 authenticator
= self
.EncryptedData_create(
1652 ticket_session_key
, KU_TGS_REQ_AUTH
, authenticator
)
1654 ap_options
= krb5_asn1
.APOptions('0')
1655 ap_req
= self
.AP_REQ_create(ap_options
=str(ap_options
),
1657 authenticator
=authenticator
)
1658 ap_req
= self
.der_encode(ap_req
, asn1Spec
=krb5_asn1
.AP_REQ(),
1659 asn1_print
=asn1_print
, hexdump
=hexdump
)
1660 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
1661 if padata
is not None:
1662 padata
.append(pa_tgs_req
)
1664 padata
= [pa_tgs_req
]
1666 obj
, decoded
= self
.KDC_REQ_create(
1667 msg_type
=KRB_TGS_REQ
,
1670 asn1Spec
=krb5_asn1
.TGS_REQ(),
1671 asn1_print
=asn1_print
,
1673 if native_decoded_only
:
1677 def PA_S4U2Self_create(self
, name
, realm
, tgt_session_key
, ctype
=None):
1678 # PA-S4U2Self ::= SEQUENCE {
1679 # name [0] PrincipalName,
1681 # cksum [2] Checksum,
1682 # auth [3] GeneralString
1684 cksum_data
= name
['name-type'].to_bytes(4, byteorder
='little')
1685 for n
in name
['name-string']:
1686 cksum_data
+= n
.encode()
1687 cksum_data
+= realm
.encode()
1688 cksum_data
+= "Kerberos".encode()
1689 cksum
= self
.Checksum_create(tgt_session_key
,
1690 KU_NON_KERB_CKSUM_SALT
,
1700 pa_s4u2self
= self
.der_encode(
1701 PA_S4U2Self_obj
, asn1Spec
=krb5_asn1
.PA_S4U2Self())
1702 return self
.PA_DATA_create(PADATA_FOR_USER
, pa_s4u2self
)
1704 def _generic_kdc_exchange(self
,
1705 kdc_exchange_dict
, # required
1706 cname
=None, # optional
1707 realm
=None, # required
1708 sname
=None, # optional
1709 from_time
=None, # optional
1710 till_time
=None, # required
1711 renew_time
=None, # optional
1712 etypes
=None, # required
1713 addresses
=None, # optional
1714 additional_tickets
=None, # optional
1715 EncAuthorizationData
=None, # optional
1716 EncAuthorizationData_key
=None, # optional
1717 EncAuthorizationData_usage
=None): # optional
1719 check_error_fn
= kdc_exchange_dict
['check_error_fn']
1720 check_rep_fn
= kdc_exchange_dict
['check_rep_fn']
1721 generate_fast_fn
= kdc_exchange_dict
['generate_fast_fn']
1722 generate_fast_armor_fn
= kdc_exchange_dict
['generate_fast_armor_fn']
1723 generate_fast_padata_fn
= kdc_exchange_dict
['generate_fast_padata_fn']
1724 generate_padata_fn
= kdc_exchange_dict
['generate_padata_fn']
1725 callback_dict
= kdc_exchange_dict
['callback_dict']
1726 req_msg_type
= kdc_exchange_dict
['req_msg_type']
1727 req_asn1Spec
= kdc_exchange_dict
['req_asn1Spec']
1728 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
1730 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
1731 kdc_options
= kdc_exchange_dict
['kdc_options']
1733 pac_request
= kdc_exchange_dict
['pac_request']
1734 pac_options
= kdc_exchange_dict
['pac_options']
1736 # Parameters specific to the inner request body
1737 inner_req
= kdc_exchange_dict
['inner_req']
1739 # Parameters specific to the outer request body
1740 outer_req
= kdc_exchange_dict
['outer_req']
1742 if till_time
is None:
1743 till_time
= self
.get_KerberosTime(offset
=36000)
1745 if 'nonce' in kdc_exchange_dict
:
1746 nonce
= kdc_exchange_dict
['nonce']
1748 nonce
= self
.get_Nonce()
1749 kdc_exchange_dict
['nonce'] = nonce
1751 req_body
= self
.KDC_REQ_BODY_create(
1752 kdc_options
=kdc_options
,
1756 from_time
=from_time
,
1757 till_time
=till_time
,
1758 renew_time
=renew_time
,
1761 addresses
=addresses
,
1762 additional_tickets
=additional_tickets
,
1763 EncAuthorizationData
=EncAuthorizationData
,
1764 EncAuthorizationData_key
=EncAuthorizationData_key
,
1765 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1767 inner_req_body
= dict(req_body
)
1768 if inner_req
is not None:
1769 for key
, value
in inner_req
.items():
1770 if value
is not None:
1771 inner_req_body
[key
] = value
1773 del inner_req_body
[key
]
1774 if outer_req
is not None:
1775 for key
, value
in outer_req
.items():
1776 if value
is not None:
1777 req_body
[key
] = value
1781 additional_padata
= []
1782 if pac_request
is not None:
1783 pa_pac_request
= self
.KERB_PA_PAC_REQUEST_create(pac_request
)
1784 additional_padata
.append(pa_pac_request
)
1785 if pac_options
is not None:
1786 pa_pac_options
= self
.get_pa_pac_options(pac_options
)
1787 additional_padata
.append(pa_pac_options
)
1789 if req_msg_type
== KRB_AS_REQ
:
1791 tgs_req_padata
= None
1793 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1795 tgs_req
= self
.generate_ap_req(kdc_exchange_dict
,
1799 tgs_req_padata
= self
.PA_DATA_create(PADATA_KDC_REQ
, tgs_req
)
1801 if generate_fast_padata_fn
is not None:
1802 self
.assertIsNotNone(generate_fast_fn
)
1803 # This can alter req_body...
1804 fast_padata
, req_body
= generate_fast_padata_fn(kdc_exchange_dict
,
1810 if generate_fast_armor_fn
is not None:
1811 self
.assertIsNotNone(generate_fast_fn
)
1812 fast_ap_req
= generate_fast_armor_fn(kdc_exchange_dict
,
1817 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
1818 fast_armor
= self
.KRB_FAST_ARMOR_create(fast_armor_type
,
1823 if generate_padata_fn
is not None:
1824 # This can alter req_body...
1825 outer_padata
, req_body
= generate_padata_fn(kdc_exchange_dict
,
1828 self
.assertIsNotNone(outer_padata
)
1829 self
.assertNotIn(PADATA_KDC_REQ
,
1830 [pa
['padata-type'] for pa
in outer_padata
],
1831 'Don\'t create TGS-REQ manually')
1835 if generate_fast_fn
is not None:
1836 armor_key
= kdc_exchange_dict
['armor_key']
1837 self
.assertIsNotNone(armor_key
)
1839 if req_msg_type
== KRB_AS_REQ
:
1840 checksum_blob
= self
.der_encode(
1842 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
1844 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1845 checksum_blob
= tgs_req
1847 checksum
= self
.Checksum_create(armor_key
,
1851 fast_padata
+= additional_padata
1852 fast
= generate_fast_fn(kdc_exchange_dict
,
1863 if tgs_req_padata
is not None:
1864 padata
.append(tgs_req_padata
)
1866 if fast
is not None:
1869 if outer_padata
is not None:
1870 padata
+= outer_padata
1873 padata
+= additional_padata
1878 kdc_exchange_dict
['req_padata'] = padata
1879 kdc_exchange_dict
['fast_padata'] = fast_padata
1880 kdc_exchange_dict
['req_body'] = inner_req_body
1882 req_obj
, req_decoded
= self
.KDC_REQ_create(msg_type
=req_msg_type
,
1885 asn1Spec
=req_asn1Spec())
1887 to_rodc
= kdc_exchange_dict
['to_rodc']
1889 rep
= self
.send_recv_transaction(req_decoded
, to_rodc
=to_rodc
)
1890 self
.assertIsNotNone(rep
)
1892 msg_type
= self
.getElementValue(rep
, 'msg-type')
1893 self
.assertIsNotNone(msg_type
)
1895 expected_msg_type
= None
1896 if check_error_fn
is not None:
1897 expected_msg_type
= KRB_ERROR
1898 self
.assertIsNone(check_rep_fn
)
1899 self
.assertNotEqual(0, len(expected_error_mode
))
1900 self
.assertNotIn(0, expected_error_mode
)
1901 if check_rep_fn
is not None:
1902 expected_msg_type
= rep_msg_type
1903 self
.assertIsNone(check_error_fn
)
1904 self
.assertEqual(0, len(expected_error_mode
))
1905 self
.assertIsNotNone(expected_msg_type
)
1906 self
.assertEqual(msg_type
, expected_msg_type
)
1908 if msg_type
== KRB_ERROR
:
1909 return check_error_fn(kdc_exchange_dict
,
1913 return check_rep_fn(kdc_exchange_dict
, callback_dict
, rep
)
1915 def as_exchange_dict(self
,
1916 expected_crealm
=None,
1917 expected_cname
=None,
1918 expected_anon
=False,
1919 expected_srealm
=None,
1920 expected_sname
=None,
1921 expected_supported_etypes
=None,
1922 expected_flags
=None,
1923 unexpected_flags
=None,
1924 ticket_decryption_key
=None,
1925 generate_fast_fn
=None,
1926 generate_fast_armor_fn
=None,
1927 generate_fast_padata_fn
=None,
1928 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
1929 generate_padata_fn
=None,
1930 check_error_fn
=None,
1932 check_kdc_private_fn
=None,
1934 expected_error_mode
=0,
1935 expected_status
=None,
1936 client_as_etypes
=None,
1938 authenticator_subkey
=None,
1951 if expected_error_mode
== 0:
1952 expected_error_mode
= ()
1953 elif not isinstance(expected_error_mode
, collections
.abc
.Container
):
1954 expected_error_mode
= (expected_error_mode
,)
1956 kdc_exchange_dict
= {
1957 'req_msg_type': KRB_AS_REQ
,
1958 'req_asn1Spec': krb5_asn1
.AS_REQ
,
1959 'rep_msg_type': KRB_AS_REP
,
1960 'rep_asn1Spec': krb5_asn1
.AS_REP
,
1961 'rep_encpart_asn1Spec': krb5_asn1
.EncASRepPart
,
1962 'expected_crealm': expected_crealm
,
1963 'expected_cname': expected_cname
,
1964 'expected_anon': expected_anon
,
1965 'expected_srealm': expected_srealm
,
1966 'expected_sname': expected_sname
,
1967 'expected_supported_etypes': expected_supported_etypes
,
1968 'expected_flags': expected_flags
,
1969 'unexpected_flags': unexpected_flags
,
1970 'ticket_decryption_key': ticket_decryption_key
,
1971 'generate_fast_fn': generate_fast_fn
,
1972 'generate_fast_armor_fn': generate_fast_armor_fn
,
1973 'generate_fast_padata_fn': generate_fast_padata_fn
,
1974 'fast_armor_type': fast_armor_type
,
1975 'generate_padata_fn': generate_padata_fn
,
1976 'check_error_fn': check_error_fn
,
1977 'check_rep_fn': check_rep_fn
,
1978 'check_kdc_private_fn': check_kdc_private_fn
,
1979 'callback_dict': callback_dict
,
1980 'expected_error_mode': expected_error_mode
,
1981 'expected_status': expected_status
,
1982 'client_as_etypes': client_as_etypes
,
1983 'expected_salt': expected_salt
,
1984 'authenticator_subkey': authenticator_subkey
,
1985 'preauth_key': preauth_key
,
1986 'armor_key': armor_key
,
1987 'armor_tgt': armor_tgt
,
1988 'armor_subkey': armor_subkey
,
1989 'auth_data': auth_data
,
1990 'kdc_options': kdc_options
,
1991 'inner_req': inner_req
,
1992 'outer_req': outer_req
,
1993 'pac_request': pac_request
,
1994 'pac_options': pac_options
,
1995 'expect_pac': expect_pac
,
1998 if callback_dict
is None:
2001 return kdc_exchange_dict
2003 def tgs_exchange_dict(self
,
2004 expected_crealm
=None,
2005 expected_cname
=None,
2006 expected_anon
=False,
2007 expected_srealm
=None,
2008 expected_sname
=None,
2009 expected_supported_etypes
=None,
2010 expected_flags
=None,
2011 unexpected_flags
=None,
2012 ticket_decryption_key
=None,
2013 generate_fast_fn
=None,
2014 generate_fast_armor_fn
=None,
2015 generate_fast_padata_fn
=None,
2016 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
2017 generate_padata_fn
=None,
2018 check_error_fn
=None,
2020 check_kdc_private_fn
=None,
2021 expected_error_mode
=0,
2022 expected_status
=None,
2028 authenticator_subkey
=None,
2030 body_checksum_type
=None,
2038 if expected_error_mode
== 0:
2039 expected_error_mode
= ()
2040 elif not isinstance(expected_error_mode
, collections
.abc
.Container
):
2041 expected_error_mode
= (expected_error_mode
,)
2043 kdc_exchange_dict
= {
2044 'req_msg_type': KRB_TGS_REQ
,
2045 'req_asn1Spec': krb5_asn1
.TGS_REQ
,
2046 'rep_msg_type': KRB_TGS_REP
,
2047 'rep_asn1Spec': krb5_asn1
.TGS_REP
,
2048 'rep_encpart_asn1Spec': krb5_asn1
.EncTGSRepPart
,
2049 'expected_crealm': expected_crealm
,
2050 'expected_cname': expected_cname
,
2051 'expected_anon': expected_anon
,
2052 'expected_srealm': expected_srealm
,
2053 'expected_sname': expected_sname
,
2054 'expected_supported_etypes': expected_supported_etypes
,
2055 'expected_flags': expected_flags
,
2056 'unexpected_flags': unexpected_flags
,
2057 'ticket_decryption_key': ticket_decryption_key
,
2058 'generate_fast_fn': generate_fast_fn
,
2059 'generate_fast_armor_fn': generate_fast_armor_fn
,
2060 'generate_fast_padata_fn': generate_fast_padata_fn
,
2061 'fast_armor_type': fast_armor_type
,
2062 'generate_padata_fn': generate_padata_fn
,
2063 'check_error_fn': check_error_fn
,
2064 'check_rep_fn': check_rep_fn
,
2065 'check_kdc_private_fn': check_kdc_private_fn
,
2066 'callback_dict': callback_dict
,
2067 'expected_error_mode': expected_error_mode
,
2068 'expected_status': expected_status
,
2070 'body_checksum_type': body_checksum_type
,
2071 'armor_key': armor_key
,
2072 'armor_tgt': armor_tgt
,
2073 'armor_subkey': armor_subkey
,
2074 'auth_data': auth_data
,
2075 'authenticator_subkey': authenticator_subkey
,
2076 'kdc_options': kdc_options
,
2077 'inner_req': inner_req
,
2078 'outer_req': outer_req
,
2079 'pac_request': pac_request
,
2080 'pac_options': pac_options
,
2081 'expect_pac': expect_pac
,
2084 if callback_dict
is None:
2087 return kdc_exchange_dict
2089 def generic_check_kdc_rep(self
,
2094 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2095 expected_anon
= kdc_exchange_dict
['expected_anon']
2096 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2097 expected_sname
= kdc_exchange_dict
['expected_sname']
2098 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
2099 check_kdc_private_fn
= kdc_exchange_dict
['check_kdc_private_fn']
2100 rep_encpart_asn1Spec
= kdc_exchange_dict
['rep_encpart_asn1Spec']
2101 msg_type
= kdc_exchange_dict
['rep_msg_type']
2102 armor_key
= kdc_exchange_dict
['armor_key']
2104 self
.assertElementEqual(rep
, 'msg-type', msg_type
) # AS-REP | TGS-REP
2105 padata
= self
.getElementValue(rep
, 'padata')
2106 if self
.strict_checking
:
2107 self
.assertElementEqualUTF8(rep
, 'crealm', expected_crealm
)
2109 expected_cname
= self
.PrincipalName_create(
2110 name_type
=NT_WELLKNOWN
,
2111 names
=['WELLKNOWN', 'ANONYMOUS'])
2113 expected_cname
= kdc_exchange_dict
['expected_cname']
2114 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
2115 self
.assertElementPresent(rep
, 'ticket')
2116 ticket
= self
.getElementValue(rep
, 'ticket')
2117 ticket_encpart
= None
2118 ticket_cipher
= None
2119 self
.assertIsNotNone(ticket
)
2120 if ticket
is not None: # Never None, but gives indentation
2121 self
.assertElementEqual(ticket
, 'tkt-vno', 5)
2122 self
.assertElementEqualUTF8(ticket
, 'realm', expected_srealm
)
2123 self
.assertElementEqualPrincipal(ticket
, 'sname', expected_sname
)
2124 self
.assertElementPresent(ticket
, 'enc-part')
2125 ticket_encpart
= self
.getElementValue(ticket
, 'enc-part')
2126 self
.assertIsNotNone(ticket_encpart
)
2127 if ticket_encpart
is not None: # Never None, but gives indentation
2128 self
.assertElementPresent(ticket_encpart
, 'etype')
2129 # 'unspecified' means present, with any value != 0
2130 self
.assertElementKVNO(ticket_encpart
, 'kvno',
2131 self
.unspecified_kvno
)
2132 self
.assertElementPresent(ticket_encpart
, 'cipher')
2133 ticket_cipher
= self
.getElementValue(ticket_encpart
, 'cipher')
2134 self
.assertElementPresent(rep
, 'enc-part')
2135 encpart
= self
.getElementValue(rep
, 'enc-part')
2136 encpart_cipher
= None
2137 self
.assertIsNotNone(encpart
)
2138 if encpart
is not None: # Never None, but gives indentation
2139 self
.assertElementPresent(encpart
, 'etype')
2140 self
.assertElementKVNO(ticket_encpart
, 'kvno', 'autodetect')
2141 self
.assertElementPresent(encpart
, 'cipher')
2142 encpart_cipher
= self
.getElementValue(encpart
, 'cipher')
2144 ticket_checksum
= None
2146 # Get the decryption key for the encrypted part
2147 encpart_decryption_key
, encpart_decryption_usage
= (
2148 self
.get_preauth_key(kdc_exchange_dict
))
2150 if armor_key
is not None:
2151 pa_dict
= self
.get_pa_dict(padata
)
2153 if PADATA_FX_FAST
in pa_dict
:
2154 fx_fast_data
= pa_dict
[PADATA_FX_FAST
]
2155 fast_response
= self
.check_fx_fast_data(kdc_exchange_dict
,
2160 if 'strengthen-key' in fast_response
:
2161 strengthen_key
= self
.EncryptionKey_import(
2162 fast_response
['strengthen-key'])
2163 encpart_decryption_key
= (
2164 self
.generate_strengthen_reply_key(
2166 encpart_decryption_key
))
2168 fast_finished
= fast_response
.get('finished')
2169 if fast_finished
is not None:
2170 ticket_checksum
= fast_finished
['ticket-checksum']
2172 self
.check_rep_padata(kdc_exchange_dict
,
2174 fast_response
['padata'],
2177 ticket_private
= None
2178 if ticket_decryption_key
is not None:
2179 self
.assertElementEqual(ticket_encpart
, 'etype',
2180 ticket_decryption_key
.etype
)
2181 self
.assertElementKVNO(ticket_encpart
, 'kvno',
2182 ticket_decryption_key
.kvno
)
2183 ticket_decpart
= ticket_decryption_key
.decrypt(KU_TICKET
,
2185 ticket_private
= self
.der_decode(
2187 asn1Spec
=krb5_asn1
.EncTicketPart())
2189 encpart_private
= None
2190 self
.assertIsNotNone(encpart_decryption_key
)
2191 if encpart_decryption_key
is not None:
2192 self
.assertElementEqual(encpart
, 'etype',
2193 encpart_decryption_key
.etype
)
2194 if self
.strict_checking
:
2195 self
.assertElementKVNO(encpart
, 'kvno',
2196 encpart_decryption_key
.kvno
)
2197 rep_decpart
= encpart_decryption_key
.decrypt(
2198 encpart_decryption_usage
,
2200 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2201 # application tag 26
2203 encpart_private
= self
.der_decode(
2205 asn1Spec
=rep_encpart_asn1Spec())
2207 encpart_private
= self
.der_decode(
2209 asn1Spec
=krb5_asn1
.EncTGSRepPart())
2211 self
.assertIsNotNone(check_kdc_private_fn
)
2212 if check_kdc_private_fn
is not None:
2213 check_kdc_private_fn(kdc_exchange_dict
, callback_dict
,
2214 rep
, ticket_private
, encpart_private
,
2219 def check_fx_fast_data(self
,
2224 expect_strengthen_key
=True):
2225 fx_fast_data
= self
.der_decode(fx_fast_data
,
2226 asn1Spec
=krb5_asn1
.PA_FX_FAST_REPLY())
2228 enc_fast_rep
= fx_fast_data
['armored-data']['enc-fast-rep']
2229 self
.assertEqual(enc_fast_rep
['etype'], armor_key
.etype
)
2231 fast_rep
= armor_key
.decrypt(KU_FAST_REP
, enc_fast_rep
['cipher'])
2233 fast_response
= self
.der_decode(fast_rep
,
2234 asn1Spec
=krb5_asn1
.KrbFastResponse())
2236 if expect_strengthen_key
and self
.strict_checking
:
2237 self
.assertIn('strengthen-key', fast_response
)
2240 self
.assertIn('finished', fast_response
)
2242 # Ensure that the nonce matches the nonce in the body of the request
2244 nonce
= kdc_exchange_dict
['nonce']
2245 self
.assertEqual(nonce
, fast_response
['nonce'])
2247 return fast_response
2249 def generic_check_kdc_private(self
,
2256 kdc_options
= kdc_exchange_dict
['kdc_options']
2257 canon_pos
= len(tuple(krb5_asn1
.KDCOptions('canonicalize'))) - 1
2258 canonicalize
= (canon_pos
< len(kdc_options
)
2259 and kdc_options
[canon_pos
] == '1')
2260 renewable_pos
= len(tuple(krb5_asn1
.KDCOptions('renewable'))) - 1
2261 renewable
= (renewable_pos
< len(kdc_options
)
2262 and kdc_options
[renewable_pos
] == '1')
2264 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2265 expected_cname
= kdc_exchange_dict
['expected_cname']
2266 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2267 expected_sname
= kdc_exchange_dict
['expected_sname']
2268 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
2270 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2272 expected_flags
= kdc_exchange_dict
.get('expected_flags')
2273 unexpected_flags
= kdc_exchange_dict
.get('unexpected_flags')
2275 ticket
= self
.getElementValue(rep
, 'ticket')
2277 if ticket_checksum
is not None:
2278 armor_key
= kdc_exchange_dict
['armor_key']
2279 self
.verify_ticket_checksum(ticket
, ticket_checksum
, armor_key
)
2281 to_rodc
= kdc_exchange_dict
['to_rodc']
2283 krbtgt_creds
= self
.get_rodc_krbtgt_creds()
2285 krbtgt_creds
= self
.get_krbtgt_creds()
2286 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
2288 expect_pac
= kdc_exchange_dict
['expect_pac']
2290 ticket_session_key
= None
2291 if ticket_private
is not None:
2292 self
.assertElementFlags(ticket_private
, 'flags',
2295 self
.assertElementPresent(ticket_private
, 'key')
2296 ticket_key
= self
.getElementValue(ticket_private
, 'key')
2297 self
.assertIsNotNone(ticket_key
)
2298 if ticket_key
is not None: # Never None, but gives indentation
2299 self
.assertElementPresent(ticket_key
, 'keytype')
2300 self
.assertElementPresent(ticket_key
, 'keyvalue')
2301 ticket_session_key
= self
.EncryptionKey_import(ticket_key
)
2302 self
.assertElementEqualUTF8(ticket_private
, 'crealm',
2304 if self
.strict_checking
:
2305 self
.assertElementEqualPrincipal(ticket_private
, 'cname',
2307 self
.assertElementPresent(ticket_private
, 'transited')
2308 self
.assertElementPresent(ticket_private
, 'authtime')
2309 if self
.strict_checking
:
2310 self
.assertElementPresent(ticket_private
, 'starttime')
2311 self
.assertElementPresent(ticket_private
, 'endtime')
2313 if self
.strict_checking
:
2314 self
.assertElementPresent(ticket_private
, 'renew-till')
2316 self
.assertElementMissing(ticket_private
, 'renew-till')
2317 if self
.strict_checking
:
2318 self
.assertElementEqual(ticket_private
, 'caddr', [])
2319 self
.assertElementPresent(ticket_private
, 'authorization-data',
2320 expect_empty
=not expect_pac
)
2322 encpart_session_key
= None
2323 if encpart_private
is not None:
2324 self
.assertElementPresent(encpart_private
, 'key')
2325 encpart_key
= self
.getElementValue(encpart_private
, 'key')
2326 self
.assertIsNotNone(encpart_key
)
2327 if encpart_key
is not None: # Never None, but gives indentation
2328 self
.assertElementPresent(encpart_key
, 'keytype')
2329 self
.assertElementPresent(encpart_key
, 'keyvalue')
2330 encpart_session_key
= self
.EncryptionKey_import(encpart_key
)
2331 self
.assertElementPresent(encpart_private
, 'last-req')
2332 self
.assertElementEqual(encpart_private
, 'nonce',
2333 kdc_exchange_dict
['nonce'])
2334 if rep_msg_type
== KRB_AS_REP
:
2335 if self
.strict_checking
:
2336 self
.assertElementPresent(encpart_private
,
2339 self
.assertElementMissing(encpart_private
,
2341 self
.assertElementFlags(encpart_private
, 'flags',
2344 self
.assertElementPresent(encpart_private
, 'authtime')
2345 if self
.strict_checking
:
2346 self
.assertElementPresent(encpart_private
, 'starttime')
2347 self
.assertElementPresent(encpart_private
, 'endtime')
2349 if self
.strict_checking
:
2350 self
.assertElementPresent(encpart_private
, 'renew-till')
2352 self
.assertElementMissing(encpart_private
, 'renew-till')
2353 self
.assertElementEqualUTF8(encpart_private
, 'srealm',
2355 self
.assertElementEqualPrincipal(encpart_private
, 'sname',
2357 if self
.strict_checking
:
2358 self
.assertElementEqual(encpart_private
, 'caddr', [])
2360 sent_pac_options
= self
.get_sent_pac_options(kdc_exchange_dict
)
2362 if self
.strict_checking
:
2363 if canonicalize
or '1' in sent_pac_options
:
2364 self
.assertElementPresent(encpart_private
,
2365 'encrypted-pa-data')
2366 enc_pa_dict
= self
.get_pa_dict(
2367 encpart_private
['encrypted-pa-data'])
2369 self
.assertIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2371 expected_supported_etypes
= kdc_exchange_dict
[
2372 'expected_supported_etypes']
2373 expected_supported_etypes |
= (
2374 security
.KERB_ENCTYPE_DES_CBC_CRC |
2375 security
.KERB_ENCTYPE_DES_CBC_MD5 |
2376 security
.KERB_ENCTYPE_RC4_HMAC_MD5
)
2378 (supported_etypes
,) = struct
.unpack(
2380 enc_pa_dict
[PADATA_SUPPORTED_ETYPES
])
2382 self
.assertEqual(supported_etypes
,
2383 expected_supported_etypes
)
2385 self
.assertNotIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2387 if '1' in sent_pac_options
:
2388 self
.assertIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2390 pac_options
= self
.der_decode(
2391 enc_pa_dict
[PADATA_PAC_OPTIONS
],
2392 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
2394 self
.assertElementEqual(pac_options
, 'options',
2397 self
.assertNotIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2399 self
.assertElementEqual(encpart_private
,
2400 'encrypted-pa-data',
2403 if ticket_session_key
is not None and encpart_session_key
is not None:
2404 self
.assertEqual(ticket_session_key
.etype
,
2405 encpart_session_key
.etype
)
2406 self
.assertEqual(ticket_session_key
.key
.contents
,
2407 encpart_session_key
.key
.contents
)
2408 if encpart_session_key
is not None:
2409 session_key
= encpart_session_key
2411 session_key
= ticket_session_key
2412 ticket_creds
= KerberosTicketCreds(
2415 crealm
=expected_crealm
,
2416 cname
=expected_cname
,
2417 srealm
=expected_srealm
,
2418 sname
=expected_sname
,
2419 decryption_key
=ticket_decryption_key
,
2420 ticket_private
=ticket_private
,
2421 encpart_private
=encpart_private
)
2423 if ticket_decryption_key
is not None:
2424 self
.verify_ticket(ticket_creds
, krbtgt_key
, expect_pac
=expect_pac
)
2426 kdc_exchange_dict
['rep_ticket_creds'] = ticket_creds
2428 def generic_check_kdc_error(self
,
2434 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2436 expected_anon
= kdc_exchange_dict
['expected_anon']
2437 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2438 expected_sname
= kdc_exchange_dict
['expected_sname']
2439 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
2441 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
2443 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
2445 self
.assertElementEqual(rep
, 'pvno', 5)
2446 self
.assertElementEqual(rep
, 'msg-type', KRB_ERROR
)
2447 error_code
= self
.getElementValue(rep
, 'error-code')
2448 self
.assertIn(error_code
, expected_error_mode
)
2449 if self
.strict_checking
:
2450 self
.assertElementMissing(rep
, 'ctime')
2451 self
.assertElementMissing(rep
, 'cusec')
2452 self
.assertElementPresent(rep
, 'stime')
2453 self
.assertElementPresent(rep
, 'susec')
2454 # error-code checked above
2455 if self
.strict_checking
:
2456 self
.assertElementMissing(rep
, 'crealm')
2457 if expected_anon
and not inner
:
2458 expected_cname
= self
.PrincipalName_create(
2459 name_type
=NT_WELLKNOWN
,
2460 names
=['WELLKNOWN', 'ANONYMOUS'])
2461 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
2463 self
.assertElementMissing(rep
, 'cname')
2464 self
.assertElementEqualUTF8(rep
, 'realm', expected_srealm
)
2465 if sent_fast
and error_code
== KDC_ERR_GENERIC
:
2466 self
.assertElementEqualPrincipal(rep
, 'sname',
2467 self
.get_krbtgt_sname())
2469 self
.assertElementEqualPrincipal(rep
, 'sname', expected_sname
)
2470 self
.assertElementMissing(rep
, 'e-text')
2471 if (error_code
== KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2472 or (rep_msg_type
== KRB_TGS_REP
2474 or (sent_fast
and fast_armor_type
is not None
2475 and fast_armor_type
!= FX_FAST_ARMOR_AP_REQUEST
)
2477 self
.assertElementMissing(rep
, 'e-data')
2479 edata
= self
.getElementValue(rep
, 'e-data')
2480 if self
.strict_checking
:
2481 if error_code
!= KDC_ERR_GENERIC
:
2482 # Predicting whether an ERR_GENERIC error contains e-data is
2484 self
.assertIsNotNone(edata
)
2485 if edata
is not None:
2486 if rep_msg_type
== KRB_TGS_REP
and not sent_fast
:
2487 rep_padata
= [self
.der_decode(edata
,
2488 asn1Spec
=krb5_asn1
.PA_DATA())]
2490 rep_padata
= self
.der_decode(edata
,
2491 asn1Spec
=krb5_asn1
.METHOD_DATA())
2492 self
.assertGreater(len(rep_padata
), 0)
2495 self
.assertEqual(1, len(rep_padata
))
2496 rep_pa_dict
= self
.get_pa_dict(rep_padata
)
2497 self
.assertIn(PADATA_FX_FAST
, rep_pa_dict
)
2499 armor_key
= kdc_exchange_dict
['armor_key']
2500 self
.assertIsNotNone(armor_key
)
2501 fast_response
= self
.check_fx_fast_data(
2503 rep_pa_dict
[PADATA_FX_FAST
],
2505 expect_strengthen_key
=False)
2507 rep_padata
= fast_response
['padata']
2509 etype_info2
= self
.check_rep_padata(kdc_exchange_dict
,
2514 kdc_exchange_dict
['preauth_etype_info2'] = etype_info2
2518 def check_rep_padata(self
,
2523 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2525 req_body
= kdc_exchange_dict
['req_body']
2526 proposed_etypes
= req_body
['etype']
2527 client_as_etypes
= kdc_exchange_dict
.get('client_as_etypes', [])
2529 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
2530 sent_enc_challenge
= self
.sent_enc_challenge(kdc_exchange_dict
)
2532 if rep_msg_type
== KRB_TGS_REP
:
2533 self
.assertTrue(sent_fast
)
2535 expect_etype_info2
= ()
2536 expect_etype_info
= False
2537 unexpect_etype_info
= True
2538 expected_aes_type
= 0
2539 expected_rc4_type
= 0
2540 if kcrypto
.Enctype
.RC4
in proposed_etypes
:
2541 expect_etype_info
= True
2542 for etype
in proposed_etypes
:
2543 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2544 expect_etype_info
= False
2545 if etype
not in client_as_etypes
:
2547 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2548 if etype
> expected_aes_type
:
2549 expected_aes_type
= etype
2550 if etype
in (kcrypto
.Enctype
.RC4
,) and error_code
!= 0:
2551 unexpect_etype_info
= False
2552 if etype
> expected_rc4_type
:
2553 expected_rc4_type
= etype
2555 if expected_aes_type
!= 0:
2556 expect_etype_info2
+= (expected_aes_type
,)
2557 if expected_rc4_type
!= 0:
2558 expect_etype_info2
+= (expected_rc4_type
,)
2560 expected_patypes
= ()
2561 if sent_fast
and error_code
!= 0:
2562 expected_patypes
+= (PADATA_FX_ERROR
,)
2563 expected_patypes
+= (PADATA_FX_COOKIE
,)
2565 if rep_msg_type
== KRB_TGS_REP
:
2566 if not sent_fast
and error_code
!= 0:
2567 expected_patypes
+= (PADATA_PW_SALT
,)
2569 sent_pac_options
= self
.get_sent_pac_options(kdc_exchange_dict
)
2570 if ('1' in sent_pac_options
2571 and error_code
not in (0, KDC_ERR_GENERIC
)):
2572 expected_patypes
+= (PADATA_PAC_OPTIONS
,)
2573 elif error_code
!= KDC_ERR_GENERIC
:
2574 if expect_etype_info
:
2575 self
.assertGreater(len(expect_etype_info2
), 0)
2576 expected_patypes
+= (PADATA_ETYPE_INFO
,)
2577 if len(expect_etype_info2
) != 0:
2578 expected_patypes
+= (PADATA_ETYPE_INFO2
,)
2580 if error_code
!= KDC_ERR_PREAUTH_FAILED
:
2582 expected_patypes
+= (PADATA_ENCRYPTED_CHALLENGE
,)
2584 expected_patypes
+= (PADATA_ENC_TIMESTAMP
,)
2586 if not sent_enc_challenge
:
2587 expected_patypes
+= (PADATA_PK_AS_REQ
,)
2588 expected_patypes
+= (PADATA_PK_AS_REP_19
,)
2590 if (self
.kdc_fast_support
2592 and not sent_enc_challenge
):
2593 expected_patypes
+= (PADATA_FX_FAST
,)
2594 expected_patypes
+= (PADATA_FX_COOKIE
,)
2596 if self
.strict_checking
:
2597 for i
, patype
in enumerate(expected_patypes
):
2598 self
.assertElementEqual(rep_padata
[i
], 'padata-type', patype
)
2599 self
.assertEqual(len(rep_padata
), len(expected_patypes
))
2603 enc_timestamp
= None
2604 enc_challenge
= None
2612 for pa
in rep_padata
:
2613 patype
= self
.getElementValue(pa
, 'padata-type')
2614 pavalue
= self
.getElementValue(pa
, 'padata-value')
2615 if patype
== PADATA_ETYPE_INFO2
:
2616 self
.assertIsNone(etype_info2
)
2617 etype_info2
= self
.der_decode(pavalue
,
2618 asn1Spec
=krb5_asn1
.ETYPE_INFO2())
2620 if patype
== PADATA_ETYPE_INFO
:
2621 self
.assertIsNone(etype_info
)
2622 etype_info
= self
.der_decode(pavalue
,
2623 asn1Spec
=krb5_asn1
.ETYPE_INFO())
2625 if patype
== PADATA_ENC_TIMESTAMP
:
2626 self
.assertIsNone(enc_timestamp
)
2627 enc_timestamp
= pavalue
2628 self
.assertEqual(len(enc_timestamp
), 0)
2630 if patype
== PADATA_ENCRYPTED_CHALLENGE
:
2631 self
.assertIsNone(enc_challenge
)
2632 enc_challenge
= pavalue
2634 if patype
== PADATA_PK_AS_REQ
:
2635 self
.assertIsNone(pk_as_req
)
2637 self
.assertEqual(len(pk_as_req
), 0)
2639 if patype
== PADATA_PK_AS_REP_19
:
2640 self
.assertIsNone(pk_as_rep19
)
2641 pk_as_rep19
= pavalue
2642 self
.assertEqual(len(pk_as_rep19
), 0)
2644 if patype
== PADATA_FX_COOKIE
:
2645 self
.assertIsNone(fast_cookie
)
2646 fast_cookie
= pavalue
2647 self
.assertIsNotNone(fast_cookie
)
2649 if patype
== PADATA_FX_ERROR
:
2650 self
.assertIsNone(fast_error
)
2651 fast_error
= pavalue
2652 self
.assertIsNotNone(fast_error
)
2654 if patype
== PADATA_FX_FAST
:
2655 self
.assertIsNone(fx_fast
)
2657 self
.assertEqual(len(fx_fast
), 0)
2659 if patype
== PADATA_PAC_OPTIONS
:
2660 self
.assertIsNone(pac_options
)
2661 pac_options
= self
.der_decode(
2663 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
2665 if patype
== PADATA_PW_SALT
:
2666 self
.assertIsNone(pw_salt
)
2668 self
.assertIsNotNone(pw_salt
)
2671 if fast_cookie
is not None:
2672 kdc_exchange_dict
['fast_cookie'] = fast_cookie
2674 if fast_error
is not None:
2675 fast_error
= self
.der_decode(fast_error
,
2676 asn1Spec
=krb5_asn1
.KRB_ERROR())
2677 self
.generic_check_kdc_error(kdc_exchange_dict
,
2682 if pac_options
is not None:
2683 self
.assertElementEqual(pac_options
, 'options', sent_pac_options
)
2685 if pw_salt
is not None:
2686 self
.assertEqual(12, len(pw_salt
))
2688 status
= int.from_bytes(pw_salt
[:4], 'little')
2689 flags
= int.from_bytes(pw_salt
[8:], 'little')
2691 expected_status
= kdc_exchange_dict
['expected_status']
2692 self
.assertEqual(expected_status
, status
)
2694 self
.assertEqual(3, flags
)
2696 self
.assertIsNone(kdc_exchange_dict
.get('expected_status'))
2698 if enc_challenge
is not None:
2699 if not sent_enc_challenge
:
2700 self
.assertEqual(len(enc_challenge
), 0)
2702 armor_key
= kdc_exchange_dict
['armor_key']
2703 self
.assertIsNotNone(armor_key
)
2705 preauth_key
, _
= self
.get_preauth_key(kdc_exchange_dict
)
2707 kdc_challenge_key
= self
.generate_kdc_challenge_key(
2708 armor_key
, preauth_key
)
2710 # Ensure that the encrypted challenge FAST factor is supported
2712 if self
.strict_checking
:
2713 self
.assertNotEqual(len(enc_challenge
), 0)
2714 if len(enc_challenge
) != 0:
2715 encrypted_challenge
= self
.der_decode(
2717 asn1Spec
=krb5_asn1
.EncryptedData())
2718 self
.assertEqual(encrypted_challenge
['etype'],
2719 kdc_challenge_key
.etype
)
2721 challenge
= kdc_challenge_key
.decrypt(
2722 KU_ENC_CHALLENGE_KDC
,
2723 encrypted_challenge
['cipher'])
2724 challenge
= self
.der_decode(
2726 asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
2728 # Retrieve the returned timestamp.
2729 rep_patime
= challenge
['patimestamp']
2730 self
.assertIn('pausec', challenge
)
2732 # Ensure the returned time is within five minutes of the
2734 rep_time
= self
.get_EpochFromKerberosTime(rep_patime
)
2735 current_time
= time
.time()
2737 self
.assertLess(current_time
- 300, rep_time
)
2738 self
.assertLess(rep_time
, current_time
+ 300)
2740 if all(etype
not in client_as_etypes
or etype
not in proposed_etypes
2741 for etype
in (kcrypto
.Enctype
.AES256
,
2742 kcrypto
.Enctype
.AES128
,
2743 kcrypto
.Enctype
.RC4
)):
2744 self
.assertIsNone(etype_info2
)
2745 self
.assertIsNone(etype_info
)
2746 if rep_msg_type
== KRB_AS_REP
:
2747 if self
.strict_checking
:
2749 self
.assertIsNotNone(enc_challenge
)
2750 self
.assertIsNone(enc_timestamp
)
2752 self
.assertIsNotNone(enc_timestamp
)
2753 self
.assertIsNone(enc_challenge
)
2754 self
.assertIsNotNone(pk_as_req
)
2755 self
.assertIsNotNone(pk_as_rep19
)
2757 self
.assertIsNone(enc_timestamp
)
2758 self
.assertIsNone(enc_challenge
)
2759 self
.assertIsNone(pk_as_req
)
2760 self
.assertIsNone(pk_as_rep19
)
2763 if error_code
!= KDC_ERR_GENERIC
:
2764 if self
.strict_checking
:
2765 self
.assertIsNotNone(etype_info2
)
2767 self
.assertIsNone(etype_info2
)
2768 if expect_etype_info
:
2769 self
.assertIsNotNone(etype_info
)
2771 if self
.strict_checking
:
2772 self
.assertIsNone(etype_info
)
2773 if unexpect_etype_info
:
2774 self
.assertIsNone(etype_info
)
2776 if error_code
!= KDC_ERR_GENERIC
and self
.strict_checking
:
2777 self
.assertGreaterEqual(len(etype_info2
), 1)
2778 self
.assertEqual(len(etype_info2
), len(expect_etype_info2
))
2779 for i
in range(0, len(etype_info2
)):
2780 e
= self
.getElementValue(etype_info2
[i
], 'etype')
2781 self
.assertEqual(e
, expect_etype_info2
[i
])
2782 salt
= self
.getElementValue(etype_info2
[i
], 'salt')
2783 if e
== kcrypto
.Enctype
.RC4
:
2784 self
.assertIsNone(salt
)
2786 self
.assertIsNotNone(salt
)
2787 expected_salt
= kdc_exchange_dict
['expected_salt']
2788 if expected_salt
is not None:
2789 self
.assertEqual(salt
, expected_salt
)
2790 s2kparams
= self
.getElementValue(etype_info2
[i
], 's2kparams')
2791 if self
.strict_checking
:
2792 self
.assertIsNone(s2kparams
)
2793 if etype_info
is not None:
2794 self
.assertEqual(len(etype_info
), 1)
2795 e
= self
.getElementValue(etype_info
[0], 'etype')
2796 self
.assertEqual(e
, kcrypto
.Enctype
.RC4
)
2797 self
.assertEqual(e
, expect_etype_info2
[0])
2798 salt
= self
.getElementValue(etype_info
[0], 'salt')
2799 if self
.strict_checking
:
2800 self
.assertIsNotNone(salt
)
2801 self
.assertEqual(len(salt
), 0)
2803 if error_code
not in (KDC_ERR_PREAUTH_FAILED
,
2806 self
.assertIsNotNone(enc_challenge
)
2807 if self
.strict_checking
:
2808 self
.assertIsNone(enc_timestamp
)
2810 self
.assertIsNotNone(enc_timestamp
)
2811 if self
.strict_checking
:
2812 self
.assertIsNone(enc_challenge
)
2813 if not sent_enc_challenge
:
2814 if self
.strict_checking
:
2815 self
.assertIsNotNone(pk_as_req
)
2816 self
.assertIsNotNone(pk_as_rep19
)
2818 self
.assertIsNone(pk_as_req
)
2819 self
.assertIsNone(pk_as_rep19
)
2821 if self
.strict_checking
:
2822 self
.assertIsNone(enc_timestamp
)
2823 self
.assertIsNone(enc_challenge
)
2824 self
.assertIsNone(pk_as_req
)
2825 self
.assertIsNone(pk_as_rep19
)
2829 def generate_simple_fast(self
,
2837 armor_key
= kdc_exchange_dict
['armor_key']
2839 fast_req
= self
.KRB_FAST_REQ_create(fast_options
,
2842 fast_req
= self
.der_encode(fast_req
,
2843 asn1Spec
=krb5_asn1
.KrbFastReq())
2844 fast_req
= self
.EncryptedData_create(armor_key
,
2848 fast_armored_req
= self
.KRB_FAST_ARMORED_REQ_create(fast_armor
,
2852 fx_fast_request
= self
.PA_FX_FAST_REQUEST_create(fast_armored_req
)
2853 fx_fast_request
= self
.der_encode(
2855 asn1Spec
=krb5_asn1
.PA_FX_FAST_REQUEST())
2857 fast_padata
= self
.PA_DATA_create(PADATA_FX_FAST
,
2862 def generate_ap_req(self
,
2868 tgt
= kdc_exchange_dict
['armor_tgt']
2869 authenticator_subkey
= kdc_exchange_dict
['armor_subkey']
2871 req_body_checksum
= None
2873 tgt
= kdc_exchange_dict
['tgt']
2874 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
2875 body_checksum_type
= kdc_exchange_dict
['body_checksum_type']
2877 req_body_blob
= self
.der_encode(req_body
,
2878 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
2880 req_body_checksum
= self
.Checksum_create(tgt
.session_key
,
2881 KU_TGS_REQ_AUTH_CKSUM
,
2883 ctype
=body_checksum_type
)
2885 auth_data
= kdc_exchange_dict
['auth_data']
2888 if authenticator_subkey
is not None:
2889 subkey_obj
= authenticator_subkey
.export_obj()
2890 seq_number
= random
.randint(0, 0xfffffffe)
2891 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
2892 authenticator_obj
= self
.Authenticator_create(
2895 cksum
=req_body_checksum
,
2899 seq_number
=seq_number
,
2900 authorization_data
=auth_data
)
2901 authenticator_blob
= self
.der_encode(
2903 asn1Spec
=krb5_asn1
.Authenticator())
2905 usage
= KU_AP_REQ_AUTH
if armor
else KU_TGS_REQ_AUTH
2906 authenticator
= self
.EncryptedData_create(tgt
.session_key
,
2910 ap_options
= krb5_asn1
.APOptions('0')
2911 ap_req_obj
= self
.AP_REQ_create(ap_options
=str(ap_options
),
2913 authenticator
=authenticator
)
2914 ap_req
= self
.der_encode(ap_req_obj
, asn1Spec
=krb5_asn1
.AP_REQ())
2918 def generate_simple_tgs_padata(self
,
2922 ap_req
= self
.generate_ap_req(kdc_exchange_dict
,
2926 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
2927 padata
= [pa_tgs_req
]
2929 return padata
, req_body
2931 def get_preauth_key(self
, kdc_exchange_dict
):
2932 msg_type
= kdc_exchange_dict
['rep_msg_type']
2934 if msg_type
== KRB_AS_REP
:
2935 key
= kdc_exchange_dict
['preauth_key']
2936 usage
= KU_AS_REP_ENC_PART
2938 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
2939 if authenticator_subkey
is not None:
2940 key
= authenticator_subkey
2941 usage
= KU_TGS_REP_ENC_PART_SUB_KEY
2943 tgt
= kdc_exchange_dict
['tgt']
2944 key
= tgt
.session_key
2945 usage
= KU_TGS_REP_ENC_PART_SESSION
2947 self
.assertIsNotNone(key
)
2951 def generate_armor_key(self
, subkey
, session_key
):
2952 armor_key
= kcrypto
.cf2(subkey
.key
,
2956 armor_key
= Krb5EncryptionKey(armor_key
, None)
2960 def generate_strengthen_reply_key(self
, strengthen_key
, reply_key
):
2961 strengthen_reply_key
= kcrypto
.cf2(strengthen_key
.key
,
2965 strengthen_reply_key
= Krb5EncryptionKey(strengthen_reply_key
,
2968 return strengthen_reply_key
2970 def generate_client_challenge_key(self
, armor_key
, longterm_key
):
2971 client_challenge_key
= kcrypto
.cf2(armor_key
.key
,
2973 b
'clientchallengearmor',
2974 b
'challengelongterm')
2975 client_challenge_key
= Krb5EncryptionKey(client_challenge_key
, None)
2977 return client_challenge_key
2979 def generate_kdc_challenge_key(self
, armor_key
, longterm_key
):
2980 kdc_challenge_key
= kcrypto
.cf2(armor_key
.key
,
2982 b
'kdcchallengearmor',
2983 b
'challengelongterm')
2984 kdc_challenge_key
= Krb5EncryptionKey(kdc_challenge_key
, None)
2986 return kdc_challenge_key
2988 def verify_ticket_checksum(self
, ticket
, expected_checksum
, armor_key
):
2989 expected_type
= expected_checksum
['cksumtype']
2990 self
.assertEqual(armor_key
.ctype
, expected_type
)
2992 ticket_blob
= self
.der_encode(ticket
,
2993 asn1Spec
=krb5_asn1
.Ticket())
2994 checksum
= self
.Checksum_create(armor_key
,
2997 self
.assertEqual(expected_checksum
, checksum
)
2999 def verify_ticket(self
, ticket
, krbtgt_key
, expect_pac
=True):
3000 # Check if the ticket is a TGT.
3001 sname
= ticket
.ticket
['sname']
3002 is_tgt
= self
.is_tgs(sname
)
3004 # Decrypt the ticket.
3006 key
= ticket
.decryption_key
3007 enc_part
= ticket
.ticket
['enc-part']
3009 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
3010 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
3012 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
3013 enc_part
= self
.der_decode(
3014 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
3016 # Fetch the authorization data from the ticket.
3017 auth_data
= enc_part
.get('authorization-data')
3019 self
.assertIsNotNone(auth_data
)
3020 elif auth_data
is None:
3023 # Get a copy of the authdata with an empty PAC, and the existing PAC
3025 empty_pac
= self
.get_empty_pac()
3026 auth_data
, pac_data
= self
.replace_pac(auth_data
,
3028 expect_pac
=expect_pac
)
3032 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3033 # raw type to create a new PAC with zeroed signatures for
3034 # verification. This is because on Windows, the resource_groups field
3035 # is added to PAC_LOGON_INFO after the info3 field has been created,
3036 # which results in a different ordering of pointer values than Samba
3037 # (see commit 0e201ecdc53). Using the raw type avoids changing
3038 # PAC_LOGON_INFO, so verification against Windows can work. We still
3039 # need the PAC_DATA type to retrieve the actual checksums, because the
3040 # signatures in the raw type may contain padding bytes.
3041 pac
= ndr_unpack(krb5pac
.PAC_DATA
,
3043 raw_pac
= ndr_unpack(krb5pac
.PAC_DATA_RAW
,
3048 for pac_buffer
, raw_pac_buffer
in zip(pac
.buffers
, raw_pac
.buffers
):
3049 buffer_type
= pac_buffer
.type
3050 if buffer_type
in self
.pac_checksum_types
:
3051 self
.assertNotIn(buffer_type
, checksums
,
3052 f
'Duplicate checksum type {buffer_type}')
3054 # Fetch the checksum and the checksum type from the PAC buffer.
3055 checksum
= pac_buffer
.info
.signature
3056 ctype
= pac_buffer
.info
.type
3060 checksums
[buffer_type
] = checksum
, ctype
3062 if buffer_type
!= krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
3063 # Zero the checksum field so that we can later verify the
3064 # checksums. The ticket checksum field is not zeroed.
3066 signature
= ndr_unpack(
3067 krb5pac
.PAC_SIGNATURE_DATA
,
3068 raw_pac_buffer
.info
.remaining
)
3069 signature
.signature
= bytes(len(checksum
))
3070 raw_pac_buffer
.info
.remaining
= ndr_pack(
3073 # Re-encode the PAC.
3074 pac_data
= ndr_pack(raw_pac
)
3076 # Verify the signatures.
3078 server_checksum
, server_ctype
= checksums
[
3079 krb5pac
.PAC_TYPE_SRV_CHECKSUM
]
3080 Krb5EncryptionKey
.verify_checksum(key
,
3081 KU_NON_KERB_CKSUM_SALT
,
3086 kdc_checksum
, kdc_ctype
= checksums
[
3087 krb5pac
.PAC_TYPE_KDC_CHECKSUM
]
3088 krbtgt_key
.verify_checksum(KU_NON_KERB_CKSUM_SALT
,
3094 self
.assertNotIn(krb5pac
.PAC_TYPE_TICKET_CHECKSUM
, checksums
)
3096 ticket_checksum
, ticket_ctype
= checksums
.get(
3097 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
,
3099 if self
.strict_checking
:
3100 self
.assertIsNotNone(ticket_checksum
)
3101 if ticket_checksum
is not None:
3102 enc_part
['authorization-data'] = auth_data
3103 enc_part
= self
.der_encode(enc_part
,
3104 asn1Spec
=krb5_asn1
.EncTicketPart())
3106 krbtgt_key
.verify_checksum(KU_NON_KERB_CKSUM_SALT
,
3111 def modified_ticket(self
,
3113 new_ticket_key
=None,
3117 update_pac_checksums
=True,
3119 include_checksums
=None):
3120 if checksum_keys
is None:
3121 # A dict containing a key for each checksum type to be created in
3125 if include_checksums
is None:
3126 # A dict containing a value for each checksum type; True if the
3127 # checksum type is to be included in the PAC, False if it is to be
3128 # excluded, or None/not present if the checksum is to be included
3129 # based on its presence in the original PAC.
3130 include_checksums
= {}
3132 # Check that the values passed in by the caller make sense.
3134 self
.assertLessEqual(checksum_keys
.keys(), self
.pac_checksum_types
)
3135 self
.assertLessEqual(include_checksums
.keys(), self
.pac_checksum_types
)
3138 self
.assertIsNone(modify_pac_fn
)
3140 update_pac_checksums
= False
3142 if not update_pac_checksums
:
3143 self
.assertFalse(checksum_keys
)
3144 self
.assertFalse(include_checksums
)
3146 expect_pac
= update_pac_checksums
or modify_pac_fn
is not None
3148 key
= ticket
.decryption_key
3150 if new_ticket_key
is None:
3151 # Use the same key to re-encrypt the ticket.
3152 new_ticket_key
= key
3154 if krb5pac
.PAC_TYPE_SRV_CHECKSUM
not in checksum_keys
:
3155 # If the server signature key is not present, fall back to the key
3156 # used to encrypt the ticket.
3157 checksum_keys
[krb5pac
.PAC_TYPE_SRV_CHECKSUM
] = new_ticket_key
3159 if krb5pac
.PAC_TYPE_TICKET_CHECKSUM
not in checksum_keys
:
3160 # If the ticket signature key is not present, fall back to the key
3161 # used for the KDC signature.
3162 kdc_checksum_key
= checksum_keys
.get(krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
3163 if kdc_checksum_key
is not None:
3164 checksum_keys
[krb5pac
.PAC_TYPE_TICKET_CHECKSUM
] = (
3167 # Decrypt the ticket.
3169 enc_part
= ticket
.ticket
['enc-part']
3171 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
3172 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
3174 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
3175 enc_part
= self
.der_decode(
3176 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
3178 # Modify the ticket here.
3179 if modify_fn
is not None:
3180 enc_part
= modify_fn(enc_part
)
3182 auth_data
= enc_part
.get('authorization-data')
3184 self
.assertIsNotNone(auth_data
)
3185 if auth_data
is not None:
3188 # Get a copy of the authdata with an empty PAC, and the
3189 # existing PAC (if present).
3190 empty_pac
= self
.get_empty_pac()
3191 empty_pac_auth_data
, pac_data
= self
.replace_pac(auth_data
,
3195 self
.assertIsNotNone(pac_data
)
3196 if pac_data
is not None:
3197 pac
= ndr_unpack(krb5pac
.PAC_DATA
, pac_data
)
3199 # Modify the PAC here.
3200 if modify_pac_fn
is not None:
3201 pac
= modify_pac_fn(pac
)
3203 if update_pac_checksums
:
3204 # Get the enc-part with an empty PAC, which is needed
3205 # to create a ticket signature.
3206 enc_part_to_sign
= enc_part
.copy()
3207 enc_part_to_sign
['authorization-data'] = (
3208 empty_pac_auth_data
)
3209 enc_part_to_sign
= self
.der_encode(
3211 asn1Spec
=krb5_asn1
.EncTicketPart())
3213 self
.update_pac_checksums(pac
,
3218 # Re-encode the PAC.
3219 pac_data
= ndr_pack(pac
)
3220 new_pac
= self
.AuthorizationData_create(AD_WIN2K_PAC
,
3223 # Replace the PAC in the authorization data and re-add it to the
3225 auth_data
, _
= self
.replace_pac(auth_data
, new_pac
)
3226 enc_part
['authorization-data'] = auth_data
3228 # Re-encrypt the ticket enc-part with the new key.
3229 enc_part_new
= self
.der_encode(enc_part
,
3230 asn1Spec
=krb5_asn1
.EncTicketPart())
3231 enc_part_new
= self
.EncryptedData_create(new_ticket_key
,
3235 # Create a copy of the ticket with the new enc-part.
3236 new_ticket
= ticket
.ticket
.copy()
3237 new_ticket
['enc-part'] = enc_part_new
3239 new_ticket_creds
= KerberosTicketCreds(
3241 session_key
=ticket
.session_key
,
3242 crealm
=ticket
.crealm
,
3244 srealm
=ticket
.srealm
,
3246 decryption_key
=new_ticket_key
,
3247 ticket_private
=enc_part
,
3248 encpart_private
=ticket
.encpart_private
)
3250 return new_ticket_creds
3252 def update_pac_checksums(self
,
3257 pac_buffers
= pac
.buffers
3258 checksum_buffers
= {}
3260 # Find the relevant PAC checksum buffers.
3261 for pac_buffer
in pac_buffers
:
3262 buffer_type
= pac_buffer
.type
3263 if buffer_type
in self
.pac_checksum_types
:
3264 self
.assertNotIn(buffer_type
, checksum_buffers
,
3265 f
'Duplicate checksum type {buffer_type}')
3267 checksum_buffers
[buffer_type
] = pac_buffer
3269 # Create any additional buffers that were requested but not
3270 # present. Conversely, remove any buffers that were requested to be
3272 for buffer_type
in self
.pac_checksum_types
:
3273 if buffer_type
in checksum_buffers
:
3274 if include_checksums
.get(buffer_type
) is False:
3275 checksum_buffer
= checksum_buffers
.pop(buffer_type
)
3277 pac
.num_buffers
-= 1
3278 pac_buffers
.remove(checksum_buffer
)
3280 elif include_checksums
.get(buffer_type
) is True:
3281 info
= krb5pac
.PAC_SIGNATURE_DATA()
3283 checksum_buffer
= krb5pac
.PAC_BUFFER()
3284 checksum_buffer
.type = buffer_type
3285 checksum_buffer
.info
= info
3287 pac_buffers
.append(checksum_buffer
)
3288 pac
.num_buffers
+= 1
3290 checksum_buffers
[buffer_type
] = checksum_buffer
3292 # Fill the relevant checksum buffers.
3293 for buffer_type
, checksum_buffer
in checksum_buffers
.items():
3294 checksum_key
= checksum_keys
[buffer_type
]
3295 ctype
= checksum_key
.ctype
& ((1 << 32) - 1)
3297 if buffer_type
== krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
3298 self
.assertIsNotNone(enc_part
)
3300 signature
= checksum_key
.make_checksum(
3301 KU_NON_KERB_CKSUM_SALT
,
3304 elif buffer_type
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
3305 signature
= Krb5EncryptionKey
.make_zeroed_checksum(
3309 signature
= checksum_key
.make_zeroed_checksum()
3311 checksum_buffer
.info
.signature
= signature
3312 checksum_buffer
.info
.type = ctype
3314 # Add the new checksum buffers to the PAC.
3315 pac
.buffers
= pac_buffers
3317 # Calculate the server and KDC checksums and insert them into the PAC.
3319 server_checksum_buffer
= checksum_buffers
.get(
3320 krb5pac
.PAC_TYPE_SRV_CHECKSUM
)
3321 if server_checksum_buffer
is not None:
3322 server_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_SRV_CHECKSUM
]
3324 pac_data
= ndr_pack(pac
)
3325 server_checksum
= Krb5EncryptionKey
.make_checksum(
3326 server_checksum_key
,
3327 KU_NON_KERB_CKSUM_SALT
,
3330 server_checksum_buffer
.info
.signature
= server_checksum
3332 kdc_checksum_buffer
= checksum_buffers
.get(
3333 krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
3334 if kdc_checksum_buffer
is not None:
3335 self
.assertIsNotNone(server_checksum_buffer
)
3337 kdc_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_KDC_CHECKSUM
]
3339 kdc_checksum
= kdc_checksum_key
.make_checksum(
3340 KU_NON_KERB_CKSUM_SALT
,
3343 kdc_checksum_buffer
.info
.signature
= kdc_checksum
3345 def replace_pac(self
, auth_data
, new_pac
, expect_pac
=True):
3346 if new_pac
is not None:
3347 self
.assertElementEqual(new_pac
, 'ad-type', AD_WIN2K_PAC
)
3348 self
.assertElementPresent(new_pac
, 'ad-data')
3355 for authdata_elem
in auth_data
:
3356 if authdata_elem
['ad-type'] == AD_IF_RELEVANT
:
3357 ad_relevant
= self
.der_decode(
3358 authdata_elem
['ad-data'],
3359 asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
3362 for relevant_elem
in ad_relevant
:
3363 if relevant_elem
['ad-type'] == AD_WIN2K_PAC
:
3364 self
.assertIsNone(old_pac
, 'Multiple PACs detected')
3365 old_pac
= relevant_elem
['ad-data']
3367 if new_pac
is not None:
3368 relevant_elems
.append(new_pac
)
3370 relevant_elems
.append(relevant_elem
)
3372 self
.assertIsNotNone(old_pac
, 'Expected PAC')
3374 ad_relevant
= self
.der_encode(
3376 asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
3378 authdata_elem
= self
.AuthorizationData_create(AD_IF_RELEVANT
,
3381 new_auth_data
.append(authdata_elem
)
3384 self
.assertIsNotNone(ad_relevant
, 'Expected AD-RELEVANT')
3386 return new_auth_data
, old_pac
3388 def get_krbtgt_checksum_key(self
):
3389 krbtgt_creds
= self
.get_krbtgt_creds()
3390 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
3393 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
def is_tgs(self, principal):
    """Report whether a principal's leading name component is 'krbtgt'.

    Only the first entry of principal['name-string'] is inspected; it is
    accepted in either its str or its bytes spelling.
    """
    name_components = principal['name-string']
    return name_components[0] in ('krbtgt', b'krbtgt')
def get_empty_pac(self):
    """Create a placeholder AD_WIN2K_PAC authorization-data element.

    The element's data is a single zero byte; the result is whatever
    AuthorizationData_create() returns for that type/data pair.
    """
    placeholder_data = bytes(1)  # exactly one 0x00 byte
    return self.AuthorizationData_create(AD_WIN2K_PAC, placeholder_data)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return get_pa_dict() applied to the exchange's 'req_padata' entry."""
    build_pa_dict = self.get_pa_dict
    return build_pa_dict(kdc_exchange_dict['req_padata'])
3406 def get_fast_pa_dict(self
, kdc_exchange_dict
):
3407 req_pa_dict
= self
.get_pa_dict(kdc_exchange_dict
['fast_padata'])
3412 return self
.get_outer_pa_dict(kdc_exchange_dict
)
def sent_fast(self, kdc_exchange_dict):
    """Tell whether PADATA_FX_FAST is a member of the outer padata dict.

    The dict consulted is the one produced by get_outer_pa_dict() for the
    given exchange.
    """
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Tell whether PADATA_ENCRYPTED_CHALLENGE is in the FAST padata dict.

    The dict consulted is the one produced by get_fast_pa_dict() for the
    given exchange.
    """
    return (PADATA_ENCRYPTED_CHALLENGE
            in self.get_fast_pa_dict(kdc_exchange_dict))
3424 def get_sent_pac_options(self
, kdc_exchange_dict
):
3425 fast_pa_dict
= self
.get_fast_pa_dict(kdc_exchange_dict
)
3427 if PADATA_PAC_OPTIONS
not in fast_pa_dict
:
3430 pac_options
= self
.der_decode(fast_pa_dict
[PADATA_PAC_OPTIONS
],
3431 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
3432 pac_options
= pac_options
['options']
3434 # Mask out unsupported bits.
3435 pac_options
, remaining
= pac_options
[:4], pac_options
[4:]
3436 pac_options
+= '0' * len(remaining
)
3440 def get_krbtgt_sname(self
):
3441 krbtgt_creds
= self
.get_krbtgt_creds()
3442 krbtgt_username
= krbtgt_creds
.get_username()
3443 krbtgt_realm
= krbtgt_creds
.get_realm()
3444 krbtgt_sname
= self
.PrincipalName_create(
3445 name_type
=NT_SRV_INST
, names
=[krbtgt_username
, krbtgt_realm
])
3449 def _test_as_exchange(self
,
3455 expected_error_mode
,
3464 expected_flags
=None,
3465 unexpected_flags
=None,
3466 expected_supported_etypes
=None,
3468 ticket_decryption_key
=None,
3473 def _generate_padata_copy(_kdc_exchange_dict
,
3476 return padata
, req_body
3478 if not expected_error_mode
:
3479 check_error_fn
= None
3480 check_rep_fn
= self
.generic_check_kdc_rep
3482 check_error_fn
= self
.generic_check_kdc_error
3485 if padata
is not None:
3486 generate_padata_fn
= _generate_padata_copy
3488 generate_padata_fn
= None
3490 kdc_exchange_dict
= self
.as_exchange_dict(
3491 expected_crealm
=expected_crealm
,
3492 expected_cname
=expected_cname
,
3493 expected_srealm
=expected_srealm
,
3494 expected_sname
=expected_sname
,
3495 expected_supported_etypes
=expected_supported_etypes
,
3496 ticket_decryption_key
=ticket_decryption_key
,
3497 generate_padata_fn
=generate_padata_fn
,
3498 check_error_fn
=check_error_fn
,
3499 check_rep_fn
=check_rep_fn
,
3500 check_kdc_private_fn
=self
.generic_check_kdc_private
,
3501 expected_error_mode
=expected_error_mode
,
3502 client_as_etypes
=client_as_etypes
,
3503 expected_salt
=expected_salt
,
3504 expected_flags
=expected_flags
,
3505 unexpected_flags
=unexpected_flags
,
3506 preauth_key
=preauth_key
,
3507 kdc_options
=str(kdc_options
),
3508 pac_request
=pac_request
,
3509 pac_options
=pac_options
,
3512 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
3519 return rep
, kdc_exchange_dict