1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 from pyasn1
.codec
.der
.decoder
import decode
as pyasn1_der_decode
30 from pyasn1
.codec
.der
.encoder
import encode
as pyasn1_der_encode
31 from pyasn1
.codec
.native
.decoder
import decode
as pyasn1_native_decode
32 from pyasn1
.codec
.native
.encoder
import encode
as pyasn1_native_encode
34 from pyasn1
.codec
.ber
.encoder
import BitStringEncoder
36 from samba
.credentials
import Credentials
37 from samba
.dcerpc
import krb5pac
, security
38 from samba
.gensec
import FEATURE_SEAL
39 from samba
.ndr
import ndr_pack
, ndr_unpack
42 from samba
.tests
import TestCaseInTempDir
44 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
45 from samba
.tests
.krb5
.rfc4120_constants
import (
48 FX_FAST_ARMOR_AP_REQUEST
,
50 KDC_ERR_PREAUTH_FAILED
,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
,
52 KERB_ERR_TYPE_EXTENDED
,
66 KU_NON_KERB_CKSUM_SALT
,
67 KU_TGS_REP_ENC_PART_SESSION
,
68 KU_TGS_REP_ENC_PART_SUB_KEY
,
70 KU_TGS_REQ_AUTH_CKSUM
,
71 KU_TGS_REQ_AUTH_DAT_SESSION
,
72 KU_TGS_REQ_AUTH_DAT_SUBKEY
,
76 PADATA_ENCRYPTED_CHALLENGE
,
89 PADATA_SUPPORTED_ETYPES
91 import samba
.tests
.krb5
.kcrypto
as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self
, value
, asn1Spec
, encodeFun
, **options
):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec
is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value
= asn1Spec
.clone(value
)
104 valueLength
= len(value
)
106 alignedValue
= value
<< (8 - valueLength
% 8)
110 substrate
= alignedValue
.asOctets()
111 length
= len(substrate
)
112 # We need at least 32-Bit / 4-Bytes
117 ret
= b
'\x00' + substrate
+ (b
'\x00' * padding
)
118 return ret
, False, True
121 BitStringEncoder
.encodeValue
= BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self
, scope
=0):
125 ret
= "%s" % self
.asBinary()
128 for byte
in self
.asNumbers():
129 for bit
in [7, 6, 5, 4, 3, 2, 1, 0]:
136 if len(bits
) < highest_bit
:
137 for bitPosition
in range(len(bits
), highest_bit
):
140 delim
= ": (\n%s " % indent
141 for bitPosition
in range(highest_bit
):
142 if bitPosition
in self
.prettyPrintNamedValues
:
143 name
= self
.prettyPrintNamedValues
[bitPosition
]
144 elif bits
[bitPosition
] != 0:
145 name
= "unknown-bit-%u" % bitPosition
148 ret
+= "%s%s:%u" % (delim
, name
, bits
[bitPosition
])
149 delim
= ",\n%s " % indent
150 ret
+= "\n%s)" % indent
154 krb5_asn1
.TicketFlags
.prettyPrintNamedValues
=\
155 krb5_asn1
.TicketFlagsValues
.namedValues
156 krb5_asn1
.TicketFlags
.namedValues
=\
157 krb5_asn1
.TicketFlagsValues
.namedValues
158 krb5_asn1
.TicketFlags
.prettyPrint
=\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1
.KDCOptions
.prettyPrintNamedValues
=\
161 krb5_asn1
.KDCOptionsValues
.namedValues
162 krb5_asn1
.KDCOptions
.namedValues
=\
163 krb5_asn1
.KDCOptionsValues
.namedValues
164 krb5_asn1
.KDCOptions
.prettyPrint
=\
165 BitString_NamedValues_prettyPrint
166 krb5_asn1
.APOptions
.prettyPrintNamedValues
=\
167 krb5_asn1
.APOptionsValues
.namedValues
168 krb5_asn1
.APOptions
.namedValues
=\
169 krb5_asn1
.APOptionsValues
.namedValues
170 krb5_asn1
.APOptions
.prettyPrint
=\
171 BitString_NamedValues_prettyPrint
172 krb5_asn1
.PACOptionFlags
.prettyPrintNamedValues
=\
173 krb5_asn1
.PACOptionFlagsValues
.namedValues
174 krb5_asn1
.PACOptionFlags
.namedValues
=\
175 krb5_asn1
.PACOptionFlagsValues
.namedValues
176 krb5_asn1
.PACOptionFlags
.prettyPrint
=\
177 BitString_NamedValues_prettyPrint
180 def Integer_NamedValues_prettyPrint(self
, scope
=0):
182 if intval
in self
.prettyPrintNamedValues
:
183 name
= self
.prettyPrintNamedValues
[intval
]
185 name
= "<__unknown__>"
186 ret
= "%d (0x%x) %s" % (intval
, intval
, name
)
190 krb5_asn1
.NameType
.prettyPrintNamedValues
=\
191 krb5_asn1
.NameTypeValues
.namedValues
192 krb5_asn1
.NameType
.prettyPrint
=\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1
.AuthDataType
.prettyPrintNamedValues
=\
195 krb5_asn1
.AuthDataTypeValues
.namedValues
196 krb5_asn1
.AuthDataType
.prettyPrint
=\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1
.PADataType
.prettyPrintNamedValues
=\
199 krb5_asn1
.PADataTypeValues
.namedValues
200 krb5_asn1
.PADataType
.prettyPrint
=\
201 Integer_NamedValues_prettyPrint
202 krb5_asn1
.EncryptionType
.prettyPrintNamedValues
=\
203 krb5_asn1
.EncryptionTypeValues
.namedValues
204 krb5_asn1
.EncryptionType
.prettyPrint
=\
205 Integer_NamedValues_prettyPrint
206 krb5_asn1
.ChecksumType
.prettyPrintNamedValues
=\
207 krb5_asn1
.ChecksumTypeValues
.namedValues
208 krb5_asn1
.ChecksumType
.prettyPrint
=\
209 Integer_NamedValues_prettyPrint
210 krb5_asn1
.KerbErrorDataType
.prettyPrintNamedValues
=\
211 krb5_asn1
.KerbErrorDataTypeValues
.namedValues
212 krb5_asn1
.KerbErrorDataType
.prettyPrint
=\
213 Integer_NamedValues_prettyPrint
216 class Krb5EncryptionKey
:
217 def __init__(self
, key
, kvno
):
219 kcrypto
.Enctype
.AES256
: kcrypto
.Cksumtype
.SHA1_AES256
,
220 kcrypto
.Enctype
.AES128
: kcrypto
.Cksumtype
.SHA1_AES128
,
221 kcrypto
.Enctype
.RC4
: kcrypto
.Cksumtype
.HMAC_MD5
,
224 self
.etype
= key
.enctype
225 self
.ctype
= EncTypeChecksum
[self
.etype
]
228 def encrypt(self
, usage
, plaintext
):
229 ciphertext
= kcrypto
.encrypt(self
.key
, usage
, plaintext
)
232 def decrypt(self
, usage
, ciphertext
):
233 plaintext
= kcrypto
.decrypt(self
.key
, usage
, ciphertext
)
236 def make_zeroed_checksum(self
, ctype
=None):
240 checksum_len
= kcrypto
.checksum_len(ctype
)
241 return bytes(checksum_len
)
243 def make_checksum(self
, usage
, plaintext
, ctype
=None):
246 cksum
= kcrypto
.make_checksum(ctype
, self
.key
, usage
, plaintext
)
249 def verify_checksum(self
, usage
, plaintext
, ctype
, cksum
):
250 if self
.ctype
!= ctype
:
251 raise AssertionError(f
'key checksum type ({self.ctype}) != '
252 f
'checksum type ({ctype})')
254 kcrypto
.verify_checksum(ctype
,
260 def export_obj(self
):
261 EncryptionKey_obj
= {
262 'keytype': self
.etype
,
263 'keyvalue': self
.key
.contents
,
265 return EncryptionKey_obj
268 class RodcPacEncryptionKey(Krb5EncryptionKey
):
269 def __init__(self
, key
, kvno
, rodc_id
=None):
270 super().__init
__(key
, kvno
)
276 kvno
&= (1 << 16) - 1
278 rodc_id
= kvno
or None
280 if rodc_id
is not None:
281 self
.rodc_id
= rodc_id
.to_bytes(2, byteorder
='little')
285 def make_rodc_zeroed_checksum(self
, ctype
=None):
286 checksum
= super().make_zeroed_checksum(ctype
)
287 return checksum
+ bytes(len(self
.rodc_id
))
289 def make_rodc_checksum(self
, usage
, plaintext
, ctype
=None):
290 checksum
= super().make_checksum(usage
, plaintext
, ctype
)
291 return checksum
+ self
.rodc_id
293 def verify_rodc_checksum(self
, usage
, plaintext
, ctype
, cksum
):
295 cksum
, cksum_rodc_id
= cksum
[:-2], cksum
[-2:]
297 if self
.rodc_id
!= cksum_rodc_id
:
298 raise AssertionError(f
'{self.rodc_id.hex()} != '
299 f
'{cksum_rodc_id.hex()}')
301 super().verify_checksum(usage
,
307 class ZeroedChecksumKey(RodcPacEncryptionKey
):
308 def make_checksum(self
, usage
, plaintext
, ctype
=None):
309 return self
.make_zeroed_checksum(ctype
)
311 def make_rodc_checksum(self
, usage
, plaintext
, ctype
=None):
312 return self
.make_rodc_zeroed_checksum(ctype
)
315 class WrongLengthChecksumKey(RodcPacEncryptionKey
):
316 def __init__(self
, key
, kvno
, length
):
317 super().__init
__(key
, kvno
)
319 self
._length
= length
322 def _adjust_to_length(cls
, checksum
, length
):
323 diff
= length
- len(checksum
)
325 checksum
+= bytes(diff
)
327 checksum
= checksum
[:length
]
331 def make_zeroed_checksum(self
, ctype
=None):
332 return bytes(self
._length
)
334 def make_checksum(self
, usage
, plaintext
, ctype
=None):
335 checksum
= super().make_checksum(usage
, plaintext
, ctype
)
336 return self
._adjust
_to
_length
(checksum
, self
._length
)
338 def make_rodc_zeroed_checksum(self
, ctype
=None):
339 return bytes(self
._length
)
341 def make_rodc_checksum(self
, usage
, plaintext
, ctype
=None):
342 checksum
= super().make_rodc_checksum(usage
, plaintext
, ctype
)
343 return self
._adjust
_to
_length
(checksum
, self
._length
)
346 class KerberosCredentials(Credentials
):
348 fast_supported_bits
= (security
.KERB_ENCTYPE_FAST_SUPPORTED |
349 security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
350 security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
)
353 super(KerberosCredentials
, self
).__init
__()
355 all_enc_types |
= security
.KERB_ENCTYPE_RC4_HMAC_MD5
356 all_enc_types |
= security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
357 all_enc_types |
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
359 self
.as_supported_enctypes
= all_enc_types
360 self
.tgs_supported_enctypes
= all_enc_types
361 self
.ap_supported_enctypes
= all_enc_types
364 self
.forced_keys
= {}
366 self
.forced_salt
= None
372 def set_as_supported_enctypes(self
, value
):
373 self
.as_supported_enctypes
= int(value
)
375 def set_tgs_supported_enctypes(self
, value
):
376 self
.tgs_supported_enctypes
= int(value
)
378 def set_ap_supported_enctypes(self
, value
):
379 self
.ap_supported_enctypes
= int(value
)
381 etype_map
= collections
.OrderedDict([
382 (kcrypto
.Enctype
.AES256
,
383 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
),
384 (kcrypto
.Enctype
.AES128
,
385 security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
),
386 (kcrypto
.Enctype
.RC4
,
387 security
.KERB_ENCTYPE_RC4_HMAC_MD5
),
388 (kcrypto
.Enctype
.DES_MD5
,
389 security
.KERB_ENCTYPE_DES_CBC_MD5
),
390 (kcrypto
.Enctype
.DES_CRC
,
391 security
.KERB_ENCTYPE_DES_CBC_CRC
)
395 def etypes_to_bits(cls
, etypes
):
398 bit
= cls
.etype_map
[etype
]
400 raise ValueError(f
'Got duplicate etype: {etype}')
406 def bits_to_etypes(cls
, bits
):
408 for etype
, bit
in cls
.etype_map
.items():
413 bits
&= ~cls
.fast_supported_bits
415 raise ValueError(f
'Unsupported etype bits: {bits}')
419 def get_as_krb5_etypes(self
):
420 return self
.bits_to_etypes(self
.as_supported_enctypes
)
422 def get_tgs_krb5_etypes(self
):
423 return self
.bits_to_etypes(self
.tgs_supported_enctypes
)
425 def get_ap_krb5_etypes(self
):
426 return self
.bits_to_etypes(self
.ap_supported_enctypes
)
428 def set_kvno(self
, kvno
):
429 # Sign-extend from 32 bits.
437 def set_forced_key(self
, etype
, hexkey
):
439 contents
= binascii
.a2b_hex(hexkey
)
440 key
= kcrypto
.Key(etype
, contents
)
441 self
.forced_keys
[etype
] = RodcPacEncryptionKey(key
, self
.kvno
)
443 def get_forced_key(self
, etype
):
445 return self
.forced_keys
.get(etype
)
447 def set_forced_salt(self
, salt
):
448 self
.forced_salt
= bytes(salt
)
450 def get_forced_salt(self
):
451 return self
.forced_salt
454 if self
.forced_salt
is not None:
455 return self
.forced_salt
459 salt_name
= upn
.rsplit('@', 1)[0].replace('/', '')
461 salt_name
= self
.get_username()
463 if self
.get_workstation():
464 salt_name
= self
.get_username().lower()
465 if salt_name
[-1] == '$':
466 salt_name
= salt_name
[:-1]
467 salt_string
= '%shost%s.%s' % (
468 self
.get_realm().upper(),
470 self
.get_realm().lower())
472 salt_string
= self
.get_realm().upper() + salt_name
474 return salt_string
.encode('utf-8')
476 def set_dn(self
, dn
):
482 def set_spn(self
, spn
):
488 def set_upn(self
, upn
):
495 class KerberosTicketCreds
:
496 def __init__(self
, ticket
, session_key
,
497 crealm
=None, cname
=None,
498 srealm
=None, sname
=None,
501 encpart_private
=None):
503 self
.session_key
= session_key
508 self
.decryption_key
= decryption_key
509 self
.ticket_private
= ticket_private
510 self
.encpart_private
= encpart_private
513 class RawKerberosTest(TestCaseInTempDir
):
514 """A raw Kerberos Test case."""
516 pac_checksum_types
= {krb5pac
.PAC_TYPE_SRV_CHECKSUM
,
517 krb5pac
.PAC_TYPE_KDC_CHECKSUM
,
518 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
}
521 {"value": -1111, "name": "dummy", },
522 {"value": kcrypto
.Enctype
.AES256
, "name": "aes128", },
523 {"value": kcrypto
.Enctype
.AES128
, "name": "aes256", },
524 {"value": kcrypto
.Enctype
.RC4
, "name": "rc4", },
527 setup_etype_test_permutations_done
= False
530 def setup_etype_test_permutations(cls
):
531 if cls
.setup_etype_test_permutations_done
:
536 num_idxs
= len(cls
.etypes_to_test
)
538 for num
in range(1, num_idxs
+ 1):
539 chunk
= list(itertools
.permutations(range(num_idxs
), num
))
542 permutations
.append(el
)
544 for p
in permutations
:
548 n
= cls
.etypes_to_test
[idx
]["name"]
553 etypes
+= (cls
.etypes_to_test
[idx
]["value"],)
555 r
= {"name": name
, "etypes": etypes
, }
558 cls
.etype_test_permutations
= res
559 cls
.setup_etype_test_permutations_done
= True
562 def etype_test_permutation_name_idx(cls
):
563 cls
.setup_etype_test_permutations()
566 for e
in cls
.etype_test_permutations
:
572 def etype_test_permutation_by_idx(self
, idx
):
573 e
= self
.etype_test_permutations
[idx
]
574 return (e
['name'], e
['etypes'])
580 cls
.host
= samba
.tests
.env_get_var_value('SERVER')
581 cls
.dc_host
= samba
.tests
.env_get_var_value('DC_SERVER')
583 # A dictionary containing credentials that have already been
587 kdc_fast_support
= samba
.tests
.env_get_var_value('FAST_SUPPORT',
589 if kdc_fast_support
is None:
590 kdc_fast_support
= '0'
591 cls
.kdc_fast_support
= bool(int(kdc_fast_support
))
593 tkt_sig_support
= samba
.tests
.env_get_var_value('TKT_SIG_SUPPORT',
595 if tkt_sig_support
is None:
596 tkt_sig_support
= '0'
597 cls
.tkt_sig_support
= bool(int(tkt_sig_support
))
601 self
.do_asn1_print
= False
602 self
.do_hexdump
= False
604 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING',
606 if strict_checking
is None:
607 strict_checking
= '1'
608 self
.strict_checking
= bool(int(strict_checking
))
612 self
.unspecified_kvno
= object()
615 self
._disconnect
("tearDown")
618 def _disconnect(self
, reason
):
624 sys
.stderr
.write("disconnect[%s]\n" % reason
)
626 def _connect_tcp(self
, host
):
629 self
.a
= socket
.getaddrinfo(host
, tcp_port
, socket
.AF_UNSPEC
,
630 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
632 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
633 self
.s
.settimeout(10)
634 self
.s
.connect(self
.a
[0][4])
642 def connect(self
, host
):
643 self
.assertNotConnected()
644 self
._connect
_tcp
(host
)
646 sys
.stderr
.write("connected[%s]\n" % host
)
648 def env_get_var(self
, varname
, prefix
,
649 fallback_default
=True,
650 allow_missing
=False):
652 if prefix
is not None:
653 allow_missing_prefix
= allow_missing
or fallback_default
654 val
= samba
.tests
.env_get_var_value(
655 '%s_%s' % (prefix
, varname
),
656 allow_missing
=allow_missing_prefix
)
658 fallback_default
= True
659 if val
is None and fallback_default
:
660 val
= samba
.tests
.env_get_var_value(varname
,
661 allow_missing
=allow_missing
)
664 def _get_krb5_creds_from_env(self
, prefix
,
665 default_username
=None,
666 allow_missing_password
=False,
667 allow_missing_keys
=True,
668 require_strongest_key
=False):
669 c
= KerberosCredentials()
672 domain
= self
.env_get_var('DOMAIN', prefix
)
673 realm
= self
.env_get_var('REALM', prefix
)
674 allow_missing_username
= default_username
is not None
675 username
= self
.env_get_var('USERNAME', prefix
,
676 fallback_default
=False,
677 allow_missing
=allow_missing_username
)
679 username
= default_username
680 password
= self
.env_get_var('PASSWORD', prefix
,
681 fallback_default
=False,
682 allow_missing
=allow_missing_password
)
685 c
.set_username(username
)
686 if password
is not None:
687 c
.set_password(password
)
688 as_supported_enctypes
= self
.env_get_var('AS_SUPPORTED_ENCTYPES',
689 prefix
, allow_missing
=True)
690 if as_supported_enctypes
is not None:
691 c
.set_as_supported_enctypes(as_supported_enctypes
)
692 tgs_supported_enctypes
= self
.env_get_var('TGS_SUPPORTED_ENCTYPES',
693 prefix
, allow_missing
=True)
694 if tgs_supported_enctypes
is not None:
695 c
.set_tgs_supported_enctypes(tgs_supported_enctypes
)
696 ap_supported_enctypes
= self
.env_get_var('AP_SUPPORTED_ENCTYPES',
697 prefix
, allow_missing
=True)
698 if ap_supported_enctypes
is not None:
699 c
.set_ap_supported_enctypes(ap_supported_enctypes
)
701 if require_strongest_key
:
702 kvno_allow_missing
= False
704 aes256_allow_missing
= False
706 aes256_allow_missing
= True
708 kvno_allow_missing
= allow_missing_keys
709 aes256_allow_missing
= allow_missing_keys
710 kvno
= self
.env_get_var('KVNO', prefix
,
711 fallback_default
=False,
712 allow_missing
=kvno_allow_missing
)
715 aes256_key
= self
.env_get_var('AES256_KEY_HEX', prefix
,
716 fallback_default
=False,
717 allow_missing
=aes256_allow_missing
)
718 if aes256_key
is not None:
719 c
.set_forced_key(kcrypto
.Enctype
.AES256
, aes256_key
)
720 aes128_key
= self
.env_get_var('AES128_KEY_HEX', prefix
,
721 fallback_default
=False,
723 if aes128_key
is not None:
724 c
.set_forced_key(kcrypto
.Enctype
.AES128
, aes128_key
)
725 rc4_key
= self
.env_get_var('RC4_KEY_HEX', prefix
,
726 fallback_default
=False, allow_missing
=True)
727 if rc4_key
is not None:
728 c
.set_forced_key(kcrypto
.Enctype
.RC4
, rc4_key
)
730 if not allow_missing_keys
:
731 self
.assertTrue(c
.forced_keys
,
732 'Please supply %s encryption keys '
733 'in environment' % prefix
)
737 def _get_krb5_creds(self
,
739 default_username
=None,
740 allow_missing_password
=False,
741 allow_missing_keys
=True,
742 require_strongest_key
=False,
743 fallback_creds_fn
=None):
744 if prefix
in self
.creds_dict
:
745 return self
.creds_dict
[prefix
]
747 # We don't have the credentials already
751 # Try to obtain them from the environment
752 creds
= self
._get
_krb
5_creds
_from
_env
(
754 default_username
=default_username
,
755 allow_missing_password
=allow_missing_password
,
756 allow_missing_keys
=allow_missing_keys
,
757 require_strongest_key
=require_strongest_key
)
758 except Exception as err
:
759 # An error occurred, so save it for later
762 self
.assertIsNotNone(creds
)
763 # Save the obtained credentials
764 self
.creds_dict
[prefix
] = creds
767 if fallback_creds_fn
is not None:
769 # Try to use the fallback method
770 creds
= fallback_creds_fn()
771 except Exception as err
:
772 print("ERROR FROM ENV: %r" % (env_err
))
773 print("FALLBACK-FN: %s" % (fallback_creds_fn
))
774 print("FALLBACK-ERROR: %r" % (err
))
776 self
.assertIsNotNone(creds
)
777 # Save the obtained credentials
778 self
.creds_dict
[prefix
] = creds
781 # Both methods failed, so raise the exception from the
785 def get_user_creds(self
,
786 allow_missing_password
=False,
787 allow_missing_keys
=True):
788 c
= self
._get
_krb
5_creds
(prefix
=None,
789 allow_missing_password
=allow_missing_password
,
790 allow_missing_keys
=allow_missing_keys
)
793 def get_service_creds(self
,
794 allow_missing_password
=False,
795 allow_missing_keys
=True):
796 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
797 allow_missing_password
=allow_missing_password
,
798 allow_missing_keys
=allow_missing_keys
)
801 def get_client_creds(self
,
802 allow_missing_password
=False,
803 allow_missing_keys
=True):
804 c
= self
._get
_krb
5_creds
(prefix
='CLIENT',
805 allow_missing_password
=allow_missing_password
,
806 allow_missing_keys
=allow_missing_keys
)
809 def get_server_creds(self
,
810 allow_missing_password
=False,
811 allow_missing_keys
=True):
812 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
813 allow_missing_password
=allow_missing_password
,
814 allow_missing_keys
=allow_missing_keys
)
817 def get_admin_creds(self
,
818 allow_missing_password
=False,
819 allow_missing_keys
=True):
820 c
= self
._get
_krb
5_creds
(prefix
='ADMIN',
821 allow_missing_password
=allow_missing_password
,
822 allow_missing_keys
=allow_missing_keys
)
823 c
.set_gensec_features(c
.get_gensec_features() | FEATURE_SEAL
)
826 def get_rodc_krbtgt_creds(self
,
828 require_strongest_key
=False):
829 if require_strongest_key
:
830 self
.assertTrue(require_keys
)
831 c
= self
._get
_krb
5_creds
(prefix
='RODC_KRBTGT',
832 allow_missing_password
=True,
833 allow_missing_keys
=not require_keys
,
834 require_strongest_key
=require_strongest_key
)
837 def get_krbtgt_creds(self
,
839 require_strongest_key
=False):
840 if require_strongest_key
:
841 self
.assertTrue(require_keys
)
842 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
843 default_username
='krbtgt',
844 allow_missing_password
=True,
845 allow_missing_keys
=not require_keys
,
846 require_strongest_key
=require_strongest_key
)
849 def get_anon_creds(self
):
854 def asn1_dump(self
, name
, obj
, asn1_print
=None):
855 if asn1_print
is None:
856 asn1_print
= self
.do_asn1_print
859 sys
.stderr
.write("%s:\n%s" % (name
, obj
))
861 sys
.stderr
.write("%s" % (obj
))
863 def hex_dump(self
, name
, blob
, hexdump
=None):
865 hexdump
= self
.do_hexdump
868 "%s: %d\n%s" % (name
, len(blob
), self
.hexdump(blob
)))
877 if asn1Spec
is not None:
878 class_name
= type(asn1Spec
).__name
__.split(':')[0]
880 class_name
= "<None-asn1Spec>"
881 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
882 obj
, _
= pyasn1_der_decode(blob
, asn1Spec
=asn1Spec
)
883 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
885 obj
= pyasn1_native_encode(obj
)
896 obj
= pyasn1_native_decode(obj
, asn1Spec
=asn1Spec
)
897 class_name
= type(obj
).__name
__.split(':')[0]
898 if class_name
is not None:
899 self
.asn1_dump(None, obj
, asn1_print
=asn1_print
)
900 blob
= pyasn1_der_encode(obj
)
901 if class_name
is not None:
902 self
.hex_dump(class_name
, blob
, hexdump
=hexdump
)
905 def send_pdu(self
, req
, asn1_print
=None, hexdump
=None):
907 k5_pdu
= self
.der_encode(
908 req
, native_decode
=False, asn1_print
=asn1_print
, hexdump
=False)
909 header
= struct
.pack('>I', len(k5_pdu
))
912 self
.hex_dump("send_pdu", header
, hexdump
=hexdump
)
913 self
.hex_dump("send_pdu", k5_pdu
, hexdump
=hexdump
)
915 sent
= self
.s
.send(req_pdu
, 0)
916 if sent
== len(req_pdu
):
918 req_pdu
= req_pdu
[sent
:]
919 except socket
.error
as e
:
920 self
._disconnect
("send_pdu: %s" % e
)
923 self
._disconnect
("send_pdu: %s" % e
)
926 def recv_raw(self
, num_recv
=0xffff, hexdump
=None, timeout
=None):
929 if timeout
is not None:
930 self
.s
.settimeout(timeout
)
931 rep_pdu
= self
.s
.recv(num_recv
, 0)
932 self
.s
.settimeout(10)
933 if len(rep_pdu
) == 0:
934 self
._disconnect
("recv_raw: EOF")
936 self
.hex_dump("recv_raw", rep_pdu
, hexdump
=hexdump
)
937 except socket
.timeout
:
938 self
.s
.settimeout(10)
939 sys
.stderr
.write("recv_raw: TIMEOUT\n")
940 except socket
.error
as e
:
941 self
._disconnect
("recv_raw: %s" % e
)
944 self
._disconnect
("recv_raw: %s" % e
)
948 def recv_pdu_raw(self
, asn1_print
=None, hexdump
=None, timeout
=None):
951 raw_pdu
= self
.recv_raw(
952 num_recv
=4, hexdump
=hexdump
, timeout
=timeout
)
955 header
= struct
.unpack(">I", raw_pdu
[0:4])
962 raw_pdu
= self
.recv_raw(
963 num_recv
=missing
, hexdump
=hexdump
, timeout
=timeout
)
964 self
.assertGreaterEqual(len(raw_pdu
), 1)
966 missing
= k5_len
- len(rep_pdu
)
967 k5_raw
= self
.der_decode(
973 pvno
= k5_raw
['field-0']
974 self
.assertEqual(pvno
, 5)
975 msg_type
= k5_raw
['field-1']
976 self
.assertIn(msg_type
, [KRB_AS_REP
, KRB_TGS_REP
, KRB_ERROR
])
977 if msg_type
== KRB_AS_REP
:
978 asn1Spec
= krb5_asn1
.AS_REP()
979 elif msg_type
== KRB_TGS_REP
:
980 asn1Spec
= krb5_asn1
.TGS_REP()
981 elif msg_type
== KRB_ERROR
:
982 asn1Spec
= krb5_asn1
.KRB_ERROR()
983 rep
= self
.der_decode(rep_pdu
, asn1Spec
=asn1Spec
,
984 asn1_print
=asn1_print
, hexdump
=False)
985 return (rep
, rep_pdu
)
987 def recv_pdu(self
, asn1_print
=None, hexdump
=None, timeout
=None):
988 (rep
, rep_pdu
) = self
.recv_pdu_raw(asn1_print
=asn1_print
,
993 def assertIsConnected(self
):
994 self
.assertIsNotNone(self
.s
, msg
="Not connected")
996 def assertNotConnected(self
):
997 self
.assertIsNone(self
.s
, msg
="Is connected")
999 def send_recv_transaction(
1006 host
= self
.host
if to_rodc
else self
.dc_host
1009 self
.send_pdu(req
, asn1_print
=asn1_print
, hexdump
=hexdump
)
1010 rep
= self
.recv_pdu(
1011 asn1_print
=asn1_print
, hexdump
=hexdump
, timeout
=timeout
)
1013 self
._disconnect
("transaction failed")
1015 self
._disconnect
("transaction done")
1018 def assertNoValue(self
, value
):
1019 self
.assertTrue(value
.isNoValue
)
1021 def assertHasValue(self
, value
):
1022 self
.assertIsNotNone(value
)
1024 def getElementValue(self
, obj
, elem
):
1025 return obj
.get(elem
)
1027 def assertElementMissing(self
, obj
, elem
):
1028 v
= self
.getElementValue(obj
, elem
)
1029 self
.assertIsNone(v
)
1031 def assertElementPresent(self
, obj
, elem
, expect_empty
=False):
1032 v
= self
.getElementValue(obj
, elem
)
1033 self
.assertIsNotNone(v
)
1034 if self
.strict_checking
:
1035 if isinstance(v
, collections
.abc
.Container
):
1037 self
.assertEqual(0, len(v
))
1039 self
.assertNotEqual(0, len(v
))
1041 def assertElementEqual(self
, obj
, elem
, value
):
1042 v
= self
.getElementValue(obj
, elem
)
1043 self
.assertIsNotNone(v
)
1044 self
.assertEqual(v
, value
)
1046 def assertElementEqualUTF8(self
, obj
, elem
, value
):
1047 v
= self
.getElementValue(obj
, elem
)
1048 self
.assertIsNotNone(v
)
1049 self
.assertEqual(v
, bytes(value
, 'utf8'))
1051 def assertPrincipalEqual(self
, princ1
, princ2
):
1052 self
.assertEqual(princ1
['name-type'], princ2
['name-type'])
1054 len(princ1
['name-string']),
1055 len(princ2
['name-string']),
1056 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
1057 for idx
in range(len(princ1
['name-string'])):
1059 princ1
['name-string'][idx
],
1060 princ2
['name-string'][idx
],
1061 msg
="princ1=%s != princ2=%s" % (princ1
, princ2
))
1063 def assertElementEqualPrincipal(self
, obj
, elem
, value
):
1064 v
= self
.getElementValue(obj
, elem
)
1065 self
.assertIsNotNone(v
)
1066 v
= pyasn1_native_decode(v
, asn1Spec
=krb5_asn1
.PrincipalName())
1067 self
.assertPrincipalEqual(v
, value
)
1069 def assertElementKVNO(self
, obj
, elem
, value
):
1070 v
= self
.getElementValue(obj
, elem
)
1071 if value
== "autodetect":
1073 if value
is not None:
1074 self
.assertIsNotNone(v
)
1075 # The value on the wire should never be 0
1076 self
.assertNotEqual(v
, 0)
1077 # unspecified_kvno means we don't know the kvno,
1078 # but want to enforce its presence
1079 if value
is not self
.unspecified_kvno
:
1081 self
.assertNotEqual(value
, 0)
1082 self
.assertEqual(v
, value
)
1084 self
.assertIsNone(v
)
1086 def assertElementFlags(self
, obj
, elem
, expected
, unexpected
):
1087 v
= self
.getElementValue(obj
, elem
)
1088 self
.assertIsNotNone(v
)
1089 if expected
is not None:
1090 self
.assertIsInstance(expected
, krb5_asn1
.TicketFlags
)
1091 for i
, flag
in enumerate(expected
):
1093 self
.assertEqual('1', v
[i
],
1094 f
"'{expected.namedValues[i]}' "
1096 if unexpected
is not None:
1097 self
.assertIsInstance(unexpected
, krb5_asn1
.TicketFlags
)
1098 for i
, flag
in enumerate(unexpected
):
1100 self
.assertEqual('0', v
[i
],
1101 f
"'{unexpected.namedValues[i]}' "
1102 f
"unexpected in {v}")
1104 def assertSequenceElementsEqual(self
, expected
, got
, *,
1105 require_strict
=None):
1106 if self
.strict_checking
:
1107 self
.assertEqual(expected
, got
)
1109 fail_msg
= f
'expected: {expected} got: {got}'
1111 if require_strict
is not None:
1112 fail_msg
+= f
' (ignoring: {require_strict})'
1113 expected
= (x
for x
in expected
if x
not in require_strict
)
1114 got
= (x
for x
in got
if x
not in require_strict
)
1116 self
.assertCountEqual(expected
, got
, fail_msg
)
1118 def get_KerberosTimeWithUsec(self
, epoch
=None, offset
=None):
1121 if offset
is not None:
1122 epoch
= epoch
+ int(offset
)
1123 dt
= datetime
.datetime
.fromtimestamp(epoch
, tz
=datetime
.timezone
.utc
)
1124 return (dt
.strftime("%Y%m%d%H%M%SZ"), dt
.microsecond
)
1126 def get_KerberosTime(self
, epoch
=None, offset
=None):
1127 (s
, _
) = self
.get_KerberosTimeWithUsec(epoch
=epoch
, offset
=offset
)
1130 def get_EpochFromKerberosTime(self
, kerberos_time
):
1131 if isinstance(kerberos_time
, bytes
):
1132 kerberos_time
= kerberos_time
.decode()
1134 epoch
= datetime
.datetime
.strptime(kerberos_time
,
1136 epoch
= epoch
.replace(tzinfo
=datetime
.timezone
.utc
)
1137 epoch
= int(epoch
.timestamp())
1141 def get_Nonce(self
):
1142 nonce_min
= 0x7f000000
1143 nonce_max
= 0x7fffffff
1144 v
= random
.randint(nonce_min
, nonce_max
)
1147 def get_pa_dict(self
, pa_data
):
1150 if pa_data
is not None:
1152 pa_type
= pa
['padata-type']
1153 if pa_type
in pa_dict
:
1154 raise RuntimeError(f
'Duplicate type {pa_type}')
1155 pa_dict
[pa_type
] = pa
['padata-value']
1159 def SessionKey_create(self
, etype
, contents
, kvno
=None):
1160 key
= kcrypto
.Key(etype
, contents
)
1161 return RodcPacEncryptionKey(key
, kvno
)
1163 def PasswordKey_create(self
, etype
=None, pwd
=None, salt
=None, kvno
=None):
1164 self
.assertIsNotNone(pwd
)
1165 self
.assertIsNotNone(salt
)
1166 key
= kcrypto
.string_to_key(etype
, pwd
, salt
)
1167 return RodcPacEncryptionKey(key
, kvno
)
1169 def PasswordKey_from_etype_info2(self
, creds
, etype_info2
, kvno
=None):
1170 e
= etype_info2
['etype']
1172 salt
= etype_info2
.get('salt')
1174 if e
== kcrypto
.Enctype
.RC4
:
1175 nthash
= creds
.get_nt_hash()
1176 return self
.SessionKey_create(etype
=e
, contents
=nthash
, kvno
=kvno
)
1178 password
= creds
.get_password()
1179 return self
.PasswordKey_create(
1180 etype
=e
, pwd
=password
, salt
=salt
, kvno
=kvno
)
1182 def TicketDecryptionKey_from_creds(self
, creds
, etype
=None):
1185 etypes
= creds
.get_tgs_krb5_etypes()
1189 etype
= kcrypto
.Enctype
.RC4
1191 forced_key
= creds
.get_forced_key(etype
)
1192 if forced_key
is not None:
1195 kvno
= creds
.get_kvno()
1197 fail_msg
= ("%s has no fixed key for etype[%s] kvno[%s] "
1198 "nor a password specified, " % (
1199 creds
.get_username(), etype
, kvno
))
1201 if etype
== kcrypto
.Enctype
.RC4
:
1202 nthash
= creds
.get_nt_hash()
1203 self
.assertIsNotNone(nthash
, msg
=fail_msg
)
1204 return self
.SessionKey_create(etype
=etype
,
1208 password
= creds
.get_password()
1209 self
.assertIsNotNone(password
, msg
=fail_msg
)
1210 salt
= creds
.get_salt()
1211 return self
.PasswordKey_create(etype
=etype
,
1216 def RandomKey(self
, etype
):
1217 e
= kcrypto
._get
_enctype
_profile
(etype
)
1218 contents
= samba
.generate_random_bytes(e
.keysize
)
1219 return self
.SessionKey_create(etype
=etype
, contents
=contents
)
1221 def EncryptionKey_import(self
, EncryptionKey_obj
):
1222 return self
.SessionKey_create(EncryptionKey_obj
['keytype'],
1223 EncryptionKey_obj
['keyvalue'])
1225 def EncryptedData_create(self
, key
, usage
, plaintext
):
1226 # EncryptedData ::= SEQUENCE {
1227 # etype [0] Int32 -- EncryptionType --,
1228 # kvno [1] Int32 OPTIONAL,
1229 # cipher [2] OCTET STRING -- ciphertext
1231 ciphertext
= key
.encrypt(usage
, plaintext
)
1232 EncryptedData_obj
= {
1234 'cipher': ciphertext
1236 if key
.kvno
is not None:
1237 EncryptedData_obj
['kvno'] = key
.kvno
1238 return EncryptedData_obj
1240 def Checksum_create(self
, key
, usage
, plaintext
, ctype
=None):
1241 # Checksum ::= SEQUENCE {
1242 # cksumtype [0] Int32,
1243 # checksum [1] OCTET STRING
1247 checksum
= key
.make_checksum(usage
, plaintext
, ctype
=ctype
)
1250 'checksum': checksum
,
1255 def PrincipalName_create(cls
, name_type
, names
):
1256 # PrincipalName ::= SEQUENCE {
1257 # name-type [0] Int32,
1258 # name-string [1] SEQUENCE OF KerberosString
1260 PrincipalName_obj
= {
1261 'name-type': name_type
,
1262 'name-string': names
,
1264 return PrincipalName_obj
1266 def AuthorizationData_create(self
, ad_type
, ad_data
):
1267 # AuthorizationData ::= SEQUENCE {
1268 # ad-type [0] Int32,
1269 # ad-data [1] OCTET STRING
1275 return AUTH_DATA_obj
1277 def PA_DATA_create(self
, padata_type
, padata_value
):
1278 # PA-DATA ::= SEQUENCE {
1279 # -- NOTE: first tag is [1], not [0]
1280 # padata-type [1] Int32,
1281 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1284 'padata-type': padata_type
,
1285 'padata-value': padata_value
,
1289 def PA_ENC_TS_ENC_create(self
, ts
, usec
):
1290 # PA-ENC-TS-ENC ::= SEQUENCE {
1291 # patimestamp[0] KerberosTime, -- client's time
1292 # pausec[1] krb5int32 OPTIONAL
1294 PA_ENC_TS_ENC_obj
= {
1298 return PA_ENC_TS_ENC_obj
1300 def PA_PAC_OPTIONS_create(self
, options
):
1301 # PA-PAC-OPTIONS ::= SEQUENCE {
1302 # options [0] PACOptionFlags
1304 PA_PAC_OPTIONS_obj
= {
1307 return PA_PAC_OPTIONS_obj
1309 def KRB_FAST_ARMOR_create(self
, armor_type
, armor_value
):
1310 # KrbFastArmor ::= SEQUENCE {
1311 # armor-type [0] Int32,
1312 # armor-value [1] OCTET STRING,
1315 KRB_FAST_ARMOR_obj
= {
1316 'armor-type': armor_type
,
1317 'armor-value': armor_value
1319 return KRB_FAST_ARMOR_obj
1321 def KRB_FAST_REQ_create(self
, fast_options
, padata
, req_body
):
1322 # KrbFastReq ::= SEQUENCE {
1323 # fast-options [0] FastOptions,
1324 # padata [1] SEQUENCE OF PA-DATA,
1325 # req-body [2] KDC-REQ-BODY,
1328 KRB_FAST_REQ_obj
= {
1329 'fast-options': fast_options
,
1331 'req-body': req_body
1333 return KRB_FAST_REQ_obj
1335 def KRB_FAST_ARMORED_REQ_create(self
, armor
, req_checksum
, enc_fast_req
):
1336 # KrbFastArmoredReq ::= SEQUENCE {
1337 # armor [0] KrbFastArmor OPTIONAL,
1338 # req-checksum [1] Checksum,
1339 # enc-fast-req [2] EncryptedData -- KrbFastReq --
1341 KRB_FAST_ARMORED_REQ_obj
= {
1342 'req-checksum': req_checksum
,
1343 'enc-fast-req': enc_fast_req
1345 if armor
is not None:
1346 KRB_FAST_ARMORED_REQ_obj
['armor'] = armor
1347 return KRB_FAST_ARMORED_REQ_obj
1349 def PA_FX_FAST_REQUEST_create(self
, armored_data
):
1350 # PA-FX-FAST-REQUEST ::= CHOICE {
1351 # armored-data [0] KrbFastArmoredReq,
1354 PA_FX_FAST_REQUEST_obj
= {
1355 'armored-data': armored_data
1357 return PA_FX_FAST_REQUEST_obj
1359 def KERB_PA_PAC_REQUEST_create(self
, include_pac
, pa_data_create
=True):
1360 # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1361 # include-pac[0] BOOLEAN --If TRUE, and no pac present,
1363 # --If FALSE, and PAC present,
1366 KERB_PA_PAC_REQUEST_obj
= {
1367 'include-pac': include_pac
,
1369 if not pa_data_create
:
1370 return KERB_PA_PAC_REQUEST_obj
1371 pa_pac
= self
.der_encode(KERB_PA_PAC_REQUEST_obj
,
1372 asn1Spec
=krb5_asn1
.KERB_PA_PAC_REQUEST())
1373 pa_data
= self
.PA_DATA_create(PADATA_PAC_REQUEST
, pa_pac
)
1376 def get_pa_pac_options(self
, options
):
1377 pac_options
= self
.PA_PAC_OPTIONS_create(options
)
1378 pac_options
= self
.der_encode(pac_options
,
1379 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
1380 pac_options
= self
.PA_DATA_create(PADATA_PAC_OPTIONS
, pac_options
)
1384 def KDC_REQ_BODY_create(self
,
1396 EncAuthorizationData
,
1397 EncAuthorizationData_key
,
1398 EncAuthorizationData_usage
,
1401 # KDC-REQ-BODY ::= SEQUENCE {
1402 # kdc-options [0] KDCOptions,
1403 # cname [1] PrincipalName OPTIONAL
1404 # -- Used only in AS-REQ --,
1407 # -- Also client's in AS-REQ --,
1408 # sname [3] PrincipalName OPTIONAL,
1409 # from [4] KerberosTime OPTIONAL,
1410 # till [5] KerberosTime,
1411 # rtime [6] KerberosTime OPTIONAL,
1413 # etype [8] SEQUENCE OF Int32
1415 # -- in preference order --,
1416 # addresses [9] HostAddresses OPTIONAL,
1417 # enc-authorization-data [10] EncryptedData OPTIONAL
1418 # -- AuthorizationData --,
1419 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1420 # -- NOTE: not empty
1422 if EncAuthorizationData
is not None:
1423 enc_ad_plain
= self
.der_encode(
1424 EncAuthorizationData
,
1425 asn1Spec
=krb5_asn1
.AuthorizationData(),
1426 asn1_print
=asn1_print
,
1428 enc_ad
= self
.EncryptedData_create(EncAuthorizationData_key
,
1429 EncAuthorizationData_usage
,
1433 KDC_REQ_BODY_obj
= {
1434 'kdc-options': kdc_options
,
1440 if cname
is not None:
1441 KDC_REQ_BODY_obj
['cname'] = cname
1442 if sname
is not None:
1443 KDC_REQ_BODY_obj
['sname'] = sname
1444 if from_time
is not None:
1445 KDC_REQ_BODY_obj
['from'] = from_time
1446 if renew_time
is not None:
1447 KDC_REQ_BODY_obj
['rtime'] = renew_time
1448 if addresses
is not None:
1449 KDC_REQ_BODY_obj
['addresses'] = addresses
1450 if enc_ad
is not None:
1451 KDC_REQ_BODY_obj
['enc-authorization-data'] = enc_ad
1452 if additional_tickets
is not None:
1453 KDC_REQ_BODY_obj
['additional-tickets'] = additional_tickets
1454 return KDC_REQ_BODY_obj
1456 def KDC_REQ_create(self
,
1463 # KDC-REQ ::= SEQUENCE {
1464 # -- NOTE: first tag is [1], not [0]
1465 # pvno [1] INTEGER (5) ,
1466 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1467 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1468 # -- NOTE: not empty --,
1469 # req-body [4] KDC-REQ-BODY
1474 'msg-type': msg_type
,
1475 'req-body': req_body
,
1477 if padata
is not None:
1478 KDC_REQ_obj
['padata'] = padata
1479 if asn1Spec
is not None:
1480 KDC_REQ_decoded
= pyasn1_native_decode(
1481 KDC_REQ_obj
, asn1Spec
=asn1Spec
)
1483 KDC_REQ_decoded
= None
1484 return KDC_REQ_obj
, KDC_REQ_decoded
1486 def AS_REQ_create(self
,
1488 kdc_options
, # required
1492 from_time
, # optional
1493 till_time
, # required
1494 renew_time
, # optional
1497 addresses
, # optional
1499 native_decoded_only
=True,
1502 # KDC-REQ ::= SEQUENCE {
1503 # -- NOTE: first tag is [1], not [0]
1504 # pvno [1] INTEGER (5) ,
1505 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1506 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1507 # -- NOTE: not empty --,
1508 # req-body [4] KDC-REQ-BODY
1511 # KDC-REQ-BODY ::= SEQUENCE {
1512 # kdc-options [0] KDCOptions,
1513 # cname [1] PrincipalName OPTIONAL
1514 # -- Used only in AS-REQ --,
1517 # -- Also client's in AS-REQ --,
1518 # sname [3] PrincipalName OPTIONAL,
1519 # from [4] KerberosTime OPTIONAL,
1520 # till [5] KerberosTime,
1521 # rtime [6] KerberosTime OPTIONAL,
1523 # etype [8] SEQUENCE OF Int32
1525 # -- in preference order --,
1526 # addresses [9] HostAddresses OPTIONAL,
1527 # enc-authorization-data [10] EncryptedData OPTIONAL
1528 # -- AuthorizationData --,
1529 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1530 # -- NOTE: not empty
1532 KDC_REQ_BODY_obj
= self
.KDC_REQ_BODY_create(
1544 EncAuthorizationData
=None,
1545 EncAuthorizationData_key
=None,
1546 EncAuthorizationData_usage
=None,
1547 asn1_print
=asn1_print
,
1549 obj
, decoded
= self
.KDC_REQ_create(
1550 msg_type
=KRB_AS_REQ
,
1552 req_body
=KDC_REQ_BODY_obj
,
1553 asn1Spec
=krb5_asn1
.AS_REQ(),
1554 asn1_print
=asn1_print
,
1556 if native_decoded_only
:
1560 def AP_REQ_create(self
, ap_options
, ticket
, authenticator
):
1561 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1562 # pvno [0] INTEGER (5),
1563 # msg-type [1] INTEGER (14),
1564 # ap-options [2] APOptions,
1565 # ticket [3] Ticket,
1566 # authenticator [4] EncryptedData -- Authenticator
1570 'msg-type': KRB_AP_REQ
,
1571 'ap-options': ap_options
,
1573 'authenticator': authenticator
,
1577 def Authenticator_create(
1578 self
, crealm
, cname
, cksum
, cusec
, ctime
, subkey
, seq_number
,
1579 authorization_data
):
1580 # -- Unencrypted authenticator
1581 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1582 # authenticator-vno [0] INTEGER (5),
1584 # cname [2] PrincipalName,
1585 # cksum [3] Checksum OPTIONAL,
1586 # cusec [4] Microseconds,
1587 # ctime [5] KerberosTime,
1588 # subkey [6] EncryptionKey OPTIONAL,
1589 # seq-number [7] UInt32 OPTIONAL,
1590 # authorization-data [8] AuthorizationData OPTIONAL
1592 Authenticator_obj
= {
1593 'authenticator-vno': 5,
1599 if cksum
is not None:
1600 Authenticator_obj
['cksum'] = cksum
1601 if subkey
is not None:
1602 Authenticator_obj
['subkey'] = subkey
1603 if seq_number
is not None:
1604 Authenticator_obj
['seq-number'] = seq_number
1605 if authorization_data
is not None:
1606 Authenticator_obj
['authorization-data'] = authorization_data
1607 return Authenticator_obj
1609 def TGS_REQ_create(self
,
1614 kdc_options
, # required
1618 from_time
, # optional
1619 till_time
, # required
1620 renew_time
, # optional
1623 addresses
, # optional
1624 EncAuthorizationData
,
1625 EncAuthorizationData_key
,
1628 authenticator_subkey
=None,
1629 body_checksum_type
=None,
1630 native_decoded_only
=True,
1633 # KDC-REQ ::= SEQUENCE {
1634 # -- NOTE: first tag is [1], not [0]
1635 # pvno [1] INTEGER (5) ,
1636 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1637 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1638 # -- NOTE: not empty --,
1639 # req-body [4] KDC-REQ-BODY
1642 # KDC-REQ-BODY ::= SEQUENCE {
1643 # kdc-options [0] KDCOptions,
1644 # cname [1] PrincipalName OPTIONAL
1645 # -- Used only in AS-REQ --,
1648 # -- Also client's in AS-REQ --,
1649 # sname [3] PrincipalName OPTIONAL,
1650 # from [4] KerberosTime OPTIONAL,
1651 # till [5] KerberosTime,
1652 # rtime [6] KerberosTime OPTIONAL,
1654 # etype [8] SEQUENCE OF Int32
1656 # -- in preference order --,
1657 # addresses [9] HostAddresses OPTIONAL,
1658 # enc-authorization-data [10] EncryptedData OPTIONAL
1659 # -- AuthorizationData --,
1660 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1661 # -- NOTE: not empty
1664 if authenticator_subkey
is not None:
1665 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SUBKEY
1667 EncAuthorizationData_usage
= KU_TGS_REQ_AUTH_DAT_SESSION
1669 req_body
= self
.KDC_REQ_BODY_create(
1670 kdc_options
=kdc_options
,
1674 from_time
=from_time
,
1675 till_time
=till_time
,
1676 renew_time
=renew_time
,
1679 addresses
=addresses
,
1680 additional_tickets
=additional_tickets
,
1681 EncAuthorizationData
=EncAuthorizationData
,
1682 EncAuthorizationData_key
=EncAuthorizationData_key
,
1683 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1684 req_body_blob
= self
.der_encode(req_body
,
1685 asn1Spec
=krb5_asn1
.KDC_REQ_BODY(),
1686 asn1_print
=asn1_print
, hexdump
=hexdump
)
1688 req_body_checksum
= self
.Checksum_create(ticket_session_key
,
1689 KU_TGS_REQ_AUTH_CKSUM
,
1691 ctype
=body_checksum_type
)
1694 if authenticator_subkey
is not None:
1695 subkey_obj
= authenticator_subkey
.export_obj()
1696 seq_number
= random
.randint(0, 0xfffffffe)
1697 authenticator
= self
.Authenticator_create(
1700 cksum
=req_body_checksum
,
1704 seq_number
=seq_number
,
1705 authorization_data
=None)
1706 authenticator
= self
.der_encode(
1708 asn1Spec
=krb5_asn1
.Authenticator(),
1709 asn1_print
=asn1_print
,
1712 authenticator
= self
.EncryptedData_create(
1713 ticket_session_key
, KU_TGS_REQ_AUTH
, authenticator
)
1715 ap_options
= krb5_asn1
.APOptions('0')
1716 ap_req
= self
.AP_REQ_create(ap_options
=str(ap_options
),
1718 authenticator
=authenticator
)
1719 ap_req
= self
.der_encode(ap_req
, asn1Spec
=krb5_asn1
.AP_REQ(),
1720 asn1_print
=asn1_print
, hexdump
=hexdump
)
1721 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
1722 if padata
is not None:
1723 padata
.append(pa_tgs_req
)
1725 padata
= [pa_tgs_req
]
1727 obj
, decoded
= self
.KDC_REQ_create(
1728 msg_type
=KRB_TGS_REQ
,
1731 asn1Spec
=krb5_asn1
.TGS_REQ(),
1732 asn1_print
=asn1_print
,
1734 if native_decoded_only
:
1738 def PA_S4U2Self_create(self
, name
, realm
, tgt_session_key
, ctype
=None):
1739 # PA-S4U2Self ::= SEQUENCE {
1740 # name [0] PrincipalName,
1742 # cksum [2] Checksum,
1743 # auth [3] GeneralString
1745 cksum_data
= name
['name-type'].to_bytes(4, byteorder
='little')
1746 for n
in name
['name-string']:
1747 cksum_data
+= n
.encode()
1748 cksum_data
+= realm
.encode()
1749 cksum_data
+= "Kerberos".encode()
1750 cksum
= self
.Checksum_create(tgt_session_key
,
1751 KU_NON_KERB_CKSUM_SALT
,
1761 pa_s4u2self
= self
.der_encode(
1762 PA_S4U2Self_obj
, asn1Spec
=krb5_asn1
.PA_S4U2Self())
1763 return self
.PA_DATA_create(PADATA_FOR_USER
, pa_s4u2self
)
1765 def _generic_kdc_exchange(self
,
1766 kdc_exchange_dict
, # required
1767 cname
=None, # optional
1768 realm
=None, # required
1769 sname
=None, # optional
1770 from_time
=None, # optional
1771 till_time
=None, # required
1772 renew_time
=None, # optional
1773 etypes
=None, # required
1774 addresses
=None, # optional
1775 additional_tickets
=None, # optional
1776 EncAuthorizationData
=None, # optional
1777 EncAuthorizationData_key
=None, # optional
1778 EncAuthorizationData_usage
=None): # optional
1780 check_error_fn
= kdc_exchange_dict
['check_error_fn']
1781 check_rep_fn
= kdc_exchange_dict
['check_rep_fn']
1782 generate_fast_fn
= kdc_exchange_dict
['generate_fast_fn']
1783 generate_fast_armor_fn
= kdc_exchange_dict
['generate_fast_armor_fn']
1784 generate_fast_padata_fn
= kdc_exchange_dict
['generate_fast_padata_fn']
1785 generate_padata_fn
= kdc_exchange_dict
['generate_padata_fn']
1786 callback_dict
= kdc_exchange_dict
['callback_dict']
1787 req_msg_type
= kdc_exchange_dict
['req_msg_type']
1788 req_asn1Spec
= kdc_exchange_dict
['req_asn1Spec']
1789 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
1791 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
1792 kdc_options
= kdc_exchange_dict
['kdc_options']
1794 pac_request
= kdc_exchange_dict
['pac_request']
1795 pac_options
= kdc_exchange_dict
['pac_options']
1797 # Parameters specific to the inner request body
1798 inner_req
= kdc_exchange_dict
['inner_req']
1800 # Parameters specific to the outer request body
1801 outer_req
= kdc_exchange_dict
['outer_req']
1803 if till_time
is None:
1804 till_time
= self
.get_KerberosTime(offset
=36000)
1806 if 'nonce' in kdc_exchange_dict
:
1807 nonce
= kdc_exchange_dict
['nonce']
1809 nonce
= self
.get_Nonce()
1810 kdc_exchange_dict
['nonce'] = nonce
1812 req_body
= self
.KDC_REQ_BODY_create(
1813 kdc_options
=kdc_options
,
1817 from_time
=from_time
,
1818 till_time
=till_time
,
1819 renew_time
=renew_time
,
1822 addresses
=addresses
,
1823 additional_tickets
=additional_tickets
,
1824 EncAuthorizationData
=EncAuthorizationData
,
1825 EncAuthorizationData_key
=EncAuthorizationData_key
,
1826 EncAuthorizationData_usage
=EncAuthorizationData_usage
)
1828 inner_req_body
= dict(req_body
)
1829 if inner_req
is not None:
1830 for key
, value
in inner_req
.items():
1831 if value
is not None:
1832 inner_req_body
[key
] = value
1834 del inner_req_body
[key
]
1835 if outer_req
is not None:
1836 for key
, value
in outer_req
.items():
1837 if value
is not None:
1838 req_body
[key
] = value
1842 additional_padata
= []
1843 if pac_request
is not None:
1844 pa_pac_request
= self
.KERB_PA_PAC_REQUEST_create(pac_request
)
1845 additional_padata
.append(pa_pac_request
)
1846 if pac_options
is not None:
1847 pa_pac_options
= self
.get_pa_pac_options(pac_options
)
1848 additional_padata
.append(pa_pac_options
)
1850 if req_msg_type
== KRB_AS_REQ
:
1852 tgs_req_padata
= None
1854 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1856 tgs_req
= self
.generate_ap_req(kdc_exchange_dict
,
1860 tgs_req_padata
= self
.PA_DATA_create(PADATA_KDC_REQ
, tgs_req
)
1862 if generate_fast_padata_fn
is not None:
1863 self
.assertIsNotNone(generate_fast_fn
)
1864 # This can alter req_body...
1865 fast_padata
, req_body
= generate_fast_padata_fn(kdc_exchange_dict
,
1871 if generate_fast_armor_fn
is not None:
1872 self
.assertIsNotNone(generate_fast_fn
)
1873 fast_ap_req
= generate_fast_armor_fn(kdc_exchange_dict
,
1878 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
1879 fast_armor
= self
.KRB_FAST_ARMOR_create(fast_armor_type
,
1884 if generate_padata_fn
is not None:
1885 # This can alter req_body...
1886 outer_padata
, req_body
= generate_padata_fn(kdc_exchange_dict
,
1889 self
.assertIsNotNone(outer_padata
)
1890 self
.assertNotIn(PADATA_KDC_REQ
,
1891 [pa
['padata-type'] for pa
in outer_padata
],
1892 'Don\'t create TGS-REQ manually')
1896 if generate_fast_fn
is not None:
1897 armor_key
= kdc_exchange_dict
['armor_key']
1898 self
.assertIsNotNone(armor_key
)
1900 if req_msg_type
== KRB_AS_REQ
:
1901 checksum_blob
= self
.der_encode(
1903 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
1905 self
.assertEqual(KRB_TGS_REQ
, req_msg_type
)
1906 checksum_blob
= tgs_req
1908 checksum
= self
.Checksum_create(armor_key
,
1912 fast_padata
+= additional_padata
1913 fast
= generate_fast_fn(kdc_exchange_dict
,
1924 if tgs_req_padata
is not None:
1925 padata
.append(tgs_req_padata
)
1927 if fast
is not None:
1930 if outer_padata
is not None:
1931 padata
+= outer_padata
1934 padata
+= additional_padata
1939 kdc_exchange_dict
['req_padata'] = padata
1940 kdc_exchange_dict
['fast_padata'] = fast_padata
1941 kdc_exchange_dict
['req_body'] = inner_req_body
1943 req_obj
, req_decoded
= self
.KDC_REQ_create(msg_type
=req_msg_type
,
1946 asn1Spec
=req_asn1Spec())
1948 to_rodc
= kdc_exchange_dict
['to_rodc']
1950 rep
= self
.send_recv_transaction(req_decoded
, to_rodc
=to_rodc
)
1951 self
.assertIsNotNone(rep
)
1953 msg_type
= self
.getElementValue(rep
, 'msg-type')
1954 self
.assertIsNotNone(msg_type
)
1956 expected_msg_type
= None
1957 if check_error_fn
is not None:
1958 expected_msg_type
= KRB_ERROR
1959 self
.assertIsNone(check_rep_fn
)
1960 self
.assertNotEqual(0, len(expected_error_mode
))
1961 self
.assertNotIn(0, expected_error_mode
)
1962 if check_rep_fn
is not None:
1963 expected_msg_type
= rep_msg_type
1964 self
.assertIsNone(check_error_fn
)
1965 self
.assertEqual(0, len(expected_error_mode
))
1966 self
.assertIsNotNone(expected_msg_type
)
1967 if msg_type
== KRB_ERROR
:
1968 error_code
= self
.getElementValue(rep
, 'error-code')
1969 fail_msg
= f
'Got unexpected error: {error_code}'
1971 fail_msg
= f
'Expected to fail with error: {expected_error_mode}'
1972 self
.assertEqual(msg_type
, expected_msg_type
, fail_msg
)
1974 if msg_type
== KRB_ERROR
:
1975 return check_error_fn(kdc_exchange_dict
,
1979 return check_rep_fn(kdc_exchange_dict
, callback_dict
, rep
)
1981 def as_exchange_dict(self
,
1982 expected_crealm
=None,
1983 expected_cname
=None,
1984 expected_anon
=False,
1985 expected_srealm
=None,
1986 expected_sname
=None,
1987 expected_supported_etypes
=None,
1988 expected_flags
=None,
1989 unexpected_flags
=None,
1990 ticket_decryption_key
=None,
1991 expect_ticket_checksum
=None,
1992 generate_fast_fn
=None,
1993 generate_fast_armor_fn
=None,
1994 generate_fast_padata_fn
=None,
1995 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
1996 generate_padata_fn
=None,
1997 check_error_fn
=None,
1999 check_kdc_private_fn
=None,
2001 expected_error_mode
=0,
2002 expected_status
=None,
2003 client_as_etypes
=None,
2005 authenticator_subkey
=None,
2020 if expected_error_mode
== 0:
2021 expected_error_mode
= ()
2022 elif not isinstance(expected_error_mode
, collections
.abc
.Container
):
2023 expected_error_mode
= (expected_error_mode
,)
2025 kdc_exchange_dict
= {
2026 'req_msg_type': KRB_AS_REQ
,
2027 'req_asn1Spec': krb5_asn1
.AS_REQ
,
2028 'rep_msg_type': KRB_AS_REP
,
2029 'rep_asn1Spec': krb5_asn1
.AS_REP
,
2030 'rep_encpart_asn1Spec': krb5_asn1
.EncASRepPart
,
2031 'expected_crealm': expected_crealm
,
2032 'expected_cname': expected_cname
,
2033 'expected_anon': expected_anon
,
2034 'expected_srealm': expected_srealm
,
2035 'expected_sname': expected_sname
,
2036 'expected_supported_etypes': expected_supported_etypes
,
2037 'expected_flags': expected_flags
,
2038 'unexpected_flags': unexpected_flags
,
2039 'ticket_decryption_key': ticket_decryption_key
,
2040 'expect_ticket_checksum': expect_ticket_checksum
,
2041 'generate_fast_fn': generate_fast_fn
,
2042 'generate_fast_armor_fn': generate_fast_armor_fn
,
2043 'generate_fast_padata_fn': generate_fast_padata_fn
,
2044 'fast_armor_type': fast_armor_type
,
2045 'generate_padata_fn': generate_padata_fn
,
2046 'check_error_fn': check_error_fn
,
2047 'check_rep_fn': check_rep_fn
,
2048 'check_kdc_private_fn': check_kdc_private_fn
,
2049 'callback_dict': callback_dict
,
2050 'expected_error_mode': expected_error_mode
,
2051 'expected_status': expected_status
,
2052 'client_as_etypes': client_as_etypes
,
2053 'expected_salt': expected_salt
,
2054 'authenticator_subkey': authenticator_subkey
,
2055 'preauth_key': preauth_key
,
2056 'armor_key': armor_key
,
2057 'armor_tgt': armor_tgt
,
2058 'armor_subkey': armor_subkey
,
2059 'auth_data': auth_data
,
2060 'kdc_options': kdc_options
,
2061 'inner_req': inner_req
,
2062 'outer_req': outer_req
,
2063 'pac_request': pac_request
,
2064 'pac_options': pac_options
,
2065 'expect_edata': expect_edata
,
2066 'expect_pac': expect_pac
,
2067 'expect_claims': expect_claims
,
2070 if callback_dict
is None:
2073 return kdc_exchange_dict
2075 def tgs_exchange_dict(self
,
2076 expected_crealm
=None,
2077 expected_cname
=None,
2078 expected_anon
=False,
2079 expected_srealm
=None,
2080 expected_sname
=None,
2081 expected_supported_etypes
=None,
2082 expected_flags
=None,
2083 unexpected_flags
=None,
2084 ticket_decryption_key
=None,
2085 expect_ticket_checksum
=None,
2086 generate_fast_fn
=None,
2087 generate_fast_armor_fn
=None,
2088 generate_fast_padata_fn
=None,
2089 fast_armor_type
=FX_FAST_ARMOR_AP_REQUEST
,
2090 generate_padata_fn
=None,
2091 check_error_fn
=None,
2093 check_kdc_private_fn
=None,
2094 expected_error_mode
=0,
2095 expected_status
=None,
2101 authenticator_subkey
=None,
2103 body_checksum_type
=None,
2112 expected_proxy_target
=None,
2113 expected_transited_services
=None,
2115 if expected_error_mode
== 0:
2116 expected_error_mode
= ()
2117 elif not isinstance(expected_error_mode
, collections
.abc
.Container
):
2118 expected_error_mode
= (expected_error_mode
,)
2120 kdc_exchange_dict
= {
2121 'req_msg_type': KRB_TGS_REQ
,
2122 'req_asn1Spec': krb5_asn1
.TGS_REQ
,
2123 'rep_msg_type': KRB_TGS_REP
,
2124 'rep_asn1Spec': krb5_asn1
.TGS_REP
,
2125 'rep_encpart_asn1Spec': krb5_asn1
.EncTGSRepPart
,
2126 'expected_crealm': expected_crealm
,
2127 'expected_cname': expected_cname
,
2128 'expected_anon': expected_anon
,
2129 'expected_srealm': expected_srealm
,
2130 'expected_sname': expected_sname
,
2131 'expected_supported_etypes': expected_supported_etypes
,
2132 'expected_flags': expected_flags
,
2133 'unexpected_flags': unexpected_flags
,
2134 'ticket_decryption_key': ticket_decryption_key
,
2135 'expect_ticket_checksum': expect_ticket_checksum
,
2136 'generate_fast_fn': generate_fast_fn
,
2137 'generate_fast_armor_fn': generate_fast_armor_fn
,
2138 'generate_fast_padata_fn': generate_fast_padata_fn
,
2139 'fast_armor_type': fast_armor_type
,
2140 'generate_padata_fn': generate_padata_fn
,
2141 'check_error_fn': check_error_fn
,
2142 'check_rep_fn': check_rep_fn
,
2143 'check_kdc_private_fn': check_kdc_private_fn
,
2144 'callback_dict': callback_dict
,
2145 'expected_error_mode': expected_error_mode
,
2146 'expected_status': expected_status
,
2148 'body_checksum_type': body_checksum_type
,
2149 'armor_key': armor_key
,
2150 'armor_tgt': armor_tgt
,
2151 'armor_subkey': armor_subkey
,
2152 'auth_data': auth_data
,
2153 'authenticator_subkey': authenticator_subkey
,
2154 'kdc_options': kdc_options
,
2155 'inner_req': inner_req
,
2156 'outer_req': outer_req
,
2157 'pac_request': pac_request
,
2158 'pac_options': pac_options
,
2159 'expect_edata': expect_edata
,
2160 'expect_pac': expect_pac
,
2161 'expect_claims': expect_claims
,
2162 'expected_proxy_target': expected_proxy_target
,
2163 'expected_transited_services': expected_transited_services
,
2166 if callback_dict
is None:
2169 return kdc_exchange_dict
2171 def generic_check_kdc_rep(self
,
2176 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2177 expected_anon
= kdc_exchange_dict
['expected_anon']
2178 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2179 expected_sname
= kdc_exchange_dict
['expected_sname']
2180 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
2181 check_kdc_private_fn
= kdc_exchange_dict
['check_kdc_private_fn']
2182 rep_encpart_asn1Spec
= kdc_exchange_dict
['rep_encpart_asn1Spec']
2183 msg_type
= kdc_exchange_dict
['rep_msg_type']
2184 armor_key
= kdc_exchange_dict
['armor_key']
2186 self
.assertElementEqual(rep
, 'msg-type', msg_type
) # AS-REP | TGS-REP
2187 padata
= self
.getElementValue(rep
, 'padata')
2188 if self
.strict_checking
:
2189 self
.assertElementEqualUTF8(rep
, 'crealm', expected_crealm
)
2191 expected_cname
= self
.PrincipalName_create(
2192 name_type
=NT_WELLKNOWN
,
2193 names
=['WELLKNOWN', 'ANONYMOUS'])
2195 expected_cname
= kdc_exchange_dict
['expected_cname']
2196 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
2197 self
.assertElementPresent(rep
, 'ticket')
2198 ticket
= self
.getElementValue(rep
, 'ticket')
2199 ticket_encpart
= None
2200 ticket_cipher
= None
2201 self
.assertIsNotNone(ticket
)
2202 if ticket
is not None: # Never None, but gives indentation
2203 self
.assertElementEqual(ticket
, 'tkt-vno', 5)
2204 self
.assertElementEqualUTF8(ticket
, 'realm', expected_srealm
)
2205 self
.assertElementEqualPrincipal(ticket
, 'sname', expected_sname
)
2206 self
.assertElementPresent(ticket
, 'enc-part')
2207 ticket_encpart
= self
.getElementValue(ticket
, 'enc-part')
2208 self
.assertIsNotNone(ticket_encpart
)
2209 if ticket_encpart
is not None: # Never None, but gives indentation
2210 self
.assertElementPresent(ticket_encpart
, 'etype')
2211 # 'unspecified' means present, with any value != 0
2212 self
.assertElementKVNO(ticket_encpart
, 'kvno',
2213 self
.unspecified_kvno
)
2214 self
.assertElementPresent(ticket_encpart
, 'cipher')
2215 ticket_cipher
= self
.getElementValue(ticket_encpart
, 'cipher')
2216 self
.assertElementPresent(rep
, 'enc-part')
2217 encpart
= self
.getElementValue(rep
, 'enc-part')
2218 encpart_cipher
= None
2219 self
.assertIsNotNone(encpart
)
2220 if encpart
is not None: # Never None, but gives indentation
2221 self
.assertElementPresent(encpart
, 'etype')
2222 self
.assertElementKVNO(ticket_encpart
, 'kvno', 'autodetect')
2223 self
.assertElementPresent(encpart
, 'cipher')
2224 encpart_cipher
= self
.getElementValue(encpart
, 'cipher')
2226 ticket_checksum
= None
2228 # Get the decryption key for the encrypted part
2229 encpart_decryption_key
, encpart_decryption_usage
= (
2230 self
.get_preauth_key(kdc_exchange_dict
))
2232 if armor_key
is not None:
2233 pa_dict
= self
.get_pa_dict(padata
)
2235 if PADATA_FX_FAST
in pa_dict
:
2236 fx_fast_data
= pa_dict
[PADATA_FX_FAST
]
2237 fast_response
= self
.check_fx_fast_data(kdc_exchange_dict
,
2242 if 'strengthen-key' in fast_response
:
2243 strengthen_key
= self
.EncryptionKey_import(
2244 fast_response
['strengthen-key'])
2245 encpart_decryption_key
= (
2246 self
.generate_strengthen_reply_key(
2248 encpart_decryption_key
))
2250 fast_finished
= fast_response
.get('finished')
2251 if fast_finished
is not None:
2252 ticket_checksum
= fast_finished
['ticket-checksum']
2254 self
.check_rep_padata(kdc_exchange_dict
,
2256 fast_response
['padata'],
2259 ticket_private
= None
2260 if ticket_decryption_key
is not None:
2261 self
.assertElementEqual(ticket_encpart
, 'etype',
2262 ticket_decryption_key
.etype
)
2263 self
.assertElementKVNO(ticket_encpart
, 'kvno',
2264 ticket_decryption_key
.kvno
)
2265 ticket_decpart
= ticket_decryption_key
.decrypt(KU_TICKET
,
2267 ticket_private
= self
.der_decode(
2269 asn1Spec
=krb5_asn1
.EncTicketPart())
2271 encpart_private
= None
2272 self
.assertIsNotNone(encpart_decryption_key
)
2273 if encpart_decryption_key
is not None:
2274 self
.assertElementEqual(encpart
, 'etype',
2275 encpart_decryption_key
.etype
)
2276 if self
.strict_checking
:
2277 self
.assertElementKVNO(encpart
, 'kvno',
2278 encpart_decryption_key
.kvno
)
2279 rep_decpart
= encpart_decryption_key
.decrypt(
2280 encpart_decryption_usage
,
2282 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2283 # application tag 26
2285 encpart_private
= self
.der_decode(
2287 asn1Spec
=rep_encpart_asn1Spec())
2289 encpart_private
= self
.der_decode(
2291 asn1Spec
=krb5_asn1
.EncTGSRepPart())
2293 self
.assertIsNotNone(check_kdc_private_fn
)
2294 if check_kdc_private_fn
is not None:
2295 check_kdc_private_fn(kdc_exchange_dict
, callback_dict
,
2296 rep
, ticket_private
, encpart_private
,
2301 def check_fx_fast_data(self
,
2306 expect_strengthen_key
=True):
2307 fx_fast_data
= self
.der_decode(fx_fast_data
,
2308 asn1Spec
=krb5_asn1
.PA_FX_FAST_REPLY())
2310 enc_fast_rep
= fx_fast_data
['armored-data']['enc-fast-rep']
2311 self
.assertEqual(enc_fast_rep
['etype'], armor_key
.etype
)
2313 fast_rep
= armor_key
.decrypt(KU_FAST_REP
, enc_fast_rep
['cipher'])
2315 fast_response
= self
.der_decode(fast_rep
,
2316 asn1Spec
=krb5_asn1
.KrbFastResponse())
2318 if expect_strengthen_key
and self
.strict_checking
:
2319 self
.assertIn('strengthen-key', fast_response
)
2322 self
.assertIn('finished', fast_response
)
2324 # Ensure that the nonce matches the nonce in the body of the request
2326 nonce
= kdc_exchange_dict
['nonce']
2327 self
.assertEqual(nonce
, fast_response
['nonce'])
2329 return fast_response
2331 def generic_check_kdc_private(self
,
2338 kdc_options
= kdc_exchange_dict
['kdc_options']
2339 canon_pos
= len(tuple(krb5_asn1
.KDCOptions('canonicalize'))) - 1
2340 canonicalize
= (canon_pos
< len(kdc_options
)
2341 and kdc_options
[canon_pos
] == '1')
2342 renewable_pos
= len(tuple(krb5_asn1
.KDCOptions('renewable'))) - 1
2343 renewable
= (renewable_pos
< len(kdc_options
)
2344 and kdc_options
[renewable_pos
] == '1')
2346 expected_crealm
= kdc_exchange_dict
['expected_crealm']
2347 expected_cname
= kdc_exchange_dict
['expected_cname']
2348 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2349 expected_sname
= kdc_exchange_dict
['expected_sname']
2350 ticket_decryption_key
= kdc_exchange_dict
['ticket_decryption_key']
2352 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2354 expected_flags
= kdc_exchange_dict
.get('expected_flags')
2355 unexpected_flags
= kdc_exchange_dict
.get('unexpected_flags')
2357 ticket
= self
.getElementValue(rep
, 'ticket')
2359 if ticket_checksum
is not None:
2360 armor_key
= kdc_exchange_dict
['armor_key']
2361 self
.verify_ticket_checksum(ticket
, ticket_checksum
, armor_key
)
2363 to_rodc
= kdc_exchange_dict
['to_rodc']
2365 krbtgt_creds
= self
.get_rodc_krbtgt_creds()
2367 krbtgt_creds
= self
.get_krbtgt_creds()
2368 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
2370 expect_pac
= kdc_exchange_dict
['expect_pac']
2372 ticket_session_key
= None
2373 if ticket_private
is not None:
2374 self
.assertElementFlags(ticket_private
, 'flags',
2377 self
.assertElementPresent(ticket_private
, 'key')
2378 ticket_key
= self
.getElementValue(ticket_private
, 'key')
2379 self
.assertIsNotNone(ticket_key
)
2380 if ticket_key
is not None: # Never None, but gives indentation
2381 self
.assertElementPresent(ticket_key
, 'keytype')
2382 self
.assertElementPresent(ticket_key
, 'keyvalue')
2383 ticket_session_key
= self
.EncryptionKey_import(ticket_key
)
2384 self
.assertElementEqualUTF8(ticket_private
, 'crealm',
2386 if self
.strict_checking
:
2387 self
.assertElementEqualPrincipal(ticket_private
, 'cname',
2389 self
.assertElementPresent(ticket_private
, 'transited')
2390 self
.assertElementPresent(ticket_private
, 'authtime')
2391 if self
.strict_checking
:
2392 self
.assertElementPresent(ticket_private
, 'starttime')
2393 self
.assertElementPresent(ticket_private
, 'endtime')
2395 if self
.strict_checking
:
2396 self
.assertElementPresent(ticket_private
, 'renew-till')
2398 self
.assertElementMissing(ticket_private
, 'renew-till')
2399 if self
.strict_checking
:
2400 self
.assertElementEqual(ticket_private
, 'caddr', [])
2401 self
.assertElementPresent(ticket_private
, 'authorization-data',
2402 expect_empty
=not expect_pac
)
2404 encpart_session_key
= None
2405 if encpart_private
is not None:
2406 self
.assertElementPresent(encpart_private
, 'key')
2407 encpart_key
= self
.getElementValue(encpart_private
, 'key')
2408 self
.assertIsNotNone(encpart_key
)
2409 if encpart_key
is not None: # Never None, but gives indentation
2410 self
.assertElementPresent(encpart_key
, 'keytype')
2411 self
.assertElementPresent(encpart_key
, 'keyvalue')
2412 encpart_session_key
= self
.EncryptionKey_import(encpart_key
)
2413 self
.assertElementPresent(encpart_private
, 'last-req')
2414 self
.assertElementEqual(encpart_private
, 'nonce',
2415 kdc_exchange_dict
['nonce'])
2416 if rep_msg_type
== KRB_AS_REP
:
2417 if self
.strict_checking
:
2418 self
.assertElementPresent(encpart_private
,
2421 self
.assertElementMissing(encpart_private
,
2423 self
.assertElementFlags(encpart_private
, 'flags',
2426 self
.assertElementPresent(encpart_private
, 'authtime')
2427 if self
.strict_checking
:
2428 self
.assertElementPresent(encpart_private
, 'starttime')
2429 self
.assertElementPresent(encpart_private
, 'endtime')
2431 if self
.strict_checking
:
2432 self
.assertElementPresent(encpart_private
, 'renew-till')
2434 self
.assertElementMissing(encpart_private
, 'renew-till')
2435 self
.assertElementEqualUTF8(encpart_private
, 'srealm',
2437 self
.assertElementEqualPrincipal(encpart_private
, 'sname',
2439 if self
.strict_checking
:
2440 self
.assertElementEqual(encpart_private
, 'caddr', [])
2442 sent_pac_options
= self
.get_sent_pac_options(kdc_exchange_dict
)
2444 if self
.strict_checking
:
2445 if canonicalize
or '1' in sent_pac_options
:
2446 self
.assertElementPresent(encpart_private
,
2447 'encrypted-pa-data')
2448 enc_pa_dict
= self
.get_pa_dict(
2449 encpart_private
['encrypted-pa-data'])
2451 self
.assertIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2453 expected_supported_etypes
= kdc_exchange_dict
[
2454 'expected_supported_etypes']
2455 expected_supported_etypes |
= (
2456 security
.KERB_ENCTYPE_DES_CBC_CRC |
2457 security
.KERB_ENCTYPE_DES_CBC_MD5 |
2458 security
.KERB_ENCTYPE_RC4_HMAC_MD5
)
2460 (supported_etypes
,) = struct
.unpack(
2462 enc_pa_dict
[PADATA_SUPPORTED_ETYPES
])
2464 self
.assertEqual(supported_etypes
,
2465 expected_supported_etypes
)
2467 self
.assertNotIn(PADATA_SUPPORTED_ETYPES
, enc_pa_dict
)
2469 if '1' in sent_pac_options
:
2470 self
.assertIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2472 pac_options
= self
.der_decode(
2473 enc_pa_dict
[PADATA_PAC_OPTIONS
],
2474 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
2476 self
.assertElementEqual(pac_options
, 'options',
2479 self
.assertNotIn(PADATA_PAC_OPTIONS
, enc_pa_dict
)
2481 self
.assertElementEqual(encpart_private
,
2482 'encrypted-pa-data',
2485 if ticket_session_key
is not None and encpart_session_key
is not None:
2486 self
.assertEqual(ticket_session_key
.etype
,
2487 encpart_session_key
.etype
)
2488 self
.assertEqual(ticket_session_key
.key
.contents
,
2489 encpart_session_key
.key
.contents
)
2490 if encpart_session_key
is not None:
2491 session_key
= encpart_session_key
2493 session_key
= ticket_session_key
2494 ticket_creds
= KerberosTicketCreds(
2497 crealm
=expected_crealm
,
2498 cname
=expected_cname
,
2499 srealm
=expected_srealm
,
2500 sname
=expected_sname
,
2501 decryption_key
=ticket_decryption_key
,
2502 ticket_private
=ticket_private
,
2503 encpart_private
=encpart_private
)
2505 if ticket_private
is not None:
2506 pac_data
= self
.get_ticket_pac(ticket_creds
, expect_pac
=expect_pac
)
2508 self
.check_pac_buffers(pac_data
, kdc_exchange_dict
)
2510 self
.assertIsNone(pac_data
)
2512 expect_ticket_checksum
= kdc_exchange_dict
['expect_ticket_checksum']
2513 if expect_ticket_checksum
:
2514 self
.assertIsNotNone(ticket_decryption_key
)
2516 if ticket_decryption_key
is not None:
2517 self
.verify_ticket(ticket_creds
, krbtgt_key
, expect_pac
=expect_pac
,
2518 expect_ticket_checksum
=expect_ticket_checksum
2519 or self
.tkt_sig_support
)
2521 kdc_exchange_dict
['rep_ticket_creds'] = ticket_creds
2523 def check_pac_buffers(self
, pac_data
, kdc_exchange_dict
):
2524 pac
= ndr_unpack(krb5pac
.PAC_DATA
, pac_data
)
2526 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2527 armor_tgt
= kdc_exchange_dict
['armor_tgt']
2529 expected_sname
= kdc_exchange_dict
['expected_sname']
2530 expect_claims
= kdc_exchange_dict
['expect_claims']
2532 expected_types
= [krb5pac
.PAC_TYPE_LOGON_INFO
,
2533 krb5pac
.PAC_TYPE_SRV_CHECKSUM
,
2534 krb5pac
.PAC_TYPE_KDC_CHECKSUM
,
2535 krb5pac
.PAC_TYPE_LOGON_NAME
,
2536 krb5pac
.PAC_TYPE_UPN_DNS_INFO
]
2538 kdc_options
= kdc_exchange_dict
['kdc_options']
2539 pos
= len(tuple(krb5_asn1
.KDCOptions('cname-in-addl-tkt'))) - 1
2540 constrained_delegation
= (pos
< len(kdc_options
)
2541 and kdc_options
[pos
] == '1')
2542 if constrained_delegation
:
2543 expected_types
.append(krb5pac
.PAC_TYPE_CONSTRAINED_DELEGATION
)
2545 if self
.kdc_fast_support
:
2547 expected_types
.append(krb5pac
.PAC_TYPE_CLIENT_CLAIMS_INFO
)
2549 if (rep_msg_type
== KRB_TGS_REP
2550 and armor_tgt
is not None):
2551 expected_types
.append(krb5pac
.PAC_TYPE_DEVICE_INFO
)
2552 expected_types
.append(krb5pac
.PAC_TYPE_DEVICE_CLAIMS_INFO
)
2554 if not self
.is_tgs(expected_sname
):
2555 expected_types
.append(krb5pac
.PAC_TYPE_TICKET_CHECKSUM
)
2557 if self
.strict_checking
:
2558 buffer_types
= [pac_buffer
.type
2559 for pac_buffer
in pac
.buffers
]
2560 self
.assertCountEqual(expected_types
, buffer_types
,
2561 f
'expected: {expected_types} '
2562 f
'got: {buffer_types}')
2564 for pac_buffer
in pac
.buffers
:
2565 if pac_buffer
.type == krb5pac
.PAC_TYPE_CONSTRAINED_DELEGATION
:
2566 expected_proxy_target
= kdc_exchange_dict
[
2567 'expected_proxy_target']
2568 expected_transited_services
= kdc_exchange_dict
[
2569 'expected_transited_services']
2571 delegation_info
= pac_buffer
.info
.info
2573 self
.assertEqual(expected_proxy_target
,
2574 str(delegation_info
.proxy_target
))
2576 transited_services
= list(map(
2577 str, delegation_info
.transited_services
))
2578 self
.assertEqual(expected_transited_services
,
2581 elif pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_NAME
:
2582 expected_cname
= kdc_exchange_dict
['expected_cname']
2583 account_name
= expected_cname
['name-string'][0]
2585 self
.assertEqual(account_name
, pac_buffer
.info
.account_name
)
2587 def generic_check_kdc_error(self
,
2593 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2595 expected_anon
= kdc_exchange_dict
['expected_anon']
2596 expected_srealm
= kdc_exchange_dict
['expected_srealm']
2597 expected_sname
= kdc_exchange_dict
['expected_sname']
2598 expected_error_mode
= kdc_exchange_dict
['expected_error_mode']
2600 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
2602 fast_armor_type
= kdc_exchange_dict
['fast_armor_type']
2604 self
.assertElementEqual(rep
, 'pvno', 5)
2605 self
.assertElementEqual(rep
, 'msg-type', KRB_ERROR
)
2606 error_code
= self
.getElementValue(rep
, 'error-code')
2607 self
.assertIn(error_code
, expected_error_mode
)
2608 if self
.strict_checking
:
2609 self
.assertElementMissing(rep
, 'ctime')
2610 self
.assertElementMissing(rep
, 'cusec')
2611 self
.assertElementPresent(rep
, 'stime')
2612 self
.assertElementPresent(rep
, 'susec')
2613 # error-code checked above
2614 if self
.strict_checking
:
2615 self
.assertElementMissing(rep
, 'crealm')
2616 if expected_anon
and not inner
:
2617 expected_cname
= self
.PrincipalName_create(
2618 name_type
=NT_WELLKNOWN
,
2619 names
=['WELLKNOWN', 'ANONYMOUS'])
2620 self
.assertElementEqualPrincipal(rep
, 'cname', expected_cname
)
2622 self
.assertElementMissing(rep
, 'cname')
2623 self
.assertElementEqualUTF8(rep
, 'realm', expected_srealm
)
2624 self
.assertElementEqualPrincipal(rep
, 'sname', expected_sname
)
2625 self
.assertElementMissing(rep
, 'e-text')
2626 expected_status
= kdc_exchange_dict
['expected_status']
2627 expect_edata
= kdc_exchange_dict
['expect_edata']
2628 if expect_edata
is None:
2629 expect_edata
= (error_code
!= KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2630 and (not sent_fast
or fast_armor_type
is None
2631 or fast_armor_type
== FX_FAST_ARMOR_AP_REQUEST
)
2633 if not expect_edata
:
2634 self
.assertIsNone(expected_status
)
2635 self
.assertElementMissing(rep
, 'e-data')
2637 edata
= self
.getElementValue(rep
, 'e-data')
2638 if self
.strict_checking
:
2639 self
.assertIsNotNone(edata
)
2640 if edata
is not None:
2641 if rep_msg_type
== KRB_TGS_REP
and not sent_fast
:
2642 error_data
= self
.der_decode(
2644 asn1Spec
=krb5_asn1
.KERB_ERROR_DATA())
2645 self
.assertEqual(KERB_ERR_TYPE_EXTENDED
,
2646 error_data
['data-type'])
2648 extended_error
= error_data
['data-value']
2650 self
.assertEqual(12, len(extended_error
))
2652 status
= int.from_bytes(extended_error
[:4], 'little')
2653 flags
= int.from_bytes(extended_error
[8:], 'little')
2655 self
.assertEqual(expected_status
, status
)
2657 self
.assertEqual(3, flags
)
2659 self
.assertIsNone(expected_status
)
2661 rep_padata
= self
.der_decode(edata
,
2662 asn1Spec
=krb5_asn1
.METHOD_DATA())
2663 self
.assertGreater(len(rep_padata
), 0)
2666 self
.assertEqual(1, len(rep_padata
))
2667 rep_pa_dict
= self
.get_pa_dict(rep_padata
)
2668 self
.assertIn(PADATA_FX_FAST
, rep_pa_dict
)
2670 armor_key
= kdc_exchange_dict
['armor_key']
2671 self
.assertIsNotNone(armor_key
)
2672 fast_response
= self
.check_fx_fast_data(
2674 rep_pa_dict
[PADATA_FX_FAST
],
2676 expect_strengthen_key
=False)
2678 rep_padata
= fast_response
['padata']
2680 etype_info2
= self
.check_rep_padata(kdc_exchange_dict
,
2685 kdc_exchange_dict
['preauth_etype_info2'] = etype_info2
2689 def check_rep_padata(self
,
2694 rep_msg_type
= kdc_exchange_dict
['rep_msg_type']
2696 req_body
= kdc_exchange_dict
['req_body']
2697 proposed_etypes
= req_body
['etype']
2698 client_as_etypes
= kdc_exchange_dict
.get('client_as_etypes', [])
2700 sent_fast
= self
.sent_fast(kdc_exchange_dict
)
2701 sent_enc_challenge
= self
.sent_enc_challenge(kdc_exchange_dict
)
2703 if rep_msg_type
== KRB_TGS_REP
:
2704 self
.assertTrue(sent_fast
)
2706 expect_etype_info2
= ()
2707 expect_etype_info
= False
2708 unexpect_etype_info
= True
2709 expected_aes_type
= 0
2710 expected_rc4_type
= 0
2711 if kcrypto
.Enctype
.RC4
in proposed_etypes
:
2712 expect_etype_info
= True
2713 for etype
in proposed_etypes
:
2714 if etype
not in client_as_etypes
:
2716 if etype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
2717 expect_etype_info
= False
2718 if etype
> expected_aes_type
:
2719 expected_aes_type
= etype
2720 if etype
in (kcrypto
.Enctype
.RC4
,) and error_code
!= 0:
2721 unexpect_etype_info
= False
2722 if etype
> expected_rc4_type
:
2723 expected_rc4_type
= etype
2725 if expected_aes_type
!= 0:
2726 expect_etype_info2
+= (expected_aes_type
,)
2727 if expected_rc4_type
!= 0:
2728 expect_etype_info2
+= (expected_rc4_type
,)
2730 expected_patypes
= ()
2731 if sent_fast
and error_code
!= 0:
2732 expected_patypes
+= (PADATA_FX_ERROR
,)
2733 expected_patypes
+= (PADATA_FX_COOKIE
,)
2735 if rep_msg_type
== KRB_TGS_REP
:
2736 sent_pac_options
= self
.get_sent_pac_options(kdc_exchange_dict
)
2737 if ('1' in sent_pac_options
2738 and error_code
not in (0, KDC_ERR_GENERIC
)):
2739 expected_patypes
+= (PADATA_PAC_OPTIONS
,)
2740 elif error_code
!= KDC_ERR_GENERIC
:
2741 if expect_etype_info
:
2742 self
.assertGreater(len(expect_etype_info2
), 0)
2743 expected_patypes
+= (PADATA_ETYPE_INFO
,)
2744 if len(expect_etype_info2
) != 0:
2745 expected_patypes
+= (PADATA_ETYPE_INFO2
,)
2747 if error_code
!= KDC_ERR_PREAUTH_FAILED
:
2749 expected_patypes
+= (PADATA_ENCRYPTED_CHALLENGE
,)
2751 expected_patypes
+= (PADATA_ENC_TIMESTAMP
,)
2753 if not sent_enc_challenge
:
2754 expected_patypes
+= (PADATA_PK_AS_REQ
,)
2755 expected_patypes
+= (PADATA_PK_AS_REP_19
,)
2757 if (self
.kdc_fast_support
2759 and not sent_enc_challenge
):
2760 expected_patypes
+= (PADATA_FX_FAST
,)
2761 expected_patypes
+= (PADATA_FX_COOKIE
,)
2763 got_patypes
= tuple(pa
['padata-type'] for pa
in rep_padata
)
2764 self
.assertSequenceElementsEqual(expected_patypes
, got_patypes
,
2765 require_strict
={PADATA_FX_COOKIE
,
2768 PADATA_PK_AS_REP_19
,
2771 if not expected_patypes
:
2774 pa_dict
= self
.get_pa_dict(rep_padata
)
2776 enc_timestamp
= pa_dict
.get(PADATA_ENC_TIMESTAMP
)
2777 if enc_timestamp
is not None:
2778 self
.assertEqual(len(enc_timestamp
), 0)
2780 pk_as_req
= pa_dict
.get(PADATA_PK_AS_REQ
)
2781 if pk_as_req
is not None:
2782 self
.assertEqual(len(pk_as_req
), 0)
2784 pk_as_rep19
= pa_dict
.get(PADATA_PK_AS_REP_19
)
2785 if pk_as_rep19
is not None:
2786 self
.assertEqual(len(pk_as_rep19
), 0)
2788 fx_fast
= pa_dict
.get(PADATA_FX_FAST
)
2789 if fx_fast
is not None:
2790 self
.assertEqual(len(fx_fast
), 0)
2792 fast_cookie
= pa_dict
.get(PADATA_FX_COOKIE
)
2793 if fast_cookie
is not None:
2794 kdc_exchange_dict
['fast_cookie'] = fast_cookie
2796 fast_error
= pa_dict
.get(PADATA_FX_ERROR
)
2797 if fast_error
is not None:
2798 fast_error
= self
.der_decode(fast_error
,
2799 asn1Spec
=krb5_asn1
.KRB_ERROR())
2800 self
.generic_check_kdc_error(kdc_exchange_dict
,
2805 pac_options
= pa_dict
.get(PADATA_PAC_OPTIONS
)
2806 if pac_options
is not None:
2807 pac_options
= self
.der_decode(
2809 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
2810 self
.assertElementEqual(pac_options
, 'options', sent_pac_options
)
2812 enc_challenge
= pa_dict
.get(PADATA_ENCRYPTED_CHALLENGE
)
2813 if enc_challenge
is not None:
2814 if not sent_enc_challenge
:
2815 self
.assertEqual(len(enc_challenge
), 0)
2817 armor_key
= kdc_exchange_dict
['armor_key']
2818 self
.assertIsNotNone(armor_key
)
2820 preauth_key
, _
= self
.get_preauth_key(kdc_exchange_dict
)
2822 kdc_challenge_key
= self
.generate_kdc_challenge_key(
2823 armor_key
, preauth_key
)
2825 # Ensure that the encrypted challenge FAST factor is supported
2827 if self
.strict_checking
:
2828 self
.assertNotEqual(len(enc_challenge
), 0)
2829 if len(enc_challenge
) != 0:
2830 encrypted_challenge
= self
.der_decode(
2832 asn1Spec
=krb5_asn1
.EncryptedData())
2833 self
.assertEqual(encrypted_challenge
['etype'],
2834 kdc_challenge_key
.etype
)
2836 challenge
= kdc_challenge_key
.decrypt(
2837 KU_ENC_CHALLENGE_KDC
,
2838 encrypted_challenge
['cipher'])
2839 challenge
= self
.der_decode(
2841 asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
2843 # Retrieve the returned timestamp.
2844 rep_patime
= challenge
['patimestamp']
2845 self
.assertIn('pausec', challenge
)
2847 # Ensure the returned time is within five minutes of the
2849 rep_time
= self
.get_EpochFromKerberosTime(rep_patime
)
2850 current_time
= time
.time()
2852 self
.assertLess(current_time
- 300, rep_time
)
2853 self
.assertLess(rep_time
, current_time
+ 300)
2855 etype_info2
= pa_dict
.get(PADATA_ETYPE_INFO2
)
2856 if etype_info2
is not None:
2857 etype_info2
= self
.der_decode(etype_info2
,
2858 asn1Spec
=krb5_asn1
.ETYPE_INFO2())
2859 self
.assertGreaterEqual(len(etype_info2
), 1)
2860 if self
.strict_checking
:
2861 self
.assertEqual(len(etype_info2
), len(expect_etype_info2
))
2862 for i
in range(0, len(etype_info2
)):
2863 e
= self
.getElementValue(etype_info2
[i
], 'etype')
2864 if self
.strict_checking
:
2865 self
.assertEqual(e
, expect_etype_info2
[i
])
2866 salt
= self
.getElementValue(etype_info2
[i
], 'salt')
2867 if e
== kcrypto
.Enctype
.RC4
:
2868 if self
.strict_checking
:
2869 self
.assertIsNone(salt
)
2871 self
.assertIsNotNone(salt
)
2872 expected_salt
= kdc_exchange_dict
['expected_salt']
2873 if expected_salt
is not None:
2874 self
.assertEqual(salt
, expected_salt
)
2875 s2kparams
= self
.getElementValue(etype_info2
[i
], 's2kparams')
2876 if self
.strict_checking
:
2877 self
.assertIsNone(s2kparams
)
2879 etype_info
= pa_dict
.get(PADATA_ETYPE_INFO
)
2880 if etype_info
is not None:
2881 etype_info
= self
.der_decode(etype_info
,
2882 asn1Spec
=krb5_asn1
.ETYPE_INFO())
2883 self
.assertEqual(len(etype_info
), 1)
2884 e
= self
.getElementValue(etype_info
[0], 'etype')
2885 self
.assertEqual(e
, kcrypto
.Enctype
.RC4
)
2886 self
.assertEqual(e
, expect_etype_info2
[0])
2887 salt
= self
.getElementValue(etype_info
[0], 'salt')
2888 if self
.strict_checking
:
2889 self
.assertIsNotNone(salt
)
2890 self
.assertEqual(len(salt
), 0)
2894 def generate_simple_fast(self
,
2902 armor_key
= kdc_exchange_dict
['armor_key']
2904 fast_req
= self
.KRB_FAST_REQ_create(fast_options
,
2907 fast_req
= self
.der_encode(fast_req
,
2908 asn1Spec
=krb5_asn1
.KrbFastReq())
2909 fast_req
= self
.EncryptedData_create(armor_key
,
2913 fast_armored_req
= self
.KRB_FAST_ARMORED_REQ_create(fast_armor
,
2917 fx_fast_request
= self
.PA_FX_FAST_REQUEST_create(fast_armored_req
)
2918 fx_fast_request
= self
.der_encode(
2920 asn1Spec
=krb5_asn1
.PA_FX_FAST_REQUEST())
2922 fast_padata
= self
.PA_DATA_create(PADATA_FX_FAST
,
2927 def generate_ap_req(self
,
2933 tgt
= kdc_exchange_dict
['armor_tgt']
2934 authenticator_subkey
= kdc_exchange_dict
['armor_subkey']
2936 req_body_checksum
= None
2938 tgt
= kdc_exchange_dict
['tgt']
2939 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
2940 body_checksum_type
= kdc_exchange_dict
['body_checksum_type']
2942 req_body_blob
= self
.der_encode(req_body
,
2943 asn1Spec
=krb5_asn1
.KDC_REQ_BODY())
2945 req_body_checksum
= self
.Checksum_create(tgt
.session_key
,
2946 KU_TGS_REQ_AUTH_CKSUM
,
2948 ctype
=body_checksum_type
)
2950 auth_data
= kdc_exchange_dict
['auth_data']
2953 if authenticator_subkey
is not None:
2954 subkey_obj
= authenticator_subkey
.export_obj()
2955 seq_number
= random
.randint(0, 0xfffffffe)
2956 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
2957 authenticator_obj
= self
.Authenticator_create(
2960 cksum
=req_body_checksum
,
2964 seq_number
=seq_number
,
2965 authorization_data
=auth_data
)
2966 authenticator_blob
= self
.der_encode(
2968 asn1Spec
=krb5_asn1
.Authenticator())
2970 usage
= KU_AP_REQ_AUTH
if armor
else KU_TGS_REQ_AUTH
2971 authenticator
= self
.EncryptedData_create(tgt
.session_key
,
2975 ap_options
= krb5_asn1
.APOptions('0')
2976 ap_req_obj
= self
.AP_REQ_create(ap_options
=str(ap_options
),
2978 authenticator
=authenticator
)
2979 ap_req
= self
.der_encode(ap_req_obj
, asn1Spec
=krb5_asn1
.AP_REQ())
2983 def generate_simple_tgs_padata(self
,
2987 ap_req
= self
.generate_ap_req(kdc_exchange_dict
,
2991 pa_tgs_req
= self
.PA_DATA_create(PADATA_KDC_REQ
, ap_req
)
2992 padata
= [pa_tgs_req
]
2994 return padata
, req_body
2996 def get_preauth_key(self
, kdc_exchange_dict
):
2997 msg_type
= kdc_exchange_dict
['rep_msg_type']
2999 if msg_type
== KRB_AS_REP
:
3000 key
= kdc_exchange_dict
['preauth_key']
3001 usage
= KU_AS_REP_ENC_PART
3003 authenticator_subkey
= kdc_exchange_dict
['authenticator_subkey']
3004 if authenticator_subkey
is not None:
3005 key
= authenticator_subkey
3006 usage
= KU_TGS_REP_ENC_PART_SUB_KEY
3008 tgt
= kdc_exchange_dict
['tgt']
3009 key
= tgt
.session_key
3010 usage
= KU_TGS_REP_ENC_PART_SESSION
3012 self
.assertIsNotNone(key
)
3016 def generate_armor_key(self
, subkey
, session_key
):
3017 armor_key
= kcrypto
.cf2(subkey
.key
,
3021 armor_key
= Krb5EncryptionKey(armor_key
, None)
3025 def generate_strengthen_reply_key(self
, strengthen_key
, reply_key
):
3026 strengthen_reply_key
= kcrypto
.cf2(strengthen_key
.key
,
3030 strengthen_reply_key
= Krb5EncryptionKey(strengthen_reply_key
,
3033 return strengthen_reply_key
3035 def generate_client_challenge_key(self
, armor_key
, longterm_key
):
3036 client_challenge_key
= kcrypto
.cf2(armor_key
.key
,
3038 b
'clientchallengearmor',
3039 b
'challengelongterm')
3040 client_challenge_key
= Krb5EncryptionKey(client_challenge_key
, None)
3042 return client_challenge_key
3044 def generate_kdc_challenge_key(self
, armor_key
, longterm_key
):
3045 kdc_challenge_key
= kcrypto
.cf2(armor_key
.key
,
3047 b
'kdcchallengearmor',
3048 b
'challengelongterm')
3049 kdc_challenge_key
= Krb5EncryptionKey(kdc_challenge_key
, None)
3051 return kdc_challenge_key
3053 def verify_ticket_checksum(self
, ticket
, expected_checksum
, armor_key
):
3054 expected_type
= expected_checksum
['cksumtype']
3055 self
.assertEqual(armor_key
.ctype
, expected_type
)
3057 ticket_blob
= self
.der_encode(ticket
,
3058 asn1Spec
=krb5_asn1
.Ticket())
3059 checksum
= self
.Checksum_create(armor_key
,
3062 self
.assertEqual(expected_checksum
, checksum
)
3064 def verify_ticket(self
, ticket
, krbtgt_key
, expect_pac
=True,
3065 expect_ticket_checksum
=True):
3066 # Check if the ticket is a TGT.
3067 sname
= ticket
.ticket
['sname']
3068 is_tgt
= self
.is_tgs(sname
)
3070 # Decrypt the ticket.
3072 key
= ticket
.decryption_key
3073 enc_part
= ticket
.ticket
['enc-part']
3075 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
3076 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
3078 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
3079 enc_part
= self
.der_decode(
3080 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
3082 # Fetch the authorization data from the ticket.
3083 auth_data
= enc_part
.get('authorization-data')
3085 self
.assertIsNotNone(auth_data
)
3086 elif auth_data
is None:
3089 # Get a copy of the authdata with an empty PAC, and the existing PAC
3091 empty_pac
= self
.get_empty_pac()
3092 auth_data
, pac_data
= self
.replace_pac(auth_data
,
3094 expect_pac
=expect_pac
)
3098 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3099 # raw type to create a new PAC with zeroed signatures for
3100 # verification. This is because on Windows, the resource_groups field
3101 # is added to PAC_LOGON_INFO after the info3 field has been created,
3102 # which results in a different ordering of pointer values than Samba
3103 # (see commit 0e201ecdc53). Using the raw type avoids changing
3104 # PAC_LOGON_INFO, so verification against Windows can work. We still
3105 # need the PAC_DATA type to retrieve the actual checksums, because the
3106 # signatures in the raw type may contain padding bytes.
3107 pac
= ndr_unpack(krb5pac
.PAC_DATA
,
3109 raw_pac
= ndr_unpack(krb5pac
.PAC_DATA_RAW
,
3114 for pac_buffer
, raw_pac_buffer
in zip(pac
.buffers
, raw_pac
.buffers
):
3115 buffer_type
= pac_buffer
.type
3116 if buffer_type
in self
.pac_checksum_types
:
3117 self
.assertNotIn(buffer_type
, checksums
,
3118 f
'Duplicate checksum type {buffer_type}')
3120 # Fetch the checksum and the checksum type from the PAC buffer.
3121 checksum
= pac_buffer
.info
.signature
3122 ctype
= pac_buffer
.info
.type
3126 checksums
[buffer_type
] = checksum
, ctype
3128 if buffer_type
!= krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
3129 # Zero the checksum field so that we can later verify the
3130 # checksums. The ticket checksum field is not zeroed.
3132 signature
= ndr_unpack(
3133 krb5pac
.PAC_SIGNATURE_DATA
,
3134 raw_pac_buffer
.info
.remaining
)
3135 signature
.signature
= bytes(len(checksum
))
3136 raw_pac_buffer
.info
.remaining
= ndr_pack(
3139 # Re-encode the PAC.
3140 pac_data
= ndr_pack(raw_pac
)
3142 # Verify the signatures.
3144 server_checksum
, server_ctype
= checksums
[
3145 krb5pac
.PAC_TYPE_SRV_CHECKSUM
]
3146 key
.verify_checksum(KU_NON_KERB_CKSUM_SALT
,
3151 kdc_checksum
, kdc_ctype
= checksums
[
3152 krb5pac
.PAC_TYPE_KDC_CHECKSUM
]
3153 krbtgt_key
.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT
,
3159 self
.assertNotIn(krb5pac
.PAC_TYPE_TICKET_CHECKSUM
, checksums
)
3161 ticket_checksum
, ticket_ctype
= checksums
.get(
3162 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
,
3164 if expect_ticket_checksum
:
3165 self
.assertIsNotNone(ticket_checksum
)
3166 elif expect_ticket_checksum
is False:
3167 self
.assertIsNone(ticket_checksum
)
3168 if ticket_checksum
is not None:
3169 enc_part
['authorization-data'] = auth_data
3170 enc_part
= self
.der_encode(enc_part
,
3171 asn1Spec
=krb5_asn1
.EncTicketPart())
3173 krbtgt_key
.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT
,
3178 def modified_ticket(self
,
3180 new_ticket_key
=None,
3184 update_pac_checksums
=True,
3186 include_checksums
=None):
3187 if checksum_keys
is None:
3188 # A dict containing a key for each checksum type to be created in
3192 if include_checksums
is None:
3193 # A dict containing a value for each checksum type; True if the
3194 # checksum type is to be included in the PAC, False if it is to be
3195 # excluded, or None/not present if the checksum is to be included
3196 # based on its presence in the original PAC.
3197 include_checksums
= {}
3199 # Check that the values passed in by the caller make sense.
3201 self
.assertLessEqual(checksum_keys
.keys(), self
.pac_checksum_types
)
3202 self
.assertLessEqual(include_checksums
.keys(), self
.pac_checksum_types
)
3205 self
.assertIsNone(modify_pac_fn
)
3207 update_pac_checksums
= False
3209 if not update_pac_checksums
:
3210 self
.assertFalse(checksum_keys
)
3211 self
.assertFalse(include_checksums
)
3213 expect_pac
= update_pac_checksums
or modify_pac_fn
is not None
3215 key
= ticket
.decryption_key
3217 if new_ticket_key
is None:
3218 # Use the same key to re-encrypt the ticket.
3219 new_ticket_key
= key
3221 if krb5pac
.PAC_TYPE_SRV_CHECKSUM
not in checksum_keys
:
3222 # If the server signature key is not present, fall back to the key
3223 # used to encrypt the ticket.
3224 checksum_keys
[krb5pac
.PAC_TYPE_SRV_CHECKSUM
] = new_ticket_key
3226 if krb5pac
.PAC_TYPE_TICKET_CHECKSUM
not in checksum_keys
:
3227 # If the ticket signature key is not present, fall back to the key
3228 # used for the KDC signature.
3229 kdc_checksum_key
= checksum_keys
.get(krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
3230 if kdc_checksum_key
is not None:
3231 checksum_keys
[krb5pac
.PAC_TYPE_TICKET_CHECKSUM
] = (
3234 # Decrypt the ticket.
3236 enc_part
= ticket
.ticket
['enc-part']
3238 self
.assertElementEqual(enc_part
, 'etype', key
.etype
)
3239 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
3241 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
3242 enc_part
= self
.der_decode(
3243 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
3245 # Modify the ticket here.
3246 if modify_fn
is not None:
3247 enc_part
= modify_fn(enc_part
)
3249 auth_data
= enc_part
.get('authorization-data')
3251 self
.assertIsNotNone(auth_data
)
3252 if auth_data
is not None:
3255 # Get a copy of the authdata with an empty PAC, and the
3256 # existing PAC (if present).
3257 empty_pac
= self
.get_empty_pac()
3258 empty_pac_auth_data
, pac_data
= self
.replace_pac(
3261 expect_pac
=expect_pac
)
3263 if pac_data
is not None:
3264 pac
= ndr_unpack(krb5pac
.PAC_DATA
, pac_data
)
3266 # Modify the PAC here.
3267 if modify_pac_fn
is not None:
3268 pac
= modify_pac_fn(pac
)
3270 if update_pac_checksums
:
3271 # Get the enc-part with an empty PAC, which is needed
3272 # to create a ticket signature.
3273 enc_part_to_sign
= enc_part
.copy()
3274 enc_part_to_sign
['authorization-data'] = (
3275 empty_pac_auth_data
)
3276 enc_part_to_sign
= self
.der_encode(
3278 asn1Spec
=krb5_asn1
.EncTicketPart())
3280 self
.update_pac_checksums(pac
,
3285 # Re-encode the PAC.
3286 pac_data
= ndr_pack(pac
)
3287 new_pac
= self
.AuthorizationData_create(AD_WIN2K_PAC
,
3290 # Replace the PAC in the authorization data and re-add it to the
3292 auth_data
, _
= self
.replace_pac(auth_data
, new_pac
,
3293 expect_pac
=expect_pac
)
3294 enc_part
['authorization-data'] = auth_data
3296 # Re-encrypt the ticket enc-part with the new key.
3297 enc_part_new
= self
.der_encode(enc_part
,
3298 asn1Spec
=krb5_asn1
.EncTicketPart())
3299 enc_part_new
= self
.EncryptedData_create(new_ticket_key
,
3303 # Create a copy of the ticket with the new enc-part.
3304 new_ticket
= ticket
.ticket
.copy()
3305 new_ticket
['enc-part'] = enc_part_new
3307 new_ticket_creds
= KerberosTicketCreds(
3309 session_key
=ticket
.session_key
,
3310 crealm
=ticket
.crealm
,
3312 srealm
=ticket
.srealm
,
3314 decryption_key
=new_ticket_key
,
3315 ticket_private
=enc_part
,
3316 encpart_private
=ticket
.encpart_private
)
3318 return new_ticket_creds
3320 def update_pac_checksums(self
,
3325 pac_buffers
= pac
.buffers
3326 checksum_buffers
= {}
3328 # Find the relevant PAC checksum buffers.
3329 for pac_buffer
in pac_buffers
:
3330 buffer_type
= pac_buffer
.type
3331 if buffer_type
in self
.pac_checksum_types
:
3332 self
.assertNotIn(buffer_type
, checksum_buffers
,
3333 f
'Duplicate checksum type {buffer_type}')
3335 checksum_buffers
[buffer_type
] = pac_buffer
3337 # Create any additional buffers that were requested but not
3338 # present. Conversely, remove any buffers that were requested to be
3340 for buffer_type
in self
.pac_checksum_types
:
3341 if buffer_type
in checksum_buffers
:
3342 if include_checksums
.get(buffer_type
) is False:
3343 checksum_buffer
= checksum_buffers
.pop(buffer_type
)
3345 pac
.num_buffers
-= 1
3346 pac_buffers
.remove(checksum_buffer
)
3348 elif include_checksums
.get(buffer_type
) is True:
3349 info
= krb5pac
.PAC_SIGNATURE_DATA()
3351 checksum_buffer
= krb5pac
.PAC_BUFFER()
3352 checksum_buffer
.type = buffer_type
3353 checksum_buffer
.info
= info
3355 pac_buffers
.append(checksum_buffer
)
3356 pac
.num_buffers
+= 1
3358 checksum_buffers
[buffer_type
] = checksum_buffer
3360 # Fill the relevant checksum buffers.
3361 for buffer_type
, checksum_buffer
in checksum_buffers
.items():
3362 checksum_key
= checksum_keys
[buffer_type
]
3363 ctype
= checksum_key
.ctype
& ((1 << 32) - 1)
3365 if buffer_type
== krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
3366 self
.assertIsNotNone(enc_part
)
3368 signature
= checksum_key
.make_rodc_checksum(
3369 KU_NON_KERB_CKSUM_SALT
,
3372 elif buffer_type
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
3373 signature
= checksum_key
.make_zeroed_checksum()
3376 signature
= checksum_key
.make_rodc_zeroed_checksum()
3378 checksum_buffer
.info
.signature
= signature
3379 checksum_buffer
.info
.type = ctype
3381 # Add the new checksum buffers to the PAC.
3382 pac
.buffers
= pac_buffers
3384 # Calculate the server and KDC checksums and insert them into the PAC.
3386 server_checksum_buffer
= checksum_buffers
.get(
3387 krb5pac
.PAC_TYPE_SRV_CHECKSUM
)
3388 if server_checksum_buffer
is not None:
3389 server_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_SRV_CHECKSUM
]
3391 pac_data
= ndr_pack(pac
)
3392 server_checksum
= server_checksum_key
.make_checksum(
3393 KU_NON_KERB_CKSUM_SALT
,
3396 server_checksum_buffer
.info
.signature
= server_checksum
3398 kdc_checksum_buffer
= checksum_buffers
.get(
3399 krb5pac
.PAC_TYPE_KDC_CHECKSUM
)
3400 if kdc_checksum_buffer
is not None:
3401 if server_checksum_buffer
is None:
3402 # There's no server signature to make the checksum over, so
3403 # just make the checksum over an empty bytes object.
3404 server_checksum
= bytes()
3406 kdc_checksum_key
= checksum_keys
[krb5pac
.PAC_TYPE_KDC_CHECKSUM
]
3408 kdc_checksum
= kdc_checksum_key
.make_rodc_checksum(
3409 KU_NON_KERB_CKSUM_SALT
,
3412 kdc_checksum_buffer
.info
.signature
= kdc_checksum
3414 def replace_pac(self
, auth_data
, new_pac
, expect_pac
=True):
3415 if new_pac
is not None:
3416 self
.assertElementEqual(new_pac
, 'ad-type', AD_WIN2K_PAC
)
3417 self
.assertElementPresent(new_pac
, 'ad-data')
3424 for authdata_elem
in auth_data
:
3425 if authdata_elem
['ad-type'] == AD_IF_RELEVANT
:
3426 ad_relevant
= self
.der_decode(
3427 authdata_elem
['ad-data'],
3428 asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
3431 for relevant_elem
in ad_relevant
:
3432 if relevant_elem
['ad-type'] == AD_WIN2K_PAC
:
3433 self
.assertIsNone(old_pac
, 'Multiple PACs detected')
3434 old_pac
= relevant_elem
['ad-data']
3436 if new_pac
is not None:
3437 relevant_elems
.append(new_pac
)
3439 relevant_elems
.append(relevant_elem
)
3441 self
.assertIsNotNone(old_pac
, 'Expected PAC')
3444 ad_relevant
= self
.der_encode(
3446 asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
3448 authdata_elem
= self
.AuthorizationData_create(
3452 authdata_elem
= None
3454 if authdata_elem
is not None:
3455 new_auth_data
.append(authdata_elem
)
3458 self
.assertIsNotNone(ad_relevant
, 'Expected AD-RELEVANT')
3460 return new_auth_data
, old_pac
3462 def get_pac(self
, auth_data
, expect_pac
=True):
3463 _
, pac
= self
.replace_pac(auth_data
, None, expect_pac
)
3466 def get_ticket_pac(self
, ticket
, expect_pac
=True):
3467 auth_data
= ticket
.ticket_private
.get('authorization-data')
3469 self
.assertIsNotNone(auth_data
)
3470 elif auth_data
is None:
3473 return self
.get_pac(auth_data
, expect_pac
=expect_pac
)
3475 def get_krbtgt_checksum_key(self
):
3476 krbtgt_creds
= self
.get_krbtgt_creds()
3477 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
3480 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
3483 def is_tgs(self
, principal
):
3484 name
= principal
['name-string'][0]
3485 return name
in ('krbtgt', b
'krbtgt')
3487 def get_empty_pac(self
):
3488 return self
.AuthorizationData_create(AD_WIN2K_PAC
, bytes(1))
3490 def get_outer_pa_dict(self
, kdc_exchange_dict
):
3491 return self
.get_pa_dict(kdc_exchange_dict
['req_padata'])
3493 def get_fast_pa_dict(self
, kdc_exchange_dict
):
3494 req_pa_dict
= self
.get_pa_dict(kdc_exchange_dict
['fast_padata'])
3499 return self
.get_outer_pa_dict(kdc_exchange_dict
)
3501 def sent_fast(self
, kdc_exchange_dict
):
3502 outer_pa_dict
= self
.get_outer_pa_dict(kdc_exchange_dict
)
3504 return PADATA_FX_FAST
in outer_pa_dict
3506 def sent_enc_challenge(self
, kdc_exchange_dict
):
3507 fast_pa_dict
= self
.get_fast_pa_dict(kdc_exchange_dict
)
3509 return PADATA_ENCRYPTED_CHALLENGE
in fast_pa_dict
3511 def get_sent_pac_options(self
, kdc_exchange_dict
):
3512 fast_pa_dict
= self
.get_fast_pa_dict(kdc_exchange_dict
)
3514 if PADATA_PAC_OPTIONS
not in fast_pa_dict
:
3517 pac_options
= self
.der_decode(fast_pa_dict
[PADATA_PAC_OPTIONS
],
3518 asn1Spec
=krb5_asn1
.PA_PAC_OPTIONS())
3519 pac_options
= pac_options
['options']
3521 # Mask out unsupported bits.
3522 pac_options
, remaining
= pac_options
[:4], pac_options
[4:]
3523 pac_options
+= '0' * len(remaining
)
3527 def get_krbtgt_sname(self
):
3528 krbtgt_creds
= self
.get_krbtgt_creds()
3529 krbtgt_username
= krbtgt_creds
.get_username()
3530 krbtgt_realm
= krbtgt_creds
.get_realm()
3531 krbtgt_sname
= self
.PrincipalName_create(
3532 name_type
=NT_SRV_INST
, names
=[krbtgt_username
, krbtgt_realm
])
3536 def _test_as_exchange(self
,
3542 expected_error_mode
,
3551 expected_flags
=None,
3552 unexpected_flags
=None,
3553 expected_supported_etypes
=None,
3555 ticket_decryption_key
=None,
3561 def _generate_padata_copy(_kdc_exchange_dict
,
3564 return padata
, req_body
3566 if not expected_error_mode
:
3567 check_error_fn
= None
3568 check_rep_fn
= self
.generic_check_kdc_rep
3570 check_error_fn
= self
.generic_check_kdc_error
3573 if padata
is not None:
3574 generate_padata_fn
= _generate_padata_copy
3576 generate_padata_fn
= None
3578 kdc_exchange_dict
= self
.as_exchange_dict(
3579 expected_crealm
=expected_crealm
,
3580 expected_cname
=expected_cname
,
3581 expected_srealm
=expected_srealm
,
3582 expected_sname
=expected_sname
,
3583 expected_supported_etypes
=expected_supported_etypes
,
3584 ticket_decryption_key
=ticket_decryption_key
,
3585 generate_padata_fn
=generate_padata_fn
,
3586 check_error_fn
=check_error_fn
,
3587 check_rep_fn
=check_rep_fn
,
3588 check_kdc_private_fn
=self
.generic_check_kdc_private
,
3589 expected_error_mode
=expected_error_mode
,
3590 client_as_etypes
=client_as_etypes
,
3591 expected_salt
=expected_salt
,
3592 expected_flags
=expected_flags
,
3593 unexpected_flags
=unexpected_flags
,
3594 preauth_key
=preauth_key
,
3595 kdc_options
=str(kdc_options
),
3596 pac_request
=pac_request
,
3597 pac_options
=pac_options
,
3598 expect_pac
=expect_pac
,
3601 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
3608 return rep
, kdc_exchange_dict