CVE-2020-25719 tests/krb5: Add _modify_tgt() method for modifying already obtained...
[Samba.git] / python / samba / tests / krb5 / kdc_tgs_tests.py
blob52a347b9ed44c06da5f1656f458926f2a099a783
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import sys
21 import os
23 import ldb
26 from samba import dsdb, ntstatus
28 from samba.dcerpc import krb5pac, security
30 sys.path.insert(0, "bin/python")
31 os.environ["PYTHONUNBUFFERED"] = "1"
33 import samba.tests.krb5.kcrypto as kcrypto
34 from samba.tests.krb5.kdc_base_test import KDCBaseTest
35 from samba.tests.krb5.rfc4120_constants import (
36 AES256_CTS_HMAC_SHA1_96,
37 ARCFOUR_HMAC_MD5,
38 KRB_ERROR,
39 KRB_TGS_REP,
40 KDC_ERR_BADMATCH,
41 KDC_ERR_BADOPTION,
42 KDC_ERR_CLIENT_NAME_MISMATCH,
43 KDC_ERR_GENERIC,
44 KDC_ERR_MODIFIED,
45 KDC_ERR_POLICY,
46 KDC_ERR_S_PRINCIPAL_UNKNOWN,
47 KDC_ERR_TGT_REVOKED,
48 NT_PRINCIPAL,
49 NT_SRV_INST,
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
53 global_asn1_print = False
54 global_hexdump = False
57 class KdcTgsTests(KDCBaseTest):
def setUp(self):
    '''Run the base-class setUp, then copy the module-level debug flags
    (global_asn1_print, global_hexdump) onto this test instance.
    '''
    super().setUp()
    self.do_hexdump = global_hexdump
    self.do_asn1_print = global_asn1_print
def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
    '''Request a service ticket with a cname ("Administrator") that is
    not the one the TGT was obtained for, and check that the reply is a
    KRB_ERROR carrying KDC_ERR_BADMATCH.
    '''
    samdb = self.get_samdb()

    # Account that obtains the TGT.
    account = "tsttktusr"
    user_creds, _ = self.create_account(samdb, account)
    realm = user_creds.get_realm().lower()

    enctypes = (AES256_CTS_HMAC_SHA1_96,)
    client_name = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=[account])
    krbtgt_name = self.PrincipalName_create(
        name_type=NT_SRV_INST, names=["krbtgt", realm])

    # First AS-REQ is sent without padata; the reply is validated by
    # check_pre_authentication().
    as_rep = self.as_req(client_name, krbtgt_name, realm, enctypes)
    self.check_pre_authentication(as_rep)

    # Second AS-REQ carries the encrypted-timestamp padata.
    ts_padata = self.get_enc_timestamp_pa_data(user_creds, as_rep)
    reply_key = self.get_as_rep_key(user_creds, as_rep)
    as_rep = self.as_req(client_name, krbtgt_name, realm, enctypes,
                         padata=[ts_padata])
    self.check_as_reply(as_rep)

    # Pull the session key and the TGT out of the AS-REP.
    as_enc_part = self.get_as_rep_enc_data(reply_key, as_rep)
    session_key = self.EncryptionKey_import(as_enc_part['key'])
    tgt = as_rep['ticket']

    # The TGS-REQ names a different client than the AS-REQ did.
    other_cname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=["Administrator"])
    host_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=["host", samdb.host_dns_name()])

    tgs_rep, tgs_enc_part = self.tgs_req(
        other_cname, host_sname, realm, tgt, session_key, enctypes,
        expected_error_mode=KDC_ERR_BADMATCH,
        expect_edata=False)

    self.assertIsNone(
        tgs_enc_part,
        "rep = {%s}, enc_part = {%s}" % (tgs_rep, tgs_enc_part))
    self.assertEqual(KRB_ERROR, tgs_rep['msg-type'],
                     "rep = {%s}" % tgs_rep)
    self.assertEqual(KDC_ERR_BADMATCH, tgs_rep['error-code'],
                     "rep = {%s}" % tgs_rep)
def test_ldap_service_ticket(self):
    '''Get a ticket to the ldap service

    Creates the account "tsttktusr", obtains a TGT for it with two
    AS-REQs (the second carrying encrypted-timestamp padata), then asks
    the TGS for ldap/<samdb.host_dns_name()> and validates the reply
    with check_tgs_reply().
    '''
    # Create the user account
    samdb = self.get_samdb()
    user_name = "tsttktusr"
    (uc, _) = self.create_account(samdb, user_name)
    realm = uc.get_realm().lower()

    # Do the initial AS-REQ, should get a pre-authentication required
    # response
    etype = (AES256_CTS_HMAC_SHA1_96,)
    cname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=[user_name])
    sname = self.PrincipalName_create(
        name_type=NT_SRV_INST, names=["krbtgt", realm])

    rep = self.as_req(cname, sname, realm, etype)
    self.check_pre_authentication(rep)

    # Do the next AS-REQ
    padata = self.get_enc_timestamp_pa_data(uc, rep)
    key = self.get_as_rep_key(uc, rep)
    rep = self.as_req(cname, sname, realm, etype, padata=[padata])
    self.check_as_reply(rep)

    # The session key from the decrypted AS-REP enc-part is what the
    # TGS-REQ below is built with.
    enc_part2 = self.get_as_rep_enc_data(key, rep)
    key = self.EncryptionKey_import(enc_part2['key'])
    ticket = rep['ticket']

    # Request a ticket to the ldap service
    sname = self.PrincipalName_create(
        name_type=NT_SRV_INST,
        names=["ldap", samdb.host_dns_name()])

    # Note: the TGS-REQ passes uc.get_realm() as-is, whereas the
    # AS-REQs above used the lower-cased realm.
    (rep, _) = self.tgs_req(
        cname, sname, uc.get_realm(), ticket, key, etype,
        service_creds=self.get_dc_creds())

    self.check_tgs_reply(rep)
def test_get_ticket_for_host_service_of_machine_account(self):
    '''Obtain a service ticket for a machine account and compare the
    account name, logon name, domain name, UPN and SID found in its PAC
    data with those of the requesting user.
    '''
    samdb = self.get_samdb()

    # One user (the client) and one computer (the target service).
    account = "tsttktusr"
    user_creds, user_dn = self.create_account(samdb, account)
    mach_creds, _ = self.create_account(
        samdb, "tsttktmac",
        account_type=self.AccountType.COMPUTER)
    realm = user_creds.get_realm().lower()

    enctypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
    client_name = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=[account])
    krbtgt_name = self.PrincipalName_create(
        name_type=NT_SRV_INST, names=["krbtgt", realm])

    # First AS-REQ is sent without padata; the reply is validated by
    # check_pre_authentication().
    as_rep = self.as_req(client_name, krbtgt_name, realm, enctypes)
    self.check_pre_authentication(as_rep)

    # Second AS-REQ carries the encrypted-timestamp padata.
    ts_padata = self.get_enc_timestamp_pa_data(user_creds, as_rep)
    reply_key = self.get_as_rep_key(user_creds, as_rep)
    as_rep = self.as_req(client_name, krbtgt_name, realm, enctypes,
                         padata=[ts_padata])
    self.check_as_reply(as_rep)

    # Build the TGS-REQ for the machine account.
    tgt = as_rep['ticket']
    as_enc_part = self.get_as_rep_enc_data(reply_key, as_rep)
    session_key = self.EncryptionKey_import(as_enc_part['key'])
    client_name = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=[account])
    mach_name = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=[mach_creds.get_username()])

    tgs_rep, _unused_enc_part = self.tgs_req(
        client_name, mach_name, user_creds.get_realm(), tgt,
        session_key, enctypes,
        service_creds=mach_creds)
    self.check_tgs_reply(tgs_rep)

    # Decrypt the service ticket with the machine credentials and
    # inspect its PAC data.
    service_ticket = tgs_rep['ticket']
    ticket_enc_part = self.decode_service_ticket(mach_creds,
                                                 service_ticket)

    pac_data = self.get_pac_data(ticket_enc_part['authorization-data'])
    user_sid = self.get_objectSid(samdb, user_dn)
    user_upn = "%s@%s" % (user_creds.get_username(), realm)

    self.assertEqual(user_creds.get_username(),
                     str(pac_data.account_name),
                     "rep = {%s},%s" % (tgs_rep, pac_data))
    self.assertEqual(user_creds.get_username(),
                     pac_data.logon_name,
                     "rep = {%s},%s" % (tgs_rep, pac_data))
    self.assertEqual(user_creds.get_realm(),
                     pac_data.domain_name,
                     "rep = {%s},%s" % (tgs_rep, pac_data))
    self.assertEqual(user_upn,
                     pac_data.upn,
                     "rep = {%s},%s" % (tgs_rep, pac_data))
    self.assertEqual(user_sid,
                     pac_data.account_sid,
                     "rep = {%s},%s" % (tgs_rep, pac_data))
def _make_tgs_request(self, client_creds, service_creds, tgt,
                      pac_request=None, expect_pac=True,
                      expect_error=False,
                      expected_account_name=None,
                      expected_upn_name=None,
                      expected_sid=None):
    '''Run a TGS exchange for client_creds against the account in
    service_creds, presenting tgt.

    The request uses the 'canonicalize' KDC option, a random AES256
    authenticator subkey, and the etypes AES256_CTS_HMAC_SHA1_96 and
    ARCFOUR_HMAC_MD5. The remaining keyword arguments are forwarded to
    tgs_exchange_dict() unchanged.

    When expect_error is true the exchange is expected to fail with
    KDC_ERR_BADOPTION and None is returned; otherwise the reply is
    checked as a KRB_TGS_REP and the 'rep_ticket_creds' entry of the
    exchange dict is returned.
    '''
    cname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=[client_creds.get_username()])
    sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=[service_creds.get_username()])

    # The service's realm is used for both the client and the server
    # side of the expectations.
    realm = service_creds.get_realm()

    supported_etypes = service_creds.tgs_supported_enctypes
    request_etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
    options = str(krb5_asn1.KDCOptions('canonicalize'))
    decryption_key = self.TicketDecryptionKey_from_creds(service_creds)
    subkey = self.RandomKey(kcrypto.Enctype.AES256)

    # Exactly one of the error/reply checkers is set, depending on the
    # expected outcome.
    error_mode = KDC_ERR_BADOPTION if expect_error else 0
    error_checker = self.generic_check_kdc_error if expect_error else None
    reply_checker = None if expect_error else self.generic_check_kdc_rep

    exchange = self.tgs_exchange_dict(
        expected_crealm=realm,
        expected_cname=cname,
        expected_srealm=realm,
        expected_sname=sname,
        expected_account_name=expected_account_name,
        expected_upn_name=expected_upn_name,
        expected_sid=expected_sid,
        expected_supported_etypes=supported_etypes,
        ticket_decryption_key=decryption_key,
        check_error_fn=error_checker,
        check_rep_fn=reply_checker,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=error_mode,
        tgt=tgt,
        authenticator_subkey=subkey,
        kdc_options=options,
        pac_request=pac_request,
        expect_pac=expect_pac)

    reply = self._generic_kdc_exchange(exchange,
                                       cname=cname,
                                       realm=realm,
                                       sname=sname,
                                       etypes=request_etypes)

    if not expect_error:
        self.check_reply(reply, KRB_TGS_REP)
        return exchange['rep_ticket_creds']

    self.check_error_rep(reply, error_mode)
    return None
def test_request(self):
    '''Check that both the TGT and the service ticket obtained from it
    yield a PAC via get_ticket_pac().
    '''
    client = self.get_client_creds()
    service = self.get_service_creds()

    tgt = self.get_tgt(client)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt)
    self.assertIsNotNone(self.get_ticket_pac(service_ticket))
def test_request_no_pac(self):
    '''Request the TGT and the service ticket with pac_request=False.

    The TGT is still asserted to contain a PAC; the service ticket is
    asserted to contain none.
    '''
    client = self.get_client_creds()
    service = self.get_service_creds()

    tgt = self.get_tgt(client, pac_request=False)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt,
                                            pac_request=False,
                                            expect_pac=False)
    self.assertIsNone(self.get_ticket_pac(service_ticket,
                                          expect_pac=False))
def test_client_no_auth_data_required(self):
    '''Use a USER client account created with the
    'no_auth_data_required' option; a PAC is asserted in both the TGT
    and the service ticket.
    '''
    client = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts={'no_auth_data_required': True})
    service = self.get_service_creds()

    tgt = self.get_tgt(client)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt)
    self.assertIsNotNone(self.get_ticket_pac(service_ticket))
def test_no_pac_client_no_auth_data_required(self):
    '''Use a USER client account created with the
    'no_auth_data_required' option and send the TGS request with
    pac_request=False; a PAC is still asserted in the service ticket.
    '''
    client = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts={'no_auth_data_required': True})
    service = self.get_service_creds()

    tgt = self.get_tgt(client)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt,
                                            pac_request=False,
                                            expect_pac=True)
    self.assertIsNotNone(self.get_ticket_pac(service_ticket))
def test_service_no_auth_data_required(self):
    '''Target a COMPUTER service account created with the
    'no_auth_data_required' option; the TGT is asserted to have a PAC
    and the service ticket to have none.
    '''
    client = self.get_client_creds()
    service = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'no_auth_data_required': True})

    tgt = self.get_tgt(client)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt,
                                            expect_pac=False)
    self.assertIsNone(self.get_ticket_pac(service_ticket,
                                          expect_pac=False))
def test_no_pac_service_no_auth_data_required(self):
    '''Target a COMPUTER service account created with the
    'no_auth_data_required' option, passing pac_request=False for both
    the TGT and the TGS request; the TGT is asserted to have a PAC and
    the service ticket to have none.
    '''
    client = self.get_client_creds()
    service = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'no_auth_data_required': True})

    tgt = self.get_tgt(client, pac_request=False)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt,
                                            pac_request=False,
                                            expect_pac=False)
    self.assertIsNone(self.get_ticket_pac(service_ticket,
                                          expect_pac=False))
def test_remove_pac_service_no_auth_data_required(self):
    '''Present a TGT rebuilt with exclude_pac=True to a COMPUTER service
    account created with 'no_auth_data_required'; the TGS request is
    made with expect_error=True.
    '''
    client = self.get_client_creds()
    service = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'no_auth_data_required': True})

    pacless_tgt = self.modified_ticket(self.get_tgt(client),
                                       exclude_pac=True)
    self.assertIsNone(self.get_ticket_pac(pacless_tgt,
                                          expect_pac=False))

    self._make_tgs_request(client, service, pacless_tgt,
                           expect_pac=False, expect_error=True)
def test_remove_pac_client_no_auth_data_required(self):
    '''Present a TGT rebuilt with exclude_pac=True for a USER client
    account created with 'no_auth_data_required'; the TGS request is
    made with expect_error=True.
    '''
    client = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts={'no_auth_data_required': True})
    service = self.get_service_creds()

    pacless_tgt = self.modified_ticket(self.get_tgt(client),
                                       exclude_pac=True)
    self.assertIsNone(self.get_ticket_pac(pacless_tgt,
                                          expect_pac=False))

    self._make_tgs_request(client, service, pacless_tgt,
                           expect_pac=False, expect_error=True)
def test_remove_pac(self):
    '''Present a TGT rebuilt with exclude_pac=True; the TGS request is
    made with expect_error=True.
    '''
    client = self.get_client_creds()
    service = self.get_service_creds()

    pacless_tgt = self.modified_ticket(self.get_tgt(client),
                                       exclude_pac=True)
    self.assertIsNone(self.get_ticket_pac(pacless_tgt,
                                          expect_pac=False))

    self._make_tgs_request(client, service, pacless_tgt,
                           expect_pac=False, expect_error=True)
def test_upn_dns_info_ex_user(self):
    '''Run the UPN/DNS-info check with the credentials returned by
    get_client_creds().
    '''
    self._run_upn_dns_info_ex_test(self.get_client_creds())
def test_upn_dns_info_ex_mac(self):
    '''Run the UPN/DNS-info check with the credentials returned by
    get_mach_creds().
    '''
    self._run_upn_dns_info_ex_test(self.get_mach_creds())
def test_upn_dns_info_ex_upn_user(self):
    '''Run the UPN/DNS-info check with a USER account created with the
    'upn' option set to 'upn_dns_info_test_upn0@bar'.
    '''
    user_with_upn = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts={'upn': 'upn_dns_info_test_upn0@bar'})
    self._run_upn_dns_info_ex_test(user_with_upn)
def test_upn_dns_info_ex_upn_mac(self):
    '''Run the UPN/DNS-info check with a COMPUTER account created with
    the 'upn' option set to 'upn_dns_info_test_upn1@bar'.
    '''
    machine_with_upn = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'upn': 'upn_dns_info_test_upn1@bar'})
    self._run_upn_dns_info_ex_test(machine_with_upn)
def _run_upn_dns_info_ex_test(self, client_creds):
    '''Obtain a TGT and then a service ticket for client_creds, passing
    the account name, UPN and SID of the account as the expected
    values to both requests.

    When client_creds.get_upn() returns None, the expected UPN is
    built from the account name and the lower-cased realm.
    '''
    service_creds = self.get_service_creds()

    samdb = self.get_samdb()
    client_dn = client_creds.get_dn()

    account_name = client_creds.get_username()
    upn_name = client_creds.get_upn()
    if upn_name is None:
        realm = client_creds.get_realm().lower()
        upn_name = f'{account_name}@{realm}'
    client_sid = self.get_objectSid(samdb, client_dn)

    # Same expectations for the AS and the TGS exchange.
    expectations = dict(expected_account_name=account_name,
                        expected_upn_name=upn_name,
                        expected_sid=client_sid)

    tgt = self.get_tgt(client_creds, **expectations)
    self._make_tgs_request(client_creds, service_creds, tgt,
                           **expectations)
487 # Test making a TGS request.
def test_tgs_req(self):
    '''Run _run_tgs() on a TGT from _get_tgt() with default options,
    expecting error code 0.
    '''
    plain_tgt = self._get_tgt(self._get_creds())
    self._run_tgs(plain_tgt, expected_error=0)
def test_renew_req(self):
    '''Run _renew_tgt() on a TGT obtained with renewable=True,
    expecting error code 0.
    '''
    renewable_tgt = self._get_tgt(self._get_creds(), renewable=True)
    self._renew_tgt(renewable_tgt, expected_error=0)
def test_validate_req(self):
    '''Run _validate_tgt() on a TGT obtained with invalid=True,
    expecting error code 0.
    '''
    invalid_tgt = self._get_tgt(self._get_creds(), invalid=True)
    self._validate_tgt(invalid_tgt, expected_error=0)
def test_s4u2self_req(self):
    '''Run _s4u2self() with a TGT from _get_tgt() and the same
    credentials, expecting error code 0.
    '''
    account = self._get_creds()
    plain_tgt = self._get_tgt(account)
    self._s4u2self(plain_tgt, account, expected_error=0)
def test_user2user_req(self):
    '''Run _user2user() with a TGT from _get_tgt() and the same
    credentials, expecting error code 0.
    '''
    account = self._get_creds()
    plain_tgt = self._get_tgt(account)
    self._user2user(plain_tgt, account, expected_error=0)
513 # Test making a request without a PAC.
def test_tgs_no_pac(self):
    '''Run _run_tgs() on a TGT obtained with remove_pac=True,
    expecting KDC_ERR_BADOPTION.
    '''
    pacless_tgt = self._get_tgt(self._get_creds(), remove_pac=True)
    self._run_tgs(pacless_tgt, expected_error=KDC_ERR_BADOPTION)
def test_renew_no_pac(self):
    '''Run _renew_tgt() on a TGT obtained with renewable=True and
    remove_pac=True, expecting KDC_ERR_BADOPTION.
    '''
    pacless_tgt = self._get_tgt(self._get_creds(),
                                renewable=True, remove_pac=True)
    self._renew_tgt(pacless_tgt, expected_error=KDC_ERR_BADOPTION)
def test_validate_no_pac(self):
    '''Run _validate_tgt() on a TGT obtained with invalid=True and
    remove_pac=True, expecting KDC_ERR_BADOPTION.
    '''
    pacless_tgt = self._get_tgt(self._get_creds(),
                                invalid=True, remove_pac=True)
    self._validate_tgt(pacless_tgt, expected_error=KDC_ERR_BADOPTION)
def test_s4u2self_no_pac(self):
    '''Run _s4u2self() on a TGT obtained with remove_pac=True.

    Either KDC_ERR_GENERIC or KDC_ERR_BADOPTION is accepted, with
    NT_STATUS_INVALID_PARAMETER as the expected status and edata
    expected in the reply.
    '''
    account = self._get_creds()
    pacless_tgt = self._get_tgt(account, remove_pac=True)
    accepted_errors = (KDC_ERR_GENERIC, KDC_ERR_BADOPTION)
    self._s4u2self(pacless_tgt, account,
                   expected_error=accepted_errors,
                   expected_status=ntstatus.NT_STATUS_INVALID_PARAMETER,
                   expect_edata=True)
def test_user2user_no_pac(self):
    '''Run _user2user() on a TGT obtained with remove_pac=True,
    expecting KDC_ERR_BADOPTION.
    '''
    account = self._get_creds()
    pacless_tgt = self._get_tgt(account, remove_pac=True)
    self._user2user(pacless_tgt, account,
                    expected_error=KDC_ERR_BADOPTION)
542 # Test making a request with authdata and without a PAC.
def test_tgs_authdata_no_pac(self):
    '''Run _run_tgs() on a TGT obtained with remove_pac=True and
    allow_empty_authdata=True, expecting KDC_ERR_BADOPTION.
    '''
    pacless_tgt = self._get_tgt(self._get_creds(),
                                remove_pac=True,
                                allow_empty_authdata=True)
    self._run_tgs(pacless_tgt, expected_error=KDC_ERR_BADOPTION)
def test_renew_authdata_no_pac(self):
    '''Run _renew_tgt() on a TGT obtained with renewable=True,
    remove_pac=True and allow_empty_authdata=True, expecting
    KDC_ERR_BADOPTION.
    '''
    pacless_tgt = self._get_tgt(self._get_creds(),
                                renewable=True,
                                remove_pac=True,
                                allow_empty_authdata=True)
    self._renew_tgt(pacless_tgt, expected_error=KDC_ERR_BADOPTION)
def test_validate_authdata_no_pac(self):
    '''Run _validate_tgt() on a TGT obtained with invalid=True,
    remove_pac=True and allow_empty_authdata=True, expecting
    KDC_ERR_BADOPTION.
    '''
    pacless_tgt = self._get_tgt(self._get_creds(),
                                invalid=True,
                                remove_pac=True,
                                allow_empty_authdata=True)
    self._validate_tgt(pacless_tgt, expected_error=KDC_ERR_BADOPTION)
def test_s4u2self_authdata_no_pac(self):
    '''Run _s4u2self() on a TGT obtained with remove_pac=True and
    allow_empty_authdata=True.

    Either KDC_ERR_GENERIC or KDC_ERR_BADOPTION is accepted, with
    NT_STATUS_INVALID_PARAMETER as the expected status and edata
    expected in the reply.
    '''
    account = self._get_creds()
    pacless_tgt = self._get_tgt(account, remove_pac=True,
                                allow_empty_authdata=True)
    accepted_errors = (KDC_ERR_GENERIC, KDC_ERR_BADOPTION)
    self._s4u2self(pacless_tgt, account,
                   expected_error=accepted_errors,
                   expected_status=ntstatus.NT_STATUS_INVALID_PARAMETER,
                   expect_edata=True)
def test_user2user_authdata_no_pac(self):
    '''Run _user2user() on a TGT obtained with remove_pac=True and
    allow_empty_authdata=True, expecting KDC_ERR_BADOPTION.
    '''
    account = self._get_creds()
    pacless_tgt = self._get_tgt(account, remove_pac=True,
                                allow_empty_authdata=True)
    self._user2user(pacless_tgt, account,
                    expected_error=KDC_ERR_BADOPTION)
573 # Test changing the SID in the PAC to that of another account.
def test_tgs_sid_mismatch_existing(self):
    '''Run _run_tgs() on a TGT obtained with new_rid set to the value
    from _get_existing_rid(), expecting KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    other_rid = self._get_existing_rid()
    forged_tgt = self._get_tgt(account, new_rid=other_rid)
    self._run_tgs(forged_tgt,
                  expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_renew_sid_mismatch_existing(self):
    '''Run _renew_tgt() on a renewable TGT obtained with new_rid set to
    the value from _get_existing_rid(), expecting
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    other_rid = self._get_existing_rid()
    forged_tgt = self._get_tgt(account, renewable=True,
                               new_rid=other_rid)
    self._renew_tgt(forged_tgt,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_validate_sid_mismatch_existing(self):
    '''Run _validate_tgt() on an invalid=True TGT obtained with new_rid
    set to the value from _get_existing_rid(), expecting
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    other_rid = self._get_existing_rid()
    forged_tgt = self._get_tgt(account, invalid=True,
                               new_rid=other_rid)
    self._validate_tgt(forged_tgt,
                       expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_s4u2self_sid_mismatch_existing(self):
    '''Run _s4u2self() on a TGT obtained with new_rid set to the value
    from _get_existing_rid(), expecting KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    other_rid = self._get_existing_rid()
    forged_tgt = self._get_tgt(account, new_rid=other_rid)
    self._s4u2self(forged_tgt, account,
                   expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_user2user_sid_mismatch_existing(self):
    '''Run _user2user() on a TGT obtained with new_rid set to the value
    from _get_existing_rid(), expecting KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    other_rid = self._get_existing_rid()
    forged_tgt = self._get_tgt(account, new_rid=other_rid)
    self._user2user(forged_tgt, account,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
606 # Test changing the SID in the PAC to a non-existent one.
def test_tgs_sid_mismatch_nonexisting(self):
    '''Run _run_tgs() on a TGT obtained with new_rid set to the value
    from _get_non_existent_rid(), expecting
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, new_rid=unused_rid)
    self._run_tgs(forged_tgt,
                  expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_renew_sid_mismatch_nonexisting(self):
    '''Run _renew_tgt() on a renewable TGT obtained with new_rid set to
    the value from _get_non_existent_rid(), expecting
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, renewable=True,
                               new_rid=unused_rid)
    self._renew_tgt(forged_tgt,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_validate_sid_mismatch_nonexisting(self):
    '''Run _validate_tgt() on an invalid=True TGT obtained with new_rid
    set to the value from _get_non_existent_rid(), expecting
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, invalid=True,
                               new_rid=unused_rid)
    self._validate_tgt(forged_tgt,
                       expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_s4u2self_sid_mismatch_nonexisting(self):
    '''Run _s4u2self() on a TGT obtained with new_rid set to the value
    from _get_non_existent_rid(), expecting
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, new_rid=unused_rid)
    self._s4u2self(forged_tgt, account,
                   expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_user2user_sid_mismatch_nonexisting(self):
    '''Run _user2user() on a TGT obtained with new_rid set to the value
    from _get_non_existent_rid(), expecting
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds()
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, new_rid=unused_rid)
    self._user2user(forged_tgt, account,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
641 # Test with an RODC-issued ticket where the client is revealed to the RODC.
def test_tgs_rodc_revealed(self):
    '''Run _run_tgs() on a from_rodc=True TGT for an account created
    with replication_allowed and revealed_to_rodc, expecting error
    code 0.
    '''
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    rodc_tgt = self._get_tgt(self._get_creds(**account_opts),
                             from_rodc=True)
    self._run_tgs(rodc_tgt, expected_error=0)
def test_renew_rodc_revealed(self):
    '''Run _renew_tgt() on a renewable, from_rodc=True TGT for an
    account created with replication_allowed and revealed_to_rodc,
    expecting error code 0.
    '''
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    rodc_tgt = self._get_tgt(self._get_creds(**account_opts),
                             renewable=True, from_rodc=True)
    self._renew_tgt(rodc_tgt, expected_error=0)
def test_validate_rodc_revealed(self):
    '''Run _validate_tgt() on an invalid=True, from_rodc=True TGT for an
    account created with replication_allowed and revealed_to_rodc,
    expecting error code 0.
    '''
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    rodc_tgt = self._get_tgt(self._get_creds(**account_opts),
                             invalid=True, from_rodc=True)
    self._validate_tgt(rodc_tgt, expected_error=0)
def test_s4u2self_rodc_revealed(self):
    '''Run _s4u2self() on a from_rodc=True TGT for an account created
    with replication_allowed and revealed_to_rodc, expecting error
    code 0.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._s4u2self(rodc_tgt, account, expected_error=0)
def test_user2user_rodc_revealed(self):
    '''Run _user2user() on a from_rodc=True TGT for an account created
    with replication_allowed and revealed_to_rodc, expecting error
    code 0.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._user2user(rodc_tgt, account, expected_error=0)
672 # Test with an RODC-issued ticket where the SID in the PAC is changed to
673 # that of another account.
def test_tgs_rodc_sid_mismatch_existing(self):
    '''Run _run_tgs() on a from_rodc=True TGT whose new_rid comes from
    _get_existing_rid(); both accounts are requested with
    replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    forged_tgt = self._get_tgt(account, from_rodc=True,
                               new_rid=other_rid)
    self._run_tgs(forged_tgt,
                  expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_renew_rodc_sid_mismatch_existing(self):
    '''Run _renew_tgt() on a renewable, from_rodc=True TGT whose new_rid
    comes from _get_existing_rid(); both accounts are requested with
    replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    forged_tgt = self._get_tgt(account, renewable=True, from_rodc=True,
                               new_rid=other_rid)
    self._renew_tgt(forged_tgt,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_validate_rodc_sid_mismatch_existing(self):
    '''Run _validate_tgt() on an invalid=True, from_rodc=True TGT whose
    new_rid comes from _get_existing_rid(); both accounts are requested
    with replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    forged_tgt = self._get_tgt(account, invalid=True, from_rodc=True,
                               new_rid=other_rid)
    self._validate_tgt(forged_tgt,
                       expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_s4u2self_rodc_sid_mismatch_existing(self):
    '''Run _s4u2self() on a from_rodc=True TGT whose new_rid comes from
    _get_existing_rid(); both accounts are requested with
    replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    forged_tgt = self._get_tgt(account, from_rodc=True,
                               new_rid=other_rid)
    self._s4u2self(forged_tgt, account,
                   expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_user2user_rodc_sid_mismatch_existing(self):
    '''Run _user2user() on a from_rodc=True TGT whose new_rid comes from
    _get_existing_rid(); both accounts are requested with
    replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    forged_tgt = self._get_tgt(account, from_rodc=True,
                               new_rid=other_rid)
    self._user2user(forged_tgt, account,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
717 # Test with an RODC-issued ticket where the SID in the PAC is changed to a
718 # non-existent one.
def test_tgs_rodc_sid_mismatch_nonexisting(self):
    '''Run _run_tgs() on a from_rodc=True TGT whose new_rid comes from
    _get_non_existent_rid(), for an account created with
    replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, from_rodc=True,
                               new_rid=unused_rid)
    self._run_tgs(forged_tgt,
                  expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_renew_rodc_sid_mismatch_nonexisting(self):
    '''Run _renew_tgt() on a renewable, from_rodc=True TGT whose new_rid
    comes from _get_non_existent_rid(), for an account created with
    replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, renewable=True, from_rodc=True,
                               new_rid=unused_rid)
    self._renew_tgt(forged_tgt,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_validate_rodc_sid_mismatch_nonexisting(self):
    '''Run _validate_tgt() on an invalid=True, from_rodc=True TGT whose
    new_rid comes from _get_non_existent_rid(), for an account created
    with replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, invalid=True, from_rodc=True,
                               new_rid=unused_rid)
    self._validate_tgt(forged_tgt,
                       expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
    '''Run _s4u2self() on a from_rodc=True TGT whose new_rid comes from
    _get_non_existent_rid(), for an account created with
    replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, from_rodc=True,
                               new_rid=unused_rid)
    self._s4u2self(forged_tgt, account,
                   expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_user2user_rodc_sid_mismatch_nonexisting(self):
    '''Run _user2user() on a from_rodc=True TGT whose new_rid comes from
    _get_non_existent_rid(), for an account created with
    replication_allowed and revealed_to_rodc. Expects
    KDC_ERR_CLIENT_NAME_MISMATCH.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    unused_rid = self._get_non_existent_rid()
    forged_tgt = self._get_tgt(account, from_rodc=True,
                               new_rid=unused_rid)
    self._user2user(forged_tgt, account,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
757 # Test with an RODC-issued ticket where the client is not revealed to the
758 # RODC.
def test_tgs_rodc_not_revealed(self):
    '''Run _run_tgs() on a from_rodc=True TGT for an account created
    with replication_allowed only, expecting KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(replication_allowed=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    # TODO: error code
    self._run_tgs(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_renew_rodc_not_revealed(self):
    '''Run _renew_tgt() on a renewable, from_rodc=True TGT for an
    account created with replication_allowed only, expecting
    KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(replication_allowed=True)
    rodc_tgt = self._get_tgt(account, renewable=True, from_rodc=True)
    self._renew_tgt(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_validate_rodc_not_revealed(self):
    '''Run _validate_tgt() on an invalid=True, from_rodc=True TGT for an
    account created with replication_allowed only, expecting
    KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(replication_allowed=True)
    rodc_tgt = self._get_tgt(account, invalid=True, from_rodc=True)
    self._validate_tgt(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_s4u2self_rodc_not_revealed(self):
    '''Run _s4u2self() on a from_rodc=True TGT for an account created
    with replication_allowed only, expecting KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(replication_allowed=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._s4u2self(rodc_tgt, account,
                   expected_error=KDC_ERR_TGT_REVOKED)
def test_user2user_rodc_not_revealed(self):
    '''Run _user2user() on a from_rodc=True TGT for an account created
    with replication_allowed only, expecting KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(replication_allowed=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._user2user(rodc_tgt, account,
                    expected_error=KDC_ERR_TGT_REVOKED)
785 # Test with an RODC-issued ticket where the RODC account does not have the
786 # PARTIAL_SECRETS bit set.
def test_tgs_rodc_no_partial_secrets(self):
    '''Obtain a from_rodc=True TGT, call
    _remove_rodc_partial_secrets(), then run _run_tgs() expecting
    KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_partial_secrets()
    self._run_tgs(rodc_tgt, expected_error=KDC_ERR_POLICY)
def test_renew_rodc_no_partial_secrets(self):
    '''Obtain a renewable, from_rodc=True TGT, call
    _remove_rodc_partial_secrets(), then run _renew_tgt() expecting
    KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, renewable=True, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_partial_secrets()
    self._renew_tgt(rodc_tgt, expected_error=KDC_ERR_POLICY)
def test_validate_rodc_no_partial_secrets(self):
    '''Obtain an invalid=True, from_rodc=True TGT, call
    _remove_rodc_partial_secrets(), then run _validate_tgt() expecting
    KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, invalid=True, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_partial_secrets()
    self._validate_tgt(rodc_tgt, expected_error=KDC_ERR_POLICY)
def test_s4u2self_rodc_no_partial_secrets(self):
    '''Obtain a from_rodc=True TGT, call
    _remove_rodc_partial_secrets(), then run _s4u2self() expecting
    KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_partial_secrets()
    self._s4u2self(rodc_tgt, account, expected_error=KDC_ERR_POLICY)
def test_user2user_rodc_no_partial_secrets(self):
    '''Obtain a from_rodc=True TGT, call
    _remove_rodc_partial_secrets(), then run _user2user() expecting
    KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_partial_secrets()
    self._user2user(rodc_tgt, account, expected_error=KDC_ERR_POLICY)
822 # Test with an RODC-issued ticket where the RODC account does not have an
823 # msDS-KrbTgtLink.
def test_tgs_rodc_no_krbtgt_link(self):
    '''Obtain a from_rodc=True TGT, call _remove_rodc_krbtgt_link(),
    then run _run_tgs() expecting KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_krbtgt_link()
    self._run_tgs(rodc_tgt, expected_error=KDC_ERR_POLICY)
def test_renew_rodc_no_krbtgt_link(self):
    '''Obtain a renewable, from_rodc=True TGT, call
    _remove_rodc_krbtgt_link(), then run _renew_tgt() expecting
    KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, renewable=True, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_krbtgt_link()
    self._renew_tgt(rodc_tgt, expected_error=KDC_ERR_POLICY)
def test_validate_rodc_no_krbtgt_link(self):
    '''Obtain an invalid=True, from_rodc=True TGT, call
    _remove_rodc_krbtgt_link(), then run _validate_tgt() expecting
    KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, invalid=True, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_krbtgt_link()
    self._validate_tgt(rodc_tgt, expected_error=KDC_ERR_POLICY)
def test_s4u2self_rodc_no_krbtgt_link(self):
    '''Obtain a from_rodc=True TGT, call _remove_rodc_krbtgt_link(),
    then run _s4u2self() expecting KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_krbtgt_link()
    self._s4u2self(rodc_tgt, account, expected_error=KDC_ERR_POLICY)
def test_user2user_rodc_no_krbtgt_link(self):
    '''Obtain a from_rodc=True TGT, call _remove_rodc_krbtgt_link(),
    then run _user2user() expecting KDC_ERR_POLICY.
    '''
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    # The TGT is obtained before the RODC account is altered.
    self._remove_rodc_krbtgt_link()
    self._user2user(rodc_tgt, account, expected_error=KDC_ERR_POLICY)
859 # Test with an RODC-issued ticket where the client is not allowed to
860 # replicate to the RODC.
def test_tgs_rodc_not_allowed(self):
    '''Run _run_tgs() on a from_rodc=True TGT for an account created
    with revealed_to_rodc only, expecting KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._run_tgs(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_renew_rodc_not_allowed(self):
    '''Run _renew_tgt() on a renewable, from_rodc=True TGT for an
    account created with revealed_to_rodc only, expecting
    KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, renewable=True, from_rodc=True)
    self._renew_tgt(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_validate_rodc_not_allowed(self):
    '''Run _validate_tgt() on an invalid=True, from_rodc=True TGT for an
    account created with revealed_to_rodc only, expecting
    KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, invalid=True, from_rodc=True)
    self._validate_tgt(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_s4u2self_rodc_not_allowed(self):
    '''Run _s4u2self() on a from_rodc=True TGT for an account created
    with revealed_to_rodc only, expecting KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._s4u2self(rodc_tgt, account,
                   expected_error=KDC_ERR_TGT_REVOKED)
def test_user2user_rodc_not_allowed(self):
    '''Run _user2user() on a from_rodc=True TGT for an account created
    with revealed_to_rodc only, expecting KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._user2user(rodc_tgt, account,
                    expected_error=KDC_ERR_TGT_REVOKED)
886 # Test with an RODC-issued ticket where the client is denied from
887 # replicating to the RODC.
def test_tgs_rodc_denied(self):
    '''Run _run_tgs() on a from_rodc=True TGT for an account created
    with replication_denied and revealed_to_rodc, expecting
    KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._run_tgs(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_renew_rodc_denied(self):
    '''Run _renew_tgt() on a renewable, from_rodc=True TGT for an
    account created with replication_denied and revealed_to_rodc,
    expecting KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, renewable=True, from_rodc=True)
    self._renew_tgt(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_validate_rodc_denied(self):
    '''Run _validate_tgt() on an invalid=True, from_rodc=True TGT for an
    account created with replication_denied and revealed_to_rodc,
    expecting KDC_ERR_TGT_REVOKED.
    '''
    account = self._get_creds(replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, invalid=True, from_rodc=True)
    self._validate_tgt(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_s4u2self_rodc_denied(self):
    # S4U2Self with an RODC-issued TGT for an account denied
    # replication must be rejected as revoked.
    account = self._get_creds(replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._s4u2self(rodc_tgt, account,
                   expected_error=KDC_ERR_TGT_REVOKED)
def test_user2user_rodc_denied(self):
    # User-to-user with an RODC-issued additional ticket for an account
    # denied replication must be rejected as revoked.
    account = self._get_creds(replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._user2user(rodc_tgt, account,
                    expected_error=KDC_ERR_TGT_REVOKED)
# Test with an RODC-issued ticket where the client is both allowed and
# denied replicating to the RODC.
def test_tgs_rodc_allowed_denied(self):
    # The account is both allowed and denied replication; the test
    # expects the request to be rejected as revoked.
    account = self._get_creds(replication_allowed=True,
                              replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._run_tgs(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_renew_rodc_allowed_denied(self):
    # Renewal with an account that is both allowed and denied
    # replication must be rejected as revoked.
    account = self._get_creds(replication_allowed=True,
                              replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, renewable=True, from_rodc=True)
    self._renew_tgt(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_validate_rodc_allowed_denied(self):
    # Validation with an account that is both allowed and denied
    # replication must be rejected as revoked.
    account = self._get_creds(replication_allowed=True,
                              replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, invalid=True, from_rodc=True)
    self._validate_tgt(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_s4u2self_rodc_allowed_denied(self):
    # S4U2Self with an account that is both allowed and denied
    # replication must be rejected as revoked.
    account = self._get_creds(replication_allowed=True,
                              replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._s4u2self(rodc_tgt, account,
                   expected_error=KDC_ERR_TGT_REVOKED)
def test_user2user_rodc_allowed_denied(self):
    # User-to-user with an account that is both allowed and denied
    # replication must be rejected as revoked.
    account = self._get_creds(replication_allowed=True,
                              replication_denied=True,
                              revealed_to_rodc=True)
    rodc_tgt = self._get_tgt(account, from_rodc=True)
    self._user2user(rodc_tgt, account,
                    expected_error=KDC_ERR_TGT_REVOKED)
# Test user-to-user with incorrect service principal names.
def test_user2user_matching_sname_host(self):
    # Address the service as 'host/<account name>' (account name taken
    # verbatim from the credentials); the KDC is expected to report an
    # unknown service principal.
    account = self._get_creds()
    account_tgt = self._get_tgt(account)

    host_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=['host', account.get_username()])

    self._user2user(account_tgt, account, sname=host_sname,
                    expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
def test_user2user_matching_sname_no_host(self):
    # Address the service by the bare account name of the ticket's
    # owner; this request is expected to succeed.
    account = self._get_creds()
    account_tgt = self._get_tgt(account)

    bare_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=[account.get_username()])

    self._user2user(account_tgt, account, sname=bare_sname,
                    expected_error=0)
def test_user2user_wrong_sname(self):
    # Address the service by the name of a different account than the
    # one the additional ticket belongs to; either BADMATCH or
    # BADOPTION is accepted as the resulting error.
    account = self._get_creds()
    account_tgt = self._get_tgt(account)

    unrelated = self._get_mach_creds()
    wrong_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=[unrelated.get_username()])

    accepted_errors = (KDC_ERR_BADMATCH, KDC_ERR_BADOPTION)
    self._user2user(account_tgt, account, sname=wrong_sname,
                    expected_error=accepted_errors)
def test_user2user_non_existent_sname(self):
    # Address the service as 'host/non_existent_user'; the KDC is
    # expected to report an unknown service principal.
    account = self._get_creds()
    account_tgt = self._get_tgt(account)

    missing_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=['host', 'non_existent_user'])

    self._user2user(account_tgt, account, sname=missing_sname,
                    expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
def test_user2user_service_ticket(self):
    # Supply a service ticket, rather than a TGT, as the additional
    # ticket of a user-to-user request; either MODIFIED or POLICY is
    # accepted as the resulting error.
    account = self._get_creds()
    account_tgt = self._get_tgt(account)

    target = self.get_service_creds()
    svc_ticket = self.get_service_ticket(account_tgt, target)

    accepted_errors = (KDC_ERR_MODIFIED, KDC_ERR_POLICY)
    self._user2user(svc_ticket, account,
                    expected_error=accepted_errors)
def _get_tgt(self,
             client_creds,
             renewable=False,
             invalid=False,
             from_rodc=False,
             new_rid=None,
             remove_pac=False,
             allow_empty_authdata=False,
             can_modify_logon_info=True,
             can_modify_requester_sid=True,
             remove_pac_attrs=False,
             remove_requester_sid=False):
    """Obtain a TGT for client_creds and apply the requested changes.

    A TGT is fetched with self.get_tgt() and then handed, together
    with every option unchanged, to self._modify_tgt(), whose result
    is returned.

    'renewable' and 'invalid' are mutually exclusive, and 'new_rid'
    must be None when 'remove_pac' is set; both conditions are
    asserted before any ticket is requested.
    """
    self.assertFalse(renewable and invalid)

    # A RID cannot be rewritten inside a PAC that is being dropped.
    if remove_pac:
        self.assertIsNone(new_rid)

    base_tgt = self.get_tgt(client_creds)

    return self._modify_tgt(
        tgt=base_tgt,
        renewable=renewable,
        invalid=invalid,
        from_rodc=from_rodc,
        new_rid=new_rid,
        remove_pac=remove_pac,
        allow_empty_authdata=allow_empty_authdata,
        can_modify_logon_info=can_modify_logon_info,
        can_modify_requester_sid=can_modify_requester_sid,
        remove_pac_attrs=remove_pac_attrs,
        remove_requester_sid=remove_requester_sid)
def _modify_tgt(self,
                tgt,
                renewable=False,
                invalid=False,
                from_rodc=False,
                new_rid=None,
                remove_pac=False,
                allow_empty_authdata=False,
                cname=None,
                crealm=None,
                can_modify_logon_info=True,
                can_modify_requester_sid=True,
                remove_pac_attrs=False,
                remove_requester_sid=False):
    """Return a modified copy of an already obtained TGT.

    The ticket is rebuilt through self.modified_ticket() and
    re-encrypted with a krbtgt key.

    tgt: the ticket to modify.
    renewable: set the 'renewable' ticket flag and a renew-till time
        at offset +100*60*60 from now.
    invalid: set the 'invalid' ticket flag and a start time at offset
        -100*60*60 from now.  Ignored when 'renewable' is set.
    from_rodc: use the mock RODC's krbtgt key instead of the krbtgt
        key returned by get_krbtgt_creds().
    new_rid: when not None, replace the RID in the PAC logon info (if
        can_modify_logon_info) and the requester SID with
        '<domain sid>-<new_rid>' (if can_modify_requester_sid).
    remove_pac: drop the PAC entirely; no PAC checksums are updated.
    allow_empty_authdata: passed through to modified_ticket().
    cname: when not None, replace the ticket's cname and set the PAC
        logon-name account name to its first name component.
    crealm: when not None, replace the ticket's crealm.
    remove_pac_attrs: remove the PAC attributes-info buffer.
    remove_requester_sid: remove the PAC requester-SID buffer (takes
        precedence over rewriting it with new_rid).
    """
    if from_rodc:
        krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
    else:
        krbtgt_creds = self.get_krbtgt_creds()

    if new_rid is not None or remove_requester_sid or remove_pac_attrs:
        def change_sid_fn(pac):
            pac_buffers = pac.buffers
            # Walk a snapshot of the buffer list: removing entries from
            # the list being iterated would make the loop skip the
            # buffer that follows each removed one.
            for pac_buffer in list(pac_buffers):
                if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
                    if new_rid is not None and can_modify_logon_info:
                        logon_info = pac_buffer.info.info

                        logon_info.info3.base.rid = new_rid
                elif pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID:
                    if remove_requester_sid:
                        pac.num_buffers -= 1
                        pac_buffers.remove(pac_buffer)
                    elif new_rid is not None and can_modify_requester_sid:
                        requester_sid = pac_buffer.info

                        samdb = self.get_samdb()
                        domain_sid = samdb.get_domain_sid()

                        new_sid = f'{domain_sid}-{new_rid}'

                        requester_sid.sid = security.dom_sid(new_sid)
                elif pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO:
                    if remove_pac_attrs:
                        pac.num_buffers -= 1
                        pac_buffers.remove(pac_buffer)

            # Write the (possibly shortened) list back to the PAC.
            pac.buffers = pac_buffers

            return pac
    else:
        change_sid_fn = None

    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    if remove_pac:
        checksum_keys = None
    else:
        checksum_keys = {
            krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
        }

    def set_ticket_flag(enc_part, flag_name):
        # Turn on the named bit in the ticket's flag bit-string.
        # NOTE(review): relies on TicketFlags(<name>) being exactly as
        # long as needed to hold that bit, so that its last index is
        # the flag's position - confirm against the ASN.1 helper.
        flag = krb5_asn1.TicketFlags(flag_name)
        pos = len(tuple(flag)) - 1

        flags = enc_part['flags']
        self.assertLessEqual(pos, len(flags))

        enc_part['flags'] = flags[:pos] + '1' + flags[pos + 1:]

    if renewable:
        def flags_modify_fn(enc_part):
            # Set the renewable flag.
            set_ticket_flag(enc_part, 'renewable')

            # Set the renew-till time to be in the future.
            renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
            enc_part['renew-till'] = renew_till

            return enc_part
    elif invalid:
        def flags_modify_fn(enc_part):
            # Set the invalid flag.
            set_ticket_flag(enc_part, 'invalid')

            # Set the ticket start time to be in the past.
            past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
            enc_part['starttime'] = past_time

            return enc_part
    else:
        flags_modify_fn = None

    if cname is not None or crealm is not None:
        def modify_fn(enc_part):
            # Apply any flag change first, then override the client
            # identity fields.
            if flags_modify_fn is not None:
                enc_part = flags_modify_fn(enc_part)

            if cname is not None:
                enc_part['cname'] = cname

            if crealm is not None:
                enc_part['crealm'] = crealm

            return enc_part
    else:
        modify_fn = flags_modify_fn

    if cname is not None:
        def modify_pac_fn(pac):
            if change_sid_fn is not None:
                pac = change_sid_fn(pac)

            # Keep the PAC's logon name in step with the new cname.
            for pac_buffer in pac.buffers:
                if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
                    logon_name = pac_buffer.info

                    logon_name.account_name = (
                        cname['name-string'][0].decode('utf-8'))

            return pac
    else:
        modify_pac_fn = change_sid_fn

    return self.modified_ticket(
        tgt,
        new_ticket_key=krbtgt_key,
        modify_fn=modify_fn,
        modify_pac_fn=modify_pac_fn,
        exclude_pac=remove_pac,
        allow_empty_authdata=allow_empty_authdata,
        update_pac_checksums=not remove_pac,
        checksum_keys=checksum_keys)
def _remove_rodc_partial_secrets(self):
    """Clear UF_PARTIAL_SECRETS_ACCOUNT on the mock RODC account.

    A cleanup is registered first that writes the account's
    userAccountControl value, as held by the mock RODC context, back
    to the directory.
    """
    samdb = self.get_samdb()

    rodc_ctx = self.get_mock_rodc_ctx()
    rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)

    def write_uac(value):
        # Replace userAccountControl on the RODC account with 'value'.
        request = ldb.Message()
        request.dn = rodc_dn
        request['userAccountControl'] = ldb.MessageElement(
            str(value),
            ldb.FLAG_MOD_REPLACE,
            'userAccountControl')
        samdb.modify(request)

    # The context's value is read when the cleanup runs, not now.
    self.addCleanup(lambda: write_uac(rodc_ctx.userAccountControl))

    write_uac(rodc_ctx.userAccountControl
              & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
def _remove_rodc_krbtgt_link(self):
    """Delete the msDS-KrbTgtLink attribute from the mock RODC account.

    A cleanup is registered first that adds the link back, pointing
    at the krbtgt DN recorded in the mock RODC context.
    """
    samdb = self.get_samdb()

    rodc_ctx = self.get_mock_rodc_ctx()
    rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)

    def add_rodc_krbtgt_link():
        # Restore the link to the RODC's krbtgt account.
        msg = ldb.Message()
        msg.dn = rodc_dn
        msg['msDS-KrbTgtLink'] = ldb.MessageElement(
            rodc_ctx.new_krbtgt_dn,
            ldb.FLAG_MOD_ADD,
            'msDS-KrbTgtLink')
        samdb.modify(msg)

    self.addCleanup(add_rodc_krbtgt_link)

    msg = ldb.Message()
    msg.dn = rodc_dn
    # MessageElement takes (values, flags, name).  An empty value list
    # with FLAG_MOD_DELETE presumably deletes the whole attribute
    # (usual LDAP modify semantics) - verify against the ldb bindings.
    msg['msDS-KrbTgtLink'] = ldb.MessageElement(
        [],
        ldb.FLAG_MOD_DELETE,
        'msDS-KrbTgtLink')
    samdb.modify(msg)
def _get_creds(self,
               replication_allowed=False,
               replication_denied=False,
               revealed_to_rodc=False):
    """Return cached computer-account credentials (cache id 0).

    The three flags are forwarded as the account's
    'allowed_replication_mock', 'denied_replication_mock' and
    'revealed_to_mock_rodc' options.
    """
    account_opts = {
        'allowed_replication_mock': replication_allowed,
        'denied_replication_mock': replication_denied,
        'revealed_to_mock_rodc': revealed_to_rodc,
        'id': 0,
    }
    return self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts=account_opts)
def _get_existing_rid(self,
                      replication_allowed=False,
                      replication_denied=False,
                      revealed_to_rodc=False):
    """Return the RID of a second cached computer account (cache id 1).

    The RID is the integer after the last '-' of the account's
    objectSid.
    """
    account_opts = {
        'allowed_replication_mock': replication_allowed,
        'denied_replication_mock': replication_denied,
        'revealed_to_mock_rodc': revealed_to_rodc,
        'id': 1,
    }
    other_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts=account_opts)

    samdb = self.get_samdb()

    sid_text = self.get_objectSid(samdb, other_creds.get_dn())

    # Keep only the final sub-authority of the SID string.
    _, rid_text = sid_text.rsplit('-', 1)
    return int(rid_text)
def _get_mach_creds(self):
    """Return cached computer-account credentials (cache id 2).

    The account is requested as allowed to replicate to, not denied
    from, and revealed to the mock RODC.
    """
    account_opts = {
        'allowed_replication_mock': True,
        'denied_replication_mock': False,
        'revealed_to_mock_rodc': True,
        'id': 2,
    }
    return self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts=account_opts)
1278 def _get_non_existent_rid(self):
1279 return (1 << 30) - 1
def _run_tgs(self, tgt, expected_error):
    """Request a ticket to the standard service account using tgt."""
    service = self.get_service_creds()
    self._tgs_req(tgt, expected_error, service)
def _renew_tgt(self, tgt, expected_error):
    """Send a TGS request for krbtgt with the 'renew' KDC option."""
    krbtgt = self.get_krbtgt_creds()
    renew_option = str(krb5_asn1.KDCOptions('renew'))
    self._tgs_req(tgt, expected_error, krbtgt,
                  kdc_options=renew_option)
def _validate_tgt(self, tgt, expected_error):
    """Send a TGS request for krbtgt with the 'validate' KDC option."""
    krbtgt = self.get_krbtgt_creds()
    validate_option = str(krb5_asn1.KDCOptions('validate'))
    self._tgs_req(tgt, expected_error, krbtgt,
                  kdc_options=validate_option)
def _s4u2self(self, tgt, tgt_creds, expected_error,
              expect_edata=False, expected_status=None):
    """Perform an S4U2Self request with tgt, targeting tgt_creds.

    The impersonated principal is the account from _get_mach_creds().
    Returns whatever _tgs_req() returns.
    """
    impersonated = self._get_mach_creds()

    impersonated_cname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=[impersonated.get_username()])
    impersonated_realm = impersonated.get_realm()

    def generate_s4u2self_padata(_kdc_exchange_dict,
                                 _callback_dict,
                                 req_body):
        # Build the PA-FOR-USER style padata naming the impersonated
        # principal, keyed with the TGT's session key.
        s4u_padata = self.PA_S4U2Self_create(
            name=impersonated_cname,
            realm=impersonated_realm,
            tgt_session_key=tgt.session_key,
            ctype=None)

        return [s4u_padata], req_body

    return self._tgs_req(tgt, expected_error, tgt_creds,
                         expected_cname=impersonated_cname,
                         generate_padata_fn=generate_s4u2self_padata,
                         expect_claims=False,
                         expect_edata=expect_edata,
                         expected_status=expected_status)
def _user2user(self, tgt, tgt_creds, expected_error, sname=None):
    """Perform a user-to-user request with tgt as additional ticket.

    The request itself is authenticated with a fresh TGT of the
    account from _get_mach_creds() and carries the 'enc-tkt-in-skey'
    KDC option.
    """
    requester = self._get_mach_creds()
    requester_tgt = self.get_tgt(requester)

    u2u_option = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
    self._tgs_req(requester_tgt, expected_error, tgt_creds,
                  kdc_options=u2u_option,
                  additional_ticket=tgt,
                  sname=sname)
def _tgs_req(self, tgt, expected_error, target_creds,
             kdc_options='0',
             expected_cname=None,
             additional_ticket=None,
             generate_padata_fn=None,
             sname=None,
             expect_claims=True,
             expect_edata=False,
             expected_status=None):
    """Send a TGS-REQ authenticated with tgt and check the reply.

    tgt: ticket used to authenticate the request.
    expected_error: falsy when success is expected; otherwise the
        expected error code (or codes) passed on to the checkers.
    target_creds: credentials of the target; supply the realm, the
        default service name and, unless an additional ticket is
        given, the key used to decrypt the returned ticket.
    kdc_options: KDC options bit-string.
    expected_cname: expected client name; defaults to tgt.cname.
    additional_ticket: optional ticket sent as additional ticket; its
        session key is then used to decrypt the returned ticket.
    generate_padata_fn: optional callback producing extra padata.
    sname: service name to request; when None it is 'krbtgt/<realm>'
        for the krbtgt account and 'host/<name without trailing $>'
        otherwise.
    expect_claims, expect_edata, expected_status: passed through to
        the exchange dictionary.

    Returns the reply's ticket credentials on success, None when an
    error was expected.
    """
    srealm = target_creds.get_realm()

    if sname is None:
        target_name = target_creds.get_username()
        if target_name == 'krbtgt':
            sname = self.PrincipalName_create(name_type=NT_SRV_INST,
                                              names=[target_name, srealm])
        else:
            # Strip the machine-account '$' suffix; endswith() also
            # copes with an empty name instead of raising IndexError.
            if target_name.endswith('$'):
                target_name = target_name[:-1]
            sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                              names=['host', target_name])

    if additional_ticket is not None:
        additional_tickets = [additional_ticket.ticket]
        decryption_key = additional_ticket.session_key
    else:
        additional_tickets = None
        decryption_key = self.TicketDecryptionKey_from_creds(
            target_creds)

    subkey = self.RandomKey(tgt.session_key.etype)

    etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)

    # Exactly one of the two reply checkers is installed.
    if expected_error:
        check_error_fn = self.generic_check_kdc_error
        check_rep_fn = None
    else:
        check_error_fn = None
        check_rep_fn = self.generic_check_kdc_rep

    if expected_cname is None:
        expected_cname = tgt.cname

    kdc_exchange_dict = self.tgs_exchange_dict(
        expected_crealm=tgt.crealm,
        expected_cname=expected_cname,
        expected_srealm=srealm,
        expected_sname=sname,
        ticket_decryption_key=decryption_key,
        generate_padata_fn=generate_padata_fn,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=expected_error,
        expected_status=expected_status,
        tgt=tgt,
        authenticator_subkey=subkey,
        kdc_options=kdc_options,
        expect_edata=expect_edata,
        expect_claims=expect_claims)

    rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                     cname=None,
                                     realm=srealm,
                                     sname=sname,
                                     etypes=etypes,
                                     additional_tickets=additional_tickets)
    if expected_error:
        self.check_error_rep(rep, expected_error)
        return None
    else:
        self.check_reply(rep, KRB_TGS_REP)
        return kdc_exchange_dict['rep_ticket_creds']
if __name__ == "__main__":
    # Reset both debug switches before handing over to unittest.
    global_asn1_print = global_hexdump = False

    import unittest
    unittest.main()