tests/krb5: Check reply FAST padata if request included FAST
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob965a8f9fb00972fa1d56d33b6058042310826e95
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import security
38 from samba.gensec import FEATURE_SEAL
40 import samba.tests
41 from samba.tests import TestCaseInTempDir
43 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
44 from samba.tests.krb5.rfc4120_constants import (
45 FX_FAST_ARMOR_AP_REQUEST,
46 KDC_ERR_GENERIC,
47 KRB_AP_REQ,
48 KRB_AS_REP,
49 KRB_AS_REQ,
50 KRB_ERROR,
51 KRB_TGS_REP,
52 KRB_TGS_REQ,
53 KU_AP_REQ_AUTH,
54 KU_AS_REP_ENC_PART,
55 KU_FAST_ENC,
56 KU_FAST_FINISHED,
57 KU_FAST_REP,
58 KU_FAST_REQ_CHKSUM,
59 KU_NON_KERB_CKSUM_SALT,
60 KU_TGS_REP_ENC_PART_SESSION,
61 KU_TGS_REP_ENC_PART_SUB_KEY,
62 KU_TGS_REQ_AUTH,
63 KU_TGS_REQ_AUTH_CKSUM,
64 KU_TGS_REQ_AUTH_DAT_SESSION,
65 KU_TGS_REQ_AUTH_DAT_SUBKEY,
66 KU_TICKET,
67 NT_SRV_INST,
68 PADATA_ENC_TIMESTAMP,
69 PADATA_ETYPE_INFO,
70 PADATA_ETYPE_INFO2,
71 PADATA_FOR_USER,
72 PADATA_FX_FAST,
73 PADATA_KDC_REQ,
74 PADATA_PAC_OPTIONS,
75 PADATA_PAC_REQUEST,
76 PADATA_PK_AS_REQ,
77 PADATA_PK_AS_REP_19,
78 PADATA_SUPPORTED_ETYPES
80 import samba.tests.krb5.kcrypto as kcrypto
def BitStringEncoder_encodeValue32(
        self, value, asn1Spec, encodeFun, **options):
    """Encode a BIT STRING, zero-padding the value to at least 32 bits.

    BitStrings like KDCOptions or TicketFlags should be at least
    32 bits wide on the wire, so short values get trailing zero octets.

    :param value: the bit string; cloned through ``asn1Spec`` first when
        a spec is supplied.
    :param asn1Spec: optional schema object used to instantiate ``value``.
    :param encodeFun: not used by this implementation.
    :param options: not used by this implementation.
    :return: ``(payload, False, True)`` where ``payload`` is one zero
        octet followed by the value octets and any padding.
    """
    if asn1Spec is not None:
        # TODO: try to avoid ASN.1 schema instantiation
        value = asn1Spec.clone(value)

    # Shift left so the bit string ends on an octet boundary.
    trailing_bits = len(value) % 8
    aligned = value << (8 - trailing_bits) if trailing_bits else value

    octets = aligned.asOctets()
    # We need at least 32-Bit / 4-Bytes
    pad_len = max(0, 4 - len(octets))
    return b'\x00' + octets + (b'\x00' * pad_len), False, True
# Replace pyasn1's BIT STRING value encoder with the 32-bit-minimum
# variant for every BitStringEncoder user in this process.
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
def BitString_NamedValues_prettyPrint(self, scope=0):
    """Render a bit string as its binary form plus one line per flag.

    The first 32 bit positions are listed. A position is printed when it
    has an entry in ``self.prettyPrintNamedValues`` (set or not), or
    when it is set but unnamed ("unknown-bit-N").

    :param scope: nesting depth, used to indent the output.
    :return: the formatted string.
    """
    width = 32
    text = "%s" % self.asBinary()

    # Most significant bit of each octet first.
    bits = [
        1 if octet & (1 << shift) else 0
        for octet in self.asNumbers()
        for shift in range(7, -1, -1)
    ]
    # Treat missing trailing positions as cleared bits.
    bits.extend([0] * (width - len(bits)))

    names = self.prettyPrintNamedValues
    indent = " " * scope
    delim = ": (\n%s " % indent
    for pos in range(width):
        if pos in names:
            label = names[pos]
        elif bits[pos] != 0:
            label = "unknown-bit-%u" % pos
        else:
            continue
        text += "%s%s:%u" % (delim, label, bits[pos])
        delim = ",\n%s " % indent
    text += "\n%s)" % indent
    return text
# Give each BIT STRING flag type the named values of its companion
# "...Values" type and switch it to the flag-listing pretty printer.
for _flags_cls, _values_cls in (
        (krb5_asn1.TicketFlags, krb5_asn1.TicketFlagsValues),
        (krb5_asn1.KDCOptions, krb5_asn1.KDCOptionsValues),
        (krb5_asn1.APOptions, krb5_asn1.APOptionsValues),
        (krb5_asn1.PACOptionFlags, krb5_asn1.PACOptionFlagsValues)):
    _flags_cls.prettyPrintNamedValues = _values_cls.namedValues
    _flags_cls.namedValues = _values_cls.namedValues
    _flags_cls.prettyPrint = BitString_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _flags_cls, _values_cls
def Integer_NamedValues_prettyPrint(self, scope=0):
    """Render an integer as "<decimal> (0x<hex>) <name>".

    The name is looked up in ``self.prettyPrintNamedValues``; values
    without an entry are shown as "<__unknown__>".

    :param scope: accepted for interface compatibility; not used.
    :return: the formatted string.
    """
    number = int(self)
    known = self.prettyPrintNamedValues
    label = known[number] if number in known else "<__unknown__>"
    return "%d (0x%x) %s" % (number, number, label)
# Give each enumerated INTEGER type the named values of its companion
# "...Values" type and switch it to the name-resolving pretty printer.
for _int_cls, _values_cls in (
        (krb5_asn1.NameType, krb5_asn1.NameTypeValues),
        (krb5_asn1.AuthDataType, krb5_asn1.AuthDataTypeValues),
        (krb5_asn1.PADataType, krb5_asn1.PADataTypeValues),
        (krb5_asn1.EncryptionType, krb5_asn1.EncryptionTypeValues),
        (krb5_asn1.ChecksumType, krb5_asn1.ChecksumTypeValues),
        (krb5_asn1.KerbErrorDataType, krb5_asn1.KerbErrorDataTypeValues)):
    _int_cls.prettyPrintNamedValues = _values_cls.namedValues
    _int_cls.prettyPrint = Integer_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _int_cls, _values_cls
class Krb5EncryptionKey:
    """A kcrypto key together with its enctype, checksum type and kvno."""

    def __init__(self, key, kvno):
        """Wrap ``key`` and record ``kvno``.

        The default checksum type is derived from ``key.enctype``; an
        enctype other than AES256, AES128 or RC4 raises KeyError.
        """
        default_cksumtypes = {
            kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
            kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
            kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
        }

        self.key = key
        self.etype = key.enctype
        self.ctype = default_cksumtypes[self.etype]
        self.kvno = kvno

    def encrypt(self, usage, plaintext):
        """Return ``plaintext`` encrypted with this key for ``usage``."""
        return kcrypto.encrypt(self.key, usage, plaintext)

    def decrypt(self, usage, ciphertext):
        """Return ``ciphertext`` decrypted with this key for ``usage``."""
        return kcrypto.decrypt(self.key, usage, ciphertext)

    def make_checksum(self, usage, plaintext, ctype=None):
        """Checksum ``plaintext``; ``ctype`` defaults to ``self.ctype``."""
        chosen = self.ctype if ctype is None else ctype
        return kcrypto.make_checksum(chosen, self.key, usage, plaintext)

    def export_obj(self):
        """Return the key as a dict with 'keytype' and 'keyvalue'."""
        return {
            'keytype': self.etype,
            'keyvalue': self.key.contents,
        }
class KerberosCredentials(Credentials):
    """Credentials extended with Kerberos test settings.

    Adds the supported-enctype bitmasks for AS, TGS and AP exchanges,
    a key version number, explicitly supplied ("forced") keys and an
    optional forced salt.
    """

    def __init__(self):
        super().__init__()
        # By default every enctype this class knows about is enabled.
        all_enc_types = (security.KERB_ENCTYPE_RC4_HMAC_MD5 |
                         security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
                         security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)

        self.as_supported_enctypes = all_enc_types
        self.tgs_supported_enctypes = all_enc_types
        self.ap_supported_enctypes = all_enc_types

        self.kvno = None
        # Maps an integer enctype to a Krb5EncryptionKey.
        self.forced_keys = {}

        self.forced_salt = None

    def set_as_supported_enctypes(self, value):
        """Store the AS enctype bitmask, converted with int()."""
        self.as_supported_enctypes = int(value)

    def set_tgs_supported_enctypes(self, value):
        """Store the TGS enctype bitmask, converted with int()."""
        self.tgs_supported_enctypes = int(value)

    def set_ap_supported_enctypes(self, value):
        """Store the AP enctype bitmask, converted with int()."""
        self.ap_supported_enctypes = int(value)

    def _get_krb5_etypes(self, supported_enctypes):
        """Translate an enctype bitmask into a tuple of kcrypto enctypes.

        The result is ordered AES256, AES128, RC4.
        """
        ordered = (
            (security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96,
             kcrypto.Enctype.AES256),
            (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96,
             kcrypto.Enctype.AES128),
            (security.KERB_ENCTYPE_RC4_HMAC_MD5,
             kcrypto.Enctype.RC4),
        )
        return tuple(etype for bit, etype in ordered
                     if supported_enctypes & bit)

    def get_as_krb5_etypes(self):
        """Return the kcrypto enctypes enabled for AS exchanges."""
        return self._get_krb5_etypes(self.as_supported_enctypes)

    def get_tgs_krb5_etypes(self):
        """Return the kcrypto enctypes enabled for TGS exchanges."""
        return self._get_krb5_etypes(self.tgs_supported_enctypes)

    def get_ap_krb5_etypes(self):
        """Return the kcrypto enctypes enabled for AP exchanges."""
        return self._get_krb5_etypes(self.ap_supported_enctypes)

    def set_kvno(self, kvno):
        """Store the key version number as given (no conversion)."""
        self.kvno = kvno

    def get_kvno(self):
        """Return the stored key version number, or None."""
        return self.kvno

    def set_forced_key(self, etype, hexkey):
        """Register a fixed key for ``etype`` from a hex string.

        The key is tagged with the kvno stored at the time of the call.
        """
        etype = int(etype)
        raw_key = kcrypto.Key(etype, binascii.a2b_hex(hexkey))
        self.forced_keys[etype] = Krb5EncryptionKey(raw_key, self.kvno)

    def get_forced_key(self, etype):
        """Return the forced key for ``etype``, or None if not set."""
        return self.forced_keys.get(int(etype), None)

    def set_forced_salt(self, salt):
        """Store ``salt`` (converted to bytes) to override get_salt()."""
        self.forced_salt = bytes(salt)

    def get_forced_salt(self):
        """Return the forced salt, or None."""
        return self.forced_salt

    def get_salt(self):
        """Return the salt for this account as UTF-8 bytes.

        A forced salt wins. Otherwise, when a workstation name is set the
        salt is "<REALM>host<user without trailing $, lowercased>.<realm>";
        in all other cases it is "<REALM><username>".
        """
        if self.forced_salt is not None:
            return self.forced_salt

        if self.get_workstation():
            upper_realm = self.get_realm().upper()
            account = self.get_username().lower().rsplit('$', 1)[0]
            lower_realm = self.get_realm().lower()
            salt_string = '%shost%s.%s' % (upper_realm, account, lower_realm)
        else:
            salt_string = self.get_realm().upper() + self.get_username()

        return salt_string.encode('utf-8')
class KerberosTicketCreds:
    """Plain container for a ticket and the data that belongs to it."""

    def __init__(self, ticket, session_key,
                 crealm=None, cname=None,
                 srealm=None, sname=None,
                 decryption_key=None,
                 ticket_private=None,
                 encpart_private=None):
        """Store every argument unchanged on the instance."""
        # The ticket itself and its session key.
        self.ticket = ticket
        self.session_key = session_key

        # Client and server identity.
        self.crealm, self.cname = crealm, cname
        self.srealm, self.sname = srealm, sname

        # Optional extra material supplied by the caller.
        self.decryption_key = decryption_key
        self.ticket_private = ticket_private
        self.encpart_private = encpart_private
class RawKerberosTest(TestCaseInTempDir):
    """A raw Kerberos Test case."""

    # Enctype candidates combined by setup_etype_test_permutations();
    # "name" becomes part of each generated permutation name.
    # NOTE(review): AES256 is labelled "aes128" and AES128 is labelled
    # "aes256". The labels look swapped, but they feed the generated
    # names, so confirm nothing matches on those names before renaming.
    etypes_to_test = (
        {"value": -1111, "name": "dummy", },
        {"value": kcrypto.Enctype.AES256, "name": "aes128", },
        {"value": kcrypto.Enctype.AES128, "name": "aes256", },
        {"value": kcrypto.Enctype.RC4, "name": "rc4", },
    )

    # Set once setup_etype_test_permutations() has filled in
    # etype_test_permutations.
    setup_etype_test_permutations_done = False
@classmethod
def setup_etype_test_permutations(cls):
    """Build ``cls.etype_test_permutations`` once.

    Every ordered selection (length 1 up to all entries) of
    ``cls.etypes_to_test`` becomes a dict with "name" (the entry names
    joined by "_") and "etypes" (a tuple of the entry values). Later
    calls return immediately.
    """
    if cls.setup_etype_test_permutations_done:
        return

    candidates = cls.etypes_to_test
    indexes = range(len(candidates))

    res = []
    for length in range(1, len(candidates) + 1):
        for picked in itertools.permutations(indexes, length):
            res.append({
                "name": "_".join(candidates[i]["name"] for i in picked),
                "etypes": tuple(candidates[i]["value"] for i in picked),
            })

    cls.etype_test_permutations = res
    cls.setup_etype_test_permutations_done = True
@classmethod
def etype_test_permutation_name_idx(cls):
    """Return ``[(name, index), ...]`` for every etype permutation.

    The permutations are generated first if that has not happened yet.
    """
    cls.setup_etype_test_permutations()
    return [(entry['name'], position)
            for position, entry in enumerate(cls.etype_test_permutations)]
def etype_test_permutation_by_idx(self, idx):
    """Return ``(name, etypes)`` of the permutation stored at ``idx``."""
    entry = self.etype_test_permutations[idx]
    name, etypes = entry['name'], entry['etypes']
    return (name, etypes)
@classmethod
def setUpClass(cls):
    """Class fixture: read the KDC host and reset the credentials cache."""
    super().setUpClass()

    # Host name comes from the SERVER environment variable.
    cls.host = samba.tests.env_get_var_value('SERVER')

    # Credentials that have already been obtained, keyed by the
    # prefix they were requested with.
    cls.creds_dict = {}
def setUp(self):
    """Per-test fixture: reset debug switches, strictness and socket."""
    super().setUp()
    self.do_asn1_print = False
    self.do_hexdump = False

    # STRICT_CHECKING is optional in the environment; it defaults to on.
    env_value = samba.tests.env_get_var_value('STRICT_CHECKING',
                                              allow_missing=True)
    self.strict_checking = bool(int('1' if env_value is None
                                    else env_value))

    # No connection yet.
    self.s = None

    # Unique marker meaning "a kvno must be present, value unknown".
    self.unspecified_kvno = object()
def tearDown(self):
    """Close any open connection, then run the base class teardown."""
    self._disconnect(reason="tearDown")
    super().tearDown()
429 def _disconnect(self, reason):
430 if self.s is None:
431 return
432 self.s.close()
433 self.s = None
434 if self.do_hexdump:
435 sys.stderr.write("disconnect[%s]\n" % reason)
def _connect_tcp(self):
    """Open a TCP connection to ``self.host`` on port 88.

    The first address returned by getaddrinfo is used and the socket
    gets a 10 second timeout. On success the socket is left in
    ``self.s`` and the address list in ``self.a``.

    :raises OSError: if resolving, creating the socket or connecting
        fails. In that case any socket created so far is closed and
        ``self.s`` is reset to None.
    """
    tcp_port = 88
    try:
        self.a = socket.getaddrinfo(self.host, tcp_port, socket.AF_UNSPEC,
                                    socket.SOCK_STREAM, socket.SOL_TCP)
        family, socktype, proto = self.a[0][0:3]
        self.s = socket.socket(family, socktype, proto)
        self.s.settimeout(10)
        self.s.connect(self.a[0][4])
    except OSError:
        # socket.error and IOError are both aliases of OSError.
        # self.s is still None if the failure happened before the socket
        # existed; calling close() on it would hide the real error.
        if self.s is not None:
            self.s.close()
            self.s = None
        raise
def connect(self):
    """Connect to the KDC; the test must not already be connected."""
    self.assertNotConnected()
    self._connect_tcp()
    if not self.do_hexdump:
        return
    sys.stderr.write("connected[%s]\n" % self.host)
def env_get_var(self, varname, prefix,
                fallback_default=True,
                allow_missing=False):
    """Look up ``<prefix>_<varname>``, optionally falling back.

    :param varname: variable name without prefix.
    :param prefix: prefix to try first; with None only the plain
        ``varname`` is looked up.
    :param fallback_default: when the prefixed variable yields None,
        also try the plain ``varname``.
    :param allow_missing: passed to the final lookup; the prefixed
        lookup additionally tolerates a missing variable whenever a
        fallback is going to be tried.
    :return: whatever ``samba.tests.env_get_var_value`` returned.
    """
    if prefix is None:
        # Without a prefix the plain variable is the only candidate.
        return samba.tests.env_get_var_value(varname,
                                             allow_missing=allow_missing)

    val = samba.tests.env_get_var_value(
        '%s_%s' % (prefix, varname),
        allow_missing=allow_missing or fallback_default)
    if val is None and fallback_default:
        val = samba.tests.env_get_var_value(varname,
                                            allow_missing=allow_missing)
    return val
def _get_krb5_creds_from_env(self, prefix,
                             default_username=None,
                             allow_missing_password=False,
                             allow_missing_keys=True,
                             require_strongest_key=False):
    """Build KerberosCredentials from environment variables.

    :param prefix: environment variable prefix (see env_get_var).
    :param default_username: used when no USERNAME variable is set; its
        presence also makes that variable optional.
    :param allow_missing_password: whether PASSWORD may be absent.
    :param allow_missing_keys: whether KVNO and key variables may all be
        absent; when False at least one forced key must end up set.
    :param require_strongest_key: make KVNO mandatory, and the AES256
        key mandatory too unless a password was supplied.
    :return: the populated KerberosCredentials object.
    """
    c = KerberosCredentials()
    c.guess()

    domain = self.env_get_var('DOMAIN', prefix)
    realm = self.env_get_var('REALM', prefix)
    username = self.env_get_var('USERNAME', prefix,
                                fallback_default=False,
                                allow_missing=default_username is not None)
    if username is None:
        username = default_username
    password = self.env_get_var('PASSWORD', prefix,
                                fallback_default=False,
                                allow_missing=allow_missing_password)
    c.set_domain(domain)
    c.set_realm(realm)
    c.set_username(username)
    if password is not None:
        c.set_password(password)

    # Optional per-exchange enctype bitmasks.
    for varname, setter in (
            ('AS_SUPPORTED_ENCTYPES', c.set_as_supported_enctypes),
            ('TGS_SUPPORTED_ENCTYPES', c.set_tgs_supported_enctypes),
            ('AP_SUPPORTED_ENCTYPES', c.set_ap_supported_enctypes)):
        enctypes = self.env_get_var(varname, prefix, allow_missing=True)
        if enctypes is not None:
            setter(enctypes)

    if require_strongest_key:
        kvno_allow_missing = False
        # With a password the AES256 key need not be supplied.
        aes256_allow_missing = password is not None
    else:
        kvno_allow_missing = allow_missing_keys
        aes256_allow_missing = allow_missing_keys

    # The kvno has to be set before the keys: set_forced_key() tags each
    # key with the kvno stored at that moment.
    kvno = self.env_get_var('KVNO', prefix,
                            fallback_default=False,
                            allow_missing=kvno_allow_missing)
    if kvno is not None:
        c.set_kvno(kvno)

    for varname, etype, may_be_missing in (
            ('AES256_KEY_HEX', kcrypto.Enctype.AES256,
             aes256_allow_missing),
            ('AES128_KEY_HEX', kcrypto.Enctype.AES128, True),
            ('RC4_KEY_HEX', kcrypto.Enctype.RC4, True)):
        hexkey = self.env_get_var(varname, prefix,
                                  fallback_default=False,
                                  allow_missing=may_be_missing)
        if hexkey is not None:
            c.set_forced_key(etype, hexkey)

    if not allow_missing_keys:
        self.assertTrue(c.forced_keys,
                        'Please supply %s encryption keys '
                        'in environment' % prefix)

    return c
548 def _get_krb5_creds(self,
549 prefix,
550 default_username=None,
551 allow_missing_password=False,
552 allow_missing_keys=True,
553 require_strongest_key=False,
554 fallback_creds_fn=None):
555 if prefix in self.creds_dict:
556 return self.creds_dict[prefix]
558 # We don't have the credentials already
559 creds = None
560 env_err = None
561 try:
562 # Try to obtain them from the environment
563 creds = self._get_krb5_creds_from_env(
564 prefix,
565 default_username=default_username,
566 allow_missing_password=allow_missing_password,
567 allow_missing_keys=allow_missing_keys,
568 require_strongest_key=require_strongest_key)
569 except Exception as err:
570 # An error occurred, so save it for later
571 env_err = err
572 else:
573 self.assertIsNotNone(creds)
574 # Save the obtained credentials
575 self.creds_dict[prefix] = creds
576 return creds
578 if fallback_creds_fn is not None:
579 try:
580 # Try to use the fallback method
581 creds = fallback_creds_fn()
582 except Exception as err:
583 print("ERROR FROM ENV: %r" % (env_err))
584 print("FALLBACK-FN: %s" % (fallback_creds_fn))
585 print("FALLBACK-ERROR: %r" % (err))
586 else:
587 self.assertIsNotNone(creds)
588 # Save the obtained credentials
589 self.creds_dict[prefix] = creds
590 return creds
592 # Both methods failed, so raise the exception from the
593 # environment method
594 raise env_err
def get_user_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the credentials looked up without a prefix."""
    return self._get_krb5_creds(
        prefix=None,
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_service_creds(self,
                      allow_missing_password=False,
                      allow_missing_keys=True):
    """Return the credentials looked up with the 'SERVICE' prefix."""
    return self._get_krb5_creds(
        prefix='SERVICE',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials looked up with the 'CLIENT' prefix."""
    return self._get_krb5_creds(
        prefix='CLIENT',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_server_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials looked up with the 'SERVER' prefix."""
    return self._get_krb5_creds(
        prefix='SERVER',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
def get_admin_creds(self,
                    allow_missing_password=False,
                    allow_missing_keys=True):
    """Return the 'ADMIN' credentials with FEATURE_SEAL switched on.

    The gensec feature is added to the credentials object returned by
    the lookup, which may be the cached one.
    """
    creds = self._get_krb5_creds(
        prefix='ADMIN',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys)
    features = creds.get_gensec_features() | FEATURE_SEAL
    creds.set_gensec_features(features)
    return creds
def get_krbtgt_creds(self,
                     require_keys=True,
                     require_strongest_key=False):
    """Return the 'KRBTGT' credentials (default username "krbtgt").

    A password is never required. ``require_strongest_key`` is only
    valid together with ``require_keys``.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)
    return self._get_krb5_creds(
        prefix='KRBTGT',
        default_username='krbtgt',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key)
def get_anon_creds(self):
    """Return a fresh Credentials object set to anonymous."""
    anon = Credentials()
    anon.set_anonymous()
    return anon
def asn1_dump(self, name, obj, asn1_print=None):
    """Write ``obj`` to stderr when ASN.1 printing is enabled.

    :param name: optional heading printed on its own line before ``obj``.
    :param asn1_print: overrides ``self.do_asn1_print`` unless None.
    """
    enabled = self.do_asn1_print if asn1_print is None else asn1_print
    if not enabled:
        return
    if name is None:
        sys.stderr.write("%s" % (obj))
    else:
        sys.stderr.write("%s:\n%s" % (name, obj))
def hex_dump(self, name, blob, hexdump=None):
    """Write a labelled hexdump of ``blob`` to stderr when enabled.

    :param name: label printed together with the blob length.
    :param hexdump: overrides ``self.do_hexdump`` unless None.
    """
    enabled = self.do_hexdump if hexdump is None else hexdump
    if not enabled:
        return
    rendered = self.hexdump(blob)
    sys.stderr.write("%s: %d\n%s" % (name, len(blob), rendered))
def der_decode(
        self,
        blob,
        asn1Spec=None,
        native_encode=True,
        asn1_print=None,
        hexdump=None):
    """Decode DER ``blob``, optionally converting to native Python types.

    :param asn1Spec: schema passed to the pyasn1 DER decoder.
    :param native_encode: when true the decoded object is run through
        the pyasn1 native encoder before being returned.
    :param asn1_print: forwarded to asn1_dump().
    :param hexdump: forwarded to hex_dump().
    :return: the decoded object (native or pyasn1 form).
    """
    if asn1Spec is None:
        class_name = "<None-asn1Spec>"
    else:
        class_name = type(asn1Spec).__name__.split(':')[0]
    self.hex_dump(class_name, blob, hexdump=hexdump)

    # The second item returned by the decoder is not used.
    decoded, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
    self.asn1_dump(None, decoded, asn1_print=asn1_print)
    return pyasn1_native_encode(decoded) if native_encode else decoded
def der_encode(
        self,
        obj,
        asn1Spec=None,
        native_decode=True,
        asn1_print=None,
        hexdump=None):
    """Encode ``obj`` to DER bytes.

    :param asn1Spec: schema used to turn a native ``obj`` into a pyasn1
        object first.
    :param native_decode: set to False when ``obj`` already is a pyasn1
        object.
    :param asn1_print: forwarded to asn1_dump().
    :param hexdump: forwarded to hex_dump().
    :return: the DER encoding.
    """
    if native_decode:
        obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
    # The type name is always a string, so both dumps always run.
    class_name = type(obj).__name__.split(':')[0]
    self.asn1_dump(None, obj, asn1_print=asn1_print)
    blob = pyasn1_der_encode(obj)
    self.hex_dump(class_name, blob, hexdump=hexdump)
    return blob
def send_pdu(self, req, asn1_print=None, hexdump=None):
    """DER-encode ``req`` and send it with a 4-byte length prefix.

    The prefix is the big-endian length of the encoded request. Sending
    is repeated until every byte has been accepted by the socket.

    :raises OSError: on socket failure; the connection is dropped first.
    """
    try:
        k5_pdu = self.der_encode(
            req, native_decode=False, asn1_print=asn1_print, hexdump=False)
        header = struct.pack('>I', len(k5_pdu))
        self.hex_dump("send_pdu", header, hexdump=hexdump)
        self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
        pending = header + k5_pdu
        while pending:
            sent = self.s.send(pending, 0)
            pending = pending[sent:]
    except OSError as e:
        # socket.error and IOError are both aliases of OSError.
        self._disconnect("send_pdu: %s" % e)
        raise
def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
    """Receive up to ``num_recv`` bytes from the socket.

    :param timeout: temporary socket timeout for this read; the timeout
        is put back to 10 seconds afterwards.
    :return: the bytes received, or None on EOF (the connection is then
        dropped) or on timeout (a note is written to stderr).
    :raises OSError: on other socket errors; the connection is dropped.
    """
    rep_pdu = None
    try:
        if timeout is not None:
            self.s.settimeout(timeout)
        rep_pdu = self.s.recv(num_recv, 0)
        self.s.settimeout(10)
        if not rep_pdu:
            # An empty read means the peer closed the connection.
            self._disconnect("recv_raw: EOF")
            return None
        self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
    except socket.timeout:
        # Must be handled before OSError, which it derives from.
        self.s.settimeout(10)
        sys.stderr.write("recv_raw: TIMEOUT\n")
    except OSError as e:
        # socket.error and IOError are both aliases of OSError.
        self._disconnect("recv_raw: %s" % e)
        raise
    return rep_pdu
def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
    """Receive one length-prefixed Kerberos reply and decode it.

    The 4-byte big-endian length header and the body are each read in a
    loop, because a single receive may return fewer bytes than asked
    for.

    :param asn1_print: forwarded to the final der_decode().
    :param hexdump: forwarded to recv_raw().
    :param timeout: forwarded to recv_raw().
    :return: ``(None, None)`` if nothing could be received,
        ``(None, "")`` for a zero-length reply, otherwise
        ``(decoded_reply, raw_reply_bytes)``.
    """
    header_size = 4
    header_pdu = b''
    while len(header_pdu) < header_size:
        raw_pdu = self.recv_raw(
            num_recv=header_size - len(header_pdu),
            hexdump=hexdump, timeout=timeout)
        if raw_pdu is None:
            if not header_pdu:
                # Nothing arrived at all.
                return (None, None)
            # The stream ended in the middle of the length header.
            self.fail("connection ended inside the PDU length header")
        header_pdu += raw_pdu
    (k5_len,) = struct.unpack(">I", header_pdu)
    if k5_len == 0:
        return (None, "")

    missing = k5_len
    rep_pdu = b''
    while missing > 0:
        raw_pdu = self.recv_raw(
            num_recv=missing, hexdump=hexdump, timeout=timeout)
        # recv_raw() returns None when no data was received; report
        # that as a test failure, not as a TypeError from len().
        self.assertIsNotNone(raw_pdu,
                             msg="connection ended inside the PDU body")
        self.assertGreaterEqual(len(raw_pdu), 1)
        rep_pdu += raw_pdu
        missing = k5_len - len(rep_pdu)

    # First pass without a schema, only to read pvno and msg-type.
    k5_raw = self.der_decode(
        rep_pdu,
        asn1Spec=None,
        native_encode=False,
        asn1_print=False,
        hexdump=False)
    pvno = k5_raw['field-0']
    self.assertEqual(pvno, 5)
    msg_type = k5_raw['field-1']
    self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
    if msg_type == KRB_AS_REP:
        asn1Spec = krb5_asn1.AS_REP()
    elif msg_type == KRB_TGS_REP:
        asn1Spec = krb5_asn1.TGS_REP()
    elif msg_type == KRB_ERROR:
        asn1Spec = krb5_asn1.KRB_ERROR()

    # Second pass with the schema selected by the message type.
    rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
                          asn1_print=asn1_print, hexdump=False)
    return (rep, rep_pdu)
def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
    """Receive one reply and return only its decoded form."""
    decoded, _raw = self.recv_pdu_raw(asn1_print=asn1_print,
                                      hexdump=hexdump,
                                      timeout=timeout)
    return decoded
def assertIsConnected(self):
    """Fail with "Not connected" unless a socket is currently stored."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" if a socket is currently stored."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
def send_recv_transaction(
        self,
        req,
        asn1_print=None,
        hexdump=None,
        timeout=None):
    """Connect, send ``req``, receive one reply and disconnect.

    :return: the decoded reply from recv_pdu().
    :raises Exception: whatever sending or receiving raised; the
        connection is dropped before the error propagates.
    """
    self.connect()
    try:
        self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
        rep = self.recv_pdu(
            asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
    except Exception:
        self._disconnect("transaction failed")
        raise
    else:
        self._disconnect("transaction done")
    return rep
def assertNoValue(self, value):
    """Assert that ``value.isNoValue`` is true."""
    is_unset = value.isNoValue
    self.assertTrue(is_unset)
def assertHasValue(self, value):
    """Assert that ``value`` is not None."""
    self.assertIsNotNone(obj=value)
def getElementValue(self, obj, elem):
    """Return ``obj[elem]`` via ``obj.get``, or None when it is absent."""
    value = obj.get(elem, None)
    return value
def assertElementMissing(self, obj, elem):
    """Assert that ``obj`` has no value (None) for ``elem``."""
    self.assertIsNone(self.getElementValue(obj, elem))
def assertElementPresent(self, obj, elem):
    """Assert that ``obj`` has a value for ``elem``.

    With strict checking enabled, a container value must also be
    non-empty.
    """
    v = self.getElementValue(obj, elem)
    self.assertIsNotNone(v)
    if self.strict_checking and isinstance(v, collections.abc.Container):
        self.assertNotEqual(0, len(v))
def assertElementEqual(self, obj, elem, value):
    """Assert that ``obj`` has ``elem`` and that it equals ``value``."""
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    self.assertEqual(actual, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that ``obj[elem]`` equals ``value`` encoded as UTF-8."""
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    expected = bytes(value, 'utf8')
    self.assertEqual(actual, expected)
def assertPrincipalEqual(self, princ1, princ2):
    """Assert two principals have equal name-type and name-string."""
    self.assertEqual(princ1['name-type'], princ2['name-type'])

    mismatch = "princ1=%s != princ2=%s" % (princ1, princ2)
    names1 = princ1['name-string']
    names2 = princ2['name-string']
    self.assertEqual(len(names1), len(names2), msg=mismatch)
    # Lengths are equal at this point, so pairing covers every component.
    for idx in range(len(names1)):
        self.assertEqual(names1[idx], names2[idx], msg=mismatch)
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that ``obj[elem]`` is a principal equal to ``value``.

    The element is converted to a pyasn1 PrincipalName before the
    comparison.
    """
    native = self.getElementValue(obj, elem)
    self.assertIsNotNone(native)
    principal = pyasn1_native_decode(native,
                                     asn1Spec=krb5_asn1.PrincipalName())
    self.assertPrincipalEqual(principal, value)
def assertElementKVNO(self, obj, elem, value):
    """Check the kvno stored in ``obj[elem]``.

    :param value: the expected kvno. "autodetect" accepts whatever is
        present (including absence); None requires the element to be
        absent; ``self.unspecified_kvno`` requires a non-zero kvno of
        any value; anything else is converted with int() and compared.
    """
    v = self.getElementValue(obj, elem)
    if value == "autodetect":
        value = v

    if value is None:
        self.assertIsNone(v)
        return

    self.assertIsNotNone(v)
    # The value on the wire should never be 0
    self.assertNotEqual(v, 0)
    # unspecified_kvno means we don't know the kvno,
    # but want to enforce its presence
    if value is self.unspecified_kvno:
        return
    expected = int(value)
    self.assertNotEqual(expected, 0)
    self.assertEqual(v, expected)
def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
    """Return ``(timestamp, microseconds)`` for a point in time, in UTC.

    :param epoch: seconds since the Unix epoch; defaults to now.
    :param offset: seconds (converted with int()) added to ``epoch``.
    :return: the time formatted as "YYYYmmddHHMMSSZ" and the
        microsecond part as an int.
    """
    when = time.time() if epoch is None else epoch
    if offset is not None:
        when = when + int(offset)
    utc = datetime.timezone.utc
    dt = datetime.datetime.fromtimestamp(when, tz=utc)
    return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
def get_KerberosTime(self, epoch=None, offset=None):
    """Return only the timestamp string of get_KerberosTimeWithUsec()."""
    return self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)[0]
def get_EpochFromKerberosTime(self, kerberos_time):
    """Convert a "YYYYmmddHHMMSSZ" timestamp to integer epoch seconds.

    :param kerberos_time: the timestamp as str or bytes, taken as UTC.
    """
    if isinstance(kerberos_time, bytes):
        kerberos_time = kerberos_time.decode()

    naive = datetime.datetime.strptime(kerberos_time, '%Y%m%d%H%M%SZ')
    aware = naive.replace(tzinfo=datetime.timezone.utc)
    return int(aware.timestamp())
def get_Nonce(self):
    """Return a random integer from 0x7f000000 to 0x7fffffff inclusive."""
    lowest, highest = 0x7f000000, 0x7fffffff
    return random.randint(lowest, highest)
def get_pa_dict(self, pa_data):
    """Index a PA-DATA list by padata-type.

    :param pa_data: iterable of dicts with 'padata-type' and
        'padata-value', or None.
    :return: dict mapping each type to its value (empty for None).
    :raises RuntimeError: if a padata-type occurs more than once.
    """
    if pa_data is None:
        return {}

    by_type = {}
    for entry in pa_data:
        pa_type = entry['padata-type']
        if pa_type in by_type:
            raise RuntimeError(f'Duplicate type {pa_type}')
        by_type[pa_type] = entry['padata-value']

    return by_type
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap raw key ``contents`` of type ``etype`` in a Krb5EncryptionKey."""
    raw_key = kcrypto.Key(etype, contents)
    return Krb5EncryptionKey(raw_key, kvno)
def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
    """Derive a key from a password and salt.

    Both ``pwd`` and ``salt`` must be given; the key is derived with
    ``kcrypto.string_to_key`` and wrapped in a Krb5EncryptionKey.
    """
    for required in (pwd, salt):
        self.assertIsNotNone(required)
    derived = kcrypto.string_to_key(etype, pwd, salt)
    return Krb5EncryptionKey(derived, kvno)
def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
    """Build the client key described by one ETYPE-INFO2 entry.

    For RC4 the NT hash from ``creds`` is used directly; for every other
    enctype the key is derived from the password and the entry's salt.
    """
    etype = etype_info2['etype']
    salt = etype_info2.get('salt', None)

    if etype != kcrypto.Enctype.RC4:
        return self.PasswordKey_create(
            etype=etype, pwd=creds.get_password(), salt=salt, kvno=kvno)

    nthash = creds.get_nt_hash()
    return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno)
def TicketDecryptionKey_from_creds(self, creds, etype=None):
    """Return the key for decrypting tickets issued to ``creds``.

    :param etype: wanted enctype; defaults to the first TGS enctype of
        ``creds``.
    :return: the forced key for ``etype`` when one is set; otherwise a
        key built from the NT hash (RC4) or from password and salt.
    """
    if etype is None:
        etype = creds.get_tgs_krb5_etypes()[0]

    key = creds.get_forced_key(etype)
    if key is not None:
        return key

    kvno = creds.get_kvno()

    fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
                "nor a password specified, " % (
                    creds.get_username(), etype, kvno))

    if etype != kcrypto.Enctype.RC4:
        password = creds.get_password()
        self.assertIsNotNone(password, msg=fail_msg)
        return self.PasswordKey_create(etype=etype,
                                       pwd=password,
                                       salt=creds.get_salt(),
                                       kvno=kvno)

    nthash = creds.get_nt_hash()
    self.assertIsNotNone(nthash, msg=fail_msg)
    return self.SessionKey_create(etype=etype,
                                  contents=nthash,
                                  kvno=kvno)
def RandomKey(self, etype):
    """Return a key of type ``etype`` filled with random bytes.

    The key length is the ``keysize`` of the kcrypto enctype profile.
    """
    profile = kcrypto._get_enctype_profile(etype)
    random_bytes = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=random_bytes)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Build a key from a dict with 'keytype' and 'keyvalue'."""
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(keytype, keyvalue)
def EncryptedData_create(self, key, usage, plaintext):
    """Encrypt ``plaintext`` and return an EncryptedData dict.

    EncryptedData ::= SEQUENCE {
        etype   [0] Int32 -- EncryptionType --,
        kvno    [1] UInt32 OPTIONAL,
        cipher  [2] OCTET STRING -- ciphertext
    }

    'kvno' is only included when the key has one.
    """
    encrypted = {
        'etype': key.etype,
        'cipher': key.encrypt(usage, plaintext),
    }
    if key.kvno is None:
        return encrypted
    encrypted['kvno'] = key.kvno
    return encrypted
def Checksum_create(self, key, usage, plaintext, ctype=None):
    """Checksum ``plaintext`` with ``key`` and return a Checksum dict.

    Checksum ::= SEQUENCE {
        cksumtype  [0] Int32,
        checksum   [1] OCTET STRING
    }

    :param ctype: checksum type; defaults to ``key.ctype``.
    """
    cksumtype = key.ctype if ctype is None else ctype
    return {
        'cksumtype': cksumtype,
        'checksum': key.make_checksum(usage, plaintext, ctype=cksumtype),
    }
@classmethod
def PrincipalName_create(cls, name_type, names):
    """Return a PrincipalName dict.

    PrincipalName ::= SEQUENCE {
        name-type    [0] Int32,
        name-string  [1] SEQUENCE OF KerberosString
    }
    """
    return {
        'name-type': name_type,
        'name-string': names,
    }
def AuthorizationData_create(self, ad_type, ad_data):
    """Return one AuthorizationData element as a dict.

    AuthorizationData ::= SEQUENCE {
        ad-type  [0] Int32,
        ad-data  [1] OCTET STRING
    }
    """
    return {
        'ad-type': ad_type,
        'ad-data': ad_data,
    }
def PA_DATA_create(self, padata_type, padata_value):
    """Return a PA-DATA dict.

    PA-DATA ::= SEQUENCE {
        -- NOTE: first tag is [1], not [0]
        padata-type   [1] Int32,
        padata-value  [2] OCTET STRING -- might be encoded AP-REQ
    }
    """
    return {
        'padata-type': padata_type,
        'padata-value': padata_value,
    }
def PA_ENC_TS_ENC_create(self, ts, usec):
    """Return a PA-ENC-TS-ENC dict.

    PA-ENC-TS-ENC ::= SEQUENCE {
        patimestamp[0]  KerberosTime, -- client's time
        pausec[1]       krb5int32 OPTIONAL
    }

    :param ts: the timestamp.
    :param usec: the microseconds, or None to leave the OPTIONAL
        'pausec' field out.
    """
    PA_ENC_TS_ENC_obj = {
        'patimestamp': ts,
    }
    # Only emit the optional field when a value was supplied (0 counts).
    if usec is not None:
        PA_ENC_TS_ENC_obj['pausec'] = usec
    return PA_ENC_TS_ENC_obj
def PA_PAC_OPTIONS_create(self, options):
    """Return a PA-PAC-OPTIONS dict.

    PA-PAC-OPTIONS ::= SEQUENCE {
        options  [0] PACOptionFlags
    }
    """
    return {
        'options': options,
    }
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor dict.

    KrbFastArmor ::= SEQUENCE {
        armor-type   [0] Int32,
        armor-value  [1] OCTET STRING,
        ...
    }
    """
    return {
        'armor-type': armor_type,
        'armor-value': armor_value,
    }
def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
    """Return a KrbFastReq dict.

    KrbFastReq ::= SEQUENCE {
        fast-options  [0] FastOptions,
        padata        [1] SEQUENCE OF PA-DATA,
        req-body      [2] KDC-REQ-BODY,
        ...
    }
    """
    return {
        'fast-options': fast_options,
        'padata': padata,
        'req-body': req_body,
    }
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return the native dict for a KrbFastArmoredReq.

    'req-checksum' and 'enc-fast-req' are always present; 'armor' is only
    added when armor is not None.
    """
    # KrbFastArmoredReq ::= SEQUENCE {
    #     armor        [0] KrbFastArmor OPTIONAL,
    #     req-checksum [1] Checksum,
    #     enc-fast-req [2] EncryptedData -- KrbFastReq --
    # }
    armored_req = {}
    armored_req['req-checksum'] = req_checksum
    armored_req['enc-fast-req'] = enc_fast_req
    if armor is None:
        return armored_req

    armored_req['armor'] = armor
    return armored_req
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Return the native dict for PA-FX-FAST-REQUEST wrapping armored_data."""
    # PA-FX-FAST-REQUEST ::= CHOICE {
    #     armored-data [0] KrbFastArmoredReq,
    #     ...
    # }
    return {'armored-data': armored_data}
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
    """Build a KERB-PA-PAC-REQUEST.

    With pa_data_create true (the default) the structure is DER encoded
    and returned wrapped in a PA-DATA of type PADATA_PAC_REQUEST.
    Otherwise the plain native dict is returned.
    """
    # KERB-PA-PAC-REQUEST ::= SEQUENCE {
    #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
    #                                --    include PAC.
    #                                --If FALSE, and PAC present,
    #                                --    remove PAC.
    # }
    pac_request = {'include-pac': include_pac}
    if pa_data_create:
        encoded = self.der_encode(
            pac_request,
            asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
        return self.PA_DATA_create(PADATA_PAC_REQUEST, encoded)

    return pac_request
def KDC_REQ_BODY_create(self,
                        kdc_options,
                        cname,
                        realm,
                        sname,
                        from_time,
                        till_time,
                        renew_time,
                        nonce,
                        etypes,
                        addresses,
                        additional_tickets,
                        EncAuthorizationData,
                        EncAuthorizationData_key,
                        EncAuthorizationData_usage,
                        asn1_print=None,
                        hexdump=None):
    """Return the native dict for a KDC-REQ-BODY.

    'kdc-options', 'realm', 'till', 'nonce' and 'etype' are always set.
    Every other field is only added when its argument is not None.

    When EncAuthorizationData is given it is DER encoded as
    AuthorizationData and encrypted with EncAuthorizationData_key under
    EncAuthorizationData_usage; the result becomes
    'enc-authorization-data'.  asn1_print and hexdump are only forwarded
    to that der_encode call.
    """
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                       -- NOTE: not empty
    # }
    enc_ad = None
    if EncAuthorizationData is not None:
        plain_ad = self.der_encode(
            EncAuthorizationData,
            asn1Spec=krb5_asn1.AuthorizationData(),
            asn1_print=asn1_print,
            hexdump=hexdump)
        enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
                                           EncAuthorizationData_usage,
                                           plain_ad)

    body = {
        'kdc-options': kdc_options,
        'realm': realm,
        'till': till_time,
        'nonce': nonce,
        'etype': etypes,
    }

    # Optional fields, in the order they are added to the body.
    optional_fields = (
        ('cname', cname),
        ('sname', sname),
        ('from', from_time),
        ('rtime', renew_time),
        ('addresses', addresses),
        ('enc-authorization-data', enc_ad),
        ('additional-tickets', additional_tickets),
    )
    body.update((field, value)
                for field, value in optional_fields
                if value is not None)
    return body
def KDC_REQ_create(self,
                   msg_type,
                   padata,
                   req_body,
                   asn1Spec=None,
                   asn1_print=None,
                   hexdump=None):
    """Build a KDC-REQ and return (native_dict, decoded).

    'padata' is only included when padata is not None.  decoded is the
    result of pyasn1_native_decode() against asn1Spec, or None when no
    asn1Spec is given.  asn1_print and hexdump are accepted but not used
    by this method.
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    kdc_req = {
        'pvno': 5,
        'msg-type': msg_type,
        'req-body': req_body,
    }
    if padata is not None:
        kdc_req['padata'] = padata

    if asn1Spec is None:
        return kdc_req, None

    return kdc_req, pyasn1_native_decode(kdc_req, asn1Spec=asn1Spec)
def AS_REQ_create(self,
                  padata,       # optional
                  kdc_options,  # required
                  cname,        # optional
                  realm,        # required
                  sname,        # optional
                  from_time,    # optional
                  till_time,    # required
                  renew_time,   # optional
                  nonce,        # required
                  etypes,       # required
                  addresses,    # optional
                  additional_tickets,
                  native_decoded_only=True,
                  asn1_print=None,
                  hexdump=None):
    """Build an AS-REQ.

    The body is produced by KDC_REQ_BODY_create() without any encrypted
    authorization data and wrapped by KDC_REQ_create() with message type
    KRB_AS_REQ and the AS_REQ ASN.1 spec.

    Returns the decoded request when native_decoded_only is true,
    otherwise the tuple (decoded, native_dict).
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                       -- NOTE: not empty
    # }
    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=None,
        EncAuthorizationData_key=None,
        EncAuthorizationData_usage=None,
        asn1_print=asn1_print,
        hexdump=hexdump)

    as_req_obj, as_req_decoded = self.KDC_REQ_create(
        KRB_AS_REQ,
        padata,
        req_body,
        asn1Spec=krb5_asn1.AS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)

    if not native_decoded_only:
        return as_req_decoded, as_req_obj
    return as_req_decoded
def AP_REQ_create(self, ap_options, ticket, authenticator):
    """Return the native dict for an AP-REQ.

    'pvno' is fixed to 5 and 'msg-type' to KRB_AP_REQ; the three
    arguments are stored under 'ap-options', 'ticket' and
    'authenticator'.
    """
    # AP-REQ          ::= [APPLICATION 14] SEQUENCE {
    #        pvno            [0] INTEGER (5),
    #        msg-type        [1] INTEGER (14),
    #        ap-options      [2] APOptions,
    #        ticket          [3] Ticket,
    #        authenticator   [4] EncryptedData -- Authenticator
    # }
    return {
        'pvno': 5,
        'msg-type': KRB_AP_REQ,
        'ap-options': ap_options,
        'ticket': ticket,
        'authenticator': authenticator,
    }
def Authenticator_create(
        self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
        authorization_data):
    """Return the native dict for an (unencrypted) Authenticator.

    'authenticator-vno' is fixed to 5.  crealm, cname, cusec and ctime
    are always stored; cksum, subkey, seq_number and authorization_data
    are only added when they are not None.
    """
    # -- Unencrypted authenticator
    # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
    #        authenticator-vno       [0] INTEGER (5),
    #        crealm                  [1] Realm,
    #        cname                   [2] PrincipalName,
    #        cksum                   [3] Checksum OPTIONAL,
    #        cusec                   [4] Microseconds,
    #        ctime                   [5] KerberosTime,
    #        subkey                  [6] EncryptionKey OPTIONAL,
    #        seq-number              [7] UInt32 OPTIONAL,
    #        authorization-data      [8] AuthorizationData OPTIONAL
    # }
    authenticator = {
        'authenticator-vno': 5,
        'crealm': crealm,
        'cname': cname,
        'cusec': cusec,
        'ctime': ctime,
    }

    # Optional fields, in the order they are added.
    optional_fields = (
        ('cksum', cksum),
        ('subkey', subkey),
        ('seq-number', seq_number),
        ('authorization-data', authorization_data),
    )
    for field, value in optional_fields:
        if value is not None:
            authenticator[field] = value

    return authenticator
def TGS_REQ_create(self,
                   padata,       # optional
                   cusec,
                   ctime,
                   ticket,
                   kdc_options,  # required
                   cname,        # optional
                   realm,        # required
                   sname,        # optional
                   from_time,    # optional
                   till_time,    # required
                   renew_time,   # optional
                   nonce,        # required
                   etypes,       # required
                   addresses,    # optional
                   EncAuthorizationData,
                   EncAuthorizationData_key,
                   additional_tickets,
                   ticket_session_key,
                   authenticator_subkey=None,
                   body_checksum_type=None,
                   native_decoded_only=True,
                   asn1_print=None,
                   hexdump=None):
    """Build a TGS-REQ, including its PA-TGS-REQ (AP-REQ) padata.

    The request body is DER encoded and checksummed with
    ticket_session_key (usage KU_TGS_REQ_AUTH_CKSUM); that checksum goes
    into an Authenticator which is encrypted with ticket_session_key
    (usage KU_TGS_REQ_AUTH) and wrapped, together with ticket, into an
    AP-REQ carried as PADATA_KDC_REQ.

    cname is only used for the Authenticator; the request body is built
    with cname=None.

    The PA-TGS-REQ element is placed after any entries of padata.  The
    caller's padata list is not modified; a new list is built instead, so
    the same list can safely be reused for several requests.

    Returns the decoded request when native_decoded_only is true,
    otherwise the tuple (decoded, native_dict).
    """
    # KDC-REQ         ::= SEQUENCE {
    #        -- NOTE: first tag is [1], not [0]
    #        pvno            [1] INTEGER (5) ,
    #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
    #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
    #                            -- NOTE: not empty --,
    #        req-body        [4] KDC-REQ-BODY
    # }
    #
    # KDC-REQ-BODY    ::= SEQUENCE {
    #        kdc-options             [0] KDCOptions,
    #        cname                   [1] PrincipalName OPTIONAL
    #                                    -- Used only in AS-REQ --,
    #        realm                   [2] Realm
    #                                    -- Server's realm
    #                                    -- Also client's in AS-REQ --,
    #        sname                   [3] PrincipalName OPTIONAL,
    #        from                    [4] KerberosTime OPTIONAL,
    #        till                    [5] KerberosTime,
    #        rtime                   [6] KerberosTime OPTIONAL,
    #        nonce                   [7] UInt32,
    #        etype                   [8] SEQUENCE OF Int32
    #                                    -- EncryptionType
    #                                    -- in preference order --,
    #        addresses               [9] HostAddresses OPTIONAL,
    #        enc-authorization-data  [10] EncryptedData OPTIONAL
    #                                    -- AuthorizationData --,
    #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
    #                                       -- NOTE: not empty
    # }

    # The key usage for enc-authorization-data depends on whether the
    # authenticator carries a subkey.
    if authenticator_subkey is not None:
        EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
    else:
        EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=None,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)
    req_body_blob = self.der_encode(req_body,
                                    asn1Spec=krb5_asn1.KDC_REQ_BODY(),
                                    asn1_print=asn1_print, hexdump=hexdump)

    req_body_checksum = self.Checksum_create(ticket_session_key,
                                             KU_TGS_REQ_AUTH_CKSUM,
                                             req_body_blob,
                                             ctype=body_checksum_type)

    subkey_obj = None
    if authenticator_subkey is not None:
        subkey_obj = authenticator_subkey.export_obj()
    seq_number = random.randint(0, 0xfffffffe)
    authenticator = self.Authenticator_create(
        crealm=realm,
        cname=cname,
        cksum=req_body_checksum,
        cusec=cusec,
        ctime=ctime,
        subkey=subkey_obj,
        seq_number=seq_number,
        authorization_data=None)
    authenticator = self.der_encode(
        authenticator,
        asn1Spec=krb5_asn1.Authenticator(),
        asn1_print=asn1_print,
        hexdump=hexdump)

    authenticator = self.EncryptedData_create(
        ticket_session_key, KU_TGS_REQ_AUTH, authenticator)

    ap_options = krb5_asn1.APOptions('0')
    ap_req = self.AP_REQ_create(ap_options=str(ap_options),
                                ticket=ticket,
                                authenticator=authenticator)
    ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
                             asn1_print=asn1_print, hexdump=hexdump)
    pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)

    # Build a fresh list rather than appending to the caller's one:
    # appending in place would leak the PA-TGS-REQ into the caller's list
    # and accumulate entries if that list is reused.
    if padata is not None:
        padata = list(padata) + [pa_tgs_req]
    else:
        padata = [pa_tgs_req]

    obj, decoded = self.KDC_REQ_create(
        msg_type=KRB_TGS_REQ,
        padata=padata,
        req_body=req_body,
        asn1Spec=krb5_asn1.TGS_REQ(),
        asn1_print=asn1_print,
        hexdump=hexdump)
    if native_decoded_only:
        return decoded
    return decoded, obj
def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
    """Build a PA-FOR-USER padata element (PA-S4U2Self).

    The checksum input is the name type as 4 little-endian bytes, followed
    by each encoded name component, the encoded realm and the encoded
    string "Kerberos".  It is checksummed with tgt_session_key under
    KU_NON_KERB_CKSUM_SALT.  The DER-encoded structure is returned wrapped
    in a PA-DATA of type PADATA_FOR_USER.
    """
    # PA-S4U2Self     ::= SEQUENCE {
    #        name            [0] PrincipalName,
    #        realm           [1] Realm,
    #        cksum           [2] Checksum,
    #        auth            [3] GeneralString
    # }
    pieces = [name['name-type'].to_bytes(4, byteorder='little')]
    pieces.extend(component.encode() for component in name['name-string'])
    pieces.append(realm.encode())
    pieces.append("Kerberos".encode())
    cksum_data = b''.join(pieces)

    cksum = self.Checksum_create(tgt_session_key,
                                 KU_NON_KERB_CKSUM_SALT,
                                 cksum_data,
                                 ctype)

    s4u2self = {
        'name': name,
        'realm': realm,
        'cksum': cksum,
        'auth': "Kerberos",
    }
    encoded = self.der_encode(s4u2self,
                              asn1Spec=krb5_asn1.PA_S4U2Self())
    return self.PA_DATA_create(PADATA_FOR_USER, encoded)
def _generic_kdc_exchange(self,
                          kdc_exchange_dict,  # required
                          cname=None,  # optional
                          realm=None,  # required
                          sname=None,  # optional
                          from_time=None,  # optional
                          till_time=None,  # required
                          renew_time=None,  # optional
                          etypes=None,  # required
                          addresses=None,  # optional
                          additional_tickets=None,  # optional
                          EncAuthorizationData=None,  # optional
                          EncAuthorizationData_key=None,  # optional
                          EncAuthorizationData_usage=None):  # optional
    """Run one KDC exchange as described by kdc_exchange_dict.

    Builds the request body, the optional FAST armor / FAST padata / outer
    padata through the generator callbacks found in kdc_exchange_dict,
    sends the request with send_recv_transaction() and passes the reply to
    check_error_fn (for a KRB_ERROR reply) or check_rep_fn, returning that
    checker's result.

    Side effects on kdc_exchange_dict: 'nonce' is filled in when missing,
    and 'req_padata', 'fast_padata' and 'req_body' are stored before the
    request is sent.  'req_body' holds the inner body, i.e. the body as it
    was before the 'outer_req' overrides were applied.
    """
    check_error_fn = kdc_exchange_dict['check_error_fn']
    check_rep_fn = kdc_exchange_dict['check_rep_fn']
    generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
    generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
    generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
    generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
    callback_dict = kdc_exchange_dict['callback_dict']
    req_msg_type = kdc_exchange_dict['req_msg_type']
    req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
    rep_msg_type = kdc_exchange_dict['rep_msg_type']

    expected_error_mode = kdc_exchange_dict['expected_error_mode']
    kdc_options = kdc_exchange_dict['kdc_options']

    # Parameters specific to the outer request body
    outer_req = kdc_exchange_dict['outer_req']

    if till_time is None:
        till_time = self.get_KerberosTime(offset=36000)

    # Reuse a nonce supplied by the caller, otherwise generate one and
    # remember it for the reply checks.
    if 'nonce' not in kdc_exchange_dict:
        kdc_exchange_dict['nonce'] = self.get_Nonce()
    nonce = kdc_exchange_dict['nonce']

    req_body = self.KDC_REQ_BODY_create(
        kdc_options=kdc_options,
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=from_time,
        till_time=till_time,
        renew_time=renew_time,
        nonce=nonce,
        etypes=etypes,
        addresses=addresses,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)

    # Keep a copy of the body before the outer overrides are applied; a
    # None override removes the field from the outer body.
    inner_req_body = dict(req_body)
    if outer_req is not None:
        for field, override in outer_req.items():
            if override is None:
                del req_body[field]
            else:
                req_body[field] = override

    tgs_req = None
    tgs_req_padata = None
    if req_msg_type != KRB_AS_REQ:
        self.assertEqual(KRB_TGS_REQ, req_msg_type)

        tgs_req = self.generate_ap_req(kdc_exchange_dict,
                                       callback_dict,
                                       req_body,
                                       armor=False)
        tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)

    fast_padata = []
    if generate_fast_padata_fn is not None:
        self.assertIsNotNone(generate_fast_fn)
        # This can alter req_body...
        fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
                                                        callback_dict,
                                                        req_body)

    fast_armor = None
    if generate_fast_armor_fn is not None:
        self.assertIsNotNone(generate_fast_fn)
        fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
                                             callback_dict,
                                             req_body,
                                             armor=True)

        fast_armor_type = kdc_exchange_dict['fast_armor_type']
        fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
                                                fast_ap_req)

    outer_padata = None
    if generate_padata_fn is not None:
        # This can alter req_body...
        outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
                                                    callback_dict,
                                                    req_body)
        self.assertIsNotNone(outer_padata)
        self.assertNotIn(PADATA_KDC_REQ,
                         [pa['padata-type'] for pa in outer_padata],
                         'Don\'t create TGS-REQ manually')

    fast = None
    if generate_fast_fn is not None:
        armor_key = kdc_exchange_dict['armor_key']
        self.assertIsNotNone(armor_key)

        # The FAST request checksum covers the encoded outer body for an
        # AS-REQ and the AP-REQ for a TGS-REQ.
        if req_msg_type == KRB_AS_REQ:
            checksum_blob = self.der_encode(
                req_body,
                asn1Spec=krb5_asn1.KDC_REQ_BODY())
        else:
            self.assertEqual(KRB_TGS_REQ, req_msg_type)
            checksum_blob = tgs_req

        checksum = self.Checksum_create(armor_key,
                                        KU_FAST_REQ_CHKSUM,
                                        checksum_blob)

        fast = generate_fast_fn(kdc_exchange_dict,
                                callback_dict,
                                inner_req_body,
                                fast_padata,
                                fast_armor,
                                checksum)

    # Final padata order: PA-TGS-REQ, FAST, then the outer padata.
    padata = [pa for pa in (tgs_req_padata, fast) if pa is not None]
    if outer_padata is not None:
        padata += outer_padata
    if not padata:
        padata = None

    kdc_exchange_dict['req_padata'] = padata
    kdc_exchange_dict['fast_padata'] = fast_padata
    kdc_exchange_dict['req_body'] = inner_req_body

    req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
                                               padata=padata,
                                               req_body=req_body,
                                               asn1Spec=req_asn1Spec())

    rep = self.send_recv_transaction(req_decoded)
    self.assertIsNotNone(rep)

    msg_type = self.getElementValue(rep, 'msg-type')
    self.assertIsNotNone(msg_type)

    # Exactly one of check_error_fn / check_rep_fn must be supplied, and
    # it has to agree with expected_error_mode.
    expected_msg_type = None
    if check_error_fn is not None:
        expected_msg_type = KRB_ERROR
        self.assertIsNone(check_rep_fn)
        self.assertNotEqual(0, expected_error_mode)
    if check_rep_fn is not None:
        expected_msg_type = rep_msg_type
        self.assertIsNone(check_error_fn)
        self.assertEqual(0, expected_error_mode)
    self.assertIsNotNone(expected_msg_type)
    self.assertEqual(msg_type, expected_msg_type)

    checker = check_error_fn if msg_type == KRB_ERROR else check_rep_fn
    return checker(kdc_exchange_dict, callback_dict, rep)
def as_exchange_dict(self,
                     expected_crealm=None,
                     expected_cname=None,
                     expected_cname_private=None,
                     expected_srealm=None,
                     expected_sname=None,
                     ticket_decryption_key=None,
                     generate_fast_fn=None,
                     generate_fast_armor_fn=None,
                     generate_fast_padata_fn=None,
                     fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
                     generate_padata_fn=None,
                     check_error_fn=None,
                     check_rep_fn=None,
                     check_padata_fn=None,
                     check_kdc_private_fn=None,
                     callback_dict=None,
                     expected_error_mode=0,
                     client_as_etypes=None,
                     expected_salt=None,
                     authenticator_subkey=None,
                     armor_key=None,
                     armor_tgt=None,
                     armor_subkey=None,
                     auth_data=None,
                     kdc_options='',
                     outer_req=None):
    """Build the exchange description dict for an AS exchange.

    The message types and ASN.1 specs are fixed to the AS-REQ / AS-REP
    ones; every keyword argument is stored under a key of the same name.
    'expected_cname_private' is only stored when it is not None.

    A callback_dict of None is replaced by a fresh empty dict before it is
    stored.  Previously the replacement happened after the dict had been
    built, so it had no effect and None was stored.
    """
    # Normalise before building the dict so that the default actually
    # ends up in it.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_AS_REQ,
        'req_asn1Spec': krb5_asn1.AS_REQ,
        'rep_msg_type': KRB_AS_REP,
        'rep_asn1Spec': krb5_asn1.AS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'ticket_decryption_key': ticket_decryption_key,
        'generate_fast_fn': generate_fast_fn,
        'generate_fast_armor_fn': generate_fast_armor_fn,
        'generate_fast_padata_fn': generate_fast_padata_fn,
        'fast_armor_type': fast_armor_type,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_padata_fn': check_padata_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'callback_dict': callback_dict,
        'expected_error_mode': expected_error_mode,
        'client_as_etypes': client_as_etypes,
        'expected_salt': expected_salt,
        'authenticator_subkey': authenticator_subkey,
        'armor_key': armor_key,
        'armor_tgt': armor_tgt,
        'armor_subkey': armor_subkey,
        'auth_data': auth_data,
        'kdc_options': kdc_options,
        'outer_req': outer_req
    }
    if expected_cname_private is not None:
        kdc_exchange_dict['expected_cname_private'] = (
            expected_cname_private)

    return kdc_exchange_dict
def tgs_exchange_dict(self,
                      expected_crealm=None,
                      expected_cname=None,
                      expected_cname_private=None,
                      expected_srealm=None,
                      expected_sname=None,
                      ticket_decryption_key=None,
                      generate_fast_fn=None,
                      generate_fast_armor_fn=None,
                      generate_fast_padata_fn=None,
                      fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
                      generate_padata_fn=None,
                      check_error_fn=None,
                      check_rep_fn=None,
                      check_padata_fn=None,
                      check_kdc_private_fn=None,
                      callback_dict=None,
                      tgt=None,
                      armor_key=None,
                      armor_tgt=None,
                      armor_subkey=None,
                      authenticator_subkey=None,
                      auth_data=None,
                      body_checksum_type=None,
                      kdc_options='',
                      outer_req=None):
    """Build the exchange description dict for a TGS exchange.

    The message types and ASN.1 specs are fixed to the TGS-REQ / TGS-REP
    ones; every keyword argument is stored under a key of the same name.
    'expected_cname_private' is only stored when it is not None.

    A callback_dict of None is replaced by a fresh empty dict before it is
    stored.  Previously the replacement happened after the dict had been
    built, so it had no effect and None was stored.
    """
    # Normalise before building the dict so that the default actually
    # ends up in it.
    if callback_dict is None:
        callback_dict = {}

    kdc_exchange_dict = {
        'req_msg_type': KRB_TGS_REQ,
        'req_asn1Spec': krb5_asn1.TGS_REQ,
        'rep_msg_type': KRB_TGS_REP,
        'rep_asn1Spec': krb5_asn1.TGS_REP,
        'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
        'expected_crealm': expected_crealm,
        'expected_cname': expected_cname,
        'expected_srealm': expected_srealm,
        'expected_sname': expected_sname,
        'ticket_decryption_key': ticket_decryption_key,
        'generate_fast_fn': generate_fast_fn,
        'generate_fast_armor_fn': generate_fast_armor_fn,
        'generate_fast_padata_fn': generate_fast_padata_fn,
        'fast_armor_type': fast_armor_type,
        'generate_padata_fn': generate_padata_fn,
        'check_error_fn': check_error_fn,
        'check_rep_fn': check_rep_fn,
        'check_padata_fn': check_padata_fn,
        'check_kdc_private_fn': check_kdc_private_fn,
        'callback_dict': callback_dict,
        'tgt': tgt,
        'body_checksum_type': body_checksum_type,
        'armor_key': armor_key,
        'armor_tgt': armor_tgt,
        'armor_subkey': armor_subkey,
        'auth_data': auth_data,
        'authenticator_subkey': authenticator_subkey,
        'kdc_options': kdc_options,
        'outer_req': outer_req
    }
    if expected_cname_private is not None:
        kdc_exchange_dict['expected_cname_private'] = (
            expected_cname_private)

    return kdc_exchange_dict
def generic_check_kdc_rep(self,
                          kdc_exchange_dict,
                          callback_dict,
                          rep):
    """Check an AS-REP / TGS-REP and hand its decrypted parts on.

    Verifies the outer reply fields and the ticket against the expected
    values in kdc_exchange_dict, obtains the reply decryption key and
    usage from check_padata_fn, processes the FAST response when an armor
    key was used and the reply carries PADATA_FX_FAST, decrypts the ticket
    and the reply enc-part, and finally calls check_kdc_private_fn with
    the decoded private parts.

    Returns rep unchanged.
    """
    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_cname = kdc_exchange_dict['expected_cname']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
    check_padata_fn = kdc_exchange_dict['check_padata_fn']
    check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
    rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
    msg_type = kdc_exchange_dict['rep_msg_type']
    armor_key = kdc_exchange_dict['armor_key']

    self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
    padata = self.getElementValue(rep, 'padata')
    if self.strict_checking:
        self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
        self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
    self.assertElementPresent(rep, 'ticket')
    ticket = self.getElementValue(rep, 'ticket')
    ticket_encpart = None
    ticket_cipher = None
    self.assertIsNotNone(ticket)
    if ticket is not None:  # Never None, but gives indentation
        self.assertElementEqual(ticket, 'tkt-vno', 5)
        self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
        self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
        self.assertElementPresent(ticket, 'enc-part')
        ticket_encpart = self.getElementValue(ticket, 'enc-part')
        self.assertIsNotNone(ticket_encpart)
        if ticket_encpart is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_encpart, 'etype')
            # 'unspecified' means present, with any value != 0
            self.assertElementKVNO(ticket_encpart, 'kvno',
                                   self.unspecified_kvno)
            self.assertElementPresent(ticket_encpart, 'cipher')
            ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
    self.assertElementPresent(rep, 'enc-part')
    encpart = self.getElementValue(rep, 'enc-part')
    encpart_cipher = None
    self.assertIsNotNone(encpart)
    if encpart is not None:  # Never None, but gives indentation
        self.assertElementPresent(encpart, 'etype')
        # Check the kvno of the reply's own enc-part here; the ticket's
        # enc-part kvno has already been checked above.
        self.assertElementKVNO(encpart, 'kvno', 'autodetect')
        self.assertElementPresent(encpart, 'cipher')
        encpart_cipher = self.getElementValue(encpart, 'cipher')

    ticket_checksum = None

    encpart_decryption_key = None
    self.assertIsNotNone(check_padata_fn)
    if check_padata_fn is not None:
        # See if we can get the decryption key from the preauth phase
        encpart_decryption_key, encpart_decryption_usage = (
            check_padata_fn(kdc_exchange_dict, callback_dict,
                            rep, padata))

    if armor_key is not None:
        pa_dict = self.get_pa_dict(padata)

        if PADATA_FX_FAST in pa_dict:
            fx_fast_data = pa_dict[PADATA_FX_FAST]
            fast_response = self.check_fx_fast_data(kdc_exchange_dict,
                                                    fx_fast_data,
                                                    armor_key,
                                                    finished=True)

            # A strengthen key changes the key the reply enc-part is
            # encrypted with.
            if 'strengthen-key' in fast_response:
                strengthen_key = self.EncryptionKey_import(
                    fast_response['strengthen-key'])
                encpart_decryption_key = (
                    self.generate_strengthen_reply_key(
                        strengthen_key,
                        encpart_decryption_key))

            fast_finished = fast_response.get('finished', None)
            if fast_finished is not None:
                ticket_checksum = fast_finished['ticket-checksum']

            self.check_rep_padata(kdc_exchange_dict,
                                  callback_dict,
                                  rep,
                                  fast_response['padata'])

    ticket_private = None
    self.assertIsNotNone(ticket_decryption_key)
    if ticket_decryption_key is not None:
        self.assertElementEqual(ticket_encpart, 'etype',
                                ticket_decryption_key.etype)
        self.assertElementKVNO(ticket_encpart, 'kvno',
                               ticket_decryption_key.kvno)
        ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
                                                       ticket_cipher)
        ticket_private = self.der_decode(
            ticket_decpart,
            asn1Spec=krb5_asn1.EncTicketPart())

    encpart_private = None
    self.assertIsNotNone(encpart_decryption_key)
    if encpart_decryption_key is not None:
        self.assertElementEqual(encpart, 'etype',
                                encpart_decryption_key.etype)
        if self.strict_checking:
            self.assertElementKVNO(encpart, 'kvno',
                                   encpart_decryption_key.kvno)
        rep_decpart = encpart_decryption_key.decrypt(
            encpart_decryption_usage,
            encpart_cipher)
        # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
        # application tag 26
        try:
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=rep_encpart_asn1Spec())
        except Exception:
            encpart_private = self.der_decode(
                rep_decpart,
                asn1Spec=krb5_asn1.EncTGSRepPart())

    self.assertIsNotNone(check_kdc_private_fn)
    if check_kdc_private_fn is not None:
        check_kdc_private_fn(kdc_exchange_dict, callback_dict,
                             rep, ticket_private, encpart_private,
                             ticket_checksum)

    return rep
def check_fx_fast_data(self,
                       kdc_exchange_dict,
                       fx_fast_data,
                       armor_key,
                       finished=False,
                       expect_strengthen_key=True):
    """Decode, decrypt and sanity check a PA-FX-FAST reply.

    fx_fast_data is decoded as PA_FX_FAST_REPLY, its enc-fast-rep is
    decrypted with armor_key (usage KU_FAST_REP) and decoded as
    KrbFastResponse.  The etype must match the armor key's, the nonce must
    match kdc_exchange_dict['nonce'], 'finished' must be present when
    finished is true, and 'strengthen-key' must be present when
    expect_strengthen_key is true and strict checking is enabled.

    Returns the decoded KrbFastResponse.
    """
    fast_reply = self.der_decode(fx_fast_data,
                                 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())

    enc_fast_rep = fast_reply['armored-data']['enc-fast-rep']
    self.assertEqual(enc_fast_rep['etype'], armor_key.etype)

    plain_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
    fast_response = self.der_decode(plain_rep,
                                    asn1Spec=krb5_asn1.KrbFastResponse())

    if expect_strengthen_key and self.strict_checking:
        self.assertIn('strengthen-key', fast_response)

    if finished:
        self.assertIn('finished', fast_response)

    # Ensure that the nonce matches the nonce in the body of the request
    # (RFC6113 5.4.3).
    expected_nonce = kdc_exchange_dict['nonce']
    self.assertEqual(expected_nonce, fast_response['nonce'])

    return fast_response
def generic_check_kdc_private(self,
                              kdc_exchange_dict,
                              callback_dict,
                              rep,
                              ticket_private,
                              encpart_private,
                              ticket_checksum):
    """Check the decrypted ticket and reply enc-part of a KDC reply.

    Verifies the FAST ticket checksum when one is given, checks the
    expected fields of ticket_private and encpart_private, and, under
    strict checking, checks the encrypted padata against what the request
    asked for.  Finally stores a KerberosTicketCreds for the reply in
    kdc_exchange_dict['rep_ticket_creds'].
    """
    # The 'canonicalize' option is requested if its bit is set in the
    # kdc_options bit string.
    kdc_options = kdc_exchange_dict['kdc_options']
    canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
    canonicalize = (canon_pos < len(kdc_options)
                    and kdc_options[canon_pos] == '1')

    expected_crealm = kdc_exchange_dict['expected_crealm']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']

    # A dedicated expectation for the private cname takes precedence.
    if 'expected_cname_private' in kdc_exchange_dict:
        expected_cname = kdc_exchange_dict['expected_cname_private']
    else:
        expected_cname = kdc_exchange_dict['expected_cname']

    ticket = self.getElementValue(rep, 'ticket')

    if ticket_checksum is not None:
        armor_key = kdc_exchange_dict['armor_key']
        self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)

    ticket_session_key = None
    if ticket_private is not None:
        self.assertElementPresent(ticket_private, 'flags')
        self.assertElementPresent(ticket_private, 'key')
        ticket_key = self.getElementValue(ticket_private, 'key')
        self.assertIsNotNone(ticket_key)
        if ticket_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(ticket_key, 'keytype')
            self.assertElementPresent(ticket_key, 'keyvalue')
            ticket_session_key = self.EncryptionKey_import(ticket_key)
        self.assertElementEqualUTF8(ticket_private, 'crealm',
                                    expected_crealm)
        self.assertElementEqualPrincipal(ticket_private, 'cname',
                                         expected_cname)
        self.assertElementPresent(ticket_private, 'transited')
        self.assertElementPresent(ticket_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(ticket_private, 'starttime')
        self.assertElementPresent(ticket_private, 'endtime')
        # TODO self.assertElementPresent(ticket_private, 'renew-till')
        # TODO self.assertElementMissing(ticket_private, 'caddr')
        self.assertElementPresent(ticket_private, 'authorization-data')

    encpart_session_key = None
    if encpart_private is not None:
        self.assertElementPresent(encpart_private, 'key')
        encpart_key = self.getElementValue(encpart_private, 'key')
        self.assertIsNotNone(encpart_key)
        if encpart_key is not None:  # Never None, but gives indentation
            self.assertElementPresent(encpart_key, 'keytype')
            self.assertElementPresent(encpart_key, 'keyvalue')
            encpart_session_key = self.EncryptionKey_import(encpart_key)
        self.assertElementPresent(encpart_private, 'last-req')
        self.assertElementEqual(encpart_private, 'nonce',
                                kdc_exchange_dict['nonce'])
        # TODO self.assertElementPresent(encpart_private,
        #                                'key-expiration')
        self.assertElementPresent(encpart_private, 'flags')
        self.assertElementPresent(encpart_private, 'authtime')
        if self.strict_checking:
            self.assertElementPresent(encpart_private, 'starttime')
        self.assertElementPresent(encpart_private, 'endtime')
        # TODO self.assertElementPresent(encpart_private, 'renew-till')
        self.assertElementEqualUTF8(encpart_private, 'srealm',
                                    expected_srealm)
        self.assertElementEqualPrincipal(encpart_private, 'sname',
                                         expected_sname)
        # TODO self.assertElementMissing(encpart_private, 'caddr')

        sent_claims = self.sent_claims(kdc_exchange_dict)

        if self.strict_checking:
            if sent_claims or canonicalize:
                self.assertElementPresent(encpart_private,
                                          'encrypted-pa-data')
                enc_pa_dict = self.get_pa_dict(
                    encpart_private['encrypted-pa-data'])
                if canonicalize:
                    self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)

                    (supported_etypes,) = struct.unpack(
                        '<L',
                        enc_pa_dict[PADATA_SUPPORTED_ETYPES])

                    self.assertTrue(
                        security.KERB_ENCTYPE_FAST_SUPPORTED
                        & supported_etypes)
                    self.assertTrue(
                        security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
                        & supported_etypes)
                    self.assertTrue(
                        security.KERB_ENCTYPE_CLAIMS_SUPPORTED
                        & supported_etypes)
                else:
                    self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)

                # ClaimsCompIdFASTSupported registry key
                if sent_claims:
                    self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)

                    self.check_pac_options_claims_support(
                        enc_pa_dict[PADATA_PAC_OPTIONS])
                else:
                    self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
            else:
                self.assertElementEqual(encpart_private,
                                        'encrypted-pa-data',
                                        [])

    # Both copies of the session key, when available, must agree.
    if ticket_session_key is not None and encpart_session_key is not None:
        self.assertEqual(ticket_session_key.etype,
                         encpart_session_key.etype)
        self.assertEqual(ticket_session_key.key.contents,
                         encpart_session_key.key.contents)

    # Prefer the session key from the reply enc-part.
    session_key = (encpart_session_key
                   if encpart_session_key is not None
                   else ticket_session_key)

    kdc_exchange_dict['rep_ticket_creds'] = KerberosTicketCreds(
        ticket,
        session_key,
        crealm=expected_crealm,
        cname=expected_cname,
        srealm=expected_srealm,
        sname=expected_sname,
        decryption_key=ticket_decryption_key,
        ticket_private=ticket_private,
        encpart_private=encpart_private)
def check_pac_options_claims_support(self, pac_options):
    """Assert that an encoded PA-PAC-OPTIONS blob has its claims bit set.

    :param pac_options: the encoded value, decoded here with the
        krb5_asn1.PA_PAC_OPTIONS spec.
    """
    decoded = self.der_decode(pac_options,
                              asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    option_bits = decoded['options']
    # The first position of the bit string is the claims bit.
    self.assertEqual('1', option_bits[0])
def generic_check_kdc_error(self,
                            kdc_exchange_dict,
                            callback_dict,
                            rep):
    """Validate a KRB-ERROR reply against the exchange's expectations.

    Checks the fixed header fields of the error, then (unless a generic
    error is expected) decodes 'e-data' as METHOD-DATA, unwraps the FAST
    response when the request carried PADATA_FX_FAST, and hands the
    resulting padata list to check_rep_padata().

    :param kdc_exchange_dict: exchange state; the keys read here are
        'expected_cname', 'expected_srealm', 'expected_sname',
        'expected_error_mode' and, for FAST requests, 'armor_key'.  The
        key 'preauth_etype_info2' is written with the value returned by
        check_rep_padata().
    :param callback_dict: passed through unchanged to check_rep_padata().
    :param rep: the decoded error reply.
    :return: rep, unchanged.
    """

    # NOTE(review): expected_cname is read but never used below.
    expected_cname = kdc_exchange_dict['expected_cname']
    expected_srealm = kdc_exchange_dict['expected_srealm']
    expected_sname = kdc_exchange_dict['expected_sname']
    expected_error_mode = kdc_exchange_dict['expected_error_mode']

    # True when the outer request padata contained PADATA_FX_FAST.
    sent_fast = self.sent_fast(kdc_exchange_dict)

    self.assertElementEqual(rep, 'pvno', 5)
    self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
    self.assertElementEqual(rep, 'error-code', expected_error_mode)
    if self.strict_checking:
        self.assertElementMissing(rep, 'ctime')
        self.assertElementMissing(rep, 'cusec')
    self.assertElementPresent(rep, 'stime')
    self.assertElementPresent(rep, 'susec')
    # error-code checked above
    if self.strict_checking:
        self.assertElementMissing(rep, 'crealm')
        self.assertElementMissing(rep, 'cname')
        self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
        # For a generic error on a FAST request the sname is compared
        # with the krbtgt principal instead of the expected sname.
        if sent_fast and expected_error_mode == KDC_ERR_GENERIC:
            self.assertElementEqualPrincipal(rep, 'sname',
                                             self.get_krbtgt_sname())
        else:
            self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
        self.assertElementMissing(rep, 'e-text')
    # A generic error must not carry e-data, so there is no padata to
    # inspect and checking stops here.
    if expected_error_mode == KDC_ERR_GENERIC:
        self.assertElementMissing(rep, 'e-data')
        return rep
    edata = self.getElementValue(rep, 'e-data')
    if self.strict_checking:
        self.assertIsNotNone(edata)
    if edata is not None:
        rep_padata = self.der_decode(edata,
                                     asn1Spec=krb5_asn1.METHOD_DATA())
        self.assertGreater(len(rep_padata), 0)

        if sent_fast:
            # With FAST the outer padata must hold exactly one element,
            # the PADATA_FX_FAST wrapper; the padata to check is taken
            # from the decoded FAST response instead.
            self.assertEqual(1, len(rep_padata))
            rep_pa_dict = self.get_pa_dict(rep_padata)
            self.assertIn(PADATA_FX_FAST, rep_pa_dict)

            armor_key = kdc_exchange_dict['armor_key']
            self.assertIsNotNone(armor_key)
            fast_response = self.check_fx_fast_data(
                kdc_exchange_dict,
                rep_pa_dict[PADATA_FX_FAST],
                armor_key,
                expect_strengthen_key=False)

            rep_padata = fast_response['padata']
    else:
        # Only reachable without strict checking: treat missing e-data
        # as an empty padata list.
        rep_padata = []

    etype_info2 = self.check_rep_padata(kdc_exchange_dict,
                                        callback_dict,
                                        rep,
                                        rep_padata)

    # Stored on the exchange dict under 'preauth_etype_info2'.
    kdc_exchange_dict['preauth_etype_info2'] = etype_info2

    return rep
def check_rep_padata(self,
                     kdc_exchange_dict,
                     callback_dict,
                     rep,
                     rep_padata):
    """Check the padata list of a KDC reply and return its ETYPE-INFO2.

    The expected padata types are derived from the etypes proposed in
    the request body and from the 'client_as_etypes' entry of the
    exchange dict, then compared with what rep_padata contains.

    :param kdc_exchange_dict: exchange state; reads
        'expected_error_mode', 'req_body', 'client_as_etypes'
        (optional, defaults to an empty list) and 'expected_salt'.
    :param callback_dict: not used by this method.
    :param rep: not used by this method.
    :param rep_padata: sequence of padata elements to check.
    :return: the decoded ETYPE-INFO2 value, or None when none of
        AES256, AES128 or RC4 is both proposed and in client_as_etypes
        (and possibly None without strict checking if the element is
        absent).
    """
    expected_error_mode = kdc_exchange_dict['expected_error_mode']
    req_body = kdc_exchange_dict['req_body']
    proposed_etypes = req_body['etype']
    client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])

    # Etypes expected in ETYPE-INFO2, in order: AES (if any), then RC4.
    expect_etype_info2 = ()
    # ETYPE-INFO is expected only when RC4 is proposed and no AES type
    # is proposed (see the loop below).
    expect_etype_info = False
    # Stays True unless RC4 is proposed, in client_as_etypes, and an
    # error is expected; while True, ETYPE-INFO must be absent.
    unexpect_etype_info = True
    expected_aes_type = 0
    expected_rc4_type = 0
    if kcrypto.Enctype.RC4 in proposed_etypes:
        expect_etype_info = True
    for etype in proposed_etypes:
        if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
            expect_etype_info = False
        # Only etypes that are also in client_as_etypes contribute to
        # the expected ETYPE-INFO2 contents.
        if etype not in client_as_etypes:
            continue
        if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
            # Keep the numerically largest AES etype seen.
            if etype > expected_aes_type:
                expected_aes_type = etype
        if etype in (kcrypto.Enctype.RC4,) and expected_error_mode != 0:
            unexpect_etype_info = False
            if etype > expected_rc4_type:
                expected_rc4_type = etype

    if expected_aes_type != 0:
        expect_etype_info2 += (expected_aes_type,)
    if expected_rc4_type != 0:
        expect_etype_info2 += (expected_rc4_type,)

    # Build the exact sequence of padata types expected in the reply.
    expected_patypes = ()
    if expect_etype_info:
        self.assertGreater(len(expect_etype_info2), 0)
        expected_patypes += (PADATA_ETYPE_INFO,)
    if len(expect_etype_info2) != 0:
        expected_patypes += (PADATA_ETYPE_INFO2,)

    expected_patypes += (PADATA_ENC_TIMESTAMP,)
    expected_patypes += (PADATA_PK_AS_REQ,)
    expected_patypes += (PADATA_PK_AS_REP_19,)

    # Under strict checking both the order and the number of padata
    # elements must match.
    if self.strict_checking:
        for i, patype in enumerate(expected_patypes):
            self.assertElementEqual(rep_padata[i], 'padata-type', patype)
        self.assertEqual(len(rep_padata), len(expected_patypes))

    # Collect each known padata type; the assertIsNone() before each
    # assignment rejects a type that appears more than once.
    etype_info2 = None
    etype_info = None
    enc_timestamp = None
    pk_as_req = None
    pk_as_rep19 = None
    for pa in rep_padata:
        patype = self.getElementValue(pa, 'padata-type')
        pavalue = self.getElementValue(pa, 'padata-value')
        if patype == PADATA_ETYPE_INFO2:
            self.assertIsNone(etype_info2)
            etype_info2 = self.der_decode(pavalue,
                                          asn1Spec=krb5_asn1.ETYPE_INFO2())
            continue
        if patype == PADATA_ETYPE_INFO:
            self.assertIsNone(etype_info)
            etype_info = self.der_decode(pavalue,
                                         asn1Spec=krb5_asn1.ETYPE_INFO())
            continue
        # The remaining known types must carry an empty value.
        if patype == PADATA_ENC_TIMESTAMP:
            self.assertIsNone(enc_timestamp)
            enc_timestamp = pavalue
            self.assertEqual(len(enc_timestamp), 0)
            continue
        if patype == PADATA_PK_AS_REQ:
            self.assertIsNone(pk_as_req)
            pk_as_req = pavalue
            self.assertEqual(len(pk_as_req), 0)
            continue
        if patype == PADATA_PK_AS_REP_19:
            self.assertIsNone(pk_as_rep19)
            pk_as_rep19 = pavalue
            self.assertEqual(len(pk_as_rep19), 0)
            continue

    # None of AES256/AES128/RC4 is both proposed and in
    # client_as_etypes: no etype info of either kind may be present.
    if all(etype not in client_as_etypes or etype not in proposed_etypes
           for etype in (kcrypto.Enctype.AES256,
                         kcrypto.Enctype.AES128,
                         kcrypto.Enctype.RC4)):
        self.assertIsNone(etype_info2)
        self.assertIsNone(etype_info)
        if self.strict_checking:
            self.assertIsNotNone(enc_timestamp)
            self.assertIsNotNone(pk_as_req)
            self.assertIsNotNone(pk_as_rep19)
        return None

    if self.strict_checking:
        self.assertIsNotNone(etype_info2)
    if expect_etype_info:
        self.assertIsNotNone(etype_info)
    else:
        if self.strict_checking:
            self.assertIsNone(etype_info)
    if unexpect_etype_info:
        self.assertIsNone(etype_info)

    # The per-entry ETYPE-INFO2 checks run only under strict checking,
    # where etype_info2 has already been asserted to be present.
    if self.strict_checking:
        self.assertGreaterEqual(len(etype_info2), 1)
        self.assertEqual(len(etype_info2), len(expect_etype_info2))
        for i in range(0, len(etype_info2)):
            e = self.getElementValue(etype_info2[i], 'etype')
            self.assertEqual(e, expect_etype_info2[i])
            salt = self.getElementValue(etype_info2[i], 'salt')
            # RC4 entries must have no salt; other entries must have
            # one, compared with 'expected_salt' when that is set.
            if e == kcrypto.Enctype.RC4:
                self.assertIsNone(salt)
            else:
                self.assertIsNotNone(salt)
                expected_salt = kdc_exchange_dict['expected_salt']
                if expected_salt is not None:
                    self.assertEqual(salt, expected_salt)
            s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
            if self.strict_checking:
                self.assertIsNone(s2kparams)
    if etype_info is not None:
        # ETYPE-INFO, when present, must be a single RC4 entry whose
        # salt (under strict checking) is present but empty.
        self.assertEqual(len(etype_info), 1)
        e = self.getElementValue(etype_info[0], 'etype')
        self.assertEqual(e, kcrypto.Enctype.RC4)
        self.assertEqual(e, expect_etype_info2[0])
        salt = self.getElementValue(etype_info[0], 'salt')
        if self.strict_checking:
            self.assertIsNotNone(salt)
            self.assertEqual(len(salt), 0)

    self.assertIsNotNone(enc_timestamp)
    self.assertIsNotNone(pk_as_req)
    self.assertIsNotNone(pk_as_rep19)

    return etype_info2
def generate_simple_fast(self,
                         kdc_exchange_dict,
                         _callback_dict,
                         req_body,
                         fast_padata,
                         fast_armor,
                         checksum,
                         fast_options=''):
    """Build the PADATA_FX_FAST padata element for a request.

    The inner KrbFastReq (options, padata and request body) is encoded,
    encrypted with the exchange's armor key under KU_FAST_ENC, wrapped
    in an armored request together with the armor and checksum, and
    finally encoded as a PA-FX-FAST-REQUEST.

    :param kdc_exchange_dict: exchange state; 'armor_key' is read.
    :param _callback_dict: unused.
    :param req_body: request body placed inside the FAST request.
    :param fast_padata: padata placed inside the FAST request.
    :param fast_armor: armor for the armored request.
    :param checksum: checksum for the armored request.
    :param fast_options: options for the FAST request.
    :return: the PA-DATA element of type PADATA_FX_FAST.
    """
    armor_key = kdc_exchange_dict['armor_key']

    inner_req = self.KRB_FAST_REQ_create(fast_options,
                                         fast_padata,
                                         req_body)
    inner_req_blob = self.der_encode(inner_req,
                                     asn1Spec=krb5_asn1.KrbFastReq())
    encrypted_req = self.EncryptedData_create(armor_key,
                                              KU_FAST_ENC,
                                              inner_req_blob)

    armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
                                                   checksum,
                                                   encrypted_req)

    request_obj = self.PA_FX_FAST_REQUEST_create(armored_req)
    request_blob = self.der_encode(
        request_obj,
        asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())

    return self.PA_DATA_create(PADATA_FX_FAST, request_blob)
def generate_ap_req(self,
                    kdc_exchange_dict,
                    _callback_dict,
                    req_body,
                    armor):
    """Build an encoded AP-REQ, either as FAST armor or for a TGS-REQ.

    With armor set, the ticket and subkey come from 'armor_tgt' and
    'armor_subkey' and the authenticator has no checksum.  Otherwise
    they come from 'tgt' and 'authenticator_subkey', and the
    authenticator carries a checksum over the encoded request body.

    :param kdc_exchange_dict: exchange state (keys as described above,
        plus 'body_checksum_type' when not armoring, and 'auth_data').
    :param _callback_dict: unused.
    :param req_body: request body to checksum when not armoring.
    :param armor: selects the armor variant when true.
    :return: the encoded AP-REQ.
    """
    if armor:
        tgt = kdc_exchange_dict['armor_tgt']
        subkey = kdc_exchange_dict['armor_subkey']

        body_cksum = None
    else:
        tgt = kdc_exchange_dict['tgt']
        subkey = kdc_exchange_dict['authenticator_subkey']
        cksum_type = kdc_exchange_dict['body_checksum_type']

        body_blob = self.der_encode(req_body,
                                    asn1Spec=krb5_asn1.KDC_REQ_BODY())

        body_cksum = self.Checksum_create(tgt.session_key,
                                          KU_TGS_REQ_AUTH_CKSUM,
                                          body_blob,
                                          ctype=cksum_type)

    auth_data = kdc_exchange_dict['auth_data']

    # The subkey is optional; only export it when one was supplied.
    exported_subkey = subkey.export_obj() if subkey is not None else None
    seq_number = random.randint(0, 0xfffffffe)
    ctime, cusec = self.get_KerberosTimeWithUsec()
    authenticator_plain = self.Authenticator_create(
        crealm=tgt.crealm,
        cname=tgt.cname,
        cksum=body_cksum,
        cusec=cusec,
        ctime=ctime,
        subkey=exported_subkey,
        seq_number=seq_number,
        authorization_data=auth_data)
    authenticator_blob = self.der_encode(
        authenticator_plain,
        asn1Spec=krb5_asn1.Authenticator())

    # The authenticator's key usage depends on what the AP-REQ is for.
    if armor:
        key_usage = KU_AP_REQ_AUTH
    else:
        key_usage = KU_TGS_REQ_AUTH
    encrypted_authenticator = self.EncryptedData_create(
        tgt.session_key,
        key_usage,
        authenticator_blob)

    options = krb5_asn1.APOptions('0')
    ap_req_plain = self.AP_REQ_create(
        ap_options=str(options),
        ticket=tgt.ticket,
        authenticator=encrypted_authenticator)

    return self.der_encode(ap_req_plain, asn1Spec=krb5_asn1.AP_REQ())
def generate_simple_tgs_padata(self,
                               kdc_exchange_dict,
                               callback_dict,
                               req_body):
    """Produce the padata list for a plain TGS request.

    :return: a tuple of (a one-element list holding a PADATA_KDC_REQ
        element that wraps a non-armor AP-REQ, the unmodified req_body).
    """
    tgs_ap_req = self.generate_ap_req(kdc_exchange_dict,
                                      callback_dict,
                                      req_body,
                                      armor=False)

    return [self.PA_DATA_create(PADATA_KDC_REQ, tgs_ap_req)], req_body
def check_simple_tgs_padata(self,
                            kdc_exchange_dict,
                            callback_dict,
                            rep,
                            padata):
    """Select the key and key usage for a TGS reply's encrypted part.

    :return: (authenticator subkey, KU_TGS_REP_ENC_PART_SUB_KEY) when
        the exchange has an authenticator subkey, otherwise
        (TGT session key, KU_TGS_REP_ENC_PART_SESSION).
    """
    tgt = kdc_exchange_dict['tgt']
    chosen_subkey = kdc_exchange_dict['authenticator_subkey']

    if chosen_subkey is None:
        # No subkey was sent: fall back to the TGT session key.
        return tgt.session_key, KU_TGS_REP_ENC_PART_SESSION

    return chosen_subkey, KU_TGS_REP_ENC_PART_SUB_KEY
def generate_armor_key(self, subkey, session_key):
    """Combine a subkey and a session key into an armor key.

    The two keys are combined with kcrypto.cf2() using the pepper
    strings b'subkeyarmor' and b'ticketarmor'.

    :return: a Krb5EncryptionKey wrapping the combined key, with None
        as its second constructor argument.
    """
    combined = kcrypto.cf2(subkey.key,
                           session_key.key,
                           b'subkeyarmor',
                           b'ticketarmor')

    return Krb5EncryptionKey(combined, None)
def generate_strengthen_reply_key(self, strengthen_key, reply_key):
    """Combine a strengthen key and a reply key into a new reply key.

    The two keys are combined with kcrypto.cf2() using the pepper
    strings b'strengthenkey' and b'replykey'.

    :return: a Krb5EncryptionKey wrapping the combined key and carrying
        reply_key.kvno.
    """
    combined = kcrypto.cf2(strengthen_key.key,
                           reply_key.key,
                           b'strengthenkey',
                           b'replykey')

    return Krb5EncryptionKey(combined, reply_key.kvno)
def generate_client_challenge_key(self, armor_key, longterm_key):
    """Combine an armor key and a long-term key into a client challenge key.

    The two keys are combined with kcrypto.cf2() using the pepper
    strings b'clientchallengearmor' and b'challengelongterm'.

    :return: a Krb5EncryptionKey wrapping the combined key, with None
        as its second constructor argument.
    """
    combined = kcrypto.cf2(armor_key.key,
                           longterm_key.key,
                           b'clientchallengearmor',
                           b'challengelongterm')

    return Krb5EncryptionKey(combined, None)
def generate_kdc_challenge_key(self, armor_key, longterm_key):
    """Combine an armor key and a long-term key into a KDC challenge key.

    The two keys are combined with kcrypto.cf2() using the pepper
    strings b'kdcchallengearmor' and b'challengelongterm'.

    :return: a Krb5EncryptionKey wrapping the combined key, with None
        as its second constructor argument.
    """
    combined = kcrypto.cf2(armor_key.key,
                           longterm_key.key,
                           b'kdcchallengearmor',
                           b'challengelongterm')

    return Krb5EncryptionKey(combined, None)
def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
    """Assert that a checksum matches the one computed over a ticket.

    The checksum type must equal armor_key.ctype, and the checksum
    recomputed over the encoded ticket with the armor key under
    KU_FAST_FINISHED must equal expected_checksum.
    """
    cksum_type = expected_checksum['cksumtype']
    self.assertEqual(armor_key.ctype, cksum_type)

    encoded_ticket = self.der_encode(ticket,
                                     asn1Spec=krb5_asn1.Ticket())
    recomputed = self.Checksum_create(armor_key,
                                      KU_FAST_FINISHED,
                                      encoded_ticket)

    self.assertEqual(expected_checksum, recomputed)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return get_pa_dict() of the exchange's 'req_padata' entry."""
    outer_padata = kdc_exchange_dict['req_padata']

    return self.get_pa_dict(outer_padata)
def get_fast_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict of the request's 'fast_padata' entry.

    When that dict is empty (falsy), the dict built from the outer
    request padata is returned instead.
    """
    inner_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])

    return (inner_pa_dict if inner_pa_dict
            else self.get_outer_pa_dict(kdc_exchange_dict))
def sent_fast(self, kdc_exchange_dict):
    """Tell whether the outer request padata included PADATA_FX_FAST."""
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Tell whether PADATA_ENCRYPTED_CHALLENGE is in the request padata.

    The lookup uses the dict returned by get_fast_pa_dict().
    """
    return (PADATA_ENCRYPTED_CHALLENGE
            in self.get_fast_pa_dict(kdc_exchange_dict))
def sent_claims(self, kdc_exchange_dict):
    """Tell whether the request's PAC options had the claims bit set.

    Returns False when the request padata (as returned by
    get_fast_pa_dict()) has no PADATA_PAC_OPTIONS element, or when the
    decoded options are too short to include the claims position.
    """
    request_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    if PADATA_PAC_OPTIONS in request_pa_dict:
        decoded = self.der_decode(request_pa_dict[PADATA_PAC_OPTIONS],
                                  asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
        option_bits = decoded['options']
        # Index of the last bit of a PACOptionFlags value that names only
        # 'claims' -- presumably the position of the claims flag.
        claims_index = len(tuple(krb5_asn1.PACOptionFlags('claims'))) - 1

        if claims_index < len(option_bits):
            return option_bits[claims_index] == '1'

    return False
def get_krbtgt_sname(self):
    """Build the krbtgt principal name from the krbtgt credentials.

    :return: an NT_SRV_INST principal whose name components are the
        krbtgt credentials' username followed by their realm.
    """
    creds = self.get_krbtgt_creds()
    name_components = [creds.get_username(), creds.get_realm()]

    return self.PrincipalName_create(name_type=NT_SRV_INST,
                                     names=name_components)
def _test_as_exchange(self,
                      cname,
                      realm,
                      sname,
                      till,
                      client_as_etypes,
                      expected_error_mode,
                      expected_crealm,
                      expected_cname,
                      expected_srealm,
                      expected_sname,
                      expected_salt,
                      etypes,
                      padata,
                      kdc_options,
                      preauth_key=None,
                      ticket_decryption_key=None):
    """Run one AS exchange and return the reply with its exchange dict.

    An expected_error_mode of 0 installs the generic reply checker;
    any other value installs the generic error checker instead.  A
    padata generator is installed only when padata is not None.

    :return: a tuple of (reply from _generic_kdc_exchange(), the
        exchange dict built by as_exchange_dict()).
    """

    def _return_given_padata(_kdc_exchange_dict,
                             _callback_dict,
                             req_body):
        # Hand back the caller's padata with the body untouched.
        return padata, req_body

    def _return_preauth_key(_kdc_exchange_dict,
                            _callback_dict,
                            rep,
                            padata):
        # The reply is decrypted with the caller's pre-auth key.
        return preauth_key, KU_AS_REP_ENC_PART

    expect_success = expected_error_mode == 0
    check_rep_fn = self.generic_check_kdc_rep if expect_success else None
    check_error_fn = (None if expect_success
                      else self.generic_check_kdc_error)

    generate_padata_fn = (_return_given_padata if padata is not None
                          else None)

    kdc_exchange_dict = self.as_exchange_dict(
        expected_crealm=expected_crealm,
        expected_cname=expected_cname,
        expected_srealm=expected_srealm,
        expected_sname=expected_sname,
        ticket_decryption_key=ticket_decryption_key,
        generate_padata_fn=generate_padata_fn,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_padata_fn=_return_preauth_key,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=expected_error_mode,
        client_as_etypes=client_as_etypes,
        expected_salt=expected_salt,
        kdc_options=str(kdc_options))

    reply = self._generic_kdc_exchange(kdc_exchange_dict,
                                       cname=cname,
                                       realm=realm,
                                       sname=sname,
                                       till_time=till,
                                       etypes=etypes)

    return reply, kdc_exchange_dict