tests/krb5: Don't expect RC4 in ETYPE-INFO2 for a non-error reply
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob7a66b74adfed0623040988433ab2e03f95c14641
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import security
38 from samba.gensec import FEATURE_SEAL
40 import samba.tests
41 from samba.tests import TestCaseInTempDir
43 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
44 from samba.tests.krb5.rfc4120_constants import (
45 FX_FAST_ARMOR_AP_REQUEST,
46 KDC_ERR_GENERIC,
47 KRB_AP_REQ,
48 KRB_AS_REP,
49 KRB_AS_REQ,
50 KRB_ERROR,
51 KRB_TGS_REP,
52 KRB_TGS_REQ,
53 KU_AP_REQ_AUTH,
54 KU_AS_REP_ENC_PART,
55 KU_FAST_ENC,
56 KU_FAST_FINISHED,
57 KU_FAST_REP,
58 KU_FAST_REQ_CHKSUM,
59 KU_NON_KERB_CKSUM_SALT,
60 KU_TGS_REP_ENC_PART_SESSION,
61 KU_TGS_REP_ENC_PART_SUB_KEY,
62 KU_TGS_REQ_AUTH,
63 KU_TGS_REQ_AUTH_CKSUM,
64 KU_TGS_REQ_AUTH_DAT_SESSION,
65 KU_TGS_REQ_AUTH_DAT_SUBKEY,
66 KU_TICKET,
67 PADATA_ENC_TIMESTAMP,
68 PADATA_ETYPE_INFO,
69 PADATA_ETYPE_INFO2,
70 PADATA_FOR_USER,
71 PADATA_FX_FAST,
72 PADATA_KDC_REQ,
73 PADATA_PAC_OPTIONS,
74 PADATA_PAC_REQUEST,
75 PADATA_PK_AS_REQ,
76 PADATA_PK_AS_REP_19,
77 PADATA_SUPPORTED_ETYPES
79 import samba.tests.krb5.kcrypto as kcrypto
82 def BitStringEncoder_encodeValue32(
83 self, value, asn1Spec, encodeFun, **options):
85 # BitStrings like KDCOptions or TicketFlags should at least
86 # be 32-Bit on the wire
88 if asn1Spec is not None:
89 # TODO: try to avoid ASN.1 schema instantiation
90 value = asn1Spec.clone(value)
92 valueLength = len(value)
93 if valueLength % 8:
94 alignedValue = value << (8 - valueLength % 8)
95 else:
96 alignedValue = value
98 substrate = alignedValue.asOctets()
99 length = len(substrate)
100 # We need at least 32-Bit / 4-Bytes
101 if length < 4:
102 padding = 4 - length
103 else:
104 padding = 0
105 ret = b'\x00' + substrate + (b'\x00' * padding)
106 return ret, False, True
109 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
112 def BitString_NamedValues_prettyPrint(self, scope=0):
113 ret = "%s" % self.asBinary()
114 bits = []
115 highest_bit = 32
116 for byte in self.asNumbers():
117 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
118 mask = 1 << bit
119 if byte & mask:
120 val = 1
121 else:
122 val = 0
123 bits.append(val)
124 if len(bits) < highest_bit:
125 for bitPosition in range(len(bits), highest_bit):
126 bits.append(0)
127 indent = " " * scope
128 delim = ": (\n%s " % indent
129 for bitPosition in range(highest_bit):
130 if bitPosition in self.prettyPrintNamedValues:
131 name = self.prettyPrintNamedValues[bitPosition]
132 elif bits[bitPosition] != 0:
133 name = "unknown-bit-%u" % bitPosition
134 else:
135 continue
136 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
137 delim = ",\n%s " % indent
138 ret += "\n%s)" % indent
139 return ret
142 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
143 krb5_asn1.TicketFlagsValues.namedValues
144 krb5_asn1.TicketFlags.namedValues =\
145 krb5_asn1.TicketFlagsValues.namedValues
146 krb5_asn1.TicketFlags.prettyPrint =\
147 BitString_NamedValues_prettyPrint
148 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
149 krb5_asn1.KDCOptionsValues.namedValues
150 krb5_asn1.KDCOptions.namedValues =\
151 krb5_asn1.KDCOptionsValues.namedValues
152 krb5_asn1.KDCOptions.prettyPrint =\
153 BitString_NamedValues_prettyPrint
154 krb5_asn1.APOptions.prettyPrintNamedValues =\
155 krb5_asn1.APOptionsValues.namedValues
156 krb5_asn1.APOptions.namedValues =\
157 krb5_asn1.APOptionsValues.namedValues
158 krb5_asn1.APOptions.prettyPrint =\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
161 krb5_asn1.PACOptionFlagsValues.namedValues
162 krb5_asn1.PACOptionFlags.namedValues =\
163 krb5_asn1.PACOptionFlagsValues.namedValues
164 krb5_asn1.PACOptionFlags.prettyPrint =\
165 BitString_NamedValues_prettyPrint
168 def Integer_NamedValues_prettyPrint(self, scope=0):
169 intval = int(self)
170 if intval in self.prettyPrintNamedValues:
171 name = self.prettyPrintNamedValues[intval]
172 else:
173 name = "<__unknown__>"
174 ret = "%d (0x%x) %s" % (intval, intval, name)
175 return ret
178 krb5_asn1.NameType.prettyPrintNamedValues =\
179 krb5_asn1.NameTypeValues.namedValues
180 krb5_asn1.NameType.prettyPrint =\
181 Integer_NamedValues_prettyPrint
182 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
183 krb5_asn1.AuthDataTypeValues.namedValues
184 krb5_asn1.AuthDataType.prettyPrint =\
185 Integer_NamedValues_prettyPrint
186 krb5_asn1.PADataType.prettyPrintNamedValues =\
187 krb5_asn1.PADataTypeValues.namedValues
188 krb5_asn1.PADataType.prettyPrint =\
189 Integer_NamedValues_prettyPrint
190 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
191 krb5_asn1.EncryptionTypeValues.namedValues
192 krb5_asn1.EncryptionType.prettyPrint =\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
195 krb5_asn1.ChecksumTypeValues.namedValues
196 krb5_asn1.ChecksumType.prettyPrint =\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
199 krb5_asn1.KerbErrorDataTypeValues.namedValues
200 krb5_asn1.KerbErrorDataType.prettyPrint =\
201 Integer_NamedValues_prettyPrint
204 class Krb5EncryptionKey:
205 def __init__(self, key, kvno):
206 EncTypeChecksum = {
207 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
208 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
209 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
211 self.key = key
212 self.etype = key.enctype
213 self.ctype = EncTypeChecksum[self.etype]
214 self.kvno = kvno
216 def encrypt(self, usage, plaintext):
217 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
218 return ciphertext
220 def decrypt(self, usage, ciphertext):
221 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
222 return plaintext
224 def make_checksum(self, usage, plaintext, ctype=None):
225 if ctype is None:
226 ctype = self.ctype
227 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
228 return cksum
230 def export_obj(self):
231 EncryptionKey_obj = {
232 'keytype': self.etype,
233 'keyvalue': self.key.contents,
235 return EncryptionKey_obj
238 class KerberosCredentials(Credentials):
239 def __init__(self):
240 super(KerberosCredentials, self).__init__()
241 all_enc_types = 0
242 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
243 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
244 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
246 self.as_supported_enctypes = all_enc_types
247 self.tgs_supported_enctypes = all_enc_types
248 self.ap_supported_enctypes = all_enc_types
250 self.kvno = None
251 self.forced_keys = {}
253 self.forced_salt = None
255 def set_as_supported_enctypes(self, value):
256 self.as_supported_enctypes = int(value)
258 def set_tgs_supported_enctypes(self, value):
259 self.tgs_supported_enctypes = int(value)
261 def set_ap_supported_enctypes(self, value):
262 self.ap_supported_enctypes = int(value)
264 def _get_krb5_etypes(self, supported_enctypes):
265 etypes = ()
267 if supported_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
268 etypes += (kcrypto.Enctype.AES256,)
269 if supported_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
270 etypes += (kcrypto.Enctype.AES128,)
271 if supported_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
272 etypes += (kcrypto.Enctype.RC4,)
274 return etypes
276 def get_as_krb5_etypes(self):
277 return self._get_krb5_etypes(self.as_supported_enctypes)
279 def get_tgs_krb5_etypes(self):
280 return self._get_krb5_etypes(self.tgs_supported_enctypes)
282 def get_ap_krb5_etypes(self):
283 return self._get_krb5_etypes(self.ap_supported_enctypes)
285 def set_kvno(self, kvno):
286 self.kvno = kvno
288 def get_kvno(self):
289 return self.kvno
291 def set_forced_key(self, etype, hexkey):
292 etype = int(etype)
293 contents = binascii.a2b_hex(hexkey)
294 key = kcrypto.Key(etype, contents)
295 self.forced_keys[etype] = Krb5EncryptionKey(key, self.kvno)
297 def get_forced_key(self, etype):
298 etype = int(etype)
299 return self.forced_keys.get(etype, None)
301 def set_forced_salt(self, salt):
302 self.forced_salt = bytes(salt)
304 def get_forced_salt(self):
305 return self.forced_salt
307 def get_salt(self):
308 if self.forced_salt is not None:
309 return self.forced_salt
311 if self.get_workstation():
312 salt_string = '%shost%s.%s' % (
313 self.get_realm().upper(),
314 self.get_username().lower().rsplit('$', 1)[0],
315 self.get_realm().lower())
316 else:
317 salt_string = self.get_realm().upper() + self.get_username()
319 return salt_string.encode('utf-8')
322 class KerberosTicketCreds:
323 def __init__(self, ticket, session_key,
324 crealm=None, cname=None,
325 srealm=None, sname=None,
326 decryption_key=None,
327 ticket_private=None,
328 encpart_private=None):
329 self.ticket = ticket
330 self.session_key = session_key
331 self.crealm = crealm
332 self.cname = cname
333 self.srealm = srealm
334 self.sname = sname
335 self.decryption_key = decryption_key
336 self.ticket_private = ticket_private
337 self.encpart_private = encpart_private
340 class RawKerberosTest(TestCaseInTempDir):
341 """A raw Kerberos Test case."""
343 etypes_to_test = (
344 {"value": -1111, "name": "dummy", },
345 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
346 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
347 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
350 setup_etype_test_permutations_done = False
352 @classmethod
353 def setup_etype_test_permutations(cls):
354 if cls.setup_etype_test_permutations_done:
355 return
357 res = []
359 num_idxs = len(cls.etypes_to_test)
360 permutations = []
361 for num in range(1, num_idxs + 1):
362 chunk = list(itertools.permutations(range(num_idxs), num))
363 for e in chunk:
364 el = list(e)
365 permutations.append(el)
367 for p in permutations:
368 name = None
369 etypes = ()
370 for idx in p:
371 n = cls.etypes_to_test[idx]["name"]
372 if name is None:
373 name = n
374 else:
375 name += "_%s" % n
376 etypes += (cls.etypes_to_test[idx]["value"],)
378 r = {"name": name, "etypes": etypes, }
379 res.append(r)
381 cls.etype_test_permutations = res
382 cls.setup_etype_test_permutations_done = True
384 @classmethod
385 def etype_test_permutation_name_idx(cls):
386 cls.setup_etype_test_permutations()
387 res = []
388 idx = 0
389 for e in cls.etype_test_permutations:
390 r = (e['name'], idx)
391 idx += 1
392 res.append(r)
393 return res
395 def etype_test_permutation_by_idx(self, idx):
396 e = self.etype_test_permutations[idx]
397 return (e['name'], e['etypes'])
399 @classmethod
400 def setUpClass(cls):
401 super().setUpClass()
403 cls.host = samba.tests.env_get_var_value('SERVER')
405 # A dictionary containing credentials that have already been
406 # obtained.
407 cls.creds_dict = {}
409 def setUp(self):
410 super().setUp()
411 self.do_asn1_print = False
412 self.do_hexdump = False
414 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
415 allow_missing=True)
416 if strict_checking is None:
417 strict_checking = '1'
418 self.strict_checking = bool(int(strict_checking))
420 self.s = None
422 self.unspecified_kvno = object()
424 def tearDown(self):
425 self._disconnect("tearDown")
426 super().tearDown()
428 def _disconnect(self, reason):
429 if self.s is None:
430 return
431 self.s.close()
432 self.s = None
433 if self.do_hexdump:
434 sys.stderr.write("disconnect[%s]\n" % reason)
436 def _connect_tcp(self):
437 tcp_port = 88
438 try:
439 self.a = socket.getaddrinfo(self.host, tcp_port, socket.AF_UNSPEC,
440 socket.SOCK_STREAM, socket.SOL_TCP,
442 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
443 self.s.settimeout(10)
444 self.s.connect(self.a[0][4])
445 except socket.error:
446 self.s.close()
447 raise
448 except IOError:
449 self.s.close()
450 raise
452 def connect(self):
453 self.assertNotConnected()
454 self._connect_tcp()
455 if self.do_hexdump:
456 sys.stderr.write("connected[%s]\n" % self.host)
458 def env_get_var(self, varname, prefix,
459 fallback_default=True,
460 allow_missing=False):
461 val = None
462 if prefix is not None:
463 allow_missing_prefix = allow_missing or fallback_default
464 val = samba.tests.env_get_var_value(
465 '%s_%s' % (prefix, varname),
466 allow_missing=allow_missing_prefix)
467 else:
468 fallback_default = True
469 if val is None and fallback_default:
470 val = samba.tests.env_get_var_value(varname,
471 allow_missing=allow_missing)
472 return val
474 def _get_krb5_creds_from_env(self, prefix,
475 default_username=None,
476 allow_missing_password=False,
477 allow_missing_keys=True,
478 require_strongest_key=False):
479 c = KerberosCredentials()
480 c.guess()
482 domain = self.env_get_var('DOMAIN', prefix)
483 realm = self.env_get_var('REALM', prefix)
484 allow_missing_username = default_username is not None
485 username = self.env_get_var('USERNAME', prefix,
486 fallback_default=False,
487 allow_missing=allow_missing_username)
488 if username is None:
489 username = default_username
490 password = self.env_get_var('PASSWORD', prefix,
491 fallback_default=False,
492 allow_missing=allow_missing_password)
493 c.set_domain(domain)
494 c.set_realm(realm)
495 c.set_username(username)
496 if password is not None:
497 c.set_password(password)
498 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
499 prefix, allow_missing=True)
500 if as_supported_enctypes is not None:
501 c.set_as_supported_enctypes(as_supported_enctypes)
502 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
503 prefix, allow_missing=True)
504 if tgs_supported_enctypes is not None:
505 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
506 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
507 prefix, allow_missing=True)
508 if ap_supported_enctypes is not None:
509 c.set_ap_supported_enctypes(ap_supported_enctypes)
511 if require_strongest_key:
512 kvno_allow_missing = False
513 if password is None:
514 aes256_allow_missing = False
515 else:
516 aes256_allow_missing = True
517 else:
518 kvno_allow_missing = allow_missing_keys
519 aes256_allow_missing = allow_missing_keys
520 kvno = self.env_get_var('KVNO', prefix,
521 fallback_default=False,
522 allow_missing=kvno_allow_missing)
523 if kvno is not None:
524 c.set_kvno(kvno)
525 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
526 fallback_default=False,
527 allow_missing=aes256_allow_missing)
528 if aes256_key is not None:
529 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
530 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
531 fallback_default=False,
532 allow_missing=True)
533 if aes128_key is not None:
534 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
535 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
536 fallback_default=False, allow_missing=True)
537 if rc4_key is not None:
538 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
540 if not allow_missing_keys:
541 self.assertTrue(c.forced_keys,
542 'Please supply %s encryption keys '
543 'in environment' % prefix)
545 return c
547 def _get_krb5_creds(self,
548 prefix,
549 default_username=None,
550 allow_missing_password=False,
551 allow_missing_keys=True,
552 require_strongest_key=False,
553 fallback_creds_fn=None):
554 if prefix in self.creds_dict:
555 return self.creds_dict[prefix]
557 # We don't have the credentials already
558 creds = None
559 env_err = None
560 try:
561 # Try to obtain them from the environment
562 creds = self._get_krb5_creds_from_env(
563 prefix,
564 default_username=default_username,
565 allow_missing_password=allow_missing_password,
566 allow_missing_keys=allow_missing_keys,
567 require_strongest_key=require_strongest_key)
568 except Exception as err:
569 # An error occurred, so save it for later
570 env_err = err
571 else:
572 self.assertIsNotNone(creds)
573 # Save the obtained credentials
574 self.creds_dict[prefix] = creds
575 return creds
577 if fallback_creds_fn is not None:
578 try:
579 # Try to use the fallback method
580 creds = fallback_creds_fn()
581 except Exception as err:
582 print("ERROR FROM ENV: %r" % (env_err))
583 print("FALLBACK-FN: %s" % (fallback_creds_fn))
584 print("FALLBACK-ERROR: %r" % (err))
585 else:
586 self.assertIsNotNone(creds)
587 # Save the obtained credentials
588 self.creds_dict[prefix] = creds
589 return creds
591 # Both methods failed, so raise the exception from the
592 # environment method
593 raise env_err
595 def get_user_creds(self,
596 allow_missing_password=False,
597 allow_missing_keys=True):
598 c = self._get_krb5_creds(prefix=None,
599 allow_missing_password=allow_missing_password,
600 allow_missing_keys=allow_missing_keys)
601 return c
603 def get_service_creds(self,
604 allow_missing_password=False,
605 allow_missing_keys=True):
606 c = self._get_krb5_creds(prefix='SERVICE',
607 allow_missing_password=allow_missing_password,
608 allow_missing_keys=allow_missing_keys)
609 return c
611 def get_client_creds(self,
612 allow_missing_password=False,
613 allow_missing_keys=True):
614 c = self._get_krb5_creds(prefix='CLIENT',
615 allow_missing_password=allow_missing_password,
616 allow_missing_keys=allow_missing_keys)
617 return c
619 def get_server_creds(self,
620 allow_missing_password=False,
621 allow_missing_keys=True):
622 c = self._get_krb5_creds(prefix='SERVER',
623 allow_missing_password=allow_missing_password,
624 allow_missing_keys=allow_missing_keys)
625 return c
627 def get_admin_creds(self,
628 allow_missing_password=False,
629 allow_missing_keys=True):
630 c = self._get_krb5_creds(prefix='ADMIN',
631 allow_missing_password=allow_missing_password,
632 allow_missing_keys=allow_missing_keys)
633 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
634 return c
636 def get_krbtgt_creds(self,
637 require_keys=True,
638 require_strongest_key=False):
639 if require_strongest_key:
640 self.assertTrue(require_keys)
641 c = self._get_krb5_creds(prefix='KRBTGT',
642 default_username='krbtgt',
643 allow_missing_password=True,
644 allow_missing_keys=not require_keys,
645 require_strongest_key=require_strongest_key)
646 return c
648 def get_anon_creds(self):
649 c = Credentials()
650 c.set_anonymous()
651 return c
653 def asn1_dump(self, name, obj, asn1_print=None):
654 if asn1_print is None:
655 asn1_print = self.do_asn1_print
656 if asn1_print:
657 if name is not None:
658 sys.stderr.write("%s:\n%s" % (name, obj))
659 else:
660 sys.stderr.write("%s" % (obj))
662 def hex_dump(self, name, blob, hexdump=None):
663 if hexdump is None:
664 hexdump = self.do_hexdump
665 if hexdump:
666 sys.stderr.write(
667 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
669 def der_decode(
670 self,
671 blob,
672 asn1Spec=None,
673 native_encode=True,
674 asn1_print=None,
675 hexdump=None):
676 if asn1Spec is not None:
677 class_name = type(asn1Spec).__name__.split(':')[0]
678 else:
679 class_name = "<None-asn1Spec>"
680 self.hex_dump(class_name, blob, hexdump=hexdump)
681 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
682 self.asn1_dump(None, obj, asn1_print=asn1_print)
683 if native_encode:
684 obj = pyasn1_native_encode(obj)
685 return obj
687 def der_encode(
688 self,
689 obj,
690 asn1Spec=None,
691 native_decode=True,
692 asn1_print=None,
693 hexdump=None):
694 if native_decode:
695 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
696 class_name = type(obj).__name__.split(':')[0]
697 if class_name is not None:
698 self.asn1_dump(None, obj, asn1_print=asn1_print)
699 blob = pyasn1_der_encode(obj)
700 if class_name is not None:
701 self.hex_dump(class_name, blob, hexdump=hexdump)
702 return blob
704 def send_pdu(self, req, asn1_print=None, hexdump=None):
705 try:
706 k5_pdu = self.der_encode(
707 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
708 header = struct.pack('>I', len(k5_pdu))
709 req_pdu = header
710 req_pdu += k5_pdu
711 self.hex_dump("send_pdu", header, hexdump=hexdump)
712 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
713 while True:
714 sent = self.s.send(req_pdu, 0)
715 if sent == len(req_pdu):
716 break
717 req_pdu = req_pdu[sent:]
718 except socket.error as e:
719 self._disconnect("send_pdu: %s" % e)
720 raise
721 except IOError as e:
722 self._disconnect("send_pdu: %s" % e)
723 raise
725 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
726 rep_pdu = None
727 try:
728 if timeout is not None:
729 self.s.settimeout(timeout)
730 rep_pdu = self.s.recv(num_recv, 0)
731 self.s.settimeout(10)
732 if len(rep_pdu) == 0:
733 self._disconnect("recv_raw: EOF")
734 return None
735 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
736 except socket.timeout:
737 self.s.settimeout(10)
738 sys.stderr.write("recv_raw: TIMEOUT\n")
739 except socket.error as e:
740 self._disconnect("recv_raw: %s" % e)
741 raise
742 except IOError as e:
743 self._disconnect("recv_raw: %s" % e)
744 raise
745 return rep_pdu
747 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
748 rep_pdu = None
749 rep = None
750 raw_pdu = self.recv_raw(
751 num_recv=4, hexdump=hexdump, timeout=timeout)
752 if raw_pdu is None:
753 return (None, None)
754 header = struct.unpack(">I", raw_pdu[0:4])
755 k5_len = header[0]
756 if k5_len == 0:
757 return (None, "")
758 missing = k5_len
759 rep_pdu = b''
760 while missing > 0:
761 raw_pdu = self.recv_raw(
762 num_recv=missing, hexdump=hexdump, timeout=timeout)
763 self.assertGreaterEqual(len(raw_pdu), 1)
764 rep_pdu += raw_pdu
765 missing = k5_len - len(rep_pdu)
766 k5_raw = self.der_decode(
767 rep_pdu,
768 asn1Spec=None,
769 native_encode=False,
770 asn1_print=False,
771 hexdump=False)
772 pvno = k5_raw['field-0']
773 self.assertEqual(pvno, 5)
774 msg_type = k5_raw['field-1']
775 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
776 if msg_type == KRB_AS_REP:
777 asn1Spec = krb5_asn1.AS_REP()
778 elif msg_type == KRB_TGS_REP:
779 asn1Spec = krb5_asn1.TGS_REP()
780 elif msg_type == KRB_ERROR:
781 asn1Spec = krb5_asn1.KRB_ERROR()
782 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
783 asn1_print=asn1_print, hexdump=False)
784 return (rep, rep_pdu)
786 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
787 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
788 hexdump=hexdump,
789 timeout=timeout)
790 return rep
792 def assertIsConnected(self):
793 self.assertIsNotNone(self.s, msg="Not connected")
795 def assertNotConnected(self):
796 self.assertIsNone(self.s, msg="Is connected")
798 def send_recv_transaction(
799 self,
800 req,
801 asn1_print=None,
802 hexdump=None,
803 timeout=None):
804 self.connect()
805 try:
806 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
807 rep = self.recv_pdu(
808 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
809 except Exception:
810 self._disconnect("transaction failed")
811 raise
812 self._disconnect("transaction done")
813 return rep
815 def assertNoValue(self, value):
816 self.assertTrue(value.isNoValue)
818 def assertHasValue(self, value):
819 self.assertIsNotNone(value)
821 def getElementValue(self, obj, elem):
822 return obj.get(elem, None)
824 def assertElementMissing(self, obj, elem):
825 v = self.getElementValue(obj, elem)
826 self.assertIsNone(v)
828 def assertElementPresent(self, obj, elem):
829 v = self.getElementValue(obj, elem)
830 self.assertIsNotNone(v)
831 if self.strict_checking:
832 if isinstance(v, collections.abc.Container):
833 self.assertNotEqual(0, len(v))
835 def assertElementEqual(self, obj, elem, value):
836 v = self.getElementValue(obj, elem)
837 self.assertIsNotNone(v)
838 self.assertEqual(v, value)
840 def assertElementEqualUTF8(self, obj, elem, value):
841 v = self.getElementValue(obj, elem)
842 self.assertIsNotNone(v)
843 self.assertEqual(v, bytes(value, 'utf8'))
845 def assertPrincipalEqual(self, princ1, princ2):
846 self.assertEqual(princ1['name-type'], princ2['name-type'])
847 self.assertEqual(
848 len(princ1['name-string']),
849 len(princ2['name-string']),
850 msg="princ1=%s != princ2=%s" % (princ1, princ2))
851 for idx in range(len(princ1['name-string'])):
852 self.assertEqual(
853 princ1['name-string'][idx],
854 princ2['name-string'][idx],
855 msg="princ1=%s != princ2=%s" % (princ1, princ2))
857 def assertElementEqualPrincipal(self, obj, elem, value):
858 v = self.getElementValue(obj, elem)
859 self.assertIsNotNone(v)
860 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
861 self.assertPrincipalEqual(v, value)
863 def assertElementKVNO(self, obj, elem, value):
864 v = self.getElementValue(obj, elem)
865 if value == "autodetect":
866 value = v
867 if value is not None:
868 self.assertIsNotNone(v)
869 # The value on the wire should never be 0
870 self.assertNotEqual(v, 0)
871 # unspecified_kvno means we don't know the kvno,
872 # but want to enforce its presence
873 if value is not self.unspecified_kvno:
874 value = int(value)
875 self.assertNotEqual(value, 0)
876 self.assertEqual(v, value)
877 else:
878 self.assertIsNone(v)
880 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
881 if epoch is None:
882 epoch = time.time()
883 if offset is not None:
884 epoch = epoch + int(offset)
885 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
886 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
888 def get_KerberosTime(self, epoch=None, offset=None):
889 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
890 return s
892 def get_EpochFromKerberosTime(self, kerberos_time):
893 if isinstance(kerberos_time, bytes):
894 kerberos_time = kerberos_time.decode()
896 epoch = datetime.datetime.strptime(kerberos_time,
897 '%Y%m%d%H%M%SZ')
898 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
899 epoch = int(epoch.timestamp())
901 return epoch
903 def get_Nonce(self):
904 nonce_min = 0x7f000000
905 nonce_max = 0x7fffffff
906 v = random.randint(nonce_min, nonce_max)
907 return v
909 def get_pa_dict(self, pa_data):
910 pa_dict = {}
912 if pa_data is not None:
913 for pa in pa_data:
914 pa_type = pa['padata-type']
915 if pa_type in pa_dict:
916 raise RuntimeError(f'Duplicate type {pa_type}')
917 pa_dict[pa_type] = pa['padata-value']
919 return pa_dict
921 def SessionKey_create(self, etype, contents, kvno=None):
922 key = kcrypto.Key(etype, contents)
923 return Krb5EncryptionKey(key, kvno)
925 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
926 self.assertIsNotNone(pwd)
927 self.assertIsNotNone(salt)
928 key = kcrypto.string_to_key(etype, pwd, salt)
929 return Krb5EncryptionKey(key, kvno)
931 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
932 e = etype_info2['etype']
934 salt = etype_info2.get('salt', None)
936 if e == kcrypto.Enctype.RC4:
937 nthash = creds.get_nt_hash()
938 return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
940 password = creds.get_password()
941 return self.PasswordKey_create(
942 etype=e, pwd=password, salt=salt, kvno=kvno)
944 def TicketDecryptionKey_from_creds(self, creds, etype=None):
946 if etype is None:
947 etypes = creds.get_tgs_krb5_etypes()
948 etype = etypes[0]
950 forced_key = creds.get_forced_key(etype)
951 if forced_key is not None:
952 return forced_key
954 kvno = creds.get_kvno()
956 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
957 "nor a password specified, " % (
958 creds.get_username(), etype, kvno))
960 if etype == kcrypto.Enctype.RC4:
961 nthash = creds.get_nt_hash()
962 self.assertIsNotNone(nthash, msg=fail_msg)
963 return self.SessionKey_create(etype=etype,
964 contents=nthash,
965 kvno=kvno)
967 password = creds.get_password()
968 self.assertIsNotNone(password, msg=fail_msg)
969 salt = creds.get_salt()
970 return self.PasswordKey_create(etype=etype,
971 pwd=password,
972 salt=salt,
973 kvno=kvno)
975 def RandomKey(self, etype):
976 e = kcrypto._get_enctype_profile(etype)
977 contents = samba.generate_random_bytes(e.keysize)
978 return self.SessionKey_create(etype=etype, contents=contents)
980 def EncryptionKey_import(self, EncryptionKey_obj):
981 return self.SessionKey_create(EncryptionKey_obj['keytype'],
982 EncryptionKey_obj['keyvalue'])
984 def EncryptedData_create(self, key, usage, plaintext):
985 # EncryptedData ::= SEQUENCE {
986 # etype [0] Int32 -- EncryptionType --,
987 # kvno [1] UInt32 OPTIONAL,
988 # cipher [2] OCTET STRING -- ciphertext
990 ciphertext = key.encrypt(usage, plaintext)
991 EncryptedData_obj = {
992 'etype': key.etype,
993 'cipher': ciphertext
995 if key.kvno is not None:
996 EncryptedData_obj['kvno'] = key.kvno
997 return EncryptedData_obj
999 def Checksum_create(self, key, usage, plaintext, ctype=None):
1000 # Checksum ::= SEQUENCE {
1001 # cksumtype [0] Int32,
1002 # checksum [1] OCTET STRING
1004 if ctype is None:
1005 ctype = key.ctype
1006 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1007 Checksum_obj = {
1008 'cksumtype': ctype,
1009 'checksum': checksum,
1011 return Checksum_obj
1013 @classmethod
1014 def PrincipalName_create(cls, name_type, names):
1015 # PrincipalName ::= SEQUENCE {
1016 # name-type [0] Int32,
1017 # name-string [1] SEQUENCE OF KerberosString
1019 PrincipalName_obj = {
1020 'name-type': name_type,
1021 'name-string': names,
1023 return PrincipalName_obj
1025 def AuthorizationData_create(self, ad_type, ad_data):
1026 # AuthorizationData ::= SEQUENCE {
1027 # ad-type [0] Int32,
1028 # ad-data [1] OCTET STRING
1030 AUTH_DATA_obj = {
1031 'ad-type': ad_type,
1032 'ad-data': ad_data
1034 return AUTH_DATA_obj
1036 def PA_DATA_create(self, padata_type, padata_value):
1037 # PA-DATA ::= SEQUENCE {
1038 # -- NOTE: first tag is [1], not [0]
1039 # padata-type [1] Int32,
1040 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1042 PA_DATA_obj = {
1043 'padata-type': padata_type,
1044 'padata-value': padata_value,
1046 return PA_DATA_obj
1048 def PA_ENC_TS_ENC_create(self, ts, usec):
1049 # PA-ENC-TS-ENC ::= SEQUENCE {
1050 # patimestamp[0] KerberosTime, -- client's time
1051 # pausec[1] krb5int32 OPTIONAL
1053 PA_ENC_TS_ENC_obj = {
1054 'patimestamp': ts,
1055 'pausec': usec,
1057 return PA_ENC_TS_ENC_obj
1059 def PA_PAC_OPTIONS_create(self, options):
1060 # PA-PAC-OPTIONS ::= SEQUENCE {
1061 # options [0] PACOptionFlags
1063 PA_PAC_OPTIONS_obj = {
1064 'options': options
1066 return PA_PAC_OPTIONS_obj
1068 def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1069 # KrbFastArmor ::= SEQUENCE {
1070 # armor-type [0] Int32,
1071 # armor-value [1] OCTET STRING,
1072 # ...
1074 KRB_FAST_ARMOR_obj = {
1075 'armor-type': armor_type,
1076 'armor-value': armor_value
1078 return KRB_FAST_ARMOR_obj
1080 def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1081 # KrbFastReq ::= SEQUENCE {
1082 # fast-options [0] FastOptions,
1083 # padata [1] SEQUENCE OF PA-DATA,
1084 # req-body [2] KDC-REQ-BODY,
1085 # ...
1087 KRB_FAST_REQ_obj = {
1088 'fast-options': fast_options,
1089 'padata': padata,
1090 'req-body': req_body
1092 return KRB_FAST_REQ_obj
1094 def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1095 # KrbFastArmoredReq ::= SEQUENCE {
1096 # armor [0] KrbFastArmor OPTIONAL,
1097 # req-checksum [1] Checksum,
1098 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1100 KRB_FAST_ARMORED_REQ_obj = {
1101 'req-checksum': req_checksum,
1102 'enc-fast-req': enc_fast_req
1104 if armor is not None:
1105 KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1106 return KRB_FAST_ARMORED_REQ_obj
1108 def PA_FX_FAST_REQUEST_create(self, armored_data):
1109 # PA-FX-FAST-REQUEST ::= CHOICE {
1110 # armored-data [0] KrbFastArmoredReq,
1111 # ...
1113 PA_FX_FAST_REQUEST_obj = {
1114 'armored-data': armored_data
1116 return PA_FX_FAST_REQUEST_obj
1118 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1119 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1120 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1121 # -- include PAC.
1122 # --If FALSE, and PAC present,
1123 # -- remove PAC.
1125 KERB_PA_PAC_REQUEST_obj = {
1126 'include-pac': include_pac,
1128 if not pa_data_create:
1129 return KERB_PA_PAC_REQUEST_obj
1130 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1131 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1132 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1133 return pa_data
1135 def KDC_REQ_BODY_create(self,
1136 kdc_options,
1137 cname,
1138 realm,
1139 sname,
1140 from_time,
1141 till_time,
1142 renew_time,
1143 nonce,
1144 etypes,
1145 addresses,
1146 additional_tickets,
1147 EncAuthorizationData,
1148 EncAuthorizationData_key,
1149 EncAuthorizationData_usage,
1150 asn1_print=None,
1151 hexdump=None):
1152 # KDC-REQ-BODY ::= SEQUENCE {
1153 # kdc-options [0] KDCOptions,
1154 # cname [1] PrincipalName OPTIONAL
1155 # -- Used only in AS-REQ --,
1156 # realm [2] Realm
1157 # -- Server's realm
1158 # -- Also client's in AS-REQ --,
1159 # sname [3] PrincipalName OPTIONAL,
1160 # from [4] KerberosTime OPTIONAL,
1161 # till [5] KerberosTime,
1162 # rtime [6] KerberosTime OPTIONAL,
1163 # nonce [7] UInt32,
1164 # etype [8] SEQUENCE OF Int32
1165 # -- EncryptionType
1166 # -- in preference order --,
1167 # addresses [9] HostAddresses OPTIONAL,
1168 # enc-authorization-data [10] EncryptedData OPTIONAL
1169 # -- AuthorizationData --,
1170 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1171 # -- NOTE: not empty
1173 if EncAuthorizationData is not None:
1174 enc_ad_plain = self.der_encode(
1175 EncAuthorizationData,
1176 asn1Spec=krb5_asn1.AuthorizationData(),
1177 asn1_print=asn1_print,
1178 hexdump=hexdump)
1179 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1180 EncAuthorizationData_usage,
1181 enc_ad_plain)
1182 else:
1183 enc_ad = None
1184 KDC_REQ_BODY_obj = {
1185 'kdc-options': kdc_options,
1186 'realm': realm,
1187 'till': till_time,
1188 'nonce': nonce,
1189 'etype': etypes,
1191 if cname is not None:
1192 KDC_REQ_BODY_obj['cname'] = cname
1193 if sname is not None:
1194 KDC_REQ_BODY_obj['sname'] = sname
1195 if from_time is not None:
1196 KDC_REQ_BODY_obj['from'] = from_time
1197 if renew_time is not None:
1198 KDC_REQ_BODY_obj['rtime'] = renew_time
1199 if addresses is not None:
1200 KDC_REQ_BODY_obj['addresses'] = addresses
1201 if enc_ad is not None:
1202 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1203 if additional_tickets is not None:
1204 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1205 return KDC_REQ_BODY_obj
1207 def KDC_REQ_create(self,
1208 msg_type,
1209 padata,
1210 req_body,
1211 asn1Spec=None,
1212 asn1_print=None,
1213 hexdump=None):
1214 # KDC-REQ ::= SEQUENCE {
1215 # -- NOTE: first tag is [1], not [0]
1216 # pvno [1] INTEGER (5) ,
1217 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1218 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1219 # -- NOTE: not empty --,
1220 # req-body [4] KDC-REQ-BODY
1223 KDC_REQ_obj = {
1224 'pvno': 5,
1225 'msg-type': msg_type,
1226 'req-body': req_body,
1228 if padata is not None:
1229 KDC_REQ_obj['padata'] = padata
1230 if asn1Spec is not None:
1231 KDC_REQ_decoded = pyasn1_native_decode(
1232 KDC_REQ_obj, asn1Spec=asn1Spec)
1233 else:
1234 KDC_REQ_decoded = None
1235 return KDC_REQ_obj, KDC_REQ_decoded
1237 def AS_REQ_create(self,
1238 padata, # optional
1239 kdc_options, # required
1240 cname, # optional
1241 realm, # required
1242 sname, # optional
1243 from_time, # optional
1244 till_time, # required
1245 renew_time, # optional
1246 nonce, # required
1247 etypes, # required
1248 addresses, # optional
1249 additional_tickets,
1250 native_decoded_only=True,
1251 asn1_print=None,
1252 hexdump=None):
1253 # KDC-REQ ::= SEQUENCE {
1254 # -- NOTE: first tag is [1], not [0]
1255 # pvno [1] INTEGER (5) ,
1256 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1257 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1258 # -- NOTE: not empty --,
1259 # req-body [4] KDC-REQ-BODY
1262 # KDC-REQ-BODY ::= SEQUENCE {
1263 # kdc-options [0] KDCOptions,
1264 # cname [1] PrincipalName OPTIONAL
1265 # -- Used only in AS-REQ --,
1266 # realm [2] Realm
1267 # -- Server's realm
1268 # -- Also client's in AS-REQ --,
1269 # sname [3] PrincipalName OPTIONAL,
1270 # from [4] KerberosTime OPTIONAL,
1271 # till [5] KerberosTime,
1272 # rtime [6] KerberosTime OPTIONAL,
1273 # nonce [7] UInt32,
1274 # etype [8] SEQUENCE OF Int32
1275 # -- EncryptionType
1276 # -- in preference order --,
1277 # addresses [9] HostAddresses OPTIONAL,
1278 # enc-authorization-data [10] EncryptedData OPTIONAL
1279 # -- AuthorizationData --,
1280 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1281 # -- NOTE: not empty
1283 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1284 kdc_options,
1285 cname,
1286 realm,
1287 sname,
1288 from_time,
1289 till_time,
1290 renew_time,
1291 nonce,
1292 etypes,
1293 addresses,
1294 additional_tickets,
1295 EncAuthorizationData=None,
1296 EncAuthorizationData_key=None,
1297 EncAuthorizationData_usage=None,
1298 asn1_print=asn1_print,
1299 hexdump=hexdump)
1300 obj, decoded = self.KDC_REQ_create(
1301 msg_type=KRB_AS_REQ,
1302 padata=padata,
1303 req_body=KDC_REQ_BODY_obj,
1304 asn1Spec=krb5_asn1.AS_REQ(),
1305 asn1_print=asn1_print,
1306 hexdump=hexdump)
1307 if native_decoded_only:
1308 return decoded
1309 return decoded, obj
1311 def AP_REQ_create(self, ap_options, ticket, authenticator):
1312 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1313 # pvno [0] INTEGER (5),
1314 # msg-type [1] INTEGER (14),
1315 # ap-options [2] APOptions,
1316 # ticket [3] Ticket,
1317 # authenticator [4] EncryptedData -- Authenticator
1319 AP_REQ_obj = {
1320 'pvno': 5,
1321 'msg-type': KRB_AP_REQ,
1322 'ap-options': ap_options,
1323 'ticket': ticket,
1324 'authenticator': authenticator,
1326 return AP_REQ_obj
1328 def Authenticator_create(
1329 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1330 authorization_data):
1331 # -- Unencrypted authenticator
1332 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1333 # authenticator-vno [0] INTEGER (5),
1334 # crealm [1] Realm,
1335 # cname [2] PrincipalName,
1336 # cksum [3] Checksum OPTIONAL,
1337 # cusec [4] Microseconds,
1338 # ctime [5] KerberosTime,
1339 # subkey [6] EncryptionKey OPTIONAL,
1340 # seq-number [7] UInt32 OPTIONAL,
1341 # authorization-data [8] AuthorizationData OPTIONAL
1343 Authenticator_obj = {
1344 'authenticator-vno': 5,
1345 'crealm': crealm,
1346 'cname': cname,
1347 'cusec': cusec,
1348 'ctime': ctime,
1350 if cksum is not None:
1351 Authenticator_obj['cksum'] = cksum
1352 if subkey is not None:
1353 Authenticator_obj['subkey'] = subkey
1354 if seq_number is not None:
1355 Authenticator_obj['seq-number'] = seq_number
1356 if authorization_data is not None:
1357 Authenticator_obj['authorization-data'] = authorization_data
1358 return Authenticator_obj
1360 def TGS_REQ_create(self,
1361 padata, # optional
1362 cusec,
1363 ctime,
1364 ticket,
1365 kdc_options, # required
1366 cname, # optional
1367 realm, # required
1368 sname, # optional
1369 from_time, # optional
1370 till_time, # required
1371 renew_time, # optional
1372 nonce, # required
1373 etypes, # required
1374 addresses, # optional
1375 EncAuthorizationData,
1376 EncAuthorizationData_key,
1377 additional_tickets,
1378 ticket_session_key,
1379 authenticator_subkey=None,
1380 body_checksum_type=None,
1381 native_decoded_only=True,
1382 asn1_print=None,
1383 hexdump=None):
1384 # KDC-REQ ::= SEQUENCE {
1385 # -- NOTE: first tag is [1], not [0]
1386 # pvno [1] INTEGER (5) ,
1387 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1388 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1389 # -- NOTE: not empty --,
1390 # req-body [4] KDC-REQ-BODY
1393 # KDC-REQ-BODY ::= SEQUENCE {
1394 # kdc-options [0] KDCOptions,
1395 # cname [1] PrincipalName OPTIONAL
1396 # -- Used only in AS-REQ --,
1397 # realm [2] Realm
1398 # -- Server's realm
1399 # -- Also client's in AS-REQ --,
1400 # sname [3] PrincipalName OPTIONAL,
1401 # from [4] KerberosTime OPTIONAL,
1402 # till [5] KerberosTime,
1403 # rtime [6] KerberosTime OPTIONAL,
1404 # nonce [7] UInt32,
1405 # etype [8] SEQUENCE OF Int32
1406 # -- EncryptionType
1407 # -- in preference order --,
1408 # addresses [9] HostAddresses OPTIONAL,
1409 # enc-authorization-data [10] EncryptedData OPTIONAL
1410 # -- AuthorizationData --,
1411 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1412 # -- NOTE: not empty
1415 if authenticator_subkey is not None:
1416 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1417 else:
1418 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1420 req_body = self.KDC_REQ_BODY_create(
1421 kdc_options=kdc_options,
1422 cname=None,
1423 realm=realm,
1424 sname=sname,
1425 from_time=from_time,
1426 till_time=till_time,
1427 renew_time=renew_time,
1428 nonce=nonce,
1429 etypes=etypes,
1430 addresses=addresses,
1431 additional_tickets=additional_tickets,
1432 EncAuthorizationData=EncAuthorizationData,
1433 EncAuthorizationData_key=EncAuthorizationData_key,
1434 EncAuthorizationData_usage=EncAuthorizationData_usage)
1435 req_body_blob = self.der_encode(req_body,
1436 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1437 asn1_print=asn1_print, hexdump=hexdump)
1439 req_body_checksum = self.Checksum_create(ticket_session_key,
1440 KU_TGS_REQ_AUTH_CKSUM,
1441 req_body_blob,
1442 ctype=body_checksum_type)
1444 subkey_obj = None
1445 if authenticator_subkey is not None:
1446 subkey_obj = authenticator_subkey.export_obj()
1447 seq_number = random.randint(0, 0xfffffffe)
1448 authenticator = self.Authenticator_create(
1449 crealm=realm,
1450 cname=cname,
1451 cksum=req_body_checksum,
1452 cusec=cusec,
1453 ctime=ctime,
1454 subkey=subkey_obj,
1455 seq_number=seq_number,
1456 authorization_data=None)
1457 authenticator = self.der_encode(
1458 authenticator,
1459 asn1Spec=krb5_asn1.Authenticator(),
1460 asn1_print=asn1_print,
1461 hexdump=hexdump)
1463 authenticator = self.EncryptedData_create(
1464 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1466 ap_options = krb5_asn1.APOptions('0')
1467 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1468 ticket=ticket,
1469 authenticator=authenticator)
1470 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1471 asn1_print=asn1_print, hexdump=hexdump)
1472 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1473 if padata is not None:
1474 padata.append(pa_tgs_req)
1475 else:
1476 padata = [pa_tgs_req]
1478 obj, decoded = self.KDC_REQ_create(
1479 msg_type=KRB_TGS_REQ,
1480 padata=padata,
1481 req_body=req_body,
1482 asn1Spec=krb5_asn1.TGS_REQ(),
1483 asn1_print=asn1_print,
1484 hexdump=hexdump)
1485 if native_decoded_only:
1486 return decoded
1487 return decoded, obj
1489 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1490 # PA-S4U2Self ::= SEQUENCE {
1491 # name [0] PrincipalName,
1492 # realm [1] Realm,
1493 # cksum [2] Checksum,
1494 # auth [3] GeneralString
1496 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1497 for n in name['name-string']:
1498 cksum_data += n.encode()
1499 cksum_data += realm.encode()
1500 cksum_data += "Kerberos".encode()
1501 cksum = self.Checksum_create(tgt_session_key,
1502 KU_NON_KERB_CKSUM_SALT,
1503 cksum_data,
1504 ctype)
1506 PA_S4U2Self_obj = {
1507 'name': name,
1508 'realm': realm,
1509 'cksum': cksum,
1510 'auth': "Kerberos",
1512 pa_s4u2self = self.der_encode(
1513 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1514 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1516 def _generic_kdc_exchange(self,
1517 kdc_exchange_dict, # required
1518 cname=None, # optional
1519 realm=None, # required
1520 sname=None, # optional
1521 from_time=None, # optional
1522 till_time=None, # required
1523 renew_time=None, # optional
1524 etypes=None, # required
1525 addresses=None, # optional
1526 additional_tickets=None, # optional
1527 EncAuthorizationData=None, # optional
1528 EncAuthorizationData_key=None, # optional
1529 EncAuthorizationData_usage=None): # optional
1531 check_error_fn = kdc_exchange_dict['check_error_fn']
1532 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1533 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1534 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1535 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1536 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1537 callback_dict = kdc_exchange_dict['callback_dict']
1538 req_msg_type = kdc_exchange_dict['req_msg_type']
1539 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1540 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1542 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1543 kdc_options = kdc_exchange_dict['kdc_options']
1545 # Parameters specific to the outer request body
1546 outer_req = kdc_exchange_dict['outer_req']
1548 if till_time is None:
1549 till_time = self.get_KerberosTime(offset=36000)
1551 if 'nonce' in kdc_exchange_dict:
1552 nonce = kdc_exchange_dict['nonce']
1553 else:
1554 nonce = self.get_Nonce()
1555 kdc_exchange_dict['nonce'] = nonce
1557 req_body = self.KDC_REQ_BODY_create(
1558 kdc_options=kdc_options,
1559 cname=cname,
1560 realm=realm,
1561 sname=sname,
1562 from_time=from_time,
1563 till_time=till_time,
1564 renew_time=renew_time,
1565 nonce=nonce,
1566 etypes=etypes,
1567 addresses=addresses,
1568 additional_tickets=additional_tickets,
1569 EncAuthorizationData=EncAuthorizationData,
1570 EncAuthorizationData_key=EncAuthorizationData_key,
1571 EncAuthorizationData_usage=EncAuthorizationData_usage)
1573 inner_req_body = dict(req_body)
1574 if outer_req is not None:
1575 for key, value in outer_req.items():
1576 if value is not None:
1577 req_body[key] = value
1578 else:
1579 del req_body[key]
1581 if req_msg_type == KRB_AS_REQ:
1582 tgs_req = None
1583 tgs_req_padata = None
1584 else:
1585 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1587 tgs_req = self.generate_ap_req(kdc_exchange_dict,
1588 callback_dict,
1589 req_body,
1590 armor=False)
1591 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1593 if generate_fast_padata_fn is not None:
1594 self.assertIsNotNone(generate_fast_fn)
1595 # This can alter req_body...
1596 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1597 callback_dict,
1598 req_body)
1599 else:
1600 fast_padata = []
1602 if generate_fast_armor_fn is not None:
1603 self.assertIsNotNone(generate_fast_fn)
1604 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1605 callback_dict,
1606 req_body,
1607 armor=True)
1609 fast_armor_type = kdc_exchange_dict['fast_armor_type']
1610 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1611 fast_ap_req)
1612 else:
1613 fast_armor = None
1615 if generate_padata_fn is not None:
1616 # This can alter req_body...
1617 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1618 callback_dict,
1619 req_body)
1620 self.assertIsNotNone(outer_padata)
1621 self.assertNotIn(PADATA_KDC_REQ,
1622 [pa['padata-type'] for pa in outer_padata],
1623 'Don\'t create TGS-REQ manually')
1624 else:
1625 outer_padata = None
1627 if generate_fast_fn is not None:
1628 armor_key = kdc_exchange_dict['armor_key']
1629 self.assertIsNotNone(armor_key)
1631 if req_msg_type == KRB_AS_REQ:
1632 checksum_blob = self.der_encode(
1633 req_body,
1634 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1635 else:
1636 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1637 checksum_blob = tgs_req
1639 checksum = self.Checksum_create(armor_key,
1640 KU_FAST_REQ_CHKSUM,
1641 checksum_blob)
1643 fast = generate_fast_fn(kdc_exchange_dict,
1644 callback_dict,
1645 inner_req_body,
1646 fast_padata,
1647 fast_armor,
1648 checksum)
1649 else:
1650 fast = None
1652 padata = []
1654 if tgs_req_padata is not None:
1655 padata.append(tgs_req_padata)
1657 if fast is not None:
1658 padata.append(fast)
1660 if outer_padata is not None:
1661 padata += outer_padata
1663 if not padata:
1664 padata = None
1666 kdc_exchange_dict['req_padata'] = padata
1667 kdc_exchange_dict['fast_padata'] = fast_padata
1668 kdc_exchange_dict['req_body'] = inner_req_body
1670 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1671 padata=padata,
1672 req_body=req_body,
1673 asn1Spec=req_asn1Spec())
1675 rep = self.send_recv_transaction(req_decoded)
1676 self.assertIsNotNone(rep)
1678 msg_type = self.getElementValue(rep, 'msg-type')
1679 self.assertIsNotNone(msg_type)
1681 expected_msg_type = None
1682 if check_error_fn is not None:
1683 expected_msg_type = KRB_ERROR
1684 self.assertIsNone(check_rep_fn)
1685 self.assertNotEqual(0, expected_error_mode)
1686 if check_rep_fn is not None:
1687 expected_msg_type = rep_msg_type
1688 self.assertIsNone(check_error_fn)
1689 self.assertEqual(0, expected_error_mode)
1690 self.assertIsNotNone(expected_msg_type)
1691 self.assertEqual(msg_type, expected_msg_type)
1693 if msg_type == KRB_ERROR:
1694 return check_error_fn(kdc_exchange_dict,
1695 callback_dict,
1696 rep)
1698 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1700 def as_exchange_dict(self,
1701 expected_crealm=None,
1702 expected_cname=None,
1703 expected_cname_private=None,
1704 expected_srealm=None,
1705 expected_sname=None,
1706 ticket_decryption_key=None,
1707 generate_fast_fn=None,
1708 generate_fast_armor_fn=None,
1709 generate_fast_padata_fn=None,
1710 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1711 generate_padata_fn=None,
1712 check_error_fn=None,
1713 check_rep_fn=None,
1714 check_padata_fn=None,
1715 check_kdc_private_fn=None,
1716 callback_dict=None,
1717 expected_error_mode=0,
1718 client_as_etypes=None,
1719 expected_salt=None,
1720 authenticator_subkey=None,
1721 armor_key=None,
1722 armor_tgt=None,
1723 armor_subkey=None,
1724 auth_data=None,
1725 kdc_options='',
1726 outer_req=None):
1727 kdc_exchange_dict = {
1728 'req_msg_type': KRB_AS_REQ,
1729 'req_asn1Spec': krb5_asn1.AS_REQ,
1730 'rep_msg_type': KRB_AS_REP,
1731 'rep_asn1Spec': krb5_asn1.AS_REP,
1732 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
1733 'expected_crealm': expected_crealm,
1734 'expected_cname': expected_cname,
1735 'expected_srealm': expected_srealm,
1736 'expected_sname': expected_sname,
1737 'ticket_decryption_key': ticket_decryption_key,
1738 'generate_fast_fn': generate_fast_fn,
1739 'generate_fast_armor_fn': generate_fast_armor_fn,
1740 'generate_fast_padata_fn': generate_fast_padata_fn,
1741 'fast_armor_type': fast_armor_type,
1742 'generate_padata_fn': generate_padata_fn,
1743 'check_error_fn': check_error_fn,
1744 'check_rep_fn': check_rep_fn,
1745 'check_padata_fn': check_padata_fn,
1746 'check_kdc_private_fn': check_kdc_private_fn,
1747 'callback_dict': callback_dict,
1748 'expected_error_mode': expected_error_mode,
1749 'client_as_etypes': client_as_etypes,
1750 'expected_salt': expected_salt,
1751 'authenticator_subkey': authenticator_subkey,
1752 'armor_key': armor_key,
1753 'armor_tgt': armor_tgt,
1754 'armor_subkey': armor_subkey,
1755 'auth_data': auth_data,
1756 'kdc_options': kdc_options,
1757 'outer_req': outer_req
1759 if expected_cname_private is not None:
1760 kdc_exchange_dict['expected_cname_private'] = (
1761 expected_cname_private)
1763 if callback_dict is None:
1764 callback_dict = {}
1766 return kdc_exchange_dict
1768 def tgs_exchange_dict(self,
1769 expected_crealm=None,
1770 expected_cname=None,
1771 expected_cname_private=None,
1772 expected_srealm=None,
1773 expected_sname=None,
1774 ticket_decryption_key=None,
1775 generate_fast_fn=None,
1776 generate_fast_armor_fn=None,
1777 generate_fast_padata_fn=None,
1778 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
1779 generate_padata_fn=None,
1780 check_error_fn=None,
1781 check_rep_fn=None,
1782 check_padata_fn=None,
1783 check_kdc_private_fn=None,
1784 callback_dict=None,
1785 tgt=None,
1786 armor_key=None,
1787 armor_tgt=None,
1788 armor_subkey=None,
1789 authenticator_subkey=None,
1790 auth_data=None,
1791 body_checksum_type=None,
1792 kdc_options='',
1793 outer_req=None):
1794 kdc_exchange_dict = {
1795 'req_msg_type': KRB_TGS_REQ,
1796 'req_asn1Spec': krb5_asn1.TGS_REQ,
1797 'rep_msg_type': KRB_TGS_REP,
1798 'rep_asn1Spec': krb5_asn1.TGS_REP,
1799 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
1800 'expected_crealm': expected_crealm,
1801 'expected_cname': expected_cname,
1802 'expected_srealm': expected_srealm,
1803 'expected_sname': expected_sname,
1804 'ticket_decryption_key': ticket_decryption_key,
1805 'generate_fast_fn': generate_fast_fn,
1806 'generate_fast_armor_fn': generate_fast_armor_fn,
1807 'generate_fast_padata_fn': generate_fast_padata_fn,
1808 'fast_armor_type': fast_armor_type,
1809 'generate_padata_fn': generate_padata_fn,
1810 'check_error_fn': check_error_fn,
1811 'check_rep_fn': check_rep_fn,
1812 'check_padata_fn': check_padata_fn,
1813 'check_kdc_private_fn': check_kdc_private_fn,
1814 'callback_dict': callback_dict,
1815 'tgt': tgt,
1816 'body_checksum_type': body_checksum_type,
1817 'armor_key': armor_key,
1818 'armor_tgt': armor_tgt,
1819 'armor_subkey': armor_subkey,
1820 'auth_data': auth_data,
1821 'authenticator_subkey': authenticator_subkey,
1822 'kdc_options': kdc_options,
1823 'outer_req': outer_req
1825 if expected_cname_private is not None:
1826 kdc_exchange_dict['expected_cname_private'] = (
1827 expected_cname_private)
1829 if callback_dict is None:
1830 callback_dict = {}
1832 return kdc_exchange_dict
1834 def generic_check_kdc_rep(self,
1835 kdc_exchange_dict,
1836 callback_dict,
1837 rep):
1839 expected_crealm = kdc_exchange_dict['expected_crealm']
1840 expected_cname = kdc_exchange_dict['expected_cname']
1841 expected_srealm = kdc_exchange_dict['expected_srealm']
1842 expected_sname = kdc_exchange_dict['expected_sname']
1843 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
1844 check_padata_fn = kdc_exchange_dict['check_padata_fn']
1845 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
1846 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
1847 msg_type = kdc_exchange_dict['rep_msg_type']
1848 armor_key = kdc_exchange_dict['armor_key']
1850 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
1851 padata = self.getElementValue(rep, 'padata')
1852 if self.strict_checking:
1853 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
1854 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
1855 self.assertElementPresent(rep, 'ticket')
1856 ticket = self.getElementValue(rep, 'ticket')
1857 ticket_encpart = None
1858 ticket_cipher = None
1859 self.assertIsNotNone(ticket)
1860 if ticket is not None: # Never None, but gives indentation
1861 self.assertElementEqual(ticket, 'tkt-vno', 5)
1862 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
1863 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
1864 self.assertElementPresent(ticket, 'enc-part')
1865 ticket_encpart = self.getElementValue(ticket, 'enc-part')
1866 self.assertIsNotNone(ticket_encpart)
1867 if ticket_encpart is not None: # Never None, but gives indentation
1868 self.assertElementPresent(ticket_encpart, 'etype')
1869 # 'unspecified' means present, with any value != 0
1870 self.assertElementKVNO(ticket_encpart, 'kvno',
1871 self.unspecified_kvno)
1872 self.assertElementPresent(ticket_encpart, 'cipher')
1873 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
1874 self.assertElementPresent(rep, 'enc-part')
1875 encpart = self.getElementValue(rep, 'enc-part')
1876 encpart_cipher = None
1877 self.assertIsNotNone(encpart)
1878 if encpart is not None: # Never None, but gives indentation
1879 self.assertElementPresent(encpart, 'etype')
1880 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
1881 self.assertElementPresent(encpart, 'cipher')
1882 encpart_cipher = self.getElementValue(encpart, 'cipher')
1884 ticket_checksum = None
1886 encpart_decryption_key = None
1887 self.assertIsNotNone(check_padata_fn)
1888 if check_padata_fn is not None:
1889 # See if we can get the decryption key from the preauth phase
1890 encpart_decryption_key, encpart_decryption_usage = (
1891 check_padata_fn(kdc_exchange_dict, callback_dict,
1892 rep, padata))
1894 if armor_key is not None:
1895 pa_dict = self.get_pa_dict(padata)
1897 if PADATA_FX_FAST in pa_dict:
1898 fx_fast_data = pa_dict[PADATA_FX_FAST]
1899 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
1900 fx_fast_data,
1901 armor_key,
1902 finished=True)
1904 if 'strengthen-key' in fast_response:
1905 strengthen_key = self.EncryptionKey_import(
1906 fast_response['strengthen-key'])
1907 encpart_decryption_key = (
1908 self.generate_strengthen_reply_key(
1909 strengthen_key,
1910 encpart_decryption_key))
1912 fast_finished = fast_response.get('finished', None)
1913 if fast_finished is not None:
1914 ticket_checksum = fast_finished['ticket-checksum']
1916 self.check_rep_padata(kdc_exchange_dict,
1917 callback_dict,
1918 rep,
1919 fast_response['padata'])
1921 ticket_private = None
1922 self.assertIsNotNone(ticket_decryption_key)
1923 if ticket_decryption_key is not None:
1924 self.assertElementEqual(ticket_encpart, 'etype',
1925 ticket_decryption_key.etype)
1926 self.assertElementKVNO(ticket_encpart, 'kvno',
1927 ticket_decryption_key.kvno)
1928 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
1929 ticket_cipher)
1930 ticket_private = self.der_decode(
1931 ticket_decpart,
1932 asn1Spec=krb5_asn1.EncTicketPart())
1934 encpart_private = None
1935 self.assertIsNotNone(encpart_decryption_key)
1936 if encpart_decryption_key is not None:
1937 self.assertElementEqual(encpart, 'etype',
1938 encpart_decryption_key.etype)
1939 if self.strict_checking:
1940 self.assertElementKVNO(encpart, 'kvno',
1941 encpart_decryption_key.kvno)
1942 rep_decpart = encpart_decryption_key.decrypt(
1943 encpart_decryption_usage,
1944 encpart_cipher)
1945 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
1946 # application tag 26
1947 try:
1948 encpart_private = self.der_decode(
1949 rep_decpart,
1950 asn1Spec=rep_encpart_asn1Spec())
1951 except Exception:
1952 encpart_private = self.der_decode(
1953 rep_decpart,
1954 asn1Spec=krb5_asn1.EncTGSRepPart())
1956 self.assertIsNotNone(check_kdc_private_fn)
1957 if check_kdc_private_fn is not None:
1958 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
1959 rep, ticket_private, encpart_private,
1960 ticket_checksum)
1962 return rep
1964 def check_fx_fast_data(self,
1965 kdc_exchange_dict,
1966 fx_fast_data,
1967 armor_key,
1968 finished=False,
1969 expect_strengthen_key=True):
1970 fx_fast_data = self.der_decode(fx_fast_data,
1971 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
1973 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
1974 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
1976 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
1978 fast_response = self.der_decode(fast_rep,
1979 asn1Spec=krb5_asn1.KrbFastResponse())
1981 if expect_strengthen_key and self.strict_checking:
1982 self.assertIn('strengthen-key', fast_response)
1984 if finished:
1985 self.assertIn('finished', fast_response)
1987 # Ensure that the nonce matches the nonce in the body of the request
1988 # (RFC6113 5.4.3).
1989 nonce = kdc_exchange_dict['nonce']
1990 self.assertEqual(nonce, fast_response['nonce'])
1992 return fast_response
1994 def generic_check_kdc_private(self,
1995 kdc_exchange_dict,
1996 callback_dict,
1997 rep,
1998 ticket_private,
1999 encpart_private,
2000 ticket_checksum):
2001 kdc_options = kdc_exchange_dict['kdc_options']
2002 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2003 canonicalize = (canon_pos < len(kdc_options)
2004 and kdc_options[canon_pos] == '1')
2006 expected_crealm = kdc_exchange_dict['expected_crealm']
2007 expected_srealm = kdc_exchange_dict['expected_srealm']
2008 expected_sname = kdc_exchange_dict['expected_sname']
2009 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2011 try:
2012 expected_cname = kdc_exchange_dict['expected_cname_private']
2013 except KeyError:
2014 expected_cname = kdc_exchange_dict['expected_cname']
2016 ticket = self.getElementValue(rep, 'ticket')
2018 if ticket_checksum is not None:
2019 armor_key = kdc_exchange_dict['armor_key']
2020 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2022 ticket_session_key = None
2023 if ticket_private is not None:
2024 self.assertElementPresent(ticket_private, 'flags')
2025 self.assertElementPresent(ticket_private, 'key')
2026 ticket_key = self.getElementValue(ticket_private, 'key')
2027 self.assertIsNotNone(ticket_key)
2028 if ticket_key is not None: # Never None, but gives indentation
2029 self.assertElementPresent(ticket_key, 'keytype')
2030 self.assertElementPresent(ticket_key, 'keyvalue')
2031 ticket_session_key = self.EncryptionKey_import(ticket_key)
2032 self.assertElementEqualUTF8(ticket_private, 'crealm',
2033 expected_crealm)
2034 self.assertElementEqualPrincipal(ticket_private, 'cname',
2035 expected_cname)
2036 self.assertElementPresent(ticket_private, 'transited')
2037 self.assertElementPresent(ticket_private, 'authtime')
2038 if self.strict_checking:
2039 self.assertElementPresent(ticket_private, 'starttime')
2040 self.assertElementPresent(ticket_private, 'endtime')
2041 # TODO self.assertElementPresent(ticket_private, 'renew-till')
2042 # TODO self.assertElementMissing(ticket_private, 'caddr')
2043 self.assertElementPresent(ticket_private, 'authorization-data')
2045 encpart_session_key = None
2046 if encpart_private is not None:
2047 self.assertElementPresent(encpart_private, 'key')
2048 encpart_key = self.getElementValue(encpart_private, 'key')
2049 self.assertIsNotNone(encpart_key)
2050 if encpart_key is not None: # Never None, but gives indentation
2051 self.assertElementPresent(encpart_key, 'keytype')
2052 self.assertElementPresent(encpart_key, 'keyvalue')
2053 encpart_session_key = self.EncryptionKey_import(encpart_key)
2054 self.assertElementPresent(encpart_private, 'last-req')
2055 self.assertElementEqual(encpart_private, 'nonce',
2056 kdc_exchange_dict['nonce'])
2057 # TODO self.assertElementPresent(encpart_private,
2058 # 'key-expiration')
2059 self.assertElementPresent(encpart_private, 'flags')
2060 self.assertElementPresent(encpart_private, 'authtime')
2061 if self.strict_checking:
2062 self.assertElementPresent(encpart_private, 'starttime')
2063 self.assertElementPresent(encpart_private, 'endtime')
2064 # TODO self.assertElementPresent(encpart_private, 'renew-till')
2065 self.assertElementEqualUTF8(encpart_private, 'srealm',
2066 expected_srealm)
2067 self.assertElementEqualPrincipal(encpart_private, 'sname',
2068 expected_sname)
2069 # TODO self.assertElementMissing(encpart_private, 'caddr')
2071 sent_claims = self.sent_claims(kdc_exchange_dict)
2073 if self.strict_checking:
2074 if sent_claims or canonicalize:
2075 self.assertElementPresent(encpart_private,
2076 'encrypted-pa-data')
2077 enc_pa_dict = self.get_pa_dict(
2078 encpart_private['encrypted-pa-data'])
2079 if canonicalize:
2080 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2082 (supported_etypes,) = struct.unpack(
2083 '<L',
2084 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2086 self.assertTrue(
2087 security.KERB_ENCTYPE_FAST_SUPPORTED
2088 & supported_etypes)
2089 self.assertTrue(
2090 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2091 & supported_etypes)
2092 self.assertTrue(
2093 security.KERB_ENCTYPE_CLAIMS_SUPPORTED
2094 & supported_etypes)
2095 else:
2096 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2098 # ClaimsCompIdFASTSupported registry key
2099 if sent_claims:
2100 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2102 self.check_pac_options_claims_support(
2103 enc_pa_dict[PADATA_PAC_OPTIONS])
2104 else:
2105 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2106 else:
2107 self.assertElementEqual(encpart_private,
2108 'encrypted-pa-data',
2111 if ticket_session_key is not None and encpart_session_key is not None:
2112 self.assertEqual(ticket_session_key.etype,
2113 encpart_session_key.etype)
2114 self.assertEqual(ticket_session_key.key.contents,
2115 encpart_session_key.key.contents)
2116 if encpart_session_key is not None:
2117 session_key = encpart_session_key
2118 else:
2119 session_key = ticket_session_key
2120 ticket_creds = KerberosTicketCreds(
2121 ticket,
2122 session_key,
2123 crealm=expected_crealm,
2124 cname=expected_cname,
2125 srealm=expected_srealm,
2126 sname=expected_sname,
2127 decryption_key=ticket_decryption_key,
2128 ticket_private=ticket_private,
2129 encpart_private=encpart_private)
2131 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2133 def check_pac_options_claims_support(self, pac_options):
2134 pac_options = self.der_decode(pac_options,
2135 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2136 self.assertEqual('1', pac_options['options'][0]) # claims bit
2138 def generic_check_kdc_error(self,
2139 kdc_exchange_dict,
2140 callback_dict,
2141 rep):
2143 expected_crealm = kdc_exchange_dict['expected_crealm']
2144 expected_cname = kdc_exchange_dict['expected_cname']
2145 expected_srealm = kdc_exchange_dict['expected_srealm']
2146 expected_sname = kdc_exchange_dict['expected_sname']
2147 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2149 self.assertElementEqual(rep, 'pvno', 5)
2150 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2151 self.assertElementEqual(rep, 'error-code', expected_error_mode)
2152 if self.strict_checking:
2153 self.assertElementMissing(rep, 'ctime')
2154 self.assertElementMissing(rep, 'cusec')
2155 self.assertElementPresent(rep, 'stime')
2156 self.assertElementPresent(rep, 'susec')
2157 # error-code checked above
2158 if self.strict_checking:
2159 self.assertElementMissing(rep, 'crealm')
2160 self.assertElementMissing(rep, 'cname')
2161 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2162 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2163 self.assertElementMissing(rep, 'e-text')
2164 if expected_error_mode == KDC_ERR_GENERIC:
2165 self.assertElementMissing(rep, 'e-data')
2166 return rep
2167 edata = self.getElementValue(rep, 'e-data')
2168 if self.strict_checking:
2169 self.assertIsNotNone(edata)
2170 if edata is not None:
2171 rep_padata = self.der_decode(edata,
2172 asn1Spec=krb5_asn1.METHOD_DATA())
2173 self.assertGreater(len(rep_padata), 0)
2174 else:
2175 rep_padata = []
2177 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2178 callback_dict,
2179 rep,
2180 rep_padata)
2182 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2184 return rep
2186 def check_rep_padata(self,
2187 kdc_exchange_dict,
2188 callback_dict,
2189 rep,
2190 rep_padata):
2191 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2192 req_body = kdc_exchange_dict['req_body']
2193 proposed_etypes = req_body['etype']
2194 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2196 expect_etype_info2 = ()
2197 expect_etype_info = False
2198 unexpect_etype_info = True
2199 expected_aes_type = 0
2200 expected_rc4_type = 0
2201 if kcrypto.Enctype.RC4 in proposed_etypes:
2202 expect_etype_info = True
2203 for etype in proposed_etypes:
2204 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2205 expect_etype_info = False
2206 if etype not in client_as_etypes:
2207 continue
2208 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2209 if etype > expected_aes_type:
2210 expected_aes_type = etype
2211 if etype in (kcrypto.Enctype.RC4,) and expected_error_mode != 0:
2212 unexpect_etype_info = False
2213 if etype > expected_rc4_type:
2214 expected_rc4_type = etype
2216 if expected_aes_type != 0:
2217 expect_etype_info2 += (expected_aes_type,)
2218 if expected_rc4_type != 0:
2219 expect_etype_info2 += (expected_rc4_type,)
2221 expected_patypes = ()
2222 if expect_etype_info:
2223 self.assertGreater(len(expect_etype_info2), 0)
2224 expected_patypes += (PADATA_ETYPE_INFO,)
2225 if len(expect_etype_info2) != 0:
2226 expected_patypes += (PADATA_ETYPE_INFO2,)
2228 expected_patypes += (PADATA_ENC_TIMESTAMP,)
2229 expected_patypes += (PADATA_PK_AS_REQ,)
2230 expected_patypes += (PADATA_PK_AS_REP_19,)
2232 if self.strict_checking:
2233 for i, patype in enumerate(expected_patypes):
2234 self.assertElementEqual(rep_padata[i], 'padata-type', patype)
2235 self.assertEqual(len(rep_padata), len(expected_patypes))
2237 etype_info2 = None
2238 etype_info = None
2239 enc_timestamp = None
2240 pk_as_req = None
2241 pk_as_rep19 = None
2242 for pa in rep_padata:
2243 patype = self.getElementValue(pa, 'padata-type')
2244 pavalue = self.getElementValue(pa, 'padata-value')
2245 if patype == PADATA_ETYPE_INFO2:
2246 self.assertIsNone(etype_info2)
2247 etype_info2 = self.der_decode(pavalue,
2248 asn1Spec=krb5_asn1.ETYPE_INFO2())
2249 continue
2250 if patype == PADATA_ETYPE_INFO:
2251 self.assertIsNone(etype_info)
2252 etype_info = self.der_decode(pavalue,
2253 asn1Spec=krb5_asn1.ETYPE_INFO())
2254 continue
2255 if patype == PADATA_ENC_TIMESTAMP:
2256 self.assertIsNone(enc_timestamp)
2257 enc_timestamp = pavalue
2258 self.assertEqual(len(enc_timestamp), 0)
2259 continue
2260 if patype == PADATA_PK_AS_REQ:
2261 self.assertIsNone(pk_as_req)
2262 pk_as_req = pavalue
2263 self.assertEqual(len(pk_as_req), 0)
2264 continue
2265 if patype == PADATA_PK_AS_REP_19:
2266 self.assertIsNone(pk_as_rep19)
2267 pk_as_rep19 = pavalue
2268 self.assertEqual(len(pk_as_rep19), 0)
2269 continue
2271 if all(etype not in client_as_etypes or etype not in proposed_etypes
2272 for etype in (kcrypto.Enctype.AES256,
2273 kcrypto.Enctype.AES128,
2274 kcrypto.Enctype.RC4)):
2275 self.assertIsNone(etype_info2)
2276 self.assertIsNone(etype_info)
2277 if self.strict_checking:
2278 self.assertIsNotNone(enc_timestamp)
2279 self.assertIsNotNone(pk_as_req)
2280 self.assertIsNotNone(pk_as_rep19)
2281 return None
2283 if self.strict_checking:
2284 self.assertIsNotNone(etype_info2)
2285 if expect_etype_info:
2286 self.assertIsNotNone(etype_info)
2287 else:
2288 if self.strict_checking:
2289 self.assertIsNone(etype_info)
2290 if unexpect_etype_info:
2291 self.assertIsNone(etype_info)
2293 if self.strict_checking:
2294 self.assertGreaterEqual(len(etype_info2), 1)
2295 self.assertEqual(len(etype_info2), len(expect_etype_info2))
2296 for i in range(0, len(etype_info2)):
2297 e = self.getElementValue(etype_info2[i], 'etype')
2298 self.assertEqual(e, expect_etype_info2[i])
2299 salt = self.getElementValue(etype_info2[i], 'salt')
2300 if e == kcrypto.Enctype.RC4:
2301 self.assertIsNone(salt)
2302 else:
2303 self.assertIsNotNone(salt)
2304 expected_salt = kdc_exchange_dict['expected_salt']
2305 if expected_salt is not None:
2306 self.assertEqual(salt, expected_salt)
2307 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
2308 if self.strict_checking:
2309 self.assertIsNone(s2kparams)
2310 if etype_info is not None:
2311 self.assertEqual(len(etype_info), 1)
2312 e = self.getElementValue(etype_info[0], 'etype')
2313 self.assertEqual(e, kcrypto.Enctype.RC4)
2314 self.assertEqual(e, expect_etype_info2[0])
2315 salt = self.getElementValue(etype_info[0], 'salt')
2316 if self.strict_checking:
2317 self.assertIsNotNone(salt)
2318 self.assertEqual(len(salt), 0)
2320 self.assertIsNotNone(enc_timestamp)
2321 self.assertIsNotNone(pk_as_req)
2322 self.assertIsNotNone(pk_as_rep19)
2324 return etype_info2
2326 def generate_simple_fast(self,
2327 kdc_exchange_dict,
2328 _callback_dict,
2329 req_body,
2330 fast_padata,
2331 fast_armor,
2332 checksum,
2333 fast_options=''):
2334 armor_key = kdc_exchange_dict['armor_key']
2336 fast_req = self.KRB_FAST_REQ_create(fast_options,
2337 fast_padata,
2338 req_body)
2339 fast_req = self.der_encode(fast_req,
2340 asn1Spec=krb5_asn1.KrbFastReq())
2341 fast_req = self.EncryptedData_create(armor_key,
2342 KU_FAST_ENC,
2343 fast_req)
2345 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
2346 checksum,
2347 fast_req)
2349 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
2350 fx_fast_request = self.der_encode(
2351 fx_fast_request,
2352 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
2354 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
2355 fx_fast_request)
2357 return fast_padata
2359 def generate_ap_req(self,
2360 kdc_exchange_dict,
2361 _callback_dict,
2362 req_body,
2363 armor):
2364 if armor:
2365 tgt = kdc_exchange_dict['armor_tgt']
2366 authenticator_subkey = kdc_exchange_dict['armor_subkey']
2368 req_body_checksum = None
2369 else:
2370 tgt = kdc_exchange_dict['tgt']
2371 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2372 body_checksum_type = kdc_exchange_dict['body_checksum_type']
2374 req_body_blob = self.der_encode(req_body,
2375 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2377 req_body_checksum = self.Checksum_create(tgt.session_key,
2378 KU_TGS_REQ_AUTH_CKSUM,
2379 req_body_blob,
2380 ctype=body_checksum_type)
2382 auth_data = kdc_exchange_dict['auth_data']
2384 subkey_obj = None
2385 if authenticator_subkey is not None:
2386 subkey_obj = authenticator_subkey.export_obj()
2387 seq_number = random.randint(0, 0xfffffffe)
2388 (ctime, cusec) = self.get_KerberosTimeWithUsec()
2389 authenticator_obj = self.Authenticator_create(
2390 crealm=tgt.crealm,
2391 cname=tgt.cname,
2392 cksum=req_body_checksum,
2393 cusec=cusec,
2394 ctime=ctime,
2395 subkey=subkey_obj,
2396 seq_number=seq_number,
2397 authorization_data=auth_data)
2398 authenticator_blob = self.der_encode(
2399 authenticator_obj,
2400 asn1Spec=krb5_asn1.Authenticator())
2402 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
2403 authenticator = self.EncryptedData_create(tgt.session_key,
2404 usage,
2405 authenticator_blob)
2407 ap_options = krb5_asn1.APOptions('0')
2408 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2409 ticket=tgt.ticket,
2410 authenticator=authenticator)
2411 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2413 return ap_req
2415 def generate_simple_tgs_padata(self,
2416 kdc_exchange_dict,
2417 callback_dict,
2418 req_body):
2419 ap_req = self.generate_ap_req(kdc_exchange_dict,
2420 callback_dict,
2421 req_body,
2422 armor=False)
2423 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2424 padata = [pa_tgs_req]
2426 return padata, req_body
2428 def check_simple_tgs_padata(self,
2429 kdc_exchange_dict,
2430 callback_dict,
2431 rep,
2432 padata):
2433 tgt = kdc_exchange_dict['tgt']
2434 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2435 if authenticator_subkey is not None:
2436 subkey = authenticator_subkey
2437 subkey_usage = KU_TGS_REP_ENC_PART_SUB_KEY
2438 else:
2439 subkey = tgt.session_key
2440 subkey_usage = KU_TGS_REP_ENC_PART_SESSION
2442 return subkey, subkey_usage
2444 def generate_armor_key(self, subkey, session_key):
2445 armor_key = kcrypto.cf2(subkey.key,
2446 session_key.key,
2447 b'subkeyarmor',
2448 b'ticketarmor')
2449 armor_key = Krb5EncryptionKey(armor_key, None)
2451 return armor_key
2453 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
2454 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
2455 reply_key.key,
2456 b'strengthenkey',
2457 b'replykey')
2458 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
2459 reply_key.kvno)
2461 return strengthen_reply_key
2463 def generate_client_challenge_key(self, armor_key, longterm_key):
2464 client_challenge_key = kcrypto.cf2(armor_key.key,
2465 longterm_key.key,
2466 b'clientchallengearmor',
2467 b'challengelongterm')
2468 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
2470 return client_challenge_key
2472 def generate_kdc_challenge_key(self, armor_key, longterm_key):
2473 kdc_challenge_key = kcrypto.cf2(armor_key.key,
2474 longterm_key.key,
2475 b'kdcchallengearmor',
2476 b'challengelongterm')
2477 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
2479 return kdc_challenge_key
2481 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
2482 expected_type = expected_checksum['cksumtype']
2483 self.assertEqual(armor_key.ctype, expected_type)
2485 ticket_blob = self.der_encode(ticket,
2486 asn1Spec=krb5_asn1.Ticket())
2487 checksum = self.Checksum_create(armor_key,
2488 KU_FAST_FINISHED,
2489 ticket_blob)
2490 self.assertEqual(expected_checksum, checksum)
2492 def get_outer_pa_dict(self, kdc_exchange_dict):
2493 return self.get_pa_dict(kdc_exchange_dict['req_padata'])
2495 def get_fast_pa_dict(self, kdc_exchange_dict):
2496 req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
2498 if req_pa_dict:
2499 return req_pa_dict
2501 return self.get_outer_pa_dict(kdc_exchange_dict)
2503 def sent_fast(self, kdc_exchange_dict):
2504 outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
2506 return PADATA_FX_FAST in outer_pa_dict
2508 def sent_enc_challenge(self, kdc_exchange_dict):
2509 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
2511 return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
2513 def sent_claims(self, kdc_exchange_dict):
2514 fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
2516 if PADATA_PAC_OPTIONS not in fast_pa_dict:
2517 return False
2519 pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
2520 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2521 pac_options = pac_options['options']
2522 claims_pos = len(tuple(krb5_asn1.PACOptionFlags('claims'))) - 1
2524 return (claims_pos < len(pac_options)
2525 and pac_options[claims_pos] == '1')
2527 def _test_as_exchange(self,
2528 cname,
2529 realm,
2530 sname,
2531 till,
2532 client_as_etypes,
2533 expected_error_mode,
2534 expected_crealm,
2535 expected_cname,
2536 expected_srealm,
2537 expected_sname,
2538 expected_salt,
2539 etypes,
2540 padata,
2541 kdc_options,
2542 preauth_key=None,
2543 ticket_decryption_key=None):
2545 def _generate_padata_copy(_kdc_exchange_dict,
2546 _callback_dict,
2547 req_body):
2548 return padata, req_body
2550 def _check_padata_preauth_key(_kdc_exchange_dict,
2551 _callback_dict,
2552 rep,
2553 padata):
2554 as_rep_usage = KU_AS_REP_ENC_PART
2555 return preauth_key, as_rep_usage
2557 if expected_error_mode == 0:
2558 check_error_fn = None
2559 check_rep_fn = self.generic_check_kdc_rep
2560 else:
2561 check_error_fn = self.generic_check_kdc_error
2562 check_rep_fn = None
2564 if padata is not None:
2565 generate_padata_fn = _generate_padata_copy
2566 else:
2567 generate_padata_fn = None
2569 kdc_exchange_dict = self.as_exchange_dict(
2570 expected_crealm=expected_crealm,
2571 expected_cname=expected_cname,
2572 expected_srealm=expected_srealm,
2573 expected_sname=expected_sname,
2574 ticket_decryption_key=ticket_decryption_key,
2575 generate_padata_fn=generate_padata_fn,
2576 check_error_fn=check_error_fn,
2577 check_rep_fn=check_rep_fn,
2578 check_padata_fn=_check_padata_preauth_key,
2579 check_kdc_private_fn=self.generic_check_kdc_private,
2580 expected_error_mode=expected_error_mode,
2581 client_as_etypes=client_as_etypes,
2582 expected_salt=expected_salt,
2583 kdc_options=str(kdc_options))
2585 rep = self._generic_kdc_exchange(kdc_exchange_dict,
2586 cname=cname,
2587 realm=realm,
2588 sname=sname,
2589 till_time=till,
2590 etypes=etypes)
2592 return rep, kdc_exchange_dict