1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from datetime
import datetime
, timezone
27 from collections
import namedtuple
29 from ldb
import SCOPE_BASE
30 from samba
import generate_random_password
31 from samba
.auth
import system_session
32 from samba
.credentials
import Credentials
, SPECIFIED
, MUST_USE_KERBEROS
33 from samba
.dcerpc
import drsblobs
, drsuapi
, misc
, krb5pac
, krb5ccache
, security
34 from samba
.drs_utils
import drs_Replicate
, drsuapi_connect
35 from samba
.dsdb
import (
36 DSDB_SYNTAX_BINARY_DN
,
37 DS_DOMAIN_FUNCTION_2000
,
38 DS_DOMAIN_FUNCTION_2008
,
39 DS_GUID_COMPUTERS_CONTAINER
,
40 DS_GUID_USERS_CONTAINER
,
41 UF_WORKSTATION_TRUST_ACCOUNT
,
42 UF_NO_AUTH_DATA_REQUIRED
,
45 UF_PARTIAL_SECRETS_ACCOUNT
,
46 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
48 from samba
.join
import DCJoinContext
49 from samba
.ndr
import ndr_pack
, ndr_unpack
51 from samba
.samdb
import SamDB
, dsdb_Dn
53 from samba
.tests
import delete_force
54 import samba
.tests
.krb5
.kcrypto
as kcrypto
55 from samba
.tests
.krb5
.raw_testcase
import (
60 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
61 from samba
.tests
.krb5
.rfc4120_constants
import (
64 AES256_CTS_HMAC_SHA1_96
,
66 KDC_ERR_PREAUTH_REQUIRED
,
71 KU_ENC_CHALLENGE_CLIENT
,
77 PADATA_ENCRYPTED_CHALLENGE
,
# Put "bin/python" at the front of the module search path.
sys.path.insert(0, "bin/python")
# Set PYTHONUNBUFFERED in this process's environment.
os.environ["PYTHONUNBUFFERED"] = "1"

# Module-wide defaults that the test class copies onto each test instance
# as do_asn1_print / do_hexdump.
global_asn1_print = False
global_hexdump = False
89 class KDCBaseTest(RawKerberosTest
):
90 """ Base class for KDC tests.
101 cls
._functional
_level
= None
103 # An identifier to ensure created accounts have unique names. Windows
104 # caches accounts based on usernames, so account names being different
105 # across test runs avoids previous test runs affecting the results.
106 cls
.account_base
= f
'krb5_{secrets.token_hex(5)}_'
109 # A set containing DNs of accounts created as part of testing.
112 cls
.account_cache
= {}
117 cls
.ldb_cleanups
= []
120 def tearDownClass(cls
):
121 # Clean up any accounts created by create_account. This is
122 # done in tearDownClass() rather than tearDown(), so that
123 # accounts need only be created once for permutation tests.
124 if cls
._ldb
is not None:
125 for cleanup
in reversed(cls
.ldb_cleanups
):
127 cls
._ldb
.modify(cleanup
)
131 for dn
in cls
.accounts
:
132 delete_force(cls
._ldb
, dn
)
134 if cls
._rodc
_ctx
is not None:
135 cls
._rodc
_ctx
.cleanup_old_join(force
=True)
137 super().tearDownClass()
141 self
.do_asn1_print
= global_asn1_print
142 self
.do_hexdump
= global_hexdump
146 type(self
)._lp
= self
.get_loadparm()
151 if self
._ldb
is None:
152 creds
= self
.get_admin_creds()
155 session
= system_session()
156 type(self
)._ldb
= SamDB(url
="ldap://%s" % self
.dc_host
,
157 session_info
=session
,
163 def get_rodc_samdb(self
):
164 if self
._rodc
_ldb
is None:
165 creds
= self
.get_admin_creds()
168 session
= system_session()
169 type(self
)._rodc
_ldb
= SamDB(url
="ldap://%s" % self
.host
,
170 session_info
=session
,
175 return self
._rodc
_ldb
def get_server_dn(self, samdb):
    """Return the DN referenced by the server object of *samdb*.

    The server object named by ``samdb.get_serverName()`` is read with a
    base-scope search, and the first value of its ``serverReference``
    attribute is decoded as UTF-8 and wrapped in an ``ldb.Dn`` bound to
    *samdb*.
    """
    server = samdb.get_serverName()

    res = samdb.search(base=server,
                       scope=ldb.SCOPE_BASE,
                       attrs=['serverReference'])
    dn = ldb.Dn(samdb, res[0]['serverReference'][0].decode('utf8'))

    # Callers use the result as the RODC's DN, so it has to be handed back.
    return dn
187 def get_mock_rodc_ctx(self
):
188 if self
._rodc
_ctx
is None:
189 admin_creds
= self
.get_admin_creds()
192 rodc_name
= 'KRB5RODC'
193 site_name
= 'Default-First-Site-Name'
195 type(self
)._rodc
_ctx
= DCJoinContext(server
=self
.dc_host
,
199 netbios_name
=rodc_name
,
202 self
.create_rodc(self
._rodc
_ctx
)
204 return self
._rodc
_ctx
206 def get_domain_functional_level(self
, ldb
):
207 if self
._functional
_level
is None:
208 res
= ldb
.search(base
='',
210 attrs
=['domainFunctionality'])
212 functional_level
= int(res
[0]['domainFunctionality'][0])
214 functional_level
= DS_DOMAIN_FUNCTION_2000
216 type(self
)._functional
_level
= functional_level
218 return self
._functional
_level
def get_default_enctypes(self):
    """Return the set of encryption types expected for the domain.

    RC4 is always included; AES256 and AES128 are added when the domain
    functional level is at least DS_DOMAIN_FUNCTION_2008.
    """
    level = self.get_domain_functional_level(self.get_samdb())

    # RC4 should always be supported
    enctypes = {kcrypto.Enctype.RC4}

    # AES is only supported at functional level 2008 or higher
    if not level < DS_DOMAIN_FUNCTION_2008:
        enctypes |= {kcrypto.Enctype.AES256, kcrypto.Enctype.AES128}

    return enctypes
233 def create_account(self
, samdb
, name
, machine_account
=False,
234 spn
=None, upn
=None, additional_details
=None,
235 ou
=None, account_control
=0):
236 '''Create an account for testing.
237 The dn of the created account is added to self.accounts,
238 which is used by tearDownClass to clean up the created accounts.
241 guid
= (DS_GUID_COMPUTERS_CONTAINER
if machine_account
242 else DS_GUID_USERS_CONTAINER
)
244 ou
= samdb
.get_wellknown_dn(samdb
.get_default_basedn(), guid
)
246 dn
= "CN=%s,%s" % (name
, ou
)
248 # remove the account if it exists, this will happen if a previous test
250 delete_force(samdb
, dn
)
252 object_class
= "computer"
253 account_name
= "%s$" % name
254 account_control |
= UF_WORKSTATION_TRUST_ACCOUNT
256 object_class
= "user"
258 account_control |
= UF_NORMAL_ACCOUNT
260 password
= generate_random_password(32, 32)
261 utf16pw
= ('"%s"' % password
).encode('utf-16-le')
265 "objectclass": object_class
,
266 "sAMAccountName": account_name
,
267 "userAccountControl": str(account_control
),
268 "unicodePwd": utf16pw
}
270 details
["servicePrincipalName"] = spn
272 details
["userPrincipalName"] = upn
273 if additional_details
is not None:
274 details
.update(additional_details
)
277 creds
= KerberosCredentials()
278 creds
.guess(self
.get_lp())
279 creds
.set_realm(samdb
.domain_dns_name().upper())
280 creds
.set_domain(samdb
.domain_netbios_name().upper())
281 creds
.set_password(password
)
282 creds
.set_username(account_name
)
284 creds
.set_workstation(name
)
286 creds
.set_workstation('')
287 creds
.set_dn(ldb
.Dn(samdb
, dn
))
290 # Save the account name so it can be deleted in tearDownClass
291 self
.accounts
.add(dn
)
293 self
.creds_set_enctypes(creds
)
295 res
= samdb
.search(base
=dn
,
296 scope
=ldb
.SCOPE_BASE
,
297 attrs
=['msDS-KeyVersionNumber'])
298 kvno
= res
[0].get('msDS-KeyVersionNumber', idx
=0)
300 self
.assertEqual(int(kvno
), 1)
305 def get_security_descriptor(self
, dn
):
306 samdb
= self
.get_samdb()
308 sid
= self
.get_objectSid(samdb
, dn
)
310 owner_sid
= security
.dom_sid(security
.SID_BUILTIN_ADMINISTRATORS
)
313 ace
.access_mask
= security
.SEC_ADS_GENERIC_ALL
315 ace
.trustee
= security
.dom_sid(sid
)
317 dacl
= security
.acl()
318 dacl
.revision
= security
.SECURITY_ACL_REVISION_ADS
322 security_desc
= security
.descriptor()
323 security_desc
.type |
= security
.SEC_DESC_DACL_PRESENT
324 security_desc
.owner_sid
= owner_sid
325 security_desc
.dacl
= dacl
327 return ndr_pack(security_desc
)
329 def create_rodc(self
, ctx
):
330 ctx
.nc_list
= [ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
331 ctx
.full_nc_list
= [ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
332 ctx
.krbtgt_dn
= f
'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'
334 ctx
.never_reveal_sid
= [f
'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>',
335 f
'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
336 f
'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
337 f
'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
338 f
'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>']
339 ctx
.reveal_sid
= f
'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'
341 mysid
= ctx
.get_mysid()
342 admin_dn
= f
'<SID={mysid}>'
343 ctx
.managedby
= admin_dn
345 ctx
.userAccountControl
= (UF_WORKSTATION_TRUST_ACCOUNT |
346 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
347 UF_PARTIAL_SECRETS_ACCOUNT
)
349 ctx
.connection_dn
= f
'CN=RODC Connection (FRS),{ctx.ntds_dn}'
350 ctx
.secure_channel_type
= misc
.SEC_CHAN_RODC
352 ctx
.replica_flags
= (drsuapi
.DRSUAPI_DRS_INIT_SYNC |
353 drsuapi
.DRSUAPI_DRS_PER_SYNC |
354 drsuapi
.DRSUAPI_DRS_GET_ANC |
355 drsuapi
.DRSUAPI_DRS_NEVER_SYNCED |
356 drsuapi
.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING
)
357 ctx
.domain_replica_flags
= ctx
.replica_flags | drsuapi
.DRSUAPI_DRS_CRITICAL_ONLY
361 ctx
.cleanup_old_join()
364 ctx
.join_add_objects()
366 # cleanup the failed join (checking we still have a live LDB
367 # connection to the remote DC first)
368 ctx
.refresh_ldb_connection()
369 ctx
.cleanup_old_join()
372 def replicate_account_to_rodc(self
, dn
):
373 samdb
= self
.get_samdb()
374 rodc_samdb
= self
.get_rodc_samdb()
376 repl_val
= f
'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'
379 msg
.dn
= ldb
.Dn(rodc_samdb
, '')
380 msg
['replicateSingleObject'] = ldb
.MessageElement(
382 ldb
.FLAG_MOD_REPLACE
,
383 'replicateSingleObject')
386 # Try replication using the replicateSingleObject rootDSE
388 rodc_samdb
.modify(msg
)
389 except ldb
.LdbError
as err
:
390 enum
, estr
= err
.args
391 self
.assertEqual(enum
, ldb
.ERR_UNWILLING_TO_PERFORM
)
392 self
.assertIn('rootdse_modify: unknown attribute to change!',
395 # If that method wasn't supported, we may be in the rodc:local test
396 # environment, where we can try replicating to the local database.
400 rodc_creds
= Credentials()
402 rodc_creds
.set_machine_account(lp
)
404 local_samdb
= SamDB(url
=None, session_info
=system_session(),
405 credentials
=rodc_creds
, lp
=lp
)
407 destination_dsa_guid
= misc
.GUID(local_samdb
.get_ntds_GUID())
409 repl
= drs_Replicate(f
'ncacn_ip_tcp:{self.dc_host}[seal]',
411 local_samdb
, destination_dsa_guid
)
413 source_dsa_invocation_id
= misc
.GUID(samdb
.invocation_id
)
416 source_dsa_invocation_id
,
417 destination_dsa_guid
,
418 exop
=drsuapi
.DRSUAPI_EXOP_REPL_SECRET
,
421 def reveal_account_to_mock_rodc(self
, dn
):
422 samdb
= self
.get_samdb()
423 rodc_ctx
= self
.get_mock_rodc_ctx()
428 destination_dsa_guid
=rodc_ctx
.ntds_guid
,
429 source_dsa_invocation_id
=misc
.GUID(samdb
.invocation_id
))
def check_revealed(self, dn, rodc_dn, revealed=True):
    """Assert whether *dn* appears in msDS-RevealedUsers of *rodc_dn*.

    The attribute is read from *rodc_dn* with a base-scope search. When
    *revealed* is true, *dn* must be among the listed DNs; otherwise it
    must be absent. If the attribute is not present at all, *revealed* is
    required to be false.
    """
    samdb = self.get_samdb()

    res = samdb.search(base=rodc_dn,
                       scope=ldb.SCOPE_BASE,
                       attrs=['msDS-RevealedUsers'])

    revealed_users = res[0].get('msDS-RevealedUsers')
    if revealed_users is None:
        # Nothing is listed, so there is nothing further to inspect.
        self.assertFalse(revealed)
        return

    # Each value is parsed as a binary-DN syntax value; only its DN part
    # is compared.
    revealed_dns = {str(dsdb_Dn(samdb, str(user),
                                syntax_oid=DSDB_SYNTAX_BINARY_DN).dn)
                    for user in revealed_users}

    # The two membership checks are mutually exclusive.
    if revealed:
        self.assertIn(str(dn), revealed_dns)
    else:
        self.assertNotIn(str(dn), revealed_dns)
452 def get_secrets(self
, samdb
, dn
,
453 destination_dsa_guid
,
454 source_dsa_invocation_id
):
455 admin_creds
= self
.get_admin_creds()
457 dns_hostname
= samdb
.host_dns_name()
458 (bind
, handle
, _
) = drsuapi_connect(dns_hostname
,
462 req
= drsuapi
.DsGetNCChangesRequest8()
464 req
.destination_dsa_guid
= destination_dsa_guid
465 req
.source_dsa_invocation_id
= source_dsa_invocation_id
467 naming_context
= drsuapi
.DsReplicaObjectIdentifier()
468 naming_context
.dn
= dn
470 req
.naming_context
= naming_context
472 hwm
= drsuapi
.DsReplicaHighWaterMark()
473 hwm
.tmp_highest_usn
= 0
477 req
.highwatermark
= hwm
478 req
.uptodateness_vector
= None
480 req
.replica_flags
= 0
482 req
.max_object_count
= 1
483 req
.max_ndr_size
= 402116
484 req
.extended_op
= drsuapi
.DRSUAPI_EXOP_REPL_SECRET
486 attids
= [drsuapi
.DRSUAPI_ATTID_supplementalCredentials
,
487 drsuapi
.DRSUAPI_ATTID_unicodePwd
]
489 partial_attribute_set
= drsuapi
.DsPartialAttributeSet()
490 partial_attribute_set
.version
= 1
491 partial_attribute_set
.attids
= attids
492 partial_attribute_set
.num_attids
= len(attids
)
494 req
.partial_attribute_set
= partial_attribute_set
496 req
.partial_attribute_set_ex
= None
497 req
.mapping_ctr
.num_mappings
= 0
498 req
.mapping_ctr
.mappings
= None
500 _
, ctr
= bind
.DsGetNCChanges(handle
, 8, req
)
502 self
.assertEqual(1, ctr
.object_count
)
504 identifier
= ctr
.first_object
.object.identifier
505 attributes
= ctr
.first_object
.object.attribute_ctr
.attributes
507 self
.assertEqual(dn
, identifier
.dn
)
509 return bind
, identifier
, attributes
511 def get_keys(self
, samdb
, dn
):
512 admin_creds
= self
.get_admin_creds()
514 bind
, identifier
, attributes
= self
.get_secrets(
517 destination_dsa_guid
=misc
.GUID(samdb
.get_ntds_GUID()),
518 source_dsa_invocation_id
=misc
.GUID())
520 rid
= identifier
.sid
.split()[1]
522 net_ctx
= net
.Net(admin_creds
)
526 for attr
in attributes
:
527 if attr
.attid
== drsuapi
.DRSUAPI_ATTID_supplementalCredentials
:
528 net_ctx
.replicate_decrypt(bind
, attr
, rid
)
529 attr_val
= attr
.value_ctr
.values
[0].blob
531 spl
= ndr_unpack(drsblobs
.supplementalCredentialsBlob
,
533 for pkg
in spl
.sub
.packages
:
534 if pkg
.name
== 'Primary:Kerberos-Newer-Keys':
535 krb5_new_keys_raw
= binascii
.a2b_hex(pkg
.data
)
536 krb5_new_keys
= ndr_unpack(
537 drsblobs
.package_PrimaryKerberosBlob
,
539 for key
in krb5_new_keys
.ctr
.keys
:
540 keytype
= key
.keytype
541 if keytype
in (kcrypto
.Enctype
.AES256
,
542 kcrypto
.Enctype
.AES128
):
543 keys
[keytype
] = key
.value
.hex()
544 elif attr
.attid
== drsuapi
.DRSUAPI_ATTID_unicodePwd
:
545 net_ctx
.replicate_decrypt(bind
, attr
, rid
)
546 pwd
= attr
.value_ctr
.values
[0].blob
547 keys
[kcrypto
.Enctype
.RC4
] = pwd
.hex()
549 default_enctypes
= self
.get_default_enctypes()
551 self
.assertCountEqual(default_enctypes
, keys
)
def creds_set_keys(self, creds, keys):
    """Install every key in *keys* on *creds* as a forced key.

    *keys* maps an encryption type to its key; each pair is passed to
    ``creds.set_forced_key``. A *keys* value of None is accepted and
    leaves *creds* untouched.
    """
    if keys is None:
        return

    for enctype, key in keys.items():
        creds.set_forced_key(enctype, key)
def creds_set_enctypes(self, creds):
    """Copy the account's msDS-SupportedEncryptionTypes onto *creds*.

    The attribute is read from the object at ``creds.get_dn()`` and the
    same value is applied to the AS, TGS and AP supported-enctype settings
    of *creds*. A missing attribute is treated as 0.
    """
    attr_name = 'msDS-SupportedEncryptionTypes'

    res = self.get_samdb().search(creds.get_dn(),
                                  scope=ldb.SCOPE_BASE,
                                  attrs=[attr_name])
    value = res[0].get(attr_name, idx=0)
    enctypes = 0 if value is None else value

    creds.set_as_supported_enctypes(enctypes)
    creds.set_tgs_supported_enctypes(enctypes)
    creds.set_ap_supported_enctypes(enctypes)
575 def creds_set_default_enctypes(self
, creds
, fast_support
=False):
576 default_enctypes
= self
.get_default_enctypes()
577 supported_enctypes
= KerberosCredentials
.etypes_to_bits(
581 supported_enctypes |
= KerberosCredentials
.fast_supported_bits
583 creds
.set_as_supported_enctypes(supported_enctypes
)
584 creds
.set_tgs_supported_enctypes(supported_enctypes
)
585 creds
.set_ap_supported_enctypes(supported_enctypes
)
587 def add_to_group(self
, account_dn
, group_dn
, group_attr
):
588 samdb
= self
.get_samdb()
590 res
= samdb
.search(base
=group_dn
,
591 scope
=ldb
.SCOPE_BASE
,
594 self
.assertIn(group_attr
, orig_msg
)
596 members
= list(orig_msg
[group_attr
])
597 members
.append(account_dn
)
601 msg
[group_attr
] = ldb
.MessageElement(members
,
602 ldb
.FLAG_MOD_REPLACE
,
605 cleanup
= samdb
.msg_diff(msg
, orig_msg
)
606 self
.ldb_cleanups
.append(cleanup
)
611 def get_cached_creds(self
, *,
618 'allowed_replication': False,
619 'allowed_replication_mock': False,
620 'denied_replication': False,
621 'denied_replication_mock': False,
622 'revealed_to_rodc': False,
623 'revealed_to_mock_rodc': False,
624 'no_auth_data_required': False,
625 'supported_enctypes': None,
626 'not_delegated': False,
627 'delegation_to_spn': None,
628 'delegation_from_dn': None,
629 'trusted_to_auth_for_delegation': False,
630 'fast_support': False
634 'machine_account': machine_account
,
639 cache_key
= tuple(sorted(account_opts
.items()))
641 creds
= self
.account_cache
.get(cache_key
)
643 creds
= self
.create_account_opts(**account_opts
)
644 self
.account_cache
[cache_key
] = creds
648 def create_account_opts(self
, *,
651 allowed_replication_mock
,
653 denied_replication_mock
,
655 revealed_to_mock_rodc
,
656 no_auth_data_required
,
661 trusted_to_auth_for_delegation
,
664 self
.assertFalse(not_delegated
)
666 self
.assertIsNone(delegation_to_spn
)
667 self
.assertIsNone(delegation_from_dn
)
668 self
.assertFalse(trusted_to_auth_for_delegation
)
670 samdb
= self
.get_samdb()
671 rodc_samdb
= self
.get_rodc_samdb()
673 rodc_dn
= self
.get_server_dn(rodc_samdb
)
675 user_name
= self
.account_base
+ str(self
.account_id
)
676 type(self
).account_id
+= 1
678 user_account_control
= 0
679 if trusted_to_auth_for_delegation
:
680 user_account_control |
= UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
682 user_account_control |
= UF_NOT_DELEGATED
683 if no_auth_data_required
:
684 user_account_control |
= UF_NO_AUTH_DATA_REQUIRED
688 enctypes
= supported_enctypes
690 enctypes
= enctypes
or 0
691 enctypes |
= KerberosCredentials
.fast_supported_bits
693 if enctypes
is not None:
694 details
['msDS-SupportedEncryptionTypes'] = str(enctypes
)
696 if delegation_to_spn
:
697 details
['msDS-AllowedToDelegateTo'] = delegation_to_spn
699 if delegation_from_dn
:
700 security_descriptor
= self
.get_security_descriptor(
702 details
['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
706 spn
= 'host/' + user_name
710 creds
, dn
= self
.create_account(samdb
, user_name
,
711 machine_account
=machine_account
,
713 additional_details
=details
,
714 account_control
=user_account_control
)
716 keys
= self
.get_keys(samdb
, dn
)
717 self
.creds_set_keys(creds
, keys
)
719 # Handle secret replication to the RODC.
721 if allowed_replication
or revealed_to_rodc
:
722 # Allow replicating this account's secrets if requested, or allow
723 # it only temporarily if we're about to replicate them.
724 allowed_cleanup
= self
.add_to_group(
726 'msDS-RevealOnDemandGroup')
729 # Replicate this account's secrets to the RODC.
730 self
.replicate_account_to_rodc(dn
)
732 if not allowed_replication
:
733 # If we don't want replicating secrets to be allowed for this
734 # account, disable it again.
735 samdb
.modify(allowed_cleanup
)
737 self
.check_revealed(dn
,
739 revealed
=revealed_to_rodc
)
741 if denied_replication
:
742 # Deny replicating this account's secrets to the RODC.
743 self
.add_to_group(dn
, rodc_dn
, 'msDS-NeverRevealGroup')
745 # Handle secret replication to the mock RODC.
747 if allowed_replication_mock
or revealed_to_mock_rodc
:
748 # Allow replicating this account's secrets if requested, or allow
749 # it only temporarily if we want to add the account to the mock
750 # RODC's msDS-RevealedUsers.
751 rodc_ctx
= self
.get_mock_rodc_ctx()
752 mock_rodc_dn
= ldb
.Dn(samdb
, rodc_ctx
.acct_dn
)
754 allowed_mock_cleanup
= self
.add_to_group(
756 'msDS-RevealOnDemandGroup')
758 if revealed_to_mock_rodc
:
759 # Request replicating this account's secrets to the mock RODC,
760 # which updates msDS-RevealedUsers.
761 self
.reveal_account_to_mock_rodc(dn
)
763 if not allowed_replication_mock
:
764 # If we don't want replicating secrets to be allowed for this
765 # account, disable it again.
766 samdb
.modify(allowed_mock_cleanup
)
768 self
.check_revealed(dn
,
770 revealed
=revealed_to_mock_rodc
)
772 if denied_replication_mock
:
773 # Deny replicating this account's secrets to the mock RODC.
774 rodc_ctx
= self
.get_mock_rodc_ctx()
775 mock_rodc_dn
= ldb
.Dn(samdb
, rodc_ctx
.acct_dn
)
777 self
.add_to_group(dn
, mock_rodc_dn
, 'msDS-NeverRevealGroup')
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials looked up under the 'CLIENT' prefix.

    The lookup is delegated to ``self._get_krb5_creds``; both
    ``allow_missing_*`` flags are forwarded unchanged. A fallback
    callable is supplied that obtains a cached non-machine account via
    ``get_cached_creds``.
    """
    def create_client_account():
        return self.get_cached_creds(machine_account=False)

    c = self._get_krb5_creds(prefix='CLIENT',
                             allow_missing_password=allow_missing_password,
                             allow_missing_keys=allow_missing_keys,
                             fallback_creds_fn=create_client_account)
    return c
def get_mach_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the credentials looked up under the 'MAC' prefix.

    The lookup is delegated to ``self._get_krb5_creds``; both
    ``allow_missing_*`` flags are forwarded unchanged. A fallback
    callable is supplied that obtains a cached machine account with the
    'fast_support' option enabled via ``get_cached_creds``.
    """
    def create_mach_account():
        return self.get_cached_creds(machine_account=True,
                                     opts={'fast_support': True})

    c = self._get_krb5_creds(prefix='MAC',
                             allow_missing_password=allow_missing_password,
                             allow_missing_keys=allow_missing_keys,
                             fallback_creds_fn=create_mach_account)
    return c
806 def get_service_creds(self
,
807 allow_missing_password
=False,
808 allow_missing_keys
=True):
809 def create_service_account():
810 return self
.get_cached_creds(
811 machine_account
=True,
813 'trusted_to_auth_for_delegation': True,
817 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
818 allow_missing_password
=allow_missing_password
,
819 allow_missing_keys
=allow_missing_keys
,
820 fallback_creds_fn
=create_service_account
)
823 def get_rodc_krbtgt_creds(self
,
825 require_strongest_key
=False):
826 if require_strongest_key
:
827 self
.assertTrue(require_keys
)
829 def download_rodc_krbtgt_creds():
830 samdb
= self
.get_samdb()
831 rodc_samdb
= self
.get_rodc_samdb()
833 rodc_dn
= self
.get_server_dn(rodc_samdb
)
835 res
= samdb
.search(rodc_dn
,
836 scope
=ldb
.SCOPE_BASE
,
837 attrs
=['msDS-KrbTgtLink'])
838 krbtgt_dn
= res
[0]['msDS-KrbTgtLink'][0]
840 res
= samdb
.search(krbtgt_dn
,
841 scope
=ldb
.SCOPE_BASE
,
842 attrs
=['sAMAccountName',
843 'msDS-KeyVersionNumber',
844 'msDS-SecondaryKrbTgtNumber'])
845 krbtgt_dn
= res
[0].dn
846 username
= str(res
[0]['sAMAccountName'])
848 creds
= KerberosCredentials()
849 creds
.set_domain(self
.env_get_var('DOMAIN', 'RODC_KRBTGT'))
850 creds
.set_realm(self
.env_get_var('REALM', 'RODC_KRBTGT'))
851 creds
.set_username(username
)
853 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
854 krbtgt_number
= int(res
[0]['msDS-SecondaryKrbTgtNumber'][0])
856 rodc_kvno
= krbtgt_number
<< 16 | kvno
857 creds
.set_kvno(rodc_kvno
)
858 creds
.set_dn(krbtgt_dn
)
860 keys
= self
.get_keys(samdb
, krbtgt_dn
)
861 self
.creds_set_keys(creds
, keys
)
863 # The RODC krbtgt account should support the default enctypes,
864 # although it might not have the msDS-SupportedEncryptionTypes
866 self
.creds_set_default_enctypes(creds
)
870 c
= self
._get
_krb
5_creds
(prefix
='RODC_KRBTGT',
871 allow_missing_password
=True,
872 allow_missing_keys
=not require_keys
,
873 require_strongest_key
=require_strongest_key
,
874 fallback_creds_fn
=download_rodc_krbtgt_creds
)
877 def get_mock_rodc_krbtgt_creds(self
,
879 require_strongest_key
=False):
880 if require_strongest_key
:
881 self
.assertTrue(require_keys
)
883 def create_rodc_krbtgt_account():
884 samdb
= self
.get_samdb()
886 rodc_ctx
= self
.get_mock_rodc_ctx()
888 krbtgt_dn
= rodc_ctx
.new_krbtgt_dn
890 res
= samdb
.search(base
=ldb
.Dn(samdb
, krbtgt_dn
),
891 scope
=ldb
.SCOPE_BASE
,
892 attrs
=['msDS-KeyVersionNumber',
893 'msDS-SecondaryKrbTgtNumber'])
895 username
= str(rodc_ctx
.krbtgt_name
)
897 creds
= KerberosCredentials()
898 creds
.set_domain(self
.env_get_var('DOMAIN', 'RODC_KRBTGT'))
899 creds
.set_realm(self
.env_get_var('REALM', 'RODC_KRBTGT'))
900 creds
.set_username(username
)
902 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
903 krbtgt_number
= int(res
[0]['msDS-SecondaryKrbTgtNumber'][0])
905 rodc_kvno
= krbtgt_number
<< 16 | kvno
906 creds
.set_kvno(rodc_kvno
)
909 keys
= self
.get_keys(samdb
, dn
)
910 self
.creds_set_keys(creds
, keys
)
912 self
.creds_set_enctypes(creds
)
916 c
= self
._get
_krb
5_creds
(prefix
='MOCK_RODC_KRBTGT',
917 allow_missing_password
=True,
918 allow_missing_keys
=not require_keys
,
919 require_strongest_key
=require_strongest_key
,
920 fallback_creds_fn
=create_rodc_krbtgt_account
)
923 def get_krbtgt_creds(self
,
925 require_strongest_key
=False):
926 if require_strongest_key
:
927 self
.assertTrue(require_keys
)
929 def download_krbtgt_creds():
930 samdb
= self
.get_samdb()
932 krbtgt_rid
= security
.DOMAIN_RID_KRBTGT
933 krbtgt_sid
= '%s-%d' % (samdb
.get_domain_sid(), krbtgt_rid
)
935 res
= samdb
.search(base
='<SID=%s>' % krbtgt_sid
,
936 scope
=ldb
.SCOPE_BASE
,
937 attrs
=['sAMAccountName',
938 'msDS-KeyVersionNumber'])
940 username
= str(res
[0]['sAMAccountName'])
942 creds
= KerberosCredentials()
943 creds
.set_domain(self
.env_get_var('DOMAIN', 'KRBTGT'))
944 creds
.set_realm(self
.env_get_var('REALM', 'KRBTGT'))
945 creds
.set_username(username
)
947 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
951 keys
= self
.get_keys(samdb
, dn
)
952 self
.creds_set_keys(creds
, keys
)
954 # The krbtgt account should support the default enctypes, although
955 # it might not (on Samba) have the msDS-SupportedEncryptionTypes
957 self
.creds_set_default_enctypes(creds
,
958 fast_support
=self
.kdc_fast_support
)
962 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
963 default_username
='krbtgt',
964 allow_missing_password
=True,
965 allow_missing_keys
=not require_keys
,
966 require_strongest_key
=require_strongest_key
,
967 fallback_creds_fn
=download_krbtgt_creds
)
970 def get_dc_creds(self
,
972 require_strongest_key
=False):
973 if require_strongest_key
:
974 self
.assertTrue(require_keys
)
976 def download_dc_creds():
977 samdb
= self
.get_samdb()
980 dc_sid
= '%s-%d' % (samdb
.get_domain_sid(), dc_rid
)
982 res
= samdb
.search(base
='<SID=%s>' % dc_sid
,
983 scope
=ldb
.SCOPE_BASE
,
984 attrs
=['sAMAccountName',
985 'msDS-KeyVersionNumber'])
987 username
= str(res
[0]['sAMAccountName'])
989 creds
= KerberosCredentials()
990 creds
.set_domain(self
.env_get_var('DOMAIN', 'DC'))
991 creds
.set_realm(self
.env_get_var('REALM', 'DC'))
992 creds
.set_username(username
)
994 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
998 keys
= self
.get_keys(samdb
, dn
)
999 self
.creds_set_keys(creds
, keys
)
1001 self
.creds_set_enctypes(creds
)
1005 c
= self
._get
_krb
5_creds
(prefix
='DC',
1006 allow_missing_password
=True,
1007 allow_missing_keys
=not require_keys
,
1008 require_strongest_key
=require_strongest_key
,
1009 fallback_creds_fn
=download_dc_creds
)
1012 def as_req(self
, cname
, sname
, realm
, etypes
, padata
=None, kdc_options
=0):
1013 '''Send a Kerberos AS_REQ, returns the undecoded response
1016 till
= self
.get_KerberosTime(offset
=36000)
1018 req
= self
.AS_REQ_create(padata
=padata
,
1019 kdc_options
=str(kdc_options
),
1029 additional_tickets
=None)
1030 rep
= self
.send_recv_transaction(req
)
1033 def get_as_rep_key(self
, creds
, rep
):
1034 '''Extract the session key from an AS-REP
1036 rep_padata
= self
.der_decode(
1038 asn1Spec
=krb5_asn1
.METHOD_DATA())
1040 for pa
in rep_padata
:
1041 if pa
['padata-type'] == PADATA_ETYPE_INFO2
:
1042 padata_value
= pa
['padata-value']
1045 etype_info2
= self
.der_decode(
1046 padata_value
, asn1Spec
=krb5_asn1
.ETYPE_INFO2())
1048 key
= self
.PasswordKey_from_etype_info2(creds
, etype_info2
[0],
def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
    '''Generate the pa_data element for an AS-REQ.

    The key is obtained from get_as_rep_key(creds, rep) and passed, with
    the requested skew, to get_enc_timestamp_pa_data_from_key().
    '''
    as_rep_key = self.get_as_rep_key(creds, rep)
    pa_data = self.get_enc_timestamp_pa_data_from_key(as_rep_key, skew=skew)
    return pa_data
def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
    """Build and return an encrypted-timestamp PA-DATA element.

    The current Kerberos time, offset by *skew*, is wrapped in a
    PA-ENC-TS-ENC structure and DER-encoded. That encoding is encrypted
    with *key* under the KU_PA_ENC_TIMESTAMP usage, DER-encoded again as
    EncryptedData, and finally placed in a PA-DATA of type
    PADATA_ENC_TIMESTAMP.
    """
    (patime, pausec) = self.get_KerberosTimeWithUsec(offset=skew)
    padata = self.PA_ENC_TS_ENC_create(patime, pausec)
    padata = self.der_encode(padata, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    padata = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, padata)
    padata = self.der_encode(padata, asn1Spec=krb5_asn1.EncryptedData())

    padata = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, padata)

    # get_enc_timestamp_pa_data() returns this call's result to its caller.
    return padata
1072 def get_challenge_pa_data(self
, client_challenge_key
, skew
=0):
1073 patime
, pausec
= self
.get_KerberosTimeWithUsec(offset
=skew
)
1074 padata
= self
.PA_ENC_TS_ENC_create(patime
, pausec
)
1075 padata
= self
.der_encode(padata
,
1076 asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
1078 padata
= self
.EncryptedData_create(client_challenge_key
,
1079 KU_ENC_CHALLENGE_CLIENT
,
1081 padata
= self
.der_encode(padata
,
1082 asn1Spec
=krb5_asn1
.EncryptedData())
1084 padata
= self
.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE
,
1089 def get_as_rep_enc_data(self
, key
, rep
):
1090 ''' Decrypt and Decode the encrypted data in an AS-REP
1092 enc_part
= key
.decrypt(KU_AS_REP_ENC_PART
, rep
['enc-part']['cipher'])
1093 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
1094 # application tag 26
1096 enc_part
= self
.der_decode(
1097 enc_part
, asn1Spec
=krb5_asn1
.EncASRepPart())
1099 enc_part
= self
.der_decode(
1100 enc_part
, asn1Spec
=krb5_asn1
.EncTGSRepPart())
def check_pre_authentication(self, rep):
    """Check that the KDC response was 'pre-authentication required'.

    Delegates to check_error_rep() with KDC_ERR_PREAUTH_REQUIRED as the
    expected error code.
    """
    expected_error = KDC_ERR_PREAUTH_REQUIRED
    self.check_error_rep(rep, expected_error)
def check_as_reply(self, rep):
    """Check that the KDC response is an AS-REP with the expected values.

    Delegates to check_reply() with KRB_AS_REP as the required message
    type; check_reply() also verifies pvno, the ticket's tkt-vno and,
    when present, the kvno.
    """
    expected_type = KRB_AS_REP
    self.check_reply(rep, msg_type=expected_type)
def check_tgs_reply(self, rep):
    """Check that the KDC response is a TGS-REP with the expected values.

    Delegates to check_reply() with KRB_TGS_REP as the required message
    type; check_reply() also verifies pvno, the ticket's tkt-vno and,
    when present, the kvno.
    """
    expected_type = KRB_TGS_REP
    self.check_reply(rep, msg_type=expected_type)
def check_reply(self, rep, msg_type):
    """Check the common fields of a KDC reply.

    Asserts that *rep* is present, that its msg-type equals *msg_type*,
    that pvno and the ticket's tkt-vno are both 5 and, if the encrypted
    part carries a kvno, that its upper 16 bits are clear.
    """
    # There must be a reply, and it must be of the requested type.
    self.assertIsNotNone(rep)
    self.assertEqual(rep['msg-type'], msg_type, "rep = {%s}" % rep)

    # Protocol version number should be 5
    self.assertEqual(5, int(rep['pvno']), "rep = {%s}" % rep)

    # The ticket version number should be 5
    self.assertEqual(5, int(rep['ticket']['tkt-vno']), "rep = {%s}" % rep)

    # MIT kerberos does not provide the kvno, so we treat it as optional.
    # This is tested in compatability_test.py
    if 'kvno' not in rep['enc-part']:
        return

    # Check that the kvno is not an RODC kvno: if the high order bits
    # are set this is an RODC kvno.
    high_bits = int(rep['enc-part']['kvno']) & 0xFFFF0000
    self.assertEqual(0, high_bits, "rep = {%s}" % rep)
def check_error_rep(self, rep, expected):
    """Check that the reply is an error message with the expected code.

    *expected* is either a single error code, compared for equality, or
    a container of acceptable error codes, in which case the reply's
    error-code only has to be one of them.
    """
    self.assertIsNotNone(rep)
    self.assertEqual(rep['msg-type'], KRB_ERROR, "rep = {%s}" % rep)
    if isinstance(expected, collections.abc.Container):
        self.assertIn(rep['error-code'], expected, "rep = {%s}" % rep)
    else:
        # Only a single code is compared for equality; comparing the
        # code against a container would never succeed.
        self.assertEqual(rep['error-code'], expected, "rep = {%s}" % rep)
def tgs_req(self, cname, sname, realm, ticket, key, etypes,
            expected_error_mode=0, padata=None, kdc_options=0,
            to_rodc=False, service_creds=None, expect_pac=True,
            expect_edata=None, expected_flags=None, unexpected_flags=None):
    '''Send a TGS-REQ and return the response together with the
       decrypted and decoded enc-part.

    :param cname: client principal name.
    :param sname: service principal name being requested.
    :param realm: realm, used for both the client and the service.
    :param ticket: the ticket presented in the request (wrapped, with
        ``key``, in a KerberosTicketCreds).
    :param key: session key belonging to ``ticket``; its etype also
        selects the etype of the random authenticator subkey.
    :param etypes: encryption types passed on to the exchange.
    :param expected_error_mode: when truthy the reply is checked as an
        error; otherwise it is checked as a successful reply.
    :param padata: optional padata; when not None it is supplied to the
        exchange through a generate-padata callback.
    :param kdc_options: KDC options; passed on as ``str(kdc_options)``.
    :param service_creds: optional credentials of the target service,
        used to derive the ticket decryption key.
    :returns: ``(rep, enc_part)``; ``enc_part`` is None when
        ``expected_error_mode`` is truthy.
    '''
    authenticator_subkey = self.RandomKey(key.etype)

    # The timestamp is fetched but not otherwise used in this method.
    (_ctime, _cusec) = self.get_KerberosTimeWithUsec()

    tgt = KerberosTicketCreds(ticket, key, crealm=realm, cname=cname)

    # Without service credentials the returned ticket cannot be decrypted.
    decryption_key = None
    if service_creds is not None:
        decryption_key = self.TicketDecryptionKey_from_creds(service_creds)

    # Exactly one of the two reply checkers is installed.
    if expected_error_mode:
        check_error_fn, check_rep_fn = self.generic_check_kdc_error, None
    else:
        check_error_fn, check_rep_fn = None, self.generic_check_kdc_rep

    def generate_padata(_kdc_exchange_dict, _callback_dict, req_body):
        # Hand back the caller's padata unchanged alongside the body.
        return padata, req_body

    generate_padata_fn = None if padata is None else generate_padata

    kdc_exchange_dict = self.tgs_exchange_dict(
        expected_crealm=realm,
        expected_cname=cname,
        expected_srealm=realm,
        expected_sname=sname,
        expected_error_mode=expected_error_mode,
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_kdc_private_fn=self.generic_check_kdc_private,
        ticket_decryption_key=decryption_key,
        generate_padata_fn=generate_padata_fn,
        tgt=tgt,
        authenticator_subkey=authenticator_subkey,
        kdc_options=str(kdc_options),
        expect_edata=expect_edata,
        expect_pac=expect_pac,
        to_rodc=to_rodc)

    rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                     cname=None,
                                     realm=realm,
                                     sname=sname,
                                     etypes=etypes)

    if expected_error_mode:
        # An error reply carries no encrypted part to hand back.
        return rep, None

    reply_creds = kdc_exchange_dict['rep_ticket_creds']
    return rep, reply_creds.encpart_private
def get_service_ticket(self, tgt, target_creds, service='host',
                       to_rodc=False, kdc_options=None,
                       expected_flags=None, unexpected_flags=None,
                       fresh=False):
    """Obtain (or fetch from the cache) a service ticket for a target.

    Results are stored in ``self.tkt_cache`` keyed by the client name,
    the target's username, ``service``, ``to_rodc`` and the
    ``kdc_options`` value as passed in.

    :param tgt: ticket credentials supplying the client name, realm,
        session key and ticket used for the TGS-REQ.
    :param target_creds: credentials of the target account.
    :param service: first component of the service principal name.
    :param kdc_options: KDC options; ``'0'`` is used when None.
    :param fresh: when true, skip the cache lookup (the new ticket is
        still stored in the cache afterwards).
    :returns: a KerberosTicketCreds for the service ticket.
    """
    client_name = tgt.cname['name-string'][0]
    account_name = target_creds.get_username()
    cache_key = (client_name, account_name, service, to_rodc, kdc_options)

    if not fresh:
        cached = self.tkt_cache.get(cache_key)
        if cached is not None:
            return cached

    etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)

    # The cache key above deliberately uses the options as passed in.
    kdc_options = krb5_asn1.KDCOptions(
        '0' if kdc_options is None else kdc_options)

    key = tgt.session_key
    ticket = tgt.ticket

    cname = tgt.cname
    realm = tgt.crealm

    # The service principal uses the username minus its last character
    # (presumably the trailing '$' of a machine account -- TODO confirm).
    target_name = target_creds.get_username()[:-1]
    sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[service, target_name])

    rep, enc_part = self.tgs_req(cname, sname, realm, ticket, key, etype,
                                 to_rodc=to_rodc,
                                 service_creds=target_creds,
                                 kdc_options=kdc_options,
                                 expected_flags=expected_flags,
                                 unexpected_flags=unexpected_flags)

    service_ticket = rep['ticket']

    # Choose the target's key matching the etype the ticket was
    # actually encrypted with.
    target_key = self.TicketDecryptionKey_from_creds(
        target_creds,
        etype=service_ticket['enc-part']['etype'])

    session_key = self.EncryptionKey_import(enc_part['key'])

    service_ticket_creds = KerberosTicketCreds(service_ticket,
                                               session_key,
                                               crealm=realm,
                                               cname=cname,
                                               srealm=realm,
                                               sname=sname,
                                               decryption_key=target_key)

    self.tkt_cache[cache_key] = service_ticket_creds
    return service_ticket_creds
def get_tgt(self, creds, to_rodc=False, kdc_options=None,
            expected_flags=None, unexpected_flags=None,
            fresh=False):
    """Obtain (or fetch from the cache) a TGT for ``creds``.

    Two AS exchanges are performed: the first is expected to fail with
    KDC_ERR_PREAUTH_REQUIRED, the second supplies an encrypted
    timestamp and is expected to succeed.  The result is stored in
    ``self.tkt_cache`` keyed by the username, ``to_rodc`` and the
    ``kdc_options`` value as passed in.

    :param creds: credentials of the account requesting the TGT.
    :param to_rodc: selects the RODC krbtgt credentials for the ticket
        decryption key, and is passed on to the exchanges.
    :param kdc_options: KDC options; a default set is used when None.
    :param fresh: when true, skip the cache lookup (the new TGT is
        still stored in the cache afterwards).
    :returns: the ticket credentials recorded by the second exchange.
    """
    user_name = creds.get_username()
    cache_key = (user_name, to_rodc, kdc_options)

    if not fresh:
        cached_tgt = self.tkt_cache.get(cache_key)
        if cached_tgt is not None:
            return cached_tgt

    realm = creds.get_realm()
    salt = creds.get_salt()

    etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
    cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[user_name])
    sname = self.PrincipalName_create(name_type=NT_SRV_INST,
                                      names=['krbtgt', realm])

    till = self.get_KerberosTime(offset=36000)

    krbtgt_creds = (self.get_rodc_krbtgt_creds() if to_rodc
                    else self.get_krbtgt_creds())
    ticket_decryption_key = self.TicketDecryptionKey_from_creds(
        krbtgt_creds)

    # The cache key above deliberately uses the options as passed in.
    if kdc_options is None:
        kdc_options = 'forwardable,renewable,canonicalize,renewable-ok'
    kdc_options = krb5_asn1.KDCOptions(kdc_options)

    pac_options = '1'  # supports claims

    # Keyword arguments shared by both AS exchanges.
    common_args = dict(
        cname=cname,
        realm=realm,
        sname=sname,
        till=till,
        client_as_etypes=etype,
        expected_cname=cname,
        expected_salt=salt,
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        etypes=etype,
        kdc_options=kdc_options,
        ticket_decryption_key=ticket_decryption_key,
        pac_request=True,
        pac_options=pac_options,
        to_rodc=to_rodc)

    # First exchange: no pre-authentication data, expected to be refused.
    rep, kdc_exchange_dict = self._test_as_exchange(
        expected_error_mode=KDC_ERR_PREAUTH_REQUIRED,
        expected_crealm=realm,
        expected_srealm=realm,
        expected_sname=sname,
        padata=None,
        preauth_key=None,
        **common_args)
    self.check_pre_authentication(rep)

    etype_info2 = kdc_exchange_dict['preauth_etype_info2']
    preauth_key = self.PasswordKey_from_etype_info2(creds,
                                                    etype_info2[0],
                                                    creds.get_kvno())

    ts_enc_padata = self.get_enc_timestamp_pa_data(creds, rep)

    # The successful reply is expected to carry the upper-cased realm.
    expected_realm = realm.upper()
    expected_sname = self.PrincipalName_create(
        name_type=NT_SRV_INST, names=['krbtgt', expected_realm])

    # Second exchange: with the encrypted timestamp, expected to succeed.
    rep, kdc_exchange_dict = self._test_as_exchange(
        expected_error_mode=0,
        expected_crealm=expected_realm,
        expected_srealm=expected_realm,
        expected_sname=expected_sname,
        expected_supported_etypes=krbtgt_creds.tgs_supported_enctypes,
        padata=[ts_enc_padata],
        preauth_key=preauth_key,
        **common_args)
    self.check_as_reply(rep)

    ticket_creds = kdc_exchange_dict['rep_ticket_creds']
    self.tkt_cache[cache_key] = ticket_creds
    return ticket_creds
# Record type holding the values of interest extracted from a decoded PAC.
PacData = namedtuple(
    "PacData",
    ["account_name", "account_sid", "logon_name", "upn", "domain_name"])
def get_pac_data(self, authorization_data):
    '''Decode the PAC carried in an authorization-data element.

    :param authorization_data: iterable of authorization-data entries,
        each indexable by 'ad-type' and 'ad-data'.
    :returns: a ``PacData`` tuple; any field whose PAC buffer was not
        encountered is None.  When a buffer type occurs more than once,
        the last occurrence wins.
    '''
    account_name = user_sid = logon_name = upn = domain_name = None

    for element in authorization_data:
        # The PAC data will be wrapped in an AD_IF_RELEVANT element
        if element['ad-type'] != AD_IF_RELEVANT:
            continue
        wrapped = self.der_decode(
            element['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())

        for inner in wrapped:
            # The PAC data is further wrapped in a AD_WIN2K_PAC element
            if inner['ad-type'] != AD_WIN2K_PAC:
                continue
            pac_data = ndr_unpack(krb5pac.PAC_DATA, inner['ad-data'])

            for pac_buffer in pac_data.buffers:
                buffer_type = pac_buffer.type
                if buffer_type == krb5pac.PAC_TYPE_LOGON_INFO:
                    base = pac_buffer.info.info.info3.base
                    account_name = base.account_name
                    # The full SID is the domain SID followed by the RID.
                    user_sid = "%s-%s" % (base.domain_sid, base.rid)
                elif buffer_type == krb5pac.PAC_TYPE_LOGON_NAME:
                    logon_name = pac_buffer.info.account_name
                elif buffer_type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
                    upn = pac_buffer.info.upn_name
                    domain_name = pac_buffer.info.dns_domain_name

    return self.PacData(account_name,
                        user_sid,
                        logon_name,
                        upn,
                        domain_name)
def decode_service_ticket(self, creds, ticket):
    '''Decrypt and decode the encrypted part of a service ticket.

    The decryption key is derived from the password in ``creds`` using
    the etype and kvno recorded in the ticket's enc-part.

    :param creds: credentials of the account the ticket is issued for.
    :param ticket: the ticket, indexable by 'enc-part'.
    :returns: the decoded EncTicketPart.
    '''
    account = creds.get_username()
    # A trailing '$' on the username is not part of the salt.
    principal = account[:-1] if account.endswith('$') else account

    realm = creds.get_realm()
    salt = "%s.%s@%s" % (principal, realm.lower(), realm.upper())

    ticket_enc_part = ticket['enc-part']
    key = self.PasswordKey_create(ticket_enc_part['etype'],
                                  creds.get_password(),
                                  salt,
                                  ticket_enc_part['kvno'])

    plaintext = key.decrypt(KU_TICKET, ticket_enc_part['cipher'])
    return self.der_decode(plaintext, asn1Spec=krb5_asn1.EncTicketPart())
def get_objectSid(self, samdb, dn):
    ''' Return the objectSID of ``dn`` as a string.

        Note: performs an Ldb query (a base-scope search on ``dn``), and
        asserts that exactly one entry is returned.
    '''
    matches = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
    self.assertTrue(len(matches) == 1, "did not get objectSid for %s" % dn)

    raw_sid = matches[0]["objectSID"][0]
    formatted = samdb.schema_format_value("objectSID", raw_sid)
    return formatted.decode('utf8')
def add_attribute(self, samdb, dn_str, name, value):
    """Add value(s) to attribute ``name`` of the object at ``dn_str``.

    :param samdb: database handle the modification is applied through.
    :param dn_str: DN of the object to modify, as a string.
    :param name: attribute name.
    :param value: a single value, or a list of values.
    """
    # Normalise to a list; an existing list is passed through as-is.
    values = value if isinstance(value, list) else [value]

    msg = ldb.Message(ldb.Dn(samdb, dn_str))
    msg[name] = ldb.MessageElement(values, ldb.FLAG_MOD_ADD, name)
    samdb.modify(msg)
def modify_attribute(self, samdb, dn_str, name, value):
    """Replace attribute ``name`` of the object at ``dn_str``.

    :param samdb: database handle the modification is applied through.
    :param dn_str: DN of the object to modify, as a string.
    :param name: attribute name.
    :param value: a single value, or a list of values.
    """
    # Normalise to a list; an existing list is passed through as-is.
    values = value if isinstance(value, list) else [value]

    msg = ldb.Message(ldb.Dn(samdb, dn_str))
    msg[name] = ldb.MessageElement(values, ldb.FLAG_MOD_REPLACE, name)
    samdb.modify(msg)
def create_ccache(self, cname, ticket, enc_part):
    """ Lay out a version 4 on-disk credentials cache holding ``ticket``
        and write it to a new temporary file under ``self.tempdir``.

    :param cname: client principal name, indexable by 'name-type' and
        'name-string'.
    :param ticket: the decoded ticket; its 'realm' and 'sname' fields
        describe the service principal, and the whole ticket is
        re-encoded into the cache.
    :param enc_part: the decoded encrypted reply part, supplying the
        session key, the ticket times and the ticket flags.
    :returns: the (already closed) temporary file object; the file is
        not deleted on close, so its ``name`` remains usable.
    """

    def make_principal(name, realm):
        # Build a ccache PRINCIPAL from a decoded principal name.
        components = name['name-string']
        principal = krb5ccache.PRINCIPAL()
        principal.name_type = name['name-type']
        principal.component_count = len(components)
        principal.realm = realm
        principal.components = components
        return principal

    # Header: a single tag recording a zero KDC time offset.
    field = krb5ccache.DELTATIME_TAG()
    field.kdc_sec_offset = 0
    field.kdc_usec_offset = 0

    v4tag = krb5ccache.V4TAG()
    v4tag.tag = 1
    v4tag.field = field

    v4tags = krb5ccache.V4TAGS()
    v4tags.tag = v4tag
    v4tags.further_tags = b''

    optional_header = krb5ccache.V4HEADER()
    optional_header.v4tags = v4tags

    # Both principals take their realm from the ticket.
    cprincipal = make_principal(cname, ticket['realm'])
    sprincipal = make_principal(ticket['sname'], ticket['realm'])

    key = self.EncryptionKey_import(enc_part['key'])

    key_data = key.export_obj()
    keyblock = krb5ccache.KEYBLOCK()
    keyblock.enctype = key_data['keytype']
    keyblock.data = key_data['keyvalue']

    # No addresses and no authorization data are recorded.
    addresses = krb5ccache.ADDRESSES()
    addresses.count = 0
    addresses.data = []

    authdata = krb5ccache.AUTHDATA()
    authdata.count = 0
    authdata.data = []

    # Re-encode the ticket, since it was decoded by another layer.
    ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())

    authtime = enc_part['authtime']
    # starttime is optional; fall back to authtime when it is absent.
    starttime = enc_part.get('starttime', authtime)
    endtime = enc_part['endtime']

    cred = krb5ccache.CREDENTIAL()
    cred.client = cprincipal
    cred.server = sprincipal
    cred.keyblock = keyblock
    cred.authtime = self.get_EpochFromKerberosTime(authtime)
    cred.starttime = self.get_EpochFromKerberosTime(starttime)
    cred.endtime = self.get_EpochFromKerberosTime(endtime)

    # Take one reading of the current time so that all three checks
    # below compare against the same instant.
    now = datetime.now(timezone.utc).timestamp()

    # Account for clock skew of up to five minutes.
    self.assertLess(cred.authtime - 5 * 60,
                    now,
                    "Ticket not yet valid - clocks may be out of sync.")
    self.assertLess(cred.starttime - 5 * 60,
                    now,
                    "Ticket not yet valid - clocks may be out of sync.")
    # The ticket must remain valid for at least another hour.
    self.assertGreater(cred.endtime - 60 * 60,
                       now,
                       "Ticket already expired/about to expire - "
                       "clocks may be out of sync.")

    cred.renew_till = cred.endtime
    cred.is_skey = 0
    # The flags arrive as a string of binary digits.
    cred.ticket_flags = int(enc_part['flags'], 2)
    cred.addresses = addresses
    cred.authdata = authdata
    cred.ticket = ticket_data
    cred.second_ticket = b''

    ccache = krb5ccache.CCACHE()
    ccache.pvno = 5
    ccache.version = 4
    ccache.optional_header = optional_header
    ccache.principal = cprincipal
    ccache.cred = cred

    # Serialise the credentials cache structure.
    result = ndr_pack(ccache)

    # Create a temporary file and write the credentials.  The context
    # manager guarantees the handle is closed even if the write fails;
    # delete=False keeps the file on disk for the caller.
    with tempfile.NamedTemporaryFile(dir=self.tempdir,
                                     delete=False) as cachefile:
        cachefile.write(result)

    return cachefile
def create_ccache_with_user(self, user_credentials, mach_name,
                            service="host"):
    """Obtain a service ticket authorising the user and place it into a
    newly created credentials cache file.

    :param user_credentials: credentials of the user to authenticate.
    :param mach_name: second component of the service principal name.
    :param service: first component of the service principal name.
    :returns: ``(creds, cachefile)`` -- a Credentials object referring
        to the new cache, and the cache file object itself.
    """
    user_name = user_credentials.get_username()
    realm = user_credentials.get_realm()

    etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
    client = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                       names=[user_name])
    krbtgt = self.PrincipalName_create(name_type=NT_SRV_HST,
                                       names=["krbtgt", realm])

    # Do the initial AS-REQ, should get a pre-authentication required
    # response
    as_rep = self.as_req(client, krbtgt, realm, etype)
    self.check_pre_authentication(as_rep)

    # Do the next AS-REQ, this time with an encrypted timestamp
    ts_padata = self.get_enc_timestamp_pa_data(user_credentials, as_rep)
    reply_key = self.get_as_rep_key(user_credentials, as_rep)
    as_rep = self.as_req(client, krbtgt, realm, etype, padata=[ts_padata])
    self.check_as_reply(as_rep)

    # Request a ticket to the requested service on the machine account
    tgt = as_rep['ticket']
    as_enc_part = self.get_as_rep_enc_data(reply_key, as_rep)
    tgt_session_key = self.EncryptionKey_import(as_enc_part['key'])
    cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[user_name])
    sname = self.PrincipalName_create(name_type=NT_SRV_HST,
                                      names=[service, mach_name])

    (tgs_rep, enc_part) = self.tgs_req(
        cname, sname, realm, tgt, tgt_session_key, etype)
    self.check_tgs_reply(tgs_rep)
    # The imported key is not used further in this method.
    self.EncryptionKey_import(enc_part['key'])

    service_ticket = tgs_rep['ticket']

    # Write the ticket into a credentials cache file that can be ingested
    # by the main credentials code.
    cachefile = self.create_ccache(cname, service_ticket, enc_part)

    # Create a credentials object to reference the credentials cache.
    creds = Credentials()
    creds.set_kerberos_state(MUST_USE_KERBEROS)
    creds.set_username(user_name, SPECIFIED)
    creds.set_realm(realm)
    creds.set_named_ccache(cachefile.name, SPECIFIED, self.get_lp())

    # Return the credentials along with the cache file.
    return (creds, cachefile)