tests/krb5: Allow specifying additional User Account Control flags for account
[Samba.git] / python / samba / tests / krb5 / kdc_base_test.py
blobbd5bacfaca19085d1cca486df5e2000bc9bcdc26
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import os
21 from datetime import datetime, timezone
22 import tempfile
23 import binascii
24 import collections
26 from collections import namedtuple
27 import ldb
28 from ldb import SCOPE_BASE
29 from samba import generate_random_password
30 from samba.auth import system_session
31 from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
32 from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
33 from samba.drs_utils import drsuapi_connect
34 from samba.dsdb import (
35 DS_DOMAIN_FUNCTION_2000,
36 DS_DOMAIN_FUNCTION_2008,
37 UF_WORKSTATION_TRUST_ACCOUNT,
38 UF_NORMAL_ACCOUNT
40 from samba.ndr import ndr_pack, ndr_unpack
41 from samba import net
42 from samba.samdb import SamDB
44 from samba.tests import delete_force
45 import samba.tests.krb5.kcrypto as kcrypto
46 from samba.tests.krb5.raw_testcase import KerberosCredentials, RawKerberosTest
47 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
48 from samba.tests.krb5.rfc4120_constants import (
49 AD_IF_RELEVANT,
50 AD_WIN2K_PAC,
51 AES256_CTS_HMAC_SHA1_96,
52 ARCFOUR_HMAC_MD5,
53 KDC_ERR_PREAUTH_REQUIRED,
54 KRB_AS_REP,
55 KRB_TGS_REP,
56 KRB_ERROR,
57 KU_AS_REP_ENC_PART,
58 KU_ENC_CHALLENGE_CLIENT,
59 KU_PA_ENC_TIMESTAMP,
60 KU_TGS_REP_ENC_PART_SUB_KEY,
61 KU_TICKET,
62 NT_PRINCIPAL,
63 NT_SRV_HST,
64 PADATA_ENCRYPTED_CHALLENGE,
65 PADATA_ENC_TIMESTAMP,
66 PADATA_ETYPE_INFO2,
69 sys.path.insert(0, "bin/python")
70 os.environ["PYTHONUNBUFFERED"] = "1"
72 global_asn1_print = False
73 global_hexdump = False
76 class KDCBaseTest(RawKerberosTest):
77 """ Base class for KDC tests.
78 """
80 @classmethod
81 def setUpClass(cls):
82 super().setUpClass()
83 cls._lp = None
85 cls._ldb = None
87 cls._functional_level = None
89 # A set containing DNs of accounts created as part of testing.
90 cls.accounts = set()
92 @classmethod
93 def tearDownClass(cls):
94 # Clean up any accounts created by create_account. This is
95 # done in tearDownClass() rather than tearDown(), so that
96 # accounts need only be created once for permutation tests.
97 if cls._ldb is not None:
98 for dn in cls.accounts:
99 delete_force(cls._ldb, dn)
100 super().tearDownClass()
102 def setUp(self):
103 super().setUp()
104 self.do_asn1_print = global_asn1_print
105 self.do_hexdump = global_hexdump
107 def get_lp(self):
108 if self._lp is None:
109 type(self)._lp = self.get_loadparm()
111 return self._lp
113 def get_samdb(self):
114 if self._ldb is None:
115 creds = self.get_admin_creds()
116 lp = self.get_lp()
118 session = system_session()
119 type(self)._ldb = SamDB(url="ldap://%s" % self.host,
120 session_info=session,
121 credentials=creds,
122 lp=lp)
124 return self._ldb
126 def get_domain_functional_level(self, ldb):
127 if self._functional_level is None:
128 res = ldb.search(base='',
129 scope=SCOPE_BASE,
130 attrs=['domainFunctionality'])
131 try:
132 functional_level = int(res[0]['domainFunctionality'][0])
133 except KeyError:
134 functional_level = DS_DOMAIN_FUNCTION_2000
136 type(self)._functional_level = functional_level
138 return self._functional_level
140 def get_default_enctypes(self):
141 samdb = self.get_samdb()
142 functional_level = self.get_domain_functional_level(samdb)
144 # RC4 should always be supported
145 default_enctypes = security.KERB_ENCTYPE_RC4_HMAC_MD5
146 if functional_level >= DS_DOMAIN_FUNCTION_2008:
147 # AES is only supported at functional level 2008 or higher
148 default_enctypes |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
149 default_enctypes |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
151 return default_enctypes
153 def create_account(self, ldb, name, machine_account=False,
154 spn=None, upn=None, additional_details=None,
155 ou=None, account_control=0):
156 '''Create an account for testing.
157 The dn of the created account is added to self.accounts,
158 which is used by tearDownClass to clean up the created accounts.
160 if ou is None:
161 ou = ldb.domain_dn()
163 dn = "CN=%s,%s" % (name, ou)
165 # remove the account if it exists, this will happen if a previous test
166 # run failed
167 delete_force(ldb, dn)
168 if machine_account:
169 object_class = "computer"
170 account_name = "%s$" % name
171 account_control |= UF_WORKSTATION_TRUST_ACCOUNT
172 else:
173 object_class = "user"
174 account_name = name
175 account_control |= UF_NORMAL_ACCOUNT
177 password = generate_random_password(32, 32)
178 utf16pw = ('"%s"' % password).encode('utf-16-le')
180 details = {
181 "dn": dn,
182 "objectclass": object_class,
183 "sAMAccountName": account_name,
184 "userAccountControl": str(account_control),
185 "unicodePwd": utf16pw}
186 if spn is not None:
187 details["servicePrincipalName"] = spn
188 if upn is not None:
189 details["userPrincipalName"] = upn
190 if additional_details is not None:
191 details.update(additional_details)
192 ldb.add(details)
194 creds = KerberosCredentials()
195 creds.guess(self.get_lp())
196 creds.set_realm(ldb.domain_dns_name().upper())
197 creds.set_domain(ldb.domain_netbios_name().upper())
198 creds.set_password(password)
199 creds.set_username(account_name)
200 if machine_account:
201 creds.set_workstation(name)
202 else:
203 creds.set_workstation('')
205 # Save the account name so it can be deleted in tearDownClass
206 self.accounts.add(dn)
208 return (creds, dn)
210 def get_keys(self, samdb, dn):
211 admin_creds = self.get_admin_creds()
213 dns_hostname = samdb.host_dns_name()
214 (bind, handle, _) = drsuapi_connect(dns_hostname,
215 self.get_lp(),
216 admin_creds)
218 destination_dsa_guid = misc.GUID(samdb.get_ntds_GUID())
220 req = drsuapi.DsGetNCChangesRequest8()
222 req.destination_dsa_guid = destination_dsa_guid
223 req.source_dsa_invocation_id = misc.GUID()
225 naming_context = drsuapi.DsReplicaObjectIdentifier()
226 naming_context.dn = str(dn)
228 req.naming_context = naming_context
230 hwm = drsuapi.DsReplicaHighWaterMark()
231 hwm.tmp_highest_usn = 0
232 hwm.reserved_usn = 0
233 hwm.highest_usn = 0
235 req.highwatermark = hwm
236 req.uptodateness_vector = None
238 req.replica_flags = 0
240 req.max_object_count = 1
241 req.max_ndr_size = 402116
242 req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET
244 attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
245 drsuapi.DRSUAPI_ATTID_unicodePwd]
247 partial_attribute_set = drsuapi.DsPartialAttributeSet()
248 partial_attribute_set.version = 1
249 partial_attribute_set.attids = attids
250 partial_attribute_set.num_attids = len(attids)
252 req.partial_attribute_set = partial_attribute_set
254 req.partial_attribute_set_ex = None
255 req.mapping_ctr.num_mappings = 0
256 req.mapping_ctr.mappings = None
258 _, ctr = bind.DsGetNCChanges(handle, 8, req)
259 identifier = ctr.first_object.object.identifier
260 attributes = ctr.first_object.object.attribute_ctr.attributes
262 rid = identifier.sid.split()[1]
264 net_ctx = net.Net(admin_creds)
266 keys = {}
268 for attr in attributes:
269 if attr.attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
270 net_ctx.replicate_decrypt(bind, attr, rid)
271 attr_val = attr.value_ctr.values[0].blob
273 spl = ndr_unpack(drsblobs.supplementalCredentialsBlob,
274 attr_val)
275 for pkg in spl.sub.packages:
276 if pkg.name == 'Primary:Kerberos-Newer-Keys':
277 krb5_new_keys_raw = binascii.a2b_hex(pkg.data)
278 krb5_new_keys = ndr_unpack(
279 drsblobs.package_PrimaryKerberosBlob,
280 krb5_new_keys_raw)
281 for key in krb5_new_keys.ctr.keys:
282 keytype = key.keytype
283 if keytype in (kcrypto.Enctype.AES256,
284 kcrypto.Enctype.AES128):
285 keys[keytype] = key.value.hex()
286 elif attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
287 net_ctx.replicate_decrypt(bind, attr, rid)
288 pwd = attr.value_ctr.values[0].blob
289 keys[kcrypto.Enctype.RC4] = pwd.hex()
291 default_enctypes = self.get_default_enctypes()
293 if default_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
294 self.assertIn(kcrypto.Enctype.RC4, keys)
295 if default_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
296 self.assertIn(kcrypto.Enctype.AES256, keys)
297 if default_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
298 self.assertIn(kcrypto.Enctype.AES128, keys)
300 return keys
302 def creds_set_keys(self, creds, keys):
303 if keys is not None:
304 for enctype, key in keys.items():
305 creds.set_forced_key(enctype, key)
307 supported_enctypes = 0
308 if kcrypto.Enctype.AES256 in keys:
309 supported_enctypes |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
310 if kcrypto.Enctype.AES128 in keys:
311 supported_enctypes |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
312 if kcrypto.Enctype.RC4 in keys:
313 supported_enctypes |= security.KERB_ENCTYPE_RC4_HMAC_MD5
315 creds.set_as_supported_enctypes(supported_enctypes)
316 creds.set_tgs_supported_enctypes(supported_enctypes)
317 creds.set_ap_supported_enctypes(supported_enctypes)
319 def get_client_creds(self,
320 allow_missing_password=False,
321 allow_missing_keys=True):
322 def create_client_account():
323 samdb = self.get_samdb()
325 creds, dn = self.create_account(samdb, 'kdctestclient')
327 res = samdb.search(base=dn,
328 scope=ldb.SCOPE_BASE,
329 attrs=['msDS-KeyVersionNumber'])
330 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
331 creds.set_kvno(kvno)
333 keys = self.get_keys(samdb, dn)
334 self.creds_set_keys(creds, keys)
336 return creds
338 c = self._get_krb5_creds(prefix='CLIENT',
339 allow_missing_password=allow_missing_password,
340 allow_missing_keys=allow_missing_keys,
341 fallback_creds_fn=create_client_account)
342 return c
344 def get_mach_creds(self,
345 allow_missing_password=False,
346 allow_missing_keys=True):
347 def create_mach_account():
348 samdb = self.get_samdb()
350 mach_name = 'kdctestmac'
351 details = {
352 'msDS-SupportedEncryptionTypes': str(
353 security.KERB_ENCTYPE_FAST_SUPPORTED |
354 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
355 security.KERB_ENCTYPE_CLAIMS_SUPPORTED
359 creds, dn = self.create_account(samdb, mach_name,
360 machine_account=True,
361 spn='host/' + mach_name,
362 additional_details=details)
364 res = samdb.search(base=dn,
365 scope=ldb.SCOPE_BASE,
366 attrs=['msDS-KeyVersionNumber'])
367 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
368 creds.set_kvno(kvno)
370 keys = self.get_keys(samdb, dn)
371 self.creds_set_keys(creds, keys)
373 return creds
375 c = self._get_krb5_creds(prefix='MAC',
376 allow_missing_password=allow_missing_password,
377 allow_missing_keys=allow_missing_keys,
378 fallback_creds_fn=create_mach_account)
379 return c
381 def get_service_creds(self,
382 allow_missing_password=False,
383 allow_missing_keys=True):
384 def create_service_account():
385 samdb = self.get_samdb()
387 mach_name = 'kdctestservice'
388 details = {
389 'msDS-SupportedEncryptionTypes': str(
390 security.KERB_ENCTYPE_FAST_SUPPORTED |
391 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
392 security.KERB_ENCTYPE_CLAIMS_SUPPORTED
396 creds, dn = self.create_account(samdb, mach_name,
397 machine_account=True,
398 spn='host/' + mach_name,
399 additional_details=details)
401 res = samdb.search(base=dn,
402 scope=ldb.SCOPE_BASE,
403 attrs=['msDS-KeyVersionNumber'])
404 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
405 creds.set_kvno(kvno)
407 keys = self.get_keys(samdb, dn)
408 self.creds_set_keys(creds, keys)
410 return creds
412 c = self._get_krb5_creds(prefix='SERVICE',
413 allow_missing_password=allow_missing_password,
414 allow_missing_keys=allow_missing_keys,
415 fallback_creds_fn=create_service_account)
416 return c
418 def get_krbtgt_creds(self,
419 require_keys=True,
420 require_strongest_key=False):
421 if require_strongest_key:
422 self.assertTrue(require_keys)
424 def download_krbtgt_creds():
425 samdb = self.get_samdb()
427 krbtgt_rid = 502
428 krbtgt_sid = '%s-%d' % (samdb.get_domain_sid(), krbtgt_rid)
430 res = samdb.search(base='<SID=%s>' % krbtgt_sid,
431 scope=ldb.SCOPE_BASE,
432 attrs=['sAMAccountName',
433 'msDS-KeyVersionNumber'])
434 dn = res[0].dn
435 username = str(res[0]['sAMAccountName'])
437 creds = KerberosCredentials()
438 creds.set_domain(self.env_get_var('DOMAIN', 'KRBTGT'))
439 creds.set_realm(self.env_get_var('REALM', 'KRBTGT'))
440 creds.set_username(username)
442 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
443 creds.set_kvno(kvno)
445 keys = self.get_keys(samdb, dn)
446 self.creds_set_keys(creds, keys)
448 return creds
450 c = self._get_krb5_creds(prefix='KRBTGT',
451 default_username='krbtgt',
452 allow_missing_password=True,
453 allow_missing_keys=not require_keys,
454 require_strongest_key=require_strongest_key,
455 fallback_creds_fn=download_krbtgt_creds)
456 return c
458 def as_req(self, cname, sname, realm, etypes, padata=None):
459 '''Send a Kerberos AS_REQ, returns the undecoded response
462 till = self.get_KerberosTime(offset=36000)
463 kdc_options = 0
465 req = self.AS_REQ_create(padata=padata,
466 kdc_options=str(kdc_options),
467 cname=cname,
468 realm=realm,
469 sname=sname,
470 from_time=None,
471 till_time=till,
472 renew_time=None,
473 nonce=0x7fffffff,
474 etypes=etypes,
475 addresses=None,
476 additional_tickets=None)
477 rep = self.send_recv_transaction(req)
478 return rep
480 def get_as_rep_key(self, creds, rep):
481 '''Extract the session key from an AS-REP
483 rep_padata = self.der_decode(
484 rep['e-data'],
485 asn1Spec=krb5_asn1.METHOD_DATA())
487 for pa in rep_padata:
488 if pa['padata-type'] == PADATA_ETYPE_INFO2:
489 padata_value = pa['padata-value']
490 break
492 etype_info2 = self.der_decode(
493 padata_value, asn1Spec=krb5_asn1.ETYPE_INFO2())
495 key = self.PasswordKey_from_etype_info2(creds, etype_info2[0],
496 creds.get_kvno())
497 return key
499 def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
500 '''generate the pa_data data element for an AS-REQ
503 key = self.get_as_rep_key(creds, rep)
505 return self.get_enc_timestamp_pa_data_from_key(key, skew=skew)
507 def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
508 (patime, pausec) = self.get_KerberosTimeWithUsec(offset=skew)
509 padata = self.PA_ENC_TS_ENC_create(patime, pausec)
510 padata = self.der_encode(padata, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
512 padata = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, padata)
513 padata = self.der_encode(padata, asn1Spec=krb5_asn1.EncryptedData())
515 padata = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, padata)
517 return padata
519 def get_challenge_pa_data(self, client_challenge_key, skew=0):
520 patime, pausec = self.get_KerberosTimeWithUsec(offset=skew)
521 padata = self.PA_ENC_TS_ENC_create(patime, pausec)
522 padata = self.der_encode(padata,
523 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
525 padata = self.EncryptedData_create(client_challenge_key,
526 KU_ENC_CHALLENGE_CLIENT,
527 padata)
528 padata = self.der_encode(padata,
529 asn1Spec=krb5_asn1.EncryptedData())
531 padata = self.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE,
532 padata)
534 return padata
536 def get_as_rep_enc_data(self, key, rep):
537 ''' Decrypt and Decode the encrypted data in an AS-REP
539 enc_part = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
540 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
541 # application tag 26
542 try:
543 enc_part = self.der_decode(
544 enc_part, asn1Spec=krb5_asn1.EncASRepPart())
545 except Exception:
546 enc_part = self.der_decode(
547 enc_part, asn1Spec=krb5_asn1.EncTGSRepPart())
549 return enc_part
551 def check_pre_authentication(self, rep):
552 """ Check that the kdc response was pre-authentication required
554 self.check_error_rep(rep, KDC_ERR_PREAUTH_REQUIRED)
556 def check_as_reply(self, rep):
557 """ Check that the kdc response is an AS-REP and that the
558 values for:
559 msg-type
560 pvno
561 tkt-pvno
562 kvno
563 match the expected values
565 self.check_reply(rep, msg_type=KRB_AS_REP)
567 def check_tgs_reply(self, rep):
568 """ Check that the kdc response is an TGS-REP and that the
569 values for:
570 msg-type
571 pvno
572 tkt-pvno
573 kvno
574 match the expected values
576 self.check_reply(rep, msg_type=KRB_TGS_REP)
578 def check_reply(self, rep, msg_type):
580 # Should have a reply, and it should an TGS-REP message.
581 self.assertIsNotNone(rep)
582 self.assertEqual(rep['msg-type'], msg_type, "rep = {%s}" % rep)
584 # Protocol version number should be 5
585 pvno = int(rep['pvno'])
586 self.assertEqual(5, pvno, "rep = {%s}" % rep)
588 # The ticket version number should be 5
589 tkt_vno = int(rep['ticket']['tkt-vno'])
590 self.assertEqual(5, tkt_vno, "rep = {%s}" % rep)
592 # Check that the kvno is not an RODC kvno
593 # MIT kerberos does not provide the kvno, so we treat it as optional.
594 # This is tested in compatability_test.py
595 if 'kvno' in rep['enc-part']:
596 kvno = int(rep['enc-part']['kvno'])
597 # If the high order bits are set this is an RODC kvno.
598 self.assertEqual(0, kvno & 0xFFFF0000, "rep = {%s}" % rep)
600 def check_error_rep(self, rep, expected):
601 """ Check that the reply is an error message, with the expected
602 error-code specified.
604 self.assertIsNotNone(rep)
605 self.assertEqual(rep['msg-type'], KRB_ERROR, "rep = {%s}" % rep)
606 if isinstance(expected, collections.abc.Container):
607 self.assertIn(rep['error-code'], expected, "rep = {%s}" % rep)
608 else:
609 self.assertEqual(rep['error-code'], expected, "rep = {%s}" % rep)
611 def tgs_req(self, cname, sname, realm, ticket, key, etypes):
612 '''Send a TGS-REQ, returns the response and the decrypted and
613 decoded enc-part
616 kdc_options = "0"
617 till = self.get_KerberosTime(offset=36000)
618 padata = []
620 subkey = self.RandomKey(key.etype)
622 (ctime, cusec) = self.get_KerberosTimeWithUsec()
624 req = self.TGS_REQ_create(padata=padata,
625 cusec=cusec,
626 ctime=ctime,
627 ticket=ticket,
628 kdc_options=str(kdc_options),
629 cname=cname,
630 realm=realm,
631 sname=sname,
632 from_time=None,
633 till_time=till,
634 renew_time=None,
635 nonce=0x7ffffffe,
636 etypes=etypes,
637 addresses=None,
638 EncAuthorizationData=None,
639 EncAuthorizationData_key=None,
640 additional_tickets=None,
641 ticket_session_key=key,
642 authenticator_subkey=subkey)
643 rep = self.send_recv_transaction(req)
644 self.assertIsNotNone(rep)
646 msg_type = rep['msg-type']
647 enc_part = None
648 if msg_type == KRB_TGS_REP:
649 enc_part = subkey.decrypt(
650 KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
651 enc_part = self.der_decode(
652 enc_part, asn1Spec=krb5_asn1.EncTGSRepPart())
653 return (rep, enc_part)
655 # Named tuple to contain values of interest when the PAC is decoded.
656 PacData = namedtuple(
657 "PacData",
658 "account_name account_sid logon_name upn domain_name")
659 PAC_LOGON_INFO = 1
660 PAC_CREDENTIAL_INFO = 2
661 PAC_SRV_CHECKSUM = 6
662 PAC_KDC_CHECKSUM = 7
663 PAC_LOGON_NAME = 10
664 PAC_CONSTRAINED_DELEGATION = 11
665 PAC_UPN_DNS_INFO = 12
667 def get_pac_data(self, authorization_data):
668 '''Decode the PAC element contained in the authorization-data element
670 account_name = None
671 user_sid = None
672 logon_name = None
673 upn = None
674 domain_name = None
676 # The PAC data will be wrapped in an AD_IF_RELEVANT element
677 ad_if_relevant_elements = (
678 x for x in authorization_data if x['ad-type'] == AD_IF_RELEVANT)
679 for dt in ad_if_relevant_elements:
680 buf = self.der_decode(
681 dt['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())
682 # The PAC data is further wrapped in a AD_WIN2K_PAC element
683 for ad in (x for x in buf if x['ad-type'] == AD_WIN2K_PAC):
684 pb = ndr_unpack(krb5pac.PAC_DATA, ad['ad-data'])
685 for pac in pb.buffers:
686 if pac.type == self.PAC_LOGON_INFO:
687 account_name = (
688 pac.info.info.info3.base.account_name)
689 user_sid = (
690 str(pac.info.info.info3.base.domain_sid)
691 + "-" + str(pac.info.info.info3.base.rid))
692 elif pac.type == self.PAC_LOGON_NAME:
693 logon_name = pac.info.account_name
694 elif pac.type == self.PAC_UPN_DNS_INFO:
695 upn = pac.info.upn_name
696 domain_name = pac.info.dns_domain_name
698 return self.PacData(
699 account_name,
700 user_sid,
701 logon_name,
702 upn,
703 domain_name)
705 def decode_service_ticket(self, creds, ticket):
706 '''Decrypt and decode a service ticket
709 name = creds.get_username()
710 if name.endswith('$'):
711 name = name[:-1]
712 realm = creds.get_realm()
713 salt = "%s.%s@%s" % (name, realm.lower(), realm.upper())
715 key = self.PasswordKey_create(
716 ticket['enc-part']['etype'],
717 creds.get_password(),
718 salt,
719 ticket['enc-part']['kvno'])
721 enc_part = key.decrypt(KU_TICKET, ticket['enc-part']['cipher'])
722 enc_ticket_part = self.der_decode(
723 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
724 return enc_ticket_part
726 def get_objectSid(self, samdb, dn):
727 ''' Get the objectSID for a DN
728 Note: performs an Ldb query.
730 res = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
731 self.assertTrue(len(res) == 1, "did not get objectSid for %s" % dn)
732 sid = samdb.schema_format_value("objectSID", res[0]["objectSID"][0])
733 return sid.decode('utf8')
735 def add_attribute(self, samdb, dn_str, name, value):
736 if isinstance(value, list):
737 values = value
738 else:
739 values = [value]
740 flag = ldb.FLAG_MOD_ADD
742 dn = ldb.Dn(samdb, dn_str)
743 msg = ldb.Message(dn)
744 msg[name] = ldb.MessageElement(values, flag, name)
745 samdb.modify(msg)
747 def modify_attribute(self, samdb, dn_str, name, value):
748 if isinstance(value, list):
749 values = value
750 else:
751 values = [value]
752 flag = ldb.FLAG_MOD_REPLACE
754 dn = ldb.Dn(samdb, dn_str)
755 msg = ldb.Message(dn)
756 msg[name] = ldb.MessageElement(values, flag, name)
757 samdb.modify(msg)
759 def create_ccache(self, cname, ticket, enc_part):
760 """ Lay out a version 4 on-disk credentials cache, to be read using the
761 FILE: protocol.
764 field = krb5ccache.DELTATIME_TAG()
765 field.kdc_sec_offset = 0
766 field.kdc_usec_offset = 0
768 v4tag = krb5ccache.V4TAG()
769 v4tag.tag = 1
770 v4tag.field = field
772 v4tags = krb5ccache.V4TAGS()
773 v4tags.tag = v4tag
774 v4tags.further_tags = b''
776 optional_header = krb5ccache.V4HEADER()
777 optional_header.v4tags = v4tags
779 cname_string = cname['name-string']
781 cprincipal = krb5ccache.PRINCIPAL()
782 cprincipal.name_type = cname['name-type']
783 cprincipal.component_count = len(cname_string)
784 cprincipal.realm = ticket['realm']
785 cprincipal.components = cname_string
787 sname = ticket['sname']
788 sname_string = sname['name-string']
790 sprincipal = krb5ccache.PRINCIPAL()
791 sprincipal.name_type = sname['name-type']
792 sprincipal.component_count = len(sname_string)
793 sprincipal.realm = ticket['realm']
794 sprincipal.components = sname_string
796 key = self.EncryptionKey_import(enc_part['key'])
798 key_data = key.export_obj()
799 keyblock = krb5ccache.KEYBLOCK()
800 keyblock.enctype = key_data['keytype']
801 keyblock.data = key_data['keyvalue']
803 addresses = krb5ccache.ADDRESSES()
804 addresses.count = 0
805 addresses.data = []
807 authdata = krb5ccache.AUTHDATA()
808 authdata.count = 0
809 authdata.data = []
811 # Re-encode the ticket, since it was decoded by another layer.
812 ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
814 authtime = enc_part['authtime']
815 starttime = enc_part.get('starttime', authtime)
816 endtime = enc_part['endtime']
818 cred = krb5ccache.CREDENTIAL()
819 cred.client = cprincipal
820 cred.server = sprincipal
821 cred.keyblock = keyblock
822 cred.authtime = self.get_EpochFromKerberosTime(authtime)
823 cred.starttime = self.get_EpochFromKerberosTime(starttime)
824 cred.endtime = self.get_EpochFromKerberosTime(endtime)
826 # Account for clock skew of up to five minutes.
827 self.assertLess(cred.authtime - 5 * 60,
828 datetime.now(timezone.utc).timestamp(),
829 "Ticket not yet valid - clocks may be out of sync.")
830 self.assertLess(cred.starttime - 5 * 60,
831 datetime.now(timezone.utc).timestamp(),
832 "Ticket not yet valid - clocks may be out of sync.")
833 self.assertGreater(cred.endtime - 60 * 60,
834 datetime.now(timezone.utc).timestamp(),
835 "Ticket already expired/about to expire - "
836 "clocks may be out of sync.")
838 cred.renew_till = cred.endtime
839 cred.is_skey = 0
840 cred.ticket_flags = int(enc_part['flags'], 2)
841 cred.addresses = addresses
842 cred.authdata = authdata
843 cred.ticket = ticket_data
844 cred.second_ticket = b''
846 ccache = krb5ccache.CCACHE()
847 ccache.pvno = 5
848 ccache.version = 4
849 ccache.optional_header = optional_header
850 ccache.principal = cprincipal
851 ccache.cred = cred
853 # Serialise the credentials cache structure.
854 result = ndr_pack(ccache)
856 # Create a temporary file and write the credentials.
857 cachefile = tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False)
858 cachefile.write(result)
859 cachefile.close()
861 return cachefile
863 def create_ccache_with_user(self, user_credentials, mach_name,
864 service="host"):
865 # Obtain a service ticket authorising the user and place it into a
866 # newly created credentials cache file.
868 user_name = user_credentials.get_username()
869 realm = user_credentials.get_realm()
871 # Do the initial AS-REQ, should get a pre-authentication required
872 # response
873 etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
874 cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
875 names=[user_name])
876 sname = self.PrincipalName_create(name_type=NT_SRV_HST,
877 names=["krbtgt", realm])
879 rep = self.as_req(cname, sname, realm, etype)
880 self.check_pre_authentication(rep)
882 # Do the next AS-REQ
883 padata = self.get_enc_timestamp_pa_data(user_credentials, rep)
884 key = self.get_as_rep_key(user_credentials, rep)
885 rep = self.as_req(cname, sname, realm, etype, padata=[padata])
886 self.check_as_reply(rep)
888 # Request a ticket to the host service on the machine account
889 ticket = rep['ticket']
890 enc_part = self.get_as_rep_enc_data(key, rep)
891 key = self.EncryptionKey_import(enc_part['key'])
892 cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
893 names=[user_name])
894 sname = self.PrincipalName_create(name_type=NT_SRV_HST,
895 names=[service, mach_name])
897 (rep, enc_part) = self.tgs_req(
898 cname, sname, realm, ticket, key, etype)
899 self.check_tgs_reply(rep)
900 key = self.EncryptionKey_import(enc_part['key'])
902 # Check the contents of the pac, and the ticket
903 ticket = rep['ticket']
905 # Write the ticket into a credentials cache file that can be ingested
906 # by the main credentials code.
907 cachefile = self.create_ccache(cname, ticket, enc_part)
909 # Create a credentials object to reference the credentials cache.
910 creds = Credentials()
911 creds.set_kerberos_state(MUST_USE_KERBEROS)
912 creds.set_username(user_name, SPECIFIED)
913 creds.set_realm(realm)
914 creds.set_named_ccache(cachefile.name, SPECIFIED, self.get_lp())
916 # Return the credentials along with the cache file.
917 return (creds, cachefile)