1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 from pyasn1
.codec
.der
.decoder
import decode
as pyasn1_der_decode
30 from pyasn1
.codec
.der
.encoder
import encode
as pyasn1_der_encode
31 from pyasn1
.codec
.native
.decoder
import decode
as pyasn1_native_decode
32 from pyasn1
.codec
.native
.encoder
import encode
as pyasn1_native_encode
34 from pyasn1
.codec
.ber
.encoder
import BitStringEncoder
36 from samba
.credentials
import Credentials
37 from samba
.dcerpc
import security
38 from samba
.gensec
import FEATURE_SEAL
41 from samba
.tests
import TestCaseInTempDir
43 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
44 from samba
.tests
.krb5
.rfc4120_constants
import (
53 KU_NON_KERB_CKSUM_SALT
,
54 KU_TGS_REP_ENC_PART_SESSION
,
55 KU_TGS_REP_ENC_PART_SUB_KEY
,
57 KU_TGS_REQ_AUTH_CKSUM
,
58 KU_TGS_REQ_AUTH_DAT_SESSION
,
59 KU_TGS_REQ_AUTH_DAT_SUBKEY
,
70 import samba
.tests
.krb5
.kcrypto
as kcrypto
73 def BitStringEncoder_encodeValue32(
74 self
, value
, asn1Spec
, encodeFun
, **options
):
76 # BitStrings like KDCOptions or TicketFlags should at least
77 # be 32-Bit on the wire
79 if asn1Spec
is not None:
80 # TODO: try to avoid ASN.1 schema instantiation
81 value
= asn1Spec
.clone(value
)
83 valueLength
= len(value
)
85 alignedValue
= value
<< (8 - valueLength
% 8)
89 substrate
= alignedValue
.asOctets()
90 length
= len(substrate
)
91 # We need at least 32-Bit / 4-Bytes
96 ret
= b
'\x00' + substrate
+ (b
'\x00' * padding
)
97 return ret
, False, True
# Monkey-patch pyasn1's BER BitStringEncoder at import time: every
# BitString value is now encoded through BitStringEncoder_encodeValue32,
# which keeps BitStrings such as KDCOptions or TicketFlags at least
# 32 bits wide on the wire.
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
103 def BitString_NamedValues_prettyPrint(self
, scope
=0):
104 ret
= "%s" % self
.asBinary()
107 for byte
in self
.asNumbers():
108 for bit
in [7, 6, 5, 4, 3, 2, 1, 0]:
115 if len(bits
) < highest_bit
:
116 for bitPosition
in range(len(bits
), highest_bit
):
119 delim
= ": (\n%s " % indent
120 for bitPosition
in range(highest_bit
):
121 if bitPosition
in self
.prettyPrintNamedValues
:
122 name
= self
.prettyPrintNamedValues
[bitPosition
]
123 elif bits
[bitPosition
] != 0:
124 name
= "unknown-bit-%u" % bitPosition
127 ret
+= "%s%s:%u" % (delim
, name
, bits
[bitPosition
])
128 delim
= ",\n%s " % indent
129 ret
+= "\n%s)" % indent
# Install the named-bit pretty printer on every BitString flag type.
# For each type <Name> the table of bit names lives on the sibling
# class <Name>Values; it is published both as prettyPrintNamedValues
# (read by BitString_NamedValues_prettyPrint) and as namedValues.
for _flags_type_name in ('TicketFlags',
                         'KDCOptions',
                         'APOptions',
                         'PACOptionFlags'):
    _flags_type = getattr(krb5_asn1, _flags_type_name)
    _flags_names = getattr(krb5_asn1, _flags_type_name + 'Values').namedValues
    _flags_type.prettyPrintNamedValues = _flags_names
    _flags_type.namedValues = _flags_names
    _flags_type.prettyPrint = BitString_NamedValues_prettyPrint
# Do not leave the loop temporaries behind in the module namespace.
del _flags_type_name, _flags_type, _flags_names
159 def Integer_NamedValues_prettyPrint(self
, scope
=0):
161 if intval
in self
.prettyPrintNamedValues
:
162 name
= self
.prettyPrintNamedValues
[intval
]
164 name
= "<__unknown__>"
165 ret
= "%d (0x%x) %s" % (intval
, intval
, name
)
# Install the named-value pretty printer on every enumerated Integer
# type.  For each type <Name> the value-to-name table is taken from the
# sibling class <Name>Values and published as prettyPrintNamedValues,
# which Integer_NamedValues_prettyPrint looks the value up in.
for _int_type_name in ('NameType',
                       'AuthDataType',
                       'PADataType',
                       'EncryptionType',
                       'ChecksumType',
                       'KerbErrorDataType'):
    _int_type = getattr(krb5_asn1, _int_type_name)
    _int_type.prettyPrintNamedValues = getattr(
        krb5_asn1, _int_type_name + 'Values').namedValues
    _int_type.prettyPrint = Integer_NamedValues_prettyPrint
# Do not leave the loop temporaries behind in the module namespace.
del _int_type_name, _int_type
195 class Krb5EncryptionKey
:
196 def __init__(self
, key
, kvno
):
198 kcrypto
.Enctype
.AES256
: kcrypto
.Cksumtype
.SHA1_AES256
,
199 kcrypto
.Enctype
.AES128
: kcrypto
.Cksumtype
.SHA1_AES128
,
200 kcrypto
.Enctype
.RC4
: kcrypto
.Cksumtype
.HMAC_MD5
,
203 self
.etype
= key
.enctype
204 self
.ctype
= EncTypeChecksum
[self
.etype
]
207 def encrypt(self
, usage
, plaintext
):
208 ciphertext
= kcrypto
.encrypt(self
.key
, usage
, plaintext
)
211 def decrypt(self
, usage
, ciphertext
):
212 plaintext
= kcrypto
.decrypt(self
.key
, usage
, ciphertext
)
215 def make_checksum(self
, usage
, plaintext
, ctype
=None):
218 cksum
= kcrypto
.make_checksum(ctype
, self
.key
, usage
, plaintext
)
221 def export_obj(self
):
222 EncryptionKey_obj
= {
223 'keytype': self
.etype
,
224 'keyvalue': self
.key
.contents
,
226 return EncryptionKey_obj
229 class KerberosCredentials(Credentials
):
231 super(KerberosCredentials
, self
).__init
__()
233 all_enc_types |
= security
.KERB_ENCTYPE_RC4_HMAC_MD5
234 all_enc_types |
= security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
235 all_enc_types |
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
237 self
.as_supported_enctypes
= all_enc_types
238 self
.tgs_supported_enctypes
= all_enc_types
239 self
.ap_supported_enctypes
= all_enc_types
242 self
.forced_keys
= {}
244 self
.forced_salt
= None
def set_as_supported_enctypes(self, value):
    """Store the AS supported-enctypes value.

    *value* is converted with int() before being stored, so a decimal
    string is accepted as well as an integer.
    """
    enctypes = int(value)
    self.as_supported_enctypes = enctypes
def set_tgs_supported_enctypes(self, value):
    """Store the TGS supported-enctypes value.

    *value* is converted with int() before being stored, so a decimal
    string is accepted as well as an integer.
    """
    enctypes = int(value)
    self.tgs_supported_enctypes = enctypes
def set_ap_supported_enctypes(self, value):
    """Store the AP supported-enctypes value.

    *value* is converted with int() before being stored, so a decimal
    string is accepted as well as an integer.
    """
    enctypes = int(value)
    self.ap_supported_enctypes = enctypes
255 def _get_krb5_etypes(self
, supported_enctypes
):
258 if supported_enctypes
& security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
:
259 etypes
+= (kcrypto
.Enctype
.AES256
,)
260 if supported_enctypes
& security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
:
261 etypes
+= (kcrypto
.Enctype
.AES128
,)
262 if supported_enctypes
& security
.KERB_ENCTYPE_RC4_HMAC_MD5
:
263 etypes
+= (kcrypto
.Enctype
.RC4
,)
def get_as_krb5_etypes(self):
    """Return the result of _get_krb5_etypes() for the stored
    as_supported_enctypes value."""
    to_etypes = self._get_krb5_etypes
    return to_etypes(self.as_supported_enctypes)
def get_tgs_krb5_etypes(self):
    """Return the result of _get_krb5_etypes() for the stored
    tgs_supported_enctypes value."""
    to_etypes = self._get_krb5_etypes
    return to_etypes(self.tgs_supported_enctypes)
def get_ap_krb5_etypes(self):
    """Return the result of _get_krb5_etypes() for the stored
    ap_supported_enctypes value."""
    to_etypes = self._get_krb5_etypes
    return to_etypes(self.ap_supported_enctypes)
276 def set_kvno(self
, kvno
):
282 def set_forced_key(self
, etype
, hexkey
):
284 contents
= binascii
.a2b_hex(hexkey
)
285 key
= kcrypto
.Key(etype
, contents
)
286 self
.forced_keys
[etype
] = Krb5EncryptionKey(key
, self
.kvno
)
288 def get_forced_key(self
, etype
):
290 return self
.forced_keys
.get(etype
, None)
def set_forced_salt(self, salt):
    """Remember *salt*, converted with bytes(), as the forced salt."""
    salt_bytes = bytes(salt)
    self.forced_salt = salt_bytes
def get_forced_salt(self):
    """Return the stored forced_salt attribute unchanged."""
    forced = self.forced_salt
    return forced
299 if self
.forced_salt
is not None:
300 return self
.forced_salt
302 if self
.get_workstation():
303 salt_string
= '%shost%s.%s' % (
304 self
.get_realm().upper(),
305 self
.get_username().lower().rsplit('$', 1)[0],
306 self
.get_realm().lower())
308 salt_string
= self
.get_realm().upper() + self
.get_username()
310 return salt_string
.encode('utf-8')
313 class KerberosTicketCreds
:
314 def __init__(self
, ticket
, session_key
,
315 crealm
=None, cname
=None,
316 srealm
=None, sname
=None,
319 encpart_private
=None):
321 self
.session_key
= session_key
326 self
.decryption_key
= decryption_key
327 self
.ticket_private
= ticket_private
328 self
.encpart_private
= encpart_private
331 class RawKerberosTest(TestCaseInTempDir
):
332 """A raw Kerberos Test case."""
335 {"value": -1111, "name": "dummy", },
336 {"value": kcrypto
.Enctype
.AES256
, "name": "aes128", },
337 {"value": kcrypto
.Enctype
.AES128
, "name": "aes256", },
338 {"value": kcrypto
.Enctype
.RC4
, "name": "rc4", },
341 setup_etype_test_permutations_done
= False
344 def setup_etype_test_permutations(cls
):
345 if cls
.setup_etype_test_permutations_done
:
350 num_idxs
= len(cls
.etypes_to_test
)
352 for num
in range(1, num_idxs
+ 1):
353 chunk
= list(itertools
.permutations(range(num_idxs
), num
))
356 permutations
.append(el
)
358 for p
in permutations
:
362 n
= cls
.etypes_to_test
[idx
]["name"]
367 etypes
+= (cls
.etypes_to_test
[idx
]["value"],)
369 r
= {"name": name
, "etypes": etypes
, }
372 cls
.etype_test_permutations
= res
373 cls
.setup_etype_test_permutations_done
= True
376 def etype_test_permutation_name_idx(cls
):
377 cls
.setup_etype_test_permutations()
380 for e
in cls
.etype_test_permutations
:
def etype_test_permutation_by_idx(self, idx):
    """Return the (name, etypes) pair of the permutation stored at
    position *idx* of self.etype_test_permutations."""
    entry = self.etype_test_permutations[idx]
    name = entry['name']
    etypes = entry['etypes']
    return (name, etypes)
394 cls
.host
= samba
.tests
.env_get_var_value('SERVER')
396 # A dictionary containing credentials that have already been
402 self
.do_asn1_print
= False
403 self
.do_hexdump
= False
405 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING',
407 if strict_checking
is None:
408 strict_checking
= '1'
409 self
.strict_checking
= bool(int(strict_checking
))
413 self
.unspecified_kvno
= object()
416 self
._disconnect
("tearDown")
419 def _disconnect(self
, reason
):
425 sys
.stderr
.write("disconnect[%s]\n" % reason
)
427 def _connect_tcp(self
):
430 self
.a
= socket
.getaddrinfo(self
.host
, tcp_port
, socket
.AF_UNSPEC
,
431 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
433 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
434 self
.s
.settimeout(10)
435 self
.s
.connect(self
.a
[0][4])
444 self
.assertNotConnected()
447 sys
.stderr
.write("connected[%s]\n" % self
.host
)
449 def env_get_var(self
, varname
, prefix
,
450 fallback_default
=True,
451 allow_missing
=False):
453 if prefix
is not None:
454 allow_missing_prefix
= allow_missing
or fallback_default
455 val
= samba
.tests
.env_get_var_value(
456 '%s_%s' % (prefix
, varname
),
457 allow_missing
=allow_missing_prefix
)
459 fallback_default
= True
460 if val
is None and fallback_default
:
461 val
= samba
.tests
.env_get_var_value(varname
,
462 allow_missing
=allow_missing
)
465 def _get_krb5_creds_from_env(self
, prefix
,
466 default_username
=None,
467 allow_missing_password
=False,
468 allow_missing_keys
=True,
469 require_strongest_key
=False):
470 c
= KerberosCredentials()
473 domain
= self
.env_get_var('DOMAIN', prefix
)
474 realm
= self
.env_get_var('REALM', prefix
)
475 allow_missing_username
= default_username
is not None
476 username
= self
.env_get_var('USERNAME', prefix
,
477 fallback_default
=False,
478 allow_missing
=allow_missing_username
)
480 username
= default_username
481 password
= self
.env_get_var('PASSWORD', prefix
,
482 fallback_default
=False,
483 allow_missing
=allow_missing_password
)
486 c
.set_username(username
)
487 if password
is not None:
488 c
.set_password(password
)
489 as_supported_enctypes
= self
.env_get_var('AS_SUPPORTED_ENCTYPES',
490 prefix
, allow_missing
=True)
491 if as_supported_enctypes
is not None:
492 c
.set_as_supported_enctypes(as_supported_enctypes
)
493 tgs_supported_enctypes
= self
.env_get_var('TGS_SUPPORTED_ENCTYPES',
494 prefix
, allow_missing
=True)
495 if tgs_supported_enctypes
is not None:
496 c
.set_tgs_supported_enctypes(tgs_supported_enctypes
)
497 ap_supported_enctypes
= self
.env_get_var('AP_SUPPORTED_ENCTYPES',
498 prefix
, allow_missing
=True)
499 if ap_supported_enctypes
is not None:
500 c
.set_ap_supported_enctypes(ap_supported_enctypes
)
502 if require_strongest_key
:
503 kvno_allow_missing
= False
505 aes256_allow_missing
= False
507 aes256_allow_missing
= True
509 kvno_allow_missing
= allow_missing_keys
510 aes256_allow_missing
= allow_missing_keys
511 kvno
= self
.env_get_var('KVNO', prefix
,
512 fallback_default
=False,
513 allow_missing
=kvno_allow_missing
)
516 aes256_key
= self
.env_get_var('AES256_KEY_HEX', prefix
,
517 fallback_default
=False,
518 allow_missing
=aes256_allow_missing
)
519 if aes256_key
is not None:
520 c
.set_forced_key(kcrypto
.Enctype
.AES256
, aes256_key
)
521 aes128_key
= self
.env_get_var('AES128_KEY_HEX', prefix
,
522 fallback_default
=False,
524 if aes128_key
is not None:
525 c
.set_forced_key(kcrypto
.Enctype
.AES128
, aes128_key
)
526 rc4_key
= self
.env_get_var('RC4_KEY_HEX', prefix
,
527 fallback_default
=False, allow_missing
=True)
528 if rc4_key
is not None:
529 c
.set_forced_key(kcrypto
.Enctype
.RC4
, rc4_key
)
531 if not allow_missing_keys
:
532 self
.assertTrue(c
.forced_keys
,
533 'Please supply %s encryption keys '
534 'in environment' % prefix
)
538 def _get_krb5_creds(self
,
540 default_username
=None,
541 allow_missing_password
=False,
542 allow_missing_keys
=True,
543 require_strongest_key
=False,
544 fallback_creds_fn
=None):
545 if prefix
in self
.creds_dict
:
546 return self
.creds_dict
[prefix
]
548 # We don't have the credentials already
552 # Try to obtain them from the environment
553 creds
= self
._get
_krb
5_creds
_from
_env
(
555 default_username
=default_username
,
556 allow_missing_password
=allow_missing_password
,
557 allow_missing_keys
=allow_missing_keys
,
558 require_strongest_key
=require_strongest_key
)
559 except Exception as err
:
560 # An error occurred, so save it for later
563 self
.assertIsNotNone(creds
)
564 # Save the obtained credentials
565 self
.creds_dict
[prefix
] = creds
568 if fallback_creds_fn
is not None:
570 # Try to use the fallback method
571 creds
= fallback_creds_fn()
572 except Exception as err
:
573 print("ERROR FROM ENV: %r" % (env_err
))
574 print("FALLBACK-FN: %s" % (fallback_creds_fn
))
575 print("FALLBACK-ERROR: %r" % (err
))
577 self
.assertIsNotNone(creds
)
578 # Save the obtained credentials
579 self
.creds_dict
[prefix
] = creds
582 # Both methods failed, so raise the exception from the
586 def get_user_creds(self
,
587 allow_missing_password
=False,
588 allow_missing_keys
=True):
589 c
= self
._get
_krb
5_creds
(prefix
=None,
590 allow_missing_password
=allow_missing_password
,
591 allow_missing_keys
=allow_missing_keys
)
594 def get_service_creds(self
,
595 allow_missing_password
=False,
596 allow_missing_keys
=True):
597 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
598 allow_missing_password
=allow_missing_password
,
599 allow_missing_keys
=allow_missing_keys
)
602 def get_client_creds(self
,
603 allow_missing_password
=False,
604 allow_missing_keys
=True):
605 c
= self
._get
_krb
5_creds
(prefix
='CLIENT',
606 allow_missing_password
=allow_missing_password
,
607 allow_missing_keys
=allow_missing_keys
)
610 def get_server_creds(self
,
611 allow_missing_password
=False,
612 allow_missing_keys
=True):
613 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
614 allow_missing_password
=allow_missing_password
,
615 allow_missing_keys
=allow_missing_keys
)
618 def get_admin_creds(self
,
619 allow_missing_password
=False,
620 allow_missing_keys
=True):
621 c
= self
._get
_krb
5_creds
(prefix
='ADMIN',
622 allow_missing_password
=allow_missing_password
,
623 allow_missing_keys
=allow_missing_keys
)
624 c
.set_gensec_features(c
.get_gensec_features() | FEATURE_SEAL
)
627 def get_krbtgt_creds(self
,
629 require_strongest_key
=False):
630 if require_strongest_key
:
631 self
.assertTrue(require_keys
)
632 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
633 default_username
='krbtgt',
634 allow_missing_password
=True,
635 allow_missing_keys
=not require_keys
,
636 require_strongest_key
=require_strongest_key
)
639 def get_anon_creds(self
):
644 def asn1_dump(self
, name
, obj
, asn1_print
=None):
645 if asn1_print
is None:
646 asn1_print
= self
.do_asn1_print
649 sys
.stderr
.write("%s:\n%s" % (name
, obj
))
651 sys
.stderr
.write("%s" % (obj
))
653 def hex_dump(self
, name
, blob
, hexdump
=None):
655 hexdump
= self
.do_hexdump
658 "%s: %d\n%s" % (name
, len(blob
), self
.hexdump(blob
)))
667 if asn1Spec
is not None:
668 class_name
= type(asn1Spec
).__name
__.split(':')[0]
670 class_name
= "<None-asn1Spec>"
671 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
672 obj
, _
= pyasn1_der_decode(blob
, asn1Spec
=asn1Spec
)
673 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
675 obj
= pyasn1_native_encode(obj
)
686 obj
= pyasn1_native_decode(obj
, asn1Spec
=asn1Spec
)
687 class_name
= type(obj
).__name
__.split(':')[0]
688 if class_name
is not None:
689 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
690 blob
= pyasn1_der_encode(obj
)
691 if class_name
is not None:
692 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
695 def send_pdu(self
, req
, asn1_print
=None, hexdump
=None):
697 k5_pdu
= self
.der_encode(
698 req
, native_decode
=False, asn1_print
=asn1_print
, hexdump
=False)
699 header
= struct
.pack('>I', len(k5_pdu
))
702 self
.hex_dump("send_pdu", header
, hexdump
=hexdump
)
703 self
.hex_dump("send_pdu", k5_pdu
, hexdump
=hexdump
)
705 sent
= self
.s
.send(req_pdu
, 0)
706 if sent
== len(req_pdu
):
708 req_pdu
= req_pdu
[sent
:]
709 except socket
.error
as e
:
710 self
._disconnect
("send_pdu: %s" % e
)
713 self
._disconnect
("send_pdu: %s" % e
)
716 def recv_raw(self
, num_recv
=0xffff, hexdump
=None, timeout
=None):
719 if timeout
is not None:
720 self
.s
.settimeout(timeout
)
721 rep_pdu
= self
.s
.recv(num_recv
, 0)
722 self
.s
.settimeout(10)
723 if len(rep_pdu
) == 0:
724 self
._disconnect
("recv_raw: EOF")
726 self
.hex_dump("recv_raw", rep_pdu
, hexdump
=hexdump
)
727 except socket
.timeout
:
728 self
.s
.settimeout(10)
729 sys
.stderr
.write("recv_raw: TIMEOUT\n")
730 except socket
.error
as e
:
731 self
._disconnect
("recv_raw: %s" % e
)
734 self
._disconnect
("recv_raw: %s" % e
)
738 def recv_pdu_raw(self
, asn1_print
=None, hexdump
=None, timeout
=None):
741 raw_pdu
= self
.recv_raw(
742 num_recv
=4, hexdump
=hexdump
, timeout
=timeout
)
745 header
= struct
.unpack(">I", raw_pdu
[0:4])
752 raw_pdu
= self
.recv_raw(
753 num_recv
=missing
, hexdump
=hexdump
, timeout
=timeout
)
754 self
.assertGreaterEqual(len(raw_pdu
), 1)
756 missing
= k5_len
- len(rep_pdu
)
757 k5_raw
= self
.der_decode(
763 pvno
= k5_raw
['field-0']
764 self
.assertEqual(pvno
, 5)
765 msg_type
= k5_raw
['field-1']
766 self
.assertIn(msg_type
, [KRB_AS_REP
, KRB_TGS_REP
, KRB_ERROR
])
767 if msg_type
== KRB_AS_REP
:
768 asn1Spec
= krb5_asn1
.AS_REP()
769 elif msg_type
== KRB_TGS_REP
:
770 asn1Spec
= krb5_asn1
.TGS_REP()
771 elif msg_type
== KRB_ERROR
:
772 asn1Spec
= krb5_asn1
.KRB_ERROR()
773 rep
= self
.der_decode(rep_pdu
, asn1Spec
=asn1Spec
,
774 asn1_print
=asn1_print
, hexdump
=False)
775 return (rep
, rep_pdu
)
777 def recv_pdu(self
, asn1_print
=None, hexdump
=None, timeout
=None):
778 (rep
, rep_pdu
) = self
.recv_pdu_raw(asn1_print
=asn1_print
,
def assertIsConnected(self):
    """Fail with "Not connected" when self.s is None."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" when self.s is not None."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
789 def send_recv_transaction(
797 self
.send_pdu(req
, asn1_print
=asn1_print
, hexdump
=hexdump
)
799 asn1_print
=asn1_print
, hexdump
=hexdump
, timeout
=timeout
)
801 self
._disconnect
("transaction failed")
803 self
._disconnect
("transaction done")
def assertNoValue(self, value):
    """Assert that the isNoValue attribute of *value* is true."""
    is_no_value = value.isNoValue
    self.assertTrue(is_no_value)
def assertHasValue(self, value):
    """Assert that *value* is not None."""
    check = self.assertIsNotNone
    check(value)
def getElementValue(self, obj, elem):
    """Look up *elem* in *obj* via obj.get(); return None when absent."""
    lookup = obj.get
    return lookup(elem, None)
815 def assertElementMissing(self
, obj
, elem
):
816 v
= self
.getElementValue(obj
, elem
)
def assertElementPresent(self, obj, elem):
    """Assert that *elem* is present in *obj*.

    The element value must not be None.  With strict checking enabled,
    a value that is a container must additionally be non-empty.
    """
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    # The emptiness check only applies to containers, and only when
    # strict checking has been requested.
    if self.strict_checking and isinstance(found,
                                           collections.abc.Container):
        self.assertNotEqual(0, len(found))
def assertElementEqual(self, obj, elem, value):
    """Assert that *elem* is present in *obj* and equals *value*."""
    found = self.getElementValue(obj, elem)
    # Report a missing element separately from a mismatching one.
    self.assertIsNotNone(found)
    self.assertEqual(found, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that *elem* is present in *obj* and equals the UTF-8
    encoding of the string *value*."""
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    expected = bytes(value, 'utf8')
    self.assertEqual(found, expected)
836 def assertPrincipalEqual(self
, princ1
, princ2
):
837 self
.assertEqual(princ1
['name-type'], princ2
['name-type'])
839 len(princ1
['name-string']),
840 len(princ2
['name-string']),
841 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
842 for idx
in range(len(princ1
['name-string'])):
844 princ1
['name-string'][idx
],
845 princ2
['name-string'][idx
],
846 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that *elem* of *obj* is a principal equal to *value*.

    The element must be present; it is converted to a
    krb5_asn1.PrincipalName with pyasn1_native_decode and then compared
    against *value* using assertPrincipalEqual().
    """
    native_princ = self.getElementValue(obj, elem)
    self.assertIsNotNone(native_princ)
    princ = pyasn1_native_decode(native_princ,
                                 asn1Spec=krb5_asn1.PrincipalName())
    self.assertPrincipalEqual(princ, value)
854 def assertElementKVNO(self
, obj
, elem
, value
):
855 v
= self
.getElementValue(obj
, elem
)
856 if value
== "autodetect":
858 if value
is not None:
859 self
.assertIsNotNone(v
)
860 # The value on the wire should never be 0
861 self
.assertNotEqual(v
, 0)
862 # unspecified_kvno means we don't know the kvno,
863 # but want to enforce its presence
864 if value
is not self
.unspecified_kvno
:
866 self
.assertNotEqual(value
, 0)
867 self
.assertEqual(v
, value
)
871 def get_KerberosTimeWithUsec(self
, epoch
=None, offset
=None):
874 if offset
is not None:
875 epoch
= epoch
+ int(offset
)
876 dt
= datetime
.datetime
.fromtimestamp(epoch
, tz
=datetime
.timezone
.utc
)
877 return (dt
.strftime("%Y%m%d%H%M%SZ"), dt
.microsecond
)
879 def get_KerberosTime(self
, epoch
=None, offset
=None):
880 (s
, _
) = self
.get_KerberosTimeWithUsec(epoch
=epoch
, offset
=offset
)
883 def get_EpochFromKerberosTime(self
, kerberos_time
):
884 if isinstance(kerberos_time
, bytes
):
885 kerberos_time
= kerberos_time
.decode()
887 epoch
= datetime
.datetime
.strptime(kerberos_time
,
889 epoch
= epoch
.replace(tzinfo
=datetime
.timezone
.utc
)
890 epoch
= int(epoch
.timestamp())
895 nonce_min
= 0x7f000000
896 nonce_max
= 0x7fffffff
897 v
= random
.randint(nonce_min
, nonce_max
)
900 def get_pa_dict(self
, pa_data
):
903 if pa_data
is not None:
905 pa_type
= pa
['padata-type']
906 if pa_type
in pa_dict
:
907 raise RuntimeError(f
'Duplicate type {pa_type}')
908 pa_dict
[pa_type
] = pa
['padata-value']
def SessionKey_create(self, etype, contents, kvno=None):
    """Build a Krb5EncryptionKey from raw key material.

    *contents* is wrapped in a kcrypto.Key of encryption type *etype*;
    *kvno* is passed on to Krb5EncryptionKey unchanged.
    """
    return Krb5EncryptionKey(kcrypto.Key(etype, contents), kvno)
def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
    """Derive a Krb5EncryptionKey from a password.

    Both *pwd* and *salt* must be given (asserted not None).  The key
    is computed with kcrypto.string_to_key() for *etype*; *kvno* is
    passed on to Krb5EncryptionKey unchanged.
    """
    for required in (pwd, salt):
        self.assertIsNotNone(required)
    derived = kcrypto.string_to_key(etype, pwd, salt)
    return Krb5EncryptionKey(derived, kvno)
922 def PasswordKey_from_etype_info2(self
, creds
, etype_info2
, kvno
=None):
923 e
= etype_info2
['etype']
925 salt
= etype_info2
.get('salt', None)
927 if e
== kcrypto
.Enctype
.RC4
:
928 nthash
= creds
.get_nt_hash()
929 return self
.SessionKey_create(etype
=e
, contents
=nthash
, kvno
=kvno
)
931 password
= creds
.get_password()
932 return self
.PasswordKey_create(
933 etype
=e
, pwd
=password
, salt
=salt
, kvno
=kvno
)
935 def TicketDecryptionKey_from_creds(self
, creds
, etype
=None):
938 etypes
= creds
.get_tgs_krb5_etypes()
941 forced_key
= creds
.get_forced_key(etype
)
942 if forced_key
is not None:
945 kvno
= creds
.get_kvno()
947 fail_msg
= ("%s has no fixed key for etype[%s] kvno[%s] "
948 "nor a password specified, " % (
949 creds
.get_username(), etype
, kvno
))
951 if etype
== kcrypto
.Enctype
.RC4
:
952 nthash
= creds
.get_nt_hash()
953 self
.assertIsNotNone(nthash
, msg
=fail_msg
)
954 return self
.SessionKey_create(etype
=etype
,
958 password
= creds
.get_password()
959 self
.assertIsNotNone(password
, msg
=fail_msg
)
960 salt
= creds
.get_salt()
961 return self
.PasswordKey_create(etype
=etype
,
def RandomKey(self, etype):
    """Create a session key of type *etype* from random bytes.

    The number of random bytes is the keysize of the kcrypto enctype
    profile for *etype*.
    """
    profile = kcrypto._get_enctype_profile(etype)
    random_bytes = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=random_bytes)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Create a session key from an EncryptionKey dictionary.

    The 'keytype' and 'keyvalue' entries of *EncryptionKey_obj* are
    handed to SessionKey_create() as encryption type and key contents.
    """
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(keytype, keyvalue)
975 def EncryptedData_create(self
, key
, usage
, plaintext
):
976 # EncryptedData ::= SEQUENCE {
977 # etype [0] Int32 -- EncryptionType --,
978 # kvno [1] UInt32 OPTIONAL,
979 # cipher [2] OCTET STRING -- ciphertext
981 ciphertext
= key
.encrypt(usage
, plaintext
)
982 EncryptedData_obj
= {
986 if key
.kvno
is not None:
987 EncryptedData_obj
['kvno'] = key
.kvno
988 return EncryptedData_obj
990 def Checksum_create(self
, key
, usage
, plaintext
, ctype
=None):
991 # Checksum ::= SEQUENCE {
992 # cksumtype [0] Int32,
993 # checksum [1] OCTET STRING
997 checksum
= key
.make_checksum(usage
, plaintext
, ctype
=ctype
)
1000 'checksum': checksum
,
1005 def PrincipalName_create(cls
, name_type
, names
):
1006 # PrincipalName ::= SEQUENCE {
1007 # name-type [0] Int32,
1008 # name-string [1] SEQUENCE OF KerberosString
1010 PrincipalName_obj
= {
1011 'name-type': name_type
,
1012 'name-string': names
,
1014 return PrincipalName_obj
1016 def PA_DATA_create(self
, padata_type
, padata_value
):
1017 # PA-DATA ::= SEQUENCE {
1018 # -- NOTE: first tag is [1], not [0]
1019 # padata-type [1] Int32,
1020 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1023 'padata-type': padata_type
,
1024 'padata-value': padata_value
,
1028 def PA_ENC_TS_ENC_create(self
, ts
, usec
):
1029 # PA-ENC-TS-ENC ::= SEQUENCE {
1030 # patimestamp[0] KerberosTime, -- client's time
1031 # pausec[1] krb5int32 OPTIONAL
1033 PA_ENC_TS_ENC_obj
= {
1037 return PA_ENC_TS_ENC_obj
1039 def KERB_PA_PAC_REQUEST_create(self
, include_pac
, pa_data_create
=True):
1040 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1041 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1043 # --If FALSE, and PAC present,
1046 KERB_PA_PAC_REQUEST_obj
= {
1047 'include-pac': include_pac
,
1049 if not pa_data_create
:
1050 return KERB_PA_PAC_REQUEST_obj
1051 pa_pac
= self
.der_encode(KERB_PA_PAC_REQUEST_obj
,
1052 asn1Spec
=krb5_asn1
.KERB_PA_PAC_REQUEST())
1053 pa_data
= self
.PA_DATA_create(PADATA_PAC_REQUEST
, pa_pac
)
1056 def KDC_REQ_BODY_create(self
,
1068 EncAuthorizationData
,
1069 EncAuthorizationData_key
,
1070 EncAuthorizationData_usage
,
1073 # KDC-REQ-BODY ::= SEQUENCE {
1074 # kdc-options [0] KDCOptions,
1075 # cname [1] PrincipalName OPTIONAL
1076 # -- Used only in AS-REQ --,
1079 # -- Also client's in AS-REQ --,
1080 # sname [3] PrincipalName OPTIONAL,
1081 # from [4] KerberosTime OPTIONAL,
1082 # till [5] KerberosTime,
1083 # rtime [6] KerberosTime OPTIONAL,
1085 # etype [8] SEQUENCE OF Int32
1087 # -- in preference order --,
1088 # addresses [9] HostAddresses OPTIONAL,
1089 # enc-authorization-data [10] EncryptedData OPTIONAL
1090 # -- AuthorizationData --,
1091 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1092 # -- NOTE: not empty
1094 if EncAuthorizationData
is not None:
1095 enc_ad_plain
= self
.der_encode(
1096 EncAuthorizationData
,
1097 asn1Spec
=krb5_asn1
.AuthorizationData(),
1098 asn1_print
=asn1_print
,
1100 enc_ad
= self
.EncryptedData_create(EncAuthorizationData_key
,
1101 EncAuthorizationData_usage
,
1105 KDC_REQ_BODY_obj
= {
1106 'kdc-options': kdc_options
,
1112 if cname
is not None:
1113 KDC_REQ_BODY_obj
['cname'] = cname
1114 if sname
is not None:
1115 KDC_REQ_BODY_obj
['sname'] = sname
1116 if from_time
is not None:
1117 KDC_REQ_BODY_obj
['from'] = from_time
1118 if renew_time
is not None:
1119 KDC_REQ_BODY_obj
['rtime'] = renew_time
1120 if addresses
is not None:
1121 KDC_REQ_BODY_obj
['addresses'] = addresses
1122 if enc_ad
is not None:
1123 KDC_REQ_BODY_obj
['enc-authorization-data'] = enc_ad
1124 if additional_tickets
is not None:
1125 KDC_REQ_BODY_obj
['additional-tickets'] = additional_tickets
1126 return KDC_REQ_BODY_obj
1128 def KDC_REQ_create(self
,
1135 # KDC-REQ ::= SEQUENCE {
1136 # -- NOTE: first tag is [1], not [0]
1137 # pvno [1] INTEGER (5) ,
1138 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1139 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1140 # -- NOTE: not empty --,
1141 # req-body [4] KDC-REQ-BODY
1146 'msg-type': msg_type
,
1147 'req-body': req_body
,
1149 if padata
is not None:
1150 KDC_REQ_obj
['padata'] = padata
1151 if asn1Spec
is not None:
1152 KDC_REQ_decoded
= pyasn1_native_decode(
1153 KDC_REQ_obj
, asn1Spec
=asn1Spec
)
1155 KDC_REQ_decoded
= None
1156 return KDC_REQ_obj
, KDC_REQ_decoded
1158 def AS_REQ_create(self
,
1160 kdc_options
, # required
1164 from_time
, # optional
1165 till_time
, # required
1166 renew_time
, # optional
1169 addresses
, # optional
1171 native_decoded_only
=True,
1174 # KDC-REQ ::= SEQUENCE {
1175 # -- NOTE: first tag is [1], not [0]
1176 # pvno [1] INTEGER (5) ,
1177 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1178 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1179 # -- NOTE: not empty --,
1180 # req-body [4] KDC-REQ-BODY
1183 # KDC-REQ-BODY ::= SEQUENCE {
1184 # kdc-options [0] KDCOptions,
1185 # cname [1] PrincipalName OPTIONAL
1186 # -- Used only in AS-REQ --,
1189 # -- Also client's in AS-REQ --,
1190 # sname [3] PrincipalName OPTIONAL,
1191 # from [4] KerberosTime OPTIONAL,
1192 # till [5] KerberosTime,
1193 # rtime [6] KerberosTime OPTIONAL,
1195 # etype [8] SEQUENCE OF Int32
1197 # -- in preference order --,
1198 # addresses [9] HostAddresses OPTIONAL,
1199 # enc-authorization-data [10] EncryptedData OPTIONAL
1200 # -- AuthorizationData --,
1201 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1202 # -- NOTE: not empty
1204 KDC_REQ_BODY_obj
= self
.KDC_REQ_BODY_create(
1216 EncAuthorizationData
=None,
1217 EncAuthorizationData_key
=None,
1218 EncAuthorizationData_usage
=None,
1219 asn1_print
=asn1_print
,
1221 obj
, decoded
= self
.KDC_REQ_create(
1222 msg_type
=KRB_AS_REQ
,
1224 req_body
=KDC_REQ_BODY_obj
,
1225 asn1Spec
=krb5_asn1
.AS_REQ(),
1226 asn1_print
=asn1_print
,
1228 if native_decoded_only
:
1232 def AP_REQ_create(self
, ap_options
, ticket
, authenticator
):
1233 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1234 # pvno [0] INTEGER (5),
1235 # msg-type [1] INTEGER (14),
1236 # ap-options [2] APOptions,
1237 # ticket [3] Ticket,
1238 # authenticator [4] EncryptedData -- Authenticator
1242 'msg-type': KRB_AP_REQ
,
1243 'ap-options': ap_options
,
1245 'authenticator': authenticator
,
def Authenticator_create(
        self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
        authorization_data):
    """Build the native (dict) representation of an unencrypted Authenticator.

    The mandatory fields are always emitted; each OPTIONAL field is only
    added when the corresponding argument is not ``None``.

    :return: a dict keyed by the ASN.1 field names listed below.
    """
    # -- Unencrypted authenticator
    # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
    #         authenticator-vno       [0] INTEGER (5),
    #         crealm                  [1] Realm,
    #         cname                   [2] PrincipalName,
    #         cksum                   [3] Checksum OPTIONAL,
    #         cusec                   [4] Microseconds,
    #         ctime                   [5] KerberosTime,
    #         subkey                  [6] EncryptionKey OPTIONAL,
    #         seq-number              [7] UInt32 OPTIONAL,
    #         authorization-data      [8] AuthorizationData OPTIONAL
    # }
    Authenticator_obj = {
        'authenticator-vno': 5,
        'crealm': crealm,
        'cname': cname,
        'cusec': cusec,
        'ctime': ctime,
    }
    if cksum is not None:
        Authenticator_obj['cksum'] = cksum
    if subkey is not None:
        Authenticator_obj['subkey'] = subkey
    if seq_number is not None:
        Authenticator_obj['seq-number'] = seq_number
    if authorization_data is not None:
        Authenticator_obj['authorization-data'] = authorization_data
    return Authenticator_obj
def TGS_REQ_create(self,
                   padata,       # optional
                   cusec,
                   ctime,
                   ticket,
                   kdc_options,  # required
                   cname,        # optional
                   realm,        # required
                   sname,        # optional
                   from_time,    # optional
                   till_time,    # required
                   renew_time,   # optional
                   nonce,        # required
                   etypes,       # required
                   addresses,    # optional
                   EncAuthorizationData,
                   EncAuthorizationData_key,
                   additional_tickets,
                   ticket_session_key,
                   authenticator_subkey=None,
                   body_checksum_type=None,
                   native_decoded_only=True,
                   asn1_print=None,
                   hexdump=None):
    """Build a TGS-REQ, including the PA-TGS-REQ (AP-REQ) padata element.

    The request body is DER encoded and checksummed with
    ``ticket_session_key``; that checksum goes into an Authenticator, which
    is encrypted with the same key and wrapped in an AP-REQ carrying
    ``ticket``.  The resulting PA-DATA is appended to ``padata`` (in place,
    when a list is supplied) before the KDC-REQ is created.

    :return: the decoded request when ``native_decoded_only`` is true,
        otherwise the tuple ``(decoded, obj)`` from ``KDC_REQ_create``.
    """
    # KDC-REQ         ::= SEQUENCE {
    #         -- NOTE: first tag is [1], not [0]
    #         pvno            [1] INTEGER (5) ,
    #         msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #         padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                             -- NOTE: not empty --,
    #         req-body        [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY    ::= SEQUENCE {
    #         kdc-options             [0] KDCOptions,
    #         cname                   [1] PrincipalName OPTIONAL
    #                                     -- Used only in AS-REQ --,
    #         realm                   [2] Realm
    #                                     -- Server's realm
    #                                     -- Also client's in AS-REQ --,
    #         sname                   [3] PrincipalName OPTIONAL,
    #         from                    [4] KerberosTime OPTIONAL,
    #         till                    [5] KerberosTime,
    #         rtime                   [6] KerberosTime OPTIONAL,
    #         nonce                   [7] UInt32,
    #         etype                   [8] SEQUENCE OF Int32
    #                                     -- EncryptionType
    #                                     -- in preference order --,
    #         addresses               [9] HostAddresses OPTIONAL,
    #         enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                     -- AuthorizationData --,
    #         additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                        -- NOTE: not empty
    # }

    # The key usage for enc-authorization-data depends on whether an
    # authenticator subkey is in play.
    if authenticator_subkey is not None:
        EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
    else:
        EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=None,  # cname is only used in AS-REQ bodies
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)
    req_body_blob = self.der_encode(req_body,
                                    asn1Spec=krb5_asn1.KDC_REQ_BODY(),
                                    asn1_print=asn1_print, hexdump=hexdump)

    req_body_checksum = self.Checksum_create(ticket_session_key,
                                             KU_TGS_REQ_AUTH_CKSUM,
                                             req_body_blob,
                                             ctype=body_checksum_type)

    subkey_obj = None
    if authenticator_subkey is not None:
        subkey_obj = authenticator_subkey.export_obj()
    seq_number = random.randint(0, 0xfffffffe)
    authenticator = self.Authenticator_create(
        crealm=realm,
        cname=cname,
        cksum=req_body_checksum,
        cusec=cusec,
        ctime=ctime,
        subkey=subkey_obj,
        seq_number=seq_number,
        authorization_data=None)
    authenticator = self.der_encode(
        authenticator,
        asn1Spec=krb5_asn1.Authenticator(),
        asn1_print=asn1_print,
        hexdump=hexdump)

    authenticator = self.EncryptedData_create(
        ticket_session_key, KU_TGS_REQ_AUTH, authenticator)

    ap_options = krb5_asn1.APOptions('0')
    ap_req = self.AP_REQ_create(ap_options=str(ap_options),
                                ticket=ticket,
                                authenticator=authenticator)
    ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
                             asn1_print=asn1_print, hexdump=hexdump)
    pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
    if padata is not None:
        padata.append(pa_tgs_req)
    else:
        padata = [pa_tgs_req]

    obj, decoded = self.KDC_REQ_create(
        msg_type=KRB_TGS_REQ,
        padata=padata,
        req_body=req_body,
        asn1Spec=krb5_asn1.TGS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)
    if native_decoded_only:
        return decoded
    return decoded, obj
def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
    """Build a PA-FOR-USER padata element for an S4U2Self request.

    The checksum input is the little-endian 4-byte name type, followed by
    each name-string component, the realm and the literal "Kerberos".

    :param name: principal name dict with 'name-type' and 'name-string'.
    :param realm: realm of the user, as a str.
    :param tgt_session_key: key handed to ``Checksum_create``.
    :param ctype: optional checksum type forwarded to ``Checksum_create``.
    :return: the result of ``PA_DATA_create(PADATA_FOR_USER, ...)``.
    """
    # PA-S4U2Self     ::= SEQUENCE {
    #        name            [0] PrincipalName,
    #        realm           [1] Realm,
    #        cksum           [2] Checksum,
    #        auth            [3] GeneralString
    # }
    cksum_parts = [name['name-type'].to_bytes(4, byteorder='little')]
    cksum_parts.extend(n.encode() for n in name['name-string'])
    cksum_parts.append(realm.encode())
    cksum_parts.append("Kerberos".encode())
    cksum_data = b''.join(cksum_parts)
    cksum = self.Checksum_create(tgt_session_key,
                                 KU_NON_KERB_CKSUM_SALT,
                                 cksum_data,
                                 ctype=ctype)

    PA_S4U2Self_obj = {
        'name': name,
        'realm': realm,
        'cksum': cksum,
        'auth': "Kerberos",
    }
    pa_s4u2self = self.der_encode(
        PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
    return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
def _generic_kdc_exchange(self,
                          kdc_exchange_dict,  # required
                          cname=None,  # optional
                          realm=None,  # required
                          sname=None,  # optional
                          from_time=None,  # optional
                          till_time=None,  # required
                          renew_time=None,  # optional
                          nonce=None,  # required
                          etypes=None,  # required
                          addresses=None,  # optional
                          additional_tickets=None,  # optional
                          EncAuthorizationData=None,  # optional
                          EncAuthorizationData_key=None,  # optional
                          EncAuthorizationData_usage=None):  # optional
    """Run one KDC request/reply exchange driven by ``kdc_exchange_dict``.

    The dict supplies the message types, the request ASN.1 spec class, the
    KDC options and the callbacks.  Exactly one of 'check_error_fn' and
    'check_rep_fn' must be set, and it must agree with
    'expected_error_mode' (non-zero for an error, zero for a reply).

    Side effects: stores the padata and body actually sent under
    'req_padata' and 'req_body' in ``kdc_exchange_dict``.

    :return: whatever the selected check callback returns.
    """
    check_error_fn = kdc_exchange_dict['check_error_fn']
    check_rep_fn = kdc_exchange_dict['check_rep_fn']
    generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
    callback_dict = kdc_exchange_dict['callback_dict']
    req_msg_type = kdc_exchange_dict['req_msg_type']
    req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_error_mode = kdc_exchange_dict['expected_error_mode']
    kdc_options = kdc_exchange_dict['kdc_options']

    # Fill in defaults for the required time and nonce fields.
    if till_time is None:
        till_time = self.get_KerberosTime(offset=36000)
    if nonce is None:
        nonce = self.get_Nonce()

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)
    if generate_padata_fn is not None:
        # This can alter req_body...
        padata, req_body = generate_padata_fn(kdc_exchange_dict,
                                              callback_dict,
                                              req_body)
    else:
        padata = None

    kdc_exchange_dict['req_padata'] = padata
    kdc_exchange_dict['req_body'] = req_body

    req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
                                               padata=padata,
                                               req_body=req_body,
                                               asn1Spec=req_asn1Spec())

    rep = self.send_recv_transaction(req_decoded)
    self.assertIsNotNone(rep)

    msg_type = self.getElementValue(rep, 'msg-type')
    self.assertIsNotNone(msg_type)

    # The two check callbacks are mutually exclusive and must match the
    # expected error mode.
    expected_msg_type = None
    if check_error_fn is not None:
        expected_msg_type = KRB_ERROR
        self.assertIsNone(check_rep_fn)
        self.assertNotEqual(0, expected_error_mode)
    if check_rep_fn is not None:
        expected_msg_type = rep_msg_type
        self.assertIsNone(check_error_fn)
        self.assertEqual(0, expected_error_mode)
    self.assertIsNotNone(expected_msg_type)
    self.assertEqual(msg_type, expected_msg_type)

    if msg_type == KRB_ERROR:
        return check_error_fn(kdc_exchange_dict,
                              callback_dict,
                              rep)

    return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
def as_exchange_dict(self,
                     expected_crealm=None,
                     expected_cname=None,
                     expected_srealm=None,
                     expected_sname=None,
                     ticket_decryption_key=None,
                     generate_padata_fn=None,
                     check_error_fn=None,
                     check_rep_fn=None,
                     check_padata_fn=None,
                     check_kdc_private_fn=None,
                     callback_dict=None,
                     expected_error_mode=0,
                     client_as_etypes=None,
                     expected_salt=None,
                     kdc_options=''):
    """Return the exchange-description dict for an AS-REQ/AS-REP exchange.

    The dict bundles the AS message types and ASN.1 spec classes with the
    caller's expectations and callbacks, in the form consumed by
    ``_generic_kdc_exchange`` and the ``generic_check_*`` methods.
    """
    # Normalise before building the dict so that the stored value is never
    # None when the caller did not supply a callback dict.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_AS_REQ,
        'req_asn1Spec': krb5_asn1.AS_REQ,
        'rep_msg_type': KRB_AS_REP,
        'rep_asn1Spec': krb5_asn1.AS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'ticket_decryption_key': ticket_decryption_key,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_padata_fn': check_padata_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'callback_dict': callback_dict,
        'expected_error_mode': expected_error_mode,
        'client_as_etypes': client_as_etypes,
        'expected_salt': expected_salt,
        'kdc_options': kdc_options,
    }

    return kdc_exchange_dict
def tgs_exchange_dict(self,
                      expected_crealm=None,
                      expected_cname=None,
                      expected_srealm=None,
                      expected_sname=None,
                      ticket_decryption_key=None,
                      generate_padata_fn=None,
                      check_error_fn=None,
                      check_rep_fn=None,
                      check_padata_fn=None,
                      check_kdc_private_fn=None,
                      callback_dict=None,
                      tgt=None,
                      authenticator_subkey=None,
                      body_checksum_type=None,
                      kdc_options=''):
    """Return the exchange-description dict for a TGS-REQ/TGS-REP exchange.

    Besides the expectations and callbacks shared with the AS variant, the
    dict carries the 'tgt', 'authenticator_subkey' and 'body_checksum_type'
    entries read by ``generate_simple_tgs_padata`` and
    ``check_simple_tgs_padata``.
    """
    # Normalise before building the dict so that the stored value is never
    # None when the caller did not supply a callback dict.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_TGS_REQ,
        'req_asn1Spec': krb5_asn1.TGS_REQ,
        'rep_msg_type': KRB_TGS_REP,
        'rep_asn1Spec': krb5_asn1.TGS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'ticket_decryption_key': ticket_decryption_key,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_padata_fn': check_padata_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'callback_dict': callback_dict,
        'tgt': tgt,
        'body_checksum_type': body_checksum_type,
        'authenticator_subkey': authenticator_subkey,
        'kdc_options': kdc_options
    }

    return kdc_exchange_dict
def generic_check_kdc_rep(self,
                          kdc_exchange_dict,
                          callback_dict,
                          rep):
    """Validate an AS-REP/TGS-REP and decrypt its ticket and enc-part.

    Checks the outer reply fields against the expectations stored in
    ``kdc_exchange_dict``, obtains the reply decryption key and usage from
    'check_padata_fn', decrypts the ticket with 'ticket_decryption_key',
    and finally hands both decrypted parts to 'check_kdc_private_fn'.
    All three of those dict entries must be set.

    :return: ``rep``.
    """
    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_cname = kdc_exchange_dict['expected_cname']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
    check_padata_fn = kdc_exchange_dict['check_padata_fn']
    check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
    rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
    msg_type = kdc_exchange_dict['rep_msg_type']

    self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
    padata = self.getElementValue(rep, 'padata')
    self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
    self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
    self.assertElementPresent(rep, 'ticket')
    ticket = self.getElementValue(rep, 'ticket')
    ticket_encpart = None
    ticket_cipher = None
    self.assertIsNotNone(ticket)
    if ticket is not None:  # Never None, but gives indentation
        self.assertElementEqual(ticket, 'tkt-vno', 5)
        self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
        self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
        self.assertElementPresent(ticket, 'enc-part')
        ticket_encpart = self.getElementValue(ticket, 'enc-part')
        self.assertIsNotNone(ticket_encpart)
        if ticket_encpart is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_encpart, 'etype')
            # 'unspecified' means present, with any value != 0
            self.assertElementKVNO(ticket_encpart, 'kvno',
                                   self.unspecified_kvno)
            self.assertElementPresent(ticket_encpart, 'cipher')
            ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
    self.assertElementPresent(rep, 'enc-part')
    encpart = self.getElementValue(rep, 'enc-part')
    encpart_cipher = None
    self.assertIsNotNone(encpart)
    if encpart is not None:  # Never None, but gives indentation
        self.assertElementPresent(encpart, 'etype')
        # Check the kvno of the reply's own enc-part here; the ticket's
        # enc-part kvno has already been checked above.
        self.assertElementKVNO(encpart, 'kvno', 'autodetect')
        self.assertElementPresent(encpart, 'cipher')
        encpart_cipher = self.getElementValue(encpart, 'cipher')

    encpart_decryption_key = None
    self.assertIsNotNone(check_padata_fn)
    if check_padata_fn is not None:
        # See if we can get the decryption key from the preauth phase
        encpart_decryption_key, encpart_decryption_usage = (
            check_padata_fn(kdc_exchange_dict, callback_dict,
                            rep, padata))

    ticket_private = None
    self.assertIsNotNone(ticket_decryption_key)
    if ticket_decryption_key is not None:
        self.assertElementEqual(ticket_encpart, 'etype',
                                ticket_decryption_key.etype)
        self.assertElementKVNO(ticket_encpart, 'kvno',
                               ticket_decryption_key.kvno)
        ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
                                                       ticket_cipher)
        ticket_private = self.der_decode(
            ticket_decpart,
            asn1Spec=krb5_asn1.EncTicketPart())

    encpart_private = None
    self.assertIsNotNone(encpart_decryption_key)
    if encpart_decryption_key is not None:
        self.assertElementEqual(encpart, 'etype',
                                encpart_decryption_key.etype)
        self.assertElementKVNO(encpart, 'kvno',
                               encpart_decryption_key.kvno)
        rep_decpart = encpart_decryption_key.decrypt(
            encpart_decryption_usage,
            encpart_cipher)
        # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
        # application tag 26
        try:
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=rep_encpart_asn1Spec())
        except Exception:
            # NOTE(review): broad on purpose - retry with the TGS spec
            # whenever decoding with the expected spec fails.
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=krb5_asn1.EncTGSRepPart())

    self.assertIsNotNone(check_kdc_private_fn)
    if check_kdc_private_fn is not None:
        check_kdc_private_fn(kdc_exchange_dict, callback_dict,
                             rep, ticket_private, encpart_private)

    return rep
def generic_check_kdc_private(self,
                              kdc_exchange_dict,
                              callback_dict,
                              rep,
                              ticket_private,
                              encpart_private):
    """Check the decrypted ticket and reply parts and record the credentials.

    Verifies the mandatory fields of the decrypted EncTicketPart and
    Enc*RepPart (either may be ``None`` and is then skipped), checks that
    both carry the same session key when both are available, and stores a
    ``KerberosTicketCreds`` object under 'rep_ticket_creds' in
    ``kdc_exchange_dict``.
    """
    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_cname = kdc_exchange_dict['expected_cname']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']

    ticket = self.getElementValue(rep, 'ticket')

    ticket_session_key = None
    if ticket_private is not None:
        self.assertElementPresent(ticket_private, 'flags')
        self.assertElementPresent(ticket_private, 'key')
        ticket_key = self.getElementValue(ticket_private, 'key')
        self.assertIsNotNone(ticket_key)
        if ticket_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_key, 'keytype')
            self.assertElementPresent(ticket_key, 'keyvalue')
            ticket_session_key = self.EncryptionKey_import(ticket_key)
        self.assertElementEqualUTF8(ticket_private, 'crealm',
                                    expected_crealm)
        self.assertElementEqualPrincipal(ticket_private, 'cname',
                                         expected_cname)
        self.assertElementPresent(ticket_private, 'transited')
        self.assertElementPresent(ticket_private, 'authtime')
        # starttime is OPTIONAL in RFC 4120, so only insist on it when
        # strict checking is enabled; endtime is mandatory.
        if self.strict_checking:
            self.assertElementPresent(ticket_private, 'starttime')
        self.assertElementPresent(ticket_private, 'endtime')
        # TODO self.assertElementPresent(ticket_private, 'renew-till')
        # TODO self.assertElementMissing(ticket_private, 'caddr')
        self.assertElementPresent(ticket_private, 'authorization-data')

    encpart_session_key = None
    if encpart_private is not None:
        self.assertElementPresent(encpart_private, 'key')
        encpart_key = self.getElementValue(encpart_private, 'key')
        self.assertIsNotNone(encpart_key)
        if encpart_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(encpart_key, 'keytype')
            self.assertElementPresent(encpart_key, 'keyvalue')
            encpart_session_key = self.EncryptionKey_import(encpart_key)
        self.assertElementPresent(encpart_private, 'last-req')
        self.assertElementPresent(encpart_private, 'nonce')
        # TODO self.assertElementPresent(encpart_private,
        #     'key-expiration')
        self.assertElementPresent(encpart_private, 'flags')
        self.assertElementPresent(encpart_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(encpart_private, 'starttime')
        self.assertElementPresent(encpart_private, 'endtime')
        # TODO self.assertElementPresent(encpart_private, 'renew-till')
        self.assertElementEqualUTF8(encpart_private, 'srealm',
                                    expected_srealm)
        self.assertElementEqualPrincipal(encpart_private, 'sname',
                                         expected_sname)
        # TODO self.assertElementMissing(encpart_private, 'caddr')

    # The session key in the ticket and in the reply must be the same key.
    if ticket_session_key is not None and encpart_session_key is not None:
        self.assertEqual(ticket_session_key.etype,
                         encpart_session_key.etype)
        self.assertEqual(ticket_session_key.key.contents,
                         encpart_session_key.key.contents)
    if encpart_session_key is not None:
        session_key = encpart_session_key
    else:
        session_key = ticket_session_key
    ticket_creds = KerberosTicketCreds(
        ticket,
        session_key,
        crealm=expected_crealm,
        cname=expected_cname,
        srealm=expected_srealm,
        sname=expected_sname,
        decryption_key=ticket_decryption_key,
        ticket_private=ticket_private,
        encpart_private=encpart_private)

    kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
def generic_check_as_error(self,
                           kdc_exchange_dict,
                           callback_dict,
                           rep):
    """Validate a KRB-ERROR reply to an AS-REQ, including its preauth hints.

    From the etypes proposed in the request and the client's etypes, works
    out which PA-DATA types and ETYPE-INFO2 entries the KDC is expected to
    return, then checks the error fields and the METHOD-DATA carried in
    'e-data'.  On success the decoded ETYPE-INFO2 is stored under
    'preauth_etype_info2' in ``kdc_exchange_dict`` (it is left as ``None``
    on the early-return paths).

    :return: ``rep``.
    """
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    expected_salt = kdc_exchange_dict['expected_salt']
    client_as_etypes = kdc_exchange_dict['client_as_etypes']
    expected_error_mode = kdc_exchange_dict['expected_error_mode']
    req_body = kdc_exchange_dict['req_body']
    proposed_etypes = req_body['etype']

    kdc_exchange_dict['preauth_etype_info2'] = None

    # Work out which etype hints we expect back.
    expect_etype_info2 = ()
    expect_etype_info = False
    unexpect_etype_info = True
    expected_aes_type = 0
    expected_rc4_type = 0
    if kcrypto.Enctype.RC4 in proposed_etypes:
        expect_etype_info = True
    for etype in proposed_etypes:
        if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
            expect_etype_info = False
        if etype not in client_as_etypes:
            continue
        if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
            if etype > expected_aes_type:
                expected_aes_type = etype
        if etype in (kcrypto.Enctype.RC4,):
            unexpect_etype_info = False
            if etype > expected_rc4_type:
                expected_rc4_type = etype

    if expected_aes_type != 0:
        expect_etype_info2 += (expected_aes_type,)
    if expected_rc4_type != 0:
        expect_etype_info2 += (expected_rc4_type,)

    expected_patypes = ()
    if expect_etype_info:
        self.assertGreater(len(expect_etype_info2), 0)
        expected_patypes += (PADATA_ETYPE_INFO,)
    if len(expect_etype_info2) != 0:
        expected_patypes += (PADATA_ETYPE_INFO2,)

    expected_patypes += (PADATA_ENC_TIMESTAMP,)
    expected_patypes += (PADATA_PK_AS_REQ,)
    expected_patypes += (PADATA_PK_AS_REP_19,)

    self.assertElementEqual(rep, 'pvno', 5)
    self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
    self.assertElementEqual(rep, 'error-code', expected_error_mode)
    self.assertElementMissing(rep, 'ctime')
    self.assertElementMissing(rep, 'cusec')
    self.assertElementPresent(rep, 'stime')
    self.assertElementPresent(rep, 'susec')
    # error-code checked above
    if self.strict_checking:
        self.assertElementMissing(rep, 'crealm')
        self.assertElementMissing(rep, 'cname')
    self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
    self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
    if self.strict_checking:
        self.assertElementMissing(rep, 'e-text')
    if expected_error_mode == KDC_ERR_GENERIC:
        # A generic error carries no preauth hints; nothing more to check.
        self.assertElementMissing(rep, 'e-data')
        return rep
    edata = self.getElementValue(rep, 'e-data')
    if self.strict_checking:
        self.assertIsNotNone(edata)
    if edata is not None:
        rep_padata = self.der_decode(edata,
                                     asn1Spec=krb5_asn1.METHOD_DATA())
        self.assertGreater(len(rep_padata), 0)
    else:
        rep_padata = []

    if self.strict_checking:
        for i, patype in enumerate(expected_patypes):
            self.assertElementEqual(rep_padata[i], 'padata-type', patype)
        self.assertEqual(len(rep_padata), len(expected_patypes))

    # Collect the padata elements, insisting that each type occurs at
    # most once.
    etype_info2 = None
    etype_info = None
    enc_timestamp = None
    pk_as_req = None
    pk_as_rep19 = None
    for pa in rep_padata:
        patype = self.getElementValue(pa, 'padata-type')
        pavalue = self.getElementValue(pa, 'padata-value')
        if patype == PADATA_ETYPE_INFO2:
            self.assertIsNone(etype_info2)
            etype_info2 = self.der_decode(pavalue,
                                          asn1Spec=krb5_asn1.ETYPE_INFO2())
            continue
        if patype == PADATA_ETYPE_INFO:
            self.assertIsNone(etype_info)
            etype_info = self.der_decode(pavalue,
                                         asn1Spec=krb5_asn1.ETYPE_INFO())
            continue
        if patype == PADATA_ENC_TIMESTAMP:
            self.assertIsNone(enc_timestamp)
            enc_timestamp = pavalue
            self.assertEqual(len(enc_timestamp), 0)
            continue
        if patype == PADATA_PK_AS_REQ:
            self.assertIsNone(pk_as_req)
            pk_as_req = pavalue
            self.assertEqual(len(pk_as_req), 0)
            continue
        if patype == PADATA_PK_AS_REP_19:
            self.assertIsNone(pk_as_rep19)
            pk_as_rep19 = pavalue
            self.assertEqual(len(pk_as_rep19), 0)
            continue

    # No etype is both proposed and supported by the client: the KDC
    # cannot offer any etype info at all.
    if all(etype not in client_as_etypes or etype not in proposed_etypes
           for etype in (kcrypto.Enctype.AES256,
                         kcrypto.Enctype.AES128,
                         kcrypto.Enctype.RC4)):
        self.assertIsNone(etype_info2)
        self.assertIsNone(etype_info)
        if self.strict_checking:
            self.assertIsNotNone(enc_timestamp)
            self.assertIsNotNone(pk_as_req)
            self.assertIsNotNone(pk_as_rep19)
        return rep

    self.assertIsNotNone(etype_info2)
    if expect_etype_info:
        self.assertIsNotNone(etype_info)
    else:
        if self.strict_checking:
            self.assertIsNone(etype_info)
    if unexpect_etype_info:
        self.assertIsNone(etype_info)

    self.assertGreaterEqual(len(etype_info2), 1)
    self.assertLessEqual(len(etype_info2), len(expect_etype_info2))
    if self.strict_checking:
        self.assertEqual(len(etype_info2), len(expect_etype_info2))
    for i, info2_entry in enumerate(etype_info2):
        e = self.getElementValue(info2_entry, 'etype')
        self.assertEqual(e, expect_etype_info2[i])
        salt = self.getElementValue(info2_entry, 'salt')
        if e == kcrypto.Enctype.RC4:
            self.assertIsNone(salt)
        else:
            self.assertIsNotNone(salt)
            if expected_salt is not None:
                self.assertEqual(salt, expected_salt)
        s2kparams = self.getElementValue(info2_entry, 's2kparams')
        if self.strict_checking:
            self.assertIsNone(s2kparams)
    if etype_info is not None:
        self.assertEqual(len(etype_info), 1)
        e = self.getElementValue(etype_info[0], 'etype')
        self.assertEqual(e, kcrypto.Enctype.RC4)
        self.assertEqual(e, expect_etype_info2[0])
        salt = self.getElementValue(etype_info[0], 'salt')
        if self.strict_checking:
            self.assertIsNotNone(salt)
            self.assertEqual(len(salt), 0)

    self.assertIsNotNone(enc_timestamp)
    self.assertIsNotNone(pk_as_req)
    self.assertIsNotNone(pk_as_rep19)

    kdc_exchange_dict['preauth_etype_info2'] = etype_info2
    return rep
def generate_simple_tgs_padata(self,
                               kdc_exchange_dict,
                               callback_dict,
                               req_body):
    """Generate the PA-TGS-REQ padata for a TGS request body.

    Intended as a 'generate_padata_fn' callback: it checksums the DER
    encoded ``req_body`` with the session key of
    ``kdc_exchange_dict['tgt']``, wraps the checksum in an encrypted
    Authenticator and returns it inside an AP-REQ padata element.

    :return: ``(padata, req_body)`` where ``padata`` is a one-element list
        and ``req_body`` is returned unchanged.
    """
    tgt = kdc_exchange_dict['tgt']
    authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
    body_checksum_type = kdc_exchange_dict['body_checksum_type']

    req_body_blob = self.der_encode(req_body,
                                    asn1Spec=krb5_asn1.KDC_REQ_BODY())

    req_body_checksum = self.Checksum_create(tgt.session_key,
                                             KU_TGS_REQ_AUTH_CKSUM,
                                             req_body_blob,
                                             ctype=body_checksum_type)

    subkey_obj = None
    if authenticator_subkey is not None:
        subkey_obj = authenticator_subkey.export_obj()
    seq_number = random.randint(0, 0xfffffffe)
    (ctime, cusec) = self.get_KerberosTimeWithUsec()
    authenticator_obj = self.Authenticator_create(
        crealm=tgt.crealm,
        cname=tgt.cname,
        cksum=req_body_checksum,
        cusec=cusec,
        ctime=ctime,
        subkey=subkey_obj,
        seq_number=seq_number,
        authorization_data=None)
    authenticator_blob = self.der_encode(
        authenticator_obj,
        asn1Spec=krb5_asn1.Authenticator())

    authenticator = self.EncryptedData_create(tgt.session_key,
                                              KU_TGS_REQ_AUTH,
                                              authenticator_blob)

    ap_options = krb5_asn1.APOptions('0')
    ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
                                    ticket=tgt.ticket,
                                    authenticator=authenticator)
    ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
    pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
    padata = [pa_tgs_req]

    return padata, req_body
def check_simple_tgs_padata(self,
                            kdc_exchange_dict,
                            callback_dict,
                            rep,
                            padata):
    """Return the key and key usage for decrypting a TGS-REP enc-part.

    Intended as a 'check_padata_fn' callback.  When the exchange used an
    authenticator subkey, that subkey protects the reply; otherwise the
    TGT session key does.

    :return: ``(key, key_usage)``.
    """
    tgt = kdc_exchange_dict['tgt']
    authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
    if authenticator_subkey is not None:
        subkey = authenticator_subkey
        subkey_usage = KU_TGS_REP_ENC_PART_SUB_KEY
    else:
        subkey = tgt.session_key
        subkey_usage = KU_TGS_REP_ENC_PART_SESSION

    return subkey, subkey_usage
2032 def _test_as_exchange(self
,
2038 expected_error_mode
,
2048 ticket_decryption_key
=None):
2050 def _generate_padata_copy(_kdc_exchange_dict
,
2053 return padata
, req_body
2055 def _check_padata_preauth_key(_kdc_exchange_dict
,
2059 as_rep_usage
= KU_AS_REP_ENC_PART
2060 return preauth_key
, as_rep_usage
2062 if expected_error_mode
== 0:
2063 check_error_fn
= None
2064 check_rep_fn
= self
.generic_check_kdc_rep
2066 check_error_fn
= self
.generic_check_as_error
2069 kdc_exchange_dict
= self
.as_exchange_dict(
2070 expected_crealm
=expected_crealm
,
2071 expected_cname
=expected_cname
,
2072 expected_srealm
=expected_srealm
,
2073 expected_sname
=expected_sname
,
2074 ticket_decryption_key
=ticket_decryption_key
,
2075 generate_padata_fn
=_generate_padata_copy
,
2076 check_error_fn
=check_error_fn
,
2077 check_rep_fn
=check_rep_fn
,
2078 check_padata_fn
=_check_padata_preauth_key
,
2079 check_kdc_private_fn
=self
.generic_check_kdc_private
,
2080 expected_error_mode
=expected_error_mode
,
2081 client_as_etypes
=client_as_etypes
,
2082 expected_salt
=expected_salt
,
2083 kdc_options
=str(kdc_options
))
2085 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
2092 return rep
, kdc_exchange_dict