1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 from pyasn1
.codec
.der
.decoder
import decode
as pyasn1_der_decode
30 from pyasn1
.codec
.der
.encoder
import encode
as pyasn1_der_encode
31 from pyasn1
.codec
.native
.decoder
import decode
as pyasn1_native_decode
32 from pyasn1
.codec
.native
.encoder
import encode
as pyasn1_native_encode
34 from pyasn1
.codec
.ber
.encoder
import BitStringEncoder
36 from samba
.credentials
import Credentials
37 from samba
.dcerpc
import security
38 from samba
.gensec
import FEATURE_SEAL
41 from samba
.tests
import TestCaseInTempDir
43 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
44 from samba
.tests
.krb5
.rfc4120_constants
import (
45 FX_FAST_ARMOR_AP_REQUEST
,
59 KU_NON_KERB_CKSUM_SALT
,
60 KU_TGS_REP_ENC_PART_SESSION
,
61 KU_TGS_REP_ENC_PART_SUB_KEY
,
63 KU_TGS_REQ_AUTH_CKSUM
,
64 KU_TGS_REQ_AUTH_DAT_SESSION
,
65 KU_TGS_REQ_AUTH_DAT_SUBKEY
,
77 PADATA_SUPPORTED_ETYPES
79 import samba
.tests
.krb5
.kcrypto
as kcrypto
82 def BitStringEncoder_encodeValue32(
83 self
, value
, asn1Spec
, encodeFun
, **options
):
85 # BitStrings like KDCOptions or TicketFlags should at least
86 # be 32-Bit on the wire
88 if asn1Spec
is not None:
89 # TODO: try to avoid ASN.1 schema instantiation
90 value
= asn1Spec
.clone(value
)
92 valueLength
= len(value
)
94 alignedValue
= value
<< (8 - valueLength
% 8)
98 substrate
= alignedValue
.asOctets()
99 length
= len(substrate
)
100 # We need at least 32-Bit / 4-Bytes
105 ret
= b
'\x00' + substrate
+ (b
'\x00' * padding
)
106 return ret
, False, True
109 BitStringEncoder
.encodeValue
= BitStringEncoder_encodeValue32
112 def BitString_NamedValues_prettyPrint(self
, scope
=0):
113 ret
= "%s" % self
.asBinary()
116 for byte
in self
.asNumbers():
117 for bit
in [7, 6, 5, 4, 3, 2, 1, 0]:
124 if len(bits
) < highest_bit
:
125 for bitPosition
in range(len(bits
), highest_bit
):
128 delim
= ": (\n%s " % indent
129 for bitPosition
in range(highest_bit
):
130 if bitPosition
in self
.prettyPrintNamedValues
:
131 name
= self
.prettyPrintNamedValues
[bitPosition
]
132 elif bits
[bitPosition
] != 0:
133 name
= "unknown-bit-%u" % bitPosition
136 ret
+= "%s%s:%u" % (delim
, name
, bits
[bitPosition
])
137 delim
= ",\n%s " % indent
138 ret
+= "\n%s)" % indent
142 krb5_asn1
.TicketFlags
.prettyPrintNamedValues
=\
143 krb5_asn1
.TicketFlagsValues
.namedValues
144 krb5_asn1
.TicketFlags
.namedValues
=\
145 krb5_asn1
.TicketFlagsValues
.namedValues
146 krb5_asn1
.TicketFlags
.prettyPrint
=\
147 BitString_NamedValues_prettyPrint
148 krb5_asn1
.KDCOptions
.prettyPrintNamedValues
=\
149 krb5_asn1
.KDCOptionsValues
.namedValues
150 krb5_asn1
.KDCOptions
.namedValues
=\
151 krb5_asn1
.KDCOptionsValues
.namedValues
152 krb5_asn1
.KDCOptions
.prettyPrint
=\
153 BitString_NamedValues_prettyPrint
154 krb5_asn1
.APOptions
.prettyPrintNamedValues
=\
155 krb5_asn1
.APOptionsValues
.namedValues
156 krb5_asn1
.APOptions
.namedValues
=\
157 krb5_asn1
.APOptionsValues
.namedValues
158 krb5_asn1
.APOptions
.prettyPrint
=\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1
.PACOptionFlags
.prettyPrintNamedValues
=\
161 krb5_asn1
.PACOptionFlagsValues
.namedValues
162 krb5_asn1
.PACOptionFlags
.namedValues
=\
163 krb5_asn1
.PACOptionFlagsValues
.namedValues
164 krb5_asn1
.PACOptionFlags
.prettyPrint
=\
165 BitString_NamedValues_prettyPrint
168 def Integer_NamedValues_prettyPrint(self
, scope
=0):
170 if intval
in self
.prettyPrintNamedValues
:
171 name
= self
.prettyPrintNamedValues
[intval
]
173 name
= "<__unknown__>"
174 ret
= "%d (0x%x) %s" % (intval
, intval
, name
)
178 krb5_asn1
.NameType
.prettyPrintNamedValues
=\
179 krb5_asn1
.NameTypeValues
.namedValues
180 krb5_asn1
.NameType
.prettyPrint
=\
181 Integer_NamedValues_prettyPrint
182 krb5_asn1
.AuthDataType
.prettyPrintNamedValues
=\
183 krb5_asn1
.AuthDataTypeValues
.namedValues
184 krb5_asn1
.AuthDataType
.prettyPrint
=\
185 Integer_NamedValues_prettyPrint
186 krb5_asn1
.PADataType
.prettyPrintNamedValues
=\
187 krb5_asn1
.PADataTypeValues
.namedValues
188 krb5_asn1
.PADataType
.prettyPrint
=\
189 Integer_NamedValues_prettyPrint
190 krb5_asn1
.EncryptionType
.prettyPrintNamedValues
=\
191 krb5_asn1
.EncryptionTypeValues
.namedValues
192 krb5_asn1
.EncryptionType
.prettyPrint
=\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1
.ChecksumType
.prettyPrintNamedValues
=\
195 krb5_asn1
.ChecksumTypeValues
.namedValues
196 krb5_asn1
.ChecksumType
.prettyPrint
=\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1
.KerbErrorDataType
.prettyPrintNamedValues
=\
199 krb5_asn1
.KerbErrorDataTypeValues
.namedValues
200 krb5_asn1
.KerbErrorDataType
.prettyPrint
=\
201 Integer_NamedValues_prettyPrint
204 class Krb5EncryptionKey
:
205 def __init__(self
, key
, kvno
):
207 kcrypto
.Enctype
.AES256
: kcrypto
.Cksumtype
.SHA1_AES256
,
208 kcrypto
.Enctype
.AES128
: kcrypto
.Cksumtype
.SHA1_AES128
,
209 kcrypto
.Enctype
.RC4
: kcrypto
.Cksumtype
.HMAC_MD5
,
212 self
.etype
= key
.enctype
213 self
.ctype
= EncTypeChecksum
[self
.etype
]
216 def encrypt(self
, usage
, plaintext
):
217 ciphertext
= kcrypto
.encrypt(self
.key
, usage
, plaintext
)
220 def decrypt(self
, usage
, ciphertext
):
221 plaintext
= kcrypto
.decrypt(self
.key
, usage
, ciphertext
)
224 def make_checksum(self
, usage
, plaintext
, ctype
=None):
227 cksum
= kcrypto
.make_checksum(ctype
, self
.key
, usage
, plaintext
)
230 def export_obj(self
):
231 EncryptionKey_obj
= {
232 'keytype': self
.etype
,
233 'keyvalue': self
.key
.contents
,
235 return EncryptionKey_obj
238 class KerberosCredentials(Credentials
):
240 super(KerberosCredentials
, self
).__init
__()
242 all_enc_types |
= security
.KERB_ENCTYPE_RC4_HMAC_MD5
243 all_enc_types |
= security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
244 all_enc_types |
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
246 self
.as_supported_enctypes
= all_enc_types
247 self
.tgs_supported_enctypes
= all_enc_types
248 self
.ap_supported_enctypes
= all_enc_types
251 self
.forced_keys
= {}
253 self
.forced_salt
= None
255 def set_as_supported_enctypes(self
, value
):
256 self
.as_supported_enctypes
= int(value
)
258 def set_tgs_supported_enctypes(self
, value
):
259 self
.tgs_supported_enctypes
= int(value
)
261 def set_ap_supported_enctypes(self
, value
):
262 self
.ap_supported_enctypes
= int(value
)
264 def _get_krb5_etypes(self
, supported_enctypes
):
267 if supported_enctypes
& security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
:
268 etypes
+= (kcrypto
.Enctype
.AES256
,)
269 if supported_enctypes
& security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
:
270 etypes
+= (kcrypto
.Enctype
.AES128
,)
271 if supported_enctypes
& security
.KERB_ENCTYPE_RC4_HMAC_MD5
:
272 etypes
+= (kcrypto
.Enctype
.RC4
,)
276 def get_as_krb5_etypes(self
):
277 return self
._get
_krb
5_etypes
(self
.as_supported_enctypes
)
279 def get_tgs_krb5_etypes(self
):
280 return self
._get
_krb
5_etypes
(self
.tgs_supported_enctypes
)
282 def get_ap_krb5_etypes(self
):
283 return self
._get
_krb
5_etypes
(self
.ap_supported_enctypes
)
285 def set_kvno(self
, kvno
):
291 def set_forced_key(self
, etype
, hexkey
):
293 contents
= binascii
.a2b_hex(hexkey
)
294 key
= kcrypto
.Key(etype
, contents
)
295 self
.forced_keys
[etype
] = Krb5EncryptionKey(key
, self
.kvno
)
297 def get_forced_key(self
, etype
):
299 return self
.forced_keys
.get(etype
, None)
301 def set_forced_salt(self
, salt
):
302 self
.forced_salt
= bytes(salt
)
304 def get_forced_salt(self
):
305 return self
.forced_salt
308 if self
.forced_salt
is not None:
309 return self
.forced_salt
311 if self
.get_workstation():
312 salt_string
= '%shost%s.%s' % (
313 self
.get_realm().upper(),
314 self
.get_username().lower().rsplit('$', 1)[0],
315 self
.get_realm().lower())
317 salt_string
= self
.get_realm().upper() + self
.get_username()
319 return salt_string
.encode('utf-8')
322 class KerberosTicketCreds
:
323 def __init__(self
, ticket
, session_key
,
324 crealm
=None, cname
=None,
325 srealm
=None, sname
=None,
328 encpart_private
=None):
330 self
.session_key
= session_key
335 self
.decryption_key
= decryption_key
336 self
.ticket_private
= ticket_private
337 self
.encpart_private
= encpart_private
340 class RawKerberosTest(TestCaseInTempDir
):
341 """A raw Kerberos Test case."""
344 {"value": -1111, "name": "dummy", },
345 {"value": kcrypto
.Enctype
.AES256
, "name": "aes128", },
346 {"value": kcrypto
.Enctype
.AES128
, "name": "aes256", },
347 {"value": kcrypto
.Enctype
.RC4
, "name": "rc4", },
350 setup_etype_test_permutations_done
= False
353 def setup_etype_test_permutations(cls
):
354 if cls
.setup_etype_test_permutations_done
:
359 num_idxs
= len(cls
.etypes_to_test
)
361 for num
in range(1, num_idxs
+ 1):
362 chunk
= list(itertools
.permutations(range(num_idxs
), num
))
365 permutations
.append(el
)
367 for p
in permutations
:
371 n
= cls
.etypes_to_test
[idx
]["name"]
376 etypes
+= (cls
.etypes_to_test
[idx
]["value"],)
378 r
= {"name": name
, "etypes": etypes
, }
381 cls
.etype_test_permutations
= res
382 cls
.setup_etype_test_permutations_done
= True
385 def etype_test_permutation_name_idx(cls
):
386 cls
.setup_etype_test_permutations()
389 for e
in cls
.etype_test_permutations
:
395 def etype_test_permutation_by_idx(self
, idx
):
396 e
= self
.etype_test_permutations
[idx
]
397 return (e
['name'], e
['etypes'])
403 cls
.host
= samba
.tests
.env_get_var_value('SERVER')
405 # A dictionary containing credentials that have already been
411 self
.do_asn1_print
= False
412 self
.do_hexdump
= False
414 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING',
416 if strict_checking
is None:
417 strict_checking
= '1'
418 self
.strict_checking
= bool(int(strict_checking
))
422 self
.unspecified_kvno
= object()
425 self
._disconnect
("tearDown")
428 def _disconnect(self
, reason
):
434 sys
.stderr
.write("disconnect[%s]\n" % reason
)
436 def _connect_tcp(self
):
439 self
.a
= socket
.getaddrinfo(self
.host
, tcp_port
, socket
.AF_UNSPEC
,
440 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
442 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
443 self
.s
.settimeout(10)
444 self
.s
.connect(self
.a
[0][4])
453 self
.assertNotConnected()
456 sys
.stderr
.write("connected[%s]\n" % self
.host
)
458 def env_get_var(self
, varname
, prefix
,
459 fallback_default
=True,
460 allow_missing
=False):
462 if prefix
is not None:
463 allow_missing_prefix
= allow_missing
or fallback_default
464 val
= samba
.tests
.env_get_var_value(
465 '%s_%s' % (prefix
, varname
),
466 allow_missing
=allow_missing_prefix
)
468 fallback_default
= True
469 if val
is None and fallback_default
:
470 val
= samba
.tests
.env_get_var_value(varname
,
471 allow_missing
=allow_missing
)
474 def _get_krb5_creds_from_env(self
, prefix
,
475 default_username
=None,
476 allow_missing_password
=False,
477 allow_missing_keys
=True,
478 require_strongest_key
=False):
479 c
= KerberosCredentials()
482 domain
= self
.env_get_var('DOMAIN', prefix
)
483 realm
= self
.env_get_var('REALM', prefix
)
484 allow_missing_username
= default_username
is not None
485 username
= self
.env_get_var('USERNAME', prefix
,
486 fallback_default
=False,
487 allow_missing
=allow_missing_username
)
489 username
= default_username
490 password
= self
.env_get_var('PASSWORD', prefix
,
491 fallback_default
=False,
492 allow_missing
=allow_missing_password
)
495 c
.set_username(username
)
496 if password
is not None:
497 c
.set_password(password
)
498 as_supported_enctypes
= self
.env_get_var('AS_SUPPORTED_ENCTYPES',
499 prefix
, allow_missing
=True)
500 if as_supported_enctypes
is not None:
501 c
.set_as_supported_enctypes(as_supported_enctypes
)
502 tgs_supported_enctypes
= self
.env_get_var('TGS_SUPPORTED_ENCTYPES',
503 prefix
, allow_missing
=True)
504 if tgs_supported_enctypes
is not None:
505 c
.set_tgs_supported_enctypes(tgs_supported_enctypes
)
506 ap_supported_enctypes
= self
.env_get_var('AP_SUPPORTED_ENCTYPES',
507 prefix
, allow_missing
=True)
508 if ap_supported_enctypes
is not None:
509 c
.set_ap_supported_enctypes(ap_supported_enctypes
)
511 if require_strongest_key
:
512 kvno_allow_missing
= False
514 aes256_allow_missing
= False
516 aes256_allow_missing
= True
518 kvno_allow_missing
= allow_missing_keys
519 aes256_allow_missing
= allow_missing_keys
520 kvno
= self
.env_get_var('KVNO', prefix
,
521 fallback_default
=False,
522 allow_missing
=kvno_allow_missing
)
525 aes256_key
= self
.env_get_var('AES256_KEY_HEX', prefix
,
526 fallback_default
=False,
527 allow_missing
=aes256_allow_missing
)
528 if aes256_key
is not None:
529 c
.set_forced_key(kcrypto
.Enctype
.AES256
, aes256_key
)
530 aes128_key
= self
.env_get_var('AES128_KEY_HEX', prefix
,
531 fallback_default
=False,
533 if aes128_key
is not None:
534 c
.set_forced_key(kcrypto
.Enctype
.AES128
, aes128_key
)
535 rc4_key
= self
.env_get_var('RC4_KEY_HEX', prefix
,
536 fallback_default
=False, allow_missing
=True)
537 if rc4_key
is not None:
538 c
.set_forced_key(kcrypto
.Enctype
.RC4
, rc4_key
)
540 if not allow_missing_keys
:
541 self
.assertTrue(c
.forced_keys
,
542 'Please supply %s encryption keys '
543 'in environment' % prefix
)
547 def _get_krb5_creds(self
,
549 default_username
=None,
550 allow_missing_password
=False,
551 allow_missing_keys
=True,
552 require_strongest_key
=False,
553 fallback_creds_fn
=None):
554 if prefix
in self
.creds_dict
:
555 return self
.creds_dict
[prefix
]
557 # We don't have the credentials already
561 # Try to obtain them from the environment
562 creds
= self
._get
_krb
5_creds
_from
_env
(
564 default_username
=default_username
,
565 allow_missing_password
=allow_missing_password
,
566 allow_missing_keys
=allow_missing_keys
,
567 require_strongest_key
=require_strongest_key
)
568 except Exception as err
:
569 # An error occurred, so save it for later
572 self
.assertIsNotNone(creds
)
573 # Save the obtained credentials
574 self
.creds_dict
[prefix
] = creds
577 if fallback_creds_fn
is not None:
579 # Try to use the fallback method
580 creds
= fallback_creds_fn()
581 except Exception as err
:
582 print("ERROR FROM ENV: %r" % (env_err
))
583 print("FALLBACK-FN: %s" % (fallback_creds_fn
))
584 print("FALLBACK-ERROR: %r" % (err
))
586 self
.assertIsNotNone(creds
)
587 # Save the obtained credentials
588 self
.creds_dict
[prefix
] = creds
591 # Both methods failed, so raise the exception from the
595 def get_user_creds(self
,
596 allow_missing_password
=False,
597 allow_missing_keys
=True):
598 c
= self
._get
_krb
5_creds
(prefix
=None,
599 allow_missing_password
=allow_missing_password
,
600 allow_missing_keys
=allow_missing_keys
)
603 def get_service_creds(self
,
604 allow_missing_password
=False,
605 allow_missing_keys
=True):
606 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
607 allow_missing_password
=allow_missing_password
,
608 allow_missing_keys
=allow_missing_keys
)
611 def get_client_creds(self
,
612 allow_missing_password
=False,
613 allow_missing_keys
=True):
614 c
= self
._get
_krb
5_creds
(prefix
='CLIENT',
615 allow_missing_password
=allow_missing_password
,
616 allow_missing_keys
=allow_missing_keys
)
619 def get_server_creds(self
,
620 allow_missing_password
=False,
621 allow_missing_keys
=True):
622 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
623 allow_missing_password
=allow_missing_password
,
624 allow_missing_keys
=allow_missing_keys
)
627 def get_admin_creds(self
,
628 allow_missing_password
=False,
629 allow_missing_keys
=True):
630 c
= self
._get
_krb
5_creds
(prefix
='ADMIN',
631 allow_missing_password
=allow_missing_password
,
632 allow_missing_keys
=allow_missing_keys
)
633 c
.set_gensec_features(c
.get_gensec_features() | FEATURE_SEAL
)
636 def get_krbtgt_creds(self
,
638 require_strongest_key
=False):
639 if require_strongest_key
:
640 self
.assertTrue(require_keys
)
641 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
642 default_username
='krbtgt',
643 allow_missing_password
=True,
644 allow_missing_keys
=not require_keys
,
645 require_strongest_key
=require_strongest_key
)
648 def get_anon_creds(self
):
653 def asn1_dump(self
, name
, obj
, asn1_print
=None):
654 if asn1_print
is None:
655 asn1_print
= self
.do_asn1_print
658 sys
.stderr
.write("%s:\n%s" % (name
, obj
))
660 sys
.stderr
.write("%s" % (obj
))
662 def hex_dump(self
, name
, blob
, hexdump
=None):
664 hexdump
= self
.do_hexdump
667 "%s: %d\n%s" % (name
, len(blob
), self
.hexdump(blob
)))
676 if asn1Spec
is not None:
677 class_name
= type(asn1Spec
).__name
__.split(':')[0]
679 class_name
= "<None-asn1Spec>"
680 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
681 obj
, _
= pyasn1_der_decode(blob
, asn1Spec
=asn1Spec
)
682 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
684 obj
= pyasn1_native_encode(obj
)
695 obj
= pyasn1_native_decode(obj
, asn1Spec
=asn1Spec
)
696 class_name
= type(obj
).__name
__.split(':')[0]
697 if class_name
is not None:
698 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
699 blob
= pyasn1_der_encode(obj
)
700 if class_name
is not None:
701 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
704 def send_pdu(self
, req
, asn1_print
=None, hexdump
=None):
706 k5_pdu
= self
.der_encode(
707 req
, native_decode
=False, asn1_print
=asn1_print
, hexdump
=False)
708 header
= struct
.pack('>I', len(k5_pdu
))
711 self
.hex_dump("send_pdu", header
, hexdump
=hexdump
)
712 self
.hex_dump("send_pdu", k5_pdu
, hexdump
=hexdump
)
714 sent
= self
.s
.send(req_pdu
, 0)
715 if sent
== len(req_pdu
):
717 req_pdu
= req_pdu
[sent
:]
718 except socket
.error
as e
:
719 self
._disconnect
("send_pdu: %s" % e
)
722 self
._disconnect
("send_pdu: %s" % e
)
725 def recv_raw(self
, num_recv
=0xffff, hexdump
=None, timeout
=None):
728 if timeout
is not None:
729 self
.s
.settimeout(timeout
)
730 rep_pdu
= self
.s
.recv(num_recv
, 0)
731 self
.s
.settimeout(10)
732 if len(rep_pdu
) == 0:
733 self
._disconnect
("recv_raw: EOF")
735 self
.hex_dump("recv_raw", rep_pdu
, hexdump
=hexdump
)
736 except socket
.timeout
:
737 self
.s
.settimeout(10)
738 sys
.stderr
.write("recv_raw: TIMEOUT\n")
739 except socket
.error
as e
:
740 self
._disconnect
("recv_raw: %s" % e
)
743 self
._disconnect
("recv_raw: %s" % e
)
747 def recv_pdu_raw(self
, asn1_print
=None, hexdump
=None, timeout
=None):
750 raw_pdu
= self
.recv_raw(
751 num_recv
=4, hexdump
=hexdump
, timeout
=timeout
)
754 header
= struct
.unpack(">I", raw_pdu
[0:4])
761 raw_pdu
= self
.recv_raw(
762 num_recv
=missing
, hexdump
=hexdump
, timeout
=timeout
)
763 self
.assertGreaterEqual(len(raw_pdu
), 1)
765 missing
= k5_len
- len(rep_pdu
)
766 k5_raw
= self
.der_decode(
772 pvno
= k5_raw
['field-0']
773 self
.assertEqual(pvno
, 5)
774 msg_type
= k5_raw
['field-1']
775 self
.assertIn(msg_type
, [KRB_AS_REP
, KRB_TGS_REP
, KRB_ERROR
])
776 if msg_type
== KRB_AS_REP
:
777 asn1Spec
= krb5_asn1
.AS_REP()
778 elif msg_type
== KRB_TGS_REP
:
779 asn1Spec
= krb5_asn1
.TGS_REP()
780 elif msg_type
== KRB_ERROR
:
781 asn1Spec
= krb5_asn1
.KRB_ERROR()
782 rep
= self
.der_decode(rep_pdu
, asn1Spec
=asn1Spec
,
783 asn1_print
=asn1_print
, hexdump
=False)
784 return (rep
, rep_pdu
)
786 def recv_pdu(self
, asn1_print
=None, hexdump
=None, timeout
=None):
787 (rep
, rep_pdu
) = self
.recv_pdu_raw(asn1_print
=asn1_print
,
792 def assertIsConnected(self
):
793 self
.assertIsNotNone(self
.s
, msg
="Not connected")
795 def assertNotConnected(self
):
796 self
.assertIsNone(self
.s
, msg
="Is connected")
798 def send_recv_transaction(
806 self
.send_pdu(req
, asn1_print
=asn1_print
, hexdump
=hexdump
)
808 asn1_print
=asn1_print
, hexdump
=hexdump
, timeout
=timeout
)
810 self
._disconnect
("transaction failed")
812 self
._disconnect
("transaction done")
815 def assertNoValue(self
, value
):
816 self
.assertTrue(value
.isNoValue
)
818 def assertHasValue(self
, value
):
819 self
.assertIsNotNone(value
)
821 def getElementValue(self
, obj
, elem
):
822 return obj
.get(elem
, None)
824 def assertElementMissing(self
, obj
, elem
):
825 v
= self
.getElementValue(obj
, elem
)
828 def assertElementPresent(self
, obj
, elem
):
829 v
= self
.getElementValue(obj
, elem
)
830 self
.assertIsNotNone(v
)
831 if self
.strict_checking
:
832 if isinstance(v
, collections
.abc
.Container
):
833 self
.assertNotEqual(0, len(v
))
835 def assertElementEqual(self
, obj
, elem
, value
):
836 v
= self
.getElementValue(obj
, elem
)
837 self
.assertIsNotNone(v
)
838 self
.assertEqual(v
, value
)
840 def assertElementEqualUTF8(self
, obj
, elem
, value
):
841 v
= self
.getElementValue(obj
, elem
)
842 self
.assertIsNotNone(v
)
843 self
.assertEqual(v
, bytes(value
, 'utf8'))
845 def assertPrincipalEqual(self
, princ1
, princ2
):
846 self
.assertEqual(princ1
['name-type'], princ2
['name-type'])
848 len(princ1
['name-string']),
849 len(princ2
['name-string']),
850 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
851 for idx
in range(len(princ1
['name-string'])):
853 princ1
['name-string'][idx
],
854 princ2
['name-string'][idx
],
855 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
857 def assertElementEqualPrincipal(self
, obj
, elem
, value
):
858 v
= self
.getElementValue(obj
, elem
)
859 self
.assertIsNotNone(v
)
860 v
= pyasn1_native_decode(v
, asn1Spec
=krb5_asn1
.PrincipalName())
861 self
.assertPrincipalEqual(v
, value
)
863 def assertElementKVNO(self
, obj
, elem
, value
):
864 v
= self
.getElementValue(obj
, elem
)
865 if value
== "autodetect":
867 if value
is not None:
868 self
.assertIsNotNone(v
)
869 # The value on the wire should never be 0
870 self
.assertNotEqual(v
, 0)
871 # unspecified_kvno means we don't know the kvno,
872 # but want to enforce its presence
873 if value
is not self
.unspecified_kvno
:
875 self
.assertNotEqual(value
, 0)
876 self
.assertEqual(v
, value
)
880 def get_KerberosTimeWithUsec(self
, epoch
=None, offset
=None):
883 if offset
is not None:
884 epoch
= epoch
+ int(offset
)
885 dt
= datetime
.datetime
.fromtimestamp(epoch
, tz
=datetime
.timezone
.utc
)
886 return (dt
.strftime("%Y%m%d%H%M%SZ"), dt
.microsecond
)
888 def get_KerberosTime(self
, epoch
=None, offset
=None):
889 (s
, _
) = self
.get_KerberosTimeWithUsec(epoch
=epoch
, offset
=offset
)
892 def get_EpochFromKerberosTime(self
, kerberos_time
):
893 if isinstance(kerberos_time
, bytes
):
894 kerberos_time
= kerberos_time
.decode()
896 epoch
= datetime
.datetime
.strptime(kerberos_time
,
898 epoch
= epoch
.replace(tzinfo
=datetime
.timezone
.utc
)
899 epoch
= int(epoch
.timestamp())
904 nonce_min
= 0x7f000000
905 nonce_max
= 0x7fffffff
906 v
= random
.randint(nonce_min
, nonce_max
)
909 def get_pa_dict(self
, pa_data
):
912 if pa_data
is not None:
914 pa_type
= pa
['padata-type']
915 if pa_type
in pa_dict
:
916 raise RuntimeError(f
'Duplicate type {pa_type}')
917 pa_dict
[pa_type
] = pa
['padata-value']
921 def SessionKey_create(self
, etype
, contents
, kvno
=None):
922 key
= kcrypto
.Key(etype
, contents
)
923 return Krb5EncryptionKey(key
, kvno
)
925 def PasswordKey_create(self
, etype
=None, pwd
=None, salt
=None, kvno
=None):
926 self
.assertIsNotNone(pwd
)
927 self
.assertIsNotNone(salt
)
928 key
= kcrypto
.string_to_key(etype
, pwd
, salt
)
929 return Krb5EncryptionKey(key
, kvno
)
931 def PasswordKey_from_etype_info2(self
, creds
, etype_info2
, kvno
=None):
932 e
= etype_info2
['etype']
934 salt
= etype_info2
.get('salt', None)
936 if e
== kcrypto
.Enctype
.RC4
:
937 nthash
= creds
.get_nt_hash()
938 return self
.SessionKey_create(etype
=e
, contents
=nthash
, kvno
=kvno
)
940 password
= creds
.get_password()
941 return self
.PasswordKey_create(
942 etype
=e
, pwd
=password
, salt
=salt
, kvno
=kvno
)
944 def TicketDecryptionKey_from_creds(self
, creds
, etype
=None):
947 etypes
= creds
.get_tgs_krb5_etypes()
950 forced_key
= creds
.get_forced_key(etype
)
951 if forced_key
is not None:
954 kvno
= creds
.get_kvno()
956 fail_msg
= ("%s has no fixed key for etype[%s] kvno[%s] "
957 "nor a password specified, " % (
958 creds
.get_username(), etype
, kvno
))
960 if etype
== kcrypto
.Enctype
.RC4
:
961 nthash
= creds
.get_nt_hash()
962 self
.assertIsNotNone(nthash
, msg
=fail_msg
)
963 return self
.SessionKey_create(etype
=etype
,
967 password
= creds
.get_password()
968 self
.assertIsNotNone(password
, msg
=fail_msg
)
969 salt
= creds
.get_salt()
970 return self
.PasswordKey_create(etype
=etype
,
975 def RandomKey(self
, etype
):
976 e
= kcrypto
._get
_enctype
_profile
(etype
)
977 contents
= samba
.generate_random_bytes(e
.keysize
)
978 return self
.SessionKey_create(etype
=etype
, contents
=contents
)
980 def EncryptionKey_import(self
, EncryptionKey_obj
):
981 return self
.SessionKey_create(EncryptionKey_obj
['keytype'],
982 EncryptionKey_obj
['keyvalue'])
984 def EncryptedData_create(self
, key
, usage
, plaintext
):
985 # EncryptedData ::= SEQUENCE {
986 # etype [0] Int32 -- EncryptionType --,
987 # kvno [1] UInt32 OPTIONAL,
988 # cipher [2] OCTET STRING -- ciphertext
990 ciphertext
= key
.encrypt(usage
, plaintext
)
991 EncryptedData_obj
= {
995 if key
.kvno
is not None:
996 EncryptedData_obj
['kvno'] = key
.kvno
997 return EncryptedData_obj
999 def Checksum_create(self
, key
, usage
, plaintext
, ctype
=None):
1000 # Checksum ::= SEQUENCE {
1001 # cksumtype [0] Int32,
1002 # checksum [1] OCTET STRING
1006 checksum
= key
.make_checksum(usage
, plaintext
, ctype
=ctype
)
1009 'checksum': checksum
,
1014 def PrincipalName_create(cls
, name_type
, names
):
1015 # PrincipalName ::= SEQUENCE {
1016 # name-type [0] Int32,
1017 # name-string [1] SEQUENCE OF KerberosString
1019 PrincipalName_obj
= {
1020 'name-type': name_type
,
1021 'name-string': names
,
1023 return PrincipalName_obj
1025 def AuthorizationData_create(self
, ad_type
, ad_data
):
1026 # AuthorizationData ::= SEQUENCE {
1027 # ad-type [0] Int32,
1028 # ad-data [1] OCTET STRING
1034 return AUTH_DATA_obj
1036 def PA_DATA_create(self
, padata_type
, padata_value
):
1037 # PA-DATA ::= SEQUENCE {
1038 # -- NOTE: first tag is [1], not [0]
1039 # padata-type [1] Int32,
1040 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1043 'padata-type': padata_type
,
1044 'padata-value': padata_value
,
1048 def PA_ENC_TS_ENC_create(self
, ts
, usec
):
1049 # PA-ENC-TS-ENC ::= SEQUENCE {
1050 # patimestamp[0] KerberosTime, -- client's time
1051 # pausec[1] krb5int32 OPTIONAL
1053 PA_ENC_TS_ENC_obj
= {
1057 return PA_ENC_TS_ENC_obj
1059 def PA_PAC_OPTIONS_create(self
, options
):
1060 # PA-PAC-OPTIONS ::= SEQUENCE {
1061 # options [0] PACOptionFlags
1063 PA_PAC_OPTIONS_obj
= {
1066 return PA_PAC_OPTIONS_obj
1068 def KRB_FAST_ARMOR_create(self
, armor_type
, armor_value
):
1069 # KrbFastArmor ::= SEQUENCE {
1070 # armor-type [0] Int32,
1071 # armor-value [1] OCTET STRING,
1074 KRB_FAST_ARMOR_obj
= {
1075 'armor-type': armor_type
,
1076 'armor-value': armor_value
1078 return KRB_FAST_ARMOR_obj
1080 def KRB_FAST_REQ_create(self
, fast_options
, padata
, req_body
):
1081 # KrbFastReq ::= SEQUENCE {
1082 # fast-options [0] FastOptions,
1083 # padata [1] SEQUENCE OF PA-DATA,
1084 # req-body [2] KDC-REQ-BODY,
1087 KRB_FAST_REQ_obj
= {
1088 'fast-options': fast_options
,
1090 'req-body': req_body
1092 return KRB_FAST_REQ_obj
1094 def KRB_FAST_ARMORED_REQ_create(self
, armor
, req_checksum
, enc_fast_req
):
1095 # KrbFastArmoredReq ::= SEQUENCE {
1096 # armor [0] KrbFastArmor OPTIONAL,
1097 # req-checksum [1] Checksum,
1098 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1100 KRB_FAST_ARMORED_REQ_obj
= {
1101 'req-checksum': req_checksum
,
1102 'enc-fast-req': enc_fast_req
1104 if armor
is not None:
1105 KRB_FAST_ARMORED_REQ_obj
['armor'] = armor
1106 return KRB_FAST_ARMORED_REQ_obj
1108 def PA_FX_FAST_REQUEST_create(self
, armored_data
):
1109 # PA-FX-FAST-REQUEST ::= CHOICE {
1110 # armored-data [0] KrbFastArmoredReq,
1113 PA_FX_FAST_REQUEST_obj
= {
1114 'armored-data': armored_data
1116 return PA_FX_FAST_REQUEST_obj
1118 def KERB_PA_PAC_REQUEST_create(self
, include_pac
, pa_data_create
=True):
1119 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1120 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1122 # --If FALSE, and PAC present,
1125 KERB_PA_PAC_REQUEST_obj
= {
1126 'include-pac': include_pac
,
1128 if not pa_data_create
:
1129 return KERB_PA_PAC_REQUEST_obj
1130 pa_pac
= self
.der_encode(KERB_PA_PAC_REQUEST_obj
,
1131 asn1Spec
=krb5_asn1
.KERB_PA_PAC_REQUEST())
1132 pa_data
= self
.PA_DATA_create(PADATA_PAC_REQUEST
, pa_pac
)
1135 def KDC_REQ_BODY_create(self
,
1147 EncAuthorizationData
,
1148 EncAuthorizationData_key
,
1149 EncAuthorizationData_usage
,
1152 # KDC-REQ-BODY ::= SEQUENCE {
1153 # kdc-options [0] KDCOptions,
1154 # cname [1] PrincipalName OPTIONAL
1155 # -- Used only in AS-REQ --,
1158 # -- Also client's in AS-REQ --,
1159 # sname [3] PrincipalName OPTIONAL,
1160 # from [4] KerberosTime OPTIONAL,
1161 # till [5] KerberosTime,
1162 # rtime [6] KerberosTime OPTIONAL,
1164 # etype [8] SEQUENCE OF Int32
1166 # -- in preference order --,
1167 # addresses [9] HostAddresses OPTIONAL,
1168 # enc-authorization-data [10] EncryptedData OPTIONAL
1169 # -- AuthorizationData --,
1170 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1171 # -- NOTE: not empty
1173 if EncAuthorizationData
is not None:
1174 enc_ad_plain
= self
.der_encode(
1175 EncAuthorizationData
,
1176 asn1Spec
=krb5_asn1
.AuthorizationData(),
1177 asn1_print
=asn1_print
,
1179 enc_ad
= self
.EncryptedData_create(EncAuthorizationData_key
,
1180 EncAuthorizationData_usage
,
1184 KDC_REQ_BODY_obj
= {
1185 'kdc-options': kdc_options
,
1191 if cname
is not None:
1192 KDC_REQ_BODY_obj
['cname'] = cname
1193 if sname
is not None:
1194 KDC_REQ_BODY_obj
['sname'] = sname
1195 if from_time
is not None:
1196 KDC_REQ_BODY_obj
['from'] = from_time
1197 if renew_time
is not None:
1198 KDC_REQ_BODY_obj
['rtime'] = renew_time
1199 if addresses
is not None:
1200 KDC_REQ_BODY_obj
['addresses'] = addresses
1201 if enc_ad
is not None:
1202 KDC_REQ_BODY_obj
['enc-authorization-data'] = enc_ad
1203 if additional_tickets
is not None:
1204 KDC_REQ_BODY_obj
['additional-tickets'] = additional_tickets
1205 return KDC_REQ_BODY_obj
1207 def KDC_REQ_create(self
,
1214 # KDC-REQ ::= SEQUENCE {
1215 # -- NOTE: first tag is [1], not [0]
1216 # pvno [1] INTEGER (5) ,
1217 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1218 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1219 # -- NOTE: not empty --,
1220 # req-body [4] KDC-REQ-BODY
1225 'msg-type': msg_type
,
1226 'req-body': req_body
,
1228 if padata
is not None:
1229 KDC_REQ_obj
['padata'] = padata
1230 if asn1Spec
is not None:
1231 KDC_REQ_decoded
= pyasn1_native_decode(
1232 KDC_REQ_obj
, asn1Spec
=asn1Spec
)
1234 KDC_REQ_decoded
= None
1235 return KDC_REQ_obj
, KDC_REQ_decoded
1237 def AS_REQ_create(self
,
1239 kdc_options
, # required
1243 from_time
, # optional
1244 till_time
, # required
1245 renew_time
, # optional
1248 addresses
, # optional
1250 native_decoded_only
=True,
1253 # KDC-REQ ::= SEQUENCE {
1254 # -- NOTE: first tag is [1], not [0]
1255 # pvno [1] INTEGER (5) ,
1256 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1257 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1258 # -- NOTE: not empty --,
1259 # req-body [4] KDC-REQ-BODY
1262 # KDC-REQ-BODY ::= SEQUENCE {
1263 # kdc-options [0] KDCOptions,
1264 # cname [1] PrincipalName OPTIONAL
1265 # -- Used only in AS-REQ --,
1268 # -- Also client's in AS-REQ --,
1269 # sname [3] PrincipalName OPTIONAL,
1270 # from [4] KerberosTime OPTIONAL,
1271 # till [5] KerberosTime,
1272 # rtime [6] KerberosTime OPTIONAL,
1274 # etype [8] SEQUENCE OF Int32
1276 # -- in preference order --,
1277 # addresses [9] HostAddresses OPTIONAL,
1278 # enc-authorization-data [10] EncryptedData OPTIONAL
1279 # -- AuthorizationData --,
1280 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1281 # -- NOTE: not empty
1283 KDC_REQ_BODY_obj
= self
.KDC_REQ_BODY_create(
1295 EncAuthorizationData
=None,
1296 EncAuthorizationData_key
=None,
1297 EncAuthorizationData_usage
=None,
1298 asn1_print
=asn1_print
,
1300 obj
, decoded
= self
.KDC_REQ_create(
1301 msg_type
=KRB_AS_REQ
,
1303 req_body
=KDC_REQ_BODY_obj
,
1304 asn1Spec
=krb5_asn1
.AS_REQ(),
1305 asn1_print
=asn1_print
,
1307 if native_decoded_only
:
1311 def AP_REQ_create(self
, ap_options
, ticket
, authenticator
):
1312 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1313 # pvno [0] INTEGER (5),
1314 # msg-type [1] INTEGER (14),
1315 # ap-options [2] APOptions,
1316 # ticket [3] Ticket,
1317 # authenticator [4] EncryptedData -- Authenticator
1321 'msg-type': KRB_AP_REQ
,
1322 'ap-options': ap_options
,
1324 'authenticator': authenticator
,
1328 def Authenticator_create(
1329 self
, crealm
, cname
, cksum
, cusec
, ctime
, subkey
, seq_number
,
1330 authorization_data
):
1331 # -- Unencrypted authenticator
1332 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1333 # authenticator-vno [0] INTEGER (5),
1335 # cname [2] PrincipalName,
1336 # cksum [3] Checksum OPTIONAL,
1337 # cusec [4] Microseconds,
1338 # ctime [5] KerberosTime,
1339 # subkey [6] EncryptionKey OPTIONAL,
1340 # seq-number [7] UInt32 OPTIONAL,
1341 # authorization-data [8] AuthorizationData OPTIONAL
1343 Authenticator_obj
= {
1344 'authenticator-vno': 5,
1350 if cksum
is not None:
1351 Authenticator_obj
['cksum'] = cksum
1352 if subkey
is not None:
1353 Authenticator_obj
['subkey'] = subkey
1354 if seq_number
is not None:
1355 Authenticator_obj
['seq-number'] = seq_number
1356 if authorization_data
is not None:
1357 Authenticator_obj
['authorization-data'] = authorization_data
1358 return Authenticator_obj
1360 def TGS_REQ_create(self
,
1365 kdc_options
, # required
1369 from_time
, # optional
1370 till_time
, # required
1371 renew_time
, # optional
1374 addresses
, # optional
1375 EncAuthorizationData
,
1376 EncAuthorizationData_key
,
1379 authenticator_subkey
=None,
1380 body_checksum_type
=None,
1381 native_decoded_only
=True,
1384 # KDC-REQ ::= SEQUENCE {
1385 # -- NOTE: first tag is [1], not [0]
1386 # pvno [1] INTEGER (5) ,
1387 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1388 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1389 # -- NOTE: not empty --,
1390 # req-body [4] KDC-REQ-BODY
1393 # KDC-REQ-BODY ::= SEQUENCE {
1394 # kdc-options [0] KDCOptions,
1395 # cname [1] PrincipalName OPTIONAL
1396 # -- Used only in AS-REQ --,
1399 # -- Also client's in AS-REQ --,
1400 # sname [3] PrincipalName OPTIONAL,
1401 # from [4] KerberosTime OPTIONAL,
1402 # till [5] KerberosTime,
1403 # rtime [6] KerberosTime OPTIONAL,
1405 # etype [8] SEQUENCE OF Int32
1407 # -- in preference order --,
1408 # addresses [9] HostAddresses OPTIONAL,
1409 # enc-authorization-data [10] EncryptedData OPTIONAL
1410 # -- AuthorizationData --,
1411 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1412 # -- NOTE: not empty
1415 if authenticator_subkey
is not None:
1416 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SUBKEY
1418 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SESSION
1420 req_body
= self
.KDC_REQ_BODY_create(
1421 kdc_options
=kdc_options
,
1425 from_time
=from_time
,
1426 till_time
=till_time
,
1427 renew_time
=renew_time
,
1430 addresses
=addresses
,
1431 additional_tickets
=additional_tickets
,
1432 EncAuthorizationData
=EncAuthorizationData
,
1433 EncAuthorizationData_key
=EncAuthorizationData_key
,
1434 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1435 req_body_blob
= self
.der_encode(req_body
,
1436 asn1Spec
=krb5_asn1
.KDC_REQ_BODY(),
1437 asn1_print
=asn1_print
, hexdump
=hexdump
)
1439 req_body_checksum
= self
.Checksum_create(ticket_session_key
,
1440 KU_TGS_REQ_AUTH_CKSUM
,
1442 ctype
=body_checksum_type
)
1445 if authenticator_subkey
is not None:
1446 subkey_obj
= authenticator_subkey
.export_obj()
1447 seq_number
= random
.randint(0, 0xfffffffe)
1448 authenticator
= self
.Authenticator_create(
1451 cksum
=req_body_checksum
,
1455 seq_number
=seq_number
,
1456 authorization_data
=None)
1457 authenticator
= self
.der_encode(
1459 asn1Spec
=krb5_asn1
.Authenticator(),
1460 asn1_print
=asn1_print
,
1463 authenticator
= self
.EncryptedData_create(
1464 ticket_session_key
, KU_TGS_REQ_AUTH
, authenticator
)
1466 ap_options
= krb5_asn1
.APOptions('0')
1467 ap_req
= self
.AP_REQ_create(ap_options
=str(ap_options
),
1469 authenticator
=authenticator
)
1470 ap_req
= self
.der_encode(ap_req
, asn1Spec
=krb5_asn1
.AP_REQ(),
1471 asn1_print
=asn1_print
, hexdump
=hexdump
)
1472 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
1473 if padata
is not None:
1474 padata
.append(pa_tgs_req
)
1476 padata
= [pa_tgs_req
]
1478 obj
, decoded
= self
.KDC_REQ_create(
1479 msg_type
=KRB_TGS_REQ
,
1482 asn1Spec
=krb5_asn1
.TGS_REQ(),
1483 asn1_print
=asn1_print
,
1485 if native_decoded_only
:
1489 def PA_S4U2Self_create(self
, name
, realm
, tgt_session_key
, ctype
=None):
1490 # PA-S4U2Self ::= SEQUENCE {
1491 # name [0] PrincipalName,
1493 # cksum [2] Checksum,
1494 # auth [3] GeneralString
1496 cksum_data
= name
['name-type'].to_bytes(4, byteorder
='little')
1497 for n
in name
['name-string']:
1498 cksum_data
+= n
.encode()
1499 cksum_data
+= realm
.encode()
1500 cksum_data
+= "Kerberos".encode()
1501 cksum
= self
.Checksum_create(tgt_session_key
,
1502 KU_NON_KERB_CKSUM_SALT
,
1512 pa_s4u2self
= self
.der_encode(
1513 PA_S4U2Self_obj
, asn1Spec
=krb5_asn1
.PA_S4U2Self())
1514 return self
.PA_DATA_create(PADATA_FOR_USER
, pa_s4u2self
)
1516 def _generic_kdc_exchange(self
,
1517 kdc_exchange_dict
, # required
1518 cname
=None, # optional
1519 realm
=None, # required
1520 sname
=None, # optional
1521 from_time
=None, # optional
1522 till_time
=None, # required
1523 renew_time
=None, # optional
1524 etypes
=None, # required
1525 addresses
=None, # optional
1526 additional_tickets
=None, # optional
1527 EncAuthorizationData
=None, # optional
1528 EncAuthorizationData_key
=None, # optional
1529 EncAuthorizationData_usage
=None): # optional
1531 check_error_fn
= kdc_exchange_dict
['check_error_fn']
1532 check_rep_fn
= kdc_exchange_dict
['check_rep_fn']
1533 generate_fast_fn
= kdc_exchange_dict
['generate_fast_fn']
1534 generate_fast_armor_fn
= kdc_exchange_dict
['generate_fast_armor_fn']
1535 generate_fast_padata_fn
= kdc_exchange_dict
['generate_fast_padata_fn']
1536 generate_padata_fn
= kdc_exchange_dict
['generate_padata_fn']
1537 callback_dict
= kdc_exchange_dict
['callback_dict']
1538 req_msg_type
= kdc_exchange_dict
['req_msg_type']
1539 req_asn1Spec
= kdc_exchange_dict
['req_asn1Spec']
1540 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
1542 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
1543 kdc_options
= kdc_exchange_dict
['kdc_options']
1545 # Parameters specific to the outer request body
1546 outer_req
= kdc_exchange_dict
['outer_req']
1548 if till_time
is None:
1549 till_time
= self
.get_KerberosTime(offset
=36000)
1551 if 'nonce' in kdc_exchange_dict
:
1552 nonce
= kdc_exchange_dict
['nonce']
1554 nonce
= self
.get_Nonce()
1555 kdc_exchange_dict
['nonce'] = nonce
1557 req_body
= self
.KDC_REQ_BODY_create(
1558 kdc_options
=kdc_options
,
1562 from_time
=from_time
,
1563 till_time
=till_time
,
1564 renew_time
=renew_time
,
1567 addresses
=addresses
,
1568 additional_tickets
=additional_tickets
,
1569 EncAuthorizationData
=EncAuthorizationData
,
1570 EncAuthorizationData_key
=EncAuthorizationData_key
,
1571 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1573 inner_req_body
= dict(req_body
)
1574 if outer_req
is not None:
1575 for key
, value
in outer_req
.items():
1576 if value
is not None:
1577 req_body
[key
] = value
1581 if req_msg_type
== KRB_AS_REQ
:
1583 tgs_req_padata
= None
1585 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1587 tgs_req
= self
.generate_ap_req(kdc_exchange_dict
,
1591 tgs_req_padata
= self
.PA_DATA_create(PADATA_KDC_REQ
, tgs_req
)
1593 if generate_fast_padata_fn
is not None:
1594 self
.assertIsNotNone(generate_fast_fn
)
1595 # This can alter req_body...
1596 fast_padata
, req_body
= generate_fast_padata_fn(kdc_exchange_dict
,
1602 if generate_fast_armor_fn
is not None:
1603 self
.assertIsNotNone(generate_fast_fn
)
1604 fast_ap_req
= generate_fast_armor_fn(kdc_exchange_dict
,
1609 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
1610 fast_armor
= self
.KRB_FAST_ARMOR_create(fast_armor_type
,
1615 if generate_padata_fn
is not None:
1616 # This can alter req_body...
1617 outer_padata
, req_body
= generate_padata_fn(kdc_exchange_dict
,
1620 self
.assertIsNotNone(outer_padata
)
1621 self
.assertNotIn(PADATA_KDC_REQ
,
1622 [pa
['padata-type'] for pa
in outer_padata
],
1623 'Don\'t create TGS-REQ manually')
1627 if generate_fast_fn
is not None:
1628 armor_key
= kdc_exchange_dict
['armor_key']
1629 self
.assertIsNotNone(armor_key
)
1631 if req_msg_type
== KRB_AS_REQ
:
1632 checksum_blob
= self
.der_encode(
1634 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
1636 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1637 checksum_blob
= tgs_req
1639 checksum
= self
.Checksum_create(armor_key
,
1643 fast
= generate_fast_fn(kdc_exchange_dict
,
1654 if tgs_req_padata
is not None:
1655 padata
.append(tgs_req_padata
)
1657 if fast
is not None:
1660 if outer_padata
is not None:
1661 padata
+= outer_padata
1666 kdc_exchange_dict
['req_padata'] = padata
1667 kdc_exchange_dict
['fast_padata'] = fast_padata
1668 kdc_exchange_dict
['req_body'] = inner_req_body
1670 req_obj
, req_decoded
= self
.KDC_REQ_create(msg_type
=req_msg_type
,
1673 asn1Spec
=req_asn1Spec())
1675 rep
= self
.send_recv_transaction(req_decoded
)
1676 self
.assertIsNotNone(rep
)
1678 msg_type
= self
.getElementValue(rep
, 'msg-type')
1679 self
.assertIsNotNone(msg_type
)
1681 expected_msg_type
= None
1682 if check_error_fn
is not None:
1683 expected_msg_type
= KRB_ERROR
1684 self
.assertIsNone(check_rep_fn
)
1685 self
.assertNotEqual(0, expected_error_mode
)
1686 if check_rep_fn
is not None:
1687 expected_msg_type
= rep_msg_type
1688 self
.assertIsNone(check_error_fn
)
1689 self
.assertEqual(0, expected_error_mode
)
1690 self
.assertIsNotNone(expected_msg_type
)
1691 self
.assertEqual(msg_type
, expected_msg_type
)
1693 if msg_type
== KRB_ERROR
:
1694 return check_error_fn(kdc_exchange_dict
,
1698 return check_rep_fn(kdc_exchange_dict
, callback_dict
, rep
)
1700 def as_exchange_dict(self
,
1701 expected_crealm
=None,
1702 expected_cname
=None,
1703 expected_cname_private
=None,
1704 expected_srealm
=None,
1705 expected_sname
=None,
1706 ticket_decryption_key
=None,
1707 generate_fast_fn
=None,
1708 generate_fast_armor_fn
=None,
1709 generate_fast_padata_fn
=None,
1710 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
1711 generate_padata_fn
=None,
1712 check_error_fn
=None,
1714 check_padata_fn
=None,
1715 check_kdc_private_fn
=None,
1717 expected_error_mode
=0,
1718 client_as_etypes
=None,
1720 authenticator_subkey
=None,
1727 kdc_exchange_dict
= {
1728 'req_msg_type': KRB_AS_REQ
,
1729 'req_asn1Spec': krb5_asn1
.AS_REQ
,
1730 'rep_msg_type': KRB_AS_REP
,
1731 'rep_asn1Spec': krb5_asn1
.AS_REP
,
1732 'rep_encpart_asn1Spec': krb5_asn1
.EncASRepPart
,
1733 'expected_crealm': expected_crealm
,
1734 'expected_cname': expected_cname
,
1735 'expected_srealm': expected_srealm
,
1736 'expected_sname': expected_sname
,
1737 'ticket_decryption_key': ticket_decryption_key
,
1738 'generate_fast_fn': generate_fast_fn
,
1739 'generate_fast_armor_fn': generate_fast_armor_fn
,
1740 'generate_fast_padata_fn': generate_fast_padata_fn
,
1741 'fast_armor_type': fast_armor_type
,
1742 'generate_padata_fn': generate_padata_fn
,
1743 'check_error_fn': check_error_fn
,
1744 'check_rep_fn': check_rep_fn
,
1745 'check_padata_fn': check_padata_fn
,
1746 'check_kdc_private_fn': check_kdc_private_fn
,
1747 'callback_dict': callback_dict
,
1748 'expected_error_mode': expected_error_mode
,
1749 'client_as_etypes': client_as_etypes
,
1750 'expected_salt': expected_salt
,
1751 'authenticator_subkey': authenticator_subkey
,
1752 'armor_key': armor_key
,
1753 'armor_tgt': armor_tgt
,
1754 'armor_subkey': armor_subkey
,
1755 'auth_data': auth_data
,
1756 'kdc_options': kdc_options
,
1757 'outer_req': outer_req
1759 if expected_cname_private
is not None:
1760 kdc_exchange_dict
['expected_cname_private'] = (
1761 expected_cname_private
)
1763 if callback_dict
is None:
1766 return kdc_exchange_dict
1768 def tgs_exchange_dict(self
,
1769 expected_crealm
=None,
1770 expected_cname
=None,
1771 expected_cname_private
=None,
1772 expected_srealm
=None,
1773 expected_sname
=None,
1774 ticket_decryption_key
=None,
1775 generate_fast_fn
=None,
1776 generate_fast_armor_fn
=None,
1777 generate_fast_padata_fn
=None,
1778 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
1779 generate_padata_fn
=None,
1780 check_error_fn
=None,
1782 check_padata_fn
=None,
1783 check_kdc_private_fn
=None,
1789 authenticator_subkey
=None,
1791 body_checksum_type
=None,
1794 kdc_exchange_dict
= {
1795 'req_msg_type': KRB_TGS_REQ
,
1796 'req_asn1Spec': krb5_asn1
.TGS_REQ
,
1797 'rep_msg_type': KRB_TGS_REP
,
1798 'rep_asn1Spec': krb5_asn1
.TGS_REP
,
1799 'rep_encpart_asn1Spec': krb5_asn1
.EncTGSRepPart
,
1800 'expected_crealm': expected_crealm
,
1801 'expected_cname': expected_cname
,
1802 'expected_srealm': expected_srealm
,
1803 'expected_sname': expected_sname
,
1804 'ticket_decryption_key': ticket_decryption_key
,
1805 'generate_fast_fn': generate_fast_fn
,
1806 'generate_fast_armor_fn': generate_fast_armor_fn
,
1807 'generate_fast_padata_fn': generate_fast_padata_fn
,
1808 'fast_armor_type': fast_armor_type
,
1809 'generate_padata_fn': generate_padata_fn
,
1810 'check_error_fn': check_error_fn
,
1811 'check_rep_fn': check_rep_fn
,
1812 'check_padata_fn': check_padata_fn
,
1813 'check_kdc_private_fn': check_kdc_private_fn
,
1814 'callback_dict': callback_dict
,
1816 'body_checksum_type': body_checksum_type
,
1817 'armor_key': armor_key
,
1818 'armor_tgt': armor_tgt
,
1819 'armor_subkey': armor_subkey
,
1820 'auth_data': auth_data
,
1821 'authenticator_subkey': authenticator_subkey
,
1822 'kdc_options': kdc_options
,
1823 'outer_req': outer_req
1825 if expected_cname_private
is not None:
1826 kdc_exchange_dict
['expected_cname_private'] = (
1827 expected_cname_private
)
1829 if callback_dict
is None:
1832 return kdc_exchange_dict
1834 def generic_check_kdc_rep(self
,
1839 expected_crealm
= kdc_exchange_dict
['expected_crealm']
1840 expected_cname
= kdc_exchange_dict
['expected_cname']
1841 expected_srealm
= kdc_exchange_dict
['expected_srealm']
1842 expected_sname
= kdc_exchange_dict
['expected_sname']
1843 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
1844 check_padata_fn
= kdc_exchange_dict
['check_padata_fn']
1845 check_kdc_private_fn
= kdc_exchange_dict
['check_kdc_private_fn']
1846 rep_encpart_asn1Spec
= kdc_exchange_dict
['rep_encpart_asn1Spec']
1847 msg_type
= kdc_exchange_dict
['rep_msg_type']
1848 armor_key
= kdc_exchange_dict
['armor_key']
1850 self
.assertElementEqual(rep
, 'msg-type', msg_type
) # AS-REP | TGS-REP
1851 padata
= self
.getElementValue(rep
, 'padata')
1852 if self
.strict_checking
:
1853 self
.assertElementEqualUTF8(rep
, 'crealm', expected_crealm
)
1854 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
1855 self
.assertElementPresent(rep
, 'ticket')
1856 ticket
= self
.getElementValue(rep
, 'ticket')
1857 ticket_encpart
= None
1858 ticket_cipher
= None
1859 self
.assertIsNotNone(ticket
)
1860 if ticket
is not None: # Never None, but gives indentation
1861 self
.assertElementEqual(ticket
, 'tkt-vno', 5)
1862 self
.assertElementEqualUTF8(ticket
, 'realm', expected_srealm
)
1863 self
.assertElementEqualPrincipal(ticket
, 'sname', expected_sname
)
1864 self
.assertElementPresent(ticket
, 'enc-part')
1865 ticket_encpart
= self
.getElementValue(ticket
, 'enc-part')
1866 self
.assertIsNotNone(ticket_encpart
)
1867 if ticket_encpart
is not None: # Never None, but gives indentation
1868 self
.assertElementPresent(ticket_encpart
, 'etype')
1869 # 'unspecified' means present, with any value != 0
1870 self
.assertElementKVNO(ticket_encpart
, 'kvno',
1871 self
.unspecified_kvno
)
1872 self
.assertElementPresent(ticket_encpart
, 'cipher')
1873 ticket_cipher
= self
.getElementValue(ticket_encpart
, 'cipher')
1874 self
.assertElementPresent(rep
, 'enc-part')
1875 encpart
= self
.getElementValue(rep
, 'enc-part')
1876 encpart_cipher
= None
1877 self
.assertIsNotNone(encpart
)
1878 if encpart
is not None: # Never None, but gives indentation
1879 self
.assertElementPresent(encpart
, 'etype')
1880 self
.assertElementKVNO(ticket_encpart
, 'kvno', 'autodetect')
1881 self
.assertElementPresent(encpart
, 'cipher')
1882 encpart_cipher
= self
.getElementValue(encpart
, 'cipher')
1884 ticket_checksum
= None
1886 encpart_decryption_key
= None
1887 self
.assertIsNotNone(check_padata_fn
)
1888 if check_padata_fn
is not None:
1889 # See if we can get the decryption key from the preauth phase
1890 encpart_decryption_key
, encpart_decryption_usage
= (
1891 check_padata_fn(kdc_exchange_dict
, callback_dict
,
1894 if armor_key
is not None:
1895 pa_dict
= self
.get_pa_dict(padata
)
1897 if PADATA_FX_FAST
in pa_dict
:
1898 fx_fast_data
= pa_dict
[PADATA_FX_FAST
]
1899 fast_response
= self
.check_fx_fast_data(kdc_exchange_dict
,
1904 if 'strengthen-key' in fast_response
:
1905 strengthen_key
= self
.EncryptionKey_import(
1906 fast_response
['strengthen-key'])
1907 encpart_decryption_key
= (
1908 self
.generate_strengthen_reply_key(
1910 encpart_decryption_key
))
1912 fast_finished
= fast_response
.get('finished', None)
1913 if fast_finished
is not None:
1914 ticket_checksum
= fast_finished
['ticket-checksum']
1916 self
.check_rep_padata(kdc_exchange_dict
,
1919 fast_response
['padata'])
1921 ticket_private
= None
1922 self
.assertIsNotNone(ticket_decryption_key
)
1923 if ticket_decryption_key
is not None:
1924 self
.assertElementEqual(ticket_encpart
, 'etype',
1925 ticket_decryption_key
.etype
)
1926 self
.assertElementKVNO(ticket_encpart
, 'kvno',
1927 ticket_decryption_key
.kvno
)
1928 ticket_decpart
= ticket_decryption_key
.decrypt(KU_TICKET
,
1930 ticket_private
= self
.der_decode(
1932 asn1Spec
=krb5_asn1
.EncTicketPart())
1934 encpart_private
= None
1935 self
.assertIsNotNone(encpart_decryption_key
)
1936 if encpart_decryption_key
is not None:
1937 self
.assertElementEqual(encpart
, 'etype',
1938 encpart_decryption_key
.etype
)
1939 if self
.strict_checking
:
1940 self
.assertElementKVNO(encpart
, 'kvno',
1941 encpart_decryption_key
.kvno
)
1942 rep_decpart
= encpart_decryption_key
.decrypt(
1943 encpart_decryption_usage
,
1945 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
1946 # application tag 26
1948 encpart_private
= self
.der_decode(
1950 asn1Spec
=rep_encpart_asn1Spec())
1952 encpart_private
= self
.der_decode(
1954 asn1Spec
=krb5_asn1
.EncTGSRepPart())
1956 self
.assertIsNotNone(check_kdc_private_fn
)
1957 if check_kdc_private_fn
is not None:
1958 check_kdc_private_fn(kdc_exchange_dict
, callback_dict
,
1959 rep
, ticket_private
, encpart_private
,
1964 def check_fx_fast_data(self
,
1969 expect_strengthen_key
=True):
1970 fx_fast_data
= self
.der_decode(fx_fast_data
,
1971 asn1Spec
=krb5_asn1
.PA_FX_FAST_REPLY())
1973 enc_fast_rep
= fx_fast_data
['armored-data']['enc-fast-rep']
1974 self
.assertEqual(enc_fast_rep
['etype'], armor_key
.etype
)
1976 fast_rep
= armor_key
.decrypt(KU_FAST_REP
, enc_fast_rep
['cipher'])
1978 fast_response
= self
.der_decode(fast_rep
,
1979 asn1Spec
=krb5_asn1
.KrbFastResponse())
1981 if expect_strengthen_key
and self
.strict_checking
:
1982 self
.assertIn('strengthen-key', fast_response
)
1985 self
.assertIn('finished', fast_response
)
1987 # Ensure that the nonce matches the nonce in the body of the request
1989 nonce
= kdc_exchange_dict
['nonce']
1990 self
.assertEqual(nonce
, fast_response
['nonce'])
1992 return fast_response
1994 def generic_check_kdc_private(self
,
2001 kdc_options
= kdc_exchange_dict
['kdc_options']
2002 canon_pos
= len(tuple(krb5_asn1
.KDCOptions('canonicalize'))) - 1
2003 canonicalize
= (canon_pos
< len(kdc_options
)
2004 and kdc_options
[canon_pos
] == '1')
2006 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2007 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2008 expected_sname
= kdc_exchange_dict
['expected_sname']
2009 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
2012 expected_cname
= kdc_exchange_dict
['expected_cname_private']
2014 expected_cname
= kdc_exchange_dict
['expected_cname']
2016 ticket
= self
.getElementValue(rep
, 'ticket')
2018 if ticket_checksum
is not None:
2019 armor_key
= kdc_exchange_dict
['armor_key']
2020 self
.verify_ticket_checksum(ticket
, ticket_checksum
, armor_key
)
2022 ticket_session_key
= None
2023 if ticket_private
is not None:
2024 self
.assertElementPresent(ticket_private
, 'flags')
2025 self
.assertElementPresent(ticket_private
, 'key')
2026 ticket_key
= self
.getElementValue(ticket_private
, 'key')
2027 self
.assertIsNotNone(ticket_key
)
2028 if ticket_key
is not None: # Never None, but gives indentation
2029 self
.assertElementPresent(ticket_key
, 'keytype')
2030 self
.assertElementPresent(ticket_key
, 'keyvalue')
2031 ticket_session_key
= self
.EncryptionKey_import(ticket_key
)
2032 self
.assertElementEqualUTF8(ticket_private
, 'crealm',
2034 self
.assertElementEqualPrincipal(ticket_private
, 'cname',
2036 self
.assertElementPresent(ticket_private
, 'transited')
2037 self
.assertElementPresent(ticket_private
, 'authtime')
2038 if self
.strict_checking
:
2039 self
.assertElementPresent(ticket_private
, 'starttime')
2040 self
.assertElementPresent(ticket_private
, 'endtime')
2041 # TODO self.assertElementPresent(ticket_private, 'renew-till')
2042 # TODO self.assertElementMissing(ticket_private, 'caddr')
2043 self
.assertElementPresent(ticket_private
, 'authorization-data')
2045 encpart_session_key
= None
2046 if encpart_private
is not None:
2047 self
.assertElementPresent(encpart_private
, 'key')
2048 encpart_key
= self
.getElementValue(encpart_private
, 'key')
2049 self
.assertIsNotNone(encpart_key
)
2050 if encpart_key
is not None: # Never None, but gives indentation
2051 self
.assertElementPresent(encpart_key
, 'keytype')
2052 self
.assertElementPresent(encpart_key
, 'keyvalue')
2053 encpart_session_key
= self
.EncryptionKey_import(encpart_key
)
2054 self
.assertElementPresent(encpart_private
, 'last-req')
2055 self
.assertElementEqual(encpart_private
, 'nonce',
2056 kdc_exchange_dict
['nonce'])
2057 # TODO self.assertElementPresent(encpart_private,
2059 self
.assertElementPresent(encpart_private
, 'flags')
2060 self
.assertElementPresent(encpart_private
, 'authtime')
2061 if self
.strict_checking
:
2062 self
.assertElementPresent(encpart_private
, 'starttime')
2063 self
.assertElementPresent(encpart_private
, 'endtime')
2064 # TODO self.assertElementPresent(encpart_private, 'renew-till')
2065 self
.assertElementEqualUTF8(encpart_private
, 'srealm',
2067 self
.assertElementEqualPrincipal(encpart_private
, 'sname',
2069 # TODO self.assertElementMissing(encpart_private, 'caddr')
2071 sent_claims
= self
.sent_claims(kdc_exchange_dict
)
2073 if self
.strict_checking
:
2074 if sent_claims
or canonicalize
:
2075 self
.assertElementPresent(encpart_private
,
2076 'encrypted-pa-data')
2077 enc_pa_dict
= self
.get_pa_dict(
2078 encpart_private
['encrypted-pa-data'])
2080 self
.assertIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2082 (supported_etypes
,) = struct
.unpack(
2084 enc_pa_dict
[PADATA_SUPPORTED_ETYPES
])
2087 security
.KERB_ENCTYPE_FAST_SUPPORTED
2090 security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2093 security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
2096 self
.assertNotIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2098 # ClaimsCompIdFASTSupported registry key
2100 self
.assertIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2102 self
.check_pac_options_claims_support(
2103 enc_pa_dict
[PADATA_PAC_OPTIONS
])
2105 self
.assertNotIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2107 self
.assertElementEqual(encpart_private
,
2108 'encrypted-pa-data',
2111 if ticket_session_key
is not None and encpart_session_key
is not None:
2112 self
.assertEqual(ticket_session_key
.etype
,
2113 encpart_session_key
.etype
)
2114 self
.assertEqual(ticket_session_key
.key
.contents
,
2115 encpart_session_key
.key
.contents
)
2116 if encpart_session_key
is not None:
2117 session_key
= encpart_session_key
2119 session_key
= ticket_session_key
2120 ticket_creds
= KerberosTicketCreds(
2123 crealm
=expected_crealm
,
2124 cname
=expected_cname
,
2125 srealm
=expected_srealm
,
2126 sname
=expected_sname
,
2127 decryption_key
=ticket_decryption_key
,
2128 ticket_private
=ticket_private
,
2129 encpart_private
=encpart_private
)
2131 kdc_exchange_dict
['rep_ticket_creds'] = ticket_creds
2133 def check_pac_options_claims_support(self
, pac_options
):
2134 pac_options
= self
.der_decode(pac_options
,
2135 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
2136 self
.assertEqual('1', pac_options
['options'][0]) # claims bit
2138 def generic_check_kdc_error(self
,
2143 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2144 expected_cname
= kdc_exchange_dict
['expected_cname']
2145 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2146 expected_sname
= kdc_exchange_dict
['expected_sname']
2147 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
2149 self
.assertElementEqual(rep
, 'pvno', 5)
2150 self
.assertElementEqual(rep
, 'msg-type', KRB_ERROR
)
2151 self
.assertElementEqual(rep
, 'error-code', expected_error_mode
)
2152 if self
.strict_checking
:
2153 self
.assertElementMissing(rep
, 'ctime')
2154 self
.assertElementMissing(rep
, 'cusec')
2155 self
.assertElementPresent(rep
, 'stime')
2156 self
.assertElementPresent(rep
, 'susec')
2157 # error-code checked above
2158 if self
.strict_checking
:
2159 self
.assertElementMissing(rep
, 'crealm')
2160 self
.assertElementMissing(rep
, 'cname')
2161 self
.assertElementEqualUTF8(rep
, 'realm', expected_srealm
)
2162 self
.assertElementEqualPrincipal(rep
, 'sname', expected_sname
)
2163 self
.assertElementMissing(rep
, 'e-text')
2164 if expected_error_mode
== KDC_ERR_GENERIC
:
2165 self
.assertElementMissing(rep
, 'e-data')
2167 edata
= self
.getElementValue(rep
, 'e-data')
2168 if self
.strict_checking
:
2169 self
.assertIsNotNone(edata
)
2170 if edata
is not None:
2171 rep_padata
= self
.der_decode(edata
,
2172 asn1Spec
=krb5_asn1
.METHOD_DATA())
2173 self
.assertGreater(len(rep_padata
), 0)
2177 etype_info2
= self
.check_rep_padata(kdc_exchange_dict
,
2182 kdc_exchange_dict
['preauth_etype_info2'] = etype_info2
2186 def check_rep_padata(self
,
2191 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
2192 req_body
= kdc_exchange_dict
['req_body']
2193 proposed_etypes
= req_body
['etype']
2194 client_as_etypes
= kdc_exchange_dict
.get('client_as_etypes', [])
2196 expect_etype_info2
= ()
2197 expect_etype_info
= False
2198 unexpect_etype_info
= True
2199 expected_aes_type
= 0
2200 expected_rc4_type
= 0
2201 if kcrypto
.Enctype
.RC4
in proposed_etypes
:
2202 expect_etype_info
= True
2203 for etype
in proposed_etypes
:
2204 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2205 expect_etype_info
= False
2206 if etype
not in client_as_etypes
:
2208 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2209 if etype
> expected_aes_type
:
2210 expected_aes_type
= etype
2211 if etype
in (kcrypto
.Enctype
.RC4
,) and expected_error_mode
!= 0:
2212 unexpect_etype_info
= False
2213 if etype
> expected_rc4_type
:
2214 expected_rc4_type
= etype
2216 if expected_aes_type
!= 0:
2217 expect_etype_info2
+= (expected_aes_type
,)
2218 if expected_rc4_type
!= 0:
2219 expect_etype_info2
+= (expected_rc4_type
,)
2221 expected_patypes
= ()
2222 if expect_etype_info
:
2223 self
.assertGreater(len(expect_etype_info2
), 0)
2224 expected_patypes
+= (PADATA_ETYPE_INFO
,)
2225 if len(expect_etype_info2
) != 0:
2226 expected_patypes
+= (PADATA_ETYPE_INFO2
,)
2228 expected_patypes
+= (PADATA_ENC_TIMESTAMP
,)
2229 expected_patypes
+= (PADATA_PK_AS_REQ
,)
2230 expected_patypes
+= (PADATA_PK_AS_REP_19
,)
2232 if self
.strict_checking
:
2233 for i
, patype
in enumerate(expected_patypes
):
2234 self
.assertElementEqual(rep_padata
[i
], 'padata-type', patype
)
2235 self
.assertEqual(len(rep_padata
), len(expected_patypes
))
2239 enc_timestamp
= None
2242 for pa
in rep_padata
:
2243 patype
= self
.getElementValue(pa
, 'padata-type')
2244 pavalue
= self
.getElementValue(pa
, 'padata-value')
2245 if patype
== PADATA_ETYPE_INFO2
:
2246 self
.assertIsNone(etype_info2
)
2247 etype_info2
= self
.der_decode(pavalue
,
2248 asn1Spec
=krb5_asn1
.ETYPE_INFO2())
2250 if patype
== PADATA_ETYPE_INFO
:
2251 self
.assertIsNone(etype_info
)
2252 etype_info
= self
.der_decode(pavalue
,
2253 asn1Spec
=krb5_asn1
.ETYPE_INFO())
2255 if patype
== PADATA_ENC_TIMESTAMP
:
2256 self
.assertIsNone(enc_timestamp
)
2257 enc_timestamp
= pavalue
2258 self
.assertEqual(len(enc_timestamp
), 0)
2260 if patype
== PADATA_PK_AS_REQ
:
2261 self
.assertIsNone(pk_as_req
)
2263 self
.assertEqual(len(pk_as_req
), 0)
2265 if patype
== PADATA_PK_AS_REP_19
:
2266 self
.assertIsNone(pk_as_rep19
)
2267 pk_as_rep19
= pavalue
2268 self
.assertEqual(len(pk_as_rep19
), 0)
2271 if all(etype
not in client_as_etypes
or etype
not in proposed_etypes
2272 for etype
in (kcrypto
.Enctype
.AES256
,
2273 kcrypto
.Enctype
.AES128
,
2274 kcrypto
.Enctype
.RC4
)):
2275 self
.assertIsNone(etype_info2
)
2276 self
.assertIsNone(etype_info
)
2277 if self
.strict_checking
:
2278 self
.assertIsNotNone(enc_timestamp
)
2279 self
.assertIsNotNone(pk_as_req
)
2280 self
.assertIsNotNone(pk_as_rep19
)
2283 if self
.strict_checking
:
2284 self
.assertIsNotNone(etype_info2
)
2285 if expect_etype_info
:
2286 self
.assertIsNotNone(etype_info
)
2288 if self
.strict_checking
:
2289 self
.assertIsNone(etype_info
)
2290 if unexpect_etype_info
:
2291 self
.assertIsNone(etype_info
)
2293 if self
.strict_checking
:
2294 self
.assertGreaterEqual(len(etype_info2
), 1)
2295 self
.assertEqual(len(etype_info2
), len(expect_etype_info2
))
2296 for i
in range(0, len(etype_info2
)):
2297 e
= self
.getElementValue(etype_info2
[i
], 'etype')
2298 self
.assertEqual(e
, expect_etype_info2
[i
])
2299 salt
= self
.getElementValue(etype_info2
[i
], 'salt')
2300 if e
== kcrypto
.Enctype
.RC4
:
2301 self
.assertIsNone(salt
)
2303 self
.assertIsNotNone(salt
)
2304 expected_salt
= kdc_exchange_dict
['expected_salt']
2305 if expected_salt
is not None:
2306 self
.assertEqual(salt
, expected_salt
)
2307 s2kparams
= self
.getElementValue(etype_info2
[i
], 's2kparams')
2308 if self
.strict_checking
:
2309 self
.assertIsNone(s2kparams
)
2310 if etype_info
is not None:
2311 self
.assertEqual(len(etype_info
), 1)
2312 e
= self
.getElementValue(etype_info
[0], 'etype')
2313 self
.assertEqual(e
, kcrypto
.Enctype
.RC4
)
2314 self
.assertEqual(e
, expect_etype_info2
[0])
2315 salt
= self
.getElementValue(etype_info
[0], 'salt')
2316 if self
.strict_checking
:
2317 self
.assertIsNotNone(salt
)
2318 self
.assertEqual(len(salt
), 0)
2320 self
.assertIsNotNone(enc_timestamp
)
2321 self
.assertIsNotNone(pk_as_req
)
2322 self
.assertIsNotNone(pk_as_rep19
)
2326 def generate_simple_fast(self
,
2334 armor_key
= kdc_exchange_dict
['armor_key']
2336 fast_req
= self
.KRB_FAST_REQ_create(fast_options
,
2339 fast_req
= self
.der_encode(fast_req
,
2340 asn1Spec
=krb5_asn1
.KrbFastReq())
2341 fast_req
= self
.EncryptedData_create(armor_key
,
2345 fast_armored_req
= self
.KRB_FAST_ARMORED_REQ_create(fast_armor
,
2349 fx_fast_request
= self
.PA_FX_FAST_REQUEST_create(fast_armored_req
)
2350 fx_fast_request
= self
.der_encode(
2352 asn1Spec
=krb5_asn1
.PA_FX_FAST_REQUEST())
2354 fast_padata
= self
.PA_DATA_create(PADATA_FX_FAST
,
2359 def generate_ap_req(self
,
2365 tgt
= kdc_exchange_dict
['armor_tgt']
2366 authenticator_subkey
= kdc_exchange_dict
['armor_subkey']
2368 req_body_checksum
= None
2370 tgt
= kdc_exchange_dict
['tgt']
2371 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
2372 body_checksum_type
= kdc_exchange_dict
['body_checksum_type']
2374 req_body_blob
= self
.der_encode(req_body
,
2375 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
2377 req_body_checksum
= self
.Checksum_create(tgt
.session_key
,
2378 KU_TGS_REQ_AUTH_CKSUM
,
2380 ctype
=body_checksum_type
)
2382 auth_data
= kdc_exchange_dict
['auth_data']
2385 if authenticator_subkey
is not None:
2386 subkey_obj
= authenticator_subkey
.export_obj()
2387 seq_number
= random
.randint(0, 0xfffffffe)
2388 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
2389 authenticator_obj
= self
.Authenticator_create(
2392 cksum
=req_body_checksum
,
2396 seq_number
=seq_number
,
2397 authorization_data
=auth_data
)
2398 authenticator_blob
= self
.der_encode(
2400 asn1Spec
=krb5_asn1
.Authenticator())
2402 usage
= KU_AP_REQ_AUTH
if armor
else KU_TGS_REQ_AUTH
2403 authenticator
= self
.EncryptedData_create(tgt
.session_key
,
2407 ap_options
= krb5_asn1
.APOptions('0')
2408 ap_req_obj
= self
.AP_REQ_create(ap_options
=str(ap_options
),
2410 authenticator
=authenticator
)
2411 ap_req
= self
.der_encode(ap_req_obj
, asn1Spec
=krb5_asn1
.AP_REQ())
2415 def generate_simple_tgs_padata(self
,
2419 ap_req
= self
.generate_ap_req(kdc_exchange_dict
,
2423 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
2424 padata
= [pa_tgs_req
]
2426 return padata
, req_body
2428 def check_simple_tgs_padata(self
,
2433 tgt
= kdc_exchange_dict
['tgt']
2434 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
2435 if authenticator_subkey
is not None:
2436 subkey
= authenticator_subkey
2437 subkey_usage
= KU_TGS_REP_ENC_PART_SUB_KEY
2439 subkey
= tgt
.session_key
2440 subkey_usage
= KU_TGS_REP_ENC_PART_SESSION
2442 return subkey
, subkey_usage
2444 def generate_armor_key(self
, subkey
, session_key
):
2445 armor_key
= kcrypto
.cf2(subkey
.key
,
2449 armor_key
= Krb5EncryptionKey(armor_key
, None)
2453 def generate_strengthen_reply_key(self
, strengthen_key
, reply_key
):
2454 strengthen_reply_key
= kcrypto
.cf2(strengthen_key
.key
,
2458 strengthen_reply_key
= Krb5EncryptionKey(strengthen_reply_key
,
2461 return strengthen_reply_key
2463 def generate_client_challenge_key(self
, armor_key
, longterm_key
):
2464 client_challenge_key
= kcrypto
.cf2(armor_key
.key
,
2466 b
'clientchallengearmor',
2467 b
'challengelongterm')
2468 client_challenge_key
= Krb5EncryptionKey(client_challenge_key
, None)
2470 return client_challenge_key
2472 def generate_kdc_challenge_key(self
, armor_key
, longterm_key
):
2473 kdc_challenge_key
= kcrypto
.cf2(armor_key
.key
,
2475 b
'kdcchallengearmor',
2476 b
'challengelongterm')
2477 kdc_challenge_key
= Krb5EncryptionKey(kdc_challenge_key
, None)
2479 return kdc_challenge_key
2481 def verify_ticket_checksum(self
, ticket
, expected_checksum
, armor_key
):
2482 expected_type
= expected_checksum
['cksumtype']
2483 self
.assertEqual(armor_key
.ctype
, expected_type
)
2485 ticket_blob
= self
.der_encode(ticket
,
2486 asn1Spec
=krb5_asn1
.Ticket())
2487 checksum
= self
.Checksum_create(armor_key
,
2490 self
.assertEqual(expected_checksum
, checksum
)
2492 def get_outer_pa_dict(self
, kdc_exchange_dict
):
2493 return self
.get_pa_dict(kdc_exchange_dict
['req_padata'])
2495 def get_fast_pa_dict(self
, kdc_exchange_dict
):
2496 req_pa_dict
= self
.get_pa_dict(kdc_exchange_dict
['fast_padata'])
2501 return self
.get_outer_pa_dict(kdc_exchange_dict
)
2503 def sent_fast(self
, kdc_exchange_dict
):
2504 outer_pa_dict
= self
.get_outer_pa_dict(kdc_exchange_dict
)
2506 return PADATA_FX_FAST
in outer_pa_dict
2508 def sent_enc_challenge(self
, kdc_exchange_dict
):
2509 fast_pa_dict
= self
.get_fast_pa_dict(kdc_exchange_dict
)
2511 return PADATA_ENCRYPTED_CHALLENGE
in fast_pa_dict
2513 def sent_claims(self
, kdc_exchange_dict
):
2514 fast_pa_dict
= self
.get_fast_pa_dict(kdc_exchange_dict
)
2516 if PADATA_PAC_OPTIONS
not in fast_pa_dict
:
2519 pac_options
= self
.der_decode(fast_pa_dict
[PADATA_PAC_OPTIONS
],
2520 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
2521 pac_options
= pac_options
['options']
2522 claims_pos
= len(tuple(krb5_asn1
.PACOptionFlags('claims'))) - 1
2524 return (claims_pos
< len(pac_options
)
2525 and pac_options
[claims_pos
] == '1')
2527 def _test_as_exchange(self
,
2533 expected_error_mode
,
2543 ticket_decryption_key
=None):
2545 def _generate_padata_copy(_kdc_exchange_dict
,
2548 return padata
, req_body
2550 def _check_padata_preauth_key(_kdc_exchange_dict
,
2554 as_rep_usage
= KU_AS_REP_ENC_PART
2555 return preauth_key
, as_rep_usage
2557 if expected_error_mode
== 0:
2558 check_error_fn
= None
2559 check_rep_fn
= self
.generic_check_kdc_rep
2561 check_error_fn
= self
.generic_check_kdc_error
2564 if padata
is not None:
2565 generate_padata_fn
= _generate_padata_copy
2567 generate_padata_fn
= None
2569 kdc_exchange_dict
= self
.as_exchange_dict(
2570 expected_crealm
=expected_crealm
,
2571 expected_cname
=expected_cname
,
2572 expected_srealm
=expected_srealm
,
2573 expected_sname
=expected_sname
,
2574 ticket_decryption_key
=ticket_decryption_key
,
2575 generate_padata_fn
=generate_padata_fn
,
2576 check_error_fn
=check_error_fn
,
2577 check_rep_fn
=check_rep_fn
,
2578 check_padata_fn
=_check_padata_preauth_key
,
2579 check_kdc_private_fn
=self
.generic_check_kdc_private
,
2580 expected_error_mode
=expected_error_mode
,
2581 client_as_etypes
=client_as_etypes
,
2582 expected_salt
=expected_salt
,
2583 kdc_options
=str(kdc_options
))
2585 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
2592 return rep
, kdc_exchange_dict