2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 from samba
import dsdb
, ntstatus
28 from samba
.dcerpc
import krb5pac
, security
30 sys
.path
.insert(0, "bin/python")
31 os
.environ
["PYTHONUNBUFFERED"] = "1"
33 import samba
.tests
.krb5
.kcrypto
as kcrypto
34 from samba
.tests
.krb5
.kdc_base_test
import KDCBaseTest
35 from samba
.tests
.krb5
.rfc4120_constants
import (
36 AES256_CTS_HMAC_SHA1_96
,
42 KDC_ERR_CLIENT_NAME_MISMATCH
,
46 KDC_ERR_S_PRINCIPAL_UNKNOWN
,
51 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
53 global_asn1_print
= False
54 global_hexdump
= False
57 class KdcTgsTests(KDCBaseTest
):
61 self
.do_asn1_print
= global_asn1_print
62 self
.do_hexdump
= global_hexdump
64 def test_tgs_req_cname_does_not_not_match_authenticator_cname(self
):
65 ''' Try and obtain a ticket from the TGS, but supply a cname
66 that differs from that provided to the krbtgt
68 # Create the user account
69 samdb
= self
.get_samdb()
70 user_name
= "tsttktusr"
71 (uc
, _
) = self
.create_account(samdb
, user_name
)
72 realm
= uc
.get_realm().lower()
74 # Do the initial AS-REQ, should get a pre-authentication required
76 etype
= (AES256_CTS_HMAC_SHA1_96
,)
77 cname
= self
.PrincipalName_create(
78 name_type
=NT_PRINCIPAL
, names
=[user_name
])
79 sname
= self
.PrincipalName_create(
80 name_type
=NT_SRV_INST
, names
=["krbtgt", realm
])
82 rep
= self
.as_req(cname
, sname
, realm
, etype
)
83 self
.check_pre_authentication(rep
)
86 padata
= self
.get_enc_timestamp_pa_data(uc
, rep
)
87 key
= self
.get_as_rep_key(uc
, rep
)
88 rep
= self
.as_req(cname
, sname
, realm
, etype
, padata
=[padata
])
89 self
.check_as_reply(rep
)
91 # Request a service ticket, but use a cname that does not match
92 # that in the original AS-REQ
93 enc_part2
= self
.get_as_rep_enc_data(key
, rep
)
94 key
= self
.EncryptionKey_import(enc_part2
['key'])
95 ticket
= rep
['ticket']
97 cname
= self
.PrincipalName_create(
98 name_type
=NT_PRINCIPAL
,
99 names
=["Administrator"])
100 sname
= self
.PrincipalName_create(
101 name_type
=NT_PRINCIPAL
,
102 names
=["host", samdb
.host_dns_name()])
104 (rep
, enc_part
) = self
.tgs_req(cname
, sname
, realm
, ticket
, key
, etype
,
105 expected_error_mode
=KDC_ERR_BADMATCH
,
110 "rep = {%s}, enc_part = {%s}" % (rep
, enc_part
))
111 self
.assertEqual(KRB_ERROR
, rep
['msg-type'], "rep = {%s}" % rep
)
def test_ldap_service_ticket(self):
    '''Get a ticket to the ldap service.

    A freshly created user runs the AS exchange twice (first without
    padata, then with the padata from get_enc_timestamp_pa_data()) and
    the ticket from the second reply is presented to the TGS for the
    "ldap" service principal.
    '''
    # Create the user account the test authenticates as.
    samdb = self.get_samdb()
    account = "tsttktusr"
    user_creds, _ = self.create_account(samdb, account)
    realm = user_creds.get_realm().lower()

    etypes = (AES256_CTS_HMAC_SHA1_96,)
    client_name = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                            names=[account])
    krbtgt_name = self.PrincipalName_create(name_type=NT_SRV_INST,
                                            names=["krbtgt", realm])

    # Initial AS-REQ without padata; the reply is checked with
    # check_pre_authentication().
    as_rep = self.as_req(client_name, krbtgt_name, realm, etypes)
    self.check_pre_authentication(as_rep)

    # Second AS-REQ, carrying padata derived from the first reply.
    ts_padata = self.get_enc_timestamp_pa_data(user_creds, as_rep)
    reply_key = self.get_as_rep_key(user_creds, as_rep)
    as_rep = self.as_req(client_name, krbtgt_name, realm, etypes,
                         padata=[ts_padata])
    self.check_as_reply(as_rep)

    # Extract the key and the ticket needed for the TGS request.
    enc_data = self.get_as_rep_enc_data(reply_key, as_rep)
    tgs_key = self.EncryptionKey_import(enc_data['key'])
    tgt = as_rep['ticket']

    # Request a ticket to the ldap service
    ldap_name = self.PrincipalName_create(
        name_type=NT_SRV_INST,
        names=["ldap", samdb.host_dns_name()])

    tgs_rep, _ = self.tgs_req(
        client_name, ldap_name, user_creds.get_realm(), tgt, tgs_key,
        etypes,
        service_creds=self.get_dc_creds())

    self.check_tgs_reply(tgs_rep)
158 def test_get_ticket_for_host_service_of_machine_account(self
):
160 # Create a user and machine account for the test.
162 samdb
= self
.get_samdb()
163 user_name
= "tsttktusr"
164 (uc
, dn
) = self
.create_account(samdb
, user_name
)
165 (mc
, _
) = self
.create_account(samdb
, "tsttktmac",
166 account_type
=self
.AccountType
.COMPUTER
)
167 realm
= uc
.get_realm().lower()
169 # Do the initial AS-REQ, should get a pre-authentication required
171 etype
= (AES256_CTS_HMAC_SHA1_96
, ARCFOUR_HMAC_MD5
)
172 cname
= self
.PrincipalName_create(
173 name_type
=NT_PRINCIPAL
, names
=[user_name
])
174 sname
= self
.PrincipalName_create(
175 name_type
=NT_SRV_INST
, names
=["krbtgt", realm
])
177 rep
= self
.as_req(cname
, sname
, realm
, etype
)
178 self
.check_pre_authentication(rep
)
181 padata
= self
.get_enc_timestamp_pa_data(uc
, rep
)
182 key
= self
.get_as_rep_key(uc
, rep
)
183 rep
= self
.as_req(cname
, sname
, realm
, etype
, padata
=[padata
])
184 self
.check_as_reply(rep
)
186 # Request a ticket to the host service on the machine account
187 ticket
= rep
['ticket']
188 enc_part2
= self
.get_as_rep_enc_data(key
, rep
)
189 key
= self
.EncryptionKey_import(enc_part2
['key'])
190 cname
= self
.PrincipalName_create(
191 name_type
=NT_PRINCIPAL
,
193 sname
= self
.PrincipalName_create(
194 name_type
=NT_PRINCIPAL
,
195 names
=[mc
.get_username()])
197 (rep
, enc_part
) = self
.tgs_req(
198 cname
, sname
, uc
.get_realm(), ticket
, key
, etype
,
200 self
.check_tgs_reply(rep
)
202 # Check the contents of the service ticket
203 ticket
= rep
['ticket']
204 enc_part
= self
.decode_service_ticket(mc
, ticket
)
206 pac_data
= self
.get_pac_data(enc_part
['authorization-data'])
207 sid
= self
.get_objectSid(samdb
, dn
)
208 upn
= "%s@%s" % (uc
.get_username(), realm
)
211 str(pac_data
.account_name
),
212 "rep = {%s},%s" % (rep
, pac_data
))
216 "rep = {%s},%s" % (rep
, pac_data
))
219 pac_data
.domain_name
,
220 "rep = {%s},%s" % (rep
, pac_data
))
224 "rep = {%s},%s" % (rep
, pac_data
))
227 pac_data
.account_sid
,
228 "rep = {%s},%s" % (rep
, pac_data
))
230 def _make_tgs_request(self
, client_creds
, service_creds
, tgt
,
231 pac_request
=None, expect_pac
=True,
233 expected_account_name
=None,
234 expected_upn_name
=None,
236 client_account
= client_creds
.get_username()
237 cname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
238 names
=[client_account
])
240 service_account
= service_creds
.get_username()
241 sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
242 names
=[service_account
])
244 realm
= service_creds
.get_realm()
246 expected_crealm
= realm
247 expected_cname
= cname
248 expected_srealm
= realm
249 expected_sname
= sname
251 expected_supported_etypes
= service_creds
.tgs_supported_enctypes
253 etypes
= (AES256_CTS_HMAC_SHA1_96
, ARCFOUR_HMAC_MD5
)
255 kdc_options
= str(krb5_asn1
.KDCOptions('canonicalize'))
257 target_decryption_key
= self
.TicketDecryptionKey_from_creds(
260 authenticator_subkey
= self
.RandomKey(kcrypto
.Enctype
.AES256
)
263 expected_error_mode
= KDC_ERR_BADOPTION
264 check_error_fn
= self
.generic_check_kdc_error
267 expected_error_mode
= 0
268 check_error_fn
= None
269 check_rep_fn
= self
.generic_check_kdc_rep
271 kdc_exchange_dict
= self
.tgs_exchange_dict(
272 expected_crealm
=expected_crealm
,
273 expected_cname
=expected_cname
,
274 expected_srealm
=expected_srealm
,
275 expected_sname
=expected_sname
,
276 expected_account_name
=expected_account_name
,
277 expected_upn_name
=expected_upn_name
,
278 expected_sid
=expected_sid
,
279 expected_supported_etypes
=expected_supported_etypes
,
280 ticket_decryption_key
=target_decryption_key
,
281 check_error_fn
=check_error_fn
,
282 check_rep_fn
=check_rep_fn
,
283 check_kdc_private_fn
=self
.generic_check_kdc_private
,
284 expected_error_mode
=expected_error_mode
,
286 authenticator_subkey
=authenticator_subkey
,
287 kdc_options
=kdc_options
,
288 pac_request
=pac_request
,
289 expect_pac
=expect_pac
)
291 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
297 self
.check_error_rep(rep
, expected_error_mode
)
301 self
.check_reply(rep
, KRB_TGS_REP
)
303 return kdc_exchange_dict
['rep_ticket_creds']
def test_request(self):
    # Default credentials: a PAC is asserted both on the TGT and on the
    # ticket returned by the TGS request.
    client = self.get_client_creds()
    service = self.get_service_creds()

    tgt = self.get_tgt(client)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt)
    self.assertIsNotNone(self.get_ticket_pac(service_ticket))
def test_request_no_pac(self):
    # pac_request=False is passed for both the TGT and the TGS request:
    # the TGT is still asserted to carry a PAC, the service ticket is
    # asserted to have none.
    client = self.get_client_creds()
    service = self.get_service_creds()

    tgt = self.get_tgt(client, pac_request=False)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt,
                                            pac_request=False,
                                            expect_pac=False)
    self.assertIsNone(
        self.get_ticket_pac(service_ticket, expect_pac=False))
def test_client_no_auth_data_required(self):
    # The client account is requested with 'no_auth_data_required'; a
    # PAC is still asserted on both the TGT and the service ticket.
    account_opts = {'no_auth_data_required': True}
    client = self.get_cached_creds(account_type=self.AccountType.USER,
                                   opts=account_opts)
    service = self.get_service_creds()

    tgt = self.get_tgt(client)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt)
    self.assertIsNotNone(self.get_ticket_pac(service_ticket))
def test_no_pac_client_no_auth_data_required(self):
    # Client account requested with 'no_auth_data_required', and the TGS
    # request made with pac_request=False while still passing
    # expect_pac=True; a PAC is asserted on both tickets.
    account_opts = {'no_auth_data_required': True}
    client = self.get_cached_creds(account_type=self.AccountType.USER,
                                   opts=account_opts)
    service = self.get_service_creds()

    tgt = self.get_tgt(client)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt,
                                            pac_request=False,
                                            expect_pac=True)
    self.assertIsNotNone(self.get_ticket_pac(service_ticket))
367 def test_service_no_auth_data_required(self
):
368 client_creds
= self
.get_client_creds()
369 service_creds
= self
.get_cached_creds(
370 account_type
=self
.AccountType
.COMPUTER
,
371 opts
={'no_auth_data_required': True})
373 tgt
= self
.get_tgt(client_creds
)
375 pac
= self
.get_ticket_pac(tgt
)
376 self
.assertIsNotNone(pac
)
378 ticket
= self
._make
_tgs
_request
(client_creds
, service_creds
, tgt
,
381 pac
= self
.get_ticket_pac(ticket
, expect_pac
=False)
382 self
.assertIsNone(pac
)
def test_no_pac_service_no_auth_data_required(self):
    # The service (a COMPUTER account) is requested with
    # 'no_auth_data_required' and pac_request=False is used throughout:
    # the TGT is asserted to carry a PAC, the service ticket is not.
    client = self.get_client_creds()
    account_opts = {'no_auth_data_required': True}
    service = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts=account_opts)

    tgt = self.get_tgt(client, pac_request=False)
    self.assertIsNotNone(self.get_ticket_pac(tgt))

    service_ticket = self._make_tgs_request(client, service, tgt,
                                            pac_request=False,
                                            expect_pac=False)
    self.assertIsNone(
        self.get_ticket_pac(service_ticket, expect_pac=False))
401 def test_remove_pac_service_no_auth_data_required(self
):
402 client_creds
= self
.get_client_creds()
403 service_creds
= self
.get_cached_creds(
404 account_type
=self
.AccountType
.COMPUTER
,
405 opts
={'no_auth_data_required': True})
407 tgt
= self
.modified_ticket(self
.get_tgt(client_creds
),
410 pac
= self
.get_ticket_pac(tgt
, expect_pac
=False)
411 self
.assertIsNone(pac
)
413 self
._make
_tgs
_request
(client_creds
, service_creds
, tgt
,
414 expect_pac
=False, expect_error
=True)
416 def test_remove_pac_client_no_auth_data_required(self
):
417 client_creds
= self
.get_cached_creds(
418 account_type
=self
.AccountType
.USER
,
419 opts
={'no_auth_data_required': True})
420 service_creds
= self
.get_service_creds()
422 tgt
= self
.modified_ticket(self
.get_tgt(client_creds
),
425 pac
= self
.get_ticket_pac(tgt
, expect_pac
=False)
426 self
.assertIsNone(pac
)
428 self
._make
_tgs
_request
(client_creds
, service_creds
, tgt
,
429 expect_pac
=False, expect_error
=True)
431 def test_remove_pac(self
):
432 client_creds
= self
.get_client_creds()
433 service_creds
= self
.get_service_creds()
435 tgt
= self
.modified_ticket(self
.get_tgt(client_creds
),
438 pac
= self
.get_ticket_pac(tgt
, expect_pac
=False)
439 self
.assertIsNone(pac
)
441 self
._make
_tgs
_request
(client_creds
, service_creds
, tgt
,
442 expect_pac
=False, expect_error
=True)
def test_upn_dns_info_ex_user(self):
    # Shared upn_dns_info_ex scenario, run with the default client
    # credentials.
    self._run_upn_dns_info_ex_test(self.get_client_creds())
def test_upn_dns_info_ex_mac(self):
    # Shared upn_dns_info_ex scenario, run with the credentials from
    # get_mach_creds().
    self._run_upn_dns_info_ex_test(self.get_mach_creds())
def test_upn_dns_info_ex_upn_user(self):
    # Shared upn_dns_info_ex scenario for a USER account created with
    # an explicit 'upn' option.
    account_opts = {'upn': 'upn_dns_info_test_upn0@bar'}
    user_creds = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts=account_opts)
    self._run_upn_dns_info_ex_test(user_creds)
def test_upn_dns_info_ex_upn_mac(self):
    # Shared upn_dns_info_ex scenario for a COMPUTER account created
    # with an explicit 'upn' option.
    account_opts = {'upn': 'upn_dns_info_test_upn1@bar'}
    machine_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts=account_opts)
    self._run_upn_dns_info_ex_test(machine_creds)
464 def _run_upn_dns_info_ex_test(self
, client_creds
):
465 service_creds
= self
.get_service_creds()
467 samdb
= self
.get_samdb()
468 dn
= client_creds
.get_dn()
470 account_name
= client_creds
.get_username()
471 upn_name
= client_creds
.get_upn()
473 realm
= client_creds
.get_realm().lower()
474 upn_name
= f
'{account_name}@{realm}'
475 sid
= self
.get_objectSid(samdb
, dn
)
477 tgt
= self
.get_tgt(client_creds
,
478 expected_account_name
=account_name
,
479 expected_upn_name
=upn_name
,
482 self
._make
_tgs
_request
(client_creds
, service_creds
, tgt
,
483 expected_account_name
=account_name
,
484 expected_upn_name
=upn_name
,
487 # Test making a TGS request.
def test_tgs_req(self):
    # Unmodified TGT: the TGS request is run with expected_error=0.
    ticket = self._get_tgt(self._get_creds())
    self._run_tgs(ticket, expected_error=0)
def test_renew_req(self):
    # TGT obtained with renewable=True: the renewal is run with
    # expected_error=0.
    ticket = self._get_tgt(self._get_creds(), renewable=True)
    self._renew_tgt(ticket, expected_error=0)
def test_validate_req(self):
    # TGT obtained with invalid=True: the validation is run with
    # expected_error=0.
    ticket = self._get_tgt(self._get_creds(), invalid=True)
    self._validate_tgt(ticket, expected_error=0)
def test_s4u2self_req(self):
    # The same credentials back both the TGT and the _s4u2self() call,
    # which is run with expected_error=0.
    account = self._get_creds()
    ticket = self._get_tgt(account)
    self._s4u2self(ticket, account, expected_error=0)
def test_user2user_req(self):
    # The same credentials back both the TGT and the _user2user() call,
    # which is run with expected_error=0.
    account = self._get_creds()
    ticket = self._get_tgt(account)
    self._user2user(ticket, account, expected_error=0)
513 # Test making a request without a PAC.
def test_tgs_no_pac(self):
    # TGT obtained with remove_pac=True; the TGS request is expected to
    # fail with KDC_ERR_BADOPTION.
    ticket = self._get_tgt(self._get_creds(), remove_pac=True)
    self._run_tgs(ticket, expected_error=KDC_ERR_BADOPTION)
def test_renew_no_pac(self):
    # Renewable TGT obtained with remove_pac=True; the renewal is
    # expected to fail with KDC_ERR_BADOPTION.
    ticket = self._get_tgt(self._get_creds(),
                           renewable=True, remove_pac=True)
    self._renew_tgt(ticket, expected_error=KDC_ERR_BADOPTION)
def test_validate_no_pac(self):
    # TGT obtained with invalid=True and remove_pac=True; the
    # validation is expected to fail with KDC_ERR_BADOPTION.
    ticket = self._get_tgt(self._get_creds(),
                           invalid=True, remove_pac=True)
    self._validate_tgt(ticket, expected_error=KDC_ERR_BADOPTION)
529 def test_s4u2self_no_pac(self
):
530 creds
= self
._get
_creds
()
531 tgt
= self
._get
_tgt
(creds
, remove_pac
=True)
532 self
._s
4u2self
(tgt
, creds
,
533 expected_error
=(KDC_ERR_GENERIC
, KDC_ERR_BADOPTION
),
534 expected_status
=ntstatus
.NT_STATUS_INVALID_PARAMETER
,
def test_user2user_no_pac(self):
    # TGT obtained with remove_pac=True; the _user2user() call is
    # expected to fail with KDC_ERR_BADOPTION.
    account = self._get_creds()
    ticket = self._get_tgt(account, remove_pac=True)
    self._user2user(ticket, account, expected_error=KDC_ERR_BADOPTION)
542 # Test making a request with authdata and without a PAC.
def test_tgs_authdata_no_pac(self):
    # TGT obtained with remove_pac=True and allow_empty_authdata=True;
    # the TGS request is expected to fail with KDC_ERR_BADOPTION.
    ticket = self._get_tgt(self._get_creds(),
                           remove_pac=True, allow_empty_authdata=True)
    self._run_tgs(ticket, expected_error=KDC_ERR_BADOPTION)
def test_renew_authdata_no_pac(self):
    # Renewable TGT obtained with remove_pac=True and
    # allow_empty_authdata=True; the renewal is expected to fail with
    # KDC_ERR_BADOPTION.
    ticket = self._get_tgt(self._get_creds(),
                           renewable=True,
                           remove_pac=True,
                           allow_empty_authdata=True)
    self._renew_tgt(ticket, expected_error=KDC_ERR_BADOPTION)
def test_validate_authdata_no_pac(self):
    # TGT obtained with invalid=True, remove_pac=True and
    # allow_empty_authdata=True; the validation is expected to fail
    # with KDC_ERR_BADOPTION.
    ticket = self._get_tgt(self._get_creds(),
                           invalid=True,
                           remove_pac=True,
                           allow_empty_authdata=True)
    self._validate_tgt(ticket, expected_error=KDC_ERR_BADOPTION)
560 def test_s4u2self_authdata_no_pac(self
):
561 creds
= self
._get
_creds
()
562 tgt
= self
._get
_tgt
(creds
, remove_pac
=True, allow_empty_authdata
=True)
563 self
._s
4u2self
(tgt
, creds
,
564 expected_error
=(KDC_ERR_GENERIC
, KDC_ERR_BADOPTION
),
565 expected_status
=ntstatus
.NT_STATUS_INVALID_PARAMETER
,
def test_user2user_authdata_no_pac(self):
    # TGT obtained with remove_pac=True and allow_empty_authdata=True;
    # the _user2user() call is expected to fail with KDC_ERR_BADOPTION.
    account = self._get_creds()
    ticket = self._get_tgt(account,
                           remove_pac=True, allow_empty_authdata=True)
    self._user2user(ticket, account, expected_error=KDC_ERR_BADOPTION)
573 # Test changing the SID in the PAC to that of another account.
def test_tgs_sid_mismatch_existing(self):
    # The TGT is built with new_rid set to the value from
    # _get_existing_rid(); the TGS request is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account, new_rid=self._get_existing_rid())
    self._run_tgs(ticket, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_renew_sid_mismatch_existing(self):
    # Renewable TGT built with new_rid set to the value from
    # _get_existing_rid(); the renewal is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account,
                           renewable=True,
                           new_rid=self._get_existing_rid())
    self._renew_tgt(ticket, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_validate_sid_mismatch_existing(self):
    # TGT built with invalid=True and new_rid set to the value from
    # _get_existing_rid(); the validation is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account,
                           invalid=True,
                           new_rid=self._get_existing_rid())
    self._validate_tgt(ticket,
                       expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_s4u2self_sid_mismatch_existing(self):
    # TGT built with new_rid set to the value from _get_existing_rid();
    # the _s4u2self() call is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account, new_rid=self._get_existing_rid())
    self._s4u2self(ticket, account,
                   expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_user2user_sid_mismatch_existing(self):
    # TGT built with new_rid set to the value from _get_existing_rid();
    # the _user2user() call is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account, new_rid=self._get_existing_rid())
    self._user2user(ticket, account,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
606 # Test changing the SID in the PAC to a non-existent one.
def test_tgs_sid_mismatch_nonexisting(self):
    # The TGT is built with new_rid set to the value from
    # _get_non_existent_rid(); the TGS request is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account,
                           new_rid=self._get_non_existent_rid())
    self._run_tgs(ticket, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_renew_sid_mismatch_nonexisting(self):
    # Renewable TGT built with new_rid set to the value from
    # _get_non_existent_rid(); the renewal is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account,
                           renewable=True,
                           new_rid=self._get_non_existent_rid())
    self._renew_tgt(ticket, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_validate_sid_mismatch_nonexisting(self):
    # TGT built with invalid=True and new_rid set to the value from
    # _get_non_existent_rid(); the validation is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account,
                           invalid=True,
                           new_rid=self._get_non_existent_rid())
    self._validate_tgt(ticket,
                       expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_s4u2self_sid_mismatch_nonexisting(self):
    # TGT built with new_rid set to the value from
    # _get_non_existent_rid(); the _s4u2self() call is expected to fail
    # with KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account,
                           new_rid=self._get_non_existent_rid())
    self._s4u2self(ticket, account,
                   expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_user2user_sid_mismatch_nonexisting(self):
    # TGT built with new_rid set to the value from
    # _get_non_existent_rid(); the _user2user() call is expected to
    # fail with KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds()
    ticket = self._get_tgt(account,
                           new_rid=self._get_non_existent_rid())
    self._user2user(ticket, account,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
641 # Test with an RODC-issued ticket where the client is revealed to the RODC.
def test_tgs_rodc_revealed(self):
    # Account created with replication_allowed and revealed_to_rodc,
    # TGT obtained with from_rodc=True; run with expected_error=0.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._run_tgs(ticket, expected_error=0)
def test_renew_rodc_revealed(self):
    # Account created with replication_allowed and revealed_to_rodc,
    # renewable TGT obtained with from_rodc=True; the renewal is run
    # with expected_error=0.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, renewable=True, from_rodc=True)
    self._renew_tgt(ticket, expected_error=0)
def test_validate_rodc_revealed(self):
    # Account created with replication_allowed and revealed_to_rodc,
    # TGT obtained with invalid=True and from_rodc=True; the validation
    # is run with expected_error=0.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, invalid=True, from_rodc=True)
    self._validate_tgt(ticket, expected_error=0)
def test_s4u2self_rodc_revealed(self):
    # Account created with replication_allowed and revealed_to_rodc,
    # TGT obtained with from_rodc=True; _s4u2self() is run with
    # expected_error=0.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._s4u2self(ticket, account, expected_error=0)
def test_user2user_rodc_revealed(self):
    # Account created with replication_allowed and revealed_to_rodc,
    # TGT obtained with from_rodc=True; _user2user() is run with
    # expected_error=0.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._user2user(ticket, account, expected_error=0)
672 # Test with an RODC-issued ticket where the SID in the PAC is changed to
673 # that of another account.
def test_tgs_rodc_sid_mismatch_existing(self):
    # Both the client account and the account supplying the replacement
    # RID are requested with the same options; the from_rodc TGT
    # carrying that RID is expected to be rejected with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    ticket = self._get_tgt(account, from_rodc=True, new_rid=other_rid)
    self._run_tgs(ticket, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_renew_rodc_sid_mismatch_existing(self):
    # Both the client account and the account supplying the replacement
    # RID are requested with the same options; renewing the from_rodc
    # TGT carrying that RID is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    ticket = self._get_tgt(account,
                           renewable=True,
                           from_rodc=True,
                           new_rid=other_rid)
    self._renew_tgt(ticket, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_validate_rodc_sid_mismatch_existing(self):
    # Both the client account and the account supplying the replacement
    # RID are requested with the same options; validating the from_rodc
    # TGT carrying that RID is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    ticket = self._get_tgt(account,
                           invalid=True,
                           from_rodc=True,
                           new_rid=other_rid)
    self._validate_tgt(ticket,
                       expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_s4u2self_rodc_sid_mismatch_existing(self):
    # Both the client account and the account supplying the replacement
    # RID are requested with the same options; _s4u2self() with the
    # from_rodc TGT carrying that RID is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    ticket = self._get_tgt(account, from_rodc=True, new_rid=other_rid)
    self._s4u2self(ticket, account,
                   expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_user2user_rodc_sid_mismatch_existing(self):
    # Both the client account and the account supplying the replacement
    # RID are requested with the same options; _user2user() with the
    # from_rodc TGT carrying that RID is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account_opts = {'replication_allowed': True,
                    'revealed_to_rodc': True}
    account = self._get_creds(**account_opts)
    other_rid = self._get_existing_rid(**account_opts)
    ticket = self._get_tgt(account, from_rodc=True, new_rid=other_rid)
    self._user2user(ticket, account,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
# Test with an RODC-issued ticket where the SID in the PAC is changed to a
# non-existent one.
def test_tgs_rodc_sid_mismatch_nonexisting(self):
    # from_rodc TGT whose new_rid comes from _get_non_existent_rid();
    # the TGS request is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account,
                           from_rodc=True,
                           new_rid=self._get_non_existent_rid())
    self._run_tgs(ticket, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_renew_rodc_sid_mismatch_nonexisting(self):
    # Renewable from_rodc TGT whose new_rid comes from
    # _get_non_existent_rid(); the renewal is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account,
                           renewable=True,
                           from_rodc=True,
                           new_rid=self._get_non_existent_rid())
    self._renew_tgt(ticket, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_validate_rodc_sid_mismatch_nonexisting(self):
    # from_rodc TGT obtained with invalid=True whose new_rid comes from
    # _get_non_existent_rid(); the validation is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account,
                           invalid=True,
                           from_rodc=True,
                           new_rid=self._get_non_existent_rid())
    self._validate_tgt(ticket,
                       expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
    # from_rodc TGT whose new_rid comes from _get_non_existent_rid();
    # the _s4u2self() call is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account,
                           from_rodc=True,
                           new_rid=self._get_non_existent_rid())
    self._s4u2self(ticket, account,
                   expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
def test_user2user_rodc_sid_mismatch_nonexisting(self):
    # from_rodc TGT whose new_rid comes from _get_non_existent_rid();
    # the _user2user() call is expected to fail with
    # KDC_ERR_CLIENT_NAME_MISMATCH.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account,
                           from_rodc=True,
                           new_rid=self._get_non_existent_rid())
    self._user2user(ticket, account,
                    expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
# Test with an RODC-issued ticket where the client is not revealed to the
# RODC.
def test_tgs_rodc_not_revealed(self):
    # Account created with replication_allowed only (no
    # revealed_to_rodc); the from_rodc TGT is expected to be rejected
    # with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(replication_allowed=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._run_tgs(ticket, expected_error=KDC_ERR_TGT_REVOKED)
def test_renew_rodc_not_revealed(self):
    # Account created with replication_allowed only; renewing the
    # from_rodc TGT is expected to fail with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(replication_allowed=True)
    ticket = self._get_tgt(account, renewable=True, from_rodc=True)
    self._renew_tgt(ticket, expected_error=KDC_ERR_TGT_REVOKED)
def test_validate_rodc_not_revealed(self):
    # Account created with replication_allowed only; validating the
    # from_rodc TGT is expected to fail with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(replication_allowed=True)
    ticket = self._get_tgt(account, invalid=True, from_rodc=True)
    self._validate_tgt(ticket, expected_error=KDC_ERR_TGT_REVOKED)
def test_s4u2self_rodc_not_revealed(self):
    # Account created with replication_allowed only; _s4u2self() with
    # the from_rodc TGT is expected to fail with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(replication_allowed=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._s4u2self(ticket, account, expected_error=KDC_ERR_TGT_REVOKED)
def test_user2user_rodc_not_revealed(self):
    # Account created with replication_allowed only; _user2user() with
    # the from_rodc TGT is expected to fail with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(replication_allowed=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._user2user(ticket, account, expected_error=KDC_ERR_TGT_REVOKED)
785 # Test with an RODC-issued ticket where the RODC account does not have the
786 # PARTIAL_SECRETS bit set.
def test_tgs_rodc_no_partial_secrets(self):
    # The from_rodc TGT is obtained first; only then is
    # _remove_rodc_partial_secrets() called, after which the TGS
    # request is expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)

    self._remove_rodc_partial_secrets()
    self._run_tgs(ticket, expected_error=KDC_ERR_POLICY)
def test_renew_rodc_no_partial_secrets(self):
    # The renewable from_rodc TGT is obtained first; only then is
    # _remove_rodc_partial_secrets() called, after which the renewal is
    # expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, renewable=True, from_rodc=True)

    self._remove_rodc_partial_secrets()
    self._renew_tgt(ticket, expected_error=KDC_ERR_POLICY)
def test_validate_rodc_no_partial_secrets(self):
    # The from_rodc TGT (invalid=True) is obtained first; only then is
    # _remove_rodc_partial_secrets() called, after which the validation
    # is expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, invalid=True, from_rodc=True)

    self._remove_rodc_partial_secrets()
    self._validate_tgt(ticket, expected_error=KDC_ERR_POLICY)
def test_s4u2self_rodc_no_partial_secrets(self):
    # The from_rodc TGT is obtained first; only then is
    # _remove_rodc_partial_secrets() called, after which _s4u2self()
    # is expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)

    self._remove_rodc_partial_secrets()
    self._s4u2self(ticket, account, expected_error=KDC_ERR_POLICY)
def test_user2user_rodc_no_partial_secrets(self):
    # The from_rodc TGT is obtained first; only then is
    # _remove_rodc_partial_secrets() called, after which _user2user()
    # is expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)

    self._remove_rodc_partial_secrets()
    self._user2user(ticket, account, expected_error=KDC_ERR_POLICY)
# Test with an RODC-issued ticket where the RODC account does not have a
# krbtgt link.
def test_tgs_rodc_no_krbtgt_link(self):
    # The from_rodc TGT is obtained first; only then is
    # _remove_rodc_krbtgt_link() called, after which the TGS request is
    # expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)

    self._remove_rodc_krbtgt_link()
    self._run_tgs(ticket, expected_error=KDC_ERR_POLICY)
def test_renew_rodc_no_krbtgt_link(self):
    # The renewable from_rodc TGT is obtained first; only then is
    # _remove_rodc_krbtgt_link() called, after which the renewal is
    # expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, renewable=True, from_rodc=True)

    self._remove_rodc_krbtgt_link()
    self._renew_tgt(ticket, expected_error=KDC_ERR_POLICY)
def test_validate_rodc_no_krbtgt_link(self):
    # The from_rodc TGT (invalid=True) is obtained first; only then is
    # _remove_rodc_krbtgt_link() called, after which the validation is
    # expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, invalid=True, from_rodc=True)

    self._remove_rodc_krbtgt_link()
    self._validate_tgt(ticket, expected_error=KDC_ERR_POLICY)
def test_s4u2self_rodc_no_krbtgt_link(self):
    # The from_rodc TGT is obtained first; only then is
    # _remove_rodc_krbtgt_link() called, after which _s4u2self() is
    # expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)

    self._remove_rodc_krbtgt_link()
    self._s4u2self(ticket, account, expected_error=KDC_ERR_POLICY)
def test_user2user_rodc_no_krbtgt_link(self):
    # The from_rodc TGT is obtained first; only then is
    # _remove_rodc_krbtgt_link() called, after which _user2user() is
    # expected to fail with KDC_ERR_POLICY.
    account = self._get_creds(replication_allowed=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)

    self._remove_rodc_krbtgt_link()
    self._user2user(ticket, account, expected_error=KDC_ERR_POLICY)
859 # Test with an RODC-issued ticket where the client is not allowed to
860 # replicate to the RODC.
def test_tgs_rodc_not_allowed(self):
    # Account created with revealed_to_rodc only (no
    # replication_allowed); the from_rodc TGT is expected to be
    # rejected with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._run_tgs(ticket, expected_error=KDC_ERR_TGT_REVOKED)
def test_renew_rodc_not_allowed(self):
    # Account created with revealed_to_rodc only; renewing the
    # from_rodc TGT is expected to fail with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(revealed_to_rodc=True)
    ticket = self._get_tgt(account, renewable=True, from_rodc=True)
    self._renew_tgt(ticket, expected_error=KDC_ERR_TGT_REVOKED)
def test_validate_rodc_not_allowed(self):
    # Account created with revealed_to_rodc only; validating the
    # from_rodc TGT is expected to fail with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(revealed_to_rodc=True)
    ticket = self._get_tgt(account, invalid=True, from_rodc=True)
    self._validate_tgt(ticket, expected_error=KDC_ERR_TGT_REVOKED)
def test_s4u2self_rodc_not_allowed(self):
    # Account created with revealed_to_rodc only; _s4u2self() with the
    # from_rodc TGT is expected to fail with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._s4u2self(ticket, account, expected_error=KDC_ERR_TGT_REVOKED)
def test_user2user_rodc_not_allowed(self):
    # Account created with revealed_to_rodc only; _user2user() with the
    # from_rodc TGT is expected to fail with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._user2user(ticket, account, expected_error=KDC_ERR_TGT_REVOKED)
886 # Test with an RODC-issued ticket where the client is denied from
887 # replicating to the RODC.
def test_tgs_rodc_denied(self):
    # Account created with replication_denied and revealed_to_rodc; the
    # from_rodc TGT is expected to be rejected with KDC_ERR_TGT_REVOKED.
    account = self._get_creds(replication_denied=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, from_rodc=True)
    self._run_tgs(ticket, expected_error=KDC_ERR_TGT_REVOKED)
def test_renew_rodc_denied(self):
    # Account created with replication_denied and revealed_to_rodc;
    # renewing the from_rodc TGT is expected to fail with
    # KDC_ERR_TGT_REVOKED.
    account = self._get_creds(replication_denied=True,
                              revealed_to_rodc=True)
    ticket = self._get_tgt(account, renewable=True, from_rodc=True)
    self._renew_tgt(ticket, expected_error=KDC_ERR_TGT_REVOKED)
def test_validate_rodc_denied(self):
    # Validation of an invalid-flagged, RODC-issued TGT for an account
    # that is explicitly denied replication to the RODC;
    # KDC_ERR_TGT_REVOKED is expected.
    denied_creds = self._get_creds(
        replication_denied=True,
        revealed_to_rodc=True,
    )
    invalid_tgt = self._get_tgt(
        denied_creds,
        invalid=True,
        from_rodc=True,
    )

    self._validate_tgt(invalid_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_s4u2self_rodc_denied(self):
    # S4U2Self using an RODC-issued TGT for an account that is explicitly
    # denied replication to the RODC; KDC_ERR_TGT_REVOKED is expected.
    denied_creds = self._get_creds(
        replication_denied=True,
        revealed_to_rodc=True,
    )
    rodc_tgt = self._get_tgt(denied_creds, from_rodc=True)

    self._s4u2self(
        rodc_tgt,
        denied_creds,
        expected_error=KDC_ERR_TGT_REVOKED,
    )
def test_user2user_rodc_denied(self):
    # User-to-user using an RODC-issued TGT for an account that is
    # explicitly denied replication to the RODC; KDC_ERR_TGT_REVOKED is
    # expected.
    denied_creds = self._get_creds(
        replication_denied=True,
        revealed_to_rodc=True,
    )
    rodc_tgt = self._get_tgt(denied_creds, from_rodc=True)

    self._user2user(
        rodc_tgt,
        denied_creds,
        expected_error=KDC_ERR_TGT_REVOKED,
    )
918 # Test with an RODC-issued ticket where the client is both allowed and
919 # denied replicating to the RODC.
def test_tgs_rodc_allowed_denied(self):
    # TGS-REQ with an RODC-issued TGT for an account that is both allowed
    # and denied replication to the RODC; KDC_ERR_TGT_REVOKED is expected.
    conflicted_creds = self._get_creds(
        replication_allowed=True,
        replication_denied=True,
        revealed_to_rodc=True,
    )
    rodc_tgt = self._get_tgt(conflicted_creds, from_rodc=True)

    self._run_tgs(rodc_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_renew_rodc_allowed_denied(self):
    # Renewal of a renewable, RODC-issued TGT for an account that is both
    # allowed and denied replication to the RODC; KDC_ERR_TGT_REVOKED is
    # expected.
    conflicted_creds = self._get_creds(
        replication_allowed=True,
        replication_denied=True,
        revealed_to_rodc=True,
    )
    renewable_tgt = self._get_tgt(
        conflicted_creds,
        renewable=True,
        from_rodc=True,
    )

    self._renew_tgt(renewable_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_validate_rodc_allowed_denied(self):
    # Validation of an invalid-flagged, RODC-issued TGT for an account
    # that is both allowed and denied replication to the RODC;
    # KDC_ERR_TGT_REVOKED is expected.
    conflicted_creds = self._get_creds(
        replication_allowed=True,
        replication_denied=True,
        revealed_to_rodc=True,
    )
    invalid_tgt = self._get_tgt(
        conflicted_creds,
        invalid=True,
        from_rodc=True,
    )

    self._validate_tgt(invalid_tgt, expected_error=KDC_ERR_TGT_REVOKED)
def test_s4u2self_rodc_allowed_denied(self):
    # S4U2Self using an RODC-issued TGT for an account that is both
    # allowed and denied replication to the RODC; KDC_ERR_TGT_REVOKED is
    # expected.
    conflicted_creds = self._get_creds(
        replication_allowed=True,
        replication_denied=True,
        revealed_to_rodc=True,
    )
    rodc_tgt = self._get_tgt(conflicted_creds, from_rodc=True)

    self._s4u2self(
        rodc_tgt,
        conflicted_creds,
        expected_error=KDC_ERR_TGT_REVOKED,
    )
def test_user2user_rodc_allowed_denied(self):
    # User-to-user using an RODC-issued TGT for an account that is both
    # allowed and denied replication to the RODC; KDC_ERR_TGT_REVOKED is
    # expected.
    conflicted_creds = self._get_creds(
        replication_allowed=True,
        replication_denied=True,
        revealed_to_rodc=True,
    )
    rodc_tgt = self._get_tgt(conflicted_creds, from_rodc=True)

    self._user2user(
        rodc_tgt,
        conflicted_creds,
        expected_error=KDC_ERR_TGT_REVOKED,
    )
955 # Test user-to-user with incorrect service principal names.
def test_user2user_matching_sname_host(self):
    # Request a user-to-user ticket while naming the service as
    # 'host/<account name>'; KDC_ERR_S_PRINCIPAL_UNKNOWN is expected.
    client_creds = self._get_creds()
    client_tgt = self._get_tgt(client_creds)

    account_name = client_creds.get_username()
    host_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=['host', account_name],
    )

    self._user2user(
        client_tgt,
        client_creds,
        sname=host_sname,
        expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN,
    )
# User-to-user request with an explicitly supplied NT_PRINCIPAL sname built
# for the client's own account (see user_name below).
967 def test_user2user_matching_sname_no_host(self
):
968 creds
= self
._get
_creds
()
969 tgt
= self
._get
_tgt
(creds
)
971 user_name
= creds
.get_username()
972 sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
# expected_error=0 — presumably "no error", i.e. the exchange should
# succeed; confirm against _tgs_req.
975 self
._user
2user
(tgt
, creds
, sname
=sname
, expected_error
=0)
# User-to-user request where the sname is derived from a *different*
# account (other_creds, obtained via _get_mach_creds) than the one whose TGT
# and credentials are passed to _user2user.
977 def test_user2user_wrong_sname(self
):
978 creds
= self
._get
_creds
()
979 tgt
= self
._get
_tgt
(creds
)
981 other_creds
= self
._get
_mach
_creds
()
982 user_name
= other_creds
.get_username()
983 sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
# expected_error is given as a tuple, so more than one error code is
# accepted; KDC_ERR_BADMATCH is one of them.
986 self
._user
2user
(tgt
, creds
, sname
=sname
,
987 expected_error
=(KDC_ERR_BADMATCH
,
def test_user2user_non_existent_sname(self):
    # Request a user-to-user ticket for the service name
    # 'host/non_existent_user'; KDC_ERR_S_PRINCIPAL_UNKNOWN is expected.
    client_creds = self._get_creds()
    client_tgt = self._get_tgt(client_creds)

    bogus_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL,
        names=['host', 'non_existent_user'],
    )

    self._user2user(
        client_tgt,
        client_creds,
        sname=bogus_sname,
        expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN,
    )
def test_user2user_service_ticket(self):
    # Pass a service ticket, rather than a TGT, as the ticket argument of
    # the user-to-user helper.  Either KDC_ERR_MODIFIED or KDC_ERR_POLICY
    # is accepted as the resulting error.
    client_creds = self._get_creds()
    client_tgt = self._get_tgt(client_creds)

    target_creds = self.get_service_creds()
    ticket_for_service = self.get_service_ticket(client_tgt, target_creds)

    acceptable_errors = (KDC_ERR_MODIFIED, KDC_ERR_POLICY)
    self._user2user(
        ticket_for_service,
        client_creds,
        expected_error=acceptable_errors,
    )
1017 allow_empty_authdata
=False,
1018 can_modify_logon_info
=True,
1019 can_modify_requester_sid
=True,
1020 remove_pac_attrs
=False,
1021 remove_requester_sid
=False):
# Fetch a TGT for client_creds via self.get_tgt() and return the result of
# passing it, together with the modification options, to self._modify_tgt().
# 'renewable' and 'invalid' are mutually exclusive.
1022 self
.assertFalse(renewable
and invalid
)
1025 self
.assertIsNone(new_rid
)
1027 tgt
= self
.get_tgt(client_creds
)
# The options below are forwarded unchanged under the same keyword names.
1029 return self
._modify
_tgt
(
1031 renewable
=renewable
,
1033 from_rodc
=from_rodc
,
1035 remove_pac
=remove_pac
,
1036 allow_empty_authdata
=allow_empty_authdata
,
1037 can_modify_logon_info
=can_modify_logon_info
,
1038 can_modify_requester_sid
=can_modify_requester_sid
,
1039 remove_pac_attrs
=remove_pac_attrs
,
1040 remove_requester_sid
=remove_requester_sid
)
# Build a modified copy of a ticket via self.modified_ticket(): the ticket is
# re-keyed with a krbtgt key, its encrypted part may have flags / cname /
# crealm rewritten, and its PAC may have buffers edited or removed.
1042 def _modify_tgt(self
,
1049 allow_empty_authdata
=False,
1052 can_modify_logon_info
=True,
1053 can_modify_requester_sid
=True,
1054 remove_pac_attrs
=False,
1055 remove_requester_sid
=False):
# NOTE(review): two alternative sources for krbtgt_creds (mock RODC krbtgt
# vs. the regular krbtgt); presumably selected by from_rodc — confirm.
1057 krbtgt_creds
= self
.get_mock_rodc_krbtgt_creds()
1059 krbtgt_creds
= self
.get_krbtgt_creds()
# A PAC-editing callback is only defined when a RID change or the removal of
# the requester-SID / attributes-info buffer has been requested.
1061 if new_rid
is not None or remove_requester_sid
or remove_pac_attrs
:
1062 def change_sid_fn(pac
):
1063 pac_buffers
= pac
.buffers
# NOTE(review): pac_buffers is iterated here and has .remove() called on it
# inside the loop below; if it is a plain list, the element following a
# removed one is skipped — confirm this cannot drop a wanted edit.
1064 for pac_buffer
in pac_buffers
:
1065 if pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_INFO
:
1066 if new_rid
is not None and can_modify_logon_info
:
1067 logon_info
= pac_buffer
.info
.info
1069 logon_info
.info3
.base
.rid
= new_rid
1070 elif pac_buffer
.type == krb5pac
.PAC_TYPE_REQUESTER_SID
:
1071 if remove_requester_sid
:
# Keep the buffer count in step with the list when dropping a buffer.
1072 pac
.num_buffers
-= 1
1073 pac_buffers
.remove(pac_buffer
)
1074 elif new_rid
is not None and can_modify_requester_sid
:
1075 requester_sid
= pac_buffer
.info
1077 samdb
= self
.get_samdb()
1078 domain_sid
= samdb
.get_domain_sid()
# The replacement SID is the domain SID with new_rid appended.
1080 new_sid
= f
'{domain_sid}-{new_rid}'
1082 requester_sid
.sid
= security
.dom_sid(new_sid
)
1083 elif pac_buffer
.type == krb5pac
.PAC_TYPE_ATTRIBUTES_INFO
:
1084 if remove_pac_attrs
:
1085 pac
.num_buffers
-= 1
1086 pac_buffers
.remove(pac_buffer
)
# Write the (possibly shortened) buffer list back onto the PAC.
1088 pac
.buffers
= pac_buffers
1092 change_sid_fn
= None
1094 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
1097 checksum_keys
= None
1100 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
1104 def flags_modify_fn(enc_part
):
1105 # Set the renewable flag.
1106 renewable_flag
= krb5_asn1
.TicketFlags('renewable')
1107 pos
= len(tuple(renewable_flag
)) - 1
1109 flags
= enc_part
['flags']
# NOTE(review): assertLessEqual admits pos == len(flags), in which case the
# slice expression below appends a '1' instead of replacing a bit —
# assertLess may be what is intended; confirm.
1110 self
.assertLessEqual(pos
, len(flags
))
1112 new_flags
= flags
[:pos
] + '1' + flags
[pos
+ 1:]
1113 enc_part
['flags'] = new_flags
1115 # Set the renew-till time to be in the future.
1116 renew_till
= self
.get_KerberosTime(offset
=100 * 60 * 60)
1117 enc_part
['renew-till'] = renew_till
1121 def flags_modify_fn(enc_part
):
1122 # Set the invalid flag.
1123 invalid_flag
= krb5_asn1
.TicketFlags('invalid')
1124 pos
= len(tuple(invalid_flag
)) - 1
1126 flags
= enc_part
['flags']
1127 self
.assertLessEqual(pos
, len(flags
))
1129 new_flags
= flags
[:pos
] + '1' + flags
[pos
+ 1:]
1130 enc_part
['flags'] = new_flags
1132 # Set the ticket start time to be in the past.
1133 past_time
= self
.get_KerberosTime(offset
=-100 * 60 * 60)
1134 enc_part
['starttime'] = past_time
1138 flags_modify_fn
= None
# When a client name or realm override is requested, wrap the flag
# modification (if any) in a function that also rewrites cname / crealm.
1140 if cname
is not None or crealm
is not None:
1141 def modify_fn(enc_part
):
1142 if flags_modify_fn
is not None:
1143 enc_part
= flags_modify_fn(enc_part
)
1145 if cname
is not None:
1146 enc_part
['cname'] = cname
1148 if crealm
is not None:
1149 enc_part
['crealm'] = crealm
1153 modify_fn
= flags_modify_fn
# With a cname override the PAC's logon-name buffer is updated too, after
# any SID/buffer changes have been applied.
1155 if cname
is not None:
1156 def modify_pac_fn(pac
):
1157 if change_sid_fn
is not None:
1158 pac
= change_sid_fn(pac
)
1160 for pac_buffer
in pac
.buffers
:
1161 if pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_NAME
:
1162 logon_info
= pac_buffer
.info
# Only the first component of the principal name is used as account name.
1164 logon_info
.account_name
= (
1165 cname
['name-string'][0].decode('utf-8'))
1169 modify_pac_fn
= change_sid_fn
# PAC checksums are only recomputed when the PAC is kept.
1171 return self
.modified_ticket(
1173 new_ticket_key
=krbtgt_key
,
1174 modify_fn
=modify_fn
,
1175 modify_pac_fn
=modify_pac_fn
,
1176 exclude_pac
=remove_pac
,
1177 allow_empty_authdata
=allow_empty_authdata
,
1178 update_pac_checksums
=not remove_pac
,
1179 checksum_keys
=checksum_keys
)
# Clear UF_PARTIAL_SECRETS_ACCOUNT from the mock RODC account's
# userAccountControl, registering a cleanup that writes the original
# userAccountControl value back.
1181 def _remove_rodc_partial_secrets(self
):
1182 samdb
= self
.get_samdb()
1184 rodc_ctx
= self
.get_mock_rodc_ctx()
1185 rodc_dn
= ldb
.Dn(samdb
, rodc_ctx
.acct_dn
)
# Cleanup callback: replaces userAccountControl with the value recorded in
# rodc_ctx.
1187 def add_rodc_partial_secrets():
1190 msg
['userAccountControl'] = ldb
.MessageElement(
1191 str(rodc_ctx
.userAccountControl
),
1192 ldb
.FLAG_MOD_REPLACE
,
1193 'userAccountControl')
# The cleanup is registered before the attribute is changed.
1196 self
.addCleanup(add_rodc_partial_secrets
)
# Original flags with the partial-secrets bit masked out.
1198 uac
= rodc_ctx
.userAccountControl
& ~dsdb
.UF_PARTIAL_SECRETS_ACCOUNT
1202 msg
['userAccountControl'] = ldb
.MessageElement(
1204 ldb
.FLAG_MOD_REPLACE
,
1205 'userAccountControl')
# Delete the msDS-KrbTgtLink attribute from the mock RODC account,
# registering a cleanup that sets it back to rodc_ctx.new_krbtgt_dn.
1208 def _remove_rodc_krbtgt_link(self
):
1209 samdb
= self
.get_samdb()
1211 rodc_ctx
= self
.get_mock_rodc_ctx()
1212 rodc_dn
= ldb
.Dn(samdb
, rodc_ctx
.acct_dn
)
# Cleanup callback: restores the link attribute.
1214 def add_rodc_krbtgt_link():
1217 msg
['msDS-KrbTgtLink'] = ldb
.MessageElement(
1218 rodc_ctx
.new_krbtgt_dn
,
# The cleanup is registered before the attribute is deleted.
1223 self
.addCleanup(add_rodc_krbtgt_link
)
1227 msg
['msDS-KrbTgtLink'] = ldb
.MessageElement(
1229 ldb
.FLAG_MOD_DELETE
,
# Return cached credentials for a COMPUTER account, with the three boolean
# arguments mapped onto the mock-RODC replication / reveal option keys below.
# All three default to False.
1233 def _get_creds(self
,
1234 replication_allowed
=False,
1235 replication_denied
=False,
1236 revealed_to_rodc
=False):
1237 return self
.get_cached_creds(
1238 account_type
=self
.AccountType
.COMPUTER
,
1240 'allowed_replication_mock': replication_allowed
,
1241 'denied_replication_mock': replication_denied
,
1242 'revealed_to_mock_rodc': revealed_to_rodc
,
# Look up the RID of a cached COMPUTER account created with the given
# mock-RODC replication / reveal options.
1246 def _get_existing_rid(self
,
1247 replication_allowed
=False,
1248 replication_denied
=False,
1249 revealed_to_rodc
=False):
1250 other_creds
= self
.get_cached_creds(
1251 account_type
=self
.AccountType
.COMPUTER
,
1253 'allowed_replication_mock': replication_allowed
,
1254 'denied_replication_mock': replication_denied
,
1255 'revealed_to_mock_rodc': revealed_to_rodc
,
1259 samdb
= self
.get_samdb()
1261 other_dn
= other_creds
.get_dn()
1262 other_sid
= self
.get_objectSid(samdb
, other_dn
)
# The RID is taken as the text after the last '-' of the SID string.
1264 other_rid
= int(other_sid
.rsplit('-', 1)[1])
# Return cached credentials for a COMPUTER account that is allowed (and not
# denied) replication to the mock RODC and is revealed to it.
1268 def _get_mach_creds(self
):
1269 return self
.get_cached_creds(
1270 account_type
=self
.AccountType
.COMPUTER
,
1272 'allowed_replication_mock': True,
1273 'denied_replication_mock': False,
1274 'revealed_to_mock_rodc': True,
1278 def _get_non_existent_rid(self
):
1279 return (1 << 30) - 1
1281 def _run_tgs(self
, tgt
, expected_error
):
1282 target_creds
= self
.get_service_creds()
1283 self
._tgs
_req
(tgt
, expected_error
, target_creds
)
def _renew_tgt(self, tgt, expected_error):
    """Send a TGS-REQ with the 'renew' KDC option, targeting the krbtgt.

    *expected_error* is passed through to self._tgs_req().
    """
    target_creds = self.get_krbtgt_creds()
    renew_option = krb5_asn1.KDCOptions('renew')
    self._tgs_req(
        tgt,
        expected_error,
        target_creds,
        kdc_options=str(renew_option),
    )
def _validate_tgt(self, tgt, expected_error):
    """Send a TGS-REQ with the 'validate' KDC option, targeting the krbtgt.

    *expected_error* is passed through to self._tgs_req().
    """
    target_creds = self.get_krbtgt_creds()
    validate_option = krb5_asn1.KDCOptions('validate')
    self._tgs_req(
        tgt,
        expected_error,
        target_creds,
        kdc_options=str(validate_option),
    )
# Perform an S4U2Self TGS-REQ with tgt, targeting tgt_creds, on behalf of the
# account returned by _get_mach_creds().  Returns whatever _tgs_req() returns.
1297 def _s4u2self(self
, tgt
, tgt_creds
, expected_error
,
1298 expect_edata
=False, expected_status
=None):
1299 user_creds
= self
._get
_mach
_creds
()
1301 user_name
= user_creds
.get_username()
1302 user_cname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
1304 user_realm
= user_creds
.get_realm()
# Callback handed to _tgs_req as generate_padata_fn: builds the PA-S4U2Self
# padata (keyed with the TGT's session key) and returns it with req_body.
1306 def generate_s4u2self_padata(_kdc_exchange_dict
,
1309 padata
= self
.PA_S4U2Self_create(
1312 tgt_session_key
=tgt
.session_key
,
1315 return [padata
], req_body
# The reply's client name is expected to be the impersonated user's.
1317 return self
._tgs
_req
(tgt
, expected_error
, tgt_creds
,
1318 expected_cname
=user_cname
,
1319 generate_padata_fn
=generate_s4u2self_padata
,
1320 expect_claims
=False, expect_edata
=expect_edata
,
1321 expected_status
=expected_status
)
# Perform a user-to-user TGS-REQ: the request is authenticated with a fresh
# TGT for the _get_mach_creds() account, carries 'tgt' as the additional
# ticket, sets the 'enc-tkt-in-skey' KDC option and targets tgt_creds.
1323 def _user2user(self
, tgt
, tgt_creds
, expected_error
, sname
=None):
1324 user_creds
= self
._get
_mach
_creds
()
1325 user_tgt
= self
.get_tgt(user_creds
)
1327 kdc_options
= str(krb5_asn1
.KDCOptions('enc-tkt-in-skey'))
1328 self
._tgs
_req
(user_tgt
, expected_error
, tgt_creds
,
1329 kdc_options
=kdc_options
,
1330 additional_ticket
=tgt
,
# Common TGS-REQ driver used by the helpers in this class: builds the
# service name and exchange dictionary, runs the exchange through
# _generic_kdc_exchange() and checks the reply against expected_error.
1333 def _tgs_req(self
, tgt
, expected_error
, target_creds
,
1335 expected_cname
=None,
1336 additional_ticket
=None,
1337 generate_padata_fn
=None,
1341 expected_status
=None):
1342 srealm
= target_creds
.get_realm()
1345 target_name
= target_creds
.get_username()
# The krbtgt is addressed as an NT_SRV_INST principal of (name, realm).
1346 if target_name
== 'krbtgt':
1347 sname
= self
.PrincipalName_create(name_type
=NT_SRV_INST
,
1348 names
=[target_name
, srealm
])
# Other targets are addressed as host/<account name>, with a trailing '$'
# stripped from the account name first.
1350 if target_name
[-1] == '$':
1351 target_name
= target_name
[:-1]
1352 sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
1353 names
=['host', target_name
])
# When an additional ticket is supplied, its session key is used as the
# ticket decryption key for the reply.
1355 if additional_ticket
is not None:
1356 additional_tickets
= [additional_ticket
.ticket
]
1357 decryption_key
= additional_ticket
.session_key
1359 additional_tickets
= None
1360 decryption_key
= self
.TicketDecryptionKey_from_creds(
# The authenticator subkey uses the same etype as the TGT's session key.
1363 subkey
= self
.RandomKey(tgt
.session_key
.etype
)
1365 etypes
= (AES256_CTS_HMAC_SHA1_96
, ARCFOUR_HMAC_MD5
)
# NOTE(review): either an error checker or a reply checker is installed;
# presumably chosen by whether expected_error is set — confirm.
1368 check_error_fn
= self
.generic_check_kdc_error
1371 check_error_fn
= None
1372 check_rep_fn
= self
.generic_check_kdc_rep
# Default the expected client name to the one in the TGT.
1374 if expected_cname
is None:
1375 expected_cname
= tgt
.cname
1377 kdc_exchange_dict
= self
.tgs_exchange_dict(
1378 expected_crealm
=tgt
.crealm
,
1379 expected_cname
=expected_cname
,
1380 expected_srealm
=srealm
,
1381 expected_sname
=sname
,
1382 ticket_decryption_key
=decryption_key
,
1383 generate_padata_fn
=generate_padata_fn
,
1384 check_error_fn
=check_error_fn
,
1385 check_rep_fn
=check_rep_fn
,
1386 check_kdc_private_fn
=self
.generic_check_kdc_private
,
1387 expected_error_mode
=expected_error
,
1388 expected_status
=expected_status
,
1390 authenticator_subkey
=subkey
,
1391 kdc_options
=kdc_options
,
1392 expect_edata
=expect_edata
,
1393 expect_claims
=expect_claims
)
1395 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
1400 additional_tickets
=additional_tickets
)
# Error replies are checked against expected_error; successful replies must
# be a KRB_TGS_REP, and the ticket credentials from the exchange dictionary
# are returned.
1402 self
.check_error_rep(rep
, expected_error
)
1405 self
.check_reply(rep
, KRB_TGS_REP
)
1406 return kdc_exchange_dict
['rep_ticket_creds']
# Script entry point: reset the module-level debug switches (ASN.1 printing
# and hex dumps) to off when the file is run directly.
1409 if __name__
== "__main__":
1410 global_asn1_print
= False
1411 global_hexdump
= False