tests/krb5: Allow specifying options and expected flags when obtaining a ticket
[Samba.git] / python / samba / tests / krb5 / kdc_base_test.py
blob34a23b3b876717028d080fb90214c2e8204d7b31
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import os
21 from datetime import datetime, timezone
22 import tempfile
23 import binascii
24 import collections
25 import secrets
27 from collections import namedtuple
28 import ldb
29 from ldb import SCOPE_BASE
30 from samba import generate_random_password
31 from samba.auth import system_session
32 from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
33 from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
34 from samba.drs_utils import drs_Replicate, drsuapi_connect
35 from samba.dsdb import (
36 DSDB_SYNTAX_BINARY_DN,
37 DS_DOMAIN_FUNCTION_2000,
38 DS_DOMAIN_FUNCTION_2008,
39 DS_GUID_COMPUTERS_CONTAINER,
40 DS_GUID_USERS_CONTAINER,
41 UF_WORKSTATION_TRUST_ACCOUNT,
42 UF_NO_AUTH_DATA_REQUIRED,
43 UF_NORMAL_ACCOUNT,
44 UF_NOT_DELEGATED,
45 UF_PARTIAL_SECRETS_ACCOUNT,
46 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
48 from samba.join import DCJoinContext
49 from samba.ndr import ndr_pack, ndr_unpack
50 from samba import net
51 from samba.samdb import SamDB, dsdb_Dn
53 from samba.tests import delete_force
54 import samba.tests.krb5.kcrypto as kcrypto
55 from samba.tests.krb5.raw_testcase import (
56 KerberosCredentials,
57 KerberosTicketCreds,
58 RawKerberosTest
60 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
61 from samba.tests.krb5.rfc4120_constants import (
62 AD_IF_RELEVANT,
63 AD_WIN2K_PAC,
64 AES256_CTS_HMAC_SHA1_96,
65 ARCFOUR_HMAC_MD5,
66 KDC_ERR_PREAUTH_REQUIRED,
67 KRB_AS_REP,
68 KRB_TGS_REP,
69 KRB_ERROR,
70 KU_AS_REP_ENC_PART,
71 KU_ENC_CHALLENGE_CLIENT,
72 KU_PA_ENC_TIMESTAMP,
73 KU_TICKET,
74 NT_PRINCIPAL,
75 NT_SRV_HST,
76 NT_SRV_INST,
77 PADATA_ENCRYPTED_CHALLENGE,
78 PADATA_ENC_TIMESTAMP,
79 PADATA_ETYPE_INFO2,
# Put the in-tree "bin/python" directory at the front of the module search
# path.
sys.path.insert(0, "bin/python")
# Set PYTHONUNBUFFERED in this process's environment.
os.environ["PYTHONUNBUFFERED"] = "1"

# Module-level debug switches. setUp() copies them onto each test instance as
# do_asn1_print and do_hexdump.
global_asn1_print = False
global_hexdump = False
89 class KDCBaseTest(RawKerberosTest):
90 """ Base class for KDC tests.
91 """
@classmethod
def setUpClass(cls):
    """Initialise the class-level state shared by all tests of the class.

    The loadparm context, the SamDB connections, the domain functional
    level and the mock RODC join context all start out as None; the
    corresponding get_*() accessors fill them in on first use.
    """
    super().setUpClass()

    # Lazily created handles and cached values.
    cls._lp = None
    cls._ldb = None
    cls._rodc_ldb = None
    cls._functional_level = None
    cls._rodc_ctx = None

    # An identifier to ensure created accounts have unique names. Windows
    # caches accounts based on usernames, so account names being different
    # across test runs avoids previous test runs affecting the results.
    cls.account_base = f'krb5_{secrets.token_hex(5)}_'
    cls.account_id = 0

    # DNs of the accounts created as part of testing; they are deleted
    # again in tearDownClass().
    cls.accounts = set()

    # Caches for created accounts and for tickets.
    cls.account_cache = {}
    cls.tkt_cache = {}

    # LDB messages that undo modifications made during testing.
    cls.ldb_cleanups = []
@classmethod
def tearDownClass(cls):
    """Undo recorded LDB modifications and delete the created accounts.

    Cleanup happens here rather than in tearDown() so that accounts need
    only be created once for permutation tests.
    """
    samdb = cls._ldb
    if samdb is not None:
        # Apply the recorded cleanup messages, most recent first. This is
        # best effort: a cleanup that no longer applies is ignored.
        for undo_msg in cls.ldb_cleanups[::-1]:
            try:
                samdb.modify(undo_msg)
            except ldb.LdbError:
                pass

        for account_dn in cls.accounts:
            delete_force(samdb, account_dn)

    # Remove the mock RODC join, if one was created.
    if cls._rodc_ctx is not None:
        cls._rodc_ctx.cleanup_old_join(force=True)

    super().tearDownClass()
def setUp(self):
    """Per-test setup: adopt the module-level debug output switches."""
    super().setUp()

    # global_asn1_print and global_hexdump are module-level flags.
    self.do_asn1_print = global_asn1_print
    self.do_hexdump = global_hexdump
def get_lp(self):
    """Return the loadparm context, obtaining it on first use.

    The result of self.get_loadparm() is stored on the class, so every
    instance of the class shares it.
    """
    if self._lp is None:
        cls = type(self)
        cls._lp = self.get_loadparm()

    return self._lp
def get_samdb(self):
    """Return the SamDB connection to the DC, opening it on first use.

    The connection is made over LDAP to self.dc_host with the admin
    credentials. It is stored on the class, so later calls reuse it.
    """
    if self._ldb is None:
        admin_creds = self.get_admin_creds()
        loadparm = self.get_lp()
        session_info = system_session()

        ldap_url = "ldap://%s" % self.dc_host
        type(self)._ldb = SamDB(url=ldap_url,
                                session_info=session_info,
                                credentials=admin_creds,
                                lp=loadparm)

    return self._ldb
def get_rodc_samdb(self):
    """Return the SamDB connection to the RODC, opening it on first use.

    The connection is made over LDAP to self.host with the admin
    credentials and am_rodc=True. It is stored on the class, so later
    calls reuse it.
    """
    if self._rodc_ldb is None:
        admin_creds = self.get_admin_creds()
        loadparm = self.get_lp()
        session_info = system_session()

        ldap_url = "ldap://%s" % self.host
        type(self)._rodc_ldb = SamDB(url=ldap_url,
                                     session_info=session_info,
                                     credentials=admin_creds,
                                     lp=loadparm,
                                     am_rodc=True)

    return self._rodc_ldb
def get_server_dn(self, samdb):
    """Return the DN held in the serverReference attribute of the server
    object that samdb reports as its server name.
    """
    server_name = samdb.get_serverName()

    result = samdb.search(base=server_name,
                          scope=ldb.SCOPE_BASE,
                          attrs=['serverReference'])
    reference = result[0]['serverReference'][0]

    return ldb.Dn(samdb, reference.decode('utf8'))
def get_mock_rodc_ctx(self):
    """Return the join context of the mock RODC, creating it on first use.

    The context is stored on the class before create_rodc() is called on
    it, and is cleaned up again in tearDownClass().
    """
    if self._rodc_ctx is not None:
        return self._rodc_ctx

    admin_creds = self.get_admin_creds()
    loadparm = self.get_lp()

    type(self)._rodc_ctx = DCJoinContext(server=self.dc_host,
                                         creds=admin_creds,
                                         lp=loadparm,
                                         site='Default-First-Site-Name',
                                         netbios_name='KRB5RODC',
                                         targetdir=None,
                                         domain=None)
    self.create_rodc(self._rodc_ctx)

    return self._rodc_ctx
def get_domain_functional_level(self, ldb):
    """Return the domain functional level, querying it on first use.

    The level is read from the domainFunctionality attribute of the
    rootDSE of the given connection. DS_DOMAIN_FUNCTION_2000 is assumed
    if the attribute is absent. The value is cached on the class.

    Note that the 'ldb' parameter is a database connection and shadows the
    ldb module inside this method.
    """
    if self._functional_level is not None:
        return self._functional_level

    rootdse = ldb.search(base='',
                         scope=SCOPE_BASE,
                         attrs=['domainFunctionality'])
    try:
        level = int(rootdse[0]['domainFunctionality'][0])
    except KeyError:
        level = DS_DOMAIN_FUNCTION_2000

    type(self)._functional_level = level

    return self._functional_level
def get_default_enctypes(self):
    """Return the set of encryption types expected at the domain's
    functional level.
    """
    level = self.get_domain_functional_level(self.get_samdb())

    # RC4 should always be supported
    enctypes = {kcrypto.Enctype.RC4}

    # AES is only supported at functional level 2008 or higher
    if level >= DS_DOMAIN_FUNCTION_2008:
        enctypes |= {kcrypto.Enctype.AES256, kcrypto.Enctype.AES128}

    return enctypes
def create_account(self, samdb, name, machine_account=False,
                   spn=None, upn=None, additional_details=None,
                   ou=None, account_control=0):
    '''Create a user or computer account for testing.

    The account is given a random password. Its DN is recorded in
    self.accounts, which tearDownClass uses to delete the created accounts.

    Returns a (credentials, dn) tuple, where dn is the DN string of the
    new account.
    '''
    if ou is None:
        # Default to the well-known Computers or Users container.
        if machine_account:
            container_guid = DS_GUID_COMPUTERS_CONTAINER
        else:
            container_guid = DS_GUID_USERS_CONTAINER

        ou = samdb.get_wellknown_dn(samdb.get_default_basedn(),
                                    container_guid)

    dn = "CN=%s,%s" % (name, ou)

    # An account of this name may be left over from a failed previous run;
    # remove it first.
    delete_force(samdb, dn)

    if machine_account:
        object_class = "computer"
        account_name = "%s$" % name
        account_control |= UF_WORKSTATION_TRUST_ACCOUNT
    else:
        object_class = "user"
        account_name = name
        account_control |= UF_NORMAL_ACCOUNT

    password = generate_random_password(32, 32)
    # unicodePwd takes the quoted password encoded as UTF-16-LE.
    utf16pw = ('"%s"' % password).encode('utf-16-le')

    details = {
        "dn": dn,
        "objectclass": object_class,
        "sAMAccountName": account_name,
        "userAccountControl": str(account_control),
        "unicodePwd": utf16pw}
    optional_attrs = (("servicePrincipalName", spn),
                      ("userPrincipalName", upn))
    for attr_name, attr_value in optional_attrs:
        if attr_value is not None:
            details[attr_name] = attr_value
    if additional_details is not None:
        details.update(additional_details)
    samdb.add(details)

    creds = KerberosCredentials()
    creds.guess(self.get_lp())
    creds.set_realm(samdb.domain_dns_name().upper())
    creds.set_domain(samdb.domain_netbios_name().upper())
    creds.set_password(password)
    creds.set_username(account_name)
    creds.set_workstation(name if machine_account else '')
    creds.set_dn(ldb.Dn(samdb, dn))
    creds.set_spn(spn)

    # Save the account name so it can be deleted in tearDownClass
    self.accounts.add(dn)

    self.creds_set_enctypes(creds)

    lookup = samdb.search(base=dn,
                          scope=ldb.SCOPE_BASE,
                          attrs=['msDS-KeyVersionNumber'])
    reported_kvno = lookup[0].get('msDS-KeyVersionNumber', idx=0)
    if reported_kvno is not None:
        self.assertEqual(int(reported_kvno), 1)
    # The key version is set to 1 whether or not the server reported the
    # attribute.
    creds.set_kvno(1)

    return (creds, dn)
def get_security_descriptor(self, dn):
    """Return an NDR-packed security descriptor naming the object at dn.

    The descriptor is owned by the builtin Administrators SID. Its DACL
    holds a single ACE with the SEC_ADS_GENERIC_ALL access mask, whose
    trustee is the objectSid of dn.
    """
    samdb = self.get_samdb()
    trustee_sid = self.get_objectSid(samdb, dn)

    ace = security.ace()
    ace.access_mask = security.SEC_ADS_GENERIC_ALL
    ace.trustee = security.dom_sid(trustee_sid)

    dacl = security.acl()
    dacl.revision = security.SECURITY_ACL_REVISION_ADS
    dacl.aces = [ace]
    dacl.num_aces = 1

    descriptor = security.descriptor()
    descriptor.type |= security.SEC_DESC_DACL_PRESENT
    descriptor.owner_sid = security.dom_sid(
        security.SID_BUILTIN_ADMINISTRATORS)
    descriptor.dacl = dacl

    return ndr_pack(descriptor)
def create_rodc(self, ctx):
    """Set up the join context ctx as an RODC and add its objects.

    Any old join is cleaned up before the objects are added. If adding
    the objects fails, the partial join is cleaned up and the exception
    is re-raised.
    """
    naming_contexts = (ctx.base_dn, ctx.config_dn, ctx.schema_dn)
    ctx.nc_list = list(naming_contexts)
    ctx.full_nc_list = list(naming_contexts)
    ctx.krbtgt_dn = f'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'

    builtin_never_reveal = (security.SID_BUILTIN_ADMINISTRATORS,
                            security.SID_BUILTIN_SERVER_OPERATORS,
                            security.SID_BUILTIN_BACKUP_OPERATORS,
                            security.SID_BUILTIN_ACCOUNT_OPERATORS)
    never_reveal = [f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>']
    never_reveal.extend(f'<SID={sid}>' for sid in builtin_never_reveal)
    ctx.never_reveal_sid = never_reveal
    ctx.reveal_sid = f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'

    # The account returned by get_mysid() manages the mock RODC.
    ctx.managedby = f'<SID={ctx.get_mysid()}>'

    ctx.userAccountControl = (UF_WORKSTATION_TRUST_ACCOUNT |
                              UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
                              UF_PARTIAL_SECRETS_ACCOUNT)

    ctx.connection_dn = f'CN=RODC Connection (FRS),{ctx.ntds_dn}'
    ctx.secure_channel_type = misc.SEC_CHAN_RODC
    ctx.RODC = True

    replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
                     drsuapi.DRSUAPI_DRS_PER_SYNC |
                     drsuapi.DRSUAPI_DRS_GET_ANC |
                     drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
                     drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
    ctx.replica_flags = replica_flags
    ctx.domain_replica_flags = (replica_flags |
                                drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)

    ctx.build_nc_lists()

    ctx.cleanup_old_join()

    try:
        ctx.join_add_objects()
    except Exception:
        # cleanup the failed join (checking we still have a live LDB
        # connection to the remote DC first)
        ctx.refresh_ldb_connection()
        ctx.cleanup_old_join()
        raise
def replicate_account_to_rodc(self, dn):
    """Replicate the secrets of the account at dn to the RODC.

    The replicateSingleObject rootDSE operation is tried first. If the
    server rejects it as an unknown attribute, the account is instead
    replicated into the local database using the machine account.
    """
    samdb = self.get_samdb()
    rodc_samdb = self.get_rodc_samdb()

    repl_val = f'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'

    request = ldb.Message()
    request.dn = ldb.Dn(rodc_samdb, '')
    request['replicateSingleObject'] = ldb.MessageElement(
        repl_val,
        ldb.FLAG_MOD_REPLACE,
        'replicateSingleObject')

    try:
        # Try replication using the replicateSingleObject rootDSE
        # operation.
        rodc_samdb.modify(request)
    except ldb.LdbError as err:
        enum, estr = err.args
        self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
        self.assertIn('rootdse_modify: unknown attribute to change!',
                      estr)

        # If that method wasn't supported, we may be in the rodc:local test
        # environment, where we can try replicating to the local database.

        loadparm = self.get_lp()

        machine_creds = Credentials()
        machine_creds.guess(loadparm)
        machine_creds.set_machine_account(loadparm)

        local_samdb = SamDB(url=None, session_info=system_session(),
                            credentials=machine_creds, lp=loadparm)

        local_dsa_guid = misc.GUID(local_samdb.get_ntds_GUID())

        replicator = drs_Replicate(f'ncacn_ip_tcp:{self.dc_host}[seal]',
                                   loadparm, machine_creds,
                                   local_samdb, local_dsa_guid)

        source_invocation_id = misc.GUID(samdb.invocation_id)

        replicator.replicate(dn,
                             source_invocation_id,
                             local_dsa_guid,
                             exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
                             rodc=True)
def reveal_account_to_mock_rodc(self, dn):
    """Request the secrets of the account at dn on behalf of the mock RODC.

    The request is made with get_secrets(), using the mock RODC's NTDS
    GUID as the destination DSA. The returned secrets are discarded.

    dn is passed on to get_secrets() unchanged, so it must be in the form
    that method expects (it compares dn against the DN string returned by
    the server).
    """
    samdb = self.get_samdb()
    rodc_ctx = self.get_mock_rodc_ctx()

    # get_secrets() requires the target DN as its second positional
    # argument; without it the call fails with a TypeError.
    self.get_secrets(
        samdb,
        dn,
        destination_dsa_guid=rodc_ctx.ntds_guid,
        source_dsa_invocation_id=misc.GUID(samdb.invocation_id))
def check_revealed(self, dn, rodc_dn, revealed=True):
    """Assert whether dn is listed in msDS-RevealedUsers of rodc_dn.

    With revealed=True the account must be listed; with revealed=False it
    must not be (an absent attribute counts as not listed).
    """
    samdb = self.get_samdb()

    result = samdb.search(base=rodc_dn,
                          scope=ldb.SCOPE_BASE,
                          attrs=['msDS-RevealedUsers'])

    revealed_users = result[0].get('msDS-RevealedUsers')
    if revealed_users is None:
        self.assertFalse(revealed)
        return

    # Each value is a binary DN; reduce it to the plain DN string.
    revealed_dns = {
        str(dsdb_Dn(samdb, str(user),
                    syntax_oid=DSDB_SYNTAX_BINARY_DN).dn)
        for user in revealed_users
    }

    check_membership = self.assertIn if revealed else self.assertNotIn
    check_membership(str(dn), revealed_dns)
def get_secrets(self, samdb, dn,
                destination_dsa_guid,
                source_dsa_invocation_id):
    """Request the secrets of the object at dn with DsGetNCChanges.

    A level 8 request with the DRSUAPI_EXOP_REPL_SECRET extended operation
    is sent to the host samdb reports as its DNS name. It asks for the
    supplementalCredentials and unicodePwd attributes of a single object.

    Returns a (bind, identifier, attributes) tuple: the DRSUAPI binding,
    the identifier of the returned object and its replicated attributes.
    """
    admin_creds = self.get_admin_creds()

    dns_hostname = samdb.host_dns_name()
    (bind, handle, _) = drsuapi_connect(dns_hostname,
                                        self.get_lp(),
                                        admin_creds)

    request = drsuapi.DsGetNCChangesRequest8()

    request.destination_dsa_guid = destination_dsa_guid
    request.source_dsa_invocation_id = source_dsa_invocation_id

    target = drsuapi.DsReplicaObjectIdentifier()
    target.dn = dn
    request.naming_context = target

    # Start from an all-zero high water mark.
    highwatermark = drsuapi.DsReplicaHighWaterMark()
    highwatermark.tmp_highest_usn = 0
    highwatermark.reserved_usn = 0
    highwatermark.highest_usn = 0
    request.highwatermark = highwatermark
    request.uptodateness_vector = None

    request.replica_flags = 0

    request.max_object_count = 1
    request.max_ndr_size = 402116
    request.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET

    # Only the secret attributes are requested.
    wanted_attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
                     drsuapi.DRSUAPI_ATTID_unicodePwd]

    attribute_set = drsuapi.DsPartialAttributeSet()
    attribute_set.version = 1
    attribute_set.attids = wanted_attids
    attribute_set.num_attids = len(wanted_attids)
    request.partial_attribute_set = attribute_set

    request.partial_attribute_set_ex = None
    request.mapping_ctr.num_mappings = 0
    request.mapping_ctr.mappings = None

    _, ctr = bind.DsGetNCChanges(handle, 8, request)

    self.assertEqual(1, ctr.object_count)

    replicated = ctr.first_object.object
    identifier = replicated.identifier
    attributes = replicated.attribute_ctr.attributes

    self.assertEqual(dn, identifier.dn)

    return bind, identifier, attributes
def get_keys(self, samdb, dn):
    """Return the Kerberos keys of the account at dn.

    The secrets are obtained with get_secrets() and decrypted. The result
    maps each encryption type to its key as a hex string: the AES keys
    come from the 'Primary:Kerberos-Newer-Keys' package of
    supplementalCredentials, the RC4 key from unicodePwd.

    Asserts that the encryption types found are exactly the default
    enctypes.
    """
    admin_creds = self.get_admin_creds()

    bind, identifier, attributes = self.get_secrets(
        samdb,
        str(dn),
        destination_dsa_guid=misc.GUID(samdb.get_ntds_GUID()),
        source_dsa_invocation_id=misc.GUID())

    rid = identifier.sid.split()[1]

    net_ctx = net.Net(admin_creds)

    aes_enctypes = (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128)
    keys = {}

    for attr in attributes:
        attid = attr.attid
        if attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
            net_ctx.replicate_decrypt(bind, attr, rid)
            credentials_blob = attr.value_ctr.values[0].blob

            credentials = ndr_unpack(drsblobs.supplementalCredentialsBlob,
                                     credentials_blob)
            for package in credentials.sub.packages:
                if package.name != 'Primary:Kerberos-Newer-Keys':
                    continue

                # The package data is hex-encoded.
                newer_keys = ndr_unpack(
                    drsblobs.package_PrimaryKerberosBlob,
                    binascii.a2b_hex(package.data))
                for key in newer_keys.ctr.keys:
                    if key.keytype in aes_enctypes:
                        keys[key.keytype] = key.value.hex()
        elif attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
            net_ctx.replicate_decrypt(bind, attr, rid)
            nt_hash = attr.value_ctr.values[0].blob
            keys[kcrypto.Enctype.RC4] = nt_hash.hex()

    self.assertCountEqual(self.get_default_enctypes(), keys)

    return keys
def creds_set_keys(self, creds, keys):
    """Force each key in keys (a mapping of enctype to key) onto creds.

    Nothing is done if keys is None.
    """
    if keys is None:
        return

    for enctype, key in keys.items():
        creds.set_forced_key(enctype, key)
def creds_set_enctypes(self, creds):
    """Set the AS, TGS and AP supported enctypes of creds from the
    msDS-SupportedEncryptionTypes attribute of the account creds refers to.

    A missing attribute is treated as 0.
    """
    samdb = self.get_samdb()

    result = samdb.search(creds.get_dn(),
                          scope=ldb.SCOPE_BASE,
                          attrs=['msDS-SupportedEncryptionTypes'])
    supported = result[0].get('msDS-SupportedEncryptionTypes', idx=0)
    if supported is None:
        supported = 0

    creds.set_as_supported_enctypes(supported)
    creds.set_tgs_supported_enctypes(supported)
    creds.set_ap_supported_enctypes(supported)
def creds_set_default_enctypes(self, creds, fast_support=False):
    """Set the AS, TGS and AP supported enctypes of creds to the default
    enctypes, adding the FAST support bits if fast_support is true.
    """
    supported = KerberosCredentials.etypes_to_bits(
        self.get_default_enctypes())

    if fast_support:
        supported |= KerberosCredentials.fast_supported_bits

    creds.set_as_supported_enctypes(supported)
    creds.set_tgs_supported_enctypes(supported)
    creds.set_ap_supported_enctypes(supported)
def add_to_group(self, account_dn, group_dn, group_attr):
    """Append account_dn to the group_attr attribute of group_dn.

    The attribute must already exist on the object. The message that
    undoes the change is recorded in self.ldb_cleanups (applied in
    tearDownClass) and is also returned, so that the caller can undo the
    change earlier.
    """
    samdb = self.get_samdb()

    result = samdb.search(base=group_dn,
                          scope=ldb.SCOPE_BASE,
                          attrs=[group_attr])
    original = result[0]
    self.assertIn(group_attr, original)

    new_members = [*original[group_attr], account_dn]

    update = ldb.Message()
    update.dn = group_dn
    update[group_attr] = ldb.MessageElement(new_members,
                                            ldb.FLAG_MOD_REPLACE,
                                            group_attr)

    # Work out and record the undo message before applying the change.
    cleanup = samdb.msg_diff(update, original)
    self.ldb_cleanups.append(cleanup)
    samdb.modify(update)

    return cleanup
def get_cached_creds(self, *,
                     machine_account,
                     opts=None):
    """Return credentials for an account with the given options.

    opts overrides the defaults listed below. An account is created with
    create_account_opts() the first time a particular combination of
    options is requested; later requests for the same combination return
    the same credentials from self.account_cache.
    """
    if opts is None:
        opts = {}

    account_opts = {'machine_account': machine_account}
    # Defaults for every option accepted by create_account_opts().
    account_opts.update({
        'allowed_replication': False,
        'allowed_replication_mock': False,
        'denied_replication': False,
        'denied_replication_mock': False,
        'revealed_to_rodc': False,
        'revealed_to_mock_rodc': False,
        'no_auth_data_required': False,
        'supported_enctypes': None,
        'not_delegated': False,
        'delegation_to_spn': None,
        'delegation_from_dn': None,
        'trusted_to_auth_for_delegation': False,
        'fast_support': False
    })
    account_opts.update(opts)

    # Sorting makes the key independent of the order of the options.
    cache_key = tuple(sorted(account_opts.items()))

    creds = self.account_cache.get(cache_key)
    if creds is None:
        creds = self.create_account_opts(**account_opts)
        self.account_cache[cache_key] = creds

    return creds
def create_account_opts(self, *,
                        machine_account,
                        allowed_replication,
                        allowed_replication_mock,
                        denied_replication,
                        denied_replication_mock,
                        revealed_to_rodc,
                        revealed_to_mock_rodc,
                        no_auth_data_required,
                        supported_enctypes,
                        not_delegated,
                        delegation_to_spn,
                        delegation_from_dn,
                        trusted_to_auth_for_delegation,
                        fast_support):
    """Create an account configured by the given options.

    The account gets a unique name built from self.account_base. Its keys
    are fetched and forced onto the credentials. The replication options
    add it to the msDS-RevealOnDemandGroup / msDS-NeverRevealGroup
    attributes of the RODC or the mock RODC, and replicate its secrets
    where requested.

    Returns the credentials of the new account.
    """
    # Some options only apply to one kind of account.
    if machine_account:
        self.assertFalse(not_delegated)
    else:
        self.assertIsNone(delegation_to_spn)
        self.assertIsNone(delegation_from_dn)
        self.assertFalse(trusted_to_auth_for_delegation)

    samdb = self.get_samdb()
    rodc_samdb = self.get_rodc_samdb()

    rodc_dn = self.get_server_dn(rodc_samdb)

    user_name = self.account_base + str(self.account_id)
    type(self).account_id += 1

    uac_options = (
        (trusted_to_auth_for_delegation,
         UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION),
        (not_delegated, UF_NOT_DELEGATED),
        (no_auth_data_required, UF_NO_AUTH_DATA_REQUIRED),
    )
    user_account_control = 0
    for requested, flag in uac_options:
        if requested:
            user_account_control |= flag

    details = {}

    enctypes = supported_enctypes
    if fast_support:
        enctypes = (enctypes or 0) | KerberosCredentials.fast_supported_bits

    if enctypes is not None:
        details['msDS-SupportedEncryptionTypes'] = str(enctypes)

    if delegation_to_spn:
        details['msDS-AllowedToDelegateTo'] = delegation_to_spn

    if delegation_from_dn:
        details['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
            self.get_security_descriptor(delegation_from_dn))

    spn = 'host/' + user_name if machine_account else None

    creds, dn = self.create_account(samdb, user_name,
                                    machine_account=machine_account,
                                    spn=spn,
                                    additional_details=details,
                                    account_control=user_account_control)

    self.creds_set_keys(creds, self.get_keys(samdb, dn))

    # Handle secret replication to the RODC.

    if allowed_replication or revealed_to_rodc:
        # Allow replicating this account's secrets if requested, or allow
        # it only temporarily if we're about to replicate them.
        undo_allowed = self.add_to_group(
            dn, rodc_dn,
            'msDS-RevealOnDemandGroup')

        if revealed_to_rodc:
            # Replicate this account's secrets to the RODC.
            self.replicate_account_to_rodc(dn)

        if not allowed_replication:
            # If we don't want replicating secrets to be allowed for this
            # account, disable it again.
            samdb.modify(undo_allowed)

        self.check_revealed(dn,
                            rodc_dn,
                            revealed=revealed_to_rodc)

    if denied_replication:
        # Deny replicating this account's secrets to the RODC.
        self.add_to_group(dn, rodc_dn, 'msDS-NeverRevealGroup')

    # Handle secret replication to the mock RODC.

    if allowed_replication_mock or revealed_to_mock_rodc:
        # Allow replicating this account's secrets if requested, or allow
        # it only temporarily if we want to add the account to the mock
        # RODC's msDS-RevealedUsers.
        rodc_ctx = self.get_mock_rodc_ctx()
        mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)

        undo_allowed_mock = self.add_to_group(
            dn, mock_rodc_dn,
            'msDS-RevealOnDemandGroup')

        if revealed_to_mock_rodc:
            # Request replicating this account's secrets to the mock RODC,
            # which updates msDS-RevealedUsers.
            self.reveal_account_to_mock_rodc(dn)

        if not allowed_replication_mock:
            # If we don't want replicating secrets to be allowed for this
            # account, disable it again.
            samdb.modify(undo_allowed_mock)

        self.check_revealed(dn,
                            mock_rodc_dn,
                            revealed=revealed_to_mock_rodc)

    if denied_replication_mock:
        # Deny replicating this account's secrets to the mock RODC.
        rodc_ctx = self.get_mock_rodc_ctx()
        mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)

        self.add_to_group(dn, mock_rodc_dn, 'msDS-NeverRevealGroup')

    return creds
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials of the client account.

    They are looked up by _get_krb5_creds() under the 'CLIENT' prefix. A
    cached user account is offered as the fallback.
    """
    def fallback():
        return self.get_cached_creds(machine_account=False)

    return self._get_krb5_creds(
        prefix='CLIENT',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=fallback)
def get_mach_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the credentials of the machine account.

    They are looked up by _get_krb5_creds() under the 'MAC' prefix. A
    cached machine account with FAST support is offered as the fallback.
    """
    def fallback():
        return self.get_cached_creds(machine_account=True,
                                     opts={'fast_support': True})

    return self._get_krb5_creds(
        prefix='MAC',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=fallback)
def get_service_creds(self,
                      allow_missing_password=False,
                      allow_missing_keys=True):
    """Return the credentials of the service account.

    They are looked up by _get_krb5_creds() under the 'SERVICE' prefix. A
    cached machine account that is trusted to authenticate for delegation
    and has FAST support is offered as the fallback.
    """
    def fallback():
        service_opts = {
            'trusted_to_auth_for_delegation': True,
            'fast_support': True
        }
        return self.get_cached_creds(machine_account=True,
                                     opts=service_opts)

    return self._get_krb5_creds(
        prefix='SERVICE',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=fallback)
def get_rodc_krbtgt_creds(self,
                          require_keys=True,
                          require_strongest_key=False):
    """Return the credentials of the RODC's krbtgt account.

    They are looked up by _get_krb5_creds() under the 'RODC_KRBTGT'
    prefix. The fallback reads the account linked from the RODC's
    msDS-KrbTgtLink attribute and fetches its keys.

    require_strongest_key may only be set together with require_keys.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fallback():
        samdb = self.get_samdb()
        rodc_samdb = self.get_rodc_samdb()

        rodc_dn = self.get_server_dn(rodc_samdb)

        link = samdb.search(rodc_dn,
                            scope=ldb.SCOPE_BASE,
                            attrs=['msDS-KrbTgtLink'])
        linked_dn = link[0]['msDS-KrbTgtLink'][0]

        account = samdb.search(linked_dn,
                               scope=ldb.SCOPE_BASE,
                               attrs=['sAMAccountName',
                                      'msDS-KeyVersionNumber',
                                      'msDS-SecondaryKrbTgtNumber'])[0]
        krbtgt_dn = account.dn
        username = str(account['sAMAccountName'])

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
        creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
        creds.set_username(username)

        kvno = int(account['msDS-KeyVersionNumber'][0])
        krbtgt_number = int(account['msDS-SecondaryKrbTgtNumber'][0])

        # The krbtgt number goes in the upper 16 bits of the kvno.
        creds.set_kvno((krbtgt_number << 16) | kvno)
        creds.set_dn(krbtgt_dn)

        self.creds_set_keys(creds, self.get_keys(samdb, krbtgt_dn))

        # The RODC krbtgt account should support the default enctypes,
        # although it might not have the msDS-SupportedEncryptionTypes
        # attribute.
        self.creds_set_default_enctypes(creds)

        return creds

    return self._get_krb5_creds(
        prefix='RODC_KRBTGT',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fallback)
def get_mock_rodc_krbtgt_creds(self,
                               require_keys=True,
                               require_strongest_key=False):
    """Return the credentials of the mock RODC's krbtgt account.

    They are looked up by _get_krb5_creds() under the 'MOCK_RODC_KRBTGT'
    prefix. The fallback reads the krbtgt account of the mock RODC join
    context and fetches its keys.

    require_strongest_key may only be set together with require_keys.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fallback():
        samdb = self.get_samdb()

        rodc_ctx = self.get_mock_rodc_ctx()

        account = samdb.search(base=ldb.Dn(samdb, rodc_ctx.new_krbtgt_dn),
                               scope=ldb.SCOPE_BASE,
                               attrs=['msDS-KeyVersionNumber',
                                      'msDS-SecondaryKrbTgtNumber'])[0]
        dn = account.dn
        username = str(rodc_ctx.krbtgt_name)

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
        creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
        creds.set_username(username)

        kvno = int(account['msDS-KeyVersionNumber'][0])
        krbtgt_number = int(account['msDS-SecondaryKrbTgtNumber'][0])

        # The krbtgt number goes in the upper 16 bits of the kvno.
        creds.set_kvno((krbtgt_number << 16) | kvno)
        creds.set_dn(dn)

        self.creds_set_keys(creds, self.get_keys(samdb, dn))

        self.creds_set_enctypes(creds)

        return creds

    return self._get_krb5_creds(
        prefix='MOCK_RODC_KRBTGT',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fallback)
def get_krbtgt_creds(self,
                     require_keys=True,
                     require_strongest_key=False):
    """Return the credentials of the domain's krbtgt account.

    They are looked up by _get_krb5_creds() under the 'KRBTGT' prefix,
    with 'krbtgt' as the default username. The fallback finds the account
    by its well-known RID and fetches its keys.

    require_strongest_key may only be set together with require_keys.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fallback():
        samdb = self.get_samdb()

        krbtgt_sid = '%s-%d' % (samdb.get_domain_sid(),
                                security.DOMAIN_RID_KRBTGT)

        account = samdb.search(base='<SID=%s>' % krbtgt_sid,
                               scope=ldb.SCOPE_BASE,
                               attrs=['sAMAccountName',
                                      'msDS-KeyVersionNumber'])[0]
        dn = account.dn
        username = str(account['sAMAccountName'])

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'KRBTGT'))
        creds.set_realm(self.env_get_var('REALM', 'KRBTGT'))
        creds.set_username(username)

        creds.set_kvno(int(account['msDS-KeyVersionNumber'][0]))
        creds.set_dn(dn)

        self.creds_set_keys(creds, self.get_keys(samdb, dn))

        # The krbtgt account should support the default enctypes, although
        # it might not (on Samba) have the msDS-SupportedEncryptionTypes
        # attribute.
        self.creds_set_default_enctypes(creds,
                                        fast_support=self.kdc_fast_support)

        return creds

    return self._get_krb5_creds(
        prefix='KRBTGT',
        default_username='krbtgt',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fallback)
def get_dc_creds(self,
                 require_keys=True,
                 require_strongest_key=False):
    """Return the credentials of the DC's account.

    They are looked up by _get_krb5_creds() under the 'DC' prefix. The
    fallback looks the account up by RID 1000 in the domain and fetches
    its keys.

    require_strongest_key may only be set together with require_keys.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fallback():
        samdb = self.get_samdb()

        # NOTE(review): assumes the DC's account has RID 1000 - confirm
        # for the environment under test.
        dc_sid = '%s-%d' % (samdb.get_domain_sid(), 1000)

        account = samdb.search(base='<SID=%s>' % dc_sid,
                               scope=ldb.SCOPE_BASE,
                               attrs=['sAMAccountName',
                                      'msDS-KeyVersionNumber'])[0]
        dn = account.dn
        username = str(account['sAMAccountName'])

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'DC'))
        creds.set_realm(self.env_get_var('REALM', 'DC'))
        creds.set_username(username)

        creds.set_kvno(int(account['msDS-KeyVersionNumber'][0]))
        creds.set_dn(dn)

        self.creds_set_keys(creds, self.get_keys(samdb, dn))

        self.creds_set_enctypes(creds)

        return creds

    return self._get_krb5_creds(
        prefix='DC',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fallback)
def as_req(self, cname, sname, realm, etypes, padata=None, kdc_options=0):
    '''Send a Kerberos AS_REQ, returns the undecoded response

    The request uses a 'till' time offset by 36000 from the current
    Kerberos time and a fixed nonce of 0x7fffffff.
    '''
    request = self.AS_REQ_create(
        padata=padata,
        kdc_options=str(kdc_options),
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=None,
        till_time=self.get_KerberosTime(offset=36000),
        renew_time=None,
        nonce=0x7fffffff,
        etypes=etypes,
        addresses=None,
        additional_tickets=None)

    return self.send_recv_transaction(request)
def get_as_rep_key(self, creds, rep):
    '''Derive the password key of creds from the e-data of a KDC reply.

    rep['e-data'] is decoded as METHOD-DATA, and the first ETYPE-INFO2
    entry of its first PA-ETYPE-INFO2 element is passed to
    PasswordKey_from_etype_info2(), together with the key version number
    of creds.

    Fails the test if the e-data contains no PA-ETYPE-INFO2 element.
    '''
    rep_padata = self.der_decode(
        rep['e-data'],
        asn1Spec=krb5_asn1.METHOD_DATA())

    for pa in rep_padata:
        if pa['padata-type'] == PADATA_ETYPE_INFO2:
            padata_value = pa['padata-value']
            break
    else:
        # Without this, a reply lacking PA-ETYPE-INFO2 would surface as an
        # UnboundLocalError on padata_value below.
        self.fail('no PA-ETYPE-INFO2 found in e-data: %s' % (rep_padata,))

    etype_info2 = self.der_decode(
        padata_value, asn1Spec=krb5_asn1.ETYPE_INFO2())

    key = self.PasswordKey_from_etype_info2(creds, etype_info2[0],
                                            creds.get_kvno())
    return key
def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
    '''Build the encrypted timestamp pa-data element for an AS-REQ.

    The key is obtained from rep with get_as_rep_key(); skew is passed on
    as the time offset of the timestamp.
    '''
    as_rep_key = self.get_as_rep_key(creds, rep)

    return self.get_enc_timestamp_pa_data_from_key(as_rep_key, skew=skew)
def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
    """Return a PA-ENC-TIMESTAMP pa-data element encrypted with key.

    The timestamp is the current Kerberos time offset by skew.
    """
    (patime, pausec) = self.get_KerberosTimeWithUsec(offset=skew)

    # DER-encode the timestamp ...
    timestamp = self.PA_ENC_TS_ENC_create(patime, pausec)
    timestamp_der = self.der_encode(timestamp,
                                    asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    # ... encrypt it and DER-encode the resulting EncryptedData ...
    encrypted = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP,
                                          timestamp_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    # ... and wrap that in a PA-DATA element.
    return self.PA_DATA_create(PADATA_ENC_TIMESTAMP, encrypted_der)
def get_challenge_pa_data(self, client_challenge_key, skew=0):
    """Return a PA-ENCRYPTED-CHALLENGE pa-data element.

    The challenge is the current Kerberos time offset by skew, encrypted
    with client_challenge_key under the KU_ENC_CHALLENGE_CLIENT key usage.
    """
    patime, pausec = self.get_KerberosTimeWithUsec(offset=skew)

    # DER-encode the timestamp ...
    timestamp = self.PA_ENC_TS_ENC_create(patime, pausec)
    timestamp_der = self.der_encode(timestamp,
                                    asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    # ... encrypt it and DER-encode the resulting EncryptedData ...
    encrypted = self.EncryptedData_create(client_challenge_key,
                                          KU_ENC_CHALLENGE_CLIENT,
                                          timestamp_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    # ... and wrap that in a PA-DATA element.
    return self.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE,
                               encrypted_der)
def get_as_rep_enc_data(self, key, rep):
    '''Decrypt and decode the encrypted part of an AS-REP.

    Returns the decoded structure.
    '''
    plaintext = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])

    # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
    # application tag 26, so fall back to EncTGSRepPart if decoding as
    # EncASRepPart fails.
    try:
        decoded = self.der_decode(
            plaintext, asn1Spec=krb5_asn1.EncASRepPart())
    except Exception:
        decoded = self.der_decode(
            plaintext, asn1Spec=krb5_asn1.EncTGSRepPart())

    return decoded
def check_pre_authentication(self, rep):
    """Check that the KDC response is a 'pre-authentication required'
    error.
    """
    expected_error = KDC_ERR_PREAUTH_REQUIRED
    self.check_error_rep(rep, expected_error)
def check_as_reply(self, rep):
    """ Check that the kdc response is an AS-REP and that the
        values for:
            msg-type
            pvno
            tkt-vno
            kvno
        match the expected values

    The checks themselves are performed by check_reply(), which is
    called with msg_type=KRB_AS_REP.
    """
    self.check_reply(rep, msg_type=KRB_AS_REP)
def check_tgs_reply(self, rep):
    """ Check that the kdc response is a TGS-REP and that the
        values for:
            msg-type
            pvno
            tkt-vno
            kvno
        match the expected values

    The checks themselves are performed by check_reply(), which is
    called with msg_type=KRB_TGS_REP.
    """
    self.check_reply(rep, msg_type=KRB_TGS_REP)
def check_reply(self, rep, msg_type):
    """Check the fields common to the KDC replies.

    Asserts that a reply is present, that its msg-type equals msg_type,
    that pvno and the ticket's tkt-vno are both 5 and, when the enc-part
    carries a kvno, that the kvno is not an RODC kvno.
    """
    # There has to be a reply before anything else can be examined.
    self.assertIsNotNone(rep)

    failure_msg = "rep = {%s}" % rep

    # The reply must be of the message type the caller expects.
    self.assertEqual(rep['msg-type'], msg_type, failure_msg)

    # Protocol version number should be 5
    self.assertEqual(5, int(rep['pvno']), failure_msg)

    # The ticket version number should be 5
    self.assertEqual(5, int(rep['ticket']['tkt-vno']), failure_msg)

    # MIT kerberos does not provide the kvno, so we treat it as optional.
    # This is tested in compatability_test.py
    enc_part = rep['enc-part']
    if 'kvno' not in enc_part:
        return

    # If the high order bits are set this is an RODC kvno.
    self.assertEqual(0, int(enc_part['kvno']) & 0xFFFF0000, failure_msg)
def check_error_rep(self, rep, expected):
    """ Check that the reply is an error message, with the expected
        error-code specified.

    expected is either a single error code, or a container of
    acceptable error codes, in which case the reply's error-code only
    has to be a member of it.
    """
    # Import the submodule explicitly: a plain "import collections" does
    # not guarantee that the collections.abc submodule has been loaded.
    from collections.abc import Container

    self.assertIsNotNone(rep)
    self.assertEqual(rep['msg-type'], KRB_ERROR, "rep = {%s}" % rep)
    if isinstance(expected, Container):
        self.assertIn(rep['error-code'], expected, "rep = {%s}" % rep)
    else:
        self.assertEqual(rep['error-code'], expected, "rep = {%s}" % rep)
def tgs_req(self, cname, sname, realm, ticket, key, etypes,
            expected_error_mode=0, padata=None, kdc_options=0,
            to_rodc=False, service_creds=None, expect_pac=True,
            expect_edata=None, expected_flags=None, unexpected_flags=None):
    '''Send a TGS-REQ and return the response along with the decrypted
       and decoded enc-part.

    When expected_error_mode is set, the error checker is installed
    instead of the reply checker and the returned enc-part is None.
    kdc_options is converted with str() before being stored in the
    exchange dictionary.
    '''

    authenticator_subkey = self.RandomKey(key.etype)

    # NOTE(review): the time obtained here is not used further down.
    (ctime, cusec) = self.get_KerberosTimeWithUsec()

    tgt = KerberosTicketCreds(ticket, key, crealm=realm, cname=cname)

    # A key for decrypting the returned ticket is only available when
    # the credentials of the target service were supplied.
    decryption_key = None
    if service_creds is not None:
        decryption_key = self.TicketDecryptionKey_from_creds(
            service_creds)

    # Exactly one of the two checkers is installed, depending on whether
    # an error is expected.
    if expected_error_mode:
        check_error_fn = self.generic_check_kdc_error
        check_rep_fn = None
    else:
        check_error_fn = None
        check_rep_fn = self.generic_check_kdc_rep

    def generate_padata(_kdc_exchange_dict,
                        _callback_dict,
                        req_body):
        # Hand back the caller supplied padata, leaving the body as is.
        return padata, req_body

    generate_padata_fn = generate_padata if padata is not None else None

    exchange = self.tgs_exchange_dict(
        expected_crealm=realm,
        expected_cname=cname,
        expected_srealm=realm,
        expected_sname=sname,
        expected_error_mode=expected_error_mode,
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_kdc_private_fn=self.generic_check_kdc_private,
        ticket_decryption_key=decryption_key,
        generate_padata_fn=generate_padata_fn,
        tgt=tgt,
        authenticator_subkey=authenticator_subkey,
        kdc_options=str(kdc_options),
        expect_edata=expect_edata,
        expect_pac=expect_pac,
        to_rodc=to_rodc)

    rep = self._generic_kdc_exchange(exchange,
                                     cname=None,
                                     realm=realm,
                                     sname=sname,
                                     etypes=etypes)

    if expected_error_mode:
        return rep, None

    return rep, exchange['rep_ticket_creds'].encpart_private
def get_service_ticket(self, tgt, target_creds, service='host',
                       to_rodc=False, kdc_options=None,
                       expected_flags=None, unexpected_flags=None,
                       fresh=False):
    '''Obtain a service ticket for service/<target> using the given TGT.

    Tickets are kept in self.tkt_cache, keyed on the client name, the
    target account name, the service, to_rodc and kdc_options. Unless
    fresh is set, a cached ticket is returned without contacting the
    KDC. Note that expected_flags and unexpected_flags are not part of
    the cache key, so they only take effect when a ticket is actually
    requested.

    Returns a KerberosTicketCreds for the service ticket.
    '''
    user_name = tgt.cname['name-string'][0]
    account_name = target_creds.get_username()
    cache_key = (user_name, account_name, service, to_rodc, kdc_options)

    if not fresh:
        ticket = self.tkt_cache.get(cache_key)

        if ticket is not None:
            return ticket

    etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)

    if kdc_options is None:
        kdc_options = '0'
    kdc_options = krb5_asn1.KDCOptions(kdc_options)

    key = tgt.session_key
    ticket = tgt.ticket

    cname = tgt.cname
    realm = tgt.crealm

    # The service principal uses the account name without the trailing
    # '$'. Only strip it when it is really there, so that an account
    # name without the suffix is not truncated.
    if account_name.endswith('$'):
        target_name = account_name[:-1]
    else:
        target_name = account_name
    sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[service, target_name])

    rep, enc_part = self.tgs_req(cname, sname, realm, ticket, key, etype,
                                 to_rodc=to_rodc,
                                 service_creds=target_creds,
                                 kdc_options=kdc_options,
                                 expected_flags=expected_flags,
                                 unexpected_flags=unexpected_flags)

    service_ticket = rep['ticket']

    # The key to decrypt the ticket must match the ticket's own etype.
    ticket_etype = service_ticket['enc-part']['etype']
    target_key = self.TicketDecryptionKey_from_creds(target_creds,
                                                     etype=ticket_etype)

    session_key = self.EncryptionKey_import(enc_part['key'])

    service_ticket_creds = KerberosTicketCreds(service_ticket,
                                               session_key,
                                               crealm=realm,
                                               cname=cname,
                                               srealm=realm,
                                               sname=sname,
                                               decryption_key=target_key)

    self.tkt_cache[cache_key] = service_ticket_creds

    return service_ticket_creds
def get_tgt(self, creds, to_rodc=False, kdc_options=None,
            expected_flags=None, unexpected_flags=None,
            fresh=False):
    '''Obtain a TGT for creds by way of two AS exchanges.

    The first exchange is sent without padata and is expected to fail
    with KDC_ERR_PREAUTH_REQUIRED; the second one carries an encrypted
    timestamp and is expected to succeed. The resulting ticket is kept
    in self.tkt_cache, keyed on the user name, to_rodc and kdc_options,
    and a cached ticket is returned unless fresh is set.
    '''
    user_name = creds.get_username()
    cache_key = (user_name, to_rodc, kdc_options)

    if not fresh:
        cached_tgt = self.tkt_cache.get(cache_key)

        if cached_tgt is not None:
            return cached_tgt

    realm = creds.get_realm()
    salt = creds.get_salt()

    etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
    cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[user_name])
    sname = self.PrincipalName_create(name_type=NT_SRV_INST,
                                      names=['krbtgt', realm])

    till = self.get_KerberosTime(offset=36000)

    # The ticket is decrypted with the key of whichever krbtgt account
    # the request is directed at.
    krbtgt_creds = (self.get_rodc_krbtgt_creds() if to_rodc
                    else self.get_krbtgt_creds())
    ticket_decryption_key = (
        self.TicketDecryptionKey_from_creds(krbtgt_creds))

    if kdc_options is None:
        kdc_options = 'forwardable,renewable,canonicalize,renewable-ok'
    kdc_options = krb5_asn1.KDCOptions(kdc_options)

    # Arguments that are the same for both AS exchanges.
    common_args = dict(
        cname=cname,
        realm=realm,
        sname=sname,
        till=till,
        client_as_etypes=etype,
        expected_cname=cname,
        expected_salt=salt,
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        etypes=etype,
        kdc_options=kdc_options,
        ticket_decryption_key=ticket_decryption_key,
        pac_request=True,
        pac_options='1',  # supports claims
        to_rodc=to_rodc)

    # First exchange: no padata, pre-authentication required expected.
    rep, kdc_exchange_dict = self._test_as_exchange(
        expected_error_mode=KDC_ERR_PREAUTH_REQUIRED,
        expected_crealm=realm,
        expected_srealm=realm,
        expected_sname=sname,
        padata=None,
        preauth_key=None,
        **common_args)
    self.check_pre_authentication(rep)

    etype_info2 = kdc_exchange_dict['preauth_etype_info2']

    preauth_key = self.PasswordKey_from_etype_info2(creds,
                                                    etype_info2[0],
                                                    creds.get_kvno())

    ts_enc_padata = self.get_enc_timestamp_pa_data(creds, rep)

    # The successful reply is expected to carry the upper-cased realm.
    expected_realm = realm.upper()
    expected_sname = self.PrincipalName_create(
        name_type=NT_SRV_INST, names=['krbtgt', realm.upper()])

    # Second exchange: encrypted timestamp supplied, success expected.
    rep, kdc_exchange_dict = self._test_as_exchange(
        expected_error_mode=0,
        expected_crealm=expected_realm,
        expected_srealm=expected_realm,
        expected_sname=expected_sname,
        expected_supported_etypes=krbtgt_creds.tgs_supported_enctypes,
        padata=[ts_enc_padata],
        preauth_key=preauth_key,
        **common_args)
    self.check_as_reply(rep)

    ticket_creds = kdc_exchange_dict['rep_ticket_creds']
    self.tkt_cache[cache_key] = ticket_creds

    return ticket_creds
# Named tuple bundling the values of interest taken from a decoded PAC.
PacData = namedtuple(
    "PacData",
    ["account_name", "account_sid", "logon_name", "upn", "domain_name"])
def get_pac_data(self, authorization_data):
    '''Decode the PAC element contained in the authorization-data element

    Returns a PacData tuple; a field is None when the buffer it is
    taken from was not found.
    '''
    account_name = user_sid = logon_name = upn = domain_name = None

    for element in authorization_data:
        # The PAC data will be wrapped in an AD_IF_RELEVANT element
        if element['ad-type'] != AD_IF_RELEVANT:
            continue

        relevant = self.der_decode(
            element['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())

        for inner in relevant:
            # The PAC data is further wrapped in a AD_WIN2K_PAC element
            if inner['ad-type'] != AD_WIN2K_PAC:
                continue

            pac_data = ndr_unpack(krb5pac.PAC_DATA, inner['ad-data'])
            for pac_buffer in pac_data.buffers:
                buffer_type = pac_buffer.type
                if buffer_type == krb5pac.PAC_TYPE_LOGON_INFO:
                    base = pac_buffer.info.info.info3.base
                    account_name = base.account_name
                    # The SID is the domain SID followed by the RID.
                    user_sid = str(base.domain_sid) + "-" + str(base.rid)
                elif buffer_type == krb5pac.PAC_TYPE_LOGON_NAME:
                    logon_name = pac_buffer.info.account_name
                elif buffer_type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
                    upn = pac_buffer.info.upn_name
                    domain_name = pac_buffer.info.dns_domain_name

    return self.PacData(
        account_name,
        user_sid,
        logon_name,
        upn,
        domain_name)
def decode_service_ticket(self, creds, ticket):
    '''Decrypt and decode a service ticket

    The key is derived from the password in creds, using the etype and
    kvno found in the ticket's enc-part. Returns the decoded
    EncTicketPart.
    '''
    account = creds.get_username()
    # The trailing '$' of an account name is not part of the salt.
    if account.endswith('$'):
        account = account[:-1]
    realm = creds.get_realm()
    salt = "%s.%s@%s" % (account, realm.lower(), realm.upper())

    ticket_enc_part = ticket['enc-part']
    key = self.PasswordKey_create(
        ticket_enc_part['etype'],
        creds.get_password(),
        salt,
        ticket_enc_part['kvno'])

    plaintext = key.decrypt(KU_TICKET, ticket_enc_part['cipher'])
    return self.der_decode(
        plaintext, asn1Spec=krb5_asn1.EncTicketPart())
def get_objectSid(self, samdb, dn):
    ''' Get the objectSID for a DN, as a string
        Note: performs an Ldb query.
    '''
    result = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
    self.assertTrue(len(result) == 1, "did not get objectSid for %s" % dn)

    raw_sid = result[0]["objectSID"][0]
    formatted = samdb.schema_format_value("objectSID", raw_sid)
    return formatted.decode('utf8')
def add_attribute(self, samdb, dn_str, name, value):
    '''Add value to the attribute name of the object at dn_str.

    value may be a single value or a list of values; the change is
    sent as an ldb.FLAG_MOD_ADD modification.
    '''
    values = value if isinstance(value, list) else [value]

    msg = ldb.Message(ldb.Dn(samdb, dn_str))
    msg[name] = ldb.MessageElement(values, ldb.FLAG_MOD_ADD, name)
    samdb.modify(msg)
def modify_attribute(self, samdb, dn_str, name, value):
    '''Replace the attribute name of the object at dn_str with value.

    value may be a single value or a list of values; the change is
    sent as an ldb.FLAG_MOD_REPLACE modification.
    '''
    values = value if isinstance(value, list) else [value]

    msg = ldb.Message(ldb.Dn(samdb, dn_str))
    msg[name] = ldb.MessageElement(values, ldb.FLAG_MOD_REPLACE, name)
    samdb.modify(msg)
def create_ccache(self, cname, ticket, enc_part):
    """ Lay out a version 4 on-disk credentials cache, to be read using the
        FILE: protocol.

    cname is the client principal name, ticket the decoded ticket and
    enc_part the decrypted enc-part of the reply the ticket came in.

    Returns the (already closed) temporary file object the cache was
    written to. The file is created in self.tempdir with delete=False,
    so it stays on disk and can be found through its name attribute.
    """

    field = krb5ccache.DELTATIME_TAG()
    field.kdc_sec_offset = 0
    field.kdc_usec_offset = 0

    v4tag = krb5ccache.V4TAG()
    v4tag.tag = 1
    v4tag.field = field

    v4tags = krb5ccache.V4TAGS()
    v4tags.tag = v4tag
    v4tags.further_tags = b''

    optional_header = krb5ccache.V4HEADER()
    optional_header.v4tags = v4tags

    cname_string = cname['name-string']

    cprincipal = krb5ccache.PRINCIPAL()
    cprincipal.name_type = cname['name-type']
    cprincipal.component_count = len(cname_string)
    cprincipal.realm = ticket['realm']
    cprincipal.components = cname_string

    sname = ticket['sname']
    sname_string = sname['name-string']

    sprincipal = krb5ccache.PRINCIPAL()
    sprincipal.name_type = sname['name-type']
    sprincipal.component_count = len(sname_string)
    sprincipal.realm = ticket['realm']
    sprincipal.components = sname_string

    key = self.EncryptionKey_import(enc_part['key'])

    key_data = key.export_obj()
    keyblock = krb5ccache.KEYBLOCK()
    keyblock.enctype = key_data['keytype']
    keyblock.data = key_data['keyvalue']

    addresses = krb5ccache.ADDRESSES()
    addresses.count = 0
    addresses.data = []

    authdata = krb5ccache.AUTHDATA()
    authdata.count = 0
    authdata.data = []

    # Re-encode the ticket, since it was decoded by another layer.
    ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())

    # The start time falls back to the authentication time if absent.
    authtime = enc_part['authtime']
    starttime = enc_part.get('starttime', authtime)
    endtime = enc_part['endtime']

    cred = krb5ccache.CREDENTIAL()
    cred.client = cprincipal
    cred.server = sprincipal
    cred.keyblock = keyblock
    cred.authtime = self.get_EpochFromKerberosTime(authtime)
    cred.starttime = self.get_EpochFromKerberosTime(starttime)
    cred.endtime = self.get_EpochFromKerberosTime(endtime)

    # Read the clock once, so that all three checks below are made
    # against the same instant.
    now = datetime.now(timezone.utc).timestamp()

    # Account for clock skew of up to five minutes.
    self.assertLess(cred.authtime - 5 * 60,
                    now,
                    "Ticket not yet valid - clocks may be out of sync.")
    self.assertLess(cred.starttime - 5 * 60,
                    now,
                    "Ticket not yet valid - clocks may be out of sync.")
    self.assertGreater(cred.endtime - 60 * 60,
                       now,
                       "Ticket already expired/about to expire - "
                       "clocks may be out of sync.")

    cred.renew_till = cred.endtime
    cred.is_skey = 0
    # The flags are given as a string of binary digits.
    cred.ticket_flags = int(enc_part['flags'], 2)
    cred.addresses = addresses
    cred.authdata = authdata
    cred.ticket = ticket_data
    cred.second_ticket = b''

    ccache = krb5ccache.CCACHE()
    ccache.pvno = 5
    ccache.version = 4
    ccache.optional_header = optional_header
    ccache.principal = cprincipal
    ccache.cred = cred

    # Serialise the credentials cache structure.
    result = ndr_pack(ccache)

    # Create a temporary file and write the credentials. The with block
    # makes sure the file is closed even if the write fails.
    with tempfile.NamedTemporaryFile(dir=self.tempdir,
                                     delete=False) as cachefile:
        cachefile.write(result)

    return cachefile
def create_ccache_with_user(self, user_credentials, mach_name,
                            service="host"):
    """Obtain a service ticket authorising the user and place it into a
    newly created credentials cache file.

    Returns a tuple of a Credentials object referring to the cache,
    and the cache file itself.
    """

    user_name = user_credentials.get_username()
    realm = user_credentials.get_realm()

    etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
    cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[user_name])
    krbtgt_sname = self.PrincipalName_create(name_type=NT_SRV_HST,
                                             names=["krbtgt", realm])

    # Do the initial AS-REQ, should get a pre-authentication required
    # response
    preauth_rep = self.as_req(cname, krbtgt_sname, realm, etype)
    self.check_pre_authentication(preauth_rep)

    # Do the next AS-REQ, this time with an encrypted timestamp
    ts_padata = self.get_enc_timestamp_pa_data(user_credentials,
                                               preauth_rep)
    as_rep_key = self.get_as_rep_key(user_credentials, preauth_rep)
    as_rep = self.as_req(cname, krbtgt_sname, realm, etype,
                         padata=[ts_padata])
    self.check_as_reply(as_rep)

    # Request a ticket to the host service on the machine account
    tgt = as_rep['ticket']
    as_enc_part = self.get_as_rep_enc_data(as_rep_key, as_rep)
    tgt_session_key = self.EncryptionKey_import(as_enc_part['key'])
    cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[user_name])
    service_sname = self.PrincipalName_create(name_type=NT_SRV_HST,
                                              names=[service, mach_name])

    (tgs_rep, tgs_enc_part) = self.tgs_req(
        cname, service_sname, realm, tgt, tgt_session_key, etype)
    self.check_tgs_reply(tgs_rep)
    # NOTE(review): the imported key is not used further down.
    self.EncryptionKey_import(tgs_enc_part['key'])

    # Write the ticket into a credentials cache file that can be ingested
    # by the main credentials code.
    cachefile = self.create_ccache(cname, tgs_rep['ticket'], tgs_enc_part)

    # Create a credentials object to reference the credentials cache.
    creds = Credentials()
    creds.set_kerberos_state(MUST_USE_KERBEROS)
    creds.set_username(user_name, SPECIFIED)
    creds.set_realm(realm)
    creds.set_named_ccache(cachefile.name, SPECIFIED, self.get_lp())

    # Return the credentials along with the cache file.
    return (creds, cachefile)