1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 from pyasn1
.codec
.der
.decoder
import decode
as pyasn1_der_decode
30 from pyasn1
.codec
.der
.encoder
import encode
as pyasn1_der_encode
31 from pyasn1
.codec
.native
.decoder
import decode
as pyasn1_native_decode
32 from pyasn1
.codec
.native
.encoder
import encode
as pyasn1_native_encode
34 from pyasn1
.codec
.ber
.encoder
import BitStringEncoder
36 from samba
.credentials
import Credentials
37 from samba
.dcerpc
import security
38 from samba
.gensec
import FEATURE_SEAL
41 from samba
.tests
import TestCaseInTempDir
43 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
44 from samba
.tests
.krb5
.rfc4120_constants
import (
45 FX_FAST_ARMOR_AP_REQUEST
,
59 KU_NON_KERB_CKSUM_SALT
,
60 KU_TGS_REP_ENC_PART_SESSION
,
61 KU_TGS_REP_ENC_PART_SUB_KEY
,
63 KU_TGS_REQ_AUTH_CKSUM
,
64 KU_TGS_REQ_AUTH_DAT_SESSION
,
65 KU_TGS_REQ_AUTH_DAT_SUBKEY
,
78 PADATA_SUPPORTED_ETYPES
80 import samba
.tests
.krb5
.kcrypto
as kcrypto
83 def BitStringEncoder_encodeValue32(
84 self
, value
, asn1Spec
, encodeFun
, **options
):
86 # BitStrings like KDCOptions or TicketFlags should at least
87 # be 32-Bit on the wire
89 if asn1Spec
is not None:
90 # TODO: try to avoid ASN.1 schema instantiation
91 value
= asn1Spec
.clone(value
)
93 valueLength
= len(value
)
95 alignedValue
= value
<< (8 - valueLength
% 8)
99 substrate
= alignedValue
.asOctets()
100 length
= len(substrate
)
101 # We need at least 32-Bit / 4-Bytes
106 ret
= b
'\x00' + substrate
+ (b
'\x00' * padding
)
107 return ret
, False, True
110 BitStringEncoder
.encodeValue
= BitStringEncoder_encodeValue32
113 def BitString_NamedValues_prettyPrint(self
, scope
=0):
114 ret
= "%s" % self
.asBinary()
117 for byte
in self
.asNumbers():
118 for bit
in [7, 6, 5, 4, 3, 2, 1, 0]:
125 if len(bits
) < highest_bit
:
126 for bitPosition
in range(len(bits
), highest_bit
):
129 delim
= ": (\n%s " % indent
130 for bitPosition
in range(highest_bit
):
131 if bitPosition
in self
.prettyPrintNamedValues
:
132 name
= self
.prettyPrintNamedValues
[bitPosition
]
133 elif bits
[bitPosition
] != 0:
134 name
= "unknown-bit-%u" % bitPosition
137 ret
+= "%s%s:%u" % (delim
, name
, bits
[bitPosition
])
138 delim
= ",\n%s " % indent
139 ret
+= "\n%s)" % indent
# Attach the named-bit tables and the verbose pretty-printer to every
# BitString-based flag type.  Each type receives the same three attributes,
# taken from its matching "...Values" class.
for _flags_type, _values_type in (
        (krb5_asn1.TicketFlags, krb5_asn1.TicketFlagsValues),
        (krb5_asn1.KDCOptions, krb5_asn1.KDCOptionsValues),
        (krb5_asn1.APOptions, krb5_asn1.APOptionsValues),
        (krb5_asn1.PACOptionFlags, krb5_asn1.PACOptionFlagsValues)):
    _flags_type.prettyPrintNamedValues = _values_type.namedValues
    _flags_type.namedValues = _values_type.namedValues
    _flags_type.prettyPrint = BitString_NamedValues_prettyPrint
# Do not leave the loop variables behind as module attributes.
del _flags_type, _values_type
169 def Integer_NamedValues_prettyPrint(self
, scope
=0):
171 if intval
in self
.prettyPrintNamedValues
:
172 name
= self
.prettyPrintNamedValues
[intval
]
174 name
= "<__unknown__>"
175 ret
= "%d (0x%x) %s" % (intval
, intval
, name
)
# Attach the value-to-name tables and the verbose pretty-printer to every
# Integer-based enumeration type, taking the table from the matching
# "...Values" class.
for _int_type, _values_type in (
        (krb5_asn1.NameType, krb5_asn1.NameTypeValues),
        (krb5_asn1.AuthDataType, krb5_asn1.AuthDataTypeValues),
        (krb5_asn1.PADataType, krb5_asn1.PADataTypeValues),
        (krb5_asn1.EncryptionType, krb5_asn1.EncryptionTypeValues),
        (krb5_asn1.ChecksumType, krb5_asn1.ChecksumTypeValues),
        (krb5_asn1.KerbErrorDataType, krb5_asn1.KerbErrorDataTypeValues)):
    _int_type.prettyPrintNamedValues = _values_type.namedValues
    _int_type.prettyPrint = Integer_NamedValues_prettyPrint
# Do not leave the loop variables behind as module attributes.
del _int_type, _values_type
205 class Krb5EncryptionKey
:
206 def __init__(self
, key
, kvno
):
208 kcrypto
.Enctype
.AES256
: kcrypto
.Cksumtype
.SHA1_AES256
,
209 kcrypto
.Enctype
.AES128
: kcrypto
.Cksumtype
.SHA1_AES128
,
210 kcrypto
.Enctype
.RC4
: kcrypto
.Cksumtype
.HMAC_MD5
,
213 self
.etype
= key
.enctype
214 self
.ctype
= EncTypeChecksum
[self
.etype
]
217 def encrypt(self
, usage
, plaintext
):
218 ciphertext
= kcrypto
.encrypt(self
.key
, usage
, plaintext
)
221 def decrypt(self
, usage
, ciphertext
):
222 plaintext
= kcrypto
.decrypt(self
.key
, usage
, ciphertext
)
225 def make_checksum(self
, usage
, plaintext
, ctype
=None):
228 cksum
= kcrypto
.make_checksum(ctype
, self
.key
, usage
, plaintext
)
def export_obj(self):
    """Return this key as an EncryptionKey-shaped dictionary.

    'keytype' carries the stored etype and 'keyvalue' the contents of
    the wrapped key object.
    """
    return dict(keytype=self.etype, keyvalue=self.key.contents)
239 class KerberosCredentials(Credentials
):
241 super(KerberosCredentials
, self
).__init
__()
243 all_enc_types |
= security
.KERB_ENCTYPE_RC4_HMAC_MD5
244 all_enc_types |
= security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
245 all_enc_types |
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
247 self
.as_supported_enctypes
= all_enc_types
248 self
.tgs_supported_enctypes
= all_enc_types
249 self
.ap_supported_enctypes
= all_enc_types
252 self
.forced_keys
= {}
254 self
.forced_salt
= None
def set_as_supported_enctypes(self, value):
    """Store the AS supported-enctypes value, converted with int()."""
    as_mask = int(value)
    self.as_supported_enctypes = as_mask
def set_tgs_supported_enctypes(self, value):
    """Store the TGS supported-enctypes value, converted with int()."""
    tgs_mask = int(value)
    self.tgs_supported_enctypes = tgs_mask
def set_ap_supported_enctypes(self, value):
    """Store the AP supported-enctypes value, converted with int()."""
    ap_mask = int(value)
    self.ap_supported_enctypes = ap_mask
265 def _get_krb5_etypes(self
, supported_enctypes
):
268 if supported_enctypes
& security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
:
269 etypes
+= (kcrypto
.Enctype
.AES256
,)
270 if supported_enctypes
& security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
:
271 etypes
+= (kcrypto
.Enctype
.AES128
,)
272 if supported_enctypes
& security
.KERB_ENCTYPE_RC4_HMAC_MD5
:
273 etypes
+= (kcrypto
.Enctype
.RC4
,)
def get_as_krb5_etypes(self):
    """Return _get_krb5_etypes() applied to the AS supported enctypes."""
    as_mask = self.as_supported_enctypes
    return self._get_krb5_etypes(as_mask)
def get_tgs_krb5_etypes(self):
    """Return _get_krb5_etypes() applied to the TGS supported enctypes."""
    tgs_mask = self.tgs_supported_enctypes
    return self._get_krb5_etypes(tgs_mask)
def get_ap_krb5_etypes(self):
    """Return _get_krb5_etypes() applied to the AP supported enctypes."""
    ap_mask = self.ap_supported_enctypes
    return self._get_krb5_etypes(ap_mask)
286 def set_kvno(self
, kvno
):
292 def set_forced_key(self
, etype
, hexkey
):
294 contents
= binascii
.a2b_hex(hexkey
)
295 key
= kcrypto
.Key(etype
, contents
)
296 self
.forced_keys
[etype
] = Krb5EncryptionKey(key
, self
.kvno
)
298 def get_forced_key(self
, etype
):
300 return self
.forced_keys
.get(etype
, None)
def set_forced_salt(self, salt):
    """Remember a forced salt, stored as the result of bytes(salt)."""
    salt_bytes = bytes(salt)
    self.forced_salt = salt_bytes
def get_forced_salt(self):
    """Return the stored forced_salt attribute unchanged."""
    forced = self.forced_salt
    return forced
309 if self
.forced_salt
is not None:
310 return self
.forced_salt
312 if self
.get_workstation():
313 salt_string
= '%shost%s.%s' % (
314 self
.get_realm().upper(),
315 self
.get_username().lower().rsplit('$', 1)[0],
316 self
.get_realm().lower())
318 salt_string
= self
.get_realm().upper() + self
.get_username()
320 return salt_string
.encode('utf-8')
323 class KerberosTicketCreds
:
324 def __init__(self
, ticket
, session_key
,
325 crealm
=None, cname
=None,
326 srealm
=None, sname
=None,
329 encpart_private
=None):
331 self
.session_key
= session_key
336 self
.decryption_key
= decryption_key
337 self
.ticket_private
= ticket_private
338 self
.encpart_private
= encpart_private
341 class RawKerberosTest(TestCaseInTempDir
):
342 """A raw Kerberos Test case."""
345 {"value": -1111, "name": "dummy", },
346 {"value": kcrypto
.Enctype
.AES256
, "name": "aes128", },
347 {"value": kcrypto
.Enctype
.AES128
, "name": "aes256", },
348 {"value": kcrypto
.Enctype
.RC4
, "name": "rc4", },
351 setup_etype_test_permutations_done
= False
354 def setup_etype_test_permutations(cls
):
355 if cls
.setup_etype_test_permutations_done
:
360 num_idxs
= len(cls
.etypes_to_test
)
362 for num
in range(1, num_idxs
+ 1):
363 chunk
= list(itertools
.permutations(range(num_idxs
), num
))
366 permutations
.append(el
)
368 for p
in permutations
:
372 n
= cls
.etypes_to_test
[idx
]["name"]
377 etypes
+= (cls
.etypes_to_test
[idx
]["value"],)
379 r
= {"name": name
, "etypes": etypes
, }
382 cls
.etype_test_permutations
= res
383 cls
.setup_etype_test_permutations_done
= True
386 def etype_test_permutation_name_idx(cls
):
387 cls
.setup_etype_test_permutations()
390 for e
in cls
.etype_test_permutations
:
def etype_test_permutation_by_idx(self, idx):
    """Return the (name, etypes) pair of the permutation stored at idx."""
    entry = self.etype_test_permutations[idx]
    name, etypes = entry['name'], entry['etypes']
    return (name, etypes)
404 cls
.host
= samba
.tests
.env_get_var_value('SERVER')
406 # A dictionary containing credentials that have already been
412 self
.do_asn1_print
= False
413 self
.do_hexdump
= False
415 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING',
417 if strict_checking
is None:
418 strict_checking
= '1'
419 self
.strict_checking
= bool(int(strict_checking
))
423 self
.unspecified_kvno
= object()
426 self
._disconnect
("tearDown")
429 def _disconnect(self
, reason
):
435 sys
.stderr
.write("disconnect[%s]\n" % reason
)
437 def _connect_tcp(self
):
440 self
.a
= socket
.getaddrinfo(self
.host
, tcp_port
, socket
.AF_UNSPEC
,
441 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
443 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
444 self
.s
.settimeout(10)
445 self
.s
.connect(self
.a
[0][4])
454 self
.assertNotConnected()
457 sys
.stderr
.write("connected[%s]\n" % self
.host
)
459 def env_get_var(self
, varname
, prefix
,
460 fallback_default
=True,
461 allow_missing
=False):
463 if prefix
is not None:
464 allow_missing_prefix
= allow_missing
or fallback_default
465 val
= samba
.tests
.env_get_var_value(
466 '%s_%s' % (prefix
, varname
),
467 allow_missing
=allow_missing_prefix
)
469 fallback_default
= True
470 if val
is None and fallback_default
:
471 val
= samba
.tests
.env_get_var_value(varname
,
472 allow_missing
=allow_missing
)
475 def _get_krb5_creds_from_env(self
, prefix
,
476 default_username
=None,
477 allow_missing_password
=False,
478 allow_missing_keys
=True,
479 require_strongest_key
=False):
480 c
= KerberosCredentials()
483 domain
= self
.env_get_var('DOMAIN', prefix
)
484 realm
= self
.env_get_var('REALM', prefix
)
485 allow_missing_username
= default_username
is not None
486 username
= self
.env_get_var('USERNAME', prefix
,
487 fallback_default
=False,
488 allow_missing
=allow_missing_username
)
490 username
= default_username
491 password
= self
.env_get_var('PASSWORD', prefix
,
492 fallback_default
=False,
493 allow_missing
=allow_missing_password
)
496 c
.set_username(username
)
497 if password
is not None:
498 c
.set_password(password
)
499 as_supported_enctypes
= self
.env_get_var('AS_SUPPORTED_ENCTYPES',
500 prefix
, allow_missing
=True)
501 if as_supported_enctypes
is not None:
502 c
.set_as_supported_enctypes(as_supported_enctypes
)
503 tgs_supported_enctypes
= self
.env_get_var('TGS_SUPPORTED_ENCTYPES',
504 prefix
, allow_missing
=True)
505 if tgs_supported_enctypes
is not None:
506 c
.set_tgs_supported_enctypes(tgs_supported_enctypes
)
507 ap_supported_enctypes
= self
.env_get_var('AP_SUPPORTED_ENCTYPES',
508 prefix
, allow_missing
=True)
509 if ap_supported_enctypes
is not None:
510 c
.set_ap_supported_enctypes(ap_supported_enctypes
)
512 if require_strongest_key
:
513 kvno_allow_missing
= False
515 aes256_allow_missing
= False
517 aes256_allow_missing
= True
519 kvno_allow_missing
= allow_missing_keys
520 aes256_allow_missing
= allow_missing_keys
521 kvno
= self
.env_get_var('KVNO', prefix
,
522 fallback_default
=False,
523 allow_missing
=kvno_allow_missing
)
526 aes256_key
= self
.env_get_var('AES256_KEY_HEX', prefix
,
527 fallback_default
=False,
528 allow_missing
=aes256_allow_missing
)
529 if aes256_key
is not None:
530 c
.set_forced_key(kcrypto
.Enctype
.AES256
, aes256_key
)
531 aes128_key
= self
.env_get_var('AES128_KEY_HEX', prefix
,
532 fallback_default
=False,
534 if aes128_key
is not None:
535 c
.set_forced_key(kcrypto
.Enctype
.AES128
, aes128_key
)
536 rc4_key
= self
.env_get_var('RC4_KEY_HEX', prefix
,
537 fallback_default
=False, allow_missing
=True)
538 if rc4_key
is not None:
539 c
.set_forced_key(kcrypto
.Enctype
.RC4
, rc4_key
)
541 if not allow_missing_keys
:
542 self
.assertTrue(c
.forced_keys
,
543 'Please supply %s encryption keys '
544 'in environment' % prefix
)
548 def _get_krb5_creds(self
,
550 default_username
=None,
551 allow_missing_password
=False,
552 allow_missing_keys
=True,
553 require_strongest_key
=False,
554 fallback_creds_fn
=None):
555 if prefix
in self
.creds_dict
:
556 return self
.creds_dict
[prefix
]
558 # We don't have the credentials already
562 # Try to obtain them from the environment
563 creds
= self
._get
_krb
5_creds
_from
_env
(
565 default_username
=default_username
,
566 allow_missing_password
=allow_missing_password
,
567 allow_missing_keys
=allow_missing_keys
,
568 require_strongest_key
=require_strongest_key
)
569 except Exception as err
:
570 # An error occurred, so save it for later
573 self
.assertIsNotNone(creds
)
574 # Save the obtained credentials
575 self
.creds_dict
[prefix
] = creds
578 if fallback_creds_fn
is not None:
580 # Try to use the fallback method
581 creds
= fallback_creds_fn()
582 except Exception as err
:
583 print("ERROR FROM ENV: %r" % (env_err
))
584 print("FALLBACK-FN: %s" % (fallback_creds_fn
))
585 print("FALLBACK-ERROR: %r" % (err
))
587 self
.assertIsNotNone(creds
)
588 # Save the obtained credentials
589 self
.creds_dict
[prefix
] = creds
592 # Both methods failed, so raise the exception from the
596 def get_user_creds(self
,
597 allow_missing_password
=False,
598 allow_missing_keys
=True):
599 c
= self
._get
_krb
5_creds
(prefix
=None,
600 allow_missing_password
=allow_missing_password
,
601 allow_missing_keys
=allow_missing_keys
)
604 def get_service_creds(self
,
605 allow_missing_password
=False,
606 allow_missing_keys
=True):
607 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
608 allow_missing_password
=allow_missing_password
,
609 allow_missing_keys
=allow_missing_keys
)
612 def get_client_creds(self
,
613 allow_missing_password
=False,
614 allow_missing_keys
=True):
615 c
= self
._get
_krb
5_creds
(prefix
='CLIENT',
616 allow_missing_password
=allow_missing_password
,
617 allow_missing_keys
=allow_missing_keys
)
620 def get_server_creds(self
,
621 allow_missing_password
=False,
622 allow_missing_keys
=True):
623 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
624 allow_missing_password
=allow_missing_password
,
625 allow_missing_keys
=allow_missing_keys
)
628 def get_admin_creds(self
,
629 allow_missing_password
=False,
630 allow_missing_keys
=True):
631 c
= self
._get
_krb
5_creds
(prefix
='ADMIN',
632 allow_missing_password
=allow_missing_password
,
633 allow_missing_keys
=allow_missing_keys
)
634 c
.set_gensec_features(c
.get_gensec_features() | FEATURE_SEAL
)
637 def get_krbtgt_creds(self
,
639 require_strongest_key
=False):
640 if require_strongest_key
:
641 self
.assertTrue(require_keys
)
642 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
643 default_username
='krbtgt',
644 allow_missing_password
=True,
645 allow_missing_keys
=not require_keys
,
646 require_strongest_key
=require_strongest_key
)
649 def get_anon_creds(self
):
654 def asn1_dump(self
, name
, obj
, asn1_print
=None):
655 if asn1_print
is None:
656 asn1_print
= self
.do_asn1_print
659 sys
.stderr
.write("%s:\n%s" % (name
, obj
))
661 sys
.stderr
.write("%s" % (obj
))
663 def hex_dump(self
, name
, blob
, hexdump
=None):
665 hexdump
= self
.do_hexdump
668 "%s: %d\n%s" % (name
, len(blob
), self
.hexdump(blob
)))
677 if asn1Spec
is not None:
678 class_name
= type(asn1Spec
).__name
__.split(':')[0]
680 class_name
= "<None-asn1Spec>"
681 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
682 obj
, _
= pyasn1_der_decode(blob
, asn1Spec
=asn1Spec
)
683 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
685 obj
= pyasn1_native_encode(obj
)
696 obj
= pyasn1_native_decode(obj
, asn1Spec
=asn1Spec
)
697 class_name
= type(obj
).__name
__.split(':')[0]
698 if class_name
is not None:
699 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
700 blob
= pyasn1_der_encode(obj
)
701 if class_name
is not None:
702 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
705 def send_pdu(self
, req
, asn1_print
=None, hexdump
=None):
707 k5_pdu
= self
.der_encode(
708 req
, native_decode
=False, asn1_print
=asn1_print
, hexdump
=False)
709 header
= struct
.pack('>I', len(k5_pdu
))
712 self
.hex_dump("send_pdu", header
, hexdump
=hexdump
)
713 self
.hex_dump("send_pdu", k5_pdu
, hexdump
=hexdump
)
715 sent
= self
.s
.send(req_pdu
, 0)
716 if sent
== len(req_pdu
):
718 req_pdu
= req_pdu
[sent
:]
719 except socket
.error
as e
:
720 self
._disconnect
("send_pdu: %s" % e
)
723 self
._disconnect
("send_pdu: %s" % e
)
726 def recv_raw(self
, num_recv
=0xffff, hexdump
=None, timeout
=None):
729 if timeout
is not None:
730 self
.s
.settimeout(timeout
)
731 rep_pdu
= self
.s
.recv(num_recv
, 0)
732 self
.s
.settimeout(10)
733 if len(rep_pdu
) == 0:
734 self
._disconnect
("recv_raw: EOF")
736 self
.hex_dump("recv_raw", rep_pdu
, hexdump
=hexdump
)
737 except socket
.timeout
:
738 self
.s
.settimeout(10)
739 sys
.stderr
.write("recv_raw: TIMEOUT\n")
740 except socket
.error
as e
:
741 self
._disconnect
("recv_raw: %s" % e
)
744 self
._disconnect
("recv_raw: %s" % e
)
748 def recv_pdu_raw(self
, asn1_print
=None, hexdump
=None, timeout
=None):
751 raw_pdu
= self
.recv_raw(
752 num_recv
=4, hexdump
=hexdump
, timeout
=timeout
)
755 header
= struct
.unpack(">I", raw_pdu
[0:4])
762 raw_pdu
= self
.recv_raw(
763 num_recv
=missing
, hexdump
=hexdump
, timeout
=timeout
)
764 self
.assertGreaterEqual(len(raw_pdu
), 1)
766 missing
= k5_len
- len(rep_pdu
)
767 k5_raw
= self
.der_decode(
773 pvno
= k5_raw
['field-0']
774 self
.assertEqual(pvno
, 5)
775 msg_type
= k5_raw
['field-1']
776 self
.assertIn(msg_type
, [KRB_AS_REP
, KRB_TGS_REP
, KRB_ERROR
])
777 if msg_type
== KRB_AS_REP
:
778 asn1Spec
= krb5_asn1
.AS_REP()
779 elif msg_type
== KRB_TGS_REP
:
780 asn1Spec
= krb5_asn1
.TGS_REP()
781 elif msg_type
== KRB_ERROR
:
782 asn1Spec
= krb5_asn1
.KRB_ERROR()
783 rep
= self
.der_decode(rep_pdu
, asn1Spec
=asn1Spec
,
784 asn1_print
=asn1_print
, hexdump
=False)
785 return (rep
, rep_pdu
)
787 def recv_pdu(self
, asn1_print
=None, hexdump
=None, timeout
=None):
788 (rep
, rep_pdu
) = self
.recv_pdu_raw(asn1_print
=asn1_print
,
def assertIsConnected(self):
    """Fail with "Not connected" when self.s is None."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" when self.s is not None."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
799 def send_recv_transaction(
807 self
.send_pdu(req
, asn1_print
=asn1_print
, hexdump
=hexdump
)
809 asn1_print
=asn1_print
, hexdump
=hexdump
, timeout
=timeout
)
811 self
._disconnect
("transaction failed")
813 self
._disconnect
("transaction done")
def assertNoValue(self, value):
    """Assert that the isNoValue attribute of value is true."""
    is_unset = value.isNoValue
    self.assertTrue(is_unset)
def assertHasValue(self, value):
    """Assert that value is not None."""
    candidate = value
    self.assertIsNotNone(candidate)
def getElementValue(self, obj, elem):
    """Look elem up in obj via obj.get(), yielding None when absent."""
    found = obj.get(elem, None)
    return found
825 def assertElementMissing(self
, obj
, elem
):
826 v
= self
.getElementValue(obj
, elem
)
def assertElementPresent(self, obj, elem):
    """Assert that elem is present in obj.

    The value fetched with getElementValue() must not be None.  When
    strict_checking is enabled, a value that has a length must also be
    non-empty.
    """
    v = self.getElementValue(obj, elem)
    self.assertIsNotNone(v)
    # len() needs __len__: collections.abc.Sized guarantees it, whereas
    # Container only promises __contains__ and could let len() raise
    # TypeError instead of producing a test failure.
    if self.strict_checking and isinstance(v, collections.abc.Sized):
        self.assertNotEqual(0, len(v))
def assertElementEqual(self, obj, elem, value):
    """Assert that elem is present in obj and equal to value."""
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    self.assertEqual(actual, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that elem in obj equals value encoded as UTF-8 bytes."""
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    expected = bytes(value, 'utf8')
    self.assertEqual(actual, expected)
846 def assertPrincipalEqual(self
, princ1
, princ2
):
847 self
.assertEqual(princ1
['name-type'], princ2
['name-type'])
849 len(princ1
['name-string']),
850 len(princ2
['name-string']),
851 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
852 for idx
in range(len(princ1
['name-string'])):
854 princ1
['name-string'][idx
],
855 princ2
['name-string'][idx
],
856 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that elem in obj is a principal equal to value.

    The stored element is first decoded against the PrincipalName
    ASN.1 spec and then compared with assertPrincipalEqual().
    """
    raw = self.getElementValue(obj, elem)
    self.assertIsNotNone(raw)
    principal = pyasn1_native_decode(raw, asn1Spec=krb5_asn1.PrincipalName())
    self.assertPrincipalEqual(principal, value)
864 def assertElementKVNO(self
, obj
, elem
, value
):
865 v
= self
.getElementValue(obj
, elem
)
866 if value
== "autodetect":
868 if value
is not None:
869 self
.assertIsNotNone(v
)
870 # The value on the wire should never be 0
871 self
.assertNotEqual(v
, 0)
872 # unspecified_kvno means we don't know the kvno,
873 # but want to enforce its presence
874 if value
is not self
.unspecified_kvno
:
876 self
.assertNotEqual(value
, 0)
877 self
.assertEqual(v
, value
)
881 def get_KerberosTimeWithUsec(self
, epoch
=None, offset
=None):
884 if offset
is not None:
885 epoch
= epoch
+ int(offset
)
886 dt
= datetime
.datetime
.fromtimestamp(epoch
, tz
=datetime
.timezone
.utc
)
887 return (dt
.strftime("%Y%m%d%H%M%SZ"), dt
.microsecond
)
889 def get_KerberosTime(self
, epoch
=None, offset
=None):
890 (s
, _
) = self
.get_KerberosTimeWithUsec(epoch
=epoch
, offset
=offset
)
893 def get_EpochFromKerberosTime(self
, kerberos_time
):
894 if isinstance(kerberos_time
, bytes
):
895 kerberos_time
= kerberos_time
.decode()
897 epoch
= datetime
.datetime
.strptime(kerberos_time
,
899 epoch
= epoch
.replace(tzinfo
=datetime
.timezone
.utc
)
900 epoch
= int(epoch
.timestamp())
905 nonce_min
= 0x7f000000
906 nonce_max
= 0x7fffffff
907 v
= random
.randint(nonce_min
, nonce_max
)
910 def get_pa_dict(self
, pa_data
):
913 if pa_data
is not None:
915 pa_type
= pa
['padata-type']
916 if pa_type
in pa_dict
:
917 raise RuntimeError(f
'Duplicate type {pa_type}')
918 pa_dict
[pa_type
] = pa
['padata-value']
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap raw key material in a Krb5EncryptionKey.

    A kcrypto.Key is built from etype and contents and passed, together
    with kvno, to the Krb5EncryptionKey constructor.
    """
    return Krb5EncryptionKey(kcrypto.Key(etype, contents), kvno)
def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
    """Derive a Krb5EncryptionKey from a password and a salt.

    Both pwd and salt are asserted to be non-None before
    kcrypto.string_to_key() is called with etype, pwd and salt.
    """
    for required in (pwd, salt):
        self.assertIsNotNone(required)
    derived = kcrypto.string_to_key(etype, pwd, salt)
    return Krb5EncryptionKey(derived, kvno)
def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
    """Build the key described by one etype-info2 entry.

    For RC4 the NT hash from creds is used directly as key contents;
    for every other etype the key is derived from the password in creds
    and the entry's optional 'salt'.
    """
    enctype = etype_info2['etype']
    salt = etype_info2.get('salt', None)

    if enctype != kcrypto.Enctype.RC4:
        return self.PasswordKey_create(
            etype=enctype, pwd=creds.get_password(), salt=salt, kvno=kvno)

    return self.SessionKey_create(
        etype=enctype, contents=creds.get_nt_hash(), kvno=kvno)
945 def TicketDecryptionKey_from_creds(self
, creds
, etype
=None):
948 etypes
= creds
.get_tgs_krb5_etypes()
951 forced_key
= creds
.get_forced_key(etype
)
952 if forced_key
is not None:
955 kvno
= creds
.get_kvno()
957 fail_msg
= ("%s has no fixed key for etype[%s] kvno[%s] "
958 "nor a password specified, " % (
959 creds
.get_username(), etype
, kvno
))
961 if etype
== kcrypto
.Enctype
.RC4
:
962 nthash
= creds
.get_nt_hash()
963 self
.assertIsNotNone(nthash
, msg
=fail_msg
)
964 return self
.SessionKey_create(etype
=etype
,
968 password
= creds
.get_password()
969 self
.assertIsNotNone(password
, msg
=fail_msg
)
970 salt
= creds
.get_salt()
971 return self
.PasswordKey_create(etype
=etype
,
def RandomKey(self, etype):
    """Create a session key of the given etype from random bytes.

    The number of random bytes is the keysize of the enctype profile
    returned by kcrypto._get_enctype_profile().
    """
    profile = kcrypto._get_enctype_profile(etype)
    return self.SessionKey_create(
        etype=etype,
        contents=samba.generate_random_bytes(profile.keysize))
def EncryptionKey_import(self, EncryptionKey_obj):
    """Turn an EncryptionKey dictionary into a session key.

    The 'keytype' and 'keyvalue' entries are passed positionally to
    SessionKey_create().
    """
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(keytype, keyvalue)
985 def EncryptedData_create(self
, key
, usage
, plaintext
):
986 # EncryptedData ::= SEQUENCE {
987 # etype [0] Int32 -- EncryptionType --,
988 # kvno [1] UInt32 OPTIONAL,
989 # cipher [2] OCTET STRING -- ciphertext
991 ciphertext
= key
.encrypt(usage
, plaintext
)
992 EncryptedData_obj
= {
996 if key
.kvno
is not None:
997 EncryptedData_obj
['kvno'] = key
.kvno
998 return EncryptedData_obj
1000 def Checksum_create(self
, key
, usage
, plaintext
, ctype
=None):
1001 # Checksum ::= SEQUENCE {
1002 # cksumtype [0] Int32,
1003 # checksum [1] OCTET STRING
1007 checksum
= key
.make_checksum(usage
, plaintext
, ctype
=ctype
)
1010 'checksum': checksum
,
def PrincipalName_create(cls, name_type, names):
    """Return a PrincipalName dictionary built from the arguments."""
    # PrincipalName   ::= SEQUENCE {
    #         name-type       [0] Int32,
    #         name-string     [1] SEQUENCE OF KerberosString
    # }
    return {'name-type': name_type, 'name-string': names}
1026 def AuthorizationData_create(self
, ad_type
, ad_data
):
1027 # AuthorizationData ::= SEQUENCE {
1028 # ad-type [0] Int32,
1029 # ad-data [1] OCTET STRING
1035 return AUTH_DATA_obj
1037 def PA_DATA_create(self
, padata_type
, padata_value
):
1038 # PA-DATA ::= SEQUENCE {
1039 # -- NOTE: first tag is [1], not [0]
1040 # padata-type [1] Int32,
1041 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1044 'padata-type': padata_type
,
1045 'padata-value': padata_value
,
1049 def PA_ENC_TS_ENC_create(self
, ts
, usec
):
1050 # PA-ENC-TS-ENC ::= SEQUENCE {
1051 # patimestamp[0] KerberosTime, -- client's time
1052 # pausec[1] krb5int32 OPTIONAL
1054 PA_ENC_TS_ENC_obj
= {
1058 return PA_ENC_TS_ENC_obj
1060 def PA_PAC_OPTIONS_create(self
, options
):
1061 # PA-PAC-OPTIONS ::= SEQUENCE {
1062 # options [0] PACOptionFlags
1064 PA_PAC_OPTIONS_obj
= {
1067 return PA_PAC_OPTIONS_obj
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor dictionary built from the arguments."""
    # KrbFastArmor ::= SEQUENCE {
    #         armor-type      [0] Int32,
    #         armor-value     [1] OCTET STRING,
    #         ...
    # }
    return {'armor-type': armor_type, 'armor-value': armor_value}
1081 def KRB_FAST_REQ_create(self
, fast_options
, padata
, req_body
):
1082 # KrbFastReq ::= SEQUENCE {
1083 # fast-options [0] FastOptions,
1084 # padata [1] SEQUENCE OF PA-DATA,
1085 # req-body [2] KDC-REQ-BODY,
1088 KRB_FAST_REQ_obj
= {
1089 'fast-options': fast_options
,
1091 'req-body': req_body
1093 return KRB_FAST_REQ_obj
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return a KrbFastArmoredReq dictionary.

    The optional 'armor' entry is only added when armor is not None.
    """
    # KrbFastArmoredReq ::= SEQUENCE {
    #         armor           [0] KrbFastArmor OPTIONAL,
    #         req-checksum    [1] Checksum,
    #         enc-fast-req    [2] EncryptedData -- KrbFastReq --
    # }
    armored_req = {}
    armored_req['req-checksum'] = req_checksum
    armored_req['enc-fast-req'] = enc_fast_req
    if armor is None:
        return armored_req
    armored_req['armor'] = armor
    return armored_req
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Return a PA-FX-FAST-REQUEST dictionary holding armored_data."""
    # PA-FX-FAST-REQUEST ::= CHOICE {
    #         armored-data    [0] KrbFastArmoredReq,
    #         ...
    # }
    return {'armored-data': armored_data}
1119 def KERB_PA_PAC_REQUEST_create(self
, include_pac
, pa_data_create
=True):
1120 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1121 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1123 # --If FALSE, and PAC present,
1126 KERB_PA_PAC_REQUEST_obj
= {
1127 'include-pac': include_pac
,
1129 if not pa_data_create
:
1130 return KERB_PA_PAC_REQUEST_obj
1131 pa_pac
= self
.der_encode(KERB_PA_PAC_REQUEST_obj
,
1132 asn1Spec
=krb5_asn1
.KERB_PA_PAC_REQUEST())
1133 pa_data
= self
.PA_DATA_create(PADATA_PAC_REQUEST
, pa_pac
)
1136 def KDC_REQ_BODY_create(self
,
1148 EncAuthorizationData
,
1149 EncAuthorizationData_key
,
1150 EncAuthorizationData_usage
,
1153 # KDC-REQ-BODY ::= SEQUENCE {
1154 # kdc-options [0] KDCOptions,
1155 # cname [1] PrincipalName OPTIONAL
1156 # -- Used only in AS-REQ --,
1159 # -- Also client's in AS-REQ --,
1160 # sname [3] PrincipalName OPTIONAL,
1161 # from [4] KerberosTime OPTIONAL,
1162 # till [5] KerberosTime,
1163 # rtime [6] KerberosTime OPTIONAL,
1165 # etype [8] SEQUENCE OF Int32
1167 # -- in preference order --,
1168 # addresses [9] HostAddresses OPTIONAL,
1169 # enc-authorization-data [10] EncryptedData OPTIONAL
1170 # -- AuthorizationData --,
1171 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1172 # -- NOTE: not empty
1174 if EncAuthorizationData
is not None:
1175 enc_ad_plain
= self
.der_encode(
1176 EncAuthorizationData
,
1177 asn1Spec
=krb5_asn1
.AuthorizationData(),
1178 asn1_print
=asn1_print
,
1180 enc_ad
= self
.EncryptedData_create(EncAuthorizationData_key
,
1181 EncAuthorizationData_usage
,
1185 KDC_REQ_BODY_obj
= {
1186 'kdc-options': kdc_options
,
1192 if cname
is not None:
1193 KDC_REQ_BODY_obj
['cname'] = cname
1194 if sname
is not None:
1195 KDC_REQ_BODY_obj
['sname'] = sname
1196 if from_time
is not None:
1197 KDC_REQ_BODY_obj
['from'] = from_time
1198 if renew_time
is not None:
1199 KDC_REQ_BODY_obj
['rtime'] = renew_time
1200 if addresses
is not None:
1201 KDC_REQ_BODY_obj
['addresses'] = addresses
1202 if enc_ad
is not None:
1203 KDC_REQ_BODY_obj
['enc-authorization-data'] = enc_ad
1204 if additional_tickets
is not None:
1205 KDC_REQ_BODY_obj
['additional-tickets'] = additional_tickets
1206 return KDC_REQ_BODY_obj
1208 def KDC_REQ_create(self
,
1215 # KDC-REQ ::= SEQUENCE {
1216 # -- NOTE: first tag is [1], not [0]
1217 # pvno [1] INTEGER (5) ,
1218 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1219 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1220 # -- NOTE: not empty --,
1221 # req-body [4] KDC-REQ-BODY
1226 'msg-type': msg_type
,
1227 'req-body': req_body
,
1229 if padata
is not None:
1230 KDC_REQ_obj
['padata'] = padata
1231 if asn1Spec
is not None:
1232 KDC_REQ_decoded
= pyasn1_native_decode(
1233 KDC_REQ_obj
, asn1Spec
=asn1Spec
)
1235 KDC_REQ_decoded
= None
1236 return KDC_REQ_obj
, KDC_REQ_decoded
1238 def AS_REQ_create(self
,
1240 kdc_options
, # required
1244 from_time
, # optional
1245 till_time
, # required
1246 renew_time
, # optional
1249 addresses
, # optional
1251 native_decoded_only
=True,
1254 # KDC-REQ ::= SEQUENCE {
1255 # -- NOTE: first tag is [1], not [0]
1256 # pvno [1] INTEGER (5) ,
1257 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1258 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1259 # -- NOTE: not empty --,
1260 # req-body [4] KDC-REQ-BODY
1263 # KDC-REQ-BODY ::= SEQUENCE {
1264 # kdc-options [0] KDCOptions,
1265 # cname [1] PrincipalName OPTIONAL
1266 # -- Used only in AS-REQ --,
1269 # -- Also client's in AS-REQ --,
1270 # sname [3] PrincipalName OPTIONAL,
1271 # from [4] KerberosTime OPTIONAL,
1272 # till [5] KerberosTime,
1273 # rtime [6] KerberosTime OPTIONAL,
1275 # etype [8] SEQUENCE OF Int32
1277 # -- in preference order --,
1278 # addresses [9] HostAddresses OPTIONAL,
1279 # enc-authorization-data [10] EncryptedData OPTIONAL
1280 # -- AuthorizationData --,
1281 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1282 # -- NOTE: not empty
1284 KDC_REQ_BODY_obj
= self
.KDC_REQ_BODY_create(
1296 EncAuthorizationData
=None,
1297 EncAuthorizationData_key
=None,
1298 EncAuthorizationData_usage
=None,
1299 asn1_print
=asn1_print
,
1301 obj
, decoded
= self
.KDC_REQ_create(
1302 msg_type
=KRB_AS_REQ
,
1304 req_body
=KDC_REQ_BODY_obj
,
1305 asn1Spec
=krb5_asn1
.AS_REQ(),
1306 asn1_print
=asn1_print
,
1308 if native_decoded_only
:
1312 def AP_REQ_create(self
, ap_options
, ticket
, authenticator
):
1313 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1314 # pvno [0] INTEGER (5),
1315 # msg-type [1] INTEGER (14),
1316 # ap-options [2] APOptions,
1317 # ticket [3] Ticket,
1318 # authenticator [4] EncryptedData -- Authenticator
1322 'msg-type': KRB_AP_REQ
,
1323 'ap-options': ap_options
,
1325 'authenticator': authenticator
,
1329 def Authenticator_create(
1330 self
, crealm
, cname
, cksum
, cusec
, ctime
, subkey
, seq_number
,
1331 authorization_data
):
1332 # -- Unencrypted authenticator
1333 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1334 # authenticator-vno [0] INTEGER (5),
1336 # cname [2] PrincipalName,
1337 # cksum [3] Checksum OPTIONAL,
1338 # cusec [4] Microseconds,
1339 # ctime [5] KerberosTime,
1340 # subkey [6] EncryptionKey OPTIONAL,
1341 # seq-number [7] UInt32 OPTIONAL,
1342 # authorization-data [8] AuthorizationData OPTIONAL
1344 Authenticator_obj
= {
1345 'authenticator-vno': 5,
1351 if cksum
is not None:
1352 Authenticator_obj
['cksum'] = cksum
1353 if subkey
is not None:
1354 Authenticator_obj
['subkey'] = subkey
1355 if seq_number
is not None:
1356 Authenticator_obj
['seq-number'] = seq_number
1357 if authorization_data
is not None:
1358 Authenticator_obj
['authorization-data'] = authorization_data
1359 return Authenticator_obj
1361 def TGS_REQ_create(self
,
1366 kdc_options
, # required
1370 from_time
, # optional
1371 till_time
, # required
1372 renew_time
, # optional
1375 addresses
, # optional
1376 EncAuthorizationData
,
1377 EncAuthorizationData_key
,
1380 authenticator_subkey
=None,
1381 body_checksum_type
=None,
1382 native_decoded_only
=True,
1385 # KDC-REQ ::= SEQUENCE {
1386 # -- NOTE: first tag is [1], not [0]
1387 # pvno [1] INTEGER (5) ,
1388 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1389 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1390 # -- NOTE: not empty --,
1391 # req-body [4] KDC-REQ-BODY
1394 # KDC-REQ-BODY ::= SEQUENCE {
1395 # kdc-options [0] KDCOptions,
1396 # cname [1] PrincipalName OPTIONAL
1397 # -- Used only in AS-REQ --,
1400 # -- Also client's in AS-REQ --,
1401 # sname [3] PrincipalName OPTIONAL,
1402 # from [4] KerberosTime OPTIONAL,
1403 # till [5] KerberosTime,
1404 # rtime [6] KerberosTime OPTIONAL,
1406 # etype [8] SEQUENCE OF Int32
1408 # -- in preference order --,
1409 # addresses [9] HostAddresses OPTIONAL,
1410 # enc-authorization-data [10] EncryptedData OPTIONAL
1411 # -- AuthorizationData --,
1412 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1413 # -- NOTE: not empty
1416 if authenticator_subkey
is not None:
1417 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SUBKEY
1419 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SESSION
1421 req_body
= self
.KDC_REQ_BODY_create(
1422 kdc_options
=kdc_options
,
1426 from_time
=from_time
,
1427 till_time
=till_time
,
1428 renew_time
=renew_time
,
1431 addresses
=addresses
,
1432 additional_tickets
=additional_tickets
,
1433 EncAuthorizationData
=EncAuthorizationData
,
1434 EncAuthorizationData_key
=EncAuthorizationData_key
,
1435 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1436 req_body_blob
= self
.der_encode(req_body
,
1437 asn1Spec
=krb5_asn1
.KDC_REQ_BODY(),
1438 asn1_print
=asn1_print
, hexdump
=hexdump
)
1440 req_body_checksum
= self
.Checksum_create(ticket_session_key
,
1441 KU_TGS_REQ_AUTH_CKSUM
,
1443 ctype
=body_checksum_type
)
1446 if authenticator_subkey
is not None:
1447 subkey_obj
= authenticator_subkey
.export_obj()
1448 seq_number
= random
.randint(0, 0xfffffffe)
1449 authenticator
= self
.Authenticator_create(
1452 cksum
=req_body_checksum
,
1456 seq_number
=seq_number
,
1457 authorization_data
=None)
1458 authenticator
= self
.der_encode(
1460 asn1Spec
=krb5_asn1
.Authenticator(),
1461 asn1_print
=asn1_print
,
1464 authenticator
= self
.EncryptedData_create(
1465 ticket_session_key
, KU_TGS_REQ_AUTH
, authenticator
)
1467 ap_options
= krb5_asn1
.APOptions('0')
1468 ap_req
= self
.AP_REQ_create(ap_options
=str(ap_options
),
1470 authenticator
=authenticator
)
1471 ap_req
= self
.der_encode(ap_req
, asn1Spec
=krb5_asn1
.AP_REQ(),
1472 asn1_print
=asn1_print
, hexdump
=hexdump
)
1473 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
1474 if padata
is not None:
1475 padata
.append(pa_tgs_req
)
1477 padata
= [pa_tgs_req
]
1479 obj
, decoded
= self
.KDC_REQ_create(
1480 msg_type
=KRB_TGS_REQ
,
1483 asn1Spec
=krb5_asn1
.TGS_REQ(),
1484 asn1_print
=asn1_print
,
1486 if native_decoded_only
:
1490 def PA_S4U2Self_create(self
, name
, realm
, tgt_session_key
, ctype
=None):
1491 # PA-S4U2Self ::= SEQUENCE {
1492 # name [0] PrincipalName,
1494 # cksum [2] Checksum,
1495 # auth [3] GeneralString
1497 cksum_data
= name
['name-type'].to_bytes(4, byteorder
='little')
1498 for n
in name
['name-string']:
1499 cksum_data
+= n
.encode()
1500 cksum_data
+= realm
.encode()
1501 cksum_data
+= "Kerberos".encode()
1502 cksum
= self
.Checksum_create(tgt_session_key
,
1503 KU_NON_KERB_CKSUM_SALT
,
1513 pa_s4u2self
= self
.der_encode(
1514 PA_S4U2Self_obj
, asn1Spec
=krb5_asn1
.PA_S4U2Self())
1515 return self
.PA_DATA_create(PADATA_FOR_USER
, pa_s4u2self
)
1517 def _generic_kdc_exchange(self
,
1518 kdc_exchange_dict
, # required
1519 cname
=None, # optional
1520 realm
=None, # required
1521 sname
=None, # optional
1522 from_time
=None, # optional
1523 till_time
=None, # required
1524 renew_time
=None, # optional
1525 etypes
=None, # required
1526 addresses
=None, # optional
1527 additional_tickets
=None, # optional
1528 EncAuthorizationData
=None, # optional
1529 EncAuthorizationData_key
=None, # optional
1530 EncAuthorizationData_usage
=None): # optional
1532 check_error_fn
= kdc_exchange_dict
['check_error_fn']
1533 check_rep_fn
= kdc_exchange_dict
['check_rep_fn']
1534 generate_fast_fn
= kdc_exchange_dict
['generate_fast_fn']
1535 generate_fast_armor_fn
= kdc_exchange_dict
['generate_fast_armor_fn']
1536 generate_fast_padata_fn
= kdc_exchange_dict
['generate_fast_padata_fn']
1537 generate_padata_fn
= kdc_exchange_dict
['generate_padata_fn']
1538 callback_dict
= kdc_exchange_dict
['callback_dict']
1539 req_msg_type
= kdc_exchange_dict
['req_msg_type']
1540 req_asn1Spec
= kdc_exchange_dict
['req_asn1Spec']
1541 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
1543 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
1544 kdc_options
= kdc_exchange_dict
['kdc_options']
1546 # Parameters specific to the outer request body
1547 outer_req
= kdc_exchange_dict
['outer_req']
1549 if till_time
is None:
1550 till_time
= self
.get_KerberosTime(offset
=36000)
1552 if 'nonce' in kdc_exchange_dict
:
1553 nonce
= kdc_exchange_dict
['nonce']
1555 nonce
= self
.get_Nonce()
1556 kdc_exchange_dict
['nonce'] = nonce
1558 req_body
= self
.KDC_REQ_BODY_create(
1559 kdc_options
=kdc_options
,
1563 from_time
=from_time
,
1564 till_time
=till_time
,
1565 renew_time
=renew_time
,
1568 addresses
=addresses
,
1569 additional_tickets
=additional_tickets
,
1570 EncAuthorizationData
=EncAuthorizationData
,
1571 EncAuthorizationData_key
=EncAuthorizationData_key
,
1572 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1574 inner_req_body
= dict(req_body
)
1575 if outer_req
is not None:
1576 for key
, value
in outer_req
.items():
1577 if value
is not None:
1578 req_body
[key
] = value
1582 if req_msg_type
== KRB_AS_REQ
:
1584 tgs_req_padata
= None
1586 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1588 tgs_req
= self
.generate_ap_req(kdc_exchange_dict
,
1592 tgs_req_padata
= self
.PA_DATA_create(PADATA_KDC_REQ
, tgs_req
)
1594 if generate_fast_padata_fn
is not None:
1595 self
.assertIsNotNone(generate_fast_fn
)
1596 # This can alter req_body...
1597 fast_padata
, req_body
= generate_fast_padata_fn(kdc_exchange_dict
,
1603 if generate_fast_armor_fn
is not None:
1604 self
.assertIsNotNone(generate_fast_fn
)
1605 fast_ap_req
= generate_fast_armor_fn(kdc_exchange_dict
,
1610 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
1611 fast_armor
= self
.KRB_FAST_ARMOR_create(fast_armor_type
,
1616 if generate_padata_fn
is not None:
1617 # This can alter req_body...
1618 outer_padata
, req_body
= generate_padata_fn(kdc_exchange_dict
,
1621 self
.assertIsNotNone(outer_padata
)
1622 self
.assertNotIn(PADATA_KDC_REQ
,
1623 [pa
['padata-type'] for pa
in outer_padata
],
1624 'Don\'t create TGS-REQ manually')
1628 if generate_fast_fn
is not None:
1629 armor_key
= kdc_exchange_dict
['armor_key']
1630 self
.assertIsNotNone(armor_key
)
1632 if req_msg_type
== KRB_AS_REQ
:
1633 checksum_blob
= self
.der_encode(
1635 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
1637 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1638 checksum_blob
= tgs_req
1640 checksum
= self
.Checksum_create(armor_key
,
1644 fast
= generate_fast_fn(kdc_exchange_dict
,
1655 if tgs_req_padata
is not None:
1656 padata
.append(tgs_req_padata
)
1658 if fast
is not None:
1661 if outer_padata
is not None:
1662 padata
+= outer_padata
1667 kdc_exchange_dict
['req_padata'] = padata
1668 kdc_exchange_dict
['fast_padata'] = fast_padata
1669 kdc_exchange_dict
['req_body'] = inner_req_body
1671 req_obj
, req_decoded
= self
.KDC_REQ_create(msg_type
=req_msg_type
,
1674 asn1Spec
=req_asn1Spec())
1676 rep
= self
.send_recv_transaction(req_decoded
)
1677 self
.assertIsNotNone(rep
)
1679 msg_type
= self
.getElementValue(rep
, 'msg-type')
1680 self
.assertIsNotNone(msg_type
)
1682 expected_msg_type
= None
1683 if check_error_fn
is not None:
1684 expected_msg_type
= KRB_ERROR
1685 self
.assertIsNone(check_rep_fn
)
1686 self
.assertNotEqual(0, expected_error_mode
)
1687 if check_rep_fn
is not None:
1688 expected_msg_type
= rep_msg_type
1689 self
.assertIsNone(check_error_fn
)
1690 self
.assertEqual(0, expected_error_mode
)
1691 self
.assertIsNotNone(expected_msg_type
)
1692 self
.assertEqual(msg_type
, expected_msg_type
)
1694 if msg_type
== KRB_ERROR
:
1695 return check_error_fn(kdc_exchange_dict
,
1699 return check_rep_fn(kdc_exchange_dict
, callback_dict
, rep
)
1701 def as_exchange_dict(self
,
1702 expected_crealm
=None,
1703 expected_cname
=None,
1704 expected_cname_private
=None,
1705 expected_srealm
=None,
1706 expected_sname
=None,
1707 ticket_decryption_key
=None,
1708 generate_fast_fn
=None,
1709 generate_fast_armor_fn
=None,
1710 generate_fast_padata_fn
=None,
1711 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
1712 generate_padata_fn
=None,
1713 check_error_fn
=None,
1715 check_padata_fn
=None,
1716 check_kdc_private_fn
=None,
1718 expected_error_mode
=0,
1719 client_as_etypes
=None,
1721 authenticator_subkey
=None,
1728 kdc_exchange_dict
= {
1729 'req_msg_type': KRB_AS_REQ
,
1730 'req_asn1Spec': krb5_asn1
.AS_REQ
,
1731 'rep_msg_type': KRB_AS_REP
,
1732 'rep_asn1Spec': krb5_asn1
.AS_REP
,
1733 'rep_encpart_asn1Spec': krb5_asn1
.EncASRepPart
,
1734 'expected_crealm': expected_crealm
,
1735 'expected_cname': expected_cname
,
1736 'expected_srealm': expected_srealm
,
1737 'expected_sname': expected_sname
,
1738 'ticket_decryption_key': ticket_decryption_key
,
1739 'generate_fast_fn': generate_fast_fn
,
1740 'generate_fast_armor_fn': generate_fast_armor_fn
,
1741 'generate_fast_padata_fn': generate_fast_padata_fn
,
1742 'fast_armor_type': fast_armor_type
,
1743 'generate_padata_fn': generate_padata_fn
,
1744 'check_error_fn': check_error_fn
,
1745 'check_rep_fn': check_rep_fn
,
1746 'check_padata_fn': check_padata_fn
,
1747 'check_kdc_private_fn': check_kdc_private_fn
,
1748 'callback_dict': callback_dict
,
1749 'expected_error_mode': expected_error_mode
,
1750 'client_as_etypes': client_as_etypes
,
1751 'expected_salt': expected_salt
,
1752 'authenticator_subkey': authenticator_subkey
,
1753 'armor_key': armor_key
,
1754 'armor_tgt': armor_tgt
,
1755 'armor_subkey': armor_subkey
,
1756 'auth_data': auth_data
,
1757 'kdc_options': kdc_options
,
1758 'outer_req': outer_req
1760 if expected_cname_private
is not None:
1761 kdc_exchange_dict
['expected_cname_private'] = (
1762 expected_cname_private
)
1764 if callback_dict
is None:
1767 return kdc_exchange_dict
1769 def tgs_exchange_dict(self
,
1770 expected_crealm
=None,
1771 expected_cname
=None,
1772 expected_cname_private
=None,
1773 expected_srealm
=None,
1774 expected_sname
=None,
1775 ticket_decryption_key
=None,
1776 generate_fast_fn
=None,
1777 generate_fast_armor_fn
=None,
1778 generate_fast_padata_fn
=None,
1779 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
1780 generate_padata_fn
=None,
1781 check_error_fn
=None,
1783 check_padata_fn
=None,
1784 check_kdc_private_fn
=None,
1790 authenticator_subkey
=None,
1792 body_checksum_type
=None,
1795 kdc_exchange_dict
= {
1796 'req_msg_type': KRB_TGS_REQ
,
1797 'req_asn1Spec': krb5_asn1
.TGS_REQ
,
1798 'rep_msg_type': KRB_TGS_REP
,
1799 'rep_asn1Spec': krb5_asn1
.TGS_REP
,
1800 'rep_encpart_asn1Spec': krb5_asn1
.EncTGSRepPart
,
1801 'expected_crealm': expected_crealm
,
1802 'expected_cname': expected_cname
,
1803 'expected_srealm': expected_srealm
,
1804 'expected_sname': expected_sname
,
1805 'ticket_decryption_key': ticket_decryption_key
,
1806 'generate_fast_fn': generate_fast_fn
,
1807 'generate_fast_armor_fn': generate_fast_armor_fn
,
1808 'generate_fast_padata_fn': generate_fast_padata_fn
,
1809 'fast_armor_type': fast_armor_type
,
1810 'generate_padata_fn': generate_padata_fn
,
1811 'check_error_fn': check_error_fn
,
1812 'check_rep_fn': check_rep_fn
,
1813 'check_padata_fn': check_padata_fn
,
1814 'check_kdc_private_fn': check_kdc_private_fn
,
1815 'callback_dict': callback_dict
,
1817 'body_checksum_type': body_checksum_type
,
1818 'armor_key': armor_key
,
1819 'armor_tgt': armor_tgt
,
1820 'armor_subkey': armor_subkey
,
1821 'auth_data': auth_data
,
1822 'authenticator_subkey': authenticator_subkey
,
1823 'kdc_options': kdc_options
,
1824 'outer_req': outer_req
1826 if expected_cname_private
is not None:
1827 kdc_exchange_dict
['expected_cname_private'] = (
1828 expected_cname_private
)
1830 if callback_dict
is None:
1833 return kdc_exchange_dict
1835 def generic_check_kdc_rep(self
,
1840 expected_crealm
= kdc_exchange_dict
['expected_crealm']
1841 expected_cname
= kdc_exchange_dict
['expected_cname']
1842 expected_srealm
= kdc_exchange_dict
['expected_srealm']
1843 expected_sname
= kdc_exchange_dict
['expected_sname']
1844 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
1845 check_padata_fn
= kdc_exchange_dict
['check_padata_fn']
1846 check_kdc_private_fn
= kdc_exchange_dict
['check_kdc_private_fn']
1847 rep_encpart_asn1Spec
= kdc_exchange_dict
['rep_encpart_asn1Spec']
1848 msg_type
= kdc_exchange_dict
['rep_msg_type']
1849 armor_key
= kdc_exchange_dict
['armor_key']
1851 self
.assertElementEqual(rep
, 'msg-type', msg_type
) # AS-REP | TGS-REP
1852 padata
= self
.getElementValue(rep
, 'padata')
1853 if self
.strict_checking
:
1854 self
.assertElementEqualUTF8(rep
, 'crealm', expected_crealm
)
1855 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
1856 self
.assertElementPresent(rep
, 'ticket')
1857 ticket
= self
.getElementValue(rep
, 'ticket')
1858 ticket_encpart
= None
1859 ticket_cipher
= None
1860 self
.assertIsNotNone(ticket
)
1861 if ticket
is not None: # Never None, but gives indentation
1862 self
.assertElementEqual(ticket
, 'tkt-vno', 5)
1863 self
.assertElementEqualUTF8(ticket
, 'realm', expected_srealm
)
1864 self
.assertElementEqualPrincipal(ticket
, 'sname', expected_sname
)
1865 self
.assertElementPresent(ticket
, 'enc-part')
1866 ticket_encpart
= self
.getElementValue(ticket
, 'enc-part')
1867 self
.assertIsNotNone(ticket_encpart
)
1868 if ticket_encpart
is not None: # Never None, but gives indentation
1869 self
.assertElementPresent(ticket_encpart
, 'etype')
1870 # 'unspecified' means present, with any value != 0
1871 self
.assertElementKVNO(ticket_encpart
, 'kvno',
1872 self
.unspecified_kvno
)
1873 self
.assertElementPresent(ticket_encpart
, 'cipher')
1874 ticket_cipher
= self
.getElementValue(ticket_encpart
, 'cipher')
1875 self
.assertElementPresent(rep
, 'enc-part')
1876 encpart
= self
.getElementValue(rep
, 'enc-part')
1877 encpart_cipher
= None
1878 self
.assertIsNotNone(encpart
)
1879 if encpart
is not None: # Never None, but gives indentation
1880 self
.assertElementPresent(encpart
, 'etype')
1881 self
.assertElementKVNO(ticket_encpart
, 'kvno', 'autodetect')
1882 self
.assertElementPresent(encpart
, 'cipher')
1883 encpart_cipher
= self
.getElementValue(encpart
, 'cipher')
1885 ticket_checksum
= None
1887 encpart_decryption_key
= None
1888 self
.assertIsNotNone(check_padata_fn
)
1889 if check_padata_fn
is not None:
1890 # See if we can get the decryption key from the preauth phase
1891 encpart_decryption_key
, encpart_decryption_usage
= (
1892 check_padata_fn(kdc_exchange_dict
, callback_dict
,
1895 if armor_key
is not None:
1896 pa_dict
= self
.get_pa_dict(padata
)
1898 if PADATA_FX_FAST
in pa_dict
:
1899 fx_fast_data
= pa_dict
[PADATA_FX_FAST
]
1900 fast_response
= self
.check_fx_fast_data(kdc_exchange_dict
,
1905 if 'strengthen-key' in fast_response
:
1906 strengthen_key
= self
.EncryptionKey_import(
1907 fast_response
['strengthen-key'])
1908 encpart_decryption_key
= (
1909 self
.generate_strengthen_reply_key(
1911 encpart_decryption_key
))
1913 fast_finished
= fast_response
.get('finished', None)
1914 if fast_finished
is not None:
1915 ticket_checksum
= fast_finished
['ticket-checksum']
1917 self
.check_rep_padata(kdc_exchange_dict
,
1920 fast_response
['padata'])
1922 ticket_private
= None
1923 self
.assertIsNotNone(ticket_decryption_key
)
1924 if ticket_decryption_key
is not None:
1925 self
.assertElementEqual(ticket_encpart
, 'etype',
1926 ticket_decryption_key
.etype
)
1927 self
.assertElementKVNO(ticket_encpart
, 'kvno',
1928 ticket_decryption_key
.kvno
)
1929 ticket_decpart
= ticket_decryption_key
.decrypt(KU_TICKET
,
1931 ticket_private
= self
.der_decode(
1933 asn1Spec
=krb5_asn1
.EncTicketPart())
1935 encpart_private
= None
1936 self
.assertIsNotNone(encpart_decryption_key
)
1937 if encpart_decryption_key
is not None:
1938 self
.assertElementEqual(encpart
, 'etype',
1939 encpart_decryption_key
.etype
)
1940 if self
.strict_checking
:
1941 self
.assertElementKVNO(encpart
, 'kvno',
1942 encpart_decryption_key
.kvno
)
1943 rep_decpart
= encpart_decryption_key
.decrypt(
1944 encpart_decryption_usage
,
1946 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
1947 # application tag 26
1949 encpart_private
= self
.der_decode(
1951 asn1Spec
=rep_encpart_asn1Spec())
1953 encpart_private
= self
.der_decode(
1955 asn1Spec
=krb5_asn1
.EncTGSRepPart())
1957 self
.assertIsNotNone(check_kdc_private_fn
)
1958 if check_kdc_private_fn
is not None:
1959 check_kdc_private_fn(kdc_exchange_dict
, callback_dict
,
1960 rep
, ticket_private
, encpart_private
,
1965 def check_fx_fast_data(self
,
1970 expect_strengthen_key
=True):
1971 fx_fast_data
= self
.der_decode(fx_fast_data
,
1972 asn1Spec
=krb5_asn1
.PA_FX_FAST_REPLY())
1974 enc_fast_rep
= fx_fast_data
['armored-data']['enc-fast-rep']
1975 self
.assertEqual(enc_fast_rep
['etype'], armor_key
.etype
)
1977 fast_rep
= armor_key
.decrypt(KU_FAST_REP
, enc_fast_rep
['cipher'])
1979 fast_response
= self
.der_decode(fast_rep
,
1980 asn1Spec
=krb5_asn1
.KrbFastResponse())
1982 if expect_strengthen_key
and self
.strict_checking
:
1983 self
.assertIn('strengthen-key', fast_response
)
1986 self
.assertIn('finished', fast_response
)
1988 # Ensure that the nonce matches the nonce in the body of the request
1990 nonce
= kdc_exchange_dict
['nonce']
1991 self
.assertEqual(nonce
, fast_response
['nonce'])
1993 return fast_response
1995 def generic_check_kdc_private(self
,
2002 kdc_options
= kdc_exchange_dict
['kdc_options']
2003 canon_pos
= len(tuple(krb5_asn1
.KDCOptions('canonicalize'))) - 1
2004 canonicalize
= (canon_pos
< len(kdc_options
)
2005 and kdc_options
[canon_pos
] == '1')
2007 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2008 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2009 expected_sname
= kdc_exchange_dict
['expected_sname']
2010 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
2013 expected_cname
= kdc_exchange_dict
['expected_cname_private']
2015 expected_cname
= kdc_exchange_dict
['expected_cname']
2017 ticket
= self
.getElementValue(rep
, 'ticket')
2019 if ticket_checksum
is not None:
2020 armor_key
= kdc_exchange_dict
['armor_key']
2021 self
.verify_ticket_checksum(ticket
, ticket_checksum
, armor_key
)
2023 ticket_session_key
= None
2024 if ticket_private
is not None:
2025 self
.assertElementPresent(ticket_private
, 'flags')
2026 self
.assertElementPresent(ticket_private
, 'key')
2027 ticket_key
= self
.getElementValue(ticket_private
, 'key')
2028 self
.assertIsNotNone(ticket_key
)
2029 if ticket_key
is not None: # Never None, but gives indentation
2030 self
.assertElementPresent(ticket_key
, 'keytype')
2031 self
.assertElementPresent(ticket_key
, 'keyvalue')
2032 ticket_session_key
= self
.EncryptionKey_import(ticket_key
)
2033 self
.assertElementEqualUTF8(ticket_private
, 'crealm',
2035 self
.assertElementEqualPrincipal(ticket_private
, 'cname',
2037 self
.assertElementPresent(ticket_private
, 'transited')
2038 self
.assertElementPresent(ticket_private
, 'authtime')
2039 if self
.strict_checking
:
2040 self
.assertElementPresent(ticket_private
, 'starttime')
2041 self
.assertElementPresent(ticket_private
, 'endtime')
2042 # TODO self.assertElementPresent(ticket_private, 'renew-till')
2043 # TODO self.assertElementMissing(ticket_private, 'caddr')
2044 self
.assertElementPresent(ticket_private
, 'authorization-data')
2046 encpart_session_key
= None
2047 if encpart_private
is not None:
2048 self
.assertElementPresent(encpart_private
, 'key')
2049 encpart_key
= self
.getElementValue(encpart_private
, 'key')
2050 self
.assertIsNotNone(encpart_key
)
2051 if encpart_key
is not None: # Never None, but gives indentation
2052 self
.assertElementPresent(encpart_key
, 'keytype')
2053 self
.assertElementPresent(encpart_key
, 'keyvalue')
2054 encpart_session_key
= self
.EncryptionKey_import(encpart_key
)
2055 self
.assertElementPresent(encpart_private
, 'last-req')
2056 self
.assertElementEqual(encpart_private
, 'nonce',
2057 kdc_exchange_dict
['nonce'])
2058 # TODO self.assertElementPresent(encpart_private,
2060 self
.assertElementPresent(encpart_private
, 'flags')
2061 self
.assertElementPresent(encpart_private
, 'authtime')
2062 if self
.strict_checking
:
2063 self
.assertElementPresent(encpart_private
, 'starttime')
2064 self
.assertElementPresent(encpart_private
, 'endtime')
2065 # TODO self.assertElementPresent(encpart_private, 'renew-till')
2066 self
.assertElementEqualUTF8(encpart_private
, 'srealm',
2068 self
.assertElementEqualPrincipal(encpart_private
, 'sname',
2070 # TODO self.assertElementMissing(encpart_private, 'caddr')
2072 sent_claims
= self
.sent_claims(kdc_exchange_dict
)
2074 if self
.strict_checking
:
2075 if sent_claims
or canonicalize
:
2076 self
.assertElementPresent(encpart_private
,
2077 'encrypted-pa-data')
2078 enc_pa_dict
= self
.get_pa_dict(
2079 encpart_private
['encrypted-pa-data'])
2081 self
.assertIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2083 (supported_etypes
,) = struct
.unpack(
2085 enc_pa_dict
[PADATA_SUPPORTED_ETYPES
])
2088 security
.KERB_ENCTYPE_FAST_SUPPORTED
2091 security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2094 security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
2097 self
.assertNotIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2099 # ClaimsCompIdFASTSupported registry key
2101 self
.assertIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2103 self
.check_pac_options_claims_support(
2104 enc_pa_dict
[PADATA_PAC_OPTIONS
])
2106 self
.assertNotIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2108 self
.assertElementEqual(encpart_private
,
2109 'encrypted-pa-data',
2112 if ticket_session_key
is not None and encpart_session_key
is not None:
2113 self
.assertEqual(ticket_session_key
.etype
,
2114 encpart_session_key
.etype
)
2115 self
.assertEqual(ticket_session_key
.key
.contents
,
2116 encpart_session_key
.key
.contents
)
2117 if encpart_session_key
is not None:
2118 session_key
= encpart_session_key
2120 session_key
= ticket_session_key
2121 ticket_creds
= KerberosTicketCreds(
2124 crealm
=expected_crealm
,
2125 cname
=expected_cname
,
2126 srealm
=expected_srealm
,
2127 sname
=expected_sname
,
2128 decryption_key
=ticket_decryption_key
,
2129 ticket_private
=ticket_private
,
2130 encpart_private
=encpart_private
)
2132 kdc_exchange_dict
['rep_ticket_creds'] = ticket_creds
def check_pac_options_claims_support(self, pac_options):
    """Assert that an encoded PA-PAC-OPTIONS value has the claims bit set.

    pac_options is decoded with self.der_decode() against the
    krb5_asn1.PA_PAC_OPTIONS spec; the first element of the decoded
    'options' field must then equal '1'.
    """
    decoded_pac_options = self.der_decode(
        pac_options, asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    claims_bit = decoded_pac_options['options'][0]  # claims bit
    self.assertEqual('1', claims_bit)
2139 def generic_check_kdc_error(self
,
2144 expected_cname
= kdc_exchange_dict
['expected_cname']
2145 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2146 expected_sname
= kdc_exchange_dict
['expected_sname']
2147 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
2149 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
2151 self
.assertElementEqual(rep
, 'pvno', 5)
2152 self
.assertElementEqual(rep
, 'msg-type', KRB_ERROR
)
2153 self
.assertElementEqual(rep
, 'error-code', expected_error_mode
)
2154 if self
.strict_checking
:
2155 self
.assertElementMissing(rep
, 'ctime')
2156 self
.assertElementMissing(rep
, 'cusec')
2157 self
.assertElementPresent(rep
, 'stime')
2158 self
.assertElementPresent(rep
, 'susec')
2159 # error-code checked above
2160 if self
.strict_checking
:
2161 self
.assertElementMissing(rep
, 'crealm')
2162 self
.assertElementMissing(rep
, 'cname')
2163 self
.assertElementEqualUTF8(rep
, 'realm', expected_srealm
)
2164 if sent_fast
and expected_error_mode
== KDC_ERR_GENERIC
:
2165 self
.assertElementEqualPrincipal(rep
, 'sname',
2166 self
.get_krbtgt_sname())
2168 self
.assertElementEqualPrincipal(rep
, 'sname', expected_sname
)
2169 self
.assertElementMissing(rep
, 'e-text')
2170 if expected_error_mode
== KDC_ERR_GENERIC
:
2171 self
.assertElementMissing(rep
, 'e-data')
2173 edata
= self
.getElementValue(rep
, 'e-data')
2174 if self
.strict_checking
:
2175 self
.assertIsNotNone(edata
)
2176 if edata
is not None:
2177 rep_padata
= self
.der_decode(edata
,
2178 asn1Spec
=krb5_asn1
.METHOD_DATA())
2179 self
.assertGreater(len(rep_padata
), 0)
2182 self
.assertEqual(1, len(rep_padata
))
2183 rep_pa_dict
= self
.get_pa_dict(rep_padata
)
2184 self
.assertIn(PADATA_FX_FAST
, rep_pa_dict
)
2186 armor_key
= kdc_exchange_dict
['armor_key']
2187 self
.assertIsNotNone(armor_key
)
2188 fast_response
= self
.check_fx_fast_data(
2190 rep_pa_dict
[PADATA_FX_FAST
],
2192 expect_strengthen_key
=False)
2194 rep_padata
= fast_response
['padata']
2198 etype_info2
= self
.check_rep_padata(kdc_exchange_dict
,
2203 kdc_exchange_dict
['preauth_etype_info2'] = etype_info2
2207 def check_rep_padata(self
,
2212 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
2213 req_body
= kdc_exchange_dict
['req_body']
2214 proposed_etypes
= req_body
['etype']
2215 client_as_etypes
= kdc_exchange_dict
.get('client_as_etypes', [])
2217 expect_etype_info2
= ()
2218 expect_etype_info
= False
2219 unexpect_etype_info
= True
2220 expected_aes_type
= 0
2221 expected_rc4_type
= 0
2222 if kcrypto
.Enctype
.RC4
in proposed_etypes
:
2223 expect_etype_info
= True
2224 for etype
in proposed_etypes
:
2225 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2226 expect_etype_info
= False
2227 if etype
not in client_as_etypes
:
2229 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2230 if etype
> expected_aes_type
:
2231 expected_aes_type
= etype
2232 if etype
in (kcrypto
.Enctype
.RC4
,) and expected_error_mode
!= 0:
2233 unexpect_etype_info
= False
2234 if etype
> expected_rc4_type
:
2235 expected_rc4_type
= etype
2237 if expected_aes_type
!= 0:
2238 expect_etype_info2
+= (expected_aes_type
,)
2239 if expected_rc4_type
!= 0:
2240 expect_etype_info2
+= (expected_rc4_type
,)
2242 expected_patypes
= ()
2243 if expect_etype_info
:
2244 self
.assertGreater(len(expect_etype_info2
), 0)
2245 expected_patypes
+= (PADATA_ETYPE_INFO
,)
2246 if len(expect_etype_info2
) != 0:
2247 expected_patypes
+= (PADATA_ETYPE_INFO2
,)
2249 expected_patypes
+= (PADATA_ENC_TIMESTAMP
,)
2250 expected_patypes
+= (PADATA_PK_AS_REQ
,)
2251 expected_patypes
+= (PADATA_PK_AS_REP_19
,)
2253 if self
.strict_checking
:
2254 for i
, patype
in enumerate(expected_patypes
):
2255 self
.assertElementEqual(rep_padata
[i
], 'padata-type', patype
)
2256 self
.assertEqual(len(rep_padata
), len(expected_patypes
))
2260 enc_timestamp
= None
2263 for pa
in rep_padata
:
2264 patype
= self
.getElementValue(pa
, 'padata-type')
2265 pavalue
= self
.getElementValue(pa
, 'padata-value')
2266 if patype
== PADATA_ETYPE_INFO2
:
2267 self
.assertIsNone(etype_info2
)
2268 etype_info2
= self
.der_decode(pavalue
,
2269 asn1Spec
=krb5_asn1
.ETYPE_INFO2())
2271 if patype
== PADATA_ETYPE_INFO
:
2272 self
.assertIsNone(etype_info
)
2273 etype_info
= self
.der_decode(pavalue
,
2274 asn1Spec
=krb5_asn1
.ETYPE_INFO())
2276 if patype
== PADATA_ENC_TIMESTAMP
:
2277 self
.assertIsNone(enc_timestamp
)
2278 enc_timestamp
= pavalue
2279 self
.assertEqual(len(enc_timestamp
), 0)
2281 if patype
== PADATA_PK_AS_REQ
:
2282 self
.assertIsNone(pk_as_req
)
2284 self
.assertEqual(len(pk_as_req
), 0)
2286 if patype
== PADATA_PK_AS_REP_19
:
2287 self
.assertIsNone(pk_as_rep19
)
2288 pk_as_rep19
= pavalue
2289 self
.assertEqual(len(pk_as_rep19
), 0)
2292 if all(etype
not in client_as_etypes
or etype
not in proposed_etypes
2293 for etype
in (kcrypto
.Enctype
.AES256
,
2294 kcrypto
.Enctype
.AES128
,
2295 kcrypto
.Enctype
.RC4
)):
2296 self
.assertIsNone(etype_info2
)
2297 self
.assertIsNone(etype_info
)
2298 if self
.strict_checking
:
2299 self
.assertIsNotNone(enc_timestamp
)
2300 self
.assertIsNotNone(pk_as_req
)
2301 self
.assertIsNotNone(pk_as_rep19
)
2304 if self
.strict_checking
:
2305 self
.assertIsNotNone(etype_info2
)
2306 if expect_etype_info
:
2307 self
.assertIsNotNone(etype_info
)
2309 if self
.strict_checking
:
2310 self
.assertIsNone(etype_info
)
2311 if unexpect_etype_info
:
2312 self
.assertIsNone(etype_info
)
2314 if self
.strict_checking
:
2315 self
.assertGreaterEqual(len(etype_info2
), 1)
2316 self
.assertEqual(len(etype_info2
), len(expect_etype_info2
))
2317 for i
in range(0, len(etype_info2
)):
2318 e
= self
.getElementValue(etype_info2
[i
], 'etype')
2319 self
.assertEqual(e
, expect_etype_info2
[i
])
2320 salt
= self
.getElementValue(etype_info2
[i
], 'salt')
2321 if e
== kcrypto
.Enctype
.RC4
:
2322 self
.assertIsNone(salt
)
2324 self
.assertIsNotNone(salt
)
2325 expected_salt
= kdc_exchange_dict
['expected_salt']
2326 if expected_salt
is not None:
2327 self
.assertEqual(salt
, expected_salt
)
2328 s2kparams
= self
.getElementValue(etype_info2
[i
], 's2kparams')
2329 if self
.strict_checking
:
2330 self
.assertIsNone(s2kparams
)
2331 if etype_info
is not None:
2332 self
.assertEqual(len(etype_info
), 1)
2333 e
= self
.getElementValue(etype_info
[0], 'etype')
2334 self
.assertEqual(e
, kcrypto
.Enctype
.RC4
)
2335 self
.assertEqual(e
, expect_etype_info2
[0])
2336 salt
= self
.getElementValue(etype_info
[0], 'salt')
2337 if self
.strict_checking
:
2338 self
.assertIsNotNone(salt
)
2339 self
.assertEqual(len(salt
), 0)
2341 self
.assertIsNotNone(enc_timestamp
)
2342 self
.assertIsNotNone(pk_as_req
)
2343 self
.assertIsNotNone(pk_as_rep19
)
2347 def generate_simple_fast(self
,
2355 armor_key
= kdc_exchange_dict
['armor_key']
2357 fast_req
= self
.KRB_FAST_REQ_create(fast_options
,
2360 fast_req
= self
.der_encode(fast_req
,
2361 asn1Spec
=krb5_asn1
.KrbFastReq())
2362 fast_req
= self
.EncryptedData_create(armor_key
,
2366 fast_armored_req
= self
.KRB_FAST_ARMORED_REQ_create(fast_armor
,
2370 fx_fast_request
= self
.PA_FX_FAST_REQUEST_create(fast_armored_req
)
2371 fx_fast_request
= self
.der_encode(
2373 asn1Spec
=krb5_asn1
.PA_FX_FAST_REQUEST())
2375 fast_padata
= self
.PA_DATA_create(PADATA_FX_FAST
,
2380 def generate_ap_req(self
,
2386 tgt
= kdc_exchange_dict
['armor_tgt']
2387 authenticator_subkey
= kdc_exchange_dict
['armor_subkey']
2389 req_body_checksum
= None
2391 tgt
= kdc_exchange_dict
['tgt']
2392 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
2393 body_checksum_type
= kdc_exchange_dict
['body_checksum_type']
2395 req_body_blob
= self
.der_encode(req_body
,
2396 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
2398 req_body_checksum
= self
.Checksum_create(tgt
.session_key
,
2399 KU_TGS_REQ_AUTH_CKSUM
,
2401 ctype
=body_checksum_type
)
2403 auth_data
= kdc_exchange_dict
['auth_data']
2406 if authenticator_subkey
is not None:
2407 subkey_obj
= authenticator_subkey
.export_obj()
2408 seq_number
= random
.randint(0, 0xfffffffe)
2409 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
2410 authenticator_obj
= self
.Authenticator_create(
2413 cksum
=req_body_checksum
,
2417 seq_number
=seq_number
,
2418 authorization_data
=auth_data
)
2419 authenticator_blob
= self
.der_encode(
2421 asn1Spec
=krb5_asn1
.Authenticator())
2423 usage
= KU_AP_REQ_AUTH
if armor
else KU_TGS_REQ_AUTH
2424 authenticator
= self
.EncryptedData_create(tgt
.session_key
,
2428 ap_options
= krb5_asn1
.APOptions('0')
2429 ap_req_obj
= self
.AP_REQ_create(ap_options
=str(ap_options
),
2431 authenticator
=authenticator
)
2432 ap_req
= self
.der_encode(ap_req_obj
, asn1Spec
=krb5_asn1
.AP_REQ())
2436 def generate_simple_tgs_padata(self
,
2440 ap_req
= self
.generate_ap_req(kdc_exchange_dict
,
2444 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
2445 padata
= [pa_tgs_req
]
2447 return padata
, req_body
2449 def check_simple_tgs_padata(self
,
2454 tgt
= kdc_exchange_dict
['tgt']
2455 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
2456 if authenticator_subkey
is not None:
2457 subkey
= authenticator_subkey
2458 subkey_usage
= KU_TGS_REP_ENC_PART_SUB_KEY
2460 subkey
= tgt
.session_key
2461 subkey_usage
= KU_TGS_REP_ENC_PART_SESSION
2463 return subkey
, subkey_usage
2465 def generate_armor_key(self
, subkey
, session_key
):
2466 armor_key
= kcrypto
.cf2(subkey
.key
,
2470 armor_key
= Krb5EncryptionKey(armor_key
, None)
2474 def generate_strengthen_reply_key(self
, strengthen_key
, reply_key
):
2475 strengthen_reply_key
= kcrypto
.cf2(strengthen_key
.key
,
2479 strengthen_reply_key
= Krb5EncryptionKey(strengthen_reply_key
,
2482 return strengthen_reply_key
def generate_client_challenge_key(self, armor_key, longterm_key):
    """Derive the key used for the client's encrypted challenge.

    Combines ``armor_key`` and ``longterm_key`` with ``kcrypto.cf2``
    using the pepper strings b'clientchallengearmor' and
    b'challengelongterm', and wraps the result in a
    ``Krb5EncryptionKey`` whose second constructor argument is None.

    :param armor_key: key object exposing the raw key as ``.key``
    :param longterm_key: key object exposing the raw key as ``.key``
    :return: the derived key wrapped as a ``Krb5EncryptionKey``
    """
    # Both keys have to take part in the derivation; the long-term key
    # is the second cf2 input, paired with the 'challengelongterm'
    # pepper.
    combined_key = kcrypto.cf2(armor_key.key,
                               longterm_key.key,
                               b'clientchallengearmor',
                               b'challengelongterm')

    return Krb5EncryptionKey(combined_key, None)
def generate_kdc_challenge_key(self, armor_key, longterm_key):
    """Derive the key used for the KDC's encrypted challenge.

    Combines ``armor_key`` and ``longterm_key`` with ``kcrypto.cf2``
    using the pepper strings b'kdcchallengearmor' and
    b'challengelongterm', and wraps the result in a
    ``Krb5EncryptionKey`` whose second constructor argument is None.

    :param armor_key: key object exposing the raw key as ``.key``
    :param longterm_key: key object exposing the raw key as ``.key``
    :return: the derived key wrapped as a ``Krb5EncryptionKey``
    """
    # Both keys have to take part in the derivation; the long-term key
    # is the second cf2 input, paired with the 'challengelongterm'
    # pepper.
    combined_key = kcrypto.cf2(armor_key.key,
                               longterm_key.key,
                               b'kdcchallengearmor',
                               b'challengelongterm')

    return Krb5EncryptionKey(combined_key, None)
2502 def verify_ticket_checksum(self
, ticket
, expected_checksum
, armor_key
):
2503 expected_type
= expected_checksum
['cksumtype']
2504 self
.assertEqual(armor_key
.ctype
, expected_type
)
2506 ticket_blob
= self
.der_encode(ticket
,
2507 asn1Spec
=krb5_asn1
.Ticket())
2508 checksum
= self
.Checksum_create(armor_key
,
2511 self
.assertEqual(expected_checksum
, checksum
)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict for the request's 'req_padata' entry.

    Reads ``kdc_exchange_dict['req_padata']`` and hands it to
    ``self.get_pa_dict()``, returning whatever that produces.
    """
    outer_padata = kdc_exchange_dict['req_padata']
    return self.get_pa_dict(outer_padata)
def get_fast_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict for 'fast_padata', else the outer one.

    ``kdc_exchange_dict['fast_padata']`` is converted with
    ``self.get_pa_dict()``.  If that yields a non-empty result it is
    returned; otherwise the result of ``self.get_outer_pa_dict()`` is
    returned instead.
    """
    fast_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])

    # Only fall back to the outer padata when there is nothing in the
    # FAST padata; otherwise the lookup above would be pointless.
    # NOTE(review): truthiness is used so that both None and an empty
    # dict trigger the fallback -- confirm against get_pa_dict().
    if fast_pa_dict:
        return fast_pa_dict

    return self.get_outer_pa_dict(kdc_exchange_dict)
def sent_fast(self, kdc_exchange_dict):
    """Return True if PADATA_FX_FAST is a key of the outer padata dict.

    The outer padata dict is whatever ``self.get_outer_pa_dict()``
    returns for ``kdc_exchange_dict``.
    """
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Return True if PADATA_ENCRYPTED_CHALLENGE is in the FAST padata.

    The padata dict consulted is whatever ``self.get_fast_pa_dict()``
    returns for ``kdc_exchange_dict``.
    """
    return (PADATA_ENCRYPTED_CHALLENGE
            in self.get_fast_pa_dict(kdc_exchange_dict))
def sent_claims(self, kdc_exchange_dict):
    """Return True if the request's PAC options have the claims bit set.

    Looks for PADATA_PAC_OPTIONS in the dict returned by
    ``self.get_fast_pa_dict()``.  When it is absent the result is
    False; otherwise the value is DER-decoded as PA_PAC_OPTIONS and
    the 'claims' position of its 'options' field is tested.
    """
    fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    # No PAC options were sent at all, so claims cannot have been
    # requested (and indexing the dict below would raise KeyError).
    if PADATA_PAC_OPTIONS not in fast_pa_dict:
        return False

    decoded = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
                              asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    option_bits = decoded['options']

    # Position of the 'claims' flag: the flag value is expanded to a
    # tuple and its last index is used.
    # NOTE(review): presumably the named bit is the final element of
    # that tuple -- confirm against the PACOptionFlags definition.
    claims_pos = len(tuple(krb5_asn1.PACOptionFlags('claims'))) - 1

    # The sent bit string may be shorter than the flag position, in
    # which case the bit counts as unset.
    return (claims_pos < len(option_bits)
            and option_bits[claims_pos] == '1')
def get_krbtgt_sname(self):
    """Build and return the principal name of the krbtgt account.

    The name is created with ``self.PrincipalName_create()`` using
    name type NT_SRV_INST and two components: the username and the
    realm reported by ``self.get_krbtgt_creds()``.
    """
    krbtgt_creds = self.get_krbtgt_creds()
    name_components = [krbtgt_creds.get_username(),
                       krbtgt_creds.get_realm()]

    # Hand the constructed name back to the caller; building it
    # without returning it would make this getter yield None.
    return self.PrincipalName_create(name_type=NT_SRV_INST,
                                     names=name_components)
2557 def _test_as_exchange(self
,
2563 expected_error_mode
,
2573 ticket_decryption_key
=None):
2575 def _generate_padata_copy(_kdc_exchange_dict
,
2578 return padata
, req_body
2580 def _check_padata_preauth_key(_kdc_exchange_dict
,
2584 as_rep_usage
= KU_AS_REP_ENC_PART
2585 return preauth_key
, as_rep_usage
2587 if expected_error_mode
== 0:
2588 check_error_fn
= None
2589 check_rep_fn
= self
.generic_check_kdc_rep
2591 check_error_fn
= self
.generic_check_kdc_error
2594 if padata
is not None:
2595 generate_padata_fn
= _generate_padata_copy
2597 generate_padata_fn
= None
2599 kdc_exchange_dict
= self
.as_exchange_dict(
2600 expected_crealm
=expected_crealm
,
2601 expected_cname
=expected_cname
,
2602 expected_srealm
=expected_srealm
,
2603 expected_sname
=expected_sname
,
2604 ticket_decryption_key
=ticket_decryption_key
,
2605 generate_padata_fn
=generate_padata_fn
,
2606 check_error_fn
=check_error_fn
,
2607 check_rep_fn
=check_rep_fn
,
2608 check_padata_fn
=_check_padata_preauth_key
,
2609 check_kdc_private_fn
=self
.generic_check_kdc_private
,
2610 expected_error_mode
=expected_error_mode
,
2611 client_as_etypes
=client_as_etypes
,
2612 expected_salt
=expected_salt
,
2613 kdc_options
=str(kdc_options
))
2615 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
2622 return rep
, kdc_exchange_dict