tests/krb5: Add tests for constrained delegation to NO_AUTH_DATA_REQUIRED service
[Samba.git] / python / samba / tests / krb5 / s4u_tests.py
blob bbb7135b55b38fa6467dac78a52055e8cfe9d00d
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import os
21 import functools
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
26 from samba import ntstatus
27 from samba.dcerpc import krb5pac, lsa
29 from samba.tests import env_get_var_value
30 from samba.tests.krb5.kcrypto import Cksumtype, Enctype
31 from samba.tests.krb5.kdc_base_test import KDCBaseTest
32 from samba.tests.krb5.raw_testcase import (
33 RodcPacEncryptionKey,
34 ZeroedChecksumKey
36 from samba.tests.krb5.rfc4120_constants import (
37 AES256_CTS_HMAC_SHA1_96,
38 ARCFOUR_HMAC_MD5,
39 KDC_ERR_BADOPTION,
40 KDC_ERR_BAD_INTEGRITY,
41 KDC_ERR_GENERIC,
42 KDC_ERR_INAPP_CKSUM,
43 KDC_ERR_MODIFIED,
44 KDC_ERR_SUMTYPE_NOSUPP,
45 KU_PA_ENC_TIMESTAMP,
46 KU_AS_REP_ENC_PART,
47 KU_TGS_REP_ENC_PART_SUB_KEY,
48 NT_PRINCIPAL
50 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
# Module-level debugging switches. S4UKerberosTests.setUp() copies them onto
# each test instance as do_asn1_print and do_hexdump.
global_asn1_print = False
global_hexdump = False
56 class S4UKerberosTests(KDCBaseTest):
def setUp(self):
    # Let the base class prepare its fixtures first, then copy the
    # module-level debugging switches onto this instance.
    super(S4UKerberosTests, self).setUp()
    self.do_hexdump = global_hexdump
    self.do_asn1_print = global_asn1_print
def _test_s4u2self(self, pa_s4u2self_ctype=None):
    """Get a TGT for the service account, then send an S4U2Self request.

    The service account first sends an AS-REQ without pre-authentication,
    uses the ETYPE-INFO2 from the resulting error to derive its key, and
    repeats the AS-REQ with an encrypted timestamp. The TGT obtained is
    then used for a TGS-REQ to the service itself, carrying S4U2Self
    padata that names the user from the FOR_USER environment variable.

    :param pa_s4u2self_ctype: checksum type passed as ``ctype`` to
        PA_S4U2Self_create(); None passes no explicit type.
    :return: the 'msg-type' field of the reply to the S4U2Self TGS-REQ.
    """
    service_creds = self.get_service_creds()
    service = service_creds.get_username()
    realm = service_creds.get_realm()

    # name_type 1 is NT-PRINCIPAL, 2 is NT-SRV-INST (RFC 4120).
    cname = self.PrincipalName_create(name_type=1, names=[service])
    sname = self.PrincipalName_create(name_type=2, names=["krbtgt", realm])

    till = self.get_KerberosTime(offset=36000)

    kdc_options = krb5_asn1.KDCOptions('forwardable')

    # aes256-cts-hmac-sha1-96, aes128-cts-hmac-sha1-96, rc4-hmac.
    etypes = (18, 17, 23)

    def send_as_req(padata):
        # Both AS-REQs are identical apart from the pre-authentication data.
        req = self.AS_REQ_create(padata=padata,
                                 kdc_options=str(kdc_options),
                                 cname=cname,
                                 realm=realm,
                                 sname=sname,
                                 from_time=None,
                                 till_time=till,
                                 renew_time=None,
                                 nonce=0x7fffffff,
                                 etypes=etypes,
                                 addresses=None,
                                 additional_tickets=None)
        rep = self.send_recv_transaction(req)
        self.assertIsNotNone(rep)
        return rep

    # First attempt: no padata. Expect KRB-ERROR (30) with
    # KDC_ERR_PREAUTH_REQUIRED (25).
    rep = send_as_req(None)

    self.assertEqual(rep['msg-type'], 30)
    self.assertEqual(rep['error-code'], 25)
    rep_padata = self.der_decode(
        rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())

    # Look for PA-ETYPE-INFO2 (19). Fail with a clear message instead of a
    # NameError if the KDC did not send one.
    etype_info2 = None
    for pa in rep_padata:
        if pa['padata-type'] == 19:
            etype_info2 = pa['padata-value']
            break
    self.assertIsNotNone(
        etype_info2,
        'no PA-ETYPE-INFO2 (padata-type 19) in the KDC error e-data')

    etype_info2 = self.der_decode(
        etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())

    key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0])

    (patime, pausec) = self.get_KerberosTimeWithUsec()
    pa_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
    pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    pa_ts = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, pa_ts)
    pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.EncryptedData())

    # padata-type 2 is PA-ENC-TIMESTAMP.
    pa_ts = self.PA_DATA_create(2, pa_ts)

    # Second attempt: with the encrypted timestamp. Expect an AS-REP (11).
    rep = send_as_req([pa_ts])

    msg_type = rep['msg-type']
    self.assertEqual(msg_type, 11)

    enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
    # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
    # application tag 26
    try:
        enc_part2 = self.der_decode(
            enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
    except Exception:
        enc_part2 = self.der_decode(
            enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())

    # S4U2Self Request: the service asks for a ticket to itself.
    sname = cname

    for_user_name = env_get_var_value('FOR_USER')
    uname = self.PrincipalName_create(name_type=1, names=[for_user_name])

    till = self.get_KerberosTime(offset=36000)
    ticket = rep['ticket']
    ticket_session_key = self.EncryptionKey_import(enc_part2['key'])
    pa_s4u = self.PA_S4U2Self_create(name=uname, realm=realm,
                                     tgt_session_key=ticket_session_key,
                                     ctype=pa_s4u2self_ctype)
    padata = [pa_s4u]

    subkey = self.RandomKey(ticket_session_key.etype)

    (ctime, cusec) = self.get_KerberosTimeWithUsec()

    req = self.TGS_REQ_create(padata=padata,
                              cusec=cusec,
                              ctime=ctime,
                              ticket=ticket,
                              kdc_options=str(kdc_options),
                              cname=cname,
                              realm=realm,
                              sname=sname,
                              from_time=None,
                              till_time=till,
                              renew_time=None,
                              nonce=0x7ffffffe,
                              etypes=etypes,
                              addresses=None,
                              EncAuthorizationData=None,
                              EncAuthorizationData_key=None,
                              additional_tickets=None,
                              ticket_session_key=ticket_session_key,
                              authenticator_subkey=subkey)
    rep = self.send_recv_transaction(req)
    self.assertIsNotNone(rep)

    msg_type = rep['msg-type']
    if msg_type == 13:
        # TGS-REP: check that the encrypted part decrypts with the
        # authenticator subkey and decodes. The decoded value itself is
        # not used further.
        enc_part2 = subkey.decrypt(
            KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
        enc_part2 = self.der_decode(
            enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())

    return msg_type
# Using the checksum type from the tgt_session_key happens to work
# everywhere
def test_s4u2self(self):
    # No explicit checksum type; a TGS-REP (msg-type 13) is expected.
    self.assertEqual(self._test_s4u2self(), 13)
# Per spec, the checksum of PA-FOR-USER is HMAC_MD5, see [MS-SFU] 2.2.1
def test_s4u2self_hmac_md5_checksum(self):
    # A TGS-REP (msg-type 13) is expected.
    reply_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.HMAC_MD5)
    self.assertEqual(reply_type, 13)
def test_s4u2self_md5_unkeyed_checksum(self):
    # With an MD5 checksum a KRB-ERROR (msg-type 30) is expected.
    reply_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.MD5)
    self.assertEqual(reply_type, 30)
def test_s4u2self_sha1_unkeyed_checksum(self):
    # With a SHA1 checksum a KRB-ERROR (msg-type 30) is expected.
    reply_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.SHA1)
    self.assertEqual(reply_type, 30)
def test_s4u2self_crc32_unkeyed_checksum(self):
    # With a CRC32 checksum a KRB-ERROR (msg-type 30) is expected.
    reply_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32)
    self.assertEqual(reply_type, 30)
def _run_s4u2self_test(self, kdc_dict):
    """Run one S4U2Self TGS exchange described by *kdc_dict*.

    Every recognised key is popped from *kdc_dict*; the final assertion
    fails if any key is left over. Recognised keys: 'client_opts',
    'service_opts', 'modify_service_tgt_fn', 'expected_flags',
    'unexpected_flags', 'kdc_options' (default '0') and 'etypes'.
    """
    user_creds = self.get_cached_creds(
        machine_account=False,
        opts=kdc_dict.pop('client_opts', None))
    svc_creds = self.get_cached_creds(
        machine_account=True,
        opts=kdc_dict.pop('service_opts', None))

    # Fetch the service's TGT and let the test case tamper with it.
    svc_tgt = self.get_tgt(svc_creds)
    tgt_modifier = kdc_dict.pop('modify_service_tgt_fn', None)
    if tgt_modifier is not None:
        svc_tgt = tgt_modifier(svc_tgt)

    user_cname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=[user_creds.get_username()])

    # The last character of the machine account name is dropped
    # (presumably the trailing '$' -- confirm against get_cached_creds).
    svc_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=['host', svc_creds.get_username()[:-1]])

    realm = user_creds.get_realm()

    def as_ticket_flags(value):
        # Leave an absent flag set as None; otherwise wrap it.
        return None if value is None else krb5_asn1.TicketFlags(value)

    expected_flags = as_ticket_flags(kdc_dict.pop('expected_flags', None))
    unexpected_flags = as_ticket_flags(
        kdc_dict.pop('unexpected_flags', None))

    kdc_options = krb5_asn1.KDCOptions(kdc_dict.pop('kdc_options', '0'))

    svc_decryption_key = self.TicketDecryptionKey_from_creds(svc_creds)

    subkey = self.RandomKey(Enctype.AES256)

    etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
                                     ARCFOUR_HMAC_MD5))

    def generate_s4u2self_padata(_kdc_exchange_dict,
                                 _callback_dict,
                                 req_body):
        # Build the S4U2Self padata naming the client, keyed with the
        # session key of the service's TGT.
        s4u_padata = self.PA_S4U2Self_create(
            name=user_cname,
            realm=realm,
            tgt_session_key=svc_tgt.session_key,
            ctype=None)

        return [s4u_padata], req_body

    exchange = self.tgs_exchange_dict(
        expected_crealm=realm,
        expected_cname=user_cname,
        expected_srealm=realm,
        expected_sname=svc_sname,
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        ticket_decryption_key=svc_decryption_key,
        expect_ticket_checksum=True,
        generate_padata_fn=generate_s4u2self_padata,
        check_rep_fn=self.generic_check_kdc_rep,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=0,
        tgt=svc_tgt,
        authenticator_subkey=subkey,
        kdc_options=str(kdc_options),
        expect_claims=False)

    self._generic_kdc_exchange(exchange,
                               cname=None,
                               realm=realm,
                               sname=svc_sname,
                               etypes=etypes)

    # Ensure we used all the parameters given to us.
    self.assertEqual({}, kdc_dict)
# Test performing an S4U2Self operation with a forwardable ticket. The
# resulting ticket should have the 'forwardable' flag set.
def test_s4u2self_forwardable(self):
    self._run_s4u2self_test(
        {
            'client_opts': {
                'not_delegated': False,
            },
            'kdc_options': 'forwardable',
            'modify_service_tgt_fn': functools.partial(
                self.set_ticket_forwardable, flag=True),
            'expected_flags': 'forwardable',
        })
# Test performing an S4U2Self operation without requesting a forwardable
# ticket. The resulting ticket should not have the 'forwardable' flag set.
def test_s4u2self_without_forwardable(self):
    # No 'kdc_options' entry: the helper falls back to its default of '0'.
    self._run_s4u2self_test(
        {
            'client_opts': {
                'not_delegated': False,
            },
            'modify_service_tgt_fn': functools.partial(
                self.set_ticket_forwardable, flag=True),
            'unexpected_flags': 'forwardable',
        })
# Do an S4U2Self with a non-forwardable TGT. The 'forwardable' flag should
# not be set on the ticket.
def test_s4u2self_not_forwardable(self):
    self._run_s4u2self_test(
        {
            'client_opts': {
                'not_delegated': False,
            },
            'kdc_options': 'forwardable',
            'modify_service_tgt_fn': functools.partial(
                self.set_ticket_forwardable, flag=False),
            'unexpected_flags': 'forwardable',
        })
# Do an S4U2Self with the not_delegated flag set on the client. The
# 'forwardable' flag should not be set on the ticket.
def test_s4u2self_client_not_delegated(self):
    self._run_s4u2self_test(
        {
            'client_opts': {
                'not_delegated': True,
            },
            'kdc_options': 'forwardable',
            'modify_service_tgt_fn': functools.partial(
                self.set_ticket_forwardable, flag=True),
            'unexpected_flags': 'forwardable',
        })
# Do an S4U2Self with a service not trusted to authenticate for delegation,
# but having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
# flag should be set on the ticket.
def test_s4u2self_not_trusted_empty_allowed(self):
    self._run_s4u2self_test(
        {
            'client_opts': {
                'not_delegated': False,
            },
            'service_opts': {
                'trusted_to_auth_for_delegation': False,
                'delegation_to_spn': (),
            },
            'kdc_options': 'forwardable',
            'modify_service_tgt_fn': functools.partial(
                self.set_ticket_forwardable, flag=True),
            'expected_flags': 'forwardable',
        })
# Do an S4U2Self with a service not trusted to authenticate for delegation
# and having a non-empty msDS-AllowedToDelegateTo attribute. The
# 'forwardable' flag should not be set on the ticket.
def test_s4u2self_not_trusted_nonempty_allowed(self):
    self._run_s4u2self_test(
        {
            'client_opts': {
                'not_delegated': False,
            },
            'service_opts': {
                'trusted_to_auth_for_delegation': False,
                'delegation_to_spn': ('test',),
            },
            'kdc_options': 'forwardable',
            'modify_service_tgt_fn': functools.partial(
                self.set_ticket_forwardable, flag=True),
            'unexpected_flags': 'forwardable',
        })
# Do an S4U2Self with a service trusted to authenticate for delegation and
# having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
# flag should be set on the ticket.
def test_s4u2self_trusted_empty_allowed(self):
    self._run_s4u2self_test(
        {
            'client_opts': {
                'not_delegated': False,
            },
            'service_opts': {
                'trusted_to_auth_for_delegation': True,
                'delegation_to_spn': (),
            },
            'kdc_options': 'forwardable',
            'modify_service_tgt_fn': functools.partial(
                self.set_ticket_forwardable, flag=True),
            'expected_flags': 'forwardable',
        })
# Do an S4U2Self with a service trusted to authenticate for delegation and
# having a non-empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
# flag should be set on the ticket.
def test_s4u2self_trusted_nonempty_allowed(self):
    self._run_s4u2self_test(
        {
            'client_opts': {
                'not_delegated': False,
            },
            'service_opts': {
                'trusted_to_auth_for_delegation': True,
                'delegation_to_spn': ('test',),
            },
            'kdc_options': 'forwardable',
            'modify_service_tgt_fn': functools.partial(
                self.set_ticket_forwardable, flag=True),
            'expected_flags': 'forwardable',
        })
def _run_delegation_test(self, kdc_dict):
    """Run one constrained-delegation TGS exchange described by *kdc_dict*.

    A client obtains a service ticket for service1; service1 then presents
    that ticket as an additional ticket, together with its own TGT, in a
    TGS-REQ for service2.

    Every recognised key is popped from *kdc_dict*; the final assertion
    fails if any key is left over. Recognised keys:

    * 'expected_error_mode' (required): 0/falsy for success, otherwise the
      expected error code(s).
    * 'expected_status': must be None when success is expected.
    * 'expect_edata': may only be given when an error is expected.
    * 'allow_delegation' / 'allow_rbcd': mutually exclusive. The first
      sets 'delegation_to_spn' on service1 to service2's SPN; the second
      sets 'delegation_from_dn' on service2 to service1's DN.
    * 'client_opts', 'service1_opts', 'service2_opts': account options.
    * 'client_tkt_options' (default 'forwardable'), 'kdc_options'
      (default 'cname-in-addl-tkt'), 'pac_options', 'etypes'.
    * 'modify_client_tkt_fn', 'modify_service_tgt_fn': callables applied
      to the client's service ticket / service1's TGT before use.
    * 'expected_transited_services': services expected ahead of service1.
    * 'expect_pac' (default True).
    """
    client_opts = kdc_dict.pop('client_opts', None)
    client_creds = self.get_cached_creds(machine_account=False,
                                         opts=client_opts)

    # Work on copies: the delegation settings added below must not leak
    # into dicts owned by the caller.
    service1_opts = dict(kdc_dict.pop('service1_opts', {}))
    service2_opts = dict(kdc_dict.pop('service2_opts', {}))

    allow_delegation = kdc_dict.pop('allow_delegation', False)
    allow_rbcd = kdc_dict.pop('allow_rbcd', False)
    self.assertFalse(allow_delegation and allow_rbcd)

    if allow_rbcd:
        # service2 needs service1's DN, so service1 is created first.
        service1_creds = self.get_cached_creds(machine_account=True,
                                               opts=service1_opts)

        self.assertNotIn('delegation_from_dn', service2_opts)
        service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())

        service2_creds = self.get_cached_creds(machine_account=True,
                                               opts=service2_opts)
    else:
        # service1 may need service2's SPN, so service2 is created first.
        service2_creds = self.get_cached_creds(machine_account=True,
                                               opts=service2_opts)

        if allow_delegation:
            self.assertNotIn('delegation_to_spn', service1_opts)
            service1_opts['delegation_to_spn'] = service2_creds.get_spn()

        service1_creds = self.get_cached_creds(machine_account=True,
                                               opts=service1_opts)

    client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
    expected_flags = krb5_asn1.TicketFlags(client_tkt_options)

    client_tgt = self.get_tgt(client_creds,
                              kdc_options=client_tkt_options,
                              expected_flags=expected_flags)
    client_service_tkt = self.get_service_ticket(
        client_tgt,
        service1_creds,
        kdc_options=client_tkt_options,
        expected_flags=expected_flags)

    service1_tgt = self.get_tgt(service1_creds)

    modify_client_tkt_fn = kdc_dict.pop('modify_client_tkt_fn', None)
    if modify_client_tkt_fn is not None:
        client_service_tkt = modify_client_tkt_fn(client_service_tkt)

    additional_tickets = [client_service_tkt.ticket]

    modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
    if modify_service_tgt_fn is not None:
        service1_tgt = modify_service_tgt_fn(service1_tgt)

    kdc_options = kdc_dict.pop('kdc_options', None)
    if kdc_options is None:
        kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))

    client_username = client_creds.get_username()
    client_realm = client_creds.get_realm()
    client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                             names=[client_username])

    # The last character of each machine account name is dropped
    # (presumably the trailing '$' -- confirm against get_cached_creds).
    service1_name = service1_creds.get_username()[:-1]
    service1_realm = service1_creds.get_realm()

    service2_name = service2_creds.get_username()[:-1]
    service2_realm = service2_creds.get_realm()
    service2_service = 'host'
    service2_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=[service2_service,
                                       service2_name])
    service2_decryption_key = self.TicketDecryptionKey_from_creds(
        service2_creds)
    service2_etypes = service2_creds.tgs_supported_enctypes

    expected_error_mode = kdc_dict.pop('expected_error_mode')
    expected_status = kdc_dict.pop('expected_status', None)
    if expected_error_mode:
        check_error_fn = self.generic_check_kdc_error
        check_rep_fn = None
    else:
        check_error_fn = None
        check_rep_fn = self.generic_check_kdc_rep

        # A status code only makes sense alongside an error.
        self.assertIsNone(expected_status)

    expect_edata = kdc_dict.pop('expect_edata', None)
    if expect_edata is not None:
        self.assertTrue(expected_error_mode)

    pac_options = kdc_dict.pop('pac_options', None)

    authenticator_subkey = self.RandomKey(Enctype.AES256)

    etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
                                     ARCFOUR_HMAC_MD5))

    expected_proxy_target = service2_creds.get_spn()

    # Copy before appending: callers may pass a list they still use
    # elsewhere (for example one bound into modify_client_tkt_fn).
    expected_transited_services = list(kdc_dict.pop(
        'expected_transited_services', []))

    transited_service = f'host/{service1_name}@{service1_realm}'
    expected_transited_services.append(transited_service)

    expect_pac = kdc_dict.pop('expect_pac', True)

    kdc_exchange_dict = self.tgs_exchange_dict(
        expected_crealm=client_realm,
        expected_cname=client_cname,
        expected_srealm=service2_realm,
        expected_sname=service2_sname,
        expected_supported_etypes=service2_etypes,
        ticket_decryption_key=service2_decryption_key,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=expected_error_mode,
        expected_status=expected_status,
        callback_dict={},
        tgt=service1_tgt,
        authenticator_subkey=authenticator_subkey,
        kdc_options=kdc_options,
        pac_options=pac_options,
        expect_edata=expect_edata,
        expected_proxy_target=expected_proxy_target,
        expected_transited_services=expected_transited_services,
        expect_pac=expect_pac)

    self._generic_kdc_exchange(kdc_exchange_dict,
                               cname=None,
                               realm=service2_realm,
                               sname=service2_sname,
                               etypes=etypes,
                               additional_tickets=additional_tickets)

    # Ensure we used all the parameters given to us.
    self.assertEqual({}, kdc_dict)
def test_constrained_delegation(self):
    # Test constrained delegation.
    self._run_delegation_test(
        {
            'expected_error_mode': 0,
            'allow_delegation': True,
        })
def test_constrained_delegation_no_auth_data_required(self):
    # Test constrained delegation to a target service that has
    # no_auth_data_required set. The exchange should succeed and the
    # resulting ticket is not expected to contain a PAC.
    self._run_delegation_test(
        {
            'expected_error_mode': 0,
            'allow_delegation': True,
            'service2_opts': {
                'no_auth_data_required': True,
            },
            'expect_pac': False,
        })
def test_constrained_delegation_existing_delegation_info(self):
    # Test constrained delegation with an existing S4U_DELEGATION_INFO
    # structure in the PAC.

    services = ['service1', 'service2', 'service3']

    self._run_delegation_test(
        {
            'expected_error_mode': 0,
            'allow_delegation': True,
            'modify_client_tkt_fn': functools.partial(
                self.add_delegation_info, services=services),
            'expected_transited_services': services,
        })
def test_constrained_delegation_not_allowed(self):
    # Test constrained delegation when the delegating service does not
    # allow it.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_BADOPTION,
            'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
            'allow_delegation': False,
        })
def test_constrained_delegation_no_client_pac(self):
    # Test constrained delegation when the client service ticket does not
    # contain a PAC.
    self._run_delegation_test(
        {
            'expected_error_mode': (KDC_ERR_BADOPTION,
                                    KDC_ERR_MODIFIED),
            'allow_delegation': True,
            'modify_client_tkt_fn': self.remove_ticket_pac,
            'expect_edata': False,
        })
def test_constrained_delegation_no_service_pac(self):
    # Test constrained delegation when the service TGT does not contain a
    # PAC. The exchange is expected to succeed.
    self._run_delegation_test(
        {
            'expected_error_mode': 0,
            'allow_delegation': True,
            'modify_service_tgt_fn': self.remove_ticket_pac,
        })
def test_constrained_delegation_no_client_pac_no_auth_data_required(self):
    # Test constrained delegation when the client service ticket does not
    # contain a PAC and the target service has no_auth_data_required set.
    self._run_delegation_test(
        {
            'expected_error_mode': (KDC_ERR_BADOPTION,
                                    KDC_ERR_MODIFIED),
            'allow_delegation': True,
            'modify_client_tkt_fn': self.remove_ticket_pac,
            'expect_edata': False,
            'service2_opts': {
                'no_auth_data_required': True,
            },
        })
def test_constrained_delegation_no_service_pac_no_auth_data_required(self):
    # Test constrained delegation when the service TGT does not contain a
    # PAC and the target service has no_auth_data_required set.
    self._run_delegation_test(
        {
            'expected_error_mode': (KDC_ERR_BADOPTION,
                                    KDC_ERR_MODIFIED),
            'allow_delegation': True,
            'modify_service_tgt_fn': self.remove_ticket_pac,
            'service2_opts': {
                'no_auth_data_required': True,
            },
        })
def test_constrained_delegation_non_forwardable(self):
    # Test constrained delegation with a non-forwardable ticket.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_BADOPTION,
            'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
            'allow_delegation': True,
            'modify_client_tkt_fn': functools.partial(
                self.set_ticket_forwardable, flag=False),
        })
def test_constrained_delegation_pac_options_rbcd(self):
    # Test constrained delegation, but with the RBCD bit set in the PAC
    # options.
    self._run_delegation_test(
        {
            'expected_error_mode': 0,
            'pac_options': '0001',  # supports RBCD
            'allow_delegation': True,
        })
def test_rbcd_no_auth_data_required(self):
    # Test resource-based constrained delegation to a target service that
    # has no_auth_data_required set. The exchange should succeed and the
    # resulting ticket is not expected to contain a PAC.
    self._run_delegation_test(
        {
            'expected_error_mode': 0,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'service2_opts': {
                'no_auth_data_required': True,
            },
            'expect_pac': False,
        })
def test_rbcd_existing_delegation_info(self):
    # Test resource-based constrained delegation with an existing
    # S4U_DELEGATION_INFO structure in the PAC.

    services = ['service1', 'service2', 'service3']

    self._run_delegation_test(
        {
            'expected_error_mode': 0,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'modify_client_tkt_fn': functools.partial(
                self.add_delegation_info, services=services),
            'expected_transited_services': services,
        })
def test_rbcd_not_allowed(self):
    # Test resource-based constrained delegation when the target service
    # does not allow it.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_BADOPTION,
            'expected_status': ntstatus.NT_STATUS_NOT_FOUND,
            'allow_rbcd': False,
            'pac_options': '0001',  # supports RBCD
        })
def test_rbcd_no_client_pac_a(self):
    # Test resource-based constrained delegation when the client service
    # ticket does not contain a PAC, and an empty
    # msDS-AllowedToDelegateTo attribute.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_MODIFIED,
            'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'modify_client_tkt_fn': self.remove_ticket_pac,
        })
def test_rbcd_no_client_pac_b(self):
    # Test resource-based constrained delegation when the client service
    # ticket does not contain a PAC, and a non-empty
    # msDS-AllowedToDelegateTo attribute.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_MODIFIED,
            'expected_status': ntstatus.NT_STATUS_NO_MATCH,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'modify_client_tkt_fn': self.remove_ticket_pac,
            'service1_opts': {
                # A single SPN, given as a plain string.
                'delegation_to_spn': 'host/test',
            },
        })
def test_rbcd_no_service_pac(self):
    # Test resource-based constrained delegation when the service TGT
    # does not contain a PAC.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_BADOPTION,
            'expected_status':
                ntstatus.NT_STATUS_NOT_FOUND,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'modify_service_tgt_fn': self.remove_ticket_pac,
        })
def test_rbcd_no_client_pac_no_auth_data_required_a(self):
    # Test resource-based constrained delegation when the client service
    # ticket does not contain a PAC, with an empty
    # msDS-AllowedToDelegateTo attribute and a target service that has
    # no_auth_data_required set.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_MODIFIED,
            'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'modify_client_tkt_fn': self.remove_ticket_pac,
            'service2_opts': {
                'no_auth_data_required': True,
            },
        })
def test_rbcd_no_client_pac_no_auth_data_required_b(self):
    # Test resource-based constrained delegation when the client service
    # ticket does not contain a PAC, with a non-empty
    # msDS-AllowedToDelegateTo attribute and a target service that has
    # no_auth_data_required set.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_MODIFIED,
            'expected_status': ntstatus.NT_STATUS_NO_MATCH,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'modify_client_tkt_fn': self.remove_ticket_pac,
            'service1_opts': {
                # A single SPN, given as a plain string.
                'delegation_to_spn': 'host/test',
            },
            'service2_opts': {
                'no_auth_data_required': True,
            },
        })
def test_rbcd_no_service_pac_no_auth_data_required(self):
    # Test resource-based constrained delegation when the service TGT
    # does not contain a PAC and the target service has
    # no_auth_data_required set.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_BADOPTION,
            'expected_status':
                ntstatus.NT_STATUS_NOT_FOUND,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'modify_service_tgt_fn': self.remove_ticket_pac,
            'service2_opts': {
                'no_auth_data_required': True,
            },
        })
def test_rbcd_non_forwardable(self):
    # Test resource-based constrained delegation with a non-forwardable
    # ticket.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_BADOPTION,
            'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'modify_client_tkt_fn': functools.partial(
                self.set_ticket_forwardable, flag=False),
        })
def test_rbcd_no_pac_options_a(self):
    # Test resource-based constrained delegation without the RBCD bit set
    # in the PAC options, and an empty msDS-AllowedToDelegateTo attribute.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_BADOPTION,
            'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
            'allow_rbcd': True,
            'pac_options': '1',  # does not support RBCD
        })
def test_rbcd_no_pac_options_b(self):
    # Test resource-based constrained delegation without the RBCD bit set
    # in the PAC options, and a non-empty msDS-AllowedToDelegateTo
    # attribute.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_BADOPTION,
            'expected_status': ntstatus.NT_STATUS_NO_MATCH,
            'allow_rbcd': True,
            'pac_options': '1',  # does not support RBCD
            'service1_opts': {
                # A single SPN, given as a plain string.
                'delegation_to_spn': 'host/test',
            },
        })
def test_bronze_bit_constrained_delegation_old_checksum(self):
    # Attempt to modify the ticket without updating the PAC checksums.
    self._run_delegation_test(
        {
            'expected_error_mode': (KDC_ERR_MODIFIED,
                                    KDC_ERR_BAD_INTEGRITY),
            'allow_delegation': True,
            'client_tkt_options': '0',  # non-forwardable ticket
            'modify_client_tkt_fn': functools.partial(
                self.set_ticket_forwardable,
                flag=True, update_pac_checksums=False),
            'expect_edata': False,
        })
def test_bronze_bit_rbcd_old_checksum(self):
    # Attempt to modify the ticket without updating the PAC checksums.
    self._run_delegation_test(
        {
            'expected_error_mode': KDC_ERR_MODIFIED,
            'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
            'allow_rbcd': True,
            'pac_options': '0001',  # supports RBCD
            'client_tkt_options': '0',  # non-forwardable ticket
            'modify_client_tkt_fn': functools.partial(
                self.set_ticket_forwardable,
                flag=True, update_pac_checksums=False),
        })
def test_constrained_delegation_missing_client_checksum(self):
    # Present a user ticket without the required checksums.
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            # A missing ticket checksum may yield either of two errors;
            # any other missing checksum yields a generic error.
            if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
                expected_error_mode = (KDC_ERR_BADOPTION,
                                       KDC_ERR_MODIFIED)
            else:
                expected_error_mode = KDC_ERR_GENERIC

            self._run_delegation_test(
                {
                    'expected_error_mode': expected_error_mode,
                    'allow_delegation': True,
                    'modify_client_tkt_fn': functools.partial(
                        self.remove_pac_checksum, checksum=checksum),
                    'expect_edata': False,
                })
def test_constrained_delegation_missing_service_checksum(self):
    # Present the service's ticket without the required checksums.
    for checksum in self.pac_checksum_types:
        # The ticket checksum is not exercised for the service's TGT.
        if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
            continue

        with self.subTest(checksum=checksum):
            self._run_delegation_test(
                {
                    'expected_error_mode': KDC_ERR_GENERIC,
                    'expected_status':
                        ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
                    'allow_delegation': True,
                    'modify_service_tgt_fn': functools.partial(
                        self.remove_pac_checksum, checksum=checksum),
                })
def test_rbcd_missing_client_checksum(self):
    # Present a user ticket without the required checksums.
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
                expected_error_mode = KDC_ERR_MODIFIED
            else:
                expected_error_mode = KDC_ERR_GENERIC

            self._run_delegation_test(
                {
                    'expected_error_mode': expected_error_mode,
                    'expected_status':
                        ntstatus.NT_STATUS_NOT_SUPPORTED,
                    'allow_rbcd': True,
                    'pac_options': '0001',  # supports RBCD
                    'modify_client_tkt_fn': functools.partial(
                        self.remove_pac_checksum, checksum=checksum),
                })
def test_rbcd_missing_service_checksum(self):
    # Present the service's ticket without the required checksums.
    for checksum in self.pac_checksum_types:
        # The ticket checksum is not exercised for the service's TGT.
        if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
            continue

        with self.subTest(checksum=checksum):
            self._run_delegation_test(
                {
                    'expected_error_mode': KDC_ERR_GENERIC,
                    'expected_status':
                        ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
                    'allow_rbcd': True,
                    'pac_options': '0001',  # supports RBCD
                    'modify_service_tgt_fn': functools.partial(
                        self.remove_pac_checksum, checksum=checksum),
                })
def test_constrained_delegation_zeroed_client_checksum(self):
    # Present a user ticket with invalid checksums.
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            self._run_delegation_test(
                {
                    'expected_error_mode': (KDC_ERR_MODIFIED,
                                            KDC_ERR_BAD_INTEGRITY),
                    'allow_delegation': True,
                    'modify_client_tkt_fn': functools.partial(
                        self.zeroed_pac_checksum, checksum=checksum),
                    'expect_edata': False,
                })
def test_constrained_delegation_zeroed_service_checksum(self):
    # Present the service's ticket with invalid checksums.
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            # Only a zeroed server checksum is expected to be rejected;
            # for the other checksum types the exchange should succeed.
            if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                expected_error_mode = (KDC_ERR_MODIFIED,
                                       KDC_ERR_BAD_INTEGRITY)
                expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
            else:
                expected_error_mode = 0
                expected_status = None

            self._run_delegation_test(
                {
                    'expected_error_mode': expected_error_mode,
                    'expected_status': expected_status,
                    'allow_delegation': True,
                    'modify_service_tgt_fn': functools.partial(
                        self.zeroed_pac_checksum, checksum=checksum),
                })
def test_rbcd_zeroed_client_checksum(self):
    # Present a user ticket with invalid checksums.
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            self._run_delegation_test(
                {
                    'expected_error_mode': KDC_ERR_MODIFIED,
                    'expected_status':
                        ntstatus.NT_STATUS_NOT_SUPPORTED,
                    'allow_rbcd': True,
                    'pac_options': '0001',  # supports RBCD
                    'modify_client_tkt_fn': functools.partial(
                        self.zeroed_pac_checksum, checksum=checksum),
                })
def test_rbcd_zeroed_service_checksum(self):
    # Present the service's ticket with invalid checksums.
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            # Only a zeroed server checksum is expected to be rejected;
            # for the other checksum types the exchange should succeed.
            if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                expected_error_mode = (KDC_ERR_MODIFIED,
                                       KDC_ERR_BAD_INTEGRITY)
                expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
            else:
                expected_error_mode = 0
                expected_status = None

            self._run_delegation_test(
                {
                    'expected_error_mode': expected_error_mode,
                    'expected_status': expected_status,
                    'allow_rbcd': True,
                    'pac_options': '0001',  # supports RBCD
                    'modify_service_tgt_fn': functools.partial(
                        self.zeroed_pac_checksum, checksum=checksum),
                })
# Checksum types substituted into PAC checksums by the *_unkeyed_*
# tests, which iterate over this set.
unkeyed_ctypes = {
    Cksumtype.MD5,
    Cksumtype.SHA1,
    Cksumtype.CRC32,
}
def test_constrained_delegation_unkeyed_client_checksum(self):
    # Constrained delegation where one PAC checksum of the user's ticket
    # has been recomputed with an unkeyed checksum type. Every
    # (checksum, ctype) combination gets its own subtest.
    for pac_checksum in self.pac_checksum_types:
        for cksum_type in self.unkeyed_ctypes:
            with self.subTest(checksum=pac_checksum, ctype=cksum_type):
                # Only the first element of the expected error pair
                # varies: SHA1 on the server checksum is special-cased.
                if (pac_checksum != krb5pac.PAC_TYPE_SRV_CHECKSUM
                        or cksum_type != Cksumtype.SHA1):
                    primary_error = KDC_ERR_GENERIC
                else:
                    primary_error = KDC_ERR_SUMTYPE_NOSUPP

                test_params = {
                    'expected_error_mode': (primary_error,
                                            KDC_ERR_INAPP_CKSUM),
                    'allow_delegation': True,
                    # Applied to the client ticket before it is
                    # presented.
                    'modify_client_tkt_fn': functools.partial(
                        self.unkeyed_pac_checksum,
                        checksum=pac_checksum, ctype=cksum_type),
                    'expect_edata': False,
                }
                self._run_delegation_test(test_params)
def test_constrained_delegation_unkeyed_service_checksum(self):
    # Constrained delegation where one PAC checksum of the service's own
    # ticket has been recomputed with an unkeyed checksum type. Only an
    # altered server checksum is expected to yield an error; the exact
    # error and status then depend on the checksum type used.
    for pac_checksum in self.pac_checksum_types:
        for cksum_type in self.unkeyed_ctypes:
            with self.subTest(checksum=pac_checksum, ctype=cksum_type):
                if pac_checksum != krb5pac.PAC_TYPE_SRV_CHECKSUM:
                    error_mode = 0
                    status = None
                elif cksum_type == Cksumtype.SHA1:
                    error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
                                  KDC_ERR_INAPP_CKSUM)
                    status = ntstatus.NT_STATUS_LOGON_FAILURE
                else:
                    error_mode = (KDC_ERR_GENERIC,
                                  KDC_ERR_INAPP_CKSUM)
                    status = ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES

                test_params = {
                    'expected_error_mode': error_mode,
                    'expected_status': status,
                    'allow_delegation': True,
                    # Applied to the service's ticket before it is
                    # presented.
                    'modify_service_tgt_fn': functools.partial(
                        self.unkeyed_pac_checksum,
                        checksum=pac_checksum, ctype=cksum_type),
                }
                self._run_delegation_test(test_params)
def test_rbcd_unkeyed_client_checksum(self):
    # Resource-based constrained delegation where one PAC checksum of
    # the user's ticket has been recomputed with an unkeyed checksum
    # type. NT_STATUS_NOT_SUPPORTED is expected in every case; only the
    # Kerberos error code varies.
    for pac_checksum in self.pac_checksum_types:
        for cksum_type in self.unkeyed_ctypes:
            with self.subTest(checksum=pac_checksum, ctype=cksum_type):
                sha1_server_checksum = (
                    pac_checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
                    and cksum_type == Cksumtype.SHA1)

                test_params = {
                    'expected_error_mode': (
                        KDC_ERR_SUMTYPE_NOSUPP if sha1_server_checksum
                        else KDC_ERR_GENERIC),
                    'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                    'allow_rbcd': True,
                    'pac_options': '0001',  # supports RBCD
                    # Applied to the client ticket before it is
                    # presented.
                    'modify_client_tkt_fn': functools.partial(
                        self.unkeyed_pac_checksum,
                        checksum=pac_checksum, ctype=cksum_type),
                }
                self._run_delegation_test(test_params)
def test_rbcd_unkeyed_service_checksum(self):
    # Resource-based constrained delegation where one PAC checksum of
    # the service's own ticket has been recomputed with an unkeyed
    # checksum type. Only an altered server checksum is expected to
    # yield an error; the exact error and status then depend on the
    # checksum type used.
    for pac_checksum in self.pac_checksum_types:
        for cksum_type in self.unkeyed_ctypes:
            with self.subTest(checksum=pac_checksum, ctype=cksum_type):
                if pac_checksum != krb5pac.PAC_TYPE_SRV_CHECKSUM:
                    error_mode = 0
                    status = None
                elif cksum_type == Cksumtype.SHA1:
                    error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
                                  KDC_ERR_BAD_INTEGRITY)
                    status = ntstatus.NT_STATUS_LOGON_FAILURE
                else:
                    error_mode = KDC_ERR_GENERIC
                    status = ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES

                test_params = {
                    'expected_error_mode': error_mode,
                    'expected_status': status,
                    'allow_rbcd': True,
                    'pac_options': '0001',  # supports RBCD
                    # Applied to the service's ticket before it is
                    # presented.
                    'modify_service_tgt_fn': functools.partial(
                        self.unkeyed_pac_checksum,
                        checksum=pac_checksum, ctype=cksum_type),
                }
                self._run_delegation_test(test_params)
def remove_pac_checksum(self, ticket, checksum):
    """Return `ticket` rebuilt with one PAC checksum excluded.

    The ticket is passed to self.modified_ticket() along with the
    krbtgt checksum keys and an include_checksums map that marks
    `checksum` as False.

    :param ticket: the ticket to modify.
    :param checksum: the PAC checksum type to leave out.
    :return: whatever self.modified_ticket() returns.
    """
    return self.modified_ticket(
        ticket,
        checksum_keys=self.get_krbtgt_checksum_key(),
        include_checksums={checksum: False})
def zeroed_pac_checksum(self, ticket, checksum):
    """Return `ticket` rebuilt with one PAC checksum zeroed.

    The server checksum is keyed with the ticket's own decryption key
    and the KDC and ticket checksums with the krbtgt key. The entry for
    `checksum` is then swapped for a ZeroedChecksumKey built from the
    same key material and kvno.

    :param ticket: the ticket to modify; its decryption_key is read.
    :param checksum: the PAC checksum type to zero.
    :return: whatever self.modified_ticket() returns.
    """
    krbtgt_key = self.TicketDecryptionKey_from_creds(
        self.get_krbtgt_creds())
    server_key = ticket.decryption_key

    keys_by_type = {
        krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
        krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
        krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
    }

    # Anything other than the server checksum is based on the krbtgt
    # key, including checksum types not listed in the map above.
    source_key = (server_key
                  if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
                  else krbtgt_key)
    keys_by_type[checksum] = ZeroedChecksumKey(source_key.key,
                                               source_key.kvno)

    return self.modified_ticket(ticket,
                                checksum_keys=keys_by_type,
                                include_checksums={checksum: True})
def unkeyed_pac_checksum(self, ticket, checksum, ctype):
    """Return `ticket` rebuilt with one PAC checksum using `ctype`.

    The server checksum is keyed with the ticket's own decryption key
    and the KDC and ticket checksums with the krbtgt key. The entry for
    `checksum` is replaced by a copy of its key whose ctype attribute
    is overridden with `ctype`.

    :param ticket: the ticket to modify; its decryption_key is read.
    :param checksum: the PAC checksum type to alter; it must be one of
        the server, KDC or ticket checksum types, otherwise the lookup
        raises KeyError.
    :param ctype: the checksum type to force onto the copied key.
    :return: whatever self.modified_ticket() returns.
    """
    krbtgt_key = self.TicketDecryptionKey_from_creds(
        self.get_krbtgt_creds())

    keys_by_type = {
        krb5pac.PAC_TYPE_SRV_CHECKSUM: ticket.decryption_key,
        krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
        krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
    }

    # Copy the existing key rather than mutating it, then override the
    # checksum type on the copy only.
    original_key = keys_by_type[checksum]
    altered_key = RodcPacEncryptionKey(original_key.key,
                                       original_key.kvno)
    altered_key.ctype = ctype
    keys_by_type[checksum] = altered_key

    return self.modified_ticket(ticket,
                                checksum_keys=keys_by_type,
                                include_checksums={checksum: True})
def add_delegation_info(self, ticket, services=None):
    """Return `ticket` rebuilt with a constrained-delegation PAC buffer.

    A PAC_TYPE_CONSTRAINED_DELEGATION buffer is appended to the
    ticket's PAC, listing `services` as the transited services and
    using a fixed proxy target of 'test_proxy_target'. The PAC must not
    already contain such a buffer.

    :param ticket: the ticket to modify.
    :param services: iterable of transited service names, or None
        (the default) for an empty transited-services list.
    :return: whatever self.modified_ticket() returns.
    """
    def modify_pac_fn(pac):
        pac_buffers = pac.buffers
        self.assertNotIn(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION,
                         (buffer.type for buffer in pac_buffers))

        # The default of None means "no transited services". Mapping
        # over None directly would raise TypeError.
        service_names = () if services is None else services
        transited_services = list(map(lsa.String, service_names))

        delegation = krb5pac.PAC_CONSTRAINED_DELEGATION()
        delegation.proxy_target = lsa.String('test_proxy_target')
        delegation.transited_services = transited_services
        delegation.num_transited_services = len(transited_services)

        info = krb5pac.PAC_CONSTRAINED_DELEGATION_CTR()
        info.info = delegation

        pac_buffer = krb5pac.PAC_BUFFER()
        pac_buffer.type = krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION
        pac_buffer.info = info

        pac_buffers.append(pac_buffer)

        # Assign the list back and keep the buffer count in step with
        # it.
        pac.buffers = pac_buffers
        pac.num_buffers += 1

        return pac

    checksum_keys = self.get_krbtgt_checksum_key()

    return self.modified_ticket(ticket,
                                checksum_keys=checksum_keys,
                                modify_pac_fn=modify_pac_fn)
def set_ticket_forwardable(self, ticket, flag, update_pac_checksums=True):
    """Return `ticket` rebuilt with its forwardable flag set or cleared.

    :param ticket: the ticket to modify.
    :param flag: truthy to set the forwardable bit, falsy to clear it.
    :param update_pac_checksums: when true, the krbtgt checksum keys
        are handed to self.modified_ticket(); otherwise None is passed
        as checksum_keys. The value itself is forwarded as well.
    :return: whatever self.modified_ticket() returns.
    """
    flag_bit = '1' if flag else '0'

    def modify_fn(enc_part):
        # Index of the forwardable bit within the flags string.
        bit_index = len(tuple(krb5_asn1.TicketFlags('forwardable'))) - 1

        current_flags = enc_part['flags']
        # An index equal to the length is allowed: the slices below
        # then simply append the bit at that index.
        self.assertLessEqual(bit_index, len(current_flags))

        before = current_flags[:bit_index]
        after = current_flags[bit_index + 1:]
        enc_part['flags'] = before + flag_bit + after

        return enc_part

    checksum_keys = (self.get_krbtgt_checksum_key()
                     if update_pac_checksums else None)

    return self.modified_ticket(ticket,
                                modify_fn=modify_fn,
                                checksum_keys=checksum_keys,
                                update_pac_checksums=update_pac_checksums)
def remove_ticket_pac(self, ticket):
    """Return `ticket` rebuilt via modified_ticket() with exclude_pac=True.

    :param ticket: the ticket to modify.
    :return: whatever self.modified_ticket() returns.
    """
    stripped_ticket = self.modified_ticket(ticket, exclude_pac=True)
    return stripped_ticket
if __name__ == "__main__":
    import unittest

    # Both debugging switches are left off when the file is run as a
    # script.
    global_asn1_print = False
    global_hexdump = False

    unittest.main()