1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 from pyasn1
.codec
.der
.decoder
import decode
as pyasn1_der_decode
30 from pyasn1
.codec
.der
.encoder
import encode
as pyasn1_der_encode
31 from pyasn1
.codec
.native
.decoder
import decode
as pyasn1_native_decode
32 from pyasn1
.codec
.native
.encoder
import encode
as pyasn1_native_encode
34 from pyasn1
.codec
.ber
.encoder
import BitStringEncoder
36 from samba
.credentials
import Credentials
37 from samba
.dcerpc
import krb5pac
, security
38 from samba
.gensec
import FEATURE_SEAL
39 from samba
.ndr
import ndr_pack
, ndr_unpack
42 from samba
.tests
import TestCaseInTempDir
44 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
45 from samba
.tests
.krb5
.rfc4120_constants
import (
48 FX_FAST_ARMOR_AP_REQUEST
,
50 KDC_ERR_PREAUTH_FAILED
,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
,
52 KERB_ERR_TYPE_EXTENDED
,
66 KU_NON_KERB_CKSUM_SALT
,
67 KU_TGS_REP_ENC_PART_SESSION
,
68 KU_TGS_REP_ENC_PART_SUB_KEY
,
70 KU_TGS_REQ_AUTH_CKSUM
,
71 KU_TGS_REQ_AUTH_DAT_SESSION
,
72 KU_TGS_REQ_AUTH_DAT_SUBKEY
,
76 PADATA_ENCRYPTED_CHALLENGE
,
89 PADATA_SUPPORTED_ETYPES
91 import samba
.tests
.krb5
.kcrypto
as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self
, value
, asn1Spec
, encodeFun
, **options
):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec
is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value
= asn1Spec
.clone(value
)
104 valueLength
= len(value
)
106 alignedValue
= value
<< (8 - valueLength
% 8)
110 substrate
= alignedValue
.asOctets()
111 length
= len(substrate
)
112 # We need at least 32-Bit / 4-Bytes
117 ret
= b
'\x00' + substrate
+ (b
'\x00' * padding
)
118 return ret
, False, True
121 BitStringEncoder
.encodeValue
= BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self
, scope
=0):
125 ret
= "%s" % self
.asBinary()
128 for byte
in self
.asNumbers():
129 for bit
in [7, 6, 5, 4, 3, 2, 1, 0]:
136 if len(bits
) < highest_bit
:
137 for bitPosition
in range(len(bits
), highest_bit
):
140 delim
= ": (\n%s " % indent
141 for bitPosition
in range(highest_bit
):
142 if bitPosition
in self
.prettyPrintNamedValues
:
143 name
= self
.prettyPrintNamedValues
[bitPosition
]
144 elif bits
[bitPosition
] != 0:
145 name
= "unknown-bit-%u" % bitPosition
148 ret
+= "%s%s:%u" % (delim
, name
, bits
[bitPosition
])
149 delim
= ",\n%s " % indent
150 ret
+= "\n%s)" % indent
154 krb5_asn1
.TicketFlags
.prettyPrintNamedValues
=\
155 krb5_asn1
.TicketFlagsValues
.namedValues
156 krb5_asn1
.TicketFlags
.namedValues
=\
157 krb5_asn1
.TicketFlagsValues
.namedValues
158 krb5_asn1
.TicketFlags
.prettyPrint
=\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1
.KDCOptions
.prettyPrintNamedValues
=\
161 krb5_asn1
.KDCOptionsValues
.namedValues
162 krb5_asn1
.KDCOptions
.namedValues
=\
163 krb5_asn1
.KDCOptionsValues
.namedValues
164 krb5_asn1
.KDCOptions
.prettyPrint
=\
165 BitString_NamedValues_prettyPrint
166 krb5_asn1
.APOptions
.prettyPrintNamedValues
=\
167 krb5_asn1
.APOptionsValues
.namedValues
168 krb5_asn1
.APOptions
.namedValues
=\
169 krb5_asn1
.APOptionsValues
.namedValues
170 krb5_asn1
.APOptions
.prettyPrint
=\
171 BitString_NamedValues_prettyPrint
172 krb5_asn1
.PACOptionFlags
.prettyPrintNamedValues
=\
173 krb5_asn1
.PACOptionFlagsValues
.namedValues
174 krb5_asn1
.PACOptionFlags
.namedValues
=\
175 krb5_asn1
.PACOptionFlagsValues
.namedValues
176 krb5_asn1
.PACOptionFlags
.prettyPrint
=\
177 BitString_NamedValues_prettyPrint
180 def Integer_NamedValues_prettyPrint(self
, scope
=0):
182 if intval
in self
.prettyPrintNamedValues
:
183 name
= self
.prettyPrintNamedValues
[intval
]
185 name
= "<__unknown__>"
186 ret
= "%d (0x%x) %s" % (intval
, intval
, name
)
190 krb5_asn1
.NameType
.prettyPrintNamedValues
=\
191 krb5_asn1
.NameTypeValues
.namedValues
192 krb5_asn1
.NameType
.prettyPrint
=\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1
.AuthDataType
.prettyPrintNamedValues
=\
195 krb5_asn1
.AuthDataTypeValues
.namedValues
196 krb5_asn1
.AuthDataType
.prettyPrint
=\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1
.PADataType
.prettyPrintNamedValues
=\
199 krb5_asn1
.PADataTypeValues
.namedValues
200 krb5_asn1
.PADataType
.prettyPrint
=\
201 Integer_NamedValues_prettyPrint
202 krb5_asn1
.EncryptionType
.prettyPrintNamedValues
=\
203 krb5_asn1
.EncryptionTypeValues
.namedValues
204 krb5_asn1
.EncryptionType
.prettyPrint
=\
205 Integer_NamedValues_prettyPrint
206 krb5_asn1
.ChecksumType
.prettyPrintNamedValues
=\
207 krb5_asn1
.ChecksumTypeValues
.namedValues
208 krb5_asn1
.ChecksumType
.prettyPrint
=\
209 Integer_NamedValues_prettyPrint
210 krb5_asn1
.KerbErrorDataType
.prettyPrintNamedValues
=\
211 krb5_asn1
.KerbErrorDataTypeValues
.namedValues
212 krb5_asn1
.KerbErrorDataType
.prettyPrint
=\
213 Integer_NamedValues_prettyPrint
216 class Krb5EncryptionKey
:
217 def __init__(self
, key
, kvno
):
219 kcrypto
.Enctype
.AES256
: kcrypto
.Cksumtype
.SHA1_AES256
,
220 kcrypto
.Enctype
.AES128
: kcrypto
.Cksumtype
.SHA1_AES128
,
221 kcrypto
.Enctype
.RC4
: kcrypto
.Cksumtype
.HMAC_MD5
,
224 self
.etype
= key
.enctype
225 self
.ctype
= EncTypeChecksum
[self
.etype
]
228 def encrypt(self
, usage
, plaintext
):
229 ciphertext
= kcrypto
.encrypt(self
.key
, usage
, plaintext
)
232 def decrypt(self
, usage
, ciphertext
):
233 plaintext
= kcrypto
.decrypt(self
.key
, usage
, ciphertext
)
236 def make_zeroed_checksum(self
, ctype
=None):
240 checksum_len
= kcrypto
.checksum_len(ctype
)
241 return bytes(checksum_len
)
243 def make_checksum(self
, usage
, plaintext
, ctype
=None):
246 cksum
= kcrypto
.make_checksum(ctype
, self
.key
, usage
, plaintext
)
249 def verify_checksum(self
, usage
, plaintext
, ctype
, cksum
):
250 if self
.ctype
!= ctype
:
251 raise AssertionError(f
'{self.ctype} != {ctype}')
253 kcrypto
.verify_checksum(ctype
,
259 def export_obj(self
):
260 EncryptionKey_obj
= {
261 'keytype': self
.etype
,
262 'keyvalue': self
.key
.contents
,
264 return EncryptionKey_obj
267 class RodcPacEncryptionKey(Krb5EncryptionKey
):
268 def __init__(self
, key
, kvno
, rodc_id
=None):
269 super().__init
__(key
, kvno
)
275 kvno
&= (1 << 16) - 1
277 rodc_id
= kvno
or None
279 if rodc_id
is not None:
280 self
.rodc_id
= rodc_id
.to_bytes(2, byteorder
='little')
284 def make_rodc_zeroed_checksum(self
, ctype
=None):
285 checksum
= super().make_zeroed_checksum(ctype
)
286 return checksum
+ bytes(len(self
.rodc_id
))
288 def make_rodc_checksum(self
, usage
, plaintext
, ctype
=None):
289 checksum
= super().make_checksum(usage
, plaintext
, ctype
)
290 return checksum
+ self
.rodc_id
292 def verify_rodc_checksum(self
, usage
, plaintext
, ctype
, cksum
):
294 cksum
, cksum_rodc_id
= cksum
[:-2], cksum
[-2:]
296 if self
.rodc_id
!= cksum_rodc_id
:
297 raise AssertionError(f
'{self.rodc_id.hex()} != '
298 f
'{cksum_rodc_id.hex()}')
300 super().verify_checksum(usage
,
306 class ZeroedChecksumKey(RodcPacEncryptionKey
):
307 def make_checksum(self
, usage
, plaintext
, ctype
=None):
308 return self
.make_zeroed_checksum(ctype
)
310 def make_rodc_checksum(self
, usage
, plaintext
, ctype
=None):
311 return self
.make_rodc_zeroed_checksum(ctype
)
314 class WrongLengthChecksumKey(RodcPacEncryptionKey
):
315 def __init__(self
, key
, kvno
, length
):
316 super().__init
__(key
, kvno
)
318 self
._length
= length
321 def _adjust_to_length(cls
, checksum
, length
):
322 diff
= length
- len(checksum
)
324 checksum
+= bytes(diff
)
326 checksum
= checksum
[:length
]
330 def make_zeroed_checksum(self
, ctype
=None):
331 return bytes(self
._length
)
333 def make_checksum(self
, usage
, plaintext
, ctype
=None):
334 checksum
= super().make_checksum(usage
, plaintext
, ctype
)
335 return self
._adjust
_to
_length
(checksum
, self
._length
)
337 def make_rodc_zeroed_checksum(self
, ctype
=None):
338 return bytes(self
._length
)
340 def make_rodc_checksum(self
, usage
, plaintext
, ctype
=None):
341 checksum
= super().make_rodc_checksum(usage
, plaintext
, ctype
)
342 return self
._adjust
_to
_length
(checksum
, self
._length
)
345 class KerberosCredentials(Credentials
):
347 fast_supported_bits
= (security
.KERB_ENCTYPE_FAST_SUPPORTED |
348 security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
349 security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
)
352 super(KerberosCredentials
, self
).__init
__()
354 all_enc_types |
= security
.KERB_ENCTYPE_RC4_HMAC_MD5
355 all_enc_types |
= security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
356 all_enc_types |
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
358 self
.as_supported_enctypes
= all_enc_types
359 self
.tgs_supported_enctypes
= all_enc_types
360 self
.ap_supported_enctypes
= all_enc_types
363 self
.forced_keys
= {}
365 self
.forced_salt
= None
370 def set_as_supported_enctypes(self
, value
):
371 self
.as_supported_enctypes
= int(value
)
373 def set_tgs_supported_enctypes(self
, value
):
374 self
.tgs_supported_enctypes
= int(value
)
376 def set_ap_supported_enctypes(self
, value
):
377 self
.ap_supported_enctypes
= int(value
)
379 etype_map
= collections
.OrderedDict([
380 (kcrypto
.Enctype
.AES256
,
381 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
),
382 (kcrypto
.Enctype
.AES128
,
383 security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
),
384 (kcrypto
.Enctype
.RC4
,
385 security
.KERB_ENCTYPE_RC4_HMAC_MD5
),
386 (kcrypto
.Enctype
.DES_MD5
,
387 security
.KERB_ENCTYPE_DES_CBC_MD5
),
388 (kcrypto
.Enctype
.DES_CRC
,
389 security
.KERB_ENCTYPE_DES_CBC_CRC
)
393 def etypes_to_bits(cls
, etypes
):
396 bit
= cls
.etype_map
[etype
]
398 raise ValueError(f
'Got duplicate etype: {etype}')
404 def bits_to_etypes(cls
, bits
):
406 for etype
, bit
in cls
.etype_map
.items():
411 bits
&= ~cls
.fast_supported_bits
413 raise ValueError(f
'Unsupported etype bits: {bits}')
417 def get_as_krb5_etypes(self
):
418 return self
.bits_to_etypes(self
.as_supported_enctypes
)
420 def get_tgs_krb5_etypes(self
):
421 return self
.bits_to_etypes(self
.tgs_supported_enctypes
)
423 def get_ap_krb5_etypes(self
):
424 return self
.bits_to_etypes(self
.ap_supported_enctypes
)
426 def set_kvno(self
, kvno
):
427 # Sign-extend from 32 bits.
435 def set_forced_key(self
, etype
, hexkey
):
437 contents
= binascii
.a2b_hex(hexkey
)
438 key
= kcrypto
.Key(etype
, contents
)
439 self
.forced_keys
[etype
] = RodcPacEncryptionKey(key
, self
.kvno
)
441 def get_forced_key(self
, etype
):
443 return self
.forced_keys
.get(etype
)
445 def set_forced_salt(self
, salt
):
446 self
.forced_salt
= bytes(salt
)
448 def get_forced_salt(self
):
449 return self
.forced_salt
452 if self
.forced_salt
is not None:
453 return self
.forced_salt
455 if self
.get_workstation():
456 salt_string
= '%shost%s.%s' % (
457 self
.get_realm().upper(),
458 self
.get_username().lower().rsplit('$', 1)[0],
459 self
.get_realm().lower())
461 salt_string
= self
.get_realm().upper() + self
.get_username()
463 return salt_string
.encode('utf-8')
465 def set_dn(self
, dn
):
471 def set_spn(self
, spn
):
478 class KerberosTicketCreds
:
479 def __init__(self
, ticket
, session_key
,
480 crealm
=None, cname
=None,
481 srealm
=None, sname
=None,
484 encpart_private
=None):
486 self
.session_key
= session_key
491 self
.decryption_key
= decryption_key
492 self
.ticket_private
= ticket_private
493 self
.encpart_private
= encpart_private
496 class RawKerberosTest(TestCaseInTempDir
):
497 """A raw Kerberos Test case."""
499 pac_checksum_types
= {krb5pac
.PAC_TYPE_SRV_CHECKSUM
,
500 krb5pac
.PAC_TYPE_KDC_CHECKSUM
,
501 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
}
504 {"value": -1111, "name": "dummy", },
505 {"value": kcrypto
.Enctype
.AES256
, "name": "aes128", },
506 {"value": kcrypto
.Enctype
.AES128
, "name": "aes256", },
507 {"value": kcrypto
.Enctype
.RC4
, "name": "rc4", },
510 setup_etype_test_permutations_done
= False
513 def setup_etype_test_permutations(cls
):
514 if cls
.setup_etype_test_permutations_done
:
519 num_idxs
= len(cls
.etypes_to_test
)
521 for num
in range(1, num_idxs
+ 1):
522 chunk
= list(itertools
.permutations(range(num_idxs
), num
))
525 permutations
.append(el
)
527 for p
in permutations
:
531 n
= cls
.etypes_to_test
[idx
]["name"]
536 etypes
+= (cls
.etypes_to_test
[idx
]["value"],)
538 r
= {"name": name
, "etypes": etypes
, }
541 cls
.etype_test_permutations
= res
542 cls
.setup_etype_test_permutations_done
= True
545 def etype_test_permutation_name_idx(cls
):
546 cls
.setup_etype_test_permutations()
549 for e
in cls
.etype_test_permutations
:
555 def etype_test_permutation_by_idx(self
, idx
):
556 e
= self
.etype_test_permutations
[idx
]
557 return (e
['name'], e
['etypes'])
563 cls
.host
= samba
.tests
.env_get_var_value('SERVER')
564 cls
.dc_host
= samba
.tests
.env_get_var_value('DC_SERVER')
566 # A dictionary containing credentials that have already been
570 cls
.kdc_fast_support
= False
574 self
.do_asn1_print
= False
575 self
.do_hexdump
= False
577 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING',
579 if strict_checking
is None:
580 strict_checking
= '1'
581 self
.strict_checking
= bool(int(strict_checking
))
585 self
.unspecified_kvno
= object()
588 self
._disconnect
("tearDown")
591 def _disconnect(self
, reason
):
597 sys
.stderr
.write("disconnect[%s]\n" % reason
)
599 def _connect_tcp(self
, host
):
602 self
.a
= socket
.getaddrinfo(host
, tcp_port
, socket
.AF_UNSPEC
,
603 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
605 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
606 self
.s
.settimeout(10)
607 self
.s
.connect(self
.a
[0][4])
615 def connect(self
, host
):
616 self
.assertNotConnected()
617 self
._connect
_tcp
(host
)
619 sys
.stderr
.write("connected[%s]\n" % host
)
621 def env_get_var(self
, varname
, prefix
,
622 fallback_default
=True,
623 allow_missing
=False):
625 if prefix
is not None:
626 allow_missing_prefix
= allow_missing
or fallback_default
627 val
= samba
.tests
.env_get_var_value(
628 '%s_%s' % (prefix
, varname
),
629 allow_missing
=allow_missing_prefix
)
631 fallback_default
= True
632 if val
is None and fallback_default
:
633 val
= samba
.tests
.env_get_var_value(varname
,
634 allow_missing
=allow_missing
)
637 def _get_krb5_creds_from_env(self
, prefix
,
638 default_username
=None,
639 allow_missing_password
=False,
640 allow_missing_keys
=True,
641 require_strongest_key
=False):
642 c
= KerberosCredentials()
645 domain
= self
.env_get_var('DOMAIN', prefix
)
646 realm
= self
.env_get_var('REALM', prefix
)
647 allow_missing_username
= default_username
is not None
648 username
= self
.env_get_var('USERNAME', prefix
,
649 fallback_default
=False,
650 allow_missing
=allow_missing_username
)
652 username
= default_username
653 password
= self
.env_get_var('PASSWORD', prefix
,
654 fallback_default
=False,
655 allow_missing
=allow_missing_password
)
658 c
.set_username(username
)
659 if password
is not None:
660 c
.set_password(password
)
661 as_supported_enctypes
= self
.env_get_var('AS_SUPPORTED_ENCTYPES',
662 prefix
, allow_missing
=True)
663 if as_supported_enctypes
is not None:
664 c
.set_as_supported_enctypes(as_supported_enctypes
)
665 tgs_supported_enctypes
= self
.env_get_var('TGS_SUPPORTED_ENCTYPES',
666 prefix
, allow_missing
=True)
667 if tgs_supported_enctypes
is not None:
668 c
.set_tgs_supported_enctypes(tgs_supported_enctypes
)
669 ap_supported_enctypes
= self
.env_get_var('AP_SUPPORTED_ENCTYPES',
670 prefix
, allow_missing
=True)
671 if ap_supported_enctypes
is not None:
672 c
.set_ap_supported_enctypes(ap_supported_enctypes
)
674 if require_strongest_key
:
675 kvno_allow_missing
= False
677 aes256_allow_missing
= False
679 aes256_allow_missing
= True
681 kvno_allow_missing
= allow_missing_keys
682 aes256_allow_missing
= allow_missing_keys
683 kvno
= self
.env_get_var('KVNO', prefix
,
684 fallback_default
=False,
685 allow_missing
=kvno_allow_missing
)
688 aes256_key
= self
.env_get_var('AES256_KEY_HEX', prefix
,
689 fallback_default
=False,
690 allow_missing
=aes256_allow_missing
)
691 if aes256_key
is not None:
692 c
.set_forced_key(kcrypto
.Enctype
.AES256
, aes256_key
)
693 aes128_key
= self
.env_get_var('AES128_KEY_HEX', prefix
,
694 fallback_default
=False,
696 if aes128_key
is not None:
697 c
.set_forced_key(kcrypto
.Enctype
.AES128
, aes128_key
)
698 rc4_key
= self
.env_get_var('RC4_KEY_HEX', prefix
,
699 fallback_default
=False, allow_missing
=True)
700 if rc4_key
is not None:
701 c
.set_forced_key(kcrypto
.Enctype
.RC4
, rc4_key
)
703 if not allow_missing_keys
:
704 self
.assertTrue(c
.forced_keys
,
705 'Please supply %s encryption keys '
706 'in environment' % prefix
)
710 def _get_krb5_creds(self
,
712 default_username
=None,
713 allow_missing_password
=False,
714 allow_missing_keys
=True,
715 require_strongest_key
=False,
716 fallback_creds_fn
=None):
717 if prefix
in self
.creds_dict
:
718 return self
.creds_dict
[prefix
]
720 # We don't have the credentials already
724 # Try to obtain them from the environment
725 creds
= self
._get
_krb
5_creds
_from
_env
(
727 default_username
=default_username
,
728 allow_missing_password
=allow_missing_password
,
729 allow_missing_keys
=allow_missing_keys
,
730 require_strongest_key
=require_strongest_key
)
731 except Exception as err
:
732 # An error occurred, so save it for later
735 self
.assertIsNotNone(creds
)
736 # Save the obtained credentials
737 self
.creds_dict
[prefix
] = creds
740 if fallback_creds_fn
is not None:
742 # Try to use the fallback method
743 creds
= fallback_creds_fn()
744 except Exception as err
:
745 print("ERROR FROM ENV: %r" % (env_err
))
746 print("FALLBACK-FN: %s" % (fallback_creds_fn
))
747 print("FALLBACK-ERROR: %r" % (err
))
749 self
.assertIsNotNone(creds
)
750 # Save the obtained credentials
751 self
.creds_dict
[prefix
] = creds
754 # Both methods failed, so raise the exception from the
758 def get_user_creds(self
,
759 allow_missing_password
=False,
760 allow_missing_keys
=True):
761 c
= self
._get
_krb
5_creds
(prefix
=None,
762 allow_missing_password
=allow_missing_password
,
763 allow_missing_keys
=allow_missing_keys
)
766 def get_service_creds(self
,
767 allow_missing_password
=False,
768 allow_missing_keys
=True):
769 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
770 allow_missing_password
=allow_missing_password
,
771 allow_missing_keys
=allow_missing_keys
)
774 def get_client_creds(self
,
775 allow_missing_password
=False,
776 allow_missing_keys
=True):
777 c
= self
._get
_krb
5_creds
(prefix
='CLIENT',
778 allow_missing_password
=allow_missing_password
,
779 allow_missing_keys
=allow_missing_keys
)
782 def get_server_creds(self
,
783 allow_missing_password
=False,
784 allow_missing_keys
=True):
785 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
786 allow_missing_password
=allow_missing_password
,
787 allow_missing_keys
=allow_missing_keys
)
790 def get_admin_creds(self
,
791 allow_missing_password
=False,
792 allow_missing_keys
=True):
793 c
= self
._get
_krb
5_creds
(prefix
='ADMIN',
794 allow_missing_password
=allow_missing_password
,
795 allow_missing_keys
=allow_missing_keys
)
796 c
.set_gensec_features(c
.get_gensec_features() | FEATURE_SEAL
)
799 def get_rodc_krbtgt_creds(self
,
801 require_strongest_key
=False):
802 if require_strongest_key
:
803 self
.assertTrue(require_keys
)
804 c
= self
._get
_krb
5_creds
(prefix
='RODC_KRBTGT',
805 allow_missing_password
=True,
806 allow_missing_keys
=not require_keys
,
807 require_strongest_key
=require_strongest_key
)
810 def get_krbtgt_creds(self
,
812 require_strongest_key
=False):
813 if require_strongest_key
:
814 self
.assertTrue(require_keys
)
815 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
816 default_username
='krbtgt',
817 allow_missing_password
=True,
818 allow_missing_keys
=not require_keys
,
819 require_strongest_key
=require_strongest_key
)
822 def get_anon_creds(self
):
827 def asn1_dump(self
, name
, obj
, asn1_print
=None):
828 if asn1_print
is None:
829 asn1_print
= self
.do_asn1_print
832 sys
.stderr
.write("%s:\n%s" % (name
, obj
))
834 sys
.stderr
.write("%s" % (obj
))
836 def hex_dump(self
, name
, blob
, hexdump
=None):
838 hexdump
= self
.do_hexdump
841 "%s: %d\n%s" % (name
, len(blob
), self
.hexdump(blob
)))
850 if asn1Spec
is not None:
851 class_name
= type(asn1Spec
).__name
__.split(':')[0]
853 class_name
= "<None-asn1Spec>"
854 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
855 obj
, _
= pyasn1_der_decode(blob
, asn1Spec
=asn1Spec
)
856 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
858 obj
= pyasn1_native_encode(obj
)
869 obj
= pyasn1_native_decode(obj
, asn1Spec
=asn1Spec
)
870 class_name
= type(obj
).__name
__.split(':')[0]
871 if class_name
is not None:
872 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
873 blob
= pyasn1_der_encode(obj
)
874 if class_name
is not None:
875 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
878 def send_pdu(self
, req
, asn1_print
=None, hexdump
=None):
880 k5_pdu
= self
.der_encode(
881 req
, native_decode
=False, asn1_print
=asn1_print
, hexdump
=False)
882 header
= struct
.pack('>I', len(k5_pdu
))
885 self
.hex_dump("send_pdu", header
, hexdump
=hexdump
)
886 self
.hex_dump("send_pdu", k5_pdu
, hexdump
=hexdump
)
888 sent
= self
.s
.send(req_pdu
, 0)
889 if sent
== len(req_pdu
):
891 req_pdu
= req_pdu
[sent
:]
892 except socket
.error
as e
:
893 self
._disconnect
("send_pdu: %s" % e
)
896 self
._disconnect
("send_pdu: %s" % e
)
899 def recv_raw(self
, num_recv
=0xffff, hexdump
=None, timeout
=None):
902 if timeout
is not None:
903 self
.s
.settimeout(timeout
)
904 rep_pdu
= self
.s
.recv(num_recv
, 0)
905 self
.s
.settimeout(10)
906 if len(rep_pdu
) == 0:
907 self
._disconnect
("recv_raw: EOF")
909 self
.hex_dump("recv_raw", rep_pdu
, hexdump
=hexdump
)
910 except socket
.timeout
:
911 self
.s
.settimeout(10)
912 sys
.stderr
.write("recv_raw: TIMEOUT\n")
913 except socket
.error
as e
:
914 self
._disconnect
("recv_raw: %s" % e
)
917 self
._disconnect
("recv_raw: %s" % e
)
921 def recv_pdu_raw(self
, asn1_print
=None, hexdump
=None, timeout
=None):
924 raw_pdu
= self
.recv_raw(
925 num_recv
=4, hexdump
=hexdump
, timeout
=timeout
)
928 header
= struct
.unpack(">I", raw_pdu
[0:4])
935 raw_pdu
= self
.recv_raw(
936 num_recv
=missing
, hexdump
=hexdump
, timeout
=timeout
)
937 self
.assertGreaterEqual(len(raw_pdu
), 1)
939 missing
= k5_len
- len(rep_pdu
)
940 k5_raw
= self
.der_decode(
946 pvno
= k5_raw
['field-0']
947 self
.assertEqual(pvno
, 5)
948 msg_type
= k5_raw
['field-1']
949 self
.assertIn(msg_type
, [KRB_AS_REP
, KRB_TGS_REP
, KRB_ERROR
])
950 if msg_type
== KRB_AS_REP
:
951 asn1Spec
= krb5_asn1
.AS_REP()
952 elif msg_type
== KRB_TGS_REP
:
953 asn1Spec
= krb5_asn1
.TGS_REP()
954 elif msg_type
== KRB_ERROR
:
955 asn1Spec
= krb5_asn1
.KRB_ERROR()
956 rep
= self
.der_decode(rep_pdu
, asn1Spec
=asn1Spec
,
957 asn1_print
=asn1_print
, hexdump
=False)
958 return (rep
, rep_pdu
)
960 def recv_pdu(self
, asn1_print
=None, hexdump
=None, timeout
=None):
961 (rep
, rep_pdu
) = self
.recv_pdu_raw(asn1_print
=asn1_print
,
966 def assertIsConnected(self
):
967 self
.assertIsNotNone(self
.s
, msg
="Not connected")
969 def assertNotConnected(self
):
970 self
.assertIsNone(self
.s
, msg
="Is connected")
972 def send_recv_transaction(
979 host
= self
.host
if to_rodc
else self
.dc_host
982 self
.send_pdu(req
, asn1_print
=asn1_print
, hexdump
=hexdump
)
984 asn1_print
=asn1_print
, hexdump
=hexdump
, timeout
=timeout
)
986 self
._disconnect
("transaction failed")
988 self
._disconnect
("transaction done")
991 def assertNoValue(self
, value
):
992 self
.assertTrue(value
.isNoValue
)
994 def assertHasValue(self
, value
):
995 self
.assertIsNotNone(value
)
997 def getElementValue(self
, obj
, elem
):
1000 def assertElementMissing(self
, obj
, elem
):
1001 v
= self
.getElementValue(obj
, elem
)
1002 self
.assertIsNone(v
)
1004 def assertElementPresent(self
, obj
, elem
, expect_empty
=False):
1005 v
= self
.getElementValue(obj
, elem
)
1006 self
.assertIsNotNone(v
)
1007 if self
.strict_checking
:
1008 if isinstance(v
, collections
.abc
.Container
):
1010 self
.assertEqual(0, len(v
))
1012 self
.assertNotEqual(0, len(v
))
1014 def assertElementEqual(self
, obj
, elem
, value
):
1015 v
= self
.getElementValue(obj
, elem
)
1016 self
.assertIsNotNone(v
)
1017 self
.assertEqual(v
, value
)
1019 def assertElementEqualUTF8(self
, obj
, elem
, value
):
1020 v
= self
.getElementValue(obj
, elem
)
1021 self
.assertIsNotNone(v
)
1022 self
.assertEqual(v
, bytes(value
, 'utf8'))
1024 def assertPrincipalEqual(self
, princ1
, princ2
):
1025 self
.assertEqual(princ1
['name-type'], princ2
['name-type'])
1027 len(princ1
['name-string']),
1028 len(princ2
['name-string']),
1029 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
1030 for idx
in range(len(princ1
['name-string'])):
1032 princ1
['name-string'][idx
],
1033 princ2
['name-string'][idx
],
1034 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
1036 def assertElementEqualPrincipal(self
, obj
, elem
, value
):
1037 v
= self
.getElementValue(obj
, elem
)
1038 self
.assertIsNotNone(v
)
1039 v
= pyasn1_native_decode(v
, asn1Spec
=krb5_asn1
.PrincipalName())
1040 self
.assertPrincipalEqual(v
, value
)
1042 def assertElementKVNO(self
, obj
, elem
, value
):
1043 v
= self
.getElementValue(obj
, elem
)
1044 if value
== "autodetect":
1046 if value
is not None:
1047 self
.assertIsNotNone(v
)
1048 # The value on the wire should never be 0
1049 self
.assertNotEqual(v
, 0)
1050 # unspecified_kvno means we don't know the kvno,
1051 # but want to enforce its presence
1052 if value
is not self
.unspecified_kvno
:
1054 self
.assertNotEqual(value
, 0)
1055 self
.assertEqual(v
, value
)
1057 self
.assertIsNone(v
)
1059 def assertElementFlags(self
, obj
, elem
, expected
, unexpected
):
1060 v
= self
.getElementValue(obj
, elem
)
1061 self
.assertIsNotNone(v
)
1062 if expected
is not None:
1063 self
.assertIsInstance(expected
, krb5_asn1
.TicketFlags
)
1064 for i
, flag
in enumerate(expected
):
1066 self
.assertEqual('1', v
[i
],
1067 f
"'{expected.namedValues[i]}' "
1069 if unexpected
is not None:
1070 self
.assertIsInstance(unexpected
, krb5_asn1
.TicketFlags
)
1071 for i
, flag
in enumerate(unexpected
):
1073 self
.assertEqual('0', v
[i
],
1074 f
"'{unexpected.namedValues[i]}' "
1075 f
"unexpected in {v}")
1077 def get_KerberosTimeWithUsec(self
, epoch
=None, offset
=None):
1080 if offset
is not None:
1081 epoch
= epoch
+ int(offset
)
1082 dt
= datetime
.datetime
.fromtimestamp(epoch
, tz
=datetime
.timezone
.utc
)
1083 return (dt
.strftime("%Y%m%d%H%M%SZ"), dt
.microsecond
)
1085 def get_KerberosTime(self
, epoch
=None, offset
=None):
1086 (s
, _
) = self
.get_KerberosTimeWithUsec(epoch
=epoch
, offset
=offset
)
1089 def get_EpochFromKerberosTime(self
, kerberos_time
):
1090 if isinstance(kerberos_time
, bytes
):
1091 kerberos_time
= kerberos_time
.decode()
1093 epoch
= datetime
.datetime
.strptime(kerberos_time
,
1095 epoch
= epoch
.replace(tzinfo
=datetime
.timezone
.utc
)
1096 epoch
= int(epoch
.timestamp())
1100 def get_Nonce(self
):
1101 nonce_min
= 0x7f000000
1102 nonce_max
= 0x7fffffff
1103 v
= random
.randint(nonce_min
, nonce_max
)
1106 def get_pa_dict(self
, pa_data
):
1109 if pa_data
is not None:
1111 pa_type
= pa
['padata-type']
1112 if pa_type
in pa_dict
:
1113 raise RuntimeError(f
'Duplicate type {pa_type}')
1114 pa_dict
[pa_type
] = pa
['padata-value']
1118 def SessionKey_create(self
, etype
, contents
, kvno
=None):
1119 key
= kcrypto
.Key(etype
, contents
)
1120 return RodcPacEncryptionKey(key
, kvno
)
1122 def PasswordKey_create(self
, etype
=None, pwd
=None, salt
=None, kvno
=None):
1123 self
.assertIsNotNone(pwd
)
1124 self
.assertIsNotNone(salt
)
1125 key
= kcrypto
.string_to_key(etype
, pwd
, salt
)
1126 return RodcPacEncryptionKey(key
, kvno
)
1128 def PasswordKey_from_etype_info2(self
, creds
, etype_info2
, kvno
=None):
1129 e
= etype_info2
['etype']
1131 salt
= etype_info2
.get('salt')
1133 if e
== kcrypto
.Enctype
.RC4
:
1134 nthash
= creds
.get_nt_hash()
1135 return self
.SessionKey_create(etype
=e
, contents
=nthash
, kvno
=kvno
)
1137 password
= creds
.get_password()
1138 return self
.PasswordKey_create(
1139 etype
=e
, pwd
=password
, salt
=salt
, kvno
=kvno
)
1141 def TicketDecryptionKey_from_creds(self
, creds
, etype
=None):
1144 etypes
= creds
.get_tgs_krb5_etypes()
1148 etype
= kcrypto
.Enctype
.RC4
1150 forced_key
= creds
.get_forced_key(etype
)
1151 if forced_key
is not None:
1154 kvno
= creds
.get_kvno()
1156 fail_msg
= ("%s has no fixed key for etype[%s] kvno[%s] "
1157 "nor a password specified, " % (
1158 creds
.get_username(), etype
, kvno
))
1160 if etype
== kcrypto
.Enctype
.RC4
:
1161 nthash
= creds
.get_nt_hash()
1162 self
.assertIsNotNone(nthash
, msg
=fail_msg
)
1163 return self
.SessionKey_create(etype
=etype
,
1167 password
= creds
.get_password()
1168 self
.assertIsNotNone(password
, msg
=fail_msg
)
1169 salt
= creds
.get_salt()
1170 return self
.PasswordKey_create(etype
=etype
,
1175 def RandomKey(self
, etype
):
1176 e
= kcrypto
._get
_enctype
_profile
(etype
)
1177 contents
= samba
.generate_random_bytes(e
.keysize
)
1178 return self
.SessionKey_create(etype
=etype
, contents
=contents
)
1180 def EncryptionKey_import(self
, EncryptionKey_obj
):
1181 return self
.SessionKey_create(EncryptionKey_obj
['keytype'],
1182 EncryptionKey_obj
['keyvalue'])
1184 def EncryptedData_create(self
, key
, usage
, plaintext
):
1185 # EncryptedData ::= SEQUENCE {
1186 # etype [0] Int32 -- EncryptionType --,
1187 # kvno [1] Int32 OPTIONAL,
1188 # cipher [2] OCTET STRING -- ciphertext
1190 ciphertext
= key
.encrypt(usage
, plaintext
)
1191 EncryptedData_obj
= {
1193 'cipher': ciphertext
1195 if key
.kvno
is not None:
1196 EncryptedData_obj
['kvno'] = key
.kvno
1197 return EncryptedData_obj
1199 def Checksum_create(self
, key
, usage
, plaintext
, ctype
=None):
1200 # Checksum ::= SEQUENCE {
1201 # cksumtype [0] Int32,
1202 # checksum [1] OCTET STRING
1206 checksum
= key
.make_checksum(usage
, plaintext
, ctype
=ctype
)
1209 'checksum': checksum
,
1214 def PrincipalName_create(cls
, name_type
, names
):
1215 # PrincipalName ::= SEQUENCE {
1216 # name-type [0] Int32,
1217 # name-string [1] SEQUENCE OF KerberosString
1219 PrincipalName_obj
= {
1220 'name-type': name_type
,
1221 'name-string': names
,
1223 return PrincipalName_obj
1225 def AuthorizationData_create(self
, ad_type
, ad_data
):
1226 # AuthorizationData ::= SEQUENCE {
1227 # ad-type [0] Int32,
1228 # ad-data [1] OCTET STRING
1234 return AUTH_DATA_obj
1236 def PA_DATA_create(self
, padata_type
, padata_value
):
1237 # PA-DATA ::= SEQUENCE {
1238 # -- NOTE: first tag is [1], not [0]
1239 # padata-type [1] Int32,
1240 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1243 'padata-type': padata_type
,
1244 'padata-value': padata_value
,
1248 def PA_ENC_TS_ENC_create(self
, ts
, usec
):
1249 # PA-ENC-TS-ENC ::= SEQUENCE {
1250 # patimestamp[0] KerberosTime, -- client's time
1251 # pausec[1] krb5int32 OPTIONAL
1253 PA_ENC_TS_ENC_obj
= {
1257 return PA_ENC_TS_ENC_obj
1259 def PA_PAC_OPTIONS_create(self
, options
):
1260 # PA-PAC-OPTIONS ::= SEQUENCE {
1261 # options [0] PACOptionFlags
1263 PA_PAC_OPTIONS_obj
= {
1266 return PA_PAC_OPTIONS_obj
1268 def KRB_FAST_ARMOR_create(self
, armor_type
, armor_value
):
1269 # KrbFastArmor ::= SEQUENCE {
1270 # armor-type [0] Int32,
1271 # armor-value [1] OCTET STRING,
1274 KRB_FAST_ARMOR_obj
= {
1275 'armor-type': armor_type
,
1276 'armor-value': armor_value
1278 return KRB_FAST_ARMOR_obj
1280 def KRB_FAST_REQ_create(self
, fast_options
, padata
, req_body
):
1281 # KrbFastReq ::= SEQUENCE {
1282 # fast-options [0] FastOptions,
1283 # padata [1] SEQUENCE OF PA-DATA,
1284 # req-body [2] KDC-REQ-BODY,
1287 KRB_FAST_REQ_obj
= {
1288 'fast-options': fast_options
,
1290 'req-body': req_body
1292 return KRB_FAST_REQ_obj
1294 def KRB_FAST_ARMORED_REQ_create(self
, armor
, req_checksum
, enc_fast_req
):
1295 # KrbFastArmoredReq ::= SEQUENCE {
1296 # armor [0] KrbFastArmor OPTIONAL,
1297 # req-checksum [1] Checksum,
1298 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1300 KRB_FAST_ARMORED_REQ_obj
= {
1301 'req-checksum': req_checksum
,
1302 'enc-fast-req': enc_fast_req
1304 if armor
is not None:
1305 KRB_FAST_ARMORED_REQ_obj
['armor'] = armor
1306 return KRB_FAST_ARMORED_REQ_obj
1308 def PA_FX_FAST_REQUEST_create(self
, armored_data
):
1309 # PA-FX-FAST-REQUEST ::= CHOICE {
1310 # armored-data [0] KrbFastArmoredReq,
1313 PA_FX_FAST_REQUEST_obj
= {
1314 'armored-data': armored_data
1316 return PA_FX_FAST_REQUEST_obj
1318 def KERB_PA_PAC_REQUEST_create(self
, include_pac
, pa_data_create
=True):
1319 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1320 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1322 # --If FALSE, and PAC present,
1325 KERB_PA_PAC_REQUEST_obj
= {
1326 'include-pac': include_pac
,
1328 if not pa_data_create
:
1329 return KERB_PA_PAC_REQUEST_obj
1330 pa_pac
= self
.der_encode(KERB_PA_PAC_REQUEST_obj
,
1331 asn1Spec
=krb5_asn1
.KERB_PA_PAC_REQUEST())
1332 pa_data
= self
.PA_DATA_create(PADATA_PAC_REQUEST
, pa_pac
)
1335 def get_pa_pac_options(self
, options
):
1336 pac_options
= self
.PA_PAC_OPTIONS_create(options
)
1337 pac_options
= self
.der_encode(pac_options
,
1338 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
1339 pac_options
= self
.PA_DATA_create(PADATA_PAC_OPTIONS
, pac_options
)
1343 def KDC_REQ_BODY_create(self
,
1355 EncAuthorizationData
,
1356 EncAuthorizationData_key
,
1357 EncAuthorizationData_usage
,
1360 # KDC-REQ-BODY ::= SEQUENCE {
1361 # kdc-options [0] KDCOptions,
1362 # cname [1] PrincipalName OPTIONAL
1363 # -- Used only in AS-REQ --,
1366 # -- Also client's in AS-REQ --,
1367 # sname [3] PrincipalName OPTIONAL,
1368 # from [4] KerberosTime OPTIONAL,
1369 # till [5] KerberosTime,
1370 # rtime [6] KerberosTime OPTIONAL,
1372 # etype [8] SEQUENCE OF Int32
1374 # -- in preference order --,
1375 # addresses [9] HostAddresses OPTIONAL,
1376 # enc-authorization-data [10] EncryptedData OPTIONAL
1377 # -- AuthorizationData --,
1378 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1379 # -- NOTE: not empty
1381 if EncAuthorizationData
is not None:
1382 enc_ad_plain
= self
.der_encode(
1383 EncAuthorizationData
,
1384 asn1Spec
=krb5_asn1
.AuthorizationData(),
1385 asn1_print
=asn1_print
,
1387 enc_ad
= self
.EncryptedData_create(EncAuthorizationData_key
,
1388 EncAuthorizationData_usage
,
1392 KDC_REQ_BODY_obj
= {
1393 'kdc-options': kdc_options
,
1399 if cname
is not None:
1400 KDC_REQ_BODY_obj
['cname'] = cname
1401 if sname
is not None:
1402 KDC_REQ_BODY_obj
['sname'] = sname
1403 if from_time
is not None:
1404 KDC_REQ_BODY_obj
['from'] = from_time
1405 if renew_time
is not None:
1406 KDC_REQ_BODY_obj
['rtime'] = renew_time
1407 if addresses
is not None:
1408 KDC_REQ_BODY_obj
['addresses'] = addresses
1409 if enc_ad
is not None:
1410 KDC_REQ_BODY_obj
['enc-authorization-data'] = enc_ad
1411 if additional_tickets
is not None:
1412 KDC_REQ_BODY_obj
['additional-tickets'] = additional_tickets
1413 return KDC_REQ_BODY_obj
1415 def KDC_REQ_create(self
,
1422 # KDC-REQ ::= SEQUENCE {
1423 # -- NOTE: first tag is [1], not [0]
1424 # pvno [1] INTEGER (5) ,
1425 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1426 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1427 # -- NOTE: not empty --,
1428 # req-body [4] KDC-REQ-BODY
1433 'msg-type': msg_type
,
1434 'req-body': req_body
,
1436 if padata
is not None:
1437 KDC_REQ_obj
['padata'] = padata
1438 if asn1Spec
is not None:
1439 KDC_REQ_decoded
= pyasn1_native_decode(
1440 KDC_REQ_obj
, asn1Spec
=asn1Spec
)
1442 KDC_REQ_decoded
= None
1443 return KDC_REQ_obj
, KDC_REQ_decoded
1445 def AS_REQ_create(self
,
1447 kdc_options
, # required
1451 from_time
, # optional
1452 till_time
, # required
1453 renew_time
, # optional
1456 addresses
, # optional
1458 native_decoded_only
=True,
1461 # KDC-REQ ::= SEQUENCE {
1462 # -- NOTE: first tag is [1], not [0]
1463 # pvno [1] INTEGER (5) ,
1464 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1465 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1466 # -- NOTE: not empty --,
1467 # req-body [4] KDC-REQ-BODY
1470 # KDC-REQ-BODY ::= SEQUENCE {
1471 # kdc-options [0] KDCOptions,
1472 # cname [1] PrincipalName OPTIONAL
1473 # -- Used only in AS-REQ --,
1476 # -- Also client's in AS-REQ --,
1477 # sname [3] PrincipalName OPTIONAL,
1478 # from [4] KerberosTime OPTIONAL,
1479 # till [5] KerberosTime,
1480 # rtime [6] KerberosTime OPTIONAL,
1482 # etype [8] SEQUENCE OF Int32
1484 # -- in preference order --,
1485 # addresses [9] HostAddresses OPTIONAL,
1486 # enc-authorization-data [10] EncryptedData OPTIONAL
1487 # -- AuthorizationData --,
1488 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1489 # -- NOTE: not empty
1491 KDC_REQ_BODY_obj
= self
.KDC_REQ_BODY_create(
1503 EncAuthorizationData
=None,
1504 EncAuthorizationData_key
=None,
1505 EncAuthorizationData_usage
=None,
1506 asn1_print
=asn1_print
,
1508 obj
, decoded
= self
.KDC_REQ_create(
1509 msg_type
=KRB_AS_REQ
,
1511 req_body
=KDC_REQ_BODY_obj
,
1512 asn1Spec
=krb5_asn1
.AS_REQ(),
1513 asn1_print
=asn1_print
,
1515 if native_decoded_only
:
1519 def AP_REQ_create(self
, ap_options
, ticket
, authenticator
):
1520 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1521 # pvno [0] INTEGER (5),
1522 # msg-type [1] INTEGER (14),
1523 # ap-options [2] APOptions,
1524 # ticket [3] Ticket,
1525 # authenticator [4] EncryptedData -- Authenticator
1529 'msg-type': KRB_AP_REQ
,
1530 'ap-options': ap_options
,
1532 'authenticator': authenticator
,
1536 def Authenticator_create(
1537 self
, crealm
, cname
, cksum
, cusec
, ctime
, subkey
, seq_number
,
1538 authorization_data
):
1539 # -- Unencrypted authenticator
1540 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1541 # authenticator-vno [0] INTEGER (5),
1543 # cname [2] PrincipalName,
1544 # cksum [3] Checksum OPTIONAL,
1545 # cusec [4] Microseconds,
1546 # ctime [5] KerberosTime,
1547 # subkey [6] EncryptionKey OPTIONAL,
1548 # seq-number [7] UInt32 OPTIONAL,
1549 # authorization-data [8] AuthorizationData OPTIONAL
1551 Authenticator_obj
= {
1552 'authenticator-vno': 5,
1558 if cksum
is not None:
1559 Authenticator_obj
['cksum'] = cksum
1560 if subkey
is not None:
1561 Authenticator_obj
['subkey'] = subkey
1562 if seq_number
is not None:
1563 Authenticator_obj
['seq-number'] = seq_number
1564 if authorization_data
is not None:
1565 Authenticator_obj
['authorization-data'] = authorization_data
1566 return Authenticator_obj
1568 def TGS_REQ_create(self
,
1573 kdc_options
, # required
1577 from_time
, # optional
1578 till_time
, # required
1579 renew_time
, # optional
1582 addresses
, # optional
1583 EncAuthorizationData
,
1584 EncAuthorizationData_key
,
1587 authenticator_subkey
=None,
1588 body_checksum_type
=None,
1589 native_decoded_only
=True,
1592 # KDC-REQ ::= SEQUENCE {
1593 # -- NOTE: first tag is [1], not [0]
1594 # pvno [1] INTEGER (5) ,
1595 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1596 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1597 # -- NOTE: not empty --,
1598 # req-body [4] KDC-REQ-BODY
1601 # KDC-REQ-BODY ::= SEQUENCE {
1602 # kdc-options [0] KDCOptions,
1603 # cname [1] PrincipalName OPTIONAL
1604 # -- Used only in AS-REQ --,
1607 # -- Also client's in AS-REQ --,
1608 # sname [3] PrincipalName OPTIONAL,
1609 # from [4] KerberosTime OPTIONAL,
1610 # till [5] KerberosTime,
1611 # rtime [6] KerberosTime OPTIONAL,
1613 # etype [8] SEQUENCE OF Int32
1615 # -- in preference order --,
1616 # addresses [9] HostAddresses OPTIONAL,
1617 # enc-authorization-data [10] EncryptedData OPTIONAL
1618 # -- AuthorizationData --,
1619 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1620 # -- NOTE: not empty
1623 if authenticator_subkey
is not None:
1624 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SUBKEY
1626 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SESSION
1628 req_body
= self
.KDC_REQ_BODY_create(
1629 kdc_options
=kdc_options
,
1633 from_time
=from_time
,
1634 till_time
=till_time
,
1635 renew_time
=renew_time
,
1638 addresses
=addresses
,
1639 additional_tickets
=additional_tickets
,
1640 EncAuthorizationData
=EncAuthorizationData
,
1641 EncAuthorizationData_key
=EncAuthorizationData_key
,
1642 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1643 req_body_blob
= self
.der_encode(req_body
,
1644 asn1Spec
=krb5_asn1
.KDC_REQ_BODY(),
1645 asn1_print
=asn1_print
, hexdump
=hexdump
)
1647 req_body_checksum
= self
.Checksum_create(ticket_session_key
,
1648 KU_TGS_REQ_AUTH_CKSUM
,
1650 ctype
=body_checksum_type
)
1653 if authenticator_subkey
is not None:
1654 subkey_obj
= authenticator_subkey
.export_obj()
1655 seq_number
= random
.randint(0, 0xfffffffe)
1656 authenticator
= self
.Authenticator_create(
1659 cksum
=req_body_checksum
,
1663 seq_number
=seq_number
,
1664 authorization_data
=None)
1665 authenticator
= self
.der_encode(
1667 asn1Spec
=krb5_asn1
.Authenticator(),
1668 asn1_print
=asn1_print
,
1671 authenticator
= self
.EncryptedData_create(
1672 ticket_session_key
, KU_TGS_REQ_AUTH
, authenticator
)
1674 ap_options
= krb5_asn1
.APOptions('0')
1675 ap_req
= self
.AP_REQ_create(ap_options
=str(ap_options
),
1677 authenticator
=authenticator
)
1678 ap_req
= self
.der_encode(ap_req
, asn1Spec
=krb5_asn1
.AP_REQ(),
1679 asn1_print
=asn1_print
, hexdump
=hexdump
)
1680 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
1681 if padata
is not None:
1682 padata
.append(pa_tgs_req
)
1684 padata
= [pa_tgs_req
]
1686 obj
, decoded
= self
.KDC_REQ_create(
1687 msg_type
=KRB_TGS_REQ
,
1690 asn1Spec
=krb5_asn1
.TGS_REQ(),
1691 asn1_print
=asn1_print
,
1693 if native_decoded_only
:
1697 def PA_S4U2Self_create(self
, name
, realm
, tgt_session_key
, ctype
=None):
1698 # PA-S4U2Self ::= SEQUENCE {
1699 # name [0] PrincipalName,
1701 # cksum [2] Checksum,
1702 # auth [3] GeneralString
1704 cksum_data
= name
['name-type'].to_bytes(4, byteorder
='little')
1705 for n
in name
['name-string']:
1706 cksum_data
+= n
.encode()
1707 cksum_data
+= realm
.encode()
1708 cksum_data
+= "Kerberos".encode()
1709 cksum
= self
.Checksum_create(tgt_session_key
,
1710 KU_NON_KERB_CKSUM_SALT
,
1720 pa_s4u2self
= self
.der_encode(
1721 PA_S4U2Self_obj
, asn1Spec
=krb5_asn1
.PA_S4U2Self())
1722 return self
.PA_DATA_create(PADATA_FOR_USER
, pa_s4u2self
)
1724 def _generic_kdc_exchange(self
,
1725 kdc_exchange_dict
, # required
1726 cname
=None, # optional
1727 realm
=None, # required
1728 sname
=None, # optional
1729 from_time
=None, # optional
1730 till_time
=None, # required
1731 renew_time
=None, # optional
1732 etypes
=None, # required
1733 addresses
=None, # optional
1734 additional_tickets
=None, # optional
1735 EncAuthorizationData
=None, # optional
1736 EncAuthorizationData_key
=None, # optional
1737 EncAuthorizationData_usage
=None): # optional
1739 check_error_fn
= kdc_exchange_dict
['check_error_fn']
1740 check_rep_fn
= kdc_exchange_dict
['check_rep_fn']
1741 generate_fast_fn
= kdc_exchange_dict
['generate_fast_fn']
1742 generate_fast_armor_fn
= kdc_exchange_dict
['generate_fast_armor_fn']
1743 generate_fast_padata_fn
= kdc_exchange_dict
['generate_fast_padata_fn']
1744 generate_padata_fn
= kdc_exchange_dict
['generate_padata_fn']
1745 callback_dict
= kdc_exchange_dict
['callback_dict']
1746 req_msg_type
= kdc_exchange_dict
['req_msg_type']
1747 req_asn1Spec
= kdc_exchange_dict
['req_asn1Spec']
1748 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
1750 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
1751 kdc_options
= kdc_exchange_dict
['kdc_options']
1753 pac_request
= kdc_exchange_dict
['pac_request']
1754 pac_options
= kdc_exchange_dict
['pac_options']
1756 # Parameters specific to the inner request body
1757 inner_req
= kdc_exchange_dict
['inner_req']
1759 # Parameters specific to the outer request body
1760 outer_req
= kdc_exchange_dict
['outer_req']
1762 if till_time
is None:
1763 till_time
= self
.get_KerberosTime(offset
=36000)
1765 if 'nonce' in kdc_exchange_dict
:
1766 nonce
= kdc_exchange_dict
['nonce']
1768 nonce
= self
.get_Nonce()
1769 kdc_exchange_dict
['nonce'] = nonce
1771 req_body
= self
.KDC_REQ_BODY_create(
1772 kdc_options
=kdc_options
,
1776 from_time
=from_time
,
1777 till_time
=till_time
,
1778 renew_time
=renew_time
,
1781 addresses
=addresses
,
1782 additional_tickets
=additional_tickets
,
1783 EncAuthorizationData
=EncAuthorizationData
,
1784 EncAuthorizationData_key
=EncAuthorizationData_key
,
1785 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1787 inner_req_body
= dict(req_body
)
1788 if inner_req
is not None:
1789 for key
, value
in inner_req
.items():
1790 if value
is not None:
1791 inner_req_body
[key
] = value
1793 del inner_req_body
[key
]
1794 if outer_req
is not None:
1795 for key
, value
in outer_req
.items():
1796 if value
is not None:
1797 req_body
[key
] = value
1801 additional_padata
= []
1802 if pac_request
is not None:
1803 pa_pac_request
= self
.KERB_PA_PAC_REQUEST_create(pac_request
)
1804 additional_padata
.append(pa_pac_request
)
1805 if pac_options
is not None:
1806 pa_pac_options
= self
.get_pa_pac_options(pac_options
)
1807 additional_padata
.append(pa_pac_options
)
1809 if req_msg_type
== KRB_AS_REQ
:
1811 tgs_req_padata
= None
1813 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1815 tgs_req
= self
.generate_ap_req(kdc_exchange_dict
,
1819 tgs_req_padata
= self
.PA_DATA_create(PADATA_KDC_REQ
, tgs_req
)
1821 if generate_fast_padata_fn
is not None:
1822 self
.assertIsNotNone(generate_fast_fn
)
1823 # This can alter req_body...
1824 fast_padata
, req_body
= generate_fast_padata_fn(kdc_exchange_dict
,
1830 if generate_fast_armor_fn
is not None:
1831 self
.assertIsNotNone(generate_fast_fn
)
1832 fast_ap_req
= generate_fast_armor_fn(kdc_exchange_dict
,
1837 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
1838 fast_armor
= self
.KRB_FAST_ARMOR_create(fast_armor_type
,
1843 if generate_padata_fn
is not None:
1844 # This can alter req_body...
1845 outer_padata
, req_body
= generate_padata_fn(kdc_exchange_dict
,
1848 self
.assertIsNotNone(outer_padata
)
1849 self
.assertNotIn(PADATA_KDC_REQ
,
1850 [pa
['padata-type'] for pa
in outer_padata
],
1851 'Don\'t create TGS-REQ manually')
1855 if generate_fast_fn
is not None:
1856 armor_key
= kdc_exchange_dict
['armor_key']
1857 self
.assertIsNotNone(armor_key
)
1859 if req_msg_type
== KRB_AS_REQ
:
1860 checksum_blob
= self
.der_encode(
1862 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
1864 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1865 checksum_blob
= tgs_req
1867 checksum
= self
.Checksum_create(armor_key
,
1871 fast_padata
+= additional_padata
1872 fast
= generate_fast_fn(kdc_exchange_dict
,
1883 if tgs_req_padata
is not None:
1884 padata
.append(tgs_req_padata
)
1886 if fast
is not None:
1889 if outer_padata
is not None:
1890 padata
+= outer_padata
1893 padata
+= additional_padata
1898 kdc_exchange_dict
['req_padata'] = padata
1899 kdc_exchange_dict
['fast_padata'] = fast_padata
1900 kdc_exchange_dict
['req_body'] = inner_req_body
1902 req_obj
, req_decoded
= self
.KDC_REQ_create(msg_type
=req_msg_type
,
1905 asn1Spec
=req_asn1Spec())
1907 to_rodc
= kdc_exchange_dict
['to_rodc']
1909 rep
= self
.send_recv_transaction(req_decoded
, to_rodc
=to_rodc
)
1910 self
.assertIsNotNone(rep
)
1912 msg_type
= self
.getElementValue(rep
, 'msg-type')
1913 self
.assertIsNotNone(msg_type
)
1915 expected_msg_type
= None
1916 if check_error_fn
is not None:
1917 expected_msg_type
= KRB_ERROR
1918 self
.assertIsNone(check_rep_fn
)
1919 self
.assertNotEqual(0, len(expected_error_mode
))
1920 self
.assertNotIn(0, expected_error_mode
)
1921 if check_rep_fn
is not None:
1922 expected_msg_type
= rep_msg_type
1923 self
.assertIsNone(check_error_fn
)
1924 self
.assertEqual(0, len(expected_error_mode
))
1925 self
.assertIsNotNone(expected_msg_type
)
1926 self
.assertEqual(msg_type
, expected_msg_type
)
1928 if msg_type
== KRB_ERROR
:
1929 return check_error_fn(kdc_exchange_dict
,
1933 return check_rep_fn(kdc_exchange_dict
, callback_dict
, rep
)
1935 def as_exchange_dict(self
,
1936 expected_crealm
=None,
1937 expected_cname
=None,
1938 expected_anon
=False,
1939 expected_srealm
=None,
1940 expected_sname
=None,
1941 expected_supported_etypes
=None,
1942 expected_flags
=None,
1943 unexpected_flags
=None,
1944 ticket_decryption_key
=None,
1945 expect_ticket_checksum
=None,
1946 generate_fast_fn
=None,
1947 generate_fast_armor_fn
=None,
1948 generate_fast_padata_fn
=None,
1949 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
1950 generate_padata_fn
=None,
1951 check_error_fn
=None,
1953 check_kdc_private_fn
=None,
1955 expected_error_mode
=0,
1956 expected_status
=None,
1957 client_as_etypes
=None,
1959 authenticator_subkey
=None,
1974 if expected_error_mode
== 0:
1975 expected_error_mode
= ()
1976 elif not isinstance(expected_error_mode
, collections
.abc
.Container
):
1977 expected_error_mode
= (expected_error_mode
,)
1979 kdc_exchange_dict
= {
1980 'req_msg_type': KRB_AS_REQ
,
1981 'req_asn1Spec': krb5_asn1
.AS_REQ
,
1982 'rep_msg_type': KRB_AS_REP
,
1983 'rep_asn1Spec': krb5_asn1
.AS_REP
,
1984 'rep_encpart_asn1Spec': krb5_asn1
.EncASRepPart
,
1985 'expected_crealm': expected_crealm
,
1986 'expected_cname': expected_cname
,
1987 'expected_anon': expected_anon
,
1988 'expected_srealm': expected_srealm
,
1989 'expected_sname': expected_sname
,
1990 'expected_supported_etypes': expected_supported_etypes
,
1991 'expected_flags': expected_flags
,
1992 'unexpected_flags': unexpected_flags
,
1993 'ticket_decryption_key': ticket_decryption_key
,
1994 'expect_ticket_checksum': expect_ticket_checksum
,
1995 'generate_fast_fn': generate_fast_fn
,
1996 'generate_fast_armor_fn': generate_fast_armor_fn
,
1997 'generate_fast_padata_fn': generate_fast_padata_fn
,
1998 'fast_armor_type': fast_armor_type
,
1999 'generate_padata_fn': generate_padata_fn
,
2000 'check_error_fn': check_error_fn
,
2001 'check_rep_fn': check_rep_fn
,
2002 'check_kdc_private_fn': check_kdc_private_fn
,
2003 'callback_dict': callback_dict
,
2004 'expected_error_mode': expected_error_mode
,
2005 'expected_status': expected_status
,
2006 'client_as_etypes': client_as_etypes
,
2007 'expected_salt': expected_salt
,
2008 'authenticator_subkey': authenticator_subkey
,
2009 'preauth_key': preauth_key
,
2010 'armor_key': armor_key
,
2011 'armor_tgt': armor_tgt
,
2012 'armor_subkey': armor_subkey
,
2013 'auth_data': auth_data
,
2014 'kdc_options': kdc_options
,
2015 'inner_req': inner_req
,
2016 'outer_req': outer_req
,
2017 'pac_request': pac_request
,
2018 'pac_options': pac_options
,
2019 'expect_edata': expect_edata
,
2020 'expect_pac': expect_pac
,
2021 'expect_claims': expect_claims
,
2024 if callback_dict
is None:
2027 return kdc_exchange_dict
2029 def tgs_exchange_dict(self
,
2030 expected_crealm
=None,
2031 expected_cname
=None,
2032 expected_anon
=False,
2033 expected_srealm
=None,
2034 expected_sname
=None,
2035 expected_supported_etypes
=None,
2036 expected_flags
=None,
2037 unexpected_flags
=None,
2038 ticket_decryption_key
=None,
2039 expect_ticket_checksum
=None,
2040 generate_fast_fn
=None,
2041 generate_fast_armor_fn
=None,
2042 generate_fast_padata_fn
=None,
2043 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
2044 generate_padata_fn
=None,
2045 check_error_fn
=None,
2047 check_kdc_private_fn
=None,
2048 expected_error_mode
=0,
2049 expected_status
=None,
2055 authenticator_subkey
=None,
2057 body_checksum_type
=None,
2066 expected_proxy_target
=None,
2067 expected_transited_services
=None,
2069 if expected_error_mode
== 0:
2070 expected_error_mode
= ()
2071 elif not isinstance(expected_error_mode
, collections
.abc
.Container
):
2072 expected_error_mode
= (expected_error_mode
,)
2074 kdc_exchange_dict
= {
2075 'req_msg_type': KRB_TGS_REQ
,
2076 'req_asn1Spec': krb5_asn1
.TGS_REQ
,
2077 'rep_msg_type': KRB_TGS_REP
,
2078 'rep_asn1Spec': krb5_asn1
.TGS_REP
,
2079 'rep_encpart_asn1Spec': krb5_asn1
.EncTGSRepPart
,
2080 'expected_crealm': expected_crealm
,
2081 'expected_cname': expected_cname
,
2082 'expected_anon': expected_anon
,
2083 'expected_srealm': expected_srealm
,
2084 'expected_sname': expected_sname
,
2085 'expected_supported_etypes': expected_supported_etypes
,
2086 'expected_flags': expected_flags
,
2087 'unexpected_flags': unexpected_flags
,
2088 'ticket_decryption_key': ticket_decryption_key
,
2089 'expect_ticket_checksum': expect_ticket_checksum
,
2090 'generate_fast_fn': generate_fast_fn
,
2091 'generate_fast_armor_fn': generate_fast_armor_fn
,
2092 'generate_fast_padata_fn': generate_fast_padata_fn
,
2093 'fast_armor_type': fast_armor_type
,
2094 'generate_padata_fn': generate_padata_fn
,
2095 'check_error_fn': check_error_fn
,
2096 'check_rep_fn': check_rep_fn
,
2097 'check_kdc_private_fn': check_kdc_private_fn
,
2098 'callback_dict': callback_dict
,
2099 'expected_error_mode': expected_error_mode
,
2100 'expected_status': expected_status
,
2102 'body_checksum_type': body_checksum_type
,
2103 'armor_key': armor_key
,
2104 'armor_tgt': armor_tgt
,
2105 'armor_subkey': armor_subkey
,
2106 'auth_data': auth_data
,
2107 'authenticator_subkey': authenticator_subkey
,
2108 'kdc_options': kdc_options
,
2109 'inner_req': inner_req
,
2110 'outer_req': outer_req
,
2111 'pac_request': pac_request
,
2112 'pac_options': pac_options
,
2113 'expect_edata': expect_edata
,
2114 'expect_pac': expect_pac
,
2115 'expect_claims': expect_claims
,
2116 'expected_proxy_target': expected_proxy_target
,
2117 'expected_transited_services': expected_transited_services
,
2120 if callback_dict
is None:
2123 return kdc_exchange_dict
2125 def generic_check_kdc_rep(self
,
2130 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2131 expected_anon
= kdc_exchange_dict
['expected_anon']
2132 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2133 expected_sname
= kdc_exchange_dict
['expected_sname']
2134 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
2135 check_kdc_private_fn
= kdc_exchange_dict
['check_kdc_private_fn']
2136 rep_encpart_asn1Spec
= kdc_exchange_dict
['rep_encpart_asn1Spec']
2137 msg_type
= kdc_exchange_dict
['rep_msg_type']
2138 armor_key
= kdc_exchange_dict
['armor_key']
2140 self
.assertElementEqual(rep
, 'msg-type', msg_type
) # AS-REP | TGS-REP
2141 padata
= self
.getElementValue(rep
, 'padata')
2142 if self
.strict_checking
:
2143 self
.assertElementEqualUTF8(rep
, 'crealm', expected_crealm
)
2145 expected_cname
= self
.PrincipalName_create(
2146 name_type
=NT_WELLKNOWN
,
2147 names
=['WELLKNOWN', 'ANONYMOUS'])
2149 expected_cname
= kdc_exchange_dict
['expected_cname']
2150 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
2151 self
.assertElementPresent(rep
, 'ticket')
2152 ticket
= self
.getElementValue(rep
, 'ticket')
2153 ticket_encpart
= None
2154 ticket_cipher
= None
2155 self
.assertIsNotNone(ticket
)
2156 if ticket
is not None: # Never None, but gives indentation
2157 self
.assertElementEqual(ticket
, 'tkt-vno', 5)
2158 self
.assertElementEqualUTF8(ticket
, 'realm', expected_srealm
)
2159 self
.assertElementEqualPrincipal(ticket
, 'sname', expected_sname
)
2160 self
.assertElementPresent(ticket
, 'enc-part')
2161 ticket_encpart
= self
.getElementValue(ticket
, 'enc-part')
2162 self
.assertIsNotNone(ticket_encpart
)
2163 if ticket_encpart
is not None: # Never None, but gives indentation
2164 self
.assertElementPresent(ticket_encpart
, 'etype')
2165 # 'unspecified' means present, with any value != 0
2166 self
.assertElementKVNO(ticket_encpart
, 'kvno',
2167 self
.unspecified_kvno
)
2168 self
.assertElementPresent(ticket_encpart
, 'cipher')
2169 ticket_cipher
= self
.getElementValue(ticket_encpart
, 'cipher')
2170 self
.assertElementPresent(rep
, 'enc-part')
2171 encpart
= self
.getElementValue(rep
, 'enc-part')
2172 encpart_cipher
= None
2173 self
.assertIsNotNone(encpart
)
2174 if encpart
is not None: # Never None, but gives indentation
2175 self
.assertElementPresent(encpart
, 'etype')
2176 self
.assertElementKVNO(ticket_encpart
, 'kvno', 'autodetect')
2177 self
.assertElementPresent(encpart
, 'cipher')
2178 encpart_cipher
= self
.getElementValue(encpart
, 'cipher')
2180 ticket_checksum
= None
2182 # Get the decryption key for the encrypted part
2183 encpart_decryption_key
, encpart_decryption_usage
= (
2184 self
.get_preauth_key(kdc_exchange_dict
))
2186 if armor_key
is not None:
2187 pa_dict
= self
.get_pa_dict(padata
)
2189 if PADATA_FX_FAST
in pa_dict
:
2190 fx_fast_data
= pa_dict
[PADATA_FX_FAST
]
2191 fast_response
= self
.check_fx_fast_data(kdc_exchange_dict
,
2196 if 'strengthen-key' in fast_response
:
2197 strengthen_key
= self
.EncryptionKey_import(
2198 fast_response
['strengthen-key'])
2199 encpart_decryption_key
= (
2200 self
.generate_strengthen_reply_key(
2202 encpart_decryption_key
))
2204 fast_finished
= fast_response
.get('finished')
2205 if fast_finished
is not None:
2206 ticket_checksum
= fast_finished
['ticket-checksum']
2208 self
.check_rep_padata(kdc_exchange_dict
,
2210 fast_response
['padata'],
2213 ticket_private
= None
2214 if ticket_decryption_key
is not None:
2215 self
.assertElementEqual(ticket_encpart
, 'etype',
2216 ticket_decryption_key
.etype
)
2217 self
.assertElementKVNO(ticket_encpart
, 'kvno',
2218 ticket_decryption_key
.kvno
)
2219 ticket_decpart
= ticket_decryption_key
.decrypt(KU_TICKET
,
2221 ticket_private
= self
.der_decode(
2223 asn1Spec
=krb5_asn1
.EncTicketPart())
2225 encpart_private
= None
2226 self
.assertIsNotNone(encpart_decryption_key
)
2227 if encpart_decryption_key
is not None:
2228 self
.assertElementEqual(encpart
, 'etype',
2229 encpart_decryption_key
.etype
)
2230 if self
.strict_checking
:
2231 self
.assertElementKVNO(encpart
, 'kvno',
2232 encpart_decryption_key
.kvno
)
2233 rep_decpart
= encpart_decryption_key
.decrypt(
2234 encpart_decryption_usage
,
2236 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2237 # application tag 26
2239 encpart_private
= self
.der_decode(
2241 asn1Spec
=rep_encpart_asn1Spec())
2243 encpart_private
= self
.der_decode(
2245 asn1Spec
=krb5_asn1
.EncTGSRepPart())
2247 self
.assertIsNotNone(check_kdc_private_fn
)
2248 if check_kdc_private_fn
is not None:
2249 check_kdc_private_fn(kdc_exchange_dict
, callback_dict
,
2250 rep
, ticket_private
, encpart_private
,
2255 def check_fx_fast_data(self
,
2260 expect_strengthen_key
=True):
2261 fx_fast_data
= self
.der_decode(fx_fast_data
,
2262 asn1Spec
=krb5_asn1
.PA_FX_FAST_REPLY())
2264 enc_fast_rep
= fx_fast_data
['armored-data']['enc-fast-rep']
2265 self
.assertEqual(enc_fast_rep
['etype'], armor_key
.etype
)
2267 fast_rep
= armor_key
.decrypt(KU_FAST_REP
, enc_fast_rep
['cipher'])
2269 fast_response
= self
.der_decode(fast_rep
,
2270 asn1Spec
=krb5_asn1
.KrbFastResponse())
2272 if expect_strengthen_key
and self
.strict_checking
:
2273 self
.assertIn('strengthen-key', fast_response
)
2276 self
.assertIn('finished', fast_response
)
2278 # Ensure that the nonce matches the nonce in the body of the request
2280 nonce
= kdc_exchange_dict
['nonce']
2281 self
.assertEqual(nonce
, fast_response
['nonce'])
2283 return fast_response
2285 def generic_check_kdc_private(self
,
2292 kdc_options
= kdc_exchange_dict
['kdc_options']
2293 canon_pos
= len(tuple(krb5_asn1
.KDCOptions('canonicalize'))) - 1
2294 canonicalize
= (canon_pos
< len(kdc_options
)
2295 and kdc_options
[canon_pos
] == '1')
2296 renewable_pos
= len(tuple(krb5_asn1
.KDCOptions('renewable'))) - 1
2297 renewable
= (renewable_pos
< len(kdc_options
)
2298 and kdc_options
[renewable_pos
] == '1')
2300 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2301 expected_cname
= kdc_exchange_dict
['expected_cname']
2302 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2303 expected_sname
= kdc_exchange_dict
['expected_sname']
2304 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
2306 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2308 expected_flags
= kdc_exchange_dict
.get('expected_flags')
2309 unexpected_flags
= kdc_exchange_dict
.get('unexpected_flags')
2311 ticket
= self
.getElementValue(rep
, 'ticket')
2313 if ticket_checksum
is not None:
2314 armor_key
= kdc_exchange_dict
['armor_key']
2315 self
.verify_ticket_checksum(ticket
, ticket_checksum
, armor_key
)
2317 to_rodc
= kdc_exchange_dict
['to_rodc']
2319 krbtgt_creds
= self
.get_rodc_krbtgt_creds()
2321 krbtgt_creds
= self
.get_krbtgt_creds()
2322 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
2324 expect_pac
= kdc_exchange_dict
['expect_pac']
2326 ticket_session_key
= None
2327 if ticket_private
is not None:
2328 self
.assertElementFlags(ticket_private
, 'flags',
2331 self
.assertElementPresent(ticket_private
, 'key')
2332 ticket_key
= self
.getElementValue(ticket_private
, 'key')
2333 self
.assertIsNotNone(ticket_key
)
2334 if ticket_key
is not None: # Never None, but gives indentation
2335 self
.assertElementPresent(ticket_key
, 'keytype')
2336 self
.assertElementPresent(ticket_key
, 'keyvalue')
2337 ticket_session_key
= self
.EncryptionKey_import(ticket_key
)
2338 self
.assertElementEqualUTF8(ticket_private
, 'crealm',
2340 if self
.strict_checking
:
2341 self
.assertElementEqualPrincipal(ticket_private
, 'cname',
2343 self
.assertElementPresent(ticket_private
, 'transited')
2344 self
.assertElementPresent(ticket_private
, 'authtime')
2345 if self
.strict_checking
:
2346 self
.assertElementPresent(ticket_private
, 'starttime')
2347 self
.assertElementPresent(ticket_private
, 'endtime')
2349 if self
.strict_checking
:
2350 self
.assertElementPresent(ticket_private
, 'renew-till')
2352 self
.assertElementMissing(ticket_private
, 'renew-till')
2353 if self
.strict_checking
:
2354 self
.assertElementEqual(ticket_private
, 'caddr', [])
2355 self
.assertElementPresent(ticket_private
, 'authorization-data',
2356 expect_empty
=not expect_pac
)
2359 authorization_data
= self
.getElementValue(ticket_private
,
2360 'authorization-data')
2361 pac_data
= self
.get_pac(authorization_data
)
2363 self
.check_pac_buffers(pac_data
, kdc_exchange_dict
)
2365 encpart_session_key
= None
2366 if encpart_private
is not None:
2367 self
.assertElementPresent(encpart_private
, 'key')
2368 encpart_key
= self
.getElementValue(encpart_private
, 'key')
2369 self
.assertIsNotNone(encpart_key
)
2370 if encpart_key
is not None: # Never None, but gives indentation
2371 self
.assertElementPresent(encpart_key
, 'keytype')
2372 self
.assertElementPresent(encpart_key
, 'keyvalue')
2373 encpart_session_key
= self
.EncryptionKey_import(encpart_key
)
2374 self
.assertElementPresent(encpart_private
, 'last-req')
2375 self
.assertElementEqual(encpart_private
, 'nonce',
2376 kdc_exchange_dict
['nonce'])
2377 if rep_msg_type
== KRB_AS_REP
:
2378 if self
.strict_checking
:
2379 self
.assertElementPresent(encpart_private
,
2382 self
.assertElementMissing(encpart_private
,
2384 self
.assertElementFlags(encpart_private
, 'flags',
2387 self
.assertElementPresent(encpart_private
, 'authtime')
2388 if self
.strict_checking
:
2389 self
.assertElementPresent(encpart_private
, 'starttime')
2390 self
.assertElementPresent(encpart_private
, 'endtime')
2392 if self
.strict_checking
:
2393 self
.assertElementPresent(encpart_private
, 'renew-till')
2395 self
.assertElementMissing(encpart_private
, 'renew-till')
2396 self
.assertElementEqualUTF8(encpart_private
, 'srealm',
2398 self
.assertElementEqualPrincipal(encpart_private
, 'sname',
2400 if self
.strict_checking
:
2401 self
.assertElementEqual(encpart_private
, 'caddr', [])
2403 sent_pac_options
= self
.get_sent_pac_options(kdc_exchange_dict
)
2405 if self
.strict_checking
:
2406 if canonicalize
or '1' in sent_pac_options
:
2407 self
.assertElementPresent(encpart_private
,
2408 'encrypted-pa-data')
2409 enc_pa_dict
= self
.get_pa_dict(
2410 encpart_private
['encrypted-pa-data'])
2412 self
.assertIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2414 expected_supported_etypes
= kdc_exchange_dict
[
2415 'expected_supported_etypes']
2416 expected_supported_etypes |
= (
2417 security
.KERB_ENCTYPE_DES_CBC_CRC |
2418 security
.KERB_ENCTYPE_DES_CBC_MD5 |
2419 security
.KERB_ENCTYPE_RC4_HMAC_MD5
)
2421 (supported_etypes
,) = struct
.unpack(
2423 enc_pa_dict
[PADATA_SUPPORTED_ETYPES
])
2425 self
.assertEqual(supported_etypes
,
2426 expected_supported_etypes
)
2428 self
.assertNotIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2430 if '1' in sent_pac_options
:
2431 self
.assertIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2433 pac_options
= self
.der_decode(
2434 enc_pa_dict
[PADATA_PAC_OPTIONS
],
2435 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
2437 self
.assertElementEqual(pac_options
, 'options',
2440 self
.assertNotIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2442 self
.assertElementEqual(encpart_private
,
2443 'encrypted-pa-data',
2446 if ticket_session_key
is not None and encpart_session_key
is not None:
2447 self
.assertEqual(ticket_session_key
.etype
,
2448 encpart_session_key
.etype
)
2449 self
.assertEqual(ticket_session_key
.key
.contents
,
2450 encpart_session_key
.key
.contents
)
2451 if encpart_session_key
is not None:
2452 session_key
= encpart_session_key
2454 session_key
= ticket_session_key
2455 ticket_creds
= KerberosTicketCreds(
2458 crealm
=expected_crealm
,
2459 cname
=expected_cname
,
2460 srealm
=expected_srealm
,
2461 sname
=expected_sname
,
2462 decryption_key
=ticket_decryption_key
,
2463 ticket_private
=ticket_private
,
2464 encpart_private
=encpart_private
)
2466 # TODO: This parameter should be removed when all service tickets are
2467 # issued with ticket checksums.
2468 expect_ticket_checksum
= kdc_exchange_dict
['expect_ticket_checksum']
2469 if expect_ticket_checksum
:
2470 self
.assertIsNotNone(ticket_decryption_key
)
2472 if ticket_decryption_key
is not None:
2473 self
.verify_ticket(ticket_creds
, krbtgt_key
, expect_pac
=expect_pac
,
2474 expect_ticket_checksum
=expect_ticket_checksum
)
2476 kdc_exchange_dict
['rep_ticket_creds'] = ticket_creds
2478 def check_pac_buffers(self
, pac_data
, kdc_exchange_dict
):
2479 pac
= ndr_unpack(krb5pac
.PAC_DATA
, pac_data
)
2481 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2482 armor_tgt
= kdc_exchange_dict
['armor_tgt']
2484 expected_sname
= kdc_exchange_dict
['expected_sname']
2485 expect_claims
= kdc_exchange_dict
['expect_claims']
2487 expected_types
= [krb5pac
.PAC_TYPE_LOGON_INFO
,
2488 krb5pac
.PAC_TYPE_SRV_CHECKSUM
,
2489 krb5pac
.PAC_TYPE_KDC_CHECKSUM
,
2490 krb5pac
.PAC_TYPE_LOGON_NAME
,
2491 krb5pac
.PAC_TYPE_UPN_DNS_INFO
]
2493 kdc_options
= kdc_exchange_dict
['kdc_options']
2494 pos
= len(tuple(krb5_asn1
.KDCOptions('cname-in-addl-tkt'))) - 1
2495 constrained_delegation
= (pos
< len(kdc_options
)
2496 and kdc_options
[pos
] == '1')
2497 if constrained_delegation
:
2498 expected_types
.append(krb5pac
.PAC_TYPE_CONSTRAINED_DELEGATION
)
2500 if self
.kdc_fast_support
:
2502 expected_types
.append(krb5pac
.PAC_TYPE_CLIENT_CLAIMS_INFO
)
2504 if (rep_msg_type
== KRB_TGS_REP
2505 and armor_tgt
is not None):
2506 expected_types
.append(krb5pac
.PAC_TYPE_DEVICE_INFO
)
2507 expected_types
.append(krb5pac
.PAC_TYPE_DEVICE_CLAIMS_INFO
)
2509 if not self
.is_tgs(expected_sname
):
2510 expected_types
.append(krb5pac
.PAC_TYPE_TICKET_CHECKSUM
)
2512 if self
.strict_checking
:
2513 buffer_types
= [pac_buffer
.type
2514 for pac_buffer
in pac
.buffers
]
2515 self
.assertCountEqual(expected_types
, buffer_types
,
2516 f
'expected: {expected_types} '
2517 f
'got: {buffer_types}')
2519 for pac_buffer
in pac
.buffers
:
2520 if pac_buffer
.type == krb5pac
.PAC_TYPE_CONSTRAINED_DELEGATION
:
2521 expected_proxy_target
= kdc_exchange_dict
[
2522 'expected_proxy_target']
2523 expected_transited_services
= kdc_exchange_dict
[
2524 'expected_transited_services']
2526 delegation_info
= pac_buffer
.info
.info
2528 self
.assertEqual(expected_proxy_target
,
2529 str(delegation_info
.proxy_target
))
2531 transited_services
= list(map(
2532 str, delegation_info
.transited_services
))
2533 self
.assertEqual(expected_transited_services
,
2536 def generic_check_kdc_error(self
,
2542 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2544 expected_anon
= kdc_exchange_dict
['expected_anon']
2545 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2546 expected_sname
= kdc_exchange_dict
['expected_sname']
2547 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
2549 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
2551 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
2553 self
.assertElementEqual(rep
, 'pvno', 5)
2554 self
.assertElementEqual(rep
, 'msg-type', KRB_ERROR
)
2555 error_code
= self
.getElementValue(rep
, 'error-code')
2556 self
.assertIn(error_code
, expected_error_mode
)
2557 if self
.strict_checking
:
2558 self
.assertElementMissing(rep
, 'ctime')
2559 self
.assertElementMissing(rep
, 'cusec')
2560 self
.assertElementPresent(rep
, 'stime')
2561 self
.assertElementPresent(rep
, 'susec')
2562 # error-code checked above
2563 if self
.strict_checking
:
2564 self
.assertElementMissing(rep
, 'crealm')
2565 if expected_anon
and not inner
:
2566 expected_cname
= self
.PrincipalName_create(
2567 name_type
=NT_WELLKNOWN
,
2568 names
=['WELLKNOWN', 'ANONYMOUS'])
2569 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
2571 self
.assertElementMissing(rep
, 'cname')
2572 self
.assertElementEqualUTF8(rep
, 'realm', expected_srealm
)
2573 self
.assertElementEqualPrincipal(rep
, 'sname', expected_sname
)
2574 self
.assertElementMissing(rep
, 'e-text')
2575 expected_status
= kdc_exchange_dict
['expected_status']
2576 expect_edata
= kdc_exchange_dict
['expect_edata']
2577 if expect_edata
is None:
2578 expect_edata
= (error_code
!= KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2579 and (not sent_fast
or fast_armor_type
is None
2580 or fast_armor_type
== FX_FAST_ARMOR_AP_REQUEST
)
2582 if not expect_edata
:
2583 self
.assertIsNone(expected_status
)
2584 self
.assertElementMissing(rep
, 'e-data')
2586 edata
= self
.getElementValue(rep
, 'e-data')
2587 if self
.strict_checking
:
2588 self
.assertIsNotNone(edata
)
2589 if edata
is not None:
2590 if rep_msg_type
== KRB_TGS_REP
and not sent_fast
:
2591 error_data
= self
.der_decode(
2593 asn1Spec
=krb5_asn1
.KERB_ERROR_DATA())
2594 self
.assertEqual(KERB_ERR_TYPE_EXTENDED
,
2595 error_data
['data-type'])
2597 extended_error
= error_data
['data-value']
2599 self
.assertEqual(12, len(extended_error
))
2601 status
= int.from_bytes(extended_error
[:4], 'little')
2602 flags
= int.from_bytes(extended_error
[8:], 'little')
2604 self
.assertEqual(expected_status
, status
)
2606 self
.assertEqual(3, flags
)
2608 self
.assertIsNone(expected_status
)
2610 rep_padata
= self
.der_decode(edata
,
2611 asn1Spec
=krb5_asn1
.METHOD_DATA())
2612 self
.assertGreater(len(rep_padata
), 0)
2615 self
.assertEqual(1, len(rep_padata
))
2616 rep_pa_dict
= self
.get_pa_dict(rep_padata
)
2617 self
.assertIn(PADATA_FX_FAST
, rep_pa_dict
)
2619 armor_key
= kdc_exchange_dict
['armor_key']
2620 self
.assertIsNotNone(armor_key
)
2621 fast_response
= self
.check_fx_fast_data(
2623 rep_pa_dict
[PADATA_FX_FAST
],
2625 expect_strengthen_key
=False)
2627 rep_padata
= fast_response
['padata']
2629 etype_info2
= self
.check_rep_padata(kdc_exchange_dict
,
2634 kdc_exchange_dict
['preauth_etype_info2'] = etype_info2
2638 def check_rep_padata(self
,
2643 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2645 req_body
= kdc_exchange_dict
['req_body']
2646 proposed_etypes
= req_body
['etype']
2647 client_as_etypes
= kdc_exchange_dict
.get('client_as_etypes', [])
2649 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
2650 sent_enc_challenge
= self
.sent_enc_challenge(kdc_exchange_dict
)
2652 if rep_msg_type
== KRB_TGS_REP
:
2653 self
.assertTrue(sent_fast
)
2655 expect_etype_info2
= ()
2656 expect_etype_info
= False
2657 unexpect_etype_info
= True
2658 expected_aes_type
= 0
2659 expected_rc4_type
= 0
2660 if kcrypto
.Enctype
.RC4
in proposed_etypes
:
2661 expect_etype_info
= True
2662 for etype
in proposed_etypes
:
2663 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2664 expect_etype_info
= False
2665 if etype
not in client_as_etypes
:
2667 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2668 if etype
> expected_aes_type
:
2669 expected_aes_type
= etype
2670 if etype
in (kcrypto
.Enctype
.RC4
,) and error_code
!= 0:
2671 unexpect_etype_info
= False
2672 if etype
> expected_rc4_type
:
2673 expected_rc4_type
= etype
2675 if expected_aes_type
!= 0:
2676 expect_etype_info2
+= (expected_aes_type
,)
2677 if expected_rc4_type
!= 0:
2678 expect_etype_info2
+= (expected_rc4_type
,)
2680 expected_patypes
= ()
2681 if sent_fast
and error_code
!= 0:
2682 expected_patypes
+= (PADATA_FX_ERROR
,)
2683 expected_patypes
+= (PADATA_FX_COOKIE
,)
2685 if rep_msg_type
== KRB_TGS_REP
:
2686 sent_pac_options
= self
.get_sent_pac_options(kdc_exchange_dict
)
2687 if ('1' in sent_pac_options
2688 and error_code
not in (0, KDC_ERR_GENERIC
)):
2689 expected_patypes
+= (PADATA_PAC_OPTIONS
,)
2690 elif error_code
!= KDC_ERR_GENERIC
:
2691 if expect_etype_info
:
2692 self
.assertGreater(len(expect_etype_info2
), 0)
2693 expected_patypes
+= (PADATA_ETYPE_INFO
,)
2694 if len(expect_etype_info2
) != 0:
2695 expected_patypes
+= (PADATA_ETYPE_INFO2
,)
2697 if error_code
!= KDC_ERR_PREAUTH_FAILED
:
2699 expected_patypes
+= (PADATA_ENCRYPTED_CHALLENGE
,)
2701 expected_patypes
+= (PADATA_ENC_TIMESTAMP
,)
2703 if not sent_enc_challenge
:
2704 expected_patypes
+= (PADATA_PK_AS_REQ
,)
2705 expected_patypes
+= (PADATA_PK_AS_REP_19
,)
2707 if (self
.kdc_fast_support
2709 and not sent_enc_challenge
):
2710 expected_patypes
+= (PADATA_FX_FAST
,)
2711 expected_patypes
+= (PADATA_FX_COOKIE
,)
2713 if self
.strict_checking
:
2714 for i
, patype
in enumerate(expected_patypes
):
2715 self
.assertElementEqual(rep_padata
[i
], 'padata-type', patype
)
2716 self
.assertEqual(len(rep_padata
), len(expected_patypes
))
2720 enc_timestamp
= None
2721 enc_challenge
= None
2728 for pa
in rep_padata
:
2729 patype
= self
.getElementValue(pa
, 'padata-type')
2730 pavalue
= self
.getElementValue(pa
, 'padata-value')
2731 if patype
== PADATA_ETYPE_INFO2
:
2732 self
.assertIsNone(etype_info2
)
2733 etype_info2
= self
.der_decode(pavalue
,
2734 asn1Spec
=krb5_asn1
.ETYPE_INFO2())
2736 if patype
== PADATA_ETYPE_INFO
:
2737 self
.assertIsNone(etype_info
)
2738 etype_info
= self
.der_decode(pavalue
,
2739 asn1Spec
=krb5_asn1
.ETYPE_INFO())
2741 if patype
== PADATA_ENC_TIMESTAMP
:
2742 self
.assertIsNone(enc_timestamp
)
2743 enc_timestamp
= pavalue
2744 self
.assertEqual(len(enc_timestamp
), 0)
2746 if patype
== PADATA_ENCRYPTED_CHALLENGE
:
2747 self
.assertIsNone(enc_challenge
)
2748 enc_challenge
= pavalue
2750 if patype
== PADATA_PK_AS_REQ
:
2751 self
.assertIsNone(pk_as_req
)
2753 self
.assertEqual(len(pk_as_req
), 0)
2755 if patype
== PADATA_PK_AS_REP_19
:
2756 self
.assertIsNone(pk_as_rep19
)
2757 pk_as_rep19
= pavalue
2758 self
.assertEqual(len(pk_as_rep19
), 0)
2760 if patype
== PADATA_FX_COOKIE
:
2761 self
.assertIsNone(fast_cookie
)
2762 fast_cookie
= pavalue
2763 self
.assertIsNotNone(fast_cookie
)
2765 if patype
== PADATA_FX_ERROR
:
2766 self
.assertIsNone(fast_error
)
2767 fast_error
= pavalue
2768 self
.assertIsNotNone(fast_error
)
2770 if patype
== PADATA_FX_FAST
:
2771 self
.assertIsNone(fx_fast
)
2773 self
.assertEqual(len(fx_fast
), 0)
2775 if patype
== PADATA_PAC_OPTIONS
:
2776 self
.assertIsNone(pac_options
)
2777 pac_options
= self
.der_decode(
2779 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
2782 if fast_cookie
is not None:
2783 kdc_exchange_dict
['fast_cookie'] = fast_cookie
2785 if fast_error
is not None:
2786 fast_error
= self
.der_decode(fast_error
,
2787 asn1Spec
=krb5_asn1
.KRB_ERROR())
2788 self
.generic_check_kdc_error(kdc_exchange_dict
,
2793 if pac_options
is not None:
2794 self
.assertElementEqual(pac_options
, 'options', sent_pac_options
)
2796 if enc_challenge
is not None:
2797 if not sent_enc_challenge
:
2798 self
.assertEqual(len(enc_challenge
), 0)
2800 armor_key
= kdc_exchange_dict
['armor_key']
2801 self
.assertIsNotNone(armor_key
)
2803 preauth_key
, _
= self
.get_preauth_key(kdc_exchange_dict
)
2805 kdc_challenge_key
= self
.generate_kdc_challenge_key(
2806 armor_key
, preauth_key
)
2808 # Ensure that the encrypted challenge FAST factor is supported
2810 if self
.strict_checking
:
2811 self
.assertNotEqual(len(enc_challenge
), 0)
2812 if len(enc_challenge
) != 0:
2813 encrypted_challenge
= self
.der_decode(
2815 asn1Spec
=krb5_asn1
.EncryptedData())
2816 self
.assertEqual(encrypted_challenge
['etype'],
2817 kdc_challenge_key
.etype
)
2819 challenge
= kdc_challenge_key
.decrypt(
2820 KU_ENC_CHALLENGE_KDC
,
2821 encrypted_challenge
['cipher'])
2822 challenge
= self
.der_decode(
2824 asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
2826 # Retrieve the returned timestamp.
2827 rep_patime
= challenge
['patimestamp']
2828 self
.assertIn('pausec', challenge
)
2830 # Ensure the returned time is within five minutes of the
2832 rep_time
= self
.get_EpochFromKerberosTime(rep_patime
)
2833 current_time
= time
.time()
2835 self
.assertLess(current_time
- 300, rep_time
)
2836 self
.assertLess(rep_time
, current_time
+ 300)
2838 if all(etype
not in client_as_etypes
or etype
not in proposed_etypes
2839 for etype
in (kcrypto
.Enctype
.AES256
,
2840 kcrypto
.Enctype
.AES128
,
2841 kcrypto
.Enctype
.RC4
)):
2842 self
.assertIsNone(etype_info2
)
2843 self
.assertIsNone(etype_info
)
2844 if rep_msg_type
== KRB_AS_REP
:
2845 if self
.strict_checking
:
2847 self
.assertIsNotNone(enc_challenge
)
2848 self
.assertIsNone(enc_timestamp
)
2850 self
.assertIsNotNone(enc_timestamp
)
2851 self
.assertIsNone(enc_challenge
)
2852 self
.assertIsNotNone(pk_as_req
)
2853 self
.assertIsNotNone(pk_as_rep19
)
2855 self
.assertIsNone(enc_timestamp
)
2856 self
.assertIsNone(enc_challenge
)
2857 self
.assertIsNone(pk_as_req
)
2858 self
.assertIsNone(pk_as_rep19
)
2861 if error_code
!= KDC_ERR_GENERIC
:
2862 if self
.strict_checking
:
2863 self
.assertIsNotNone(etype_info2
)
2865 self
.assertIsNone(etype_info2
)
2866 if expect_etype_info
:
2867 self
.assertIsNotNone(etype_info
)
2869 if self
.strict_checking
:
2870 self
.assertIsNone(etype_info
)
2871 if unexpect_etype_info
:
2872 self
.assertIsNone(etype_info
)
2874 if error_code
!= KDC_ERR_GENERIC
and self
.strict_checking
:
2875 self
.assertGreaterEqual(len(etype_info2
), 1)
2876 self
.assertEqual(len(etype_info2
), len(expect_etype_info2
))
2877 for i
in range(0, len(etype_info2
)):
2878 e
= self
.getElementValue(etype_info2
[i
], 'etype')
2879 self
.assertEqual(e
, expect_etype_info2
[i
])
2880 salt
= self
.getElementValue(etype_info2
[i
], 'salt')
2881 if e
== kcrypto
.Enctype
.RC4
:
2882 self
.assertIsNone(salt
)
2884 self
.assertIsNotNone(salt
)
2885 expected_salt
= kdc_exchange_dict
['expected_salt']
2886 if expected_salt
is not None:
2887 self
.assertEqual(salt
, expected_salt
)
2888 s2kparams
= self
.getElementValue(etype_info2
[i
], 's2kparams')
2889 if self
.strict_checking
:
2890 self
.assertIsNone(s2kparams
)
2891 if etype_info
is not None:
2892 self
.assertEqual(len(etype_info
), 1)
2893 e
= self
.getElementValue(etype_info
[0], 'etype')
2894 self
.assertEqual(e
, kcrypto
.Enctype
.RC4
)
2895 self
.assertEqual(e
, expect_etype_info2
[0])
2896 salt
= self
.getElementValue(etype_info
[0], 'salt')
2897 if self
.strict_checking
:
2898 self
.assertIsNotNone(salt
)
2899 self
.assertEqual(len(salt
), 0)
2901 if error_code
not in (KDC_ERR_PREAUTH_FAILED
,
2904 self
.assertIsNotNone(enc_challenge
)
2905 if self
.strict_checking
:
2906 self
.assertIsNone(enc_timestamp
)
2908 self
.assertIsNotNone(enc_timestamp
)
2909 if self
.strict_checking
:
2910 self
.assertIsNone(enc_challenge
)
2911 if not sent_enc_challenge
:
2912 if self
.strict_checking
:
2913 self
.assertIsNotNone(pk_as_req
)
2914 self
.assertIsNotNone(pk_as_rep19
)
2916 self
.assertIsNone(pk_as_req
)
2917 self
.assertIsNone(pk_as_rep19
)
2919 if self
.strict_checking
:
2920 self
.assertIsNone(enc_timestamp
)
2921 self
.assertIsNone(enc_challenge
)
2922 self
.assertIsNone(pk_as_req
)
2923 self
.assertIsNone(pk_as_rep19
)
2927 def generate_simple_fast(self
,
2935 armor_key
= kdc_exchange_dict
['armor_key']
2937 fast_req
= self
.KRB_FAST_REQ_create(fast_options
,
2940 fast_req
= self
.der_encode(fast_req
,
2941 asn1Spec
=krb5_asn1
.KrbFastReq())
2942 fast_req
= self
.EncryptedData_create(armor_key
,
2946 fast_armored_req
= self
.KRB_FAST_ARMORED_REQ_create(fast_armor
,
2950 fx_fast_request
= self
.PA_FX_FAST_REQUEST_create(fast_armored_req
)
2951 fx_fast_request
= self
.der_encode(
2953 asn1Spec
=krb5_asn1
.PA_FX_FAST_REQUEST())
2955 fast_padata
= self
.PA_DATA_create(PADATA_FX_FAST
,
2960 def generate_ap_req(self
,
2966 tgt
= kdc_exchange_dict
['armor_tgt']
2967 authenticator_subkey
= kdc_exchange_dict
['armor_subkey']
2969 req_body_checksum
= None
2971 tgt
= kdc_exchange_dict
['tgt']
2972 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
2973 body_checksum_type
= kdc_exchange_dict
['body_checksum_type']
2975 req_body_blob
= self
.der_encode(req_body
,
2976 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
2978 req_body_checksum
= self
.Checksum_create(tgt
.session_key
,
2979 KU_TGS_REQ_AUTH_CKSUM
,
2981 ctype
=body_checksum_type
)
2983 auth_data
= kdc_exchange_dict
['auth_data']
2986 if authenticator_subkey
is not None:
2987 subkey_obj
= authenticator_subkey
.export_obj()
2988 seq_number
= random
.randint(0, 0xfffffffe)
2989 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
2990 authenticator_obj
= self
.Authenticator_create(
2993 cksum
=req_body_checksum
,
2997 seq_number
=seq_number
,
2998 authorization_data
=auth_data
)
2999 authenticator_blob
= self
.der_encode(
3001 asn1Spec
=krb5_asn1
.Authenticator())
3003 usage
= KU_AP_REQ_AUTH
if armor
else KU_TGS_REQ_AUTH
3004 authenticator
= self
.EncryptedData_create(tgt
.session_key
,
3008 ap_options
= krb5_asn1
.APOptions('0')
3009 ap_req_obj
= self
.AP_REQ_create(ap_options
=str(ap_options
),
3011 authenticator
=authenticator
)
3012 ap_req
= self
.der_encode(ap_req_obj
, asn1Spec
=krb5_asn1
.AP_REQ())
3016 def generate_simple_tgs_padata(self
,
3020 ap_req
= self
.generate_ap_req(kdc_exchange_dict
,
3024 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
3025 padata
= [pa_tgs_req
]
3027 return padata
, req_body
3029 def get_preauth_key(self
, kdc_exchange_dict
):
3030 msg_type
= kdc_exchange_dict
['rep_msg_type']
3032 if msg_type
== KRB_AS_REP
:
3033 key
= kdc_exchange_dict
['preauth_key']
3034 usage
= KU_AS_REP_ENC_PART
3036 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
3037 if authenticator_subkey
is not None:
3038 key
= authenticator_subkey
3039 usage
= KU_TGS_REP_ENC_PART_SUB_KEY
3041 tgt
= kdc_exchange_dict
['tgt']
3042 key
= tgt
.session_key
3043 usage
= KU_TGS_REP_ENC_PART_SESSION
3045 self
.assertIsNotNone(key
)
3049 def generate_armor_key(self
, subkey
, session_key
):
3050 armor_key
= kcrypto
.cf2(subkey
.key
,
3054 armor_key
= Krb5EncryptionKey(armor_key
, None)
3058 def generate_strengthen_reply_key(self
, strengthen_key
, reply_key
):
3059 strengthen_reply_key
= kcrypto
.cf2(strengthen_key
.key
,
3063 strengthen_reply_key
= Krb5EncryptionKey(strengthen_reply_key
,
3066 return strengthen_reply_key
3068 def generate_client_challenge_key(self
, armor_key
, longterm_key
):
3069 client_challenge_key
= kcrypto
.cf2(armor_key
.key
,
3071 b
'clientchallengearmor',
3072 b
'challengelongterm')
3073 client_challenge_key
= Krb5EncryptionKey(client_challenge_key
, None)
3075 return client_challenge_key
3077 def generate_kdc_challenge_key(self
, armor_key
, longterm_key
):
3078 kdc_challenge_key
= kcrypto
.cf2(armor_key
.key
,
3080 b
'kdcchallengearmor',
3081 b
'challengelongterm')
3082 kdc_challenge_key
= Krb5EncryptionKey(kdc_challenge_key
, None)
3084 return kdc_challenge_key
3086 def verify_ticket_checksum(self
, ticket
, expected_checksum
, armor_key
):
3087 expected_type
= expected_checksum
['cksumtype']
3088 self
.assertEqual(armor_key
.ctype
, expected_type
)
3090 ticket_blob
= self
.der_encode(ticket
,
3091 asn1Spec
=krb5_asn1
.Ticket())
3092 checksum
= self
.Checksum_create(armor_key
,
3095 self
.assertEqual(expected_checksum
, checksum
)
3097 def verify_ticket(self
, ticket
, krbtgt_key
, expect_pac
=True,
3098 expect_ticket_checksum
=True):
3099 # Check if the ticket is a TGT.
3100 sname
= ticket
.ticket
['sname']
3101 is_tgt
= self
.is_tgs(sname
)
3103 # Decrypt the ticket.
3105 key
= ticket
.decryption_key
3106 enc_part
= ticket
.ticket
['enc-part']
3108 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
3109 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
3111 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
3112 enc_part
= self
.der_decode(
3113 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
3115 # Fetch the authorization data from the ticket.
3116 auth_data
= enc_part
.get('authorization-data')
3118 self
.assertIsNotNone(auth_data
)
3119 elif auth_data
is None:
3122 # Get a copy of the authdata with an empty PAC, and the existing PAC
3124 empty_pac
= self
.get_empty_pac()
3125 auth_data
, pac_data
= self
.replace_pac(auth_data
,
3127 expect_pac
=expect_pac
)
3131 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3132 # raw type to create a new PAC with zeroed signatures for
3133 # verification. This is because on Windows, the resource_groups field
3134 # is added to PAC_LOGON_INFO after the info3 field has been created,
3135 # which results in a different ordering of pointer values than Samba
3136 # (see commit 0e201ecdc53). Using the raw type avoids changing
3137 # PAC_LOGON_INFO, so verification against Windows can work. We still
3138 # need the PAC_DATA type to retrieve the actual checksums, because the
3139 # signatures in the raw type may contain padding bytes.
3140 pac
= ndr_unpack(krb5pac
.PAC_DATA
,
3142 raw_pac
= ndr_unpack(krb5pac
.PAC_DATA_RAW
,
3147 for pac_buffer
, raw_pac_buffer
in zip(pac
.buffers
, raw_pac
.buffers
):
3148 buffer_type
= pac_buffer
.type
3149 if buffer_type
in self
.pac_checksum_types
:
3150 self
.assertNotIn(buffer_type
, checksums
,
3151 f
'Duplicate checksum type {buffer_type}')
3153 # Fetch the checksum and the checksum type from the PAC buffer.
3154 checksum
= pac_buffer
.info
.signature
3155 ctype
= pac_buffer
.info
.type
3159 checksums
[buffer_type
] = checksum
, ctype
3161 if buffer_type
!= krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
3162 # Zero the checksum field so that we can later verify the
3163 # checksums. The ticket checksum field is not zeroed.
3165 signature
= ndr_unpack(
3166 krb5pac
.PAC_SIGNATURE_DATA
,
3167 raw_pac_buffer
.info
.remaining
)
3168 signature
.signature
= bytes(len(checksum
))
3169 raw_pac_buffer
.info
.remaining
= ndr_pack(
3172 # Re-encode the PAC.
3173 pac_data
= ndr_pack(raw_pac
)
3175 # Verify the signatures.
3177 server_checksum
, server_ctype
= checksums
[
3178 krb5pac
.PAC_TYPE_SRV_CHECKSUM
]
3179 key
.verify_checksum(KU_NON_KERB_CKSUM_SALT
,
3184 kdc_checksum
, kdc_ctype
= checksums
[
3185 krb5pac
.PAC_TYPE_KDC_CHECKSUM
]
3186 krbtgt_key
.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT
,
3192 self
.assertNotIn(krb5pac
.PAC_TYPE_TICKET_CHECKSUM
, checksums
)
3194 ticket_checksum
, ticket_ctype
= checksums
.get(
3195 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
,
3197 if expect_ticket_checksum
:
3198 self
.assertIsNotNone(ticket_checksum
)
3199 elif expect_ticket_checksum
is False:
3200 self
.assertIsNone(ticket_checksum
)
3201 if ticket_checksum
is not None:
3202 enc_part
['authorization-data'] = auth_data
3203 enc_part
= self
.der_encode(enc_part
,
3204 asn1Spec
=krb5_asn1
.EncTicketPart())
3206 krbtgt_key
.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT
,
3211 def modified_ticket(self
,
3213 new_ticket_key
=None,
3217 update_pac_checksums
=True,
3219 include_checksums
=None):
3220 if checksum_keys
is None:
3221 # A dict containing a key for each checksum type to be created in
3225 if include_checksums
is None:
3226 # A dict containing a value for each checksum type; True if the
3227 # checksum type is to be included in the PAC, False if it is to be
3228 # excluded, or None/not present if the checksum is to be included
3229 # based on its presence in the original PAC.
3230 include_checksums
= {}
3232 # Check that the values passed in by the caller make sense.
3234 self
.assertLessEqual(checksum_keys
.keys(), self
.pac_checksum_types
)
3235 self
.assertLessEqual(include_checksums
.keys(), self
.pac_checksum_types
)
3238 self
.assertIsNone(modify_pac_fn
)
3240 update_pac_checksums
= False
3242 if not update_pac_checksums
:
3243 self
.assertFalse(checksum_keys
)
3244 self
.assertFalse(include_checksums
)
3246 expect_pac
= update_pac_checksums
or modify_pac_fn
is not None
3248 key
= ticket
.decryption_key
3250 if new_ticket_key
is None:
3251 # Use the same key to re-encrypt the ticket.
3252 new_ticket_key
= key
3254 if krb5pac
.PAC_TYPE_SRV_CHECKSUM
not in checksum_keys
:
3255 # If the server signature key is not present, fall back to the key
3256 # used to encrypt the ticket.
3257 checksum_keys
[krb5pac
.PAC_TYPE_SRV_CHECKSUM
] = new_ticket_key
3259 if krb5pac
.PAC_TYPE_TICKET_CHECKSUM
not in checksum_keys
:
3260 # If the ticket signature key is not present, fall back to the key
3261 # used for the KDC signature.
3262 kdc_checksum_key
= checksum_keys
.get(krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
3263 if kdc_checksum_key
is not None:
3264 checksum_keys
[krb5pac
.PAC_TYPE_TICKET_CHECKSUM
] = (
3267 # Decrypt the ticket.
3269 enc_part
= ticket
.ticket
['enc-part']
3271 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
3272 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
3274 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
3275 enc_part
= self
.der_decode(
3276 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
3278 # Modify the ticket here.
3279 if modify_fn
is not None:
3280 enc_part
= modify_fn(enc_part
)
3282 auth_data
= enc_part
.get('authorization-data')
3284 self
.assertIsNotNone(auth_data
)
3285 if auth_data
is not None:
3288 # Get a copy of the authdata with an empty PAC, and the
3289 # existing PAC (if present).
3290 empty_pac
= self
.get_empty_pac()
3291 empty_pac_auth_data
, pac_data
= self
.replace_pac(
3294 expect_pac
=expect_pac
)
3296 if pac_data
is not None:
3297 pac
= ndr_unpack(krb5pac
.PAC_DATA
, pac_data
)
3299 # Modify the PAC here.
3300 if modify_pac_fn
is not None:
3301 pac
= modify_pac_fn(pac
)
3303 if update_pac_checksums
:
3304 # Get the enc-part with an empty PAC, which is needed
3305 # to create a ticket signature.
3306 enc_part_to_sign
= enc_part
.copy()
3307 enc_part_to_sign
['authorization-data'] = (
3308 empty_pac_auth_data
)
3309 enc_part_to_sign
= self
.der_encode(
3311 asn1Spec
=krb5_asn1
.EncTicketPart())
3313 self
.update_pac_checksums(pac
,
3318 # Re-encode the PAC.
3319 pac_data
= ndr_pack(pac
)
3320 new_pac
= self
.AuthorizationData_create(AD_WIN2K_PAC
,
3323 # Replace the PAC in the authorization data and re-add it to the
3325 auth_data
, _
= self
.replace_pac(auth_data
, new_pac
,
3326 expect_pac
=expect_pac
)
3327 enc_part
['authorization-data'] = auth_data
3329 # Re-encrypt the ticket enc-part with the new key.
3330 enc_part_new
= self
.der_encode(enc_part
,
3331 asn1Spec
=krb5_asn1
.EncTicketPart())
3332 enc_part_new
= self
.EncryptedData_create(new_ticket_key
,
3336 # Create a copy of the ticket with the new enc-part.
3337 new_ticket
= ticket
.ticket
.copy()
3338 new_ticket
['enc-part'] = enc_part_new
3340 new_ticket_creds
= KerberosTicketCreds(
3342 session_key
=ticket
.session_key
,
3343 crealm
=ticket
.crealm
,
3345 srealm
=ticket
.srealm
,
3347 decryption_key
=new_ticket_key
,
3348 ticket_private
=enc_part
,
3349 encpart_private
=ticket
.encpart_private
)
3351 return new_ticket_creds
3353 def update_pac_checksums(self
,
3358 pac_buffers
= pac
.buffers
3359 checksum_buffers
= {}
3361 # Find the relevant PAC checksum buffers.
3362 for pac_buffer
in pac_buffers
:
3363 buffer_type
= pac_buffer
.type
3364 if buffer_type
in self
.pac_checksum_types
:
3365 self
.assertNotIn(buffer_type
, checksum_buffers
,
3366 f
'Duplicate checksum type {buffer_type}')
3368 checksum_buffers
[buffer_type
] = pac_buffer
3370 # Create any additional buffers that were requested but not
3371 # present. Conversely, remove any buffers that were requested to be
3373 for buffer_type
in self
.pac_checksum_types
:
3374 if buffer_type
in checksum_buffers
:
3375 if include_checksums
.get(buffer_type
) is False:
3376 checksum_buffer
= checksum_buffers
.pop(buffer_type
)
3378 pac
.num_buffers
-= 1
3379 pac_buffers
.remove(checksum_buffer
)
3381 elif include_checksums
.get(buffer_type
) is True:
3382 info
= krb5pac
.PAC_SIGNATURE_DATA()
3384 checksum_buffer
= krb5pac
.PAC_BUFFER()
3385 checksum_buffer
.type = buffer_type
3386 checksum_buffer
.info
= info
3388 pac_buffers
.append(checksum_buffer
)
3389 pac
.num_buffers
+= 1
3391 checksum_buffers
[buffer_type
] = checksum_buffer
3393 # Fill the relevant checksum buffers.
3394 for buffer_type
, checksum_buffer
in checksum_buffers
.items():
3395 checksum_key
= checksum_keys
[buffer_type
]
3396 ctype
= checksum_key
.ctype
& ((1 << 32) - 1)
3398 if buffer_type
== krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
3399 self
.assertIsNotNone(enc_part
)
3401 signature
= checksum_key
.make_rodc_checksum(
3402 KU_NON_KERB_CKSUM_SALT
,
3405 elif buffer_type
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
3406 signature
= checksum_key
.make_zeroed_checksum()
3409 signature
= checksum_key
.make_rodc_zeroed_checksum()
3411 checksum_buffer
.info
.signature
= signature
3412 checksum_buffer
.info
.type = ctype
3414 # Add the new checksum buffers to the PAC.
3415 pac
.buffers
= pac_buffers
3417 # Calculate the server and KDC checksums and insert them into the PAC.
3419 server_checksum_buffer
= checksum_buffers
.get(
3420 krb5pac
.PAC_TYPE_SRV_CHECKSUM
)
3421 if server_checksum_buffer
is not None:
3422 server_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_SRV_CHECKSUM
]
3424 pac_data
= ndr_pack(pac
)
3425 server_checksum
= server_checksum_key
.make_checksum(
3426 KU_NON_KERB_CKSUM_SALT
,
3429 server_checksum_buffer
.info
.signature
= server_checksum
3431 kdc_checksum_buffer
= checksum_buffers
.get(
3432 krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
3433 if kdc_checksum_buffer
is not None:
3434 if server_checksum_buffer
is None:
3435 # There's no server signature to make the checksum over, so
3436 # just make the checksum over an empty bytes object.
3437 server_checksum
= bytes()
3439 kdc_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_KDC_CHECKSUM
]
3441 kdc_checksum
= kdc_checksum_key
.make_rodc_checksum(
3442 KU_NON_KERB_CKSUM_SALT
,
3445 kdc_checksum_buffer
.info
.signature
= kdc_checksum
3447 def replace_pac(self
, auth_data
, new_pac
, expect_pac
=True):
3448 if new_pac
is not None:
3449 self
.assertElementEqual(new_pac
, 'ad-type', AD_WIN2K_PAC
)
3450 self
.assertElementPresent(new_pac
, 'ad-data')
3457 for authdata_elem
in auth_data
:
3458 if authdata_elem
['ad-type'] == AD_IF_RELEVANT
:
3459 ad_relevant
= self
.der_decode(
3460 authdata_elem
['ad-data'],
3461 asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
3464 for relevant_elem
in ad_relevant
:
3465 if relevant_elem
['ad-type'] == AD_WIN2K_PAC
:
3466 self
.assertIsNone(old_pac
, 'Multiple PACs detected')
3467 old_pac
= relevant_elem
['ad-data']
3469 if new_pac
is not None:
3470 relevant_elems
.append(new_pac
)
3472 relevant_elems
.append(relevant_elem
)
3474 self
.assertIsNotNone(old_pac
, 'Expected PAC')
3476 ad_relevant
= self
.der_encode(
3478 asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
3480 authdata_elem
= self
.AuthorizationData_create(AD_IF_RELEVANT
,
3483 new_auth_data
.append(authdata_elem
)
3486 self
.assertIsNotNone(ad_relevant
, 'Expected AD-RELEVANT')
3488 return new_auth_data
, old_pac
3490 def get_pac(self
, auth_data
, expect_pac
=True):
3491 _
, pac
= self
.replace_pac(auth_data
, None, expect_pac
)
3494 def get_krbtgt_checksum_key(self
):
3495 krbtgt_creds
= self
.get_krbtgt_creds()
3496 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
3499 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
3502 def is_tgs(self
, principal
):
3503 name
= principal
['name-string'][0]
3504 return name
in ('krbtgt', b
'krbtgt')
3506 def get_empty_pac(self
):
3507 return self
.AuthorizationData_create(AD_WIN2K_PAC
, bytes(1))
3509 def get_outer_pa_dict(self
, kdc_exchange_dict
):
3510 return self
.get_pa_dict(kdc_exchange_dict
['req_padata'])
3512 def get_fast_pa_dict(self
, kdc_exchange_dict
):
3513 req_pa_dict
= self
.get_pa_dict(kdc_exchange_dict
['fast_padata'])
3518 return self
.get_outer_pa_dict(kdc_exchange_dict
)
3520 def sent_fast(self
, kdc_exchange_dict
):
3521 outer_pa_dict
= self
.get_outer_pa_dict(kdc_exchange_dict
)
3523 return PADATA_FX_FAST
in outer_pa_dict
3525 def sent_enc_challenge(self
, kdc_exchange_dict
):
3526 fast_pa_dict
= self
.get_fast_pa_dict(kdc_exchange_dict
)
3528 return PADATA_ENCRYPTED_CHALLENGE
in fast_pa_dict
3530 def get_sent_pac_options(self
, kdc_exchange_dict
):
3531 fast_pa_dict
= self
.get_fast_pa_dict(kdc_exchange_dict
)
3533 if PADATA_PAC_OPTIONS
not in fast_pa_dict
:
3536 pac_options
= self
.der_decode(fast_pa_dict
[PADATA_PAC_OPTIONS
],
3537 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
3538 pac_options
= pac_options
['options']
3540 # Mask out unsupported bits.
3541 pac_options
, remaining
= pac_options
[:4], pac_options
[4:]
3542 pac_options
+= '0' * len(remaining
)
3546 def get_krbtgt_sname(self
):
3547 krbtgt_creds
= self
.get_krbtgt_creds()
3548 krbtgt_username
= krbtgt_creds
.get_username()
3549 krbtgt_realm
= krbtgt_creds
.get_realm()
3550 krbtgt_sname
= self
.PrincipalName_create(
3551 name_type
=NT_SRV_INST
, names
=[krbtgt_username
, krbtgt_realm
])
3555 def _test_as_exchange(self
,
3561 expected_error_mode
,
3570 expected_flags
=None,
3571 unexpected_flags
=None,
3572 expected_supported_etypes
=None,
3574 ticket_decryption_key
=None,
3579 def _generate_padata_copy(_kdc_exchange_dict
,
3582 return padata
, req_body
3584 if not expected_error_mode
:
3585 check_error_fn
= None
3586 check_rep_fn
= self
.generic_check_kdc_rep
3588 check_error_fn
= self
.generic_check_kdc_error
3591 if padata
is not None:
3592 generate_padata_fn
= _generate_padata_copy
3594 generate_padata_fn
= None
3596 kdc_exchange_dict
= self
.as_exchange_dict(
3597 expected_crealm
=expected_crealm
,
3598 expected_cname
=expected_cname
,
3599 expected_srealm
=expected_srealm
,
3600 expected_sname
=expected_sname
,
3601 expected_supported_etypes
=expected_supported_etypes
,
3602 ticket_decryption_key
=ticket_decryption_key
,
3603 generate_padata_fn
=generate_padata_fn
,
3604 check_error_fn
=check_error_fn
,
3605 check_rep_fn
=check_rep_fn
,
3606 check_kdc_private_fn
=self
.generic_check_kdc_private
,
3607 expected_error_mode
=expected_error_mode
,
3608 client_as_etypes
=client_as_etypes
,
3609 expected_salt
=expected_salt
,
3610 expected_flags
=expected_flags
,
3611 unexpected_flags
=unexpected_flags
,
3612 preauth_key
=preauth_key
,
3613 kdc_options
=str(kdc_options
),
3614 pac_request
=pac_request
,
3615 pac_options
=pac_options
,
3618 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
3625 return rep
, kdc_exchange_dict