2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys
.path
.insert(0, "bin/python")
24 os
.environ
["PYTHONUNBUFFERED"] = "1"
26 from samba
import ntstatus
27 from samba
.dcerpc
import krb5pac
, lsa
29 from samba
.tests
import env_get_var_value
30 from samba
.tests
.krb5
.kcrypto
import Cksumtype
, Enctype
31 from samba
.tests
.krb5
.kdc_base_test
import KDCBaseTest
32 from samba
.tests
.krb5
.raw_testcase
import (
36 from samba
.tests
.krb5
.rfc4120_constants
import (
37 AES256_CTS_HMAC_SHA1_96
,
40 KDC_ERR_BAD_INTEGRITY
,
44 KDC_ERR_SUMTYPE_NOSUPP
,
47 KU_TGS_REP_ENC_PART_SUB_KEY
,
50 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
52 global_asn1_print
= False
53 global_hexdump
= False
56 class S4UKerberosTests(KDCBaseTest
):
59 super(S4UKerberosTests
, self
).setUp()
60 self
.do_asn1_print
= global_asn1_print
61 self
.do_hexdump
= global_hexdump
63 def _test_s4u2self(self
, pa_s4u2self_ctype
=None):
64 service_creds
= self
.get_service_creds()
65 service
= service_creds
.get_username()
66 realm
= service_creds
.get_realm()
68 cname
= self
.PrincipalName_create(name_type
=1, names
=[service
])
69 sname
= self
.PrincipalName_create(name_type
=2, names
=["krbtgt", realm
])
71 till
= self
.get_KerberosTime(offset
=36000)
73 kdc_options
= krb5_asn1
.KDCOptions('forwardable')
78 req
= self
.AS_REQ_create(padata
=padata
,
79 kdc_options
=str(kdc_options
),
89 additional_tickets
=None)
90 rep
= self
.send_recv_transaction(req
)
91 self
.assertIsNotNone(rep
)
93 self
.assertEqual(rep
['msg-type'], 30)
94 self
.assertEqual(rep
['error-code'], 25)
95 rep_padata
= self
.der_decode(
96 rep
['e-data'], asn1Spec
=krb5_asn1
.METHOD_DATA())
99 if pa
['padata-type'] == 19:
100 etype_info2
= pa
['padata-value']
103 etype_info2
= self
.der_decode(
104 etype_info2
, asn1Spec
=krb5_asn1
.ETYPE_INFO2())
106 key
= self
.PasswordKey_from_etype_info2(service_creds
, etype_info2
[0])
108 (patime
, pausec
) = self
.get_KerberosTimeWithUsec()
109 pa_ts
= self
.PA_ENC_TS_ENC_create(patime
, pausec
)
110 pa_ts
= self
.der_encode(pa_ts
, asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
112 pa_ts
= self
.EncryptedData_create(key
, KU_PA_ENC_TIMESTAMP
, pa_ts
)
113 pa_ts
= self
.der_encode(pa_ts
, asn1Spec
=krb5_asn1
.EncryptedData())
115 pa_ts
= self
.PA_DATA_create(2, pa_ts
)
117 kdc_options
= krb5_asn1
.KDCOptions('forwardable')
120 req
= self
.AS_REQ_create(padata
=padata
,
121 kdc_options
=str(kdc_options
),
131 additional_tickets
=None)
132 rep
= self
.send_recv_transaction(req
)
133 self
.assertIsNotNone(rep
)
135 msg_type
= rep
['msg-type']
136 self
.assertEqual(msg_type
, 11)
138 enc_part2
= key
.decrypt(KU_AS_REP_ENC_PART
, rep
['enc-part']['cipher'])
139 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
142 enc_part2
= self
.der_decode(
143 enc_part2
, asn1Spec
=krb5_asn1
.EncASRepPart())
145 enc_part2
= self
.der_decode(
146 enc_part2
, asn1Spec
=krb5_asn1
.EncTGSRepPart())
151 for_user_name
= env_get_var_value('FOR_USER')
152 uname
= self
.PrincipalName_create(name_type
=1, names
=[for_user_name
])
154 kdc_options
= krb5_asn1
.KDCOptions('forwardable')
155 till
= self
.get_KerberosTime(offset
=36000)
156 ticket
= rep
['ticket']
157 ticket_session_key
= self
.EncryptionKey_import(enc_part2
['key'])
158 pa_s4u
= self
.PA_S4U2Self_create(name
=uname
, realm
=realm
,
159 tgt_session_key
=ticket_session_key
,
160 ctype
=pa_s4u2self_ctype
)
163 subkey
= self
.RandomKey(ticket_session_key
.etype
)
165 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
167 req
= self
.TGS_REQ_create(padata
=padata
,
171 kdc_options
=str(kdc_options
),
181 EncAuthorizationData
=None,
182 EncAuthorizationData_key
=None,
183 additional_tickets
=None,
184 ticket_session_key
=ticket_session_key
,
185 authenticator_subkey
=subkey
)
186 rep
= self
.send_recv_transaction(req
)
187 self
.assertIsNotNone(rep
)
189 msg_type
= rep
['msg-type']
191 enc_part2
= subkey
.decrypt(
192 KU_TGS_REP_ENC_PART_SUB_KEY
, rep
['enc-part']['cipher'])
193 enc_part2
= self
.der_decode(
194 enc_part2
, asn1Spec
=krb5_asn1
.EncTGSRepPart())
198 # Using the checksum type from the tgt_session_key happens to work
200 def test_s4u2self(self
):
201 msg_type
= self
._test
_s
4u2self
()
202 self
.assertEqual(msg_type
, 13)
204 # Per spec, the checksum of PA-FOR-USER is HMAC_MD5, see [MS-SFU] 2.2.1
205 def test_s4u2self_hmac_md5_checksum(self
):
206 msg_type
= self
._test
_s
4u2self
(pa_s4u2self_ctype
=Cksumtype
.HMAC_MD5
)
207 self
.assertEqual(msg_type
, 13)
209 def test_s4u2self_md5_unkeyed_checksum(self
):
210 msg_type
= self
._test
_s
4u2self
(pa_s4u2self_ctype
=Cksumtype
.MD5
)
211 self
.assertEqual(msg_type
, 30)
213 def test_s4u2self_sha1_unkeyed_checksum(self
):
214 msg_type
= self
._test
_s
4u2self
(pa_s4u2self_ctype
=Cksumtype
.SHA1
)
215 self
.assertEqual(msg_type
, 30)
217 def test_s4u2self_crc32_unkeyed_checksum(self
):
218 msg_type
= self
._test
_s
4u2self
(pa_s4u2self_ctype
=Cksumtype
.CRC32
)
219 self
.assertEqual(msg_type
, 30)
221 def _run_s4u2self_test(self
, kdc_dict
):
222 client_opts
= kdc_dict
.pop('client_opts', None)
223 client_creds
= self
.get_cached_creds(machine_account
=False,
226 service_opts
= kdc_dict
.pop('service_opts', None)
227 service_creds
= self
.get_cached_creds(machine_account
=True,
230 service_tgt
= self
.get_tgt(service_creds
)
231 modify_service_tgt_fn
= kdc_dict
.pop('modify_service_tgt_fn', None)
232 if modify_service_tgt_fn
is not None:
233 service_tgt
= modify_service_tgt_fn(service_tgt
)
235 client_name
= client_creds
.get_username()
236 client_cname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
239 service_name
= service_creds
.get_username()[:-1]
240 service_sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
241 names
=['host', service_name
])
243 realm
= client_creds
.get_realm()
245 expected_flags
= kdc_dict
.pop('expected_flags', None)
246 if expected_flags
is not None:
247 expected_flags
= krb5_asn1
.TicketFlags(expected_flags
)
249 unexpected_flags
= kdc_dict
.pop('unexpected_flags', None)
250 if unexpected_flags
is not None:
251 unexpected_flags
= krb5_asn1
.TicketFlags(unexpected_flags
)
253 kdc_options
= kdc_dict
.pop('kdc_options', '0')
254 kdc_options
= krb5_asn1
.KDCOptions(kdc_options
)
256 service_decryption_key
= self
.TicketDecryptionKey_from_creds(
259 authenticator_subkey
= self
.RandomKey(Enctype
.AES256
)
261 etypes
= kdc_dict
.pop('etypes', (AES256_CTS_HMAC_SHA1_96
,
264 def generate_s4u2self_padata(_kdc_exchange_dict
,
267 pa_s4u
= self
.PA_S4U2Self_create(
270 tgt_session_key
=service_tgt
.session_key
,
273 return [pa_s4u
], req_body
275 kdc_exchange_dict
= self
.tgs_exchange_dict(
276 expected_crealm
=realm
,
277 expected_cname
=client_cname
,
278 expected_srealm
=realm
,
279 expected_sname
=service_sname
,
280 expected_flags
=expected_flags
,
281 unexpected_flags
=unexpected_flags
,
282 ticket_decryption_key
=service_decryption_key
,
283 expect_ticket_checksum
=True,
284 generate_padata_fn
=generate_s4u2self_padata
,
285 check_rep_fn
=self
.generic_check_kdc_rep
,
286 check_kdc_private_fn
=self
.generic_check_kdc_private
,
287 expected_error_mode
=0,
289 authenticator_subkey
=authenticator_subkey
,
290 kdc_options
=str(kdc_options
),
293 self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
299 # Ensure we used all the parameters given to us.
300 self
.assertEqual({}, kdc_dict
)
302 # Test performing an S4U2Self operation with a forwardable ticket. The
303 # resulting ticket should have the 'forwardable' flag set.
304 def test_s4u2self_forwardable(self
):
305 self
._run
_s
4u2self
_test
(
308 'not_delegated': False
310 'kdc_options': 'forwardable',
311 'modify_service_tgt_fn': functools
.partial(
312 self
.set_ticket_forwardable
, flag
=True),
313 'expected_flags': 'forwardable'
316 # Test performing an S4U2Self operation without requesting a forwardable
317 # ticket. The resulting ticket should not have the 'forwardable' flag set.
318 def test_s4u2self_without_forwardable(self
):
319 self
._run
_s
4u2self
_test
(
322 'not_delegated': False
324 'modify_service_tgt_fn': functools
.partial(
325 self
.set_ticket_forwardable
, flag
=True),
326 'unexpected_flags': 'forwardable'
329 # Do an S4U2Self with a non-forwardable TGT. The 'forwardable' flag should
330 # not be set on the ticket.
331 def test_s4u2self_not_forwardable(self
):
332 self
._run
_s
4u2self
_test
(
335 'not_delegated': False
337 'kdc_options': 'forwardable',
338 'modify_service_tgt_fn': functools
.partial(
339 self
.set_ticket_forwardable
, flag
=False),
340 'unexpected_flags': 'forwardable'
343 # Do an S4U2Self with the not_delegated flag set on the client. The
344 # 'forwardable' flag should not be set on the ticket.
345 def test_s4u2self_client_not_delegated(self
):
346 self
._run
_s
4u2self
_test
(
349 'not_delegated': True
351 'kdc_options': 'forwardable',
352 'modify_service_tgt_fn': functools
.partial(
353 self
.set_ticket_forwardable
, flag
=True),
354 'unexpected_flags': 'forwardable'
357 # Do an S4U2Self with a service not trusted to authenticate for delegation,
358 # but having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
359 # flag should be set on the ticket.
360 def test_s4u2self_not_trusted_empty_allowed(self
):
361 self
._run
_s
4u2self
_test
(
364 'not_delegated': False
367 'trusted_to_auth_for_delegation': False,
368 'delegation_to_spn': ()
370 'kdc_options': 'forwardable',
371 'modify_service_tgt_fn': functools
.partial(
372 self
.set_ticket_forwardable
, flag
=True),
373 'expected_flags': 'forwardable'
376 # Do an S4U2Self with a service not trusted to authenticate for delegation
377 # and having a non-empty msDS-AllowedToDelegateTo attribute. The
378 # 'forwardable' flag should not be set on the ticket.
379 def test_s4u2self_not_trusted_nonempty_allowed(self
):
380 self
._run
_s
4u2self
_test
(
383 'not_delegated': False
386 'trusted_to_auth_for_delegation': False,
387 'delegation_to_spn': ('test',)
389 'kdc_options': 'forwardable',
390 'modify_service_tgt_fn': functools
.partial(
391 self
.set_ticket_forwardable
, flag
=True),
392 'unexpected_flags': 'forwardable'
395 # Do an S4U2Self with a service trusted to authenticate for delegation and
396 # having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
397 # flag should be set on the ticket.
398 def test_s4u2self_trusted_empty_allowed(self
):
399 self
._run
_s
4u2self
_test
(
402 'not_delegated': False
405 'trusted_to_auth_for_delegation': True,
406 'delegation_to_spn': ()
408 'kdc_options': 'forwardable',
409 'modify_service_tgt_fn': functools
.partial(
410 self
.set_ticket_forwardable
, flag
=True),
411 'expected_flags': 'forwardable'
414 # Do an S4U2Self with a service trusted to authenticate for delegation and
415 # having a non-empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
416 # flag should be set on the ticket.
417 def test_s4u2self_trusted_nonempty_allowed(self
):
418 self
._run
_s
4u2self
_test
(
421 'not_delegated': False
424 'trusted_to_auth_for_delegation': True,
425 'delegation_to_spn': ('test',)
427 'kdc_options': 'forwardable',
428 'modify_service_tgt_fn': functools
.partial(
429 self
.set_ticket_forwardable
, flag
=True),
430 'expected_flags': 'forwardable'
433 def _run_delegation_test(self
, kdc_dict
):
434 client_opts
= kdc_dict
.pop('client_opts', None)
435 client_creds
= self
.get_cached_creds(machine_account
=False,
438 service1_opts
= kdc_dict
.pop('service1_opts', {})
439 service2_opts
= kdc_dict
.pop('service2_opts', {})
441 allow_delegation
= kdc_dict
.pop('allow_delegation', False)
442 allow_rbcd
= kdc_dict
.pop('allow_rbcd', False)
443 self
.assertFalse(allow_delegation
and allow_rbcd
)
446 service1_creds
= self
.get_cached_creds(machine_account
=True,
449 self
.assertNotIn('delegation_from_dn', service2_opts
)
450 service2_opts
['delegation_from_dn'] = str(service1_creds
.get_dn())
452 service2_creds
= self
.get_cached_creds(machine_account
=True,
455 service2_creds
= self
.get_cached_creds(machine_account
=True,
459 self
.assertNotIn('delegation_to_spn', service1_opts
)
460 service1_opts
['delegation_to_spn'] = service2_creds
.get_spn()
462 service1_creds
= self
.get_cached_creds(machine_account
=True,
465 client_tkt_options
= kdc_dict
.pop('client_tkt_options', 'forwardable')
466 expected_flags
= krb5_asn1
.TicketFlags(client_tkt_options
)
468 client_tgt
= self
.get_tgt(client_creds
,
469 kdc_options
=client_tkt_options
,
470 expected_flags
=expected_flags
)
471 client_service_tkt
= self
.get_service_ticket(
474 kdc_options
=client_tkt_options
,
475 expected_flags
=expected_flags
)
477 service1_tgt
= self
.get_tgt(service1_creds
)
479 modify_client_tkt_fn
= kdc_dict
.pop('modify_client_tkt_fn', None)
480 if modify_client_tkt_fn
is not None:
481 client_service_tkt
= modify_client_tkt_fn(client_service_tkt
)
483 additional_tickets
= [client_service_tkt
.ticket
]
485 modify_service_tgt_fn
= kdc_dict
.pop('modify_service_tgt_fn', None)
486 if modify_service_tgt_fn
is not None:
487 service1_tgt
= modify_service_tgt_fn(service1_tgt
)
489 kdc_options
= kdc_dict
.pop('kdc_options', None)
490 if kdc_options
is None:
491 kdc_options
= str(krb5_asn1
.KDCOptions('cname-in-addl-tkt'))
493 client_username
= client_creds
.get_username()
494 client_realm
= client_creds
.get_realm()
495 client_cname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
496 names
=[client_username
])
498 service1_name
= service1_creds
.get_username()[:-1]
499 service1_realm
= service1_creds
.get_realm()
501 service2_name
= service2_creds
.get_username()[:-1]
502 service2_realm
= service2_creds
.get_realm()
503 service2_service
= 'host'
504 service2_sname
= self
.PrincipalName_create(
505 name_type
=NT_PRINCIPAL
, names
=[service2_service
,
507 service2_decryption_key
= self
.TicketDecryptionKey_from_creds(
509 service2_etypes
= service2_creds
.tgs_supported_enctypes
511 expected_error_mode
= kdc_dict
.pop('expected_error_mode')
512 expected_status
= kdc_dict
.pop('expected_status', None)
513 if expected_error_mode
:
514 check_error_fn
= self
.generic_check_kdc_error
517 check_error_fn
= None
518 check_rep_fn
= self
.generic_check_kdc_rep
520 self
.assertIsNone(expected_status
)
522 expect_edata
= kdc_dict
.pop('expect_edata', None)
523 if expect_edata
is not None:
524 self
.assertTrue(expected_error_mode
)
526 pac_options
= kdc_dict
.pop('pac_options', None)
528 authenticator_subkey
= self
.RandomKey(Enctype
.AES256
)
530 etypes
= kdc_dict
.pop('etypes', (AES256_CTS_HMAC_SHA1_96
,
533 expected_proxy_target
= service2_creds
.get_spn()
535 expected_transited_services
= kdc_dict
.pop(
536 'expected_transited_services', [])
538 transited_service
= f
'host/{service1_name}@{service1_realm}'
539 expected_transited_services
.append(transited_service
)
541 expect_pac
= kdc_dict
.pop('expect_pac', True)
543 kdc_exchange_dict
= self
.tgs_exchange_dict(
544 expected_crealm
=client_realm
,
545 expected_cname
=client_cname
,
546 expected_srealm
=service2_realm
,
547 expected_sname
=service2_sname
,
548 expected_supported_etypes
=service2_etypes
,
549 ticket_decryption_key
=service2_decryption_key
,
550 check_error_fn
=check_error_fn
,
551 check_rep_fn
=check_rep_fn
,
552 check_kdc_private_fn
=self
.generic_check_kdc_private
,
553 expected_error_mode
=expected_error_mode
,
554 expected_status
=expected_status
,
557 authenticator_subkey
=authenticator_subkey
,
558 kdc_options
=kdc_options
,
559 pac_options
=pac_options
,
560 expect_edata
=expect_edata
,
561 expected_proxy_target
=expected_proxy_target
,
562 expected_transited_services
=expected_transited_services
,
563 expect_pac
=expect_pac
)
565 self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
567 realm
=service2_realm
,
568 sname
=service2_sname
,
570 additional_tickets
=additional_tickets
)
572 # Ensure we used all the parameters given to us.
573 self
.assertEqual({}, kdc_dict
)
575 def test_constrained_delegation(self
):
576 # Test constrained delegation.
577 self
._run
_delegation
_test
(
579 'expected_error_mode': 0,
580 'allow_delegation': True
583 def test_constrained_delegation_no_auth_data_required(self
):
584 # Test constrained delegation.
585 self
._run
_delegation
_test
(
587 'expected_error_mode': 0,
588 'allow_delegation': True,
590 'no_auth_data_required': True
595 def test_constrained_delegation_existing_delegation_info(self
):
596 # Test constrained delegation with an existing S4U_DELEGATION_INFO
597 # structure in the PAC.
599 services
= ['service1', 'service2', 'service3']
601 self
._run
_delegation
_test
(
603 'expected_error_mode': 0,
604 'allow_delegation': True,
605 'modify_client_tkt_fn': functools
.partial(
606 self
.add_delegation_info
, services
=services
),
607 'expected_transited_services': services
610 def test_constrained_delegation_not_allowed(self
):
611 # Test constrained delegation when the delegating service does not
613 self
._run
_delegation
_test
(
615 'expected_error_mode': KDC_ERR_BADOPTION
,
616 'expected_status': ntstatus
.NT_STATUS_NOT_SUPPORTED
,
617 'allow_delegation': False
620 def test_constrained_delegation_no_client_pac(self
):
621 # Test constrained delegation when the client service ticket does not
623 self
._run
_delegation
_test
(
625 'expected_error_mode': (KDC_ERR_BADOPTION
,
627 'allow_delegation': True,
628 'modify_client_tkt_fn': self
.remove_ticket_pac
,
629 'expect_edata': False
632 def test_constrained_delegation_no_service_pac(self
):
633 # Test constrained delegation when the service TGT does not contain a
635 self
._run
_delegation
_test
(
637 'expected_error_mode': 0,
638 'allow_delegation': True,
639 'modify_service_tgt_fn': self
.remove_ticket_pac
642 def test_constrained_delegation_no_client_pac_no_auth_data_required(self
):
643 # Test constrained delegation when the client service ticket does not
645 self
._run
_delegation
_test
(
647 'expected_error_mode': (KDC_ERR_BADOPTION
,
649 'allow_delegation': True,
650 'modify_client_tkt_fn': self
.remove_ticket_pac
,
651 'expect_edata': False,
653 'no_auth_data_required': True
657 def test_constrained_delegation_no_service_pac_no_auth_data_required(self
):
658 # Test constrained delegation when the service TGT does not contain a
660 self
._run
_delegation
_test
(
662 'expected_error_mode': (KDC_ERR_BADOPTION
,
664 'allow_delegation': True,
665 'modify_service_tgt_fn': self
.remove_ticket_pac
,
667 'no_auth_data_required': True
671 def test_constrained_delegation_non_forwardable(self
):
672 # Test constrained delegation with a non-forwardable ticket.
673 self
._run
_delegation
_test
(
675 'expected_error_mode': KDC_ERR_BADOPTION
,
676 'expected_status': ntstatus
.NT_STATUS_ACCOUNT_RESTRICTION
,
677 'allow_delegation': True,
678 'modify_client_tkt_fn': functools
.partial(
679 self
.set_ticket_forwardable
, flag
=False)
682 def test_constrained_delegation_pac_options_rbcd(self
):
683 # Test constrained delegation, but with the RBCD bit set in the PAC
685 self
._run
_delegation
_test
(
687 'expected_error_mode': 0,
688 'pac_options': '0001', # supports RBCD
689 'allow_delegation': True
692 def test_rbcd_no_auth_data_required(self
):
693 self
._run
_delegation
_test
(
695 'expected_error_mode': 0,
697 'pac_options': '0001', # supports RBCD
699 'no_auth_data_required': True
704 def test_rbcd_existing_delegation_info(self
):
705 # Test constrained delegation with an existing S4U_DELEGATION_INFO
706 # structure in the PAC.
708 services
= ['service1', 'service2', 'service3']
710 self
._run
_delegation
_test
(
712 'expected_error_mode': 0,
714 'pac_options': '0001', # supports RBCD
715 'modify_client_tkt_fn': functools
.partial(
716 self
.add_delegation_info
, services
=services
),
717 'expected_transited_services': services
720 def test_rbcd_not_allowed(self
):
721 # Test resource-based constrained delegation when the target service
723 self
._run
_delegation
_test
(
725 'expected_error_mode': KDC_ERR_BADOPTION
,
726 'expected_status': ntstatus
.NT_STATUS_NOT_FOUND
,
728 'pac_options': '0001' # supports RBCD
731 def test_rbcd_no_client_pac_a(self
):
732 # Test constrained delegation when the client service ticket does not
733 # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
734 self
._run
_delegation
_test
(
736 'expected_error_mode': KDC_ERR_MODIFIED
,
737 'expected_status': ntstatus
.NT_STATUS_NOT_SUPPORTED
,
739 'pac_options': '0001', # supports RBCD
740 'modify_client_tkt_fn': self
.remove_ticket_pac
743 def test_rbcd_no_client_pac_b(self
):
744 # Test constrained delegation when the client service ticket does not
745 # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
746 self
._run
_delegation
_test
(
748 'expected_error_mode': KDC_ERR_MODIFIED
,
749 'expected_status': ntstatus
.NT_STATUS_NO_MATCH
,
751 'pac_options': '0001', # supports RBCD
752 'modify_client_tkt_fn': self
.remove_ticket_pac
,
754 'delegation_to_spn': ('host/test')
758 def test_rbcd_no_service_pac(self
):
759 # Test constrained delegation when the service TGT does not contain a
761 self
._run
_delegation
_test
(
763 'expected_error_mode': KDC_ERR_BADOPTION
,
765 ntstatus
.NT_STATUS_NOT_FOUND
,
767 'pac_options': '0001', # supports RBCD
768 'modify_service_tgt_fn': self
.remove_ticket_pac
771 def test_rbcd_no_client_pac_no_auth_data_required_a(self
):
772 # Test constrained delegation when the client service ticket does not
773 # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
774 self
._run
_delegation
_test
(
776 'expected_error_mode': KDC_ERR_MODIFIED
,
777 'expected_status': ntstatus
.NT_STATUS_NOT_SUPPORTED
,
779 'pac_options': '0001', # supports RBCD
780 'modify_client_tkt_fn': self
.remove_ticket_pac
,
782 'no_auth_data_required': True
786 def test_rbcd_no_client_pac_no_auth_data_required_b(self
):
787 # Test constrained delegation when the client service ticket does not
788 # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
789 self
._run
_delegation
_test
(
791 'expected_error_mode': KDC_ERR_MODIFIED
,
792 'expected_status': ntstatus
.NT_STATUS_NO_MATCH
,
794 'pac_options': '0001', # supports RBCD
795 'modify_client_tkt_fn': self
.remove_ticket_pac
,
797 'delegation_to_spn': ('host/test')
800 'no_auth_data_required': True
804 def test_rbcd_no_service_pac_no_auth_data_required(self
):
805 # Test constrained delegation when the service TGT does not contain a
807 self
._run
_delegation
_test
(
809 'expected_error_mode': KDC_ERR_BADOPTION
,
811 ntstatus
.NT_STATUS_NOT_FOUND
,
813 'pac_options': '0001', # supports RBCD
814 'modify_service_tgt_fn': self
.remove_ticket_pac
,
816 'no_auth_data_required': True
820 def test_rbcd_non_forwardable(self
):
821 # Test resource-based constrained delegation with a non-forwardable
823 self
._run
_delegation
_test
(
825 'expected_error_mode': KDC_ERR_BADOPTION
,
826 'expected_status': ntstatus
.NT_STATUS_ACCOUNT_RESTRICTION
,
828 'pac_options': '0001', # supports RBCD
829 'modify_client_tkt_fn': functools
.partial(
830 self
.set_ticket_forwardable
, flag
=False)
833 def test_rbcd_no_pac_options_a(self
):
834 # Test resource-based constrained delegation without the RBCD bit set
835 # in the PAC options, and an empty msDS-AllowedToDelegateTo attribute.
836 self
._run
_delegation
_test
(
838 'expected_error_mode': KDC_ERR_BADOPTION
,
839 'expected_status': ntstatus
.NT_STATUS_NOT_SUPPORTED
,
841 'pac_options': '1' # does not support RBCD
844 def test_rbcd_no_pac_options_b(self
):
845 # Test resource-based constrained delegation without the RBCD bit set
846 # in the PAC options, and a non-empty msDS-AllowedToDelegateTo
848 self
._run
_delegation
_test
(
850 'expected_error_mode': KDC_ERR_BADOPTION
,
851 'expected_status': ntstatus
.NT_STATUS_NO_MATCH
,
853 'pac_options': '1', # does not support RBCD
855 'delegation_to_spn': ('host/test')
859 def test_bronze_bit_constrained_delegation_old_checksum(self
):
860 # Attempt to modify the ticket without updating the PAC checksums.
861 self
._run
_delegation
_test
(
863 'expected_error_mode': (KDC_ERR_MODIFIED
,
864 KDC_ERR_BAD_INTEGRITY
),
865 'allow_delegation': True,
866 'client_tkt_options': '0', # non-forwardable ticket
867 'modify_client_tkt_fn': functools
.partial(
868 self
.set_ticket_forwardable
,
869 flag
=True, update_pac_checksums
=False),
870 'expect_edata': False
873 def test_bronze_bit_rbcd_old_checksum(self
):
874 # Attempt to modify the ticket without updating the PAC checksums.
875 self
._run
_delegation
_test
(
877 'expected_error_mode': KDC_ERR_MODIFIED
,
878 'expected_status': ntstatus
.NT_STATUS_NOT_SUPPORTED
,
880 'pac_options': '0001', # supports RBCD
881 'client_tkt_options': '0', # non-forwardable ticket
882 'modify_client_tkt_fn': functools
.partial(
883 self
.set_ticket_forwardable
,
884 flag
=True, update_pac_checksums
=False)
887 def test_constrained_delegation_missing_client_checksum(self
):
888 # Present a user ticket without the required checksums.
889 for checksum
in self
.pac_checksum_types
:
890 with self
.subTest(checksum
=checksum
):
891 if checksum
== krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
892 expected_error_mode
= (KDC_ERR_BADOPTION
,
895 expected_error_mode
= KDC_ERR_GENERIC
897 self
._run
_delegation
_test
(
899 'expected_error_mode': expected_error_mode
,
900 'allow_delegation': True,
901 'modify_client_tkt_fn': functools
.partial(
902 self
.remove_pac_checksum
, checksum
=checksum
),
903 'expect_edata': False
906 def test_constrained_delegation_missing_service_checksum(self
):
907 # Present the service's ticket without the required checksums.
908 for checksum
in filter(lambda x
: x
!= krb5pac
.PAC_TYPE_TICKET_CHECKSUM
,
909 self
.pac_checksum_types
):
910 with self
.subTest(checksum
=checksum
):
911 self
._run
_delegation
_test
(
913 'expected_error_mode': KDC_ERR_GENERIC
,
915 ntstatus
.NT_STATUS_INSUFFICIENT_RESOURCES
,
916 'allow_delegation': True,
917 'modify_service_tgt_fn': functools
.partial(
918 self
.remove_pac_checksum
, checksum
=checksum
)
921 def test_rbcd_missing_client_checksum(self
):
922 # Present a user ticket without the required checksums.
923 for checksum
in self
.pac_checksum_types
:
924 with self
.subTest(checksum
=checksum
):
925 if checksum
== krb5pac
.PAC_TYPE_TICKET_CHECKSUM
:
926 expected_error_mode
= KDC_ERR_MODIFIED
928 expected_error_mode
= KDC_ERR_GENERIC
930 self
._run
_delegation
_test
(
932 'expected_error_mode': expected_error_mode
,
934 ntstatus
.NT_STATUS_NOT_SUPPORTED
,
936 'pac_options': '0001', # supports RBCD
937 'modify_client_tkt_fn': functools
.partial(
938 self
.remove_pac_checksum
, checksum
=checksum
)
941 def test_rbcd_missing_service_checksum(self
):
942 # Present the service's ticket without the required checksums.
943 for checksum
in filter(lambda x
: x
!= krb5pac
.PAC_TYPE_TICKET_CHECKSUM
,
944 self
.pac_checksum_types
):
945 with self
.subTest(checksum
=checksum
):
946 self
._run
_delegation
_test
(
948 'expected_error_mode': KDC_ERR_GENERIC
,
950 ntstatus
.NT_STATUS_INSUFFICIENT_RESOURCES
,
952 'pac_options': '0001', # supports RBCD
953 'modify_service_tgt_fn': functools
.partial(
954 self
.remove_pac_checksum
, checksum
=checksum
)
957 def test_constrained_delegation_zeroed_client_checksum(self
):
958 # Present a user ticket with invalid checksums.
959 for checksum
in self
.pac_checksum_types
:
960 with self
.subTest(checksum
=checksum
):
961 self
._run
_delegation
_test
(
963 'expected_error_mode': (KDC_ERR_MODIFIED
,
964 KDC_ERR_BAD_INTEGRITY
),
965 'allow_delegation': True,
966 'modify_client_tkt_fn': functools
.partial(
967 self
.zeroed_pac_checksum
, checksum
=checksum
),
968 'expect_edata': False
971 def test_constrained_delegation_zeroed_service_checksum(self
):
972 # Present the service's ticket with invalid checksums.
973 for checksum
in self
.pac_checksum_types
:
974 with self
.subTest(checksum
=checksum
):
975 if checksum
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
976 expected_error_mode
= (KDC_ERR_MODIFIED
,
977 KDC_ERR_BAD_INTEGRITY
)
978 expected_status
= ntstatus
.NT_STATUS_WRONG_PASSWORD
980 expected_error_mode
= 0
981 expected_status
= None
983 self
._run
_delegation
_test
(
985 'expected_error_mode': expected_error_mode
,
986 'expected_status': expected_status
,
987 'allow_delegation': True,
988 'modify_service_tgt_fn': functools
.partial(
989 self
.zeroed_pac_checksum
, checksum
=checksum
)
992 def test_rbcd_zeroed_client_checksum(self
):
993 # Present a user ticket with invalid checksums.
994 for checksum
in self
.pac_checksum_types
:
995 with self
.subTest(checksum
=checksum
):
996 self
._run
_delegation
_test
(
998 'expected_error_mode': KDC_ERR_MODIFIED
,
1000 ntstatus
.NT_STATUS_NOT_SUPPORTED
,
1002 'pac_options': '0001', # supports RBCD
1003 'modify_client_tkt_fn': functools
.partial(
1004 self
.zeroed_pac_checksum
, checksum
=checksum
)
1007 def test_rbcd_zeroed_service_checksum(self
):
1008 # Present the service's ticket with invalid checksums.
1009 for checksum
in self
.pac_checksum_types
:
1010 with self
.subTest(checksum
=checksum
):
1011 if checksum
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
1012 expected_error_mode
= (KDC_ERR_MODIFIED
,
1013 KDC_ERR_BAD_INTEGRITY
)
1014 expected_status
= ntstatus
.NT_STATUS_WRONG_PASSWORD
1016 expected_error_mode
= 0
1017 expected_status
= None
1019 self
._run
_delegation
_test
(
1021 'expected_error_mode': expected_error_mode
,
1022 'expected_status': expected_status
,
1024 'pac_options': '0001', # supports RBCD
1025 'modify_service_tgt_fn': functools
.partial(
1026 self
.zeroed_pac_checksum
, checksum
=checksum
)
1029 unkeyed_ctypes
= {Cksumtype
.MD5
, Cksumtype
.SHA1
, Cksumtype
.CRC32
}
1031 def test_constrained_delegation_unkeyed_client_checksum(self
):
1032 # Present a user ticket with invalid checksums.
1033 for checksum
in self
.pac_checksum_types
:
1034 for ctype
in self
.unkeyed_ctypes
:
1035 with self
.subTest(checksum
=checksum
, ctype
=ctype
):
1036 if (checksum
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
1037 and ctype
== Cksumtype
.SHA1
):
1038 expected_error_mode
= (KDC_ERR_SUMTYPE_NOSUPP
,
1039 KDC_ERR_INAPP_CKSUM
)
1041 expected_error_mode
= (KDC_ERR_GENERIC
,
1042 KDC_ERR_INAPP_CKSUM
)
1044 self
._run
_delegation
_test
(
1046 'expected_error_mode': expected_error_mode
,
1047 'allow_delegation': True,
1048 'modify_client_tkt_fn': functools
.partial(
1049 self
.unkeyed_pac_checksum
,
1050 checksum
=checksum
, ctype
=ctype
),
1051 'expect_edata': False
1054 def test_constrained_delegation_unkeyed_service_checksum(self
):
1055 # Present the service's ticket with invalid checksums.
1056 for checksum
in self
.pac_checksum_types
:
1057 for ctype
in self
.unkeyed_ctypes
:
1058 with self
.subTest(checksum
=checksum
, ctype
=ctype
):
1059 if checksum
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
1060 if ctype
== Cksumtype
.SHA1
:
1061 expected_error_mode
= (KDC_ERR_SUMTYPE_NOSUPP
,
1062 KDC_ERR_INAPP_CKSUM
)
1063 expected_status
= ntstatus
.NT_STATUS_LOGON_FAILURE
1065 expected_error_mode
= (KDC_ERR_GENERIC
,
1066 KDC_ERR_INAPP_CKSUM
)
1068 ntstatus
.NT_STATUS_INSUFFICIENT_RESOURCES
)
1070 expected_error_mode
= 0
1071 expected_status
= None
1073 self
._run
_delegation
_test
(
1075 'expected_error_mode': expected_error_mode
,
1076 'expected_status': expected_status
,
1077 'allow_delegation': True,
1078 'modify_service_tgt_fn': functools
.partial(
1079 self
.unkeyed_pac_checksum
,
1080 checksum
=checksum
, ctype
=ctype
)
1083 def test_rbcd_unkeyed_client_checksum(self
):
1084 # Present a user ticket with invalid checksums.
1085 for checksum
in self
.pac_checksum_types
:
1086 for ctype
in self
.unkeyed_ctypes
:
1087 with self
.subTest(checksum
=checksum
, ctype
=ctype
):
1088 if (checksum
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
1089 and ctype
== Cksumtype
.SHA1
):
1090 expected_error_mode
= KDC_ERR_SUMTYPE_NOSUPP
1092 expected_error_mode
= KDC_ERR_GENERIC
1094 self
._run
_delegation
_test
(
1096 'expected_error_mode': expected_error_mode
,
1098 ntstatus
.NT_STATUS_NOT_SUPPORTED
,
1100 'pac_options': '0001', # supports RBCD
1101 'modify_client_tkt_fn': functools
.partial(
1102 self
.unkeyed_pac_checksum
,
1103 checksum
=checksum
, ctype
=ctype
)
1106 def test_rbcd_unkeyed_service_checksum(self
):
1107 # Present the service's ticket with invalid checksums.
1108 for checksum
in self
.pac_checksum_types
:
1109 for ctype
in self
.unkeyed_ctypes
:
1110 with self
.subTest(checksum
=checksum
, ctype
=ctype
):
1111 if checksum
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
1112 if ctype
== Cksumtype
.SHA1
:
1113 expected_error_mode
= (KDC_ERR_SUMTYPE_NOSUPP
,
1114 KDC_ERR_BAD_INTEGRITY
)
1115 expected_status
= ntstatus
.NT_STATUS_LOGON_FAILURE
1117 expected_error_mode
= KDC_ERR_GENERIC
1119 ntstatus
.NT_STATUS_INSUFFICIENT_RESOURCES
)
1121 expected_error_mode
= 0
1122 expected_status
= None
1124 self
._run
_delegation
_test
(
1126 'expected_error_mode': expected_error_mode
,
1127 'expected_status': expected_status
,
1129 'pac_options': '0001', # supports RBCD
1130 'modify_service_tgt_fn': functools
.partial(
1131 self
.unkeyed_pac_checksum
,
1132 checksum
=checksum
, ctype
=ctype
)
1135 def remove_pac_checksum(self
, ticket
, checksum
):
1136 checksum_keys
= self
.get_krbtgt_checksum_key()
1138 return self
.modified_ticket(ticket
,
1139 checksum_keys
=checksum_keys
,
1140 include_checksums
={checksum
: False})
1142 def zeroed_pac_checksum(self
, ticket
, checksum
):
1143 krbtgt_creds
= self
.get_krbtgt_creds()
1144 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
1146 server_key
= ticket
.decryption_key
1149 krb5pac
.PAC_TYPE_SRV_CHECKSUM
: server_key
,
1150 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
,
1151 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
: krbtgt_key
,
1154 if checksum
== krb5pac
.PAC_TYPE_SRV_CHECKSUM
:
1155 zeroed_key
= server_key
1157 zeroed_key
= krbtgt_key
1159 checksum_keys
[checksum
] = ZeroedChecksumKey(zeroed_key
.key
,
1162 return self
.modified_ticket(ticket
,
1163 checksum_keys
=checksum_keys
,
1164 include_checksums
={checksum
: True})
1166 def unkeyed_pac_checksum(self
, ticket
, checksum
, ctype
):
1167 krbtgt_creds
= self
.get_krbtgt_creds()
1168 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
1170 server_key
= ticket
.decryption_key
1173 krb5pac
.PAC_TYPE_SRV_CHECKSUM
: server_key
,
1174 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
,
1175 krb5pac
.PAC_TYPE_TICKET_CHECKSUM
: krbtgt_key
,
1178 # Make a copy of the existing key and change the ctype.
1179 key
= checksum_keys
[checksum
]
1180 new_key
= RodcPacEncryptionKey(key
.key
, key
.kvno
)
1181 new_key
.ctype
= ctype
1182 checksum_keys
[checksum
] = new_key
1184 return self
.modified_ticket(ticket
,
1185 checksum_keys
=checksum_keys
,
1186 include_checksums
={checksum
: True})
1188 def add_delegation_info(self
, ticket
, services
=None):
1189 def modify_pac_fn(pac
):
1190 pac_buffers
= pac
.buffers
1191 self
.assertNotIn(krb5pac
.PAC_TYPE_CONSTRAINED_DELEGATION
,
1192 (buffer.type for buffer in pac_buffers
))
1194 transited_services
= list(map(lsa
.String
, services
))
1196 delegation
= krb5pac
.PAC_CONSTRAINED_DELEGATION()
1197 delegation
.proxy_target
= lsa
.String('test_proxy_target')
1198 delegation
.transited_services
= transited_services
1199 delegation
.num_transited_services
= len(transited_services
)
1201 info
= krb5pac
.PAC_CONSTRAINED_DELEGATION_CTR()
1202 info
.info
= delegation
1204 pac_buffer
= krb5pac
.PAC_BUFFER()
1205 pac_buffer
.type = krb5pac
.PAC_TYPE_CONSTRAINED_DELEGATION
1206 pac_buffer
.info
= info
1208 pac_buffers
.append(pac_buffer
)
1210 pac
.buffers
= pac_buffers
1211 pac
.num_buffers
+= 1
1215 checksum_keys
= self
.get_krbtgt_checksum_key()
1217 return self
.modified_ticket(ticket
,
1218 checksum_keys
=checksum_keys
,
1219 modify_pac_fn
=modify_pac_fn
)
1221 def set_ticket_forwardable(self
, ticket
, flag
, update_pac_checksums
=True):
1222 flag
= '1' if flag
else '0'
1224 def modify_fn(enc_part
):
1225 # Reset the forwardable flag
1226 forwardable_pos
= (len(tuple(krb5_asn1
.TicketFlags('forwardable')))
1229 flags
= enc_part
['flags']
1230 self
.assertLessEqual(forwardable_pos
, len(flags
))
1231 enc_part
['flags'] = (flags
[:forwardable_pos
] +
1233 flags
[forwardable_pos
+1:])
1237 if update_pac_checksums
:
1238 checksum_keys
= self
.get_krbtgt_checksum_key()
1240 checksum_keys
= None
1242 return self
.modified_ticket(ticket
,
1243 modify_fn
=modify_fn
,
1244 checksum_keys
=checksum_keys
,
1245 update_pac_checksums
=update_pac_checksums
)
1247 def remove_ticket_pac(self
, ticket
):
1248 return self
.modified_ticket(ticket
,
1252 if __name__
== "__main__":
1253 global_asn1_print
= False
1254 global_hexdump
= False