tests/krb5: Test that computers (and, by extension, gMSAs) cannot perform interactive...
[Samba.git] / python / samba / tests / krb5 / gmsa_tests.py
blob1ce6add528460961a652b2b715a7e9523301e4f7
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2024
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <https://www.gnu.org/licenses/>.
20 import sys
21 import os
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
26 from typing import Iterable, NewType, Optional, Tuple, TypeVar
28 import datetime
29 from itertools import chain
31 import ldb
33 from samba import auth, dsdb, gensec, ntstatus, NTSTATUSError, werror
34 from samba.dcerpc import gkdi, gmsa, misc, netlogon, security
35 from samba.ndr import ndr_pack, ndr_unpack
36 from samba.nt_time import (
37 nt_time_delta_from_timedelta,
38 nt_time_from_datetime,
39 NtTime,
40 NtTimeDelta,
41 timedelta_from_nt_time_delta,
43 from samba.samdb import SamDB
44 from samba.credentials import Credentials, DONT_USE_KERBEROS
45 from samba.gkdi import (
46 Gkid,
47 GroupKey,
48 KEY_CYCLE_DURATION,
49 MAX_CLOCK_SKEW,
52 from samba.tests import connect_samdb
53 from samba.tests.krb5 import kcrypto
54 from samba.tests.gkdi import GkdiBaseTest, ROOT_KEY_START_TIME
55 from samba.tests.krb5.kdc_base_test import KDCBaseTest
56 from samba.tests.krb5.raw_testcase import KerberosCredentials
57 from samba.tests.krb5.rfc4120_constants import (
58 KU_PA_ENC_TIMESTAMP,
59 NT_PRINCIPAL,
60 PADATA_ENC_TIMESTAMP,
62 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
64 GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL = 30
66 Gmsa = NewType("Gmsa", ldb.Message)
69 def gkdi_rollover_interval(managed_password_interval: int) -> NtTimeDelta:
70 rollover_interval = NtTimeDelta(
71 managed_password_interval * 24 // 10 * KEY_CYCLE_DURATION
73 if rollover_interval == 0:
74 raise ValueError("rollover interval must not be zero")
75 return rollover_interval
78 class GmsaSeries:
79 start_time: NtTime
80 rollover_interval: NtTimeDelta
82 def __init__(self, start_gkid: Gkid, rollover_interval: NtTimeDelta) -> None:
83 self.start_time = start_gkid.start_nt_time()
84 self.rollover_interval = rollover_interval
86 def interval_gkid(self, n: int) -> Gkid:
87 return Gkid.from_nt_time(self.start_of_interval(n))
89 def start_of_interval(self, n: int) -> NtTime:
90 if not isinstance(n, int):
91 raise ValueError(f"{n} must be an integer")
92 return NtTime(int(self.start_time + n * self.rollover_interval))
94 def during_interval(self, n: int) -> NtTime:
95 return NtTime(int(self.start_of_interval(n) + self.rollover_interval // 2))
97 def during_skew_window(self, n: int) -> NtTime:
98 two_minutes = nt_time_delta_from_timedelta(datetime.timedelta(minutes=2))
99 return NtTime(
100 int(self.start_of_interval(n) + self.rollover_interval - two_minutes)
103 def outside_previous_password_valid_window(self, n: int) -> NtTime:
104 return NtTime(self.start_of_interval(n) + MAX_CLOCK_SKEW)
106 def within_previous_password_valid_window(self, n: int) -> NtTime:
107 return NtTime(self.outside_previous_password_valid_window(n) - 1)
110 class GmsaTests(GkdiBaseTest, KDCBaseTest):
111 def _as_req(
112 self,
113 creds: KerberosCredentials,
114 target_creds: KerberosCredentials,
115 enctype: kcrypto.Enctype,
116 ) -> dict:
117 preauth_key = self.PasswordKey_from_creds(creds, enctype)
119 def generate_padata_fn(
120 _kdc_exchange_dict: dict, _callback_dict: Optional[dict], req_body: dict
121 ) -> Tuple[list, dict]:
122 padata = []
124 patime, pausec = self.get_KerberosTimeWithUsec()
125 enc_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
126 enc_ts = self.der_encode(enc_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
128 enc_ts = self.EncryptedData_create(preauth_key, KU_PA_ENC_TIMESTAMP, enc_ts)
129 enc_ts = self.der_encode(enc_ts, asn1Spec=krb5_asn1.EncryptedData())
131 enc_ts = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, enc_ts)
133 padata.append(enc_ts)
135 return padata, req_body
137 user_name = creds.get_username()
138 cname = self.PrincipalName_create(
139 name_type=NT_PRINCIPAL, names=user_name.split("/")
142 target_name = target_creds.get_username()
143 target_realm = target_creds.get_realm()
145 sname = self.PrincipalName_create(
146 name_type=NT_PRINCIPAL, names=["host", target_name[:-1]]
149 check_error_fn = None
150 check_rep_fn = self.generic_check_kdc_rep
152 expected_sname = self.PrincipalName_create(
153 name_type=NT_PRINCIPAL, names=[target_name]
156 kdc_options = "forwardable,renewable,canonicalize,renewable-ok"
157 kdc_options = krb5_asn1.KDCOptions(kdc_options)
159 ticket_decryption_key = self.TicketDecryptionKey_from_creds(target_creds)
161 kdc_exchange_dict = self.as_exchange_dict(
162 creds=creds,
163 expected_crealm=creds.get_realm(),
164 expected_cname=cname,
165 expected_srealm=target_realm,
166 expected_sname=expected_sname,
167 expected_supported_etypes=target_creds.tgs_supported_enctypes,
168 ticket_decryption_key=ticket_decryption_key,
169 generate_padata_fn=generate_padata_fn,
170 check_error_fn=check_error_fn,
171 check_rep_fn=check_rep_fn,
172 check_kdc_private_fn=self.generic_check_kdc_private,
173 expected_error_mode=0,
174 expected_salt=creds.get_salt(),
175 preauth_key=preauth_key,
176 kdc_options=str(kdc_options),
179 till = self.get_KerberosTime(offset=36000)
181 etypes = kcrypto.Enctype.AES256, kcrypto.Enctype.RC4
183 rep = self._generic_kdc_exchange(
184 kdc_exchange_dict,
185 cname=cname,
186 realm=target_realm,
187 sname=sname,
188 till_time=till,
189 etypes=etypes,
191 self.check_as_reply(rep)
193 return kdc_exchange_dict
195 # Note: unused
196 def gkdi_get_key_start_time(self, key_id: gkdi.KeyEnvelope) -> NtTime:
197 return Gkid.from_key_envelope(key_id).start_nt_time()
199 def get_password(
200 self,
201 samdb: SamDB,
202 target_sd: bytes,
203 root_key_id: Optional[misc.GUID],
204 gkid: Gkid,
205 sid: security.dom_sid,
206 ) -> bytes:
207 group_key = self.get_key_exact(samdb, target_sd, root_key_id, gkid)
209 password = self.generate_gmsa_password(group_key, sid)
210 return self.post_process_password_buffer(password)
212 def get_password_based_on_gkid(
213 self, samdb: SamDB, gkid: Gkid, sid: security.dom_sid
214 ) -> bytes:
215 return self.get_password(samdb, self.gmsa_sd, None, gkid, sid)
217 def get_password_based_on_timestamp(
218 self, samdb: SamDB, timestamp: NtTime, sid: security.dom_sid
219 ) -> bytes:
220 return self.get_password_based_on_gkid(samdb, Gkid.from_nt_time(timestamp), sid)
222 # Note: unused
223 def get_password_based_on_key_id(
224 self, samdb: SamDB, managed_password: gkdi.KeyEnvelope, sid: str
225 ) -> bytes:
226 return self.get_password(
227 samdb,
228 self.gmsa_sd,
229 managed_password.root_key_id,
230 Gkid.from_key_envelope(managed_password),
231 sid,
234 def generate_gmsa_password(self, key: GroupKey, sid: str) -> bytes:
235 context = ndr_pack(security.dom_sid(sid))
236 algorithm = key.hash_algorithm.algorithm()
237 gmsa_password_len = 256
239 return self.kdf(
240 algorithm,
241 key.key,
242 context,
243 label="GMSA PASSWORD",
244 len_in_bytes=gmsa_password_len,
247 def post_process_password_buffer(self, key: bytes) -> bytes:
248 self.assertEqual(0, len(key) & 1, f"length of key ({len(key)}) is not even")
250 def convert_null(t: Tuple[int, int]) -> Tuple[int, int]:
251 if t == (0, 0):
252 return 1, 0
254 return t
256 T = TypeVar("T")
258 def take_pairs(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]:
259 it = iter(iterable)
260 while True:
261 try:
262 first = next(it)
263 except StopIteration:
264 break
266 yield first, next(it)
268 return bytes(chain.from_iterable(map(convert_null, take_pairs(key))))
270 def get_gmsa_object(self, samdb: SamDB, dn: ldb.Dn) -> Gmsa:
271 res = samdb.search(
273 scope=ldb.SCOPE_BASE,
274 attrs=[
275 "msDS-ManagedPasswordInterval",
276 "msDS-ManagedPasswordId",
277 "msDS-ManagedPasswordPreviousId",
278 "whenCreated",
281 return res[0]
283 def gmsa_rollover_interval(self, gmsa_object: Gmsa) -> NtTimeDelta:
284 managed_password_interval = gmsa_object.get(
285 "msDS-ManagedPasswordInterval", idx=0
287 if managed_password_interval is None:
288 managed_password_interval = GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL
289 else:
290 managed_password_interval = int(managed_password_interval)
292 return gkdi_rollover_interval(managed_password_interval)
294 def gmsa_creation_nt_time(self, gmsa_object: Gmsa) -> NtTime:
295 creation_time: Optional[bytes] = gmsa_object.get("whenCreated", idx=0)
296 self.assertIsNotNone(creation_time)
297 assert creation_time is not None # to help the type checker
299 create_time = datetime.datetime.fromtimestamp(
300 ldb.string_to_time(creation_time.decode()), tz=datetime.timezone.utc
302 return nt_time_from_datetime(create_time)
304 def gmsa_series(self, managed_password_interval: int) -> GmsaSeries:
305 return GmsaSeries(
306 self.future_gkid(), gkdi_rollover_interval(managed_password_interval)
309 def gmsa_series_for_account(
310 self, samdb: SamDB, creds: KerberosCredentials, managed_password_interval: int
311 ) -> GmsaSeries:
312 gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())
313 current_nt_time = self.current_nt_time(samdb)
314 gkid = Gkid.from_nt_time(
315 self.account_quantized_time(gmsa_object, current_nt_time)
317 return GmsaSeries(gkid, gkdi_rollover_interval(managed_password_interval))
319 def quantized_time(
320 self, key_start_time: NtTime, time: NtTime, gkdi_rollover_interval: NtTimeDelta
321 ) -> NtTime:
322 self.assertLessEqual(key_start_time, time)
324 time_since_key_start = NtTimeDelta(time - key_start_time)
325 quantized_time_since_key_start = NtTimeDelta(
326 time_since_key_start // gkdi_rollover_interval * gkdi_rollover_interval
328 return NtTime(key_start_time + quantized_time_since_key_start)
330 def account_quantized_time(self, gmsa_object: Gmsa, time: NtTime) -> NtTime:
331 pwd_id_blob = gmsa_object.get("msDS-ManagedPasswordId", idx=0)
332 self.assertIsNotNone(pwd_id_blob, "SAM should have initialized password ID")
334 pwd_id = ndr_unpack(gkdi.KeyEnvelope, pwd_id_blob)
335 key_start_time = Gkid.from_key_envelope(pwd_id).start_nt_time()
337 gkdi_rollover_interval = self.gmsa_rollover_interval(gmsa_object)
338 return self.quantized_time(key_start_time, time, gkdi_rollover_interval)
340 def expected_gmsa_password_blob(
341 self,
342 samdb: SamDB,
343 creds: KerberosCredentials,
344 gkid: Gkid,
346 query_expiration_gkid: Gkid,
347 previous_gkid: Optional[Gkid] = None,
348 return_future_key: bool = False,
349 ) -> gmsa.MANAGEDPASSWORD_BLOB:
350 new_password = self.get_password_based_on_gkid(samdb, gkid, creds.get_sid())
351 old_password = None
352 if previous_gkid is not None:
353 old_password = self.get_password_based_on_gkid(
354 samdb, previous_gkid, creds.get_sid()
357 current_time = self.current_nt_time(samdb)
359 gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())
360 gkdi_rollover_interval = self.gmsa_rollover_interval(gmsa_object)
362 query_expiration_time = query_expiration_gkid.start_nt_time()
363 query_password_interval = NtTimeDelta(query_expiration_time - current_time)
364 unchanged_password_interval = NtTimeDelta(
365 max(
367 query_expiration_time
368 + (gkdi_rollover_interval if return_future_key else 0)
369 - current_time
370 - MAX_CLOCK_SKEW,
374 return self.marshal_password(
375 new_password,
376 old_password,
377 query_password_interval,
378 unchanged_password_interval,
381 def expected_current_gmsa_password_blob(
382 self,
383 samdb: SamDB,
384 creds: KerberosCredentials,
386 future_key_is_acceptable: bool,
387 ) -> gmsa.MANAGEDPASSWORD_BLOB:
388 gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())
390 gkdi_rollover_interval = self.gmsa_rollover_interval(gmsa_object)
392 pwd_id_blob = gmsa_object.get("msDS-ManagedPasswordId", idx=0)
393 self.assertIsNotNone(pwd_id_blob, "SAM should have initialized password ID")
395 pwd_id = ndr_unpack(gkdi.KeyEnvelope, pwd_id_blob)
396 key_start_time = Gkid.from_key_envelope(pwd_id).start_nt_time()
398 current_time = self.current_nt_time(samdb)
400 new_key_start_time = self.quantized_time(
401 key_start_time, current_time, gkdi_rollover_interval
403 new_key_expiration_time = NtTime(new_key_start_time + gkdi_rollover_interval)
405 account_sid = creds.get_sid()
407 within_clock_skew_window = (
408 new_key_expiration_time - current_time <= MAX_CLOCK_SKEW
410 return_future_key = future_key_is_acceptable and within_clock_skew_window
411 if return_future_key:
412 new_password = self.get_password_based_on_timestamp(
413 samdb, new_key_expiration_time, account_sid
415 old_password = self.get_password_based_on_timestamp(
416 samdb, new_key_start_time, account_sid
418 else:
419 new_password = self.get_password_based_on_timestamp(
420 samdb, new_key_start_time, account_sid
423 account_age = NtTimeDelta(
424 current_time - self.gmsa_creation_nt_time(gmsa_object)
426 if account_age >= gkdi_rollover_interval:
427 old_password = self.get_password_based_on_timestamp(
428 samdb,
429 NtTime(new_key_start_time - gkdi_rollover_interval),
430 account_sid,
432 else:
433 # The account is not old enough to have a previous password.
434 old_password = None
436 key_expiration_time = NtTime(key_start_time + gkdi_rollover_interval)
437 key_is_expired = key_expiration_time <= current_time
439 query_expiration_time = NtTime(
440 new_key_expiration_time if key_is_expired else key_expiration_time
442 query_password_interval = NtTimeDelta(query_expiration_time - current_time)
443 unchanged_password_interval = NtTimeDelta(
444 max(
446 query_expiration_time
447 + (gkdi_rollover_interval if return_future_key else 0)
448 - current_time
449 - MAX_CLOCK_SKEW,
453 return self.marshal_password(
454 new_password,
455 old_password,
456 query_password_interval,
457 unchanged_password_interval,
460 def marshal_password(
461 self,
462 current_password: bytes,
463 previous_password: Optional[bytes],
464 query_password_interval: NtTimeDelta,
465 unchanged_password_interval: NtTimeDelta,
466 ) -> gmsa.MANAGEDPASSWORD_BLOB:
467 managed_password = gmsa.MANAGEDPASSWORD_BLOB()
469 managed_password.passwords.current = current_password
470 managed_password.passwords.previous = previous_password
471 managed_password.passwords.query_interval = query_password_interval
472 managed_password.passwords.unchanged_interval = unchanged_password_interval
474 return managed_password
476 def gmsa_account(
477 self,
479 samdb: Optional[SamDB] = None,
480 interval: int = 1,
481 msa_membership: Optional[str] = None,
482 **kwargs,
483 ) -> KerberosCredentials:
484 if msa_membership is None:
485 allow_world_sddl = "O:SYD:(A;;RP;;;WD)"
486 msa_membership = allow_world_sddl
488 msa_membership_sd = ndr_pack(
489 security.descriptor.from_sddl(msa_membership, security.dom_sid())
492 try:
493 creds = self.get_cached_creds(
494 samdb=samdb,
495 account_type=self.AccountType.GROUP_MANAGED_SERVICE,
496 opts={
497 "additional_details": self.freeze(
499 "msDS-GroupMSAMembership": msa_membership_sd,
500 "msDS-ManagedPasswordInterval": str(interval),
503 **kwargs,
505 # Ensure the gMSA is a brand‐new account.
506 use_cache=False,
508 except ldb.LdbError as err:
509 if err.args[0] == ldb.ERR_UNWILLING_TO_PERFORM:
510 self.fail(
511 "If you’re running these tests against Windows, try “warming up”"
512 " the GKDI service by running `samba.tests.krb5.gkdi_tests` first."
515 raise
517 # Derive the account’s current password. The account is too new to have a previous password yet.
518 managed_pwd = self.expected_current_gmsa_password_blob(
519 self.get_samdb() if samdb is None else samdb,
520 creds,
521 future_key_is_acceptable=False,
524 # Set the password.
525 self.assertIsNotNone(
526 managed_pwd.passwords.current, "current password must be present"
528 creds.set_utf16_password(managed_pwd.passwords.current)
530 return creds
532 def get_local_samdb(self) -> SamDB:
533 """Return a connection to the local database."""
535 lp = self.get_lp()
536 samdb = connect_samdb(
537 samdb_url=lp.samdb_url(), lp=lp, credentials=self.get_admin_creds()
539 self.assertLocalSamDB(samdb)
541 return samdb
543 # Perform a gensec logon using NTLMSSP. As samdb is passed in as a
544 # parameter, it can have a time set on it with set_db_time().
545 def gensec_ntlmssp_logon(
546 self, client_creds: Credentials, samdb: SamDB, expect_success: bool = True
547 ) -> "Optional[auth.session_info]":
548 lp = self.get_lp()
549 lp.set("server role", "active directory domain controller")
551 settings = {"lp_ctx": lp, "target_hostname": lp.get("netbios name")}
553 gensec_client = gensec.Security.start_client(settings)
554 # Ensure that we don’t use Kerberos.
555 self.assertEqual(DONT_USE_KERBEROS, client_creds.get_kerberos_state())
556 gensec_client.set_credentials(client_creds)
557 gensec_client.want_feature(gensec.FEATURE_SEAL)
558 gensec_client.start_mech_by_name("ntlmssp")
560 auth_context = auth.AuthContext(lp_ctx=lp, ldb=samdb)
562 gensec_server = gensec.Security.start_server(settings, auth_context)
563 machine_creds = Credentials()
564 machine_creds.guess(lp)
565 machine_creds.set_machine_account(lp)
566 gensec_server.set_credentials(machine_creds)
568 gensec_server.start_mech_by_name("ntlmssp")
570 client_finished = False
571 server_finished = False
572 client_to_server = b""
573 server_to_client = b""
575 # Operate as both the client and the server to verify the user’s credentials.
576 while not client_finished or not server_finished:
577 if not client_finished:
578 client_finished, client_to_server = gensec_client.update(
579 server_to_client
581 if not server_finished:
582 try:
583 server_finished, server_to_client = gensec_server.update(
584 client_to_server
586 except NTSTATUSError as err:
587 self.assertFalse(expect_success, "got an unexpected error")
589 self.assertEqual(ntstatus.NT_STATUS_WRONG_PASSWORD, err.args[0])
590 return None
592 self.assertTrue(expect_success, "expected to get an error")
594 # Retrieve the SIDs from the security token.
595 return gensec_server.session_info()
597 def check_nt_interval(
598 self,
599 expected_nt_interval: NtTimeDelta,
600 nt_interval: NtTimeDelta,
601 interval_name: str,
602 ) -> None:
603 """Check that the intervals match to within thirty seconds or so."""
605 threshold = datetime.timedelta(seconds=30)
607 interval = timedelta_from_nt_time_delta(nt_interval)
608 expected_interval = timedelta_from_nt_time_delta(expected_nt_interval)
609 interval_difference = abs(interval - expected_interval)
610 self.assertLess(
611 interval_difference,
612 threshold,
613 f"{interval_name} ({interval}) is out by {interval_difference} from"
614 f" expected ({expected_interval})",
617 def check_managed_pwd_intervals(
618 self,
619 expected_managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
620 managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
621 ) -> None:
622 expected_passwords = expected_managed_pwd.passwords
623 passwords = managed_pwd.passwords
625 self.check_nt_interval(
626 expected_passwords.query_interval,
627 passwords.query_interval,
628 "query interval",
630 self.check_nt_interval(
631 expected_passwords.unchanged_interval,
632 passwords.unchanged_interval,
633 "unchanged interval",
636 def check_managed_pwd(
637 self,
638 samdb: SamDB,
639 creds: KerberosCredentials,
641 expected_managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
642 ) -> None:
643 res = samdb.search(
644 creds.get_dn(), scope=ldb.SCOPE_BASE, attrs=["msDS-ManagedPassword"]
646 self.assertEqual(1, len(res), "gMSA not found")
647 managed_password = res[0].get("msDS-ManagedPassword", idx=0)
649 self.assertIsNotNone(managed_password)
650 managed_pwd = ndr_unpack(gmsa.MANAGEDPASSWORD_BLOB, managed_password)
652 self.assertEqual(1, managed_pwd.version)
653 self.assertEqual(0, managed_pwd.reserved)
654 self.assertEqual(len(managed_password), managed_pwd.length)
656 self.assertIsNotNone(expected_managed_pwd.passwords.current)
658 self.assertEqual(
659 managed_pwd.passwords.current, expected_managed_pwd.passwords.current
661 self.assertEqual(
662 managed_pwd.passwords.previous, expected_managed_pwd.passwords.previous
665 self.check_managed_pwd_intervals(expected_managed_pwd, managed_pwd)
667 # When creating a gMSA, Windows seems to pick the root key with the
668 # greatest msKds-CreateTime having msKds-UseStartTime ≤ ten hours ago.
669 # Bear in mind that it seems also to cache the key, so it won’t always
670 # use the latest one.
672 def get_managed_service_accounts_dn(self) -> ldb.Dn:
673 samdb = self.get_samdb()
674 return samdb.get_wellknown_dn(
675 samdb.get_default_basedn(), dsdb.DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
678 def check_managed_password_access(
679 self,
680 creds: Credentials,
682 samdb: Optional[SamDB] = None,
683 expect_access: bool = False,
684 expected_werror: int = werror.WERR_SUCCESS,
685 ) -> None:
686 if samdb is None:
687 samdb = self.get_samdb()
688 if expected_werror:
689 self.assertFalse(expect_access)
690 managed_service_accounts_dn = self.get_managed_service_accounts_dn()
691 username = creds.get_username()
693 # Try base, subtree, and one‐level searches.
694 searches = (
695 (creds.get_dn(), ldb.SCOPE_BASE),
696 (managed_service_accounts_dn, ldb.SCOPE_SUBTREE),
697 (managed_service_accounts_dn, ldb.SCOPE_ONELEVEL),
700 for dn, scope in searches:
701 # Perform a search and see whether we’re allowed to view the managed password.
703 try:
704 res = samdb.search(
706 scope=scope,
707 expression=f"sAMAccountName={username}",
708 attrs=["msDS-ManagedPassword"],
710 except ldb.LdbError as err:
711 self.assertTrue(expected_werror, "got an unexpected error")
713 num, estr = err.args
714 if num != ldb.ERR_OPERATIONS_ERROR:
715 raise
717 self.assertIn(f"{expected_werror:08X}", estr)
718 return
720 self.assertFalse(expected_werror, "expected to get an error")
721 self.assertEqual(1, len(res), "should always find the gMSA")
723 managed_password = res[0].get("msDS-ManagedPassword", idx=0)
724 if expect_access:
725 self.assertIsNotNone(
726 managed_password, "should be allowed to view the password"
728 else:
729 self.assertIsNone(
730 managed_password, "should not be allowed to view the password"
733 def test_retrieved_password_allowed(self):
734 """Test being allowed to view the managed password."""
735 self.check_managed_password_access(self.gmsa_account(), expect_access=True)
737 def test_retrieved_password_denied(self):
738 """Test not being allowed to view the managed password."""
739 deny_world_sddl = "O:SYD:(D;;RP;;;WD)"
740 self.check_managed_password_access(
741 self.gmsa_account(msa_membership=deny_world_sddl), expect_access=False
744 def test_retrieving_password_over_sealed_connection(self):
745 lp = self.get_lp()
746 samdb = SamDB(
747 f"ldap://{self.dc_host}",
748 credentials=self.get_admin_creds(),
749 session_info=auth.system_session(lp),
750 lp=lp,
753 self.check_managed_password_access(
754 self.gmsa_account(), samdb=samdb, expect_access=True
757 def test_retrieving_password_over_unsealed_connection(self):
758 # Requires --use-kerberos=required, or it automatically upgrades to an
759 # encrypted connection.
761 # Remove FEATURE_SEAL which gets added by insta_creds.
762 creds = self.insta_creds(template=self.get_admin_creds())
763 creds.set_gensec_features(creds.get_gensec_features() & ~gensec.FEATURE_SEAL)
765 lp = self.get_lp()
767 sasl_wrap = lp.get("client ldap sasl wrapping")
768 self.addCleanup(lp.set, "client ldap sasl wrapping", sasl_wrap)
769 lp.set("client ldap sasl wrapping", "sign")
771 # Create a second ldb connection without seal.
772 samdb = SamDB(
773 f"ldap://{self.dc_host}",
774 credentials=creds,
775 session_info=auth.system_session(lp),
776 lp=lp,
779 self.check_managed_password_access(
780 self.gmsa_account(),
781 samdb=samdb,
782 expected_werror=werror.WERR_DS_CONFIDENTIALITY_REQUIRED,
785 def test_retrieving_denied_password_over_unsealed_connection(self):
786 # Requires --use-kerberos=required, or it automatically upgrades to an
787 # encrypted connection.
789 # Remove FEATURE_SEAL which gets added by insta_creds.
790 creds = self.insta_creds(template=self.get_admin_creds())
791 creds.set_gensec_features(creds.get_gensec_features() & ~gensec.FEATURE_SEAL)
793 lp = self.get_lp()
795 sasl_wrap = lp.get("client ldap sasl wrapping")
796 self.addCleanup(lp.set, "client ldap sasl wrapping", sasl_wrap)
797 lp.set("client ldap sasl wrapping", "sign")
799 # Create a second ldb connection without seal.
800 samdb = SamDB(
801 f"ldap://{self.dc_host}",
802 credentials=creds,
803 session_info=auth.system_session(lp),
804 lp=lp,
807 # Deny anyone from being able to view the password.
808 deny_world_sddl = "O:SYD:(D;;RP;;;WD)"
809 self.check_managed_password_access(
810 self.gmsa_account(msa_membership=deny_world_sddl),
811 samdb=samdb,
812 expected_werror=werror.WERR_DS_CONFIDENTIALITY_REQUIRED,
815 def future_gkid(self) -> Gkid:
816 """Return (6333, 26, 5)—an arbitrary GKID far enough in the future that
817 it’s situated beyond any reasonable rollover period. But not so far in
818 the future that Python’s datetime library will throw OverflowErrors."""
819 future_date = datetime.datetime(9000, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
820 return Gkid.from_nt_time(nt_time_from_datetime(future_date))
822 def future_time(self) -> NtTime:
823 """Return an arbitrary time far enough in the future that it’s situated
824 beyond any reasonable rollover period. But not so far in the future that
825 Python’s datetime library will throw OverflowErrors."""
826 return self.future_gkid().start_nt_time()
828 def test_retrieved_password(self):
829 """Test that we can retrieve the correct password for a gMSA."""
831 samdb = self.get_samdb()
832 creds = self.gmsa_account()
834 expected = self.expected_current_gmsa_password_blob(
835 samdb,
836 creds,
837 future_key_is_acceptable=True,
839 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
841 def test_retrieved_password_when_current_key_is_valid(self):
842 """Test that we can retrieve the correct password for a gMSA at a time
843 when we are sure it is valid."""
844 password_interval = 37
846 samdb = self.get_local_samdb()
847 series = self.gmsa_series(password_interval)
848 self.set_db_time(samdb, series.start_of_interval(0))
850 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
852 # Check the managed password of the account the moment it has been
853 # created.
854 expected = self.expected_gmsa_password_blob(
855 samdb,
856 creds,
857 series.interval_gkid(0),
858 previous_gkid=series.interval_gkid(-1),
859 query_expiration_gkid=series.interval_gkid(1),
861 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
863 def test_retrieved_password_when_current_key_is_expired(self):
864 """Test that we can retrieve the correct password for a gMSA when the
865 original password has expired."""
866 password_interval = 14
868 samdb = self.get_local_samdb()
869 series = self.gmsa_series(password_interval)
870 self.set_db_time(samdb, series.start_of_interval(0))
872 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
874 # Set the time to the moment the original password has expired, and
875 # check that the managed password is correct.
876 expired_time = series.start_of_interval(1)
877 self.set_db_time(samdb, expired_time)
878 expected = self.expected_gmsa_password_blob(
879 samdb,
880 creds,
881 series.interval_gkid(1),
882 previous_gkid=series.interval_gkid(0),
883 query_expiration_gkid=series.interval_gkid(2),
885 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
887 def test_retrieved_password_when_next_key_is_expired(self):
888 password_interval = 1
890 samdb = self.get_local_samdb()
891 series = self.gmsa_series(password_interval)
892 self.set_db_time(samdb, series.start_of_interval(0))
894 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
896 expired_time = series.start_of_interval(2)
897 self.set_db_time(samdb, expired_time)
899 expected = self.expected_gmsa_password_blob(
900 samdb,
901 creds,
902 series.interval_gkid(2),
903 previous_gkid=series.interval_gkid(1),
904 query_expiration_gkid=series.interval_gkid(3),
906 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
908 def test_retrieved_password_during_clock_skew_window_when_current_key_is_valid(
909 self,
911 password_interval = 60
913 samdb = self.get_local_samdb()
914 series = self.gmsa_series(password_interval)
915 self.set_db_time(samdb, series.start_of_interval(0))
917 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
919 self.set_db_time(samdb, series.during_skew_window(0))
921 expected = self.expected_gmsa_password_blob(
922 samdb,
923 creds,
924 series.interval_gkid(1),
925 previous_gkid=series.interval_gkid(0),
926 query_expiration_gkid=series.interval_gkid(1),
927 return_future_key=True,
929 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
931 def test_retrieved_password_during_clock_skew_window_when_current_key_is_expired(
932 self,
934 password_interval = 100
936 samdb = self.get_local_samdb()
937 series = self.gmsa_series(password_interval)
938 self.set_db_time(samdb, series.start_of_interval(0))
940 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
942 self.set_db_time(samdb, series.during_skew_window(1))
944 expected = self.expected_gmsa_password_blob(
945 samdb,
946 creds,
947 series.interval_gkid(2),
948 previous_gkid=series.interval_gkid(1),
949 query_expiration_gkid=series.interval_gkid(2),
950 return_future_key=True,
952 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
954 def test_retrieved_password_during_clock_skew_window_when_next_key_is_expired(
955 self,
957 password_interval = 16
959 samdb = self.get_local_samdb()
960 series = self.gmsa_series(password_interval)
961 self.set_db_time(samdb, series.start_of_interval(0))
963 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
965 self.set_db_time(samdb, series.during_skew_window(2))
967 expected = self.expected_gmsa_password_blob(
968 samdb,
969 creds,
970 series.interval_gkid(3),
971 previous_gkid=series.interval_gkid(2),
972 query_expiration_gkid=series.interval_gkid(3),
973 return_future_key=True,
975 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
977 def test_retrieving_managed_password_triggers_keys_update(self):
978 # Create a root key with a start time early enough to be usable at the
979 # time the gMSA is purported to be created.
980 samdb = self.get_samdb()
981 domain_dn = self.get_server_dn(samdb)
982 self.create_root_key(samdb, domain_dn, use_start_time=ROOT_KEY_START_TIME)
984 password_interval = 16
986 local_samdb = self.get_local_samdb()
987 series = GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval))
988 self.set_db_time(local_samdb, series.start_of_interval(0))
990 creds = self.gmsa_account(samdb=local_samdb, interval=password_interval)
991 dn = creds.get_dn()
993 current_nt_time = self.current_nt_time(samdb)
994 self.set_db_time(local_samdb, current_nt_time)
996 # Search the local database for the account’s keys.
997 res = local_samdb.search(
998 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1000 self.assertEqual(1, len(res))
1002 previous_nt_hash = res[0].get("unicodePwd", idx=0)
1003 previous_supplemental_creds = self.unpack_supplemental_credentials(
1004 res[0].get("supplementalCredentials", idx=0)
1007 # Check that the NT hash is the value we expect.
1008 self.assertEqual(creds.get_nt_hash(), previous_nt_hash)
1010 # Search for the managed password over LDAP, triggering an update of the
1011 # keys in the database.
1012 res = samdb.search(dn, scope=ldb.SCOPE_BASE, attrs=["msDS-ManagedPassword"])
1013 self.assertEqual(1, len(res))
1015 # Verify that the password is present in the result.
1016 managed_password = res[0].get("msDS-ManagedPassword", idx=0)
1017 self.assertIsNotNone(managed_password, "should be allowed to view the password")
1019 # Search the local database again for the account’s keys, which should
1020 # have been updated.
1021 res = local_samdb.search(
1022 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1024 self.assertEqual(1, len(res))
1026 nt_hash = res[0].get("unicodePwd", idx=0)
1027 supplemental_creds = self.unpack_supplemental_credentials(
1028 res[0].get("supplementalCredentials", idx=0)
1031 self.assertNotEqual(
1032 previous_nt_hash, nt_hash, "NT hash has not been updated (yet)"
1034 self.assertNotEqual(
1035 previous_supplemental_creds,
1036 supplemental_creds,
1037 "supplementalCredentials has not been updated (yet)",
1040 # Set the new password.
1041 managed_pwd = ndr_unpack(gmsa.MANAGEDPASSWORD_BLOB, managed_password)
1042 self.assertIsNotNone(
1043 managed_pwd.passwords.current, "current password must be present"
1045 creds.set_utf16_password(managed_pwd.passwords.current)
1047 # Check that the new NT hash is the value we expect.
1048 self.assertEqual(creds.get_nt_hash(), nt_hash)
1050 def test_authentication_triggers_keys_update(self):
1051 # Create a root key with a start time early enough to be usable at the
1052 # time the gMSA is purported to be created. But don’t create it on a
1053 # local samdb with a specifically set time, because (if the key isn’t
1054 # deleted later) we could end up with multiple keys with identical
1055 # creation and start times, and tests failing when the test and the
1056 # server don’t agree on which root key to use at a specific time.
1057 samdb = self.get_samdb()
1058 domain_dn = self.get_server_dn(samdb)
1059 self.create_root_key(samdb, domain_dn, use_start_time=ROOT_KEY_START_TIME)
1061 password_interval = 16
1063 local_samdb = self.get_local_samdb()
1064 series = GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval))
1065 self.set_db_time(local_samdb, series.start_of_interval(0))
1067 creds = self.gmsa_account(samdb=local_samdb, interval=password_interval)
1068 dn = creds.get_dn()
1070 current_nt_time = self.current_nt_time(samdb)
1071 self.set_db_time(local_samdb, current_nt_time)
1073 # Search the local database for the account’s keys.
1074 res = local_samdb.search(
1075 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1077 self.assertEqual(1, len(res))
1079 previous_nt_hash = res[0].get("unicodePwd", idx=0)
1080 previous_supplemental_creds = self.unpack_supplemental_credentials(
1081 res[0].get("supplementalCredentials", idx=0)
1084 # Check that the NT hash is the value we expect.
1085 self.assertEqual(creds.get_nt_hash(), previous_nt_hash)
1087 # Calculate the password with which to authenticate.
1088 current_series = self.gmsa_series_for_account(
1089 local_samdb, creds, password_interval
1091 managed_pwd = self.expected_gmsa_password_blob(
1092 local_samdb,
1093 creds,
1094 current_series.interval_gkid(0),
1095 query_expiration_gkid=current_series.interval_gkid(1),
1098 # Set the new password.
1099 self.assertIsNotNone(
1100 managed_pwd.passwords.current, "current password must be present"
1102 creds.set_utf16_password(managed_pwd.passwords.current)
1104 # Perform an authentication using the new password. The KDC should
1105 # recognize that the keys in the database are out of date and update
1106 # them.
1107 self._as_req(creds, self.get_service_creds(), kcrypto.Enctype.AES256)
1109 # Search the local database again for the account’s keys, which should
1110 # have been updated.
1111 res = local_samdb.search(
1112 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1114 self.assertEqual(1, len(res))
1116 nt_hash = res[0].get("unicodePwd", idx=0)
1117 supplemental_creds = self.unpack_supplemental_credentials(
1118 res[0].get("supplementalCredentials", idx=0)
1121 self.assertNotEqual(
1122 previous_nt_hash, nt_hash, "NT hash has not been updated (yet)"
1124 self.assertNotEqual(
1125 previous_supplemental_creds,
1126 supplemental_creds,
1127 "supplementalCredentials has not been updated (yet)",
1130 # Check that the new NT hash is the value we expect.
1131 self.assertEqual(creds.get_nt_hash(), nt_hash)
1133 def test_gmsa_can_perform_gensec_ntlmssp_logon(self):
1134 creds = self.gmsa_account(kerberos_enabled=False)
1136 # Perform a gensec logon.
1137 session = self.gensec_ntlmssp_logon(creds, self.get_local_samdb())
1139 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1140 token = session.security_token
1141 token_sids = token.sids
1142 self.assertGreater(len(token_sids), 0)
1144 # Ensure that they match.
1145 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1147 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_valid(self):
1148 """Test that we can perform a gensec logon at a time when we are sure
1149 the current gMSA password is valid."""
1151 password_interval = 18
1153 samdb = self.get_local_samdb()
1154 series = self.gmsa_series(password_interval)
1155 self.set_db_time(samdb, series.start_of_interval(0))
1157 creds = self.gmsa_account(
1158 samdb=samdb, interval=password_interval, kerberos_enabled=False
1161 # Perform a gensec logon.
1162 session = self.gensec_ntlmssp_logon(creds, samdb)
1164 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1165 token = session.security_token
1166 token_sids = token.sids
1167 self.assertGreater(len(token_sids), 0)
1169 # Ensure that they match.
1170 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1172 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_expired(self):
1173 """Test that we can perform a gensec logon using NTLMSSP at a time when
1174 the current gMSA password has expired."""
1176 password_interval = 40
1178 samdb = self.get_local_samdb()
1179 series = self.gmsa_series(password_interval)
1180 self.set_db_time(samdb, series.start_of_interval(0))
1182 creds = self.gmsa_account(
1183 samdb=samdb, interval=password_interval, kerberos_enabled=False
1186 # Set the time to the moment the original password has expired, and
1187 # perform a gensec logon.
1188 expired_time = series.start_of_interval(1)
1189 self.set_db_time(samdb, expired_time)
1191 # Calculate the password with which to authenticate.
1192 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1193 managed_pwd = self.expected_gmsa_password_blob(
1194 samdb,
1195 creds,
1196 current_series.interval_gkid(0),
1197 previous_gkid=current_series.interval_gkid(-1),
1198 query_expiration_gkid=current_series.interval_gkid(1),
1201 # Set the new password.
1202 self.assertIsNotNone(
1203 managed_pwd.passwords.current, "current password must be present"
1205 creds.set_utf16_password(managed_pwd.passwords.current)
1207 # Perform a gensec logon.
1208 session = self.gensec_ntlmssp_logon(creds, samdb)
1210 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1211 token = session.security_token
1212 token_sids = token.sids
1213 self.assertGreater(len(token_sids), 0)
1215 # Ensure that they match.
1216 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1218 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_next_key_is_expired(self):
1219 password_interval = 42
1221 samdb = self.get_local_samdb()
1222 series = self.gmsa_series(password_interval)
1223 self.set_db_time(samdb, series.start_of_interval(0))
1225 creds = self.gmsa_account(
1226 samdb=samdb, interval=password_interval, kerberos_enabled=False
1229 expired_time = series.start_of_interval(2)
1230 self.set_db_time(samdb, expired_time)
1232 # Calculate the password with which to authenticate.
1233 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1234 managed_pwd = self.expected_gmsa_password_blob(
1235 samdb,
1236 creds,
1237 current_series.interval_gkid(0),
1238 previous_gkid=current_series.interval_gkid(-1),
1239 query_expiration_gkid=current_series.interval_gkid(1),
1242 # Set the new password.
1243 self.assertIsNotNone(
1244 managed_pwd.passwords.current, "current password must be present"
1246 creds.set_utf16_password(managed_pwd.passwords.current)
1248 # Perform a gensec logon.
1249 session = self.gensec_ntlmssp_logon(creds, samdb)
1251 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1252 token = session.security_token
1253 token_sids = token.sids
1254 self.assertGreater(len(token_sids), 0)
1256 # Ensure that they match.
1257 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1259 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_valid(
1260 self,
1262 password_interval = 43
1264 samdb = self.get_local_samdb()
1265 series = self.gmsa_series(password_interval)
1266 self.set_db_time(samdb, series.start_of_interval(0))
1268 creds = self.gmsa_account(
1269 samdb=samdb, interval=password_interval, kerberos_enabled=False
1272 self.set_db_time(samdb, series.during_skew_window(0))
1274 # Calculate the password with which to authenticate.
1275 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1276 managed_pwd = self.expected_gmsa_password_blob(
1277 samdb,
1278 creds,
1279 current_series.interval_gkid(0),
1280 previous_gkid=current_series.interval_gkid(-1),
1281 query_expiration_gkid=current_series.interval_gkid(1),
1284 # Set the new password.
1285 self.assertIsNotNone(
1286 managed_pwd.passwords.current, "current password must be present"
1288 creds.set_utf16_password(managed_pwd.passwords.current)
1290 # Perform a gensec logon.
1291 session = self.gensec_ntlmssp_logon(creds, samdb)
1293 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1294 token = session.security_token
1295 token_sids = token.sids
1296 self.assertGreater(len(token_sids), 0)
1298 # Ensure that they match.
1299 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1301 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_expired(
1302 self,
1304 password_interval = 44
1306 samdb = self.get_local_samdb()
1307 series = self.gmsa_series(password_interval)
1308 self.set_db_time(samdb, series.start_of_interval(0))
1310 creds = self.gmsa_account(
1311 samdb=samdb, interval=password_interval, kerberos_enabled=False
1314 self.set_db_time(samdb, series.during_skew_window(1))
1316 # Calculate the password with which to authenticate.
1317 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1318 managed_pwd = self.expected_gmsa_password_blob(
1319 samdb,
1320 creds,
1321 current_series.interval_gkid(0),
1322 previous_gkid=current_series.interval_gkid(-1),
1323 query_expiration_gkid=current_series.interval_gkid(1),
1326 # Set the new password.
1327 self.assertIsNotNone(
1328 managed_pwd.passwords.current, "current password must be present"
1330 creds.set_utf16_password(managed_pwd.passwords.current)
1332 # Perform a gensec logon.
1333 session = self.gensec_ntlmssp_logon(creds, samdb)
1335 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1336 token = session.security_token
1337 token_sids = token.sids
1338 self.assertGreater(len(token_sids), 0)
1340 # Ensure that they match.
1341 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1343 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_next_key_is_expired(
1344 self,
1346 password_interval = 47
1348 samdb = self.get_local_samdb()
1349 series = self.gmsa_series(password_interval)
1350 self.set_db_time(samdb, series.start_of_interval(0))
1352 creds = self.gmsa_account(
1353 samdb=samdb, interval=password_interval, kerberos_enabled=False
1356 self.set_db_time(samdb, series.during_skew_window(2))
1358 # Calculate the password with which to authenticate.
1359 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1360 managed_pwd = self.expected_gmsa_password_blob(
1361 samdb,
1362 creds,
1363 current_series.interval_gkid(0),
1364 previous_gkid=current_series.interval_gkid(-1),
1365 query_expiration_gkid=current_series.interval_gkid(1),
1368 # Set the new password.
1369 self.assertIsNotNone(
1370 managed_pwd.passwords.current, "current password must be present"
1372 creds.set_utf16_password(managed_pwd.passwords.current)
1374 # Perform a gensec logon.
1375 session = self.gensec_ntlmssp_logon(creds, samdb)
1377 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1378 token = session.security_token
1379 token_sids = token.sids
1380 self.assertGreater(len(token_sids), 0)
1382 # Ensure that they match.
1383 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1385 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_within_five_minutes(
1386 self,
1388 password_interval = 123
1390 samdb = self.get_local_samdb()
1391 series = self.gmsa_series(password_interval)
1392 self.set_db_time(samdb, series.start_of_interval(0))
1394 creds = self.gmsa_account(
1395 samdb=samdb, interval=password_interval, kerberos_enabled=False
1398 # Set the time to within five minutes of the original password’s expiry,
1399 # and perform a gensec logon with the original password.
1400 expired_time = series.within_previous_password_valid_window(1)
1401 self.set_db_time(samdb, expired_time)
1403 # Perform a gensec logon.
1404 session = self.gensec_ntlmssp_logon(creds, samdb)
1406 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1407 token = session.security_token
1408 token_sids = token.sids
1409 self.assertGreater(len(token_sids), 0)
1411 # Ensure that they match.
1412 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1414 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_but_one_password_within_five_minutes(
1415 self,
1417 password_interval = 123
1419 samdb = self.get_local_samdb()
1420 series = self.gmsa_series(password_interval)
1421 self.set_db_time(samdb, series.start_of_interval(0))
1423 creds = self.gmsa_account(
1424 samdb=samdb, interval=password_interval, kerberos_enabled=False
1427 # Set the time to within five minutes of the *following* password’s expiry,
1428 # and perform a gensec logon with the original password.
1429 expired_time = series.within_previous_password_valid_window(2)
1430 self.set_db_time(samdb, expired_time)
1432 # Expect the gensec logon to fail.
1433 self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
1435 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_beyond_five_minutes(
1436 self,
1438 password_interval = 456
1440 samdb = self.get_local_samdb()
1441 series = self.gmsa_series(password_interval)
1442 self.set_db_time(samdb, series.start_of_interval(0))
1444 creds = self.gmsa_account(
1445 samdb=samdb, interval=password_interval, kerberos_enabled=False
1448 # Set the time to five minutes beyond the original password’s expiry,
1449 # and try to perform a gensec logon with the original password.
1450 expired_time = series.outside_previous_password_valid_window(1)
1451 self.set_db_time(samdb, expired_time)
1453 # Perform a gensec logon.
1454 session = self.gensec_ntlmssp_logon(creds, samdb)
1456 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1457 token = session.security_token
1458 token_sids = token.sids
1459 self.assertGreater(len(token_sids), 0)
1461 # Ensure that they match.
1462 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1464 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_password_five_minutes_apart(
1465 self,
1467 password_interval = 789
1469 samdb = self.get_local_samdb()
1470 series = self.gmsa_series(password_interval)
1471 self.set_db_time(samdb, series.start_of_interval(0))
1473 creds = self.gmsa_account(
1474 samdb=samdb, interval=password_interval, kerberos_enabled=False
1476 gmsa_sid = creds.get_sid()
1478 # Set the time to after the original password’s expiry, and perform a
1479 # gensec logon with the original password.
1480 db_time = series.during_interval(1)
1481 self.set_db_time(samdb, db_time)
1483 # Perform a gensec logon.
1484 session = self.gensec_ntlmssp_logon(creds, samdb)
1486 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1487 token = session.security_token
1488 token_sids = token.sids
1489 self.assertGreater(len(token_sids), 0)
1491 # Ensure that they match.
1492 self.assertEqual(security.dom_sid(gmsa_sid), token_sids[0])
1494 # Set the time to not quite five minutes later, and perform a gensec
1495 # logon with the original password.
1496 self.set_db_time(samdb, NtTime(db_time + MAX_CLOCK_SKEW - 1))
1498 # Perform a gensec logon.
1499 session = self.gensec_ntlmssp_logon(creds, samdb)
1501 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1502 token = session.security_token
1503 token_sids = token.sids
1504 self.assertGreater(len(token_sids), 0)
1506 # Ensure that they match.
1507 self.assertEqual(security.dom_sid(gmsa_sid), token_sids[0])
1509 # Now set the time to exactly five minutes later, and try to perform a
1510 # gensec logon with the original password.
1511 self.set_db_time(samdb, NtTime(db_time + MAX_CLOCK_SKEW))
1513 # Expect the gensec logon to fail.
1514 self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
1516 def test_gmsa_can_perform_netlogon(self):
1517 self._test_samlogon(
1518 self.gmsa_account(kerberos_enabled=False),
1519 netlogon.NetlogonNetworkInformation,
1520 validation_level=netlogon.NetlogonValidationSamInfo4,
1523 def test_computer_cannot_perform_interactive_logon(self):
1524 self._test_samlogon(
1525 self.get_mach_creds(),
1526 netlogon.NetlogonInteractiveInformation,
1527 expect_error=ntstatus.NT_STATUS_NO_SUCH_USER,
1528 validation_level=netlogon.NetlogonValidationSamInfo4,
1531 def test_gmsa_cannot_perform_interactive_logon(self):
1532 self._test_samlogon(
1533 self.gmsa_account(kerberos_enabled=False),
1534 netlogon.NetlogonInteractiveInformation,
1535 expect_error=ntstatus.NT_STATUS_NO_SUCH_USER,
1536 validation_level=netlogon.NetlogonValidationSamInfo4,
1539 def _gmsa_can_perform_as_req(self, *, enctype: kcrypto.Enctype) -> None:
1540 self._as_req(self.gmsa_account(), self.get_service_creds(), enctype)
1542 def test_gmsa_can_perform_as_req_with_aes256(self):
1543 self._gmsa_can_perform_as_req(enctype=kcrypto.Enctype.AES256)
1545 def test_gmsa_can_perform_as_req_with_rc4(self):
1546 self._gmsa_can_perform_as_req(enctype=kcrypto.Enctype.RC4)
1548 def _gmsa_can_authenticate_to_ldap(self, *, with_kerberos: bool) -> None:
1549 creds = self.gmsa_account(kerberos_enabled=with_kerberos)
1551 protocol = "ldap"
1553 # Authenticate to LDAP.
1554 samdb_user = SamDB(
1555 url=f"{protocol}://{self.dc_host}", credentials=creds, lp=self.get_lp()
1558 # Search for the user’s token groups.
1559 res = samdb_user.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
1560 self.assertEqual(1, len(res))
1562 token_groups = res[0].get("tokenGroups", idx=0)
1563 self.assertIsNotNone(token_groups)
1565 # Ensure that the token SID matches.
1566 token_sid = ndr_unpack(security.dom_sid, token_groups)
1567 self.assertEqual(security.dom_sid(creds.get_sid()), token_sid)
1569 def test_gmsa_can_authenticate_to_ldap_with_kerberos(self):
1570 self._gmsa_can_authenticate_to_ldap(with_kerberos=True)
1572 def test_gmsa_can_authenticate_to_ldap_without_kerberos(self):
1573 self._gmsa_can_authenticate_to_ldap(with_kerberos=False)
1576 if __name__ == "__main__":
1577 import unittest
1579 unittest.main()