tests/krb5: Check version number of obtained ticket
[Samba.git] / python / samba / tests / krb5 / raw_testcase.py
blob70062ca338abdf724bb10f2dcf52de901890ad28
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import security
38 from samba.gensec import FEATURE_SEAL
40 import samba.tests
41 from samba.tests import TestCaseInTempDir
43 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
44 from samba.tests.krb5.rfc4120_constants import (
45 KDC_ERR_GENERIC,
46 KRB_AP_REQ,
47 KRB_AS_REP,
48 KRB_AS_REQ,
49 KRB_ERROR,
50 KRB_TGS_REP,
51 KRB_TGS_REQ,
52 KU_AS_REP_ENC_PART,
53 KU_NON_KERB_CKSUM_SALT,
54 KU_TGS_REP_ENC_PART_SESSION,
55 KU_TGS_REP_ENC_PART_SUB_KEY,
56 KU_TGS_REQ_AUTH,
57 KU_TGS_REQ_AUTH_CKSUM,
58 KU_TGS_REQ_AUTH_DAT_SESSION,
59 KU_TGS_REQ_AUTH_DAT_SUBKEY,
60 KU_TICKET,
61 PADATA_ENC_TIMESTAMP,
62 PADATA_ETYPE_INFO,
63 PADATA_ETYPE_INFO2,
64 PADATA_FOR_USER,
65 PADATA_KDC_REQ,
66 PADATA_PAC_REQUEST,
67 PADATA_PK_AS_REQ,
68 PADATA_PK_AS_REP_19
70 import samba.tests.krb5.kcrypto as kcrypto
73 def BitStringEncoder_encodeValue32(
74 self, value, asn1Spec, encodeFun, **options):
76 # BitStrings like KDCOptions or TicketFlags should at least
77 # be 32-Bit on the wire
79 if asn1Spec is not None:
80 # TODO: try to avoid ASN.1 schema instantiation
81 value = asn1Spec.clone(value)
83 valueLength = len(value)
84 if valueLength % 8:
85 alignedValue = value << (8 - valueLength % 8)
86 else:
87 alignedValue = value
89 substrate = alignedValue.asOctets()
90 length = len(substrate)
91 # We need at least 32-Bit / 4-Bytes
92 if length < 4:
93 padding = 4 - length
94 else:
95 padding = 0
96 ret = b'\x00' + substrate + (b'\x00' * padding)
97 return ret, False, True
100 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
103 def BitString_NamedValues_prettyPrint(self, scope=0):
104 ret = "%s" % self.asBinary()
105 bits = []
106 highest_bit = 32
107 for byte in self.asNumbers():
108 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
109 mask = 1 << bit
110 if byte & mask:
111 val = 1
112 else:
113 val = 0
114 bits.append(val)
115 if len(bits) < highest_bit:
116 for bitPosition in range(len(bits), highest_bit):
117 bits.append(0)
118 indent = " " * scope
119 delim = ": (\n%s " % indent
120 for bitPosition in range(highest_bit):
121 if bitPosition in self.prettyPrintNamedValues:
122 name = self.prettyPrintNamedValues[bitPosition]
123 elif bits[bitPosition] != 0:
124 name = "unknown-bit-%u" % bitPosition
125 else:
126 continue
127 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
128 delim = ",\n%s " % indent
129 ret += "\n%s)" % indent
130 return ret
133 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
134 krb5_asn1.TicketFlagsValues.namedValues
135 krb5_asn1.TicketFlags.namedValues =\
136 krb5_asn1.TicketFlagsValues.namedValues
137 krb5_asn1.TicketFlags.prettyPrint =\
138 BitString_NamedValues_prettyPrint
139 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
140 krb5_asn1.KDCOptionsValues.namedValues
141 krb5_asn1.KDCOptions.namedValues =\
142 krb5_asn1.KDCOptionsValues.namedValues
143 krb5_asn1.KDCOptions.prettyPrint =\
144 BitString_NamedValues_prettyPrint
145 krb5_asn1.APOptions.prettyPrintNamedValues =\
146 krb5_asn1.APOptionsValues.namedValues
147 krb5_asn1.APOptions.namedValues =\
148 krb5_asn1.APOptionsValues.namedValues
149 krb5_asn1.APOptions.prettyPrint =\
150 BitString_NamedValues_prettyPrint
151 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
152 krb5_asn1.PACOptionFlagsValues.namedValues
153 krb5_asn1.PACOptionFlags.namedValues =\
154 krb5_asn1.PACOptionFlagsValues.namedValues
155 krb5_asn1.PACOptionFlags.prettyPrint =\
156 BitString_NamedValues_prettyPrint
159 def Integer_NamedValues_prettyPrint(self, scope=0):
160 intval = int(self)
161 if intval in self.prettyPrintNamedValues:
162 name = self.prettyPrintNamedValues[intval]
163 else:
164 name = "<__unknown__>"
165 ret = "%d (0x%x) %s" % (intval, intval, name)
166 return ret
169 krb5_asn1.NameType.prettyPrintNamedValues =\
170 krb5_asn1.NameTypeValues.namedValues
171 krb5_asn1.NameType.prettyPrint =\
172 Integer_NamedValues_prettyPrint
173 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
174 krb5_asn1.AuthDataTypeValues.namedValues
175 krb5_asn1.AuthDataType.prettyPrint =\
176 Integer_NamedValues_prettyPrint
177 krb5_asn1.PADataType.prettyPrintNamedValues =\
178 krb5_asn1.PADataTypeValues.namedValues
179 krb5_asn1.PADataType.prettyPrint =\
180 Integer_NamedValues_prettyPrint
181 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
182 krb5_asn1.EncryptionTypeValues.namedValues
183 krb5_asn1.EncryptionType.prettyPrint =\
184 Integer_NamedValues_prettyPrint
185 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
186 krb5_asn1.ChecksumTypeValues.namedValues
187 krb5_asn1.ChecksumType.prettyPrint =\
188 Integer_NamedValues_prettyPrint
189 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
190 krb5_asn1.KerbErrorDataTypeValues.namedValues
191 krb5_asn1.KerbErrorDataType.prettyPrint =\
192 Integer_NamedValues_prettyPrint
195 class Krb5EncryptionKey:
196 def __init__(self, key, kvno):
197 EncTypeChecksum = {
198 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
199 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
200 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
202 self.key = key
203 self.etype = key.enctype
204 self.ctype = EncTypeChecksum[self.etype]
205 self.kvno = kvno
207 def encrypt(self, usage, plaintext):
208 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
209 return ciphertext
211 def decrypt(self, usage, ciphertext):
212 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
213 return plaintext
215 def make_checksum(self, usage, plaintext, ctype=None):
216 if ctype is None:
217 ctype = self.ctype
218 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
219 return cksum
221 def export_obj(self):
222 EncryptionKey_obj = {
223 'keytype': self.etype,
224 'keyvalue': self.key.contents,
226 return EncryptionKey_obj
229 class KerberosCredentials(Credentials):
230 def __init__(self):
231 super(KerberosCredentials, self).__init__()
232 all_enc_types = 0
233 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
234 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
235 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
237 self.as_supported_enctypes = all_enc_types
238 self.tgs_supported_enctypes = all_enc_types
239 self.ap_supported_enctypes = all_enc_types
241 self.kvno = None
242 self.forced_keys = {}
244 self.forced_salt = None
246 def set_as_supported_enctypes(self, value):
247 self.as_supported_enctypes = int(value)
249 def set_tgs_supported_enctypes(self, value):
250 self.tgs_supported_enctypes = int(value)
252 def set_ap_supported_enctypes(self, value):
253 self.ap_supported_enctypes = int(value)
255 def _get_krb5_etypes(self, supported_enctypes):
256 etypes = ()
258 if supported_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
259 etypes += (kcrypto.Enctype.AES256,)
260 if supported_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
261 etypes += (kcrypto.Enctype.AES128,)
262 if supported_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
263 etypes += (kcrypto.Enctype.RC4,)
265 return etypes
267 def get_as_krb5_etypes(self):
268 return self._get_krb5_etypes(self.as_supported_enctypes)
270 def get_tgs_krb5_etypes(self):
271 return self._get_krb5_etypes(self.tgs_supported_enctypes)
273 def get_ap_krb5_etypes(self):
274 return self._get_krb5_etypes(self.ap_supported_enctypes)
276 def set_kvno(self, kvno):
277 self.kvno = kvno
279 def get_kvno(self):
280 return self.kvno
282 def set_forced_key(self, etype, hexkey):
283 etype = int(etype)
284 contents = binascii.a2b_hex(hexkey)
285 key = kcrypto.Key(etype, contents)
286 self.forced_keys[etype] = Krb5EncryptionKey(key, self.kvno)
288 def get_forced_key(self, etype):
289 etype = int(etype)
290 return self.forced_keys.get(etype, None)
292 def set_forced_salt(self, salt):
293 self.forced_salt = bytes(salt)
295 def get_forced_salt(self):
296 return self.forced_salt
298 def get_salt(self):
299 if self.forced_salt is not None:
300 return self.forced_salt
302 if self.get_workstation():
303 salt_string = '%shost%s.%s' % (
304 self.get_realm().upper(),
305 self.get_username().lower().rsplit('$', 1)[0],
306 self.get_realm().lower())
307 else:
308 salt_string = self.get_realm().upper() + self.get_username()
310 return salt_string.encode('utf-8')
313 class KerberosTicketCreds:
314 def __init__(self, ticket, session_key,
315 crealm=None, cname=None,
316 srealm=None, sname=None,
317 decryption_key=None,
318 ticket_private=None,
319 encpart_private=None):
320 self.ticket = ticket
321 self.session_key = session_key
322 self.crealm = crealm
323 self.cname = cname
324 self.srealm = srealm
325 self.sname = sname
326 self.decryption_key = decryption_key
327 self.ticket_private = ticket_private
328 self.encpart_private = encpart_private
331 class RawKerberosTest(TestCaseInTempDir):
332 """A raw Kerberos Test case."""
334 etypes_to_test = (
335 {"value": -1111, "name": "dummy", },
336 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
337 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
338 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
341 setup_etype_test_permutations_done = False
343 @classmethod
344 def setup_etype_test_permutations(cls):
345 if cls.setup_etype_test_permutations_done:
346 return
348 res = []
350 num_idxs = len(cls.etypes_to_test)
351 permutations = []
352 for num in range(1, num_idxs + 1):
353 chunk = list(itertools.permutations(range(num_idxs), num))
354 for e in chunk:
355 el = list(e)
356 permutations.append(el)
358 for p in permutations:
359 name = None
360 etypes = ()
361 for idx in p:
362 n = cls.etypes_to_test[idx]["name"]
363 if name is None:
364 name = n
365 else:
366 name += "_%s" % n
367 etypes += (cls.etypes_to_test[idx]["value"],)
369 r = {"name": name, "etypes": etypes, }
370 res.append(r)
372 cls.etype_test_permutations = res
373 cls.setup_etype_test_permutations_done = True
375 @classmethod
376 def etype_test_permutation_name_idx(cls):
377 cls.setup_etype_test_permutations()
378 res = []
379 idx = 0
380 for e in cls.etype_test_permutations:
381 r = (e['name'], idx)
382 idx += 1
383 res.append(r)
384 return res
386 def etype_test_permutation_by_idx(self, idx):
387 e = self.etype_test_permutations[idx]
388 return (e['name'], e['etypes'])
390 @classmethod
391 def setUpClass(cls):
392 super().setUpClass()
394 cls.host = samba.tests.env_get_var_value('SERVER')
396 # A dictionary containing credentials that have already been
397 # obtained.
398 cls.creds_dict = {}
400 def setUp(self):
401 super().setUp()
402 self.do_asn1_print = False
403 self.do_hexdump = False
405 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
406 allow_missing=True)
407 if strict_checking is None:
408 strict_checking = '1'
409 self.strict_checking = bool(int(strict_checking))
411 self.s = None
413 self.unspecified_kvno = object()
415 def tearDown(self):
416 self._disconnect("tearDown")
417 super().tearDown()
419 def _disconnect(self, reason):
420 if self.s is None:
421 return
422 self.s.close()
423 self.s = None
424 if self.do_hexdump:
425 sys.stderr.write("disconnect[%s]\n" % reason)
427 def _connect_tcp(self):
428 tcp_port = 88
429 try:
430 self.a = socket.getaddrinfo(self.host, tcp_port, socket.AF_UNSPEC,
431 socket.SOCK_STREAM, socket.SOL_TCP,
433 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
434 self.s.settimeout(10)
435 self.s.connect(self.a[0][4])
436 except socket.error:
437 self.s.close()
438 raise
439 except IOError:
440 self.s.close()
441 raise
443 def connect(self):
444 self.assertNotConnected()
445 self._connect_tcp()
446 if self.do_hexdump:
447 sys.stderr.write("connected[%s]\n" % self.host)
449 def env_get_var(self, varname, prefix,
450 fallback_default=True,
451 allow_missing=False):
452 val = None
453 if prefix is not None:
454 allow_missing_prefix = allow_missing or fallback_default
455 val = samba.tests.env_get_var_value(
456 '%s_%s' % (prefix, varname),
457 allow_missing=allow_missing_prefix)
458 else:
459 fallback_default = True
460 if val is None and fallback_default:
461 val = samba.tests.env_get_var_value(varname,
462 allow_missing=allow_missing)
463 return val
465 def _get_krb5_creds_from_env(self, prefix,
466 default_username=None,
467 allow_missing_password=False,
468 allow_missing_keys=True,
469 require_strongest_key=False):
470 c = KerberosCredentials()
471 c.guess()
473 domain = self.env_get_var('DOMAIN', prefix)
474 realm = self.env_get_var('REALM', prefix)
475 allow_missing_username = default_username is not None
476 username = self.env_get_var('USERNAME', prefix,
477 fallback_default=False,
478 allow_missing=allow_missing_username)
479 if username is None:
480 username = default_username
481 password = self.env_get_var('PASSWORD', prefix,
482 fallback_default=False,
483 allow_missing=allow_missing_password)
484 c.set_domain(domain)
485 c.set_realm(realm)
486 c.set_username(username)
487 if password is not None:
488 c.set_password(password)
489 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
490 prefix, allow_missing=True)
491 if as_supported_enctypes is not None:
492 c.set_as_supported_enctypes(as_supported_enctypes)
493 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
494 prefix, allow_missing=True)
495 if tgs_supported_enctypes is not None:
496 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
497 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
498 prefix, allow_missing=True)
499 if ap_supported_enctypes is not None:
500 c.set_ap_supported_enctypes(ap_supported_enctypes)
502 if require_strongest_key:
503 kvno_allow_missing = False
504 if password is None:
505 aes256_allow_missing = False
506 else:
507 aes256_allow_missing = True
508 else:
509 kvno_allow_missing = allow_missing_keys
510 aes256_allow_missing = allow_missing_keys
511 kvno = self.env_get_var('KVNO', prefix,
512 fallback_default=False,
513 allow_missing=kvno_allow_missing)
514 if kvno is not None:
515 c.set_kvno(kvno)
516 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
517 fallback_default=False,
518 allow_missing=aes256_allow_missing)
519 if aes256_key is not None:
520 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
521 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
522 fallback_default=False,
523 allow_missing=True)
524 if aes128_key is not None:
525 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
526 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
527 fallback_default=False, allow_missing=True)
528 if rc4_key is not None:
529 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
531 if not allow_missing_keys:
532 self.assertTrue(c.forced_keys,
533 'Please supply %s encryption keys '
534 'in environment' % prefix)
536 return c
538 def _get_krb5_creds(self,
539 prefix,
540 default_username=None,
541 allow_missing_password=False,
542 allow_missing_keys=True,
543 require_strongest_key=False,
544 fallback_creds_fn=None):
545 if prefix in self.creds_dict:
546 return self.creds_dict[prefix]
548 # We don't have the credentials already
549 creds = None
550 env_err = None
551 try:
552 # Try to obtain them from the environment
553 creds = self._get_krb5_creds_from_env(
554 prefix,
555 default_username=default_username,
556 allow_missing_password=allow_missing_password,
557 allow_missing_keys=allow_missing_keys,
558 require_strongest_key=require_strongest_key)
559 except Exception as err:
560 # An error occurred, so save it for later
561 env_err = err
562 else:
563 self.assertIsNotNone(creds)
564 # Save the obtained credentials
565 self.creds_dict[prefix] = creds
566 return creds
568 if fallback_creds_fn is not None:
569 try:
570 # Try to use the fallback method
571 creds = fallback_creds_fn()
572 except Exception as err:
573 print("ERROR FROM ENV: %r" % (env_err))
574 print("FALLBACK-FN: %s" % (fallback_creds_fn))
575 print("FALLBACK-ERROR: %r" % (err))
576 else:
577 self.assertIsNotNone(creds)
578 # Save the obtained credentials
579 self.creds_dict[prefix] = creds
580 return creds
582 # Both methods failed, so raise the exception from the
583 # environment method
584 raise env_err
586 def get_user_creds(self,
587 allow_missing_password=False,
588 allow_missing_keys=True):
589 c = self._get_krb5_creds(prefix=None,
590 allow_missing_password=allow_missing_password,
591 allow_missing_keys=allow_missing_keys)
592 return c
594 def get_service_creds(self,
595 allow_missing_password=False,
596 allow_missing_keys=True):
597 c = self._get_krb5_creds(prefix='SERVICE',
598 allow_missing_password=allow_missing_password,
599 allow_missing_keys=allow_missing_keys)
600 return c
602 def get_client_creds(self,
603 allow_missing_password=False,
604 allow_missing_keys=True):
605 c = self._get_krb5_creds(prefix='CLIENT',
606 allow_missing_password=allow_missing_password,
607 allow_missing_keys=allow_missing_keys)
608 return c
610 def get_server_creds(self,
611 allow_missing_password=False,
612 allow_missing_keys=True):
613 c = self._get_krb5_creds(prefix='SERVER',
614 allow_missing_password=allow_missing_password,
615 allow_missing_keys=allow_missing_keys)
616 return c
618 def get_admin_creds(self,
619 allow_missing_password=False,
620 allow_missing_keys=True):
621 c = self._get_krb5_creds(prefix='ADMIN',
622 allow_missing_password=allow_missing_password,
623 allow_missing_keys=allow_missing_keys)
624 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
625 return c
627 def get_krbtgt_creds(self,
628 require_keys=True,
629 require_strongest_key=False):
630 if require_strongest_key:
631 self.assertTrue(require_keys)
632 c = self._get_krb5_creds(prefix='KRBTGT',
633 default_username='krbtgt',
634 allow_missing_password=True,
635 allow_missing_keys=not require_keys,
636 require_strongest_key=require_strongest_key)
637 return c
639 def get_anon_creds(self):
640 c = Credentials()
641 c.set_anonymous()
642 return c
644 def asn1_dump(self, name, obj, asn1_print=None):
645 if asn1_print is None:
646 asn1_print = self.do_asn1_print
647 if asn1_print:
648 if name is not None:
649 sys.stderr.write("%s:\n%s" % (name, obj))
650 else:
651 sys.stderr.write("%s" % (obj))
653 def hex_dump(self, name, blob, hexdump=None):
654 if hexdump is None:
655 hexdump = self.do_hexdump
656 if hexdump:
657 sys.stderr.write(
658 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
660 def der_decode(
661 self,
662 blob,
663 asn1Spec=None,
664 native_encode=True,
665 asn1_print=None,
666 hexdump=None):
667 if asn1Spec is not None:
668 class_name = type(asn1Spec).__name__.split(':')[0]
669 else:
670 class_name = "<None-asn1Spec>"
671 self.hex_dump(class_name, blob, hexdump=hexdump)
672 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
673 self.asn1_dump(None, obj, asn1_print=asn1_print)
674 if native_encode:
675 obj = pyasn1_native_encode(obj)
676 return obj
678 def der_encode(
679 self,
680 obj,
681 asn1Spec=None,
682 native_decode=True,
683 asn1_print=None,
684 hexdump=None):
685 if native_decode:
686 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
687 class_name = type(obj).__name__.split(':')[0]
688 if class_name is not None:
689 self.asn1_dump(None, obj, asn1_print=asn1_print)
690 blob = pyasn1_der_encode(obj)
691 if class_name is not None:
692 self.hex_dump(class_name, blob, hexdump=hexdump)
693 return blob
695 def send_pdu(self, req, asn1_print=None, hexdump=None):
696 try:
697 k5_pdu = self.der_encode(
698 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
699 header = struct.pack('>I', len(k5_pdu))
700 req_pdu = header
701 req_pdu += k5_pdu
702 self.hex_dump("send_pdu", header, hexdump=hexdump)
703 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
704 while True:
705 sent = self.s.send(req_pdu, 0)
706 if sent == len(req_pdu):
707 break
708 req_pdu = req_pdu[sent:]
709 except socket.error as e:
710 self._disconnect("send_pdu: %s" % e)
711 raise
712 except IOError as e:
713 self._disconnect("send_pdu: %s" % e)
714 raise
716 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
717 rep_pdu = None
718 try:
719 if timeout is not None:
720 self.s.settimeout(timeout)
721 rep_pdu = self.s.recv(num_recv, 0)
722 self.s.settimeout(10)
723 if len(rep_pdu) == 0:
724 self._disconnect("recv_raw: EOF")
725 return None
726 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
727 except socket.timeout:
728 self.s.settimeout(10)
729 sys.stderr.write("recv_raw: TIMEOUT\n")
730 except socket.error as e:
731 self._disconnect("recv_raw: %s" % e)
732 raise
733 except IOError as e:
734 self._disconnect("recv_raw: %s" % e)
735 raise
736 return rep_pdu
738 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
739 rep_pdu = None
740 rep = None
741 raw_pdu = self.recv_raw(
742 num_recv=4, hexdump=hexdump, timeout=timeout)
743 if raw_pdu is None:
744 return (None, None)
745 header = struct.unpack(">I", raw_pdu[0:4])
746 k5_len = header[0]
747 if k5_len == 0:
748 return (None, "")
749 missing = k5_len
750 rep_pdu = b''
751 while missing > 0:
752 raw_pdu = self.recv_raw(
753 num_recv=missing, hexdump=hexdump, timeout=timeout)
754 self.assertGreaterEqual(len(raw_pdu), 1)
755 rep_pdu += raw_pdu
756 missing = k5_len - len(rep_pdu)
757 k5_raw = self.der_decode(
758 rep_pdu,
759 asn1Spec=None,
760 native_encode=False,
761 asn1_print=False,
762 hexdump=False)
763 pvno = k5_raw['field-0']
764 self.assertEqual(pvno, 5)
765 msg_type = k5_raw['field-1']
766 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
767 if msg_type == KRB_AS_REP:
768 asn1Spec = krb5_asn1.AS_REP()
769 elif msg_type == KRB_TGS_REP:
770 asn1Spec = krb5_asn1.TGS_REP()
771 elif msg_type == KRB_ERROR:
772 asn1Spec = krb5_asn1.KRB_ERROR()
773 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
774 asn1_print=asn1_print, hexdump=False)
775 return (rep, rep_pdu)
777 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
778 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
779 hexdump=hexdump,
780 timeout=timeout)
781 return rep
783 def assertIsConnected(self):
784 self.assertIsNotNone(self.s, msg="Not connected")
786 def assertNotConnected(self):
787 self.assertIsNone(self.s, msg="Is connected")
789 def send_recv_transaction(
790 self,
791 req,
792 asn1_print=None,
793 hexdump=None,
794 timeout=None):
795 self.connect()
796 try:
797 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
798 rep = self.recv_pdu(
799 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
800 except Exception:
801 self._disconnect("transaction failed")
802 raise
803 self._disconnect("transaction done")
804 return rep
806 def assertNoValue(self, value):
807 self.assertTrue(value.isNoValue)
809 def assertHasValue(self, value):
810 self.assertIsNotNone(value)
812 def getElementValue(self, obj, elem):
813 return obj.get(elem, None)
815 def assertElementMissing(self, obj, elem):
816 v = self.getElementValue(obj, elem)
817 self.assertIsNone(v)
819 def assertElementPresent(self, obj, elem):
820 v = self.getElementValue(obj, elem)
821 self.assertIsNotNone(v)
822 if self.strict_checking:
823 if isinstance(v, collections.abc.Container):
824 self.assertNotEqual(0, len(v))
826 def assertElementEqual(self, obj, elem, value):
827 v = self.getElementValue(obj, elem)
828 self.assertIsNotNone(v)
829 self.assertEqual(v, value)
831 def assertElementEqualUTF8(self, obj, elem, value):
832 v = self.getElementValue(obj, elem)
833 self.assertIsNotNone(v)
834 self.assertEqual(v, bytes(value, 'utf8'))
836 def assertPrincipalEqual(self, princ1, princ2):
837 self.assertEqual(princ1['name-type'], princ2['name-type'])
838 self.assertEqual(
839 len(princ1['name-string']),
840 len(princ2['name-string']),
841 msg="princ1=%s != princ2=%s" % (princ1, princ2))
842 for idx in range(len(princ1['name-string'])):
843 self.assertEqual(
844 princ1['name-string'][idx],
845 princ2['name-string'][idx],
846 msg="princ1=%s != princ2=%s" % (princ1, princ2))
848 def assertElementEqualPrincipal(self, obj, elem, value):
849 v = self.getElementValue(obj, elem)
850 self.assertIsNotNone(v)
851 v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
852 self.assertPrincipalEqual(v, value)
854 def assertElementKVNO(self, obj, elem, value):
855 v = self.getElementValue(obj, elem)
856 if value == "autodetect":
857 value = v
858 if value is not None:
859 self.assertIsNotNone(v)
860 # The value on the wire should never be 0
861 self.assertNotEqual(v, 0)
862 # unspecified_kvno means we don't know the kvno,
863 # but want to enforce its presence
864 if value is not self.unspecified_kvno:
865 value = int(value)
866 self.assertNotEqual(value, 0)
867 self.assertEqual(v, value)
868 else:
869 self.assertIsNone(v)
871 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
872 if epoch is None:
873 epoch = time.time()
874 if offset is not None:
875 epoch = epoch + int(offset)
876 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
877 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
879 def get_KerberosTime(self, epoch=None, offset=None):
880 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
881 return s
883 def get_EpochFromKerberosTime(self, kerberos_time):
884 if isinstance(kerberos_time, bytes):
885 kerberos_time = kerberos_time.decode()
887 epoch = datetime.datetime.strptime(kerberos_time,
888 '%Y%m%d%H%M%SZ')
889 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
890 epoch = int(epoch.timestamp())
892 return epoch
894 def get_Nonce(self):
895 nonce_min = 0x7f000000
896 nonce_max = 0x7fffffff
897 v = random.randint(nonce_min, nonce_max)
898 return v
900 def get_pa_dict(self, pa_data):
901 pa_dict = {}
903 if pa_data is not None:
904 for pa in pa_data:
905 pa_type = pa['padata-type']
906 if pa_type in pa_dict:
907 raise RuntimeError(f'Duplicate type {pa_type}')
908 pa_dict[pa_type] = pa['padata-value']
910 return pa_dict
912 def SessionKey_create(self, etype, contents, kvno=None):
913 key = kcrypto.Key(etype, contents)
914 return Krb5EncryptionKey(key, kvno)
916 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
917 self.assertIsNotNone(pwd)
918 self.assertIsNotNone(salt)
919 key = kcrypto.string_to_key(etype, pwd, salt)
920 return Krb5EncryptionKey(key, kvno)
922 def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
923 e = etype_info2['etype']
925 salt = etype_info2.get('salt', None)
927 if e == kcrypto.Enctype.RC4:
928 nthash = creds.get_nt_hash()
929 return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
931 password = creds.get_password()
932 return self.PasswordKey_create(
933 etype=e, pwd=password, salt=salt, kvno=kvno)
935 def TicketDecryptionKey_from_creds(self, creds, etype=None):
937 if etype is None:
938 etypes = creds.get_tgs_krb5_etypes()
939 etype = etypes[0]
941 forced_key = creds.get_forced_key(etype)
942 if forced_key is not None:
943 return forced_key
945 kvno = creds.get_kvno()
947 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
948 "nor a password specified, " % (
949 creds.get_username(), etype, kvno))
951 if etype == kcrypto.Enctype.RC4:
952 nthash = creds.get_nt_hash()
953 self.assertIsNotNone(nthash, msg=fail_msg)
954 return self.SessionKey_create(etype=etype,
955 contents=nthash,
956 kvno=kvno)
958 password = creds.get_password()
959 self.assertIsNotNone(password, msg=fail_msg)
960 salt = creds.get_salt()
961 return self.PasswordKey_create(etype=etype,
962 pwd=password,
963 salt=salt,
964 kvno=kvno)
966 def RandomKey(self, etype):
967 e = kcrypto._get_enctype_profile(etype)
968 contents = samba.generate_random_bytes(e.keysize)
969 return self.SessionKey_create(etype=etype, contents=contents)
971 def EncryptionKey_import(self, EncryptionKey_obj):
972 return self.SessionKey_create(EncryptionKey_obj['keytype'],
973 EncryptionKey_obj['keyvalue'])
975 def EncryptedData_create(self, key, usage, plaintext):
976 # EncryptedData ::= SEQUENCE {
977 # etype [0] Int32 -- EncryptionType --,
978 # kvno [1] UInt32 OPTIONAL,
979 # cipher [2] OCTET STRING -- ciphertext
981 ciphertext = key.encrypt(usage, plaintext)
982 EncryptedData_obj = {
983 'etype': key.etype,
984 'cipher': ciphertext
986 if key.kvno is not None:
987 EncryptedData_obj['kvno'] = key.kvno
988 return EncryptedData_obj
990 def Checksum_create(self, key, usage, plaintext, ctype=None):
991 # Checksum ::= SEQUENCE {
992 # cksumtype [0] Int32,
993 # checksum [1] OCTET STRING
995 if ctype is None:
996 ctype = key.ctype
997 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
998 Checksum_obj = {
999 'cksumtype': ctype,
1000 'checksum': checksum,
1002 return Checksum_obj
1004 @classmethod
1005 def PrincipalName_create(cls, name_type, names):
1006 # PrincipalName ::= SEQUENCE {
1007 # name-type [0] Int32,
1008 # name-string [1] SEQUENCE OF KerberosString
1010 PrincipalName_obj = {
1011 'name-type': name_type,
1012 'name-string': names,
1014 return PrincipalName_obj
1016 def PA_DATA_create(self, padata_type, padata_value):
1017 # PA-DATA ::= SEQUENCE {
1018 # -- NOTE: first tag is [1], not [0]
1019 # padata-type [1] Int32,
1020 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1022 PA_DATA_obj = {
1023 'padata-type': padata_type,
1024 'padata-value': padata_value,
1026 return PA_DATA_obj
1028 def PA_ENC_TS_ENC_create(self, ts, usec):
1029 # PA-ENC-TS-ENC ::= SEQUENCE {
1030 # patimestamp[0] KerberosTime, -- client's time
1031 # pausec[1] krb5int32 OPTIONAL
1033 PA_ENC_TS_ENC_obj = {
1034 'patimestamp': ts,
1035 'pausec': usec,
1037 return PA_ENC_TS_ENC_obj
1039 def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1040 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1041 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1042 # -- include PAC.
1043 # --If FALSE, and PAC present,
1044 # -- remove PAC.
1046 KERB_PA_PAC_REQUEST_obj = {
1047 'include-pac': include_pac,
1049 if not pa_data_create:
1050 return KERB_PA_PAC_REQUEST_obj
1051 pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1052 asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1053 pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1054 return pa_data
1056 def KDC_REQ_BODY_create(self,
1057 kdc_options,
1058 cname,
1059 realm,
1060 sname,
1061 from_time,
1062 till_time,
1063 renew_time,
1064 nonce,
1065 etypes,
1066 addresses,
1067 additional_tickets,
1068 EncAuthorizationData,
1069 EncAuthorizationData_key,
1070 EncAuthorizationData_usage,
1071 asn1_print=None,
1072 hexdump=None):
1073 # KDC-REQ-BODY ::= SEQUENCE {
1074 # kdc-options [0] KDCOptions,
1075 # cname [1] PrincipalName OPTIONAL
1076 # -- Used only in AS-REQ --,
1077 # realm [2] Realm
1078 # -- Server's realm
1079 # -- Also client's in AS-REQ --,
1080 # sname [3] PrincipalName OPTIONAL,
1081 # from [4] KerberosTime OPTIONAL,
1082 # till [5] KerberosTime,
1083 # rtime [6] KerberosTime OPTIONAL,
1084 # nonce [7] UInt32,
1085 # etype [8] SEQUENCE OF Int32
1086 # -- EncryptionType
1087 # -- in preference order --,
1088 # addresses [9] HostAddresses OPTIONAL,
1089 # enc-authorization-data [10] EncryptedData OPTIONAL
1090 # -- AuthorizationData --,
1091 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1092 # -- NOTE: not empty
1094 if EncAuthorizationData is not None:
1095 enc_ad_plain = self.der_encode(
1096 EncAuthorizationData,
1097 asn1Spec=krb5_asn1.AuthorizationData(),
1098 asn1_print=asn1_print,
1099 hexdump=hexdump)
1100 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1101 EncAuthorizationData_usage,
1102 enc_ad_plain)
1103 else:
1104 enc_ad = None
1105 KDC_REQ_BODY_obj = {
1106 'kdc-options': kdc_options,
1107 'realm': realm,
1108 'till': till_time,
1109 'nonce': nonce,
1110 'etype': etypes,
1112 if cname is not None:
1113 KDC_REQ_BODY_obj['cname'] = cname
1114 if sname is not None:
1115 KDC_REQ_BODY_obj['sname'] = sname
1116 if from_time is not None:
1117 KDC_REQ_BODY_obj['from'] = from_time
1118 if renew_time is not None:
1119 KDC_REQ_BODY_obj['rtime'] = renew_time
1120 if addresses is not None:
1121 KDC_REQ_BODY_obj['addresses'] = addresses
1122 if enc_ad is not None:
1123 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1124 if additional_tickets is not None:
1125 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1126 return KDC_REQ_BODY_obj
1128 def KDC_REQ_create(self,
1129 msg_type,
1130 padata,
1131 req_body,
1132 asn1Spec=None,
1133 asn1_print=None,
1134 hexdump=None):
1135 # KDC-REQ ::= SEQUENCE {
1136 # -- NOTE: first tag is [1], not [0]
1137 # pvno [1] INTEGER (5) ,
1138 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1139 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1140 # -- NOTE: not empty --,
1141 # req-body [4] KDC-REQ-BODY
1144 KDC_REQ_obj = {
1145 'pvno': 5,
1146 'msg-type': msg_type,
1147 'req-body': req_body,
1149 if padata is not None:
1150 KDC_REQ_obj['padata'] = padata
1151 if asn1Spec is not None:
1152 KDC_REQ_decoded = pyasn1_native_decode(
1153 KDC_REQ_obj, asn1Spec=asn1Spec)
1154 else:
1155 KDC_REQ_decoded = None
1156 return KDC_REQ_obj, KDC_REQ_decoded
1158 def AS_REQ_create(self,
1159 padata, # optional
1160 kdc_options, # required
1161 cname, # optional
1162 realm, # required
1163 sname, # optional
1164 from_time, # optional
1165 till_time, # required
1166 renew_time, # optional
1167 nonce, # required
1168 etypes, # required
1169 addresses, # optional
1170 additional_tickets,
1171 native_decoded_only=True,
1172 asn1_print=None,
1173 hexdump=None):
1174 # KDC-REQ ::= SEQUENCE {
1175 # -- NOTE: first tag is [1], not [0]
1176 # pvno [1] INTEGER (5) ,
1177 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1178 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1179 # -- NOTE: not empty --,
1180 # req-body [4] KDC-REQ-BODY
1183 # KDC-REQ-BODY ::= SEQUENCE {
1184 # kdc-options [0] KDCOptions,
1185 # cname [1] PrincipalName OPTIONAL
1186 # -- Used only in AS-REQ --,
1187 # realm [2] Realm
1188 # -- Server's realm
1189 # -- Also client's in AS-REQ --,
1190 # sname [3] PrincipalName OPTIONAL,
1191 # from [4] KerberosTime OPTIONAL,
1192 # till [5] KerberosTime,
1193 # rtime [6] KerberosTime OPTIONAL,
1194 # nonce [7] UInt32,
1195 # etype [8] SEQUENCE OF Int32
1196 # -- EncryptionType
1197 # -- in preference order --,
1198 # addresses [9] HostAddresses OPTIONAL,
1199 # enc-authorization-data [10] EncryptedData OPTIONAL
1200 # -- AuthorizationData --,
1201 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1202 # -- NOTE: not empty
1204 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1205 kdc_options,
1206 cname,
1207 realm,
1208 sname,
1209 from_time,
1210 till_time,
1211 renew_time,
1212 nonce,
1213 etypes,
1214 addresses,
1215 additional_tickets,
1216 EncAuthorizationData=None,
1217 EncAuthorizationData_key=None,
1218 EncAuthorizationData_usage=None,
1219 asn1_print=asn1_print,
1220 hexdump=hexdump)
1221 obj, decoded = self.KDC_REQ_create(
1222 msg_type=KRB_AS_REQ,
1223 padata=padata,
1224 req_body=KDC_REQ_BODY_obj,
1225 asn1Spec=krb5_asn1.AS_REQ(),
1226 asn1_print=asn1_print,
1227 hexdump=hexdump)
1228 if native_decoded_only:
1229 return decoded
1230 return decoded, obj
1232 def AP_REQ_create(self, ap_options, ticket, authenticator):
1233 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1234 # pvno [0] INTEGER (5),
1235 # msg-type [1] INTEGER (14),
1236 # ap-options [2] APOptions,
1237 # ticket [3] Ticket,
1238 # authenticator [4] EncryptedData -- Authenticator
1240 AP_REQ_obj = {
1241 'pvno': 5,
1242 'msg-type': KRB_AP_REQ,
1243 'ap-options': ap_options,
1244 'ticket': ticket,
1245 'authenticator': authenticator,
1247 return AP_REQ_obj
1249 def Authenticator_create(
1250 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1251 authorization_data):
1252 # -- Unencrypted authenticator
1253 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1254 # authenticator-vno [0] INTEGER (5),
1255 # crealm [1] Realm,
1256 # cname [2] PrincipalName,
1257 # cksum [3] Checksum OPTIONAL,
1258 # cusec [4] Microseconds,
1259 # ctime [5] KerberosTime,
1260 # subkey [6] EncryptionKey OPTIONAL,
1261 # seq-number [7] UInt32 OPTIONAL,
1262 # authorization-data [8] AuthorizationData OPTIONAL
1264 Authenticator_obj = {
1265 'authenticator-vno': 5,
1266 'crealm': crealm,
1267 'cname': cname,
1268 'cusec': cusec,
1269 'ctime': ctime,
1271 if cksum is not None:
1272 Authenticator_obj['cksum'] = cksum
1273 if subkey is not None:
1274 Authenticator_obj['subkey'] = subkey
1275 if seq_number is not None:
1276 Authenticator_obj['seq-number'] = seq_number
1277 if authorization_data is not None:
1278 Authenticator_obj['authorization-data'] = authorization_data
1279 return Authenticator_obj
1281 def TGS_REQ_create(self,
1282 padata, # optional
1283 cusec,
1284 ctime,
1285 ticket,
1286 kdc_options, # required
1287 cname, # optional
1288 realm, # required
1289 sname, # optional
1290 from_time, # optional
1291 till_time, # required
1292 renew_time, # optional
1293 nonce, # required
1294 etypes, # required
1295 addresses, # optional
1296 EncAuthorizationData,
1297 EncAuthorizationData_key,
1298 additional_tickets,
1299 ticket_session_key,
1300 authenticator_subkey=None,
1301 body_checksum_type=None,
1302 native_decoded_only=True,
1303 asn1_print=None,
1304 hexdump=None):
1305 # KDC-REQ ::= SEQUENCE {
1306 # -- NOTE: first tag is [1], not [0]
1307 # pvno [1] INTEGER (5) ,
1308 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1309 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1310 # -- NOTE: not empty --,
1311 # req-body [4] KDC-REQ-BODY
1314 # KDC-REQ-BODY ::= SEQUENCE {
1315 # kdc-options [0] KDCOptions,
1316 # cname [1] PrincipalName OPTIONAL
1317 # -- Used only in AS-REQ --,
1318 # realm [2] Realm
1319 # -- Server's realm
1320 # -- Also client's in AS-REQ --,
1321 # sname [3] PrincipalName OPTIONAL,
1322 # from [4] KerberosTime OPTIONAL,
1323 # till [5] KerberosTime,
1324 # rtime [6] KerberosTime OPTIONAL,
1325 # nonce [7] UInt32,
1326 # etype [8] SEQUENCE OF Int32
1327 # -- EncryptionType
1328 # -- in preference order --,
1329 # addresses [9] HostAddresses OPTIONAL,
1330 # enc-authorization-data [10] EncryptedData OPTIONAL
1331 # -- AuthorizationData --,
1332 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1333 # -- NOTE: not empty
1336 if authenticator_subkey is not None:
1337 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1338 else:
1339 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1341 req_body = self.KDC_REQ_BODY_create(
1342 kdc_options=kdc_options,
1343 cname=None,
1344 realm=realm,
1345 sname=sname,
1346 from_time=from_time,
1347 till_time=till_time,
1348 renew_time=renew_time,
1349 nonce=nonce,
1350 etypes=etypes,
1351 addresses=addresses,
1352 additional_tickets=additional_tickets,
1353 EncAuthorizationData=EncAuthorizationData,
1354 EncAuthorizationData_key=EncAuthorizationData_key,
1355 EncAuthorizationData_usage=EncAuthorizationData_usage)
1356 req_body_blob = self.der_encode(req_body,
1357 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1358 asn1_print=asn1_print, hexdump=hexdump)
1360 req_body_checksum = self.Checksum_create(ticket_session_key,
1361 KU_TGS_REQ_AUTH_CKSUM,
1362 req_body_blob,
1363 ctype=body_checksum_type)
1365 subkey_obj = None
1366 if authenticator_subkey is not None:
1367 subkey_obj = authenticator_subkey.export_obj()
1368 seq_number = random.randint(0, 0xfffffffe)
1369 authenticator = self.Authenticator_create(
1370 crealm=realm,
1371 cname=cname,
1372 cksum=req_body_checksum,
1373 cusec=cusec,
1374 ctime=ctime,
1375 subkey=subkey_obj,
1376 seq_number=seq_number,
1377 authorization_data=None)
1378 authenticator = self.der_encode(
1379 authenticator,
1380 asn1Spec=krb5_asn1.Authenticator(),
1381 asn1_print=asn1_print,
1382 hexdump=hexdump)
1384 authenticator = self.EncryptedData_create(
1385 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1387 ap_options = krb5_asn1.APOptions('0')
1388 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1389 ticket=ticket,
1390 authenticator=authenticator)
1391 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1392 asn1_print=asn1_print, hexdump=hexdump)
1393 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1394 if padata is not None:
1395 padata.append(pa_tgs_req)
1396 else:
1397 padata = [pa_tgs_req]
1399 obj, decoded = self.KDC_REQ_create(
1400 msg_type=KRB_TGS_REQ,
1401 padata=padata,
1402 req_body=req_body,
1403 asn1Spec=krb5_asn1.TGS_REQ(),
1404 asn1_print=asn1_print,
1405 hexdump=hexdump)
1406 if native_decoded_only:
1407 return decoded
1408 return decoded, obj
1410 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1411 # PA-S4U2Self ::= SEQUENCE {
1412 # name [0] PrincipalName,
1413 # realm [1] Realm,
1414 # cksum [2] Checksum,
1415 # auth [3] GeneralString
1417 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1418 for n in name['name-string']:
1419 cksum_data += n.encode()
1420 cksum_data += realm.encode()
1421 cksum_data += "Kerberos".encode()
1422 cksum = self.Checksum_create(tgt_session_key,
1423 KU_NON_KERB_CKSUM_SALT,
1424 cksum_data,
1425 ctype)
1427 PA_S4U2Self_obj = {
1428 'name': name,
1429 'realm': realm,
1430 'cksum': cksum,
1431 'auth': "Kerberos",
1433 pa_s4u2self = self.der_encode(
1434 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1435 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1437 def _generic_kdc_exchange(self,
1438 kdc_exchange_dict, # required
1439 cname=None, # optional
1440 realm=None, # required
1441 sname=None, # optional
1442 from_time=None, # optional
1443 till_time=None, # required
1444 renew_time=None, # optional
1445 nonce=None, # required
1446 etypes=None, # required
1447 addresses=None, # optional
1448 additional_tickets=None, # optional
1449 EncAuthorizationData=None, # optional
1450 EncAuthorizationData_key=None, # optional
1451 EncAuthorizationData_usage=None): # optional
1453 check_error_fn = kdc_exchange_dict['check_error_fn']
1454 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1455 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1456 callback_dict = kdc_exchange_dict['callback_dict']
1457 req_msg_type = kdc_exchange_dict['req_msg_type']
1458 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1459 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1461 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1462 kdc_options = kdc_exchange_dict['kdc_options']
1464 if till_time is None:
1465 till_time = self.get_KerberosTime(offset=36000)
1466 if nonce is None:
1467 nonce = self.get_Nonce()
1469 req_body = self.KDC_REQ_BODY_create(
1470 kdc_options=kdc_options,
1471 cname=cname,
1472 realm=realm,
1473 sname=sname,
1474 from_time=from_time,
1475 till_time=till_time,
1476 renew_time=renew_time,
1477 nonce=nonce,
1478 etypes=etypes,
1479 addresses=addresses,
1480 additional_tickets=additional_tickets,
1481 EncAuthorizationData=EncAuthorizationData,
1482 EncAuthorizationData_key=EncAuthorizationData_key,
1483 EncAuthorizationData_usage=EncAuthorizationData_usage)
1484 if generate_padata_fn is not None:
1485 # This can alter req_body...
1486 padata, req_body = generate_padata_fn(kdc_exchange_dict,
1487 callback_dict,
1488 req_body)
1489 else:
1490 padata = None
1492 kdc_exchange_dict['req_padata'] = padata
1493 kdc_exchange_dict['req_body'] = req_body
1495 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1496 padata=padata,
1497 req_body=req_body,
1498 asn1Spec=req_asn1Spec())
1500 rep = self.send_recv_transaction(req_decoded)
1501 self.assertIsNotNone(rep)
1503 msg_type = self.getElementValue(rep, 'msg-type')
1504 self.assertIsNotNone(msg_type)
1506 expected_msg_type = None
1507 if check_error_fn is not None:
1508 expected_msg_type = KRB_ERROR
1509 self.assertIsNone(check_rep_fn)
1510 self.assertNotEqual(0, expected_error_mode)
1511 if check_rep_fn is not None:
1512 expected_msg_type = rep_msg_type
1513 self.assertIsNone(check_error_fn)
1514 self.assertEqual(0, expected_error_mode)
1515 self.assertIsNotNone(expected_msg_type)
1516 self.assertEqual(msg_type, expected_msg_type)
1518 if msg_type == KRB_ERROR:
1519 return check_error_fn(kdc_exchange_dict,
1520 callback_dict,
1521 rep)
1523 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1525 def as_exchange_dict(self,
1526 expected_crealm=None,
1527 expected_cname=None,
1528 expected_srealm=None,
1529 expected_sname=None,
1530 ticket_decryption_key=None,
1531 generate_padata_fn=None,
1532 check_error_fn=None,
1533 check_rep_fn=None,
1534 check_padata_fn=None,
1535 check_kdc_private_fn=None,
1536 callback_dict=None,
1537 expected_error_mode=0,
1538 client_as_etypes=None,
1539 expected_salt=None,
1540 kdc_options=''):
1541 kdc_exchange_dict = {
1542 'req_msg_type': KRB_AS_REQ,
1543 'req_asn1Spec': krb5_asn1.AS_REQ,
1544 'rep_msg_type': KRB_AS_REP,
1545 'rep_asn1Spec': krb5_asn1.AS_REP,
1546 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
1547 'expected_crealm': expected_crealm,
1548 'expected_cname': expected_cname,
1549 'expected_srealm': expected_srealm,
1550 'expected_sname': expected_sname,
1551 'ticket_decryption_key': ticket_decryption_key,
1552 'generate_padata_fn': generate_padata_fn,
1553 'check_error_fn': check_error_fn,
1554 'check_rep_fn': check_rep_fn,
1555 'check_padata_fn': check_padata_fn,
1556 'check_kdc_private_fn': check_kdc_private_fn,
1557 'callback_dict': callback_dict,
1558 'expected_error_mode': expected_error_mode,
1559 'client_as_etypes': client_as_etypes,
1560 'expected_salt': expected_salt,
1561 'kdc_options': kdc_options,
1563 if callback_dict is None:
1564 callback_dict = {}
1566 return kdc_exchange_dict
1568 def tgs_exchange_dict(self,
1569 expected_crealm=None,
1570 expected_cname=None,
1571 expected_srealm=None,
1572 expected_sname=None,
1573 ticket_decryption_key=None,
1574 generate_padata_fn=None,
1575 check_error_fn=None,
1576 check_rep_fn=None,
1577 check_padata_fn=None,
1578 check_kdc_private_fn=None,
1579 callback_dict=None,
1580 tgt=None,
1581 authenticator_subkey=None,
1582 body_checksum_type=None,
1583 kdc_options=''):
1584 kdc_exchange_dict = {
1585 'req_msg_type': KRB_TGS_REQ,
1586 'req_asn1Spec': krb5_asn1.TGS_REQ,
1587 'rep_msg_type': KRB_TGS_REP,
1588 'rep_asn1Spec': krb5_asn1.TGS_REP,
1589 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
1590 'expected_crealm': expected_crealm,
1591 'expected_cname': expected_cname,
1592 'expected_srealm': expected_srealm,
1593 'expected_sname': expected_sname,
1594 'ticket_decryption_key': ticket_decryption_key,
1595 'generate_padata_fn': generate_padata_fn,
1596 'check_error_fn': check_error_fn,
1597 'check_rep_fn': check_rep_fn,
1598 'check_padata_fn': check_padata_fn,
1599 'check_kdc_private_fn': check_kdc_private_fn,
1600 'callback_dict': callback_dict,
1601 'tgt': tgt,
1602 'body_checksum_type': body_checksum_type,
1603 'authenticator_subkey': authenticator_subkey,
1604 'kdc_options': kdc_options
1606 if callback_dict is None:
1607 callback_dict = {}
1609 return kdc_exchange_dict
1611 def generic_check_kdc_rep(self,
1612 kdc_exchange_dict,
1613 callback_dict,
1614 rep):
1616 expected_crealm = kdc_exchange_dict['expected_crealm']
1617 expected_cname = kdc_exchange_dict['expected_cname']
1618 expected_srealm = kdc_exchange_dict['expected_srealm']
1619 expected_sname = kdc_exchange_dict['expected_sname']
1620 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
1621 check_padata_fn = kdc_exchange_dict['check_padata_fn']
1622 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
1623 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
1624 msg_type = kdc_exchange_dict['rep_msg_type']
1626 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
1627 padata = self.getElementValue(rep, 'padata')
1628 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
1629 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
1630 self.assertElementPresent(rep, 'ticket')
1631 ticket = self.getElementValue(rep, 'ticket')
1632 ticket_encpart = None
1633 ticket_cipher = None
1634 self.assertIsNotNone(ticket)
1635 if ticket is not None: # Never None, but gives indentation
1636 self.assertElementEqual(ticket, 'tkt-vno', 5)
1637 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
1638 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
1639 self.assertElementPresent(ticket, 'enc-part')
1640 ticket_encpart = self.getElementValue(ticket, 'enc-part')
1641 self.assertIsNotNone(ticket_encpart)
1642 if ticket_encpart is not None: # Never None, but gives indentation
1643 self.assertElementPresent(ticket_encpart, 'etype')
1644 # 'unspecified' means present, with any value != 0
1645 self.assertElementKVNO(ticket_encpart, 'kvno',
1646 self.unspecified_kvno)
1647 self.assertElementPresent(ticket_encpart, 'cipher')
1648 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
1649 self.assertElementPresent(rep, 'enc-part')
1650 encpart = self.getElementValue(rep, 'enc-part')
1651 encpart_cipher = None
1652 self.assertIsNotNone(encpart)
1653 if encpart is not None: # Never None, but gives indentation
1654 self.assertElementPresent(encpart, 'etype')
1655 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
1656 self.assertElementPresent(encpart, 'cipher')
1657 encpart_cipher = self.getElementValue(encpart, 'cipher')
1659 encpart_decryption_key = None
1660 self.assertIsNotNone(check_padata_fn)
1661 if check_padata_fn is not None:
1662 # See if we can get the decryption key from the preauth phase
1663 encpart_decryption_key, encpart_decryption_usage = (
1664 check_padata_fn(kdc_exchange_dict, callback_dict,
1665 rep, padata))
1667 ticket_private = None
1668 self.assertIsNotNone(ticket_decryption_key)
1669 if ticket_decryption_key is not None:
1670 self.assertElementEqual(ticket_encpart, 'etype',
1671 ticket_decryption_key.etype)
1672 self.assertElementKVNO(ticket_encpart, 'kvno',
1673 ticket_decryption_key.kvno)
1674 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
1675 ticket_cipher)
1676 ticket_private = self.der_decode(
1677 ticket_decpart,
1678 asn1Spec=krb5_asn1.EncTicketPart())
1680 encpart_private = None
1681 self.assertIsNotNone(encpart_decryption_key)
1682 if encpart_decryption_key is not None:
1683 self.assertElementEqual(encpart, 'etype',
1684 encpart_decryption_key.etype)
1685 self.assertElementKVNO(encpart, 'kvno',
1686 encpart_decryption_key.kvno)
1687 rep_decpart = encpart_decryption_key.decrypt(
1688 encpart_decryption_usage,
1689 encpart_cipher)
1690 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
1691 # application tag 26
1692 try:
1693 encpart_private = self.der_decode(
1694 rep_decpart,
1695 asn1Spec=rep_encpart_asn1Spec())
1696 except Exception:
1697 encpart_private = self.der_decode(
1698 rep_decpart,
1699 asn1Spec=krb5_asn1.EncTGSRepPart())
1701 self.assertIsNotNone(check_kdc_private_fn)
1702 if check_kdc_private_fn is not None:
1703 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
1704 rep, ticket_private, encpart_private)
1706 return rep
1708 def generic_check_kdc_private(self,
1709 kdc_exchange_dict,
1710 callback_dict,
1711 rep,
1712 ticket_private,
1713 encpart_private):
1715 expected_crealm = kdc_exchange_dict['expected_crealm']
1716 expected_cname = kdc_exchange_dict['expected_cname']
1717 expected_srealm = kdc_exchange_dict['expected_srealm']
1718 expected_sname = kdc_exchange_dict['expected_sname']
1719 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
1721 ticket = self.getElementValue(rep, 'ticket')
1723 ticket_session_key = None
1724 if ticket_private is not None:
1725 self.assertElementPresent(ticket_private, 'flags')
1726 self.assertElementPresent(ticket_private, 'key')
1727 ticket_key = self.getElementValue(ticket_private, 'key')
1728 self.assertIsNotNone(ticket_key)
1729 if ticket_key is not None: # Never None, but gives indentation
1730 self.assertElementPresent(ticket_key, 'keytype')
1731 self.assertElementPresent(ticket_key, 'keyvalue')
1732 ticket_session_key = self.EncryptionKey_import(ticket_key)
1733 self.assertElementEqualUTF8(ticket_private, 'crealm',
1734 expected_crealm)
1735 self.assertElementEqualPrincipal(ticket_private, 'cname',
1736 expected_cname)
1737 self.assertElementPresent(ticket_private, 'transited')
1738 self.assertElementPresent(ticket_private, 'authtime')
1739 if self.strict_checking:
1740 self.assertElementPresent(ticket_private, 'starttime')
1741 self.assertElementPresent(ticket_private, 'endtime')
1742 # TODO self.assertElementPresent(ticket_private, 'renew-till')
1743 # TODO self.assertElementMissing(ticket_private, 'caddr')
1744 self.assertElementPresent(ticket_private, 'authorization-data')
1746 encpart_session_key = None
1747 if encpart_private is not None:
1748 self.assertElementPresent(encpart_private, 'key')
1749 encpart_key = self.getElementValue(encpart_private, 'key')
1750 self.assertIsNotNone(encpart_key)
1751 if encpart_key is not None: # Never None, but gives indentation
1752 self.assertElementPresent(encpart_key, 'keytype')
1753 self.assertElementPresent(encpart_key, 'keyvalue')
1754 encpart_session_key = self.EncryptionKey_import(encpart_key)
1755 self.assertElementPresent(encpart_private, 'last-req')
1756 self.assertElementPresent(encpart_private, 'nonce')
1757 # TODO self.assertElementPresent(encpart_private,
1758 # 'key-expiration')
1759 self.assertElementPresent(encpart_private, 'flags')
1760 self.assertElementPresent(encpart_private, 'authtime')
1761 if self.strict_checking:
1762 self.assertElementPresent(encpart_private, 'starttime')
1763 self.assertElementPresent(encpart_private, 'endtime')
1764 # TODO self.assertElementPresent(encpart_private, 'renew-till')
1765 self.assertElementEqualUTF8(encpart_private, 'srealm',
1766 expected_srealm)
1767 self.assertElementEqualPrincipal(encpart_private, 'sname',
1768 expected_sname)
1769 # TODO self.assertElementMissing(encpart_private, 'caddr')
1771 if ticket_session_key is not None and encpart_session_key is not None:
1772 self.assertEqual(ticket_session_key.etype,
1773 encpart_session_key.etype)
1774 self.assertEqual(ticket_session_key.key.contents,
1775 encpart_session_key.key.contents)
1776 if encpart_session_key is not None:
1777 session_key = encpart_session_key
1778 else:
1779 session_key = ticket_session_key
1780 ticket_creds = KerberosTicketCreds(
1781 ticket,
1782 session_key,
1783 crealm=expected_crealm,
1784 cname=expected_cname,
1785 srealm=expected_srealm,
1786 sname=expected_sname,
1787 decryption_key=ticket_decryption_key,
1788 ticket_private=ticket_private,
1789 encpart_private=encpart_private)
1791 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
1793 def generic_check_as_error(self,
1794 kdc_exchange_dict,
1795 callback_dict,
1796 rep):
1798 expected_crealm = kdc_exchange_dict['expected_crealm']
1799 expected_cname = kdc_exchange_dict['expected_cname']
1800 expected_srealm = kdc_exchange_dict['expected_srealm']
1801 expected_sname = kdc_exchange_dict['expected_sname']
1802 expected_salt = kdc_exchange_dict['expected_salt']
1803 client_as_etypes = kdc_exchange_dict['client_as_etypes']
1804 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1805 req_body = kdc_exchange_dict['req_body']
1806 proposed_etypes = req_body['etype']
1808 kdc_exchange_dict['preauth_etype_info2'] = None
1810 expect_etype_info2 = ()
1811 expect_etype_info = False
1812 unexpect_etype_info = True
1813 expected_aes_type = 0
1814 expected_rc4_type = 0
1815 if kcrypto.Enctype.RC4 in proposed_etypes:
1816 expect_etype_info = True
1817 for etype in proposed_etypes:
1818 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
1819 expect_etype_info = False
1820 if etype not in client_as_etypes:
1821 continue
1822 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
1823 if etype > expected_aes_type:
1824 expected_aes_type = etype
1825 if etype in (kcrypto.Enctype.RC4,):
1826 unexpect_etype_info = False
1827 if etype > expected_rc4_type:
1828 expected_rc4_type = etype
1830 if expected_aes_type != 0:
1831 expect_etype_info2 += (expected_aes_type,)
1832 if expected_rc4_type != 0:
1833 expect_etype_info2 += (expected_rc4_type,)
1835 expected_patypes = ()
1836 if expect_etype_info:
1837 self.assertGreater(len(expect_etype_info2), 0)
1838 expected_patypes += (PADATA_ETYPE_INFO,)
1839 if len(expect_etype_info2) != 0:
1840 expected_patypes += (PADATA_ETYPE_INFO2,)
1842 expected_patypes += (PADATA_ENC_TIMESTAMP,)
1843 expected_patypes += (PADATA_PK_AS_REQ,)
1844 expected_patypes += (PADATA_PK_AS_REP_19,)
1846 self.assertElementEqual(rep, 'pvno', 5)
1847 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
1848 self.assertElementEqual(rep, 'error-code', expected_error_mode)
1849 self.assertElementMissing(rep, 'ctime')
1850 self.assertElementMissing(rep, 'cusec')
1851 self.assertElementPresent(rep, 'stime')
1852 self.assertElementPresent(rep, 'susec')
1853 # error-code checked above
1854 if self.strict_checking:
1855 self.assertElementMissing(rep, 'crealm')
1856 self.assertElementMissing(rep, 'cname')
1857 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
1858 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
1859 if self.strict_checking:
1860 self.assertElementMissing(rep, 'e-text')
1861 if expected_error_mode == KDC_ERR_GENERIC:
1862 self.assertElementMissing(rep, 'e-data')
1863 return
1864 edata = self.getElementValue(rep, 'e-data')
1865 if self.strict_checking:
1866 self.assertIsNotNone(edata)
1867 if edata is not None:
1868 rep_padata = self.der_decode(edata,
1869 asn1Spec=krb5_asn1.METHOD_DATA())
1870 self.assertGreater(len(rep_padata), 0)
1871 else:
1872 rep_padata = []
1874 if self.strict_checking:
1875 for i, patype in enumerate(expected_patypes):
1876 self.assertElementEqual(rep_padata[i], 'padata-type', patype)
1877 self.assertEqual(len(rep_padata), len(expected_patypes))
1879 etype_info2 = None
1880 etype_info = None
1881 enc_timestamp = None
1882 pk_as_req = None
1883 pk_as_rep19 = None
1884 for pa in rep_padata:
1885 patype = self.getElementValue(pa, 'padata-type')
1886 pavalue = self.getElementValue(pa, 'padata-value')
1887 if patype == PADATA_ETYPE_INFO2:
1888 self.assertIsNone(etype_info2)
1889 etype_info2 = self.der_decode(pavalue,
1890 asn1Spec=krb5_asn1.ETYPE_INFO2())
1891 continue
1892 if patype == PADATA_ETYPE_INFO:
1893 self.assertIsNone(etype_info)
1894 etype_info = self.der_decode(pavalue,
1895 asn1Spec=krb5_asn1.ETYPE_INFO())
1896 continue
1897 if patype == PADATA_ENC_TIMESTAMP:
1898 self.assertIsNone(enc_timestamp)
1899 enc_timestamp = pavalue
1900 self.assertEqual(len(enc_timestamp), 0)
1901 continue
1902 if patype == PADATA_PK_AS_REQ:
1903 self.assertIsNone(pk_as_req)
1904 pk_as_req = pavalue
1905 self.assertEqual(len(pk_as_req), 0)
1906 continue
1907 if patype == PADATA_PK_AS_REP_19:
1908 self.assertIsNone(pk_as_rep19)
1909 pk_as_rep19 = pavalue
1910 self.assertEqual(len(pk_as_rep19), 0)
1911 continue
1913 if all(etype not in client_as_etypes or etype not in proposed_etypes
1914 for etype in (kcrypto.Enctype.AES256,
1915 kcrypto.Enctype.AES128,
1916 kcrypto.Enctype.RC4)):
1917 self.assertIsNone(etype_info2)
1918 self.assertIsNone(etype_info)
1919 if self.strict_checking:
1920 self.assertIsNotNone(enc_timestamp)
1921 self.assertIsNotNone(pk_as_req)
1922 self.assertIsNotNone(pk_as_rep19)
1923 return
1925 self.assertIsNotNone(etype_info2)
1926 if expect_etype_info:
1927 self.assertIsNotNone(etype_info)
1928 else:
1929 if self.strict_checking:
1930 self.assertIsNone(etype_info)
1931 if unexpect_etype_info:
1932 self.assertIsNone(etype_info)
1934 self.assertGreaterEqual(len(etype_info2), 1)
1935 self.assertLessEqual(len(etype_info2), len(expect_etype_info2))
1936 if self.strict_checking:
1937 self.assertEqual(len(etype_info2), len(expect_etype_info2))
1938 for i in range(0, len(etype_info2)):
1939 e = self.getElementValue(etype_info2[i], 'etype')
1940 self.assertEqual(e, expect_etype_info2[i])
1941 salt = self.getElementValue(etype_info2[i], 'salt')
1942 if e == kcrypto.Enctype.RC4:
1943 self.assertIsNone(salt)
1944 else:
1945 self.assertIsNotNone(salt)
1946 if expected_salt is not None:
1947 self.assertEqual(salt, expected_salt)
1948 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
1949 if self.strict_checking:
1950 self.assertIsNone(s2kparams)
1951 if etype_info is not None:
1952 self.assertEqual(len(etype_info), 1)
1953 e = self.getElementValue(etype_info[0], 'etype')
1954 self.assertEqual(e, kcrypto.Enctype.RC4)
1955 self.assertEqual(e, expect_etype_info2[0])
1956 salt = self.getElementValue(etype_info[0], 'salt')
1957 if self.strict_checking:
1958 self.assertIsNotNone(salt)
1959 self.assertEqual(len(salt), 0)
1961 self.assertIsNotNone(enc_timestamp)
1962 self.assertIsNotNone(pk_as_req)
1963 self.assertIsNotNone(pk_as_rep19)
1965 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
1966 return
1968 def generate_simple_tgs_padata(self,
1969 kdc_exchange_dict,
1970 callback_dict,
1971 req_body):
1972 tgt = kdc_exchange_dict['tgt']
1973 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
1974 body_checksum_type = kdc_exchange_dict['body_checksum_type']
1976 req_body_blob = self.der_encode(req_body,
1977 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1979 req_body_checksum = self.Checksum_create(tgt.session_key,
1980 KU_TGS_REQ_AUTH_CKSUM,
1981 req_body_blob,
1982 ctype=body_checksum_type)
1984 subkey_obj = None
1985 if authenticator_subkey is not None:
1986 subkey_obj = authenticator_subkey.export_obj()
1987 seq_number = random.randint(0, 0xfffffffe)
1988 (ctime, cusec) = self.get_KerberosTimeWithUsec()
1989 authenticator_obj = self.Authenticator_create(
1990 crealm=tgt.crealm,
1991 cname=tgt.cname,
1992 cksum=req_body_checksum,
1993 cusec=cusec,
1994 ctime=ctime,
1995 subkey=subkey_obj,
1996 seq_number=seq_number,
1997 authorization_data=None)
1998 authenticator_blob = self.der_encode(
1999 authenticator_obj,
2000 asn1Spec=krb5_asn1.Authenticator())
2002 authenticator = self.EncryptedData_create(tgt.session_key,
2003 KU_TGS_REQ_AUTH,
2004 authenticator_blob)
2006 ap_options = krb5_asn1.APOptions('0')
2007 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
2008 ticket=tgt.ticket,
2009 authenticator=authenticator)
2010 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
2011 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
2012 padata = [pa_tgs_req]
2014 return padata, req_body
2016 def check_simple_tgs_padata(self,
2017 kdc_exchange_dict,
2018 callback_dict,
2019 rep,
2020 padata):
2021 tgt = kdc_exchange_dict['tgt']
2022 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
2023 if authenticator_subkey is not None:
2024 subkey = authenticator_subkey
2025 subkey_usage = KU_TGS_REP_ENC_PART_SUB_KEY
2026 else:
2027 subkey = tgt.session_key
2028 subkey_usage = KU_TGS_REP_ENC_PART_SESSION
2030 return subkey, subkey_usage
2032 def _test_as_exchange(self,
2033 cname,
2034 realm,
2035 sname,
2036 till,
2037 client_as_etypes,
2038 expected_error_mode,
2039 expected_crealm,
2040 expected_cname,
2041 expected_srealm,
2042 expected_sname,
2043 expected_salt,
2044 etypes,
2045 padata,
2046 kdc_options,
2047 preauth_key=None,
2048 ticket_decryption_key=None):
2050 def _generate_padata_copy(_kdc_exchange_dict,
2051 _callback_dict,
2052 req_body):
2053 return padata, req_body
2055 def _check_padata_preauth_key(_kdc_exchange_dict,
2056 _callback_dict,
2057 rep,
2058 padata):
2059 as_rep_usage = KU_AS_REP_ENC_PART
2060 return preauth_key, as_rep_usage
2062 if expected_error_mode == 0:
2063 check_error_fn = None
2064 check_rep_fn = self.generic_check_kdc_rep
2065 else:
2066 check_error_fn = self.generic_check_as_error
2067 check_rep_fn = None
2069 kdc_exchange_dict = self.as_exchange_dict(
2070 expected_crealm=expected_crealm,
2071 expected_cname=expected_cname,
2072 expected_srealm=expected_srealm,
2073 expected_sname=expected_sname,
2074 ticket_decryption_key=ticket_decryption_key,
2075 generate_padata_fn=_generate_padata_copy,
2076 check_error_fn=check_error_fn,
2077 check_rep_fn=check_rep_fn,
2078 check_padata_fn=_check_padata_preauth_key,
2079 check_kdc_private_fn=self.generic_check_kdc_private,
2080 expected_error_mode=expected_error_mode,
2081 client_as_etypes=client_as_etypes,
2082 expected_salt=expected_salt,
2083 kdc_options=str(kdc_options))
2085 rep = self._generic_kdc_exchange(kdc_exchange_dict,
2086 cname=cname,
2087 realm=realm,
2088 sname=sname,
2089 till_time=till,
2090 etypes=etypes)
2092 return rep, kdc_exchange_dict