2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2024
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <https://www.gnu.org/licenses/>.
23 sys
.path
.insert(0, "bin/python")
24 os
.environ
["PYTHONUNBUFFERED"] = "1"
26 from typing
import Iterable
, NewType
, Optional
, Tuple
, TypeVar
29 from itertools
import chain
33 from samba
import auth
, dsdb
, gensec
, ntstatus
, NTSTATUSError
, werror
34 from samba
.dcerpc
import gkdi
, gmsa
, misc
, netlogon
, security
35 from samba
.ndr
import ndr_pack
, ndr_unpack
36 from samba
.nt_time
import (
37 nt_time_delta_from_timedelta
,
38 nt_time_from_datetime
,
41 timedelta_from_nt_time_delta
,
43 from samba
.samdb
import SamDB
44 from samba
.credentials
import Credentials
, DONT_USE_KERBEROS
45 from samba
.gkdi
import (
52 from samba
.tests
import connect_samdb
53 from samba
.tests
.krb5
import kcrypto
54 from samba
.tests
.gkdi
import GkdiBaseTest
, ROOT_KEY_START_TIME
55 from samba
.tests
.krb5
.kdc_base_test
import KDCBaseTest
56 from samba
.tests
.krb5
.raw_testcase
import KerberosCredentials
57 from samba
.tests
.krb5
.rfc4120_constants
import (
62 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
64 GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL
= 30
66 Gmsa
= NewType("Gmsa", ldb
.Message
)
69 def gkdi_rollover_interval(managed_password_interval
: int) -> NtTimeDelta
:
70 rollover_interval
= NtTimeDelta(
71 managed_password_interval
* 24 // 10 * KEY_CYCLE_DURATION
73 if rollover_interval
== 0:
74 raise ValueError("rollover interval must not be zero")
75 return rollover_interval
80 rollover_interval
: NtTimeDelta
82 def __init__(self
, start_gkid
: Gkid
, rollover_interval
: NtTimeDelta
) -> None:
83 self
.start_time
= start_gkid
.start_nt_time()
84 self
.rollover_interval
= rollover_interval
86 def interval_gkid(self
, n
: int) -> Gkid
:
87 return Gkid
.from_nt_time(self
.start_of_interval(n
))
89 def start_of_interval(self
, n
: int) -> NtTime
:
90 if not isinstance(n
, int):
91 raise ValueError(f
"{n} must be an integer")
92 return NtTime(int(self
.start_time
+ n
* self
.rollover_interval
))
94 def during_interval(self
, n
: int) -> NtTime
:
95 return NtTime(int(self
.start_of_interval(n
) + self
.rollover_interval
// 2))
97 def during_skew_window(self
, n
: int) -> NtTime
:
98 two_minutes
= nt_time_delta_from_timedelta(datetime
.timedelta(minutes
=2))
100 int(self
.start_of_interval(n
) + self
.rollover_interval
- two_minutes
)
103 def outside_previous_password_valid_window(self
, n
: int) -> NtTime
:
104 return NtTime(self
.start_of_interval(n
) + MAX_CLOCK_SKEW
)
106 def within_previous_password_valid_window(self
, n
: int) -> NtTime
:
107 return NtTime(self
.outside_previous_password_valid_window(n
) - 1)
110 class GmsaTests(GkdiBaseTest
, KDCBaseTest
):
113 creds
: KerberosCredentials
,
114 target_creds
: KerberosCredentials
,
115 enctype
: kcrypto
.Enctype
,
117 preauth_key
= self
.PasswordKey_from_creds(creds
, enctype
)
119 def generate_padata_fn(
120 _kdc_exchange_dict
: dict, _callback_dict
: Optional
[dict], req_body
: dict
121 ) -> Tuple
[list, dict]:
124 patime
, pausec
= self
.get_KerberosTimeWithUsec()
125 enc_ts
= self
.PA_ENC_TS_ENC_create(patime
, pausec
)
126 enc_ts
= self
.der_encode(enc_ts
, asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
128 enc_ts
= self
.EncryptedData_create(preauth_key
, KU_PA_ENC_TIMESTAMP
, enc_ts
)
129 enc_ts
= self
.der_encode(enc_ts
, asn1Spec
=krb5_asn1
.EncryptedData())
131 enc_ts
= self
.PA_DATA_create(PADATA_ENC_TIMESTAMP
, enc_ts
)
133 padata
.append(enc_ts
)
135 return padata
, req_body
137 user_name
= creds
.get_username()
138 cname
= self
.PrincipalName_create(
139 name_type
=NT_PRINCIPAL
, names
=user_name
.split("/")
142 target_name
= target_creds
.get_username()
143 target_realm
= target_creds
.get_realm()
145 sname
= self
.PrincipalName_create(
146 name_type
=NT_PRINCIPAL
, names
=["host", target_name
[:-1]]
149 check_error_fn
= None
150 check_rep_fn
= self
.generic_check_kdc_rep
152 expected_sname
= self
.PrincipalName_create(
153 name_type
=NT_PRINCIPAL
, names
=[target_name
]
156 kdc_options
= "forwardable,renewable,canonicalize,renewable-ok"
157 kdc_options
= krb5_asn1
.KDCOptions(kdc_options
)
159 ticket_decryption_key
= self
.TicketDecryptionKey_from_creds(target_creds
)
161 kdc_exchange_dict
= self
.as_exchange_dict(
163 expected_crealm
=creds
.get_realm(),
164 expected_cname
=cname
,
165 expected_srealm
=target_realm
,
166 expected_sname
=expected_sname
,
167 expected_supported_etypes
=target_creds
.tgs_supported_enctypes
,
168 ticket_decryption_key
=ticket_decryption_key
,
169 generate_padata_fn
=generate_padata_fn
,
170 check_error_fn
=check_error_fn
,
171 check_rep_fn
=check_rep_fn
,
172 check_kdc_private_fn
=self
.generic_check_kdc_private
,
173 expected_error_mode
=0,
174 expected_salt
=creds
.get_salt(),
175 preauth_key
=preauth_key
,
176 kdc_options
=str(kdc_options
),
179 till
= self
.get_KerberosTime(offset
=36000)
181 etypes
= kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.RC4
183 rep
= self
._generic
_kdc
_exchange
(
191 self
.check_as_reply(rep
)
193 return kdc_exchange_dict
196 def gkdi_get_key_start_time(self
, key_id
: gkdi
.KeyEnvelope
) -> NtTime
:
197 return Gkid
.from_key_envelope(key_id
).start_nt_time()
203 root_key_id
: Optional
[misc
.GUID
],
205 sid
: security
.dom_sid
,
207 group_key
= self
.get_key_exact(samdb
, target_sd
, root_key_id
, gkid
)
209 password
= self
.generate_gmsa_password(group_key
, sid
)
210 return self
.post_process_password_buffer(password
)
212 def get_password_based_on_gkid(
213 self
, samdb
: SamDB
, gkid
: Gkid
, sid
: security
.dom_sid
215 return self
.get_password(samdb
, self
.gmsa_sd
, None, gkid
, sid
)
217 def get_password_based_on_timestamp(
218 self
, samdb
: SamDB
, timestamp
: NtTime
, sid
: security
.dom_sid
220 return self
.get_password_based_on_gkid(samdb
, Gkid
.from_nt_time(timestamp
), sid
)
223 def get_password_based_on_key_id(
224 self
, samdb
: SamDB
, managed_password
: gkdi
.KeyEnvelope
, sid
: str
226 return self
.get_password(
229 managed_password
.root_key_id
,
230 Gkid
.from_key_envelope(managed_password
),
234 def generate_gmsa_password(self
, key
: GroupKey
, sid
: str) -> bytes
:
235 context
= ndr_pack(security
.dom_sid(sid
))
236 algorithm
= key
.hash_algorithm
.algorithm()
237 gmsa_password_len
= 256
243 label
="GMSA PASSWORD",
244 len_in_bytes
=gmsa_password_len
,
247 def post_process_password_buffer(self
, key
: bytes
) -> bytes
:
248 self
.assertEqual(0, len(key
) & 1, f
"length of key ({len(key)}) is not even")
250 def convert_null(t
: Tuple
[int, int]) -> Tuple
[int, int]:
258 def take_pairs(iterable
: Iterable
[T
]) -> Iterable
[Tuple
[T
, T
]]:
263 except StopIteration:
266 yield first
, next(it
)
268 return bytes(chain
.from_iterable(map(convert_null
, take_pairs(key
))))
270 def get_gmsa_object(self
, samdb
: SamDB
, dn
: ldb
.Dn
) -> Gmsa
:
273 scope
=ldb
.SCOPE_BASE
,
275 "msDS-ManagedPasswordInterval",
276 "msDS-ManagedPasswordId",
277 "msDS-ManagedPasswordPreviousId",
283 def gmsa_rollover_interval(self
, gmsa_object
: Gmsa
) -> NtTimeDelta
:
284 managed_password_interval
= gmsa_object
.get(
285 "msDS-ManagedPasswordInterval", idx
=0
287 if managed_password_interval
is None:
288 managed_password_interval
= GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL
290 managed_password_interval
= int(managed_password_interval
)
292 return gkdi_rollover_interval(managed_password_interval
)
294 def gmsa_creation_nt_time(self
, gmsa_object
: Gmsa
) -> NtTime
:
295 creation_time
: Optional
[bytes
] = gmsa_object
.get("whenCreated", idx
=0)
296 self
.assertIsNotNone(creation_time
)
297 assert creation_time
is not None # to help the type checker
299 create_time
= datetime
.datetime
.fromtimestamp(
300 ldb
.string_to_time(creation_time
.decode()), tz
=datetime
.timezone
.utc
302 return nt_time_from_datetime(create_time
)
304 def gmsa_series(self
, managed_password_interval
: int) -> GmsaSeries
:
306 self
.future_gkid(), gkdi_rollover_interval(managed_password_interval
)
309 def gmsa_series_for_account(
310 self
, samdb
: SamDB
, creds
: KerberosCredentials
, managed_password_interval
: int
312 gmsa_object
= self
.get_gmsa_object(samdb
, creds
.get_dn())
313 current_nt_time
= self
.current_nt_time(samdb
)
314 gkid
= Gkid
.from_nt_time(
315 self
.account_quantized_time(gmsa_object
, current_nt_time
)
317 return GmsaSeries(gkid
, gkdi_rollover_interval(managed_password_interval
))
320 self
, key_start_time
: NtTime
, time
: NtTime
, gkdi_rollover_interval
: NtTimeDelta
322 self
.assertLessEqual(key_start_time
, time
)
324 time_since_key_start
= NtTimeDelta(time
- key_start_time
)
325 quantized_time_since_key_start
= NtTimeDelta(
326 time_since_key_start
// gkdi_rollover_interval
* gkdi_rollover_interval
328 return NtTime(key_start_time
+ quantized_time_since_key_start
)
330 def account_quantized_time(self
, gmsa_object
: Gmsa
, time
: NtTime
) -> NtTime
:
331 pwd_id_blob
= gmsa_object
.get("msDS-ManagedPasswordId", idx
=0)
332 self
.assertIsNotNone(pwd_id_blob
, "SAM should have initialized password ID")
334 pwd_id
= ndr_unpack(gkdi
.KeyEnvelope
, pwd_id_blob
)
335 key_start_time
= Gkid
.from_key_envelope(pwd_id
).start_nt_time()
337 gkdi_rollover_interval
= self
.gmsa_rollover_interval(gmsa_object
)
338 return self
.quantized_time(key_start_time
, time
, gkdi_rollover_interval
)
340 def expected_gmsa_password_blob(
343 creds
: KerberosCredentials
,
346 query_expiration_gkid
: Gkid
,
347 previous_gkid
: Optional
[Gkid
] = None,
348 return_future_key
: bool = False,
349 ) -> gmsa
.MANAGEDPASSWORD_BLOB
:
350 new_password
= self
.get_password_based_on_gkid(samdb
, gkid
, creds
.get_sid())
352 if previous_gkid
is not None:
353 old_password
= self
.get_password_based_on_gkid(
354 samdb
, previous_gkid
, creds
.get_sid()
357 current_time
= self
.current_nt_time(samdb
)
359 gmsa_object
= self
.get_gmsa_object(samdb
, creds
.get_dn())
360 gkdi_rollover_interval
= self
.gmsa_rollover_interval(gmsa_object
)
362 query_expiration_time
= query_expiration_gkid
.start_nt_time()
363 query_password_interval
= NtTimeDelta(query_expiration_time
- current_time
)
364 unchanged_password_interval
= NtTimeDelta(
367 query_expiration_time
368 + (gkdi_rollover_interval
if return_future_key
else 0)
374 return self
.marshal_password(
377 query_password_interval
,
378 unchanged_password_interval
,
381 def expected_current_gmsa_password_blob(
384 creds
: KerberosCredentials
,
386 future_key_is_acceptable
: bool,
387 ) -> gmsa
.MANAGEDPASSWORD_BLOB
:
388 gmsa_object
= self
.get_gmsa_object(samdb
, creds
.get_dn())
390 gkdi_rollover_interval
= self
.gmsa_rollover_interval(gmsa_object
)
392 pwd_id_blob
= gmsa_object
.get("msDS-ManagedPasswordId", idx
=0)
393 self
.assertIsNotNone(pwd_id_blob
, "SAM should have initialized password ID")
395 pwd_id
= ndr_unpack(gkdi
.KeyEnvelope
, pwd_id_blob
)
396 key_start_time
= Gkid
.from_key_envelope(pwd_id
).start_nt_time()
398 current_time
= self
.current_nt_time(samdb
)
400 new_key_start_time
= self
.quantized_time(
401 key_start_time
, current_time
, gkdi_rollover_interval
403 new_key_expiration_time
= NtTime(new_key_start_time
+ gkdi_rollover_interval
)
405 account_sid
= creds
.get_sid()
407 within_clock_skew_window
= (
408 new_key_expiration_time
- current_time
<= MAX_CLOCK_SKEW
410 return_future_key
= future_key_is_acceptable
and within_clock_skew_window
411 if return_future_key
:
412 new_password
= self
.get_password_based_on_timestamp(
413 samdb
, new_key_expiration_time
, account_sid
415 old_password
= self
.get_password_based_on_timestamp(
416 samdb
, new_key_start_time
, account_sid
419 new_password
= self
.get_password_based_on_timestamp(
420 samdb
, new_key_start_time
, account_sid
423 account_age
= NtTimeDelta(
424 current_time
- self
.gmsa_creation_nt_time(gmsa_object
)
426 if account_age
>= gkdi_rollover_interval
:
427 old_password
= self
.get_password_based_on_timestamp(
429 NtTime(new_key_start_time
- gkdi_rollover_interval
),
433 # The account is not old enough to have a previous password.
436 key_expiration_time
= NtTime(key_start_time
+ gkdi_rollover_interval
)
437 key_is_expired
= key_expiration_time
<= current_time
439 query_expiration_time
= NtTime(
440 new_key_expiration_time
if key_is_expired
else key_expiration_time
442 query_password_interval
= NtTimeDelta(query_expiration_time
- current_time
)
443 unchanged_password_interval
= NtTimeDelta(
446 query_expiration_time
447 + (gkdi_rollover_interval
if return_future_key
else 0)
453 return self
.marshal_password(
456 query_password_interval
,
457 unchanged_password_interval
,
460 def marshal_password(
462 current_password
: bytes
,
463 previous_password
: Optional
[bytes
],
464 query_password_interval
: NtTimeDelta
,
465 unchanged_password_interval
: NtTimeDelta
,
466 ) -> gmsa
.MANAGEDPASSWORD_BLOB
:
467 managed_password
= gmsa
.MANAGEDPASSWORD_BLOB()
469 managed_password
.passwords
.current
= current_password
470 managed_password
.passwords
.previous
= previous_password
471 managed_password
.passwords
.query_interval
= query_password_interval
472 managed_password
.passwords
.unchanged_interval
= unchanged_password_interval
474 return managed_password
479 samdb
: Optional
[SamDB
] = None,
481 msa_membership
: Optional
[str] = None,
483 ) -> KerberosCredentials
:
484 if msa_membership
is None:
485 allow_world_sddl
= "O:SYD:(A;;RP;;;WD)"
486 msa_membership
= allow_world_sddl
488 msa_membership_sd
= ndr_pack(
489 security
.descriptor
.from_sddl(msa_membership
, security
.dom_sid())
493 creds
= self
.get_cached_creds(
495 account_type
=self
.AccountType
.GROUP_MANAGED_SERVICE
,
497 "additional_details": self
.freeze(
499 "msDS-GroupMSAMembership": msa_membership_sd
,
500 "msDS-ManagedPasswordInterval": str(interval
),
505 # Ensure the gMSA is a brand‐new account.
508 except ldb
.LdbError
as err
:
509 if err
.args
[0] == ldb
.ERR_UNWILLING_TO_PERFORM
:
511 "If you’re running these tests against Windows, try “warming up”"
512 " the GKDI service by running `samba.tests.krb5.gkdi_tests` first."
517 # Derive the account’s current password. The account is too new to have a previous password yet.
518 managed_pwd
= self
.expected_current_gmsa_password_blob(
519 self
.get_samdb() if samdb
is None else samdb
,
521 future_key_is_acceptable
=False,
525 self
.assertIsNotNone(
526 managed_pwd
.passwords
.current
, "current password must be present"
528 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
532 def get_local_samdb(self
) -> SamDB
:
533 """Return a connection to the local database."""
536 samdb
= connect_samdb(
537 samdb_url
=lp
.samdb_url(), lp
=lp
, credentials
=self
.get_admin_creds()
539 self
.assertLocalSamDB(samdb
)
543 # Perform a gensec logon using NTLMSSP. As samdb is passed in as a
544 # parameter, it can have a time set on it with set_db_time().
545 def gensec_ntlmssp_logon(
546 self
, client_creds
: Credentials
, samdb
: SamDB
, expect_success
: bool = True
547 ) -> "Optional[auth.session_info]":
549 lp
.set("server role", "active directory domain controller")
551 settings
= {"lp_ctx": lp
, "target_hostname": lp
.get("netbios name")}
553 gensec_client
= gensec
.Security
.start_client(settings
)
554 # Ensure that we don’t use Kerberos.
555 self
.assertEqual(DONT_USE_KERBEROS
, client_creds
.get_kerberos_state())
556 gensec_client
.set_credentials(client_creds
)
557 gensec_client
.want_feature(gensec
.FEATURE_SEAL
)
558 gensec_client
.start_mech_by_name("ntlmssp")
560 auth_context
= auth
.AuthContext(lp_ctx
=lp
, ldb
=samdb
)
562 gensec_server
= gensec
.Security
.start_server(settings
, auth_context
)
563 machine_creds
= Credentials()
564 machine_creds
.guess(lp
)
565 machine_creds
.set_machine_account(lp
)
566 gensec_server
.set_credentials(machine_creds
)
568 gensec_server
.start_mech_by_name("ntlmssp")
570 client_finished
= False
571 server_finished
= False
572 client_to_server
= b
""
573 server_to_client
= b
""
575 # Operate as both the client and the server to verify the user’s credentials.
576 while not client_finished
or not server_finished
:
577 if not client_finished
:
578 client_finished
, client_to_server
= gensec_client
.update(
581 if not server_finished
:
583 server_finished
, server_to_client
= gensec_server
.update(
586 except NTSTATUSError
as err
:
587 self
.assertFalse(expect_success
, "got an unexpected error")
589 self
.assertEqual(ntstatus
.NT_STATUS_WRONG_PASSWORD
, err
.args
[0])
592 self
.assertTrue(expect_success
, "expected to get an error")
594 # Retrieve the SIDs from the security token.
595 return gensec_server
.session_info()
597 def check_nt_interval(
599 expected_nt_interval
: NtTimeDelta
,
600 nt_interval
: NtTimeDelta
,
603 """Check that the intervals match to within thirty seconds or so."""
605 threshold
= datetime
.timedelta(seconds
=30)
607 interval
= timedelta_from_nt_time_delta(nt_interval
)
608 expected_interval
= timedelta_from_nt_time_delta(expected_nt_interval
)
609 interval_difference
= abs(interval
- expected_interval
)
613 f
"{interval_name} ({interval}) is out by {interval_difference} from"
614 f
" expected ({expected_interval})",
617 def check_managed_pwd_intervals(
619 expected_managed_pwd
: gmsa
.MANAGEDPASSWORD_BLOB
,
620 managed_pwd
: gmsa
.MANAGEDPASSWORD_BLOB
,
622 expected_passwords
= expected_managed_pwd
.passwords
623 passwords
= managed_pwd
.passwords
625 self
.check_nt_interval(
626 expected_passwords
.query_interval
,
627 passwords
.query_interval
,
630 self
.check_nt_interval(
631 expected_passwords
.unchanged_interval
,
632 passwords
.unchanged_interval
,
633 "unchanged interval",
636 def check_managed_pwd(
639 creds
: KerberosCredentials
,
641 expected_managed_pwd
: gmsa
.MANAGEDPASSWORD_BLOB
,
644 creds
.get_dn(), scope
=ldb
.SCOPE_BASE
, attrs
=["msDS-ManagedPassword"]
646 self
.assertEqual(1, len(res
), "gMSA not found")
647 managed_password
= res
[0].get("msDS-ManagedPassword", idx
=0)
649 self
.assertIsNotNone(managed_password
)
650 managed_pwd
= ndr_unpack(gmsa
.MANAGEDPASSWORD_BLOB
, managed_password
)
652 self
.assertEqual(1, managed_pwd
.version
)
653 self
.assertEqual(0, managed_pwd
.reserved
)
654 self
.assertEqual(len(managed_password
), managed_pwd
.length
)
656 self
.assertIsNotNone(expected_managed_pwd
.passwords
.current
)
659 managed_pwd
.passwords
.current
, expected_managed_pwd
.passwords
.current
662 managed_pwd
.passwords
.previous
, expected_managed_pwd
.passwords
.previous
665 self
.check_managed_pwd_intervals(expected_managed_pwd
, managed_pwd
)
667 # When creating a gMSA, Windows seems to pick the root key with the
668 # greatest msKds-CreateTime having msKds-UseStartTime ≤ ten hours ago.
669 # Bear in mind that it seems also to cache the key, so it won’t always
670 # use the latest one.
672 def get_managed_service_accounts_dn(self
) -> ldb
.Dn
:
673 samdb
= self
.get_samdb()
674 return samdb
.get_wellknown_dn(
675 samdb
.get_default_basedn(), dsdb
.DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
678 def check_managed_password_access(
682 samdb
: Optional
[SamDB
] = None,
683 expect_access
: bool = False,
684 expected_werror
: int = werror
.WERR_SUCCESS
,
687 samdb
= self
.get_samdb()
689 self
.assertFalse(expect_access
)
690 managed_service_accounts_dn
= self
.get_managed_service_accounts_dn()
691 username
= creds
.get_username()
693 # Try base, subtree, and one‐level searches.
695 (creds
.get_dn(), ldb
.SCOPE_BASE
),
696 (managed_service_accounts_dn
, ldb
.SCOPE_SUBTREE
),
697 (managed_service_accounts_dn
, ldb
.SCOPE_ONELEVEL
),
700 for dn
, scope
in searches
:
701 # Perform a search and see whether we’re allowed to view the managed password.
707 expression
=f
"sAMAccountName={username}",
708 attrs
=["msDS-ManagedPassword"],
710 except ldb
.LdbError
as err
:
711 self
.assertTrue(expected_werror
, "got an unexpected error")
714 if num
!= ldb
.ERR_OPERATIONS_ERROR
:
717 self
.assertIn(f
"{expected_werror:08X}", estr
)
720 self
.assertFalse(expected_werror
, "expected to get an error")
721 self
.assertEqual(1, len(res
), "should always find the gMSA")
723 managed_password
= res
[0].get("msDS-ManagedPassword", idx
=0)
725 self
.assertIsNotNone(
726 managed_password
, "should be allowed to view the password"
730 managed_password
, "should not be allowed to view the password"
733 def test_retrieved_password_allowed(self
):
734 """Test being allowed to view the managed password."""
735 self
.check_managed_password_access(self
.gmsa_account(), expect_access
=True)
737 def test_retrieved_password_denied(self
):
738 """Test not being allowed to view the managed password."""
739 deny_world_sddl
= "O:SYD:(D;;RP;;;WD)"
740 self
.check_managed_password_access(
741 self
.gmsa_account(msa_membership
=deny_world_sddl
), expect_access
=False
744 def test_retrieving_password_over_sealed_connection(self
):
747 f
"ldap://{self.dc_host}",
748 credentials
=self
.get_admin_creds(),
749 session_info
=auth
.system_session(lp
),
753 self
.check_managed_password_access(
754 self
.gmsa_account(), samdb
=samdb
, expect_access
=True
757 def test_retrieving_password_over_unsealed_connection(self
):
758 # Requires --use-kerberos=required, or it automatically upgrades to an
759 # encrypted connection.
761 # Remove FEATURE_SEAL which gets added by insta_creds.
762 creds
= self
.insta_creds(template
=self
.get_admin_creds())
763 creds
.set_gensec_features(creds
.get_gensec_features() & ~gensec
.FEATURE_SEAL
)
767 sasl_wrap
= lp
.get("client ldap sasl wrapping")
768 self
.addCleanup(lp
.set, "client ldap sasl wrapping", sasl_wrap
)
769 lp
.set("client ldap sasl wrapping", "sign")
771 # Create a second ldb connection without seal.
773 f
"ldap://{self.dc_host}",
775 session_info
=auth
.system_session(lp
),
779 self
.check_managed_password_access(
782 expected_werror
=werror
.WERR_DS_CONFIDENTIALITY_REQUIRED
,
785 def test_retrieving_denied_password_over_unsealed_connection(self
):
786 # Requires --use-kerberos=required, or it automatically upgrades to an
787 # encrypted connection.
789 # Remove FEATURE_SEAL which gets added by insta_creds.
790 creds
= self
.insta_creds(template
=self
.get_admin_creds())
791 creds
.set_gensec_features(creds
.get_gensec_features() & ~gensec
.FEATURE_SEAL
)
795 sasl_wrap
= lp
.get("client ldap sasl wrapping")
796 self
.addCleanup(lp
.set, "client ldap sasl wrapping", sasl_wrap
)
797 lp
.set("client ldap sasl wrapping", "sign")
799 # Create a second ldb connection without seal.
801 f
"ldap://{self.dc_host}",
803 session_info
=auth
.system_session(lp
),
807 # Deny anyone from being able to view the password.
808 deny_world_sddl
= "O:SYD:(D;;RP;;;WD)"
809 self
.check_managed_password_access(
810 self
.gmsa_account(msa_membership
=deny_world_sddl
),
812 expected_werror
=werror
.WERR_DS_CONFIDENTIALITY_REQUIRED
,
815 def future_gkid(self
) -> Gkid
:
816 """Return (6333, 26, 5)—an arbitrary GKID far enough in the future that
817 it’s situated beyond any reasonable rollover period. But not so far in
818 the future that Python’s datetime library will throw OverflowErrors."""
819 future_date
= datetime
.datetime(9000, 1, 1, 0, 0, tzinfo
=datetime
.timezone
.utc
)
820 return Gkid
.from_nt_time(nt_time_from_datetime(future_date
))
822 def future_time(self
) -> NtTime
:
823 """Return an arbitrary time far enough in the future that it’s situated
824 beyond any reasonable rollover period. But not so far in the future that
825 Python’s datetime library will throw OverflowErrors."""
826 return self
.future_gkid().start_nt_time()
828 def test_retrieved_password(self
):
829 """Test that we can retrieve the correct password for a gMSA."""
831 samdb
= self
.get_samdb()
832 creds
= self
.gmsa_account()
834 expected
= self
.expected_current_gmsa_password_blob(
837 future_key_is_acceptable
=True,
839 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
841 def test_retrieved_password_when_current_key_is_valid(self
):
842 """Test that we can retrieve the correct password for a gMSA at a time
843 when we are sure it is valid."""
844 password_interval
= 37
846 samdb
= self
.get_local_samdb()
847 series
= self
.gmsa_series(password_interval
)
848 self
.set_db_time(samdb
, series
.start_of_interval(0))
850 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
852 # Check the managed password of the account the moment it has been
854 expected
= self
.expected_gmsa_password_blob(
857 series
.interval_gkid(0),
858 previous_gkid
=series
.interval_gkid(-1),
859 query_expiration_gkid
=series
.interval_gkid(1),
861 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
863 def test_retrieved_password_when_current_key_is_expired(self
):
864 """Test that we can retrieve the correct password for a gMSA when the
865 original password has expired."""
866 password_interval
= 14
868 samdb
= self
.get_local_samdb()
869 series
= self
.gmsa_series(password_interval
)
870 self
.set_db_time(samdb
, series
.start_of_interval(0))
872 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
874 # Set the time to the moment the original password has expired, and
875 # check that the managed password is correct.
876 expired_time
= series
.start_of_interval(1)
877 self
.set_db_time(samdb
, expired_time
)
878 expected
= self
.expected_gmsa_password_blob(
881 series
.interval_gkid(1),
882 previous_gkid
=series
.interval_gkid(0),
883 query_expiration_gkid
=series
.interval_gkid(2),
885 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
887 def test_retrieved_password_when_next_key_is_expired(self
):
888 password_interval
= 1
890 samdb
= self
.get_local_samdb()
891 series
= self
.gmsa_series(password_interval
)
892 self
.set_db_time(samdb
, series
.start_of_interval(0))
894 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
896 expired_time
= series
.start_of_interval(2)
897 self
.set_db_time(samdb
, expired_time
)
899 expected
= self
.expected_gmsa_password_blob(
902 series
.interval_gkid(2),
903 previous_gkid
=series
.interval_gkid(1),
904 query_expiration_gkid
=series
.interval_gkid(3),
906 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
908 def test_retrieved_password_during_clock_skew_window_when_current_key_is_valid(
911 password_interval
= 60
913 samdb
= self
.get_local_samdb()
914 series
= self
.gmsa_series(password_interval
)
915 self
.set_db_time(samdb
, series
.start_of_interval(0))
917 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
919 self
.set_db_time(samdb
, series
.during_skew_window(0))
921 expected
= self
.expected_gmsa_password_blob(
924 series
.interval_gkid(1),
925 previous_gkid
=series
.interval_gkid(0),
926 query_expiration_gkid
=series
.interval_gkid(1),
927 return_future_key
=True,
929 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
931 def test_retrieved_password_during_clock_skew_window_when_current_key_is_expired(
934 password_interval
= 100
936 samdb
= self
.get_local_samdb()
937 series
= self
.gmsa_series(password_interval
)
938 self
.set_db_time(samdb
, series
.start_of_interval(0))
940 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
942 self
.set_db_time(samdb
, series
.during_skew_window(1))
944 expected
= self
.expected_gmsa_password_blob(
947 series
.interval_gkid(2),
948 previous_gkid
=series
.interval_gkid(1),
949 query_expiration_gkid
=series
.interval_gkid(2),
950 return_future_key
=True,
952 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
954 def test_retrieved_password_during_clock_skew_window_when_next_key_is_expired(
957 password_interval
= 16
959 samdb
= self
.get_local_samdb()
960 series
= self
.gmsa_series(password_interval
)
961 self
.set_db_time(samdb
, series
.start_of_interval(0))
963 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
965 self
.set_db_time(samdb
, series
.during_skew_window(2))
967 expected
= self
.expected_gmsa_password_blob(
970 series
.interval_gkid(3),
971 previous_gkid
=series
.interval_gkid(2),
972 query_expiration_gkid
=series
.interval_gkid(3),
973 return_future_key
=True,
975 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
977 def test_retrieving_managed_password_triggers_keys_update(self
):
978 # Create a root key with a start time early enough to be usable at the
979 # time the gMSA is purported to be created.
980 samdb
= self
.get_samdb()
981 domain_dn
= self
.get_server_dn(samdb
)
982 self
.create_root_key(samdb
, domain_dn
, use_start_time
=ROOT_KEY_START_TIME
)
984 password_interval
= 16
986 local_samdb
= self
.get_local_samdb()
987 series
= GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval
))
988 self
.set_db_time(local_samdb
, series
.start_of_interval(0))
990 creds
= self
.gmsa_account(samdb
=local_samdb
, interval
=password_interval
)
993 current_nt_time
= self
.current_nt_time(samdb
)
994 self
.set_db_time(local_samdb
, current_nt_time
)
996 # Search the local database for the account’s keys.
997 res
= local_samdb
.search(
998 dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["unicodePwd", "supplementalCredentials"]
1000 self
.assertEqual(1, len(res
))
1002 previous_nt_hash
= res
[0].get("unicodePwd", idx
=0)
1003 previous_supplemental_creds
= self
.unpack_supplemental_credentials(
1004 res
[0].get("supplementalCredentials", idx
=0)
1007 # Check that the NT hash is the value we expect.
1008 self
.assertEqual(creds
.get_nt_hash(), previous_nt_hash
)
1010 # Search for the managed password over LDAP, triggering an update of the
1011 # keys in the database.
1012 res
= samdb
.search(dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["msDS-ManagedPassword"])
1013 self
.assertEqual(1, len(res
))
1015 # Verify that the password is present in the result.
1016 managed_password
= res
[0].get("msDS-ManagedPassword", idx
=0)
1017 self
.assertIsNotNone(managed_password
, "should be allowed to view the password")
1019 # Search the local database again for the account’s keys, which should
1020 # have been updated.
1021 res
= local_samdb
.search(
1022 dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["unicodePwd", "supplementalCredentials"]
1024 self
.assertEqual(1, len(res
))
1026 nt_hash
= res
[0].get("unicodePwd", idx
=0)
1027 supplemental_creds
= self
.unpack_supplemental_credentials(
1028 res
[0].get("supplementalCredentials", idx
=0)
1031 self
.assertNotEqual(
1032 previous_nt_hash
, nt_hash
, "NT hash has not been updated (yet)"
1034 self
.assertNotEqual(
1035 previous_supplemental_creds
,
1037 "supplementalCredentials has not been updated (yet)",
1040 # Set the new password.
1041 managed_pwd
= ndr_unpack(gmsa
.MANAGEDPASSWORD_BLOB
, managed_password
)
1042 self
.assertIsNotNone(
1043 managed_pwd
.passwords
.current
, "current password must be present"
1045 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1047 # Check that the new NT hash is the value we expect.
1048 self
.assertEqual(creds
.get_nt_hash(), nt_hash
)
1050 def test_authentication_triggers_keys_update(self
):
1051 # Create a root key with a start time early enough to be usable at the
1052 # time the gMSA is purported to be created. But don’t create it on a
1053 # local samdb with a specifically set time, because (if the key isn’t
1054 # deleted later) we could end up with multiple keys with identical
1055 # creation and start times, and tests failing when the test and the
1056 # server don’t agree on which root key to use at a specific time.
1057 samdb
= self
.get_samdb()
1058 domain_dn
= self
.get_server_dn(samdb
)
1059 self
.create_root_key(samdb
, domain_dn
, use_start_time
=ROOT_KEY_START_TIME
)
1061 password_interval
= 16
1063 local_samdb
= self
.get_local_samdb()
1064 series
= GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval
))
1065 self
.set_db_time(local_samdb
, series
.start_of_interval(0))
1067 creds
= self
.gmsa_account(samdb
=local_samdb
, interval
=password_interval
)
1070 current_nt_time
= self
.current_nt_time(samdb
)
1071 self
.set_db_time(local_samdb
, current_nt_time
)
1073 # Search the local database for the account’s keys.
1074 res
= local_samdb
.search(
1075 dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["unicodePwd", "supplementalCredentials"]
1077 self
.assertEqual(1, len(res
))
1079 previous_nt_hash
= res
[0].get("unicodePwd", idx
=0)
1080 previous_supplemental_creds
= self
.unpack_supplemental_credentials(
1081 res
[0].get("supplementalCredentials", idx
=0)
1084 # Check that the NT hash is the value we expect.
1085 self
.assertEqual(creds
.get_nt_hash(), previous_nt_hash
)
1087 # Calculate the password with which to authenticate.
1088 current_series
= self
.gmsa_series_for_account(
1089 local_samdb
, creds
, password_interval
1091 managed_pwd
= self
.expected_gmsa_password_blob(
1094 current_series
.interval_gkid(0),
1095 query_expiration_gkid
=current_series
.interval_gkid(1),
1098 # Set the new password.
1099 self
.assertIsNotNone(
1100 managed_pwd
.passwords
.current
, "current password must be present"
1102 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1104 # Perform an authentication using the new password. The KDC should
1105 # recognize that the keys in the database are out of date and update
1107 self
._as
_req
(creds
, self
.get_service_creds(), kcrypto
.Enctype
.AES256
)
1109 # Search the local database again for the account’s keys, which should
1110 # have been updated.
1111 res
= local_samdb
.search(
1112 dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["unicodePwd", "supplementalCredentials"]
1114 self
.assertEqual(1, len(res
))
1116 nt_hash
= res
[0].get("unicodePwd", idx
=0)
1117 supplemental_creds
= self
.unpack_supplemental_credentials(
1118 res
[0].get("supplementalCredentials", idx
=0)
1121 self
.assertNotEqual(
1122 previous_nt_hash
, nt_hash
, "NT hash has not been updated (yet)"
1124 self
.assertNotEqual(
1125 previous_supplemental_creds
,
1127 "supplementalCredentials has not been updated (yet)",
1130 # Check that the new NT hash is the value we expect.
1131 self
.assertEqual(creds
.get_nt_hash(), nt_hash
)
1133 def test_gmsa_can_perform_gensec_ntlmssp_logon(self
):
1134 creds
= self
.gmsa_account(kerberos_enabled
=False)
1136 # Perform a gensec logon.
1137 session
= self
.gensec_ntlmssp_logon(creds
, self
.get_local_samdb())
1139 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1140 token
= session
.security_token
1141 token_sids
= token
.sids
1142 self
.assertGreater(len(token_sids
), 0)
1144 # Ensure that they match.
1145 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1147 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_valid(self
):
1148 """Test that we can perform a gensec logon at a time when we are sure
1149 the current gMSA password is valid."""
1151 password_interval
= 18
1153 samdb
= self
.get_local_samdb()
1154 series
= self
.gmsa_series(password_interval
)
1155 self
.set_db_time(samdb
, series
.start_of_interval(0))
1157 creds
= self
.gmsa_account(
1158 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1161 # Perform a gensec logon.
1162 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1164 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1165 token
= session
.security_token
1166 token_sids
= token
.sids
1167 self
.assertGreater(len(token_sids
), 0)
1169 # Ensure that they match.
1170 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1172 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_expired(self
):
1173 """Test that we can perform a gensec logon using NTLMSSP at a time when
1174 the current gMSA password has expired."""
1176 password_interval
= 40
1178 samdb
= self
.get_local_samdb()
1179 series
= self
.gmsa_series(password_interval
)
1180 self
.set_db_time(samdb
, series
.start_of_interval(0))
1182 creds
= self
.gmsa_account(
1183 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1186 # Set the time to the moment the original password has expired, and
1187 # perform a gensec logon.
1188 expired_time
= series
.start_of_interval(1)
1189 self
.set_db_time(samdb
, expired_time
)
1191 # Calculate the password with which to authenticate.
1192 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1193 managed_pwd
= self
.expected_gmsa_password_blob(
1196 current_series
.interval_gkid(0),
1197 previous_gkid
=current_series
.interval_gkid(-1),
1198 query_expiration_gkid
=current_series
.interval_gkid(1),
1201 # Set the new password.
1202 self
.assertIsNotNone(
1203 managed_pwd
.passwords
.current
, "current password must be present"
1205 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1207 # Perform a gensec logon.
1208 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1210 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1211 token
= session
.security_token
1212 token_sids
= token
.sids
1213 self
.assertGreater(len(token_sids
), 0)
1215 # Ensure that they match.
1216 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1218 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_next_key_is_expired(self
):
1219 password_interval
= 42
1221 samdb
= self
.get_local_samdb()
1222 series
= self
.gmsa_series(password_interval
)
1223 self
.set_db_time(samdb
, series
.start_of_interval(0))
1225 creds
= self
.gmsa_account(
1226 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1229 expired_time
= series
.start_of_interval(2)
1230 self
.set_db_time(samdb
, expired_time
)
1232 # Calculate the password with which to authenticate.
1233 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1234 managed_pwd
= self
.expected_gmsa_password_blob(
1237 current_series
.interval_gkid(0),
1238 previous_gkid
=current_series
.interval_gkid(-1),
1239 query_expiration_gkid
=current_series
.interval_gkid(1),
1242 # Set the new password.
1243 self
.assertIsNotNone(
1244 managed_pwd
.passwords
.current
, "current password must be present"
1246 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1248 # Perform a gensec logon.
1249 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1251 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1252 token
= session
.security_token
1253 token_sids
= token
.sids
1254 self
.assertGreater(len(token_sids
), 0)
1256 # Ensure that they match.
1257 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1259 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_valid(
1262 password_interval
= 43
1264 samdb
= self
.get_local_samdb()
1265 series
= self
.gmsa_series(password_interval
)
1266 self
.set_db_time(samdb
, series
.start_of_interval(0))
1268 creds
= self
.gmsa_account(
1269 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1272 self
.set_db_time(samdb
, series
.during_skew_window(0))
1274 # Calculate the password with which to authenticate.
1275 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1276 managed_pwd
= self
.expected_gmsa_password_blob(
1279 current_series
.interval_gkid(0),
1280 previous_gkid
=current_series
.interval_gkid(-1),
1281 query_expiration_gkid
=current_series
.interval_gkid(1),
1284 # Set the new password.
1285 self
.assertIsNotNone(
1286 managed_pwd
.passwords
.current
, "current password must be present"
1288 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1290 # Perform a gensec logon.
1291 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1293 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1294 token
= session
.security_token
1295 token_sids
= token
.sids
1296 self
.assertGreater(len(token_sids
), 0)
1298 # Ensure that they match.
1299 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1301 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_expired(
1304 password_interval
= 44
1306 samdb
= self
.get_local_samdb()
1307 series
= self
.gmsa_series(password_interval
)
1308 self
.set_db_time(samdb
, series
.start_of_interval(0))
1310 creds
= self
.gmsa_account(
1311 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1314 self
.set_db_time(samdb
, series
.during_skew_window(1))
1316 # Calculate the password with which to authenticate.
1317 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1318 managed_pwd
= self
.expected_gmsa_password_blob(
1321 current_series
.interval_gkid(0),
1322 previous_gkid
=current_series
.interval_gkid(-1),
1323 query_expiration_gkid
=current_series
.interval_gkid(1),
1326 # Set the new password.
1327 self
.assertIsNotNone(
1328 managed_pwd
.passwords
.current
, "current password must be present"
1330 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1332 # Perform a gensec logon.
1333 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1335 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1336 token
= session
.security_token
1337 token_sids
= token
.sids
1338 self
.assertGreater(len(token_sids
), 0)
1340 # Ensure that they match.
1341 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1343 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_next_key_is_expired(
1346 password_interval
= 47
1348 samdb
= self
.get_local_samdb()
1349 series
= self
.gmsa_series(password_interval
)
1350 self
.set_db_time(samdb
, series
.start_of_interval(0))
1352 creds
= self
.gmsa_account(
1353 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1356 self
.set_db_time(samdb
, series
.during_skew_window(2))
1358 # Calculate the password with which to authenticate.
1359 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1360 managed_pwd
= self
.expected_gmsa_password_blob(
1363 current_series
.interval_gkid(0),
1364 previous_gkid
=current_series
.interval_gkid(-1),
1365 query_expiration_gkid
=current_series
.interval_gkid(1),
1368 # Set the new password.
1369 self
.assertIsNotNone(
1370 managed_pwd
.passwords
.current
, "current password must be present"
1372 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1374 # Perform a gensec logon.
1375 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1377 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1378 token
= session
.security_token
1379 token_sids
= token
.sids
1380 self
.assertGreater(len(token_sids
), 0)
1382 # Ensure that they match.
1383 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1385 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_within_five_minutes(
1388 password_interval
= 123
1390 samdb
= self
.get_local_samdb()
1391 series
= self
.gmsa_series(password_interval
)
1392 self
.set_db_time(samdb
, series
.start_of_interval(0))
1394 creds
= self
.gmsa_account(
1395 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1398 # Set the time to within five minutes of the original password’s expiry,
1399 # and perform a gensec logon with the original password.
1400 expired_time
= series
.within_previous_password_valid_window(1)
1401 self
.set_db_time(samdb
, expired_time
)
1403 # Perform a gensec logon.
1404 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1406 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1407 token
= session
.security_token
1408 token_sids
= token
.sids
1409 self
.assertGreater(len(token_sids
), 0)
1411 # Ensure that they match.
1412 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1414 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_but_one_password_within_five_minutes(
1417 password_interval
= 123
1419 samdb
= self
.get_local_samdb()
1420 series
= self
.gmsa_series(password_interval
)
1421 self
.set_db_time(samdb
, series
.start_of_interval(0))
1423 creds
= self
.gmsa_account(
1424 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1427 # Set the time to within five minutes of the *following* password’s expiry,
1428 # and perform a gensec logon with the original password.
1429 expired_time
= series
.within_previous_password_valid_window(2)
1430 self
.set_db_time(samdb
, expired_time
)
1432 # Expect the gensec logon to fail.
1433 self
.gensec_ntlmssp_logon(creds
, samdb
, expect_success
=False)
1435 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_beyond_five_minutes(
1438 password_interval
= 456
1440 samdb
= self
.get_local_samdb()
1441 series
= self
.gmsa_series(password_interval
)
1442 self
.set_db_time(samdb
, series
.start_of_interval(0))
1444 creds
= self
.gmsa_account(
1445 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1448 # Set the time to five minutes beyond the original password’s expiry,
1449 # and try to perform a gensec logon with the original password.
1450 expired_time
= series
.outside_previous_password_valid_window(1)
1451 self
.set_db_time(samdb
, expired_time
)
1453 # Perform a gensec logon.
1454 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1456 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1457 token
= session
.security_token
1458 token_sids
= token
.sids
1459 self
.assertGreater(len(token_sids
), 0)
1461 # Ensure that they match.
1462 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1464 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_password_five_minutes_apart(
1467 password_interval
= 789
1469 samdb
= self
.get_local_samdb()
1470 series
= self
.gmsa_series(password_interval
)
1471 self
.set_db_time(samdb
, series
.start_of_interval(0))
1473 creds
= self
.gmsa_account(
1474 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1476 gmsa_sid
= creds
.get_sid()
1478 # Set the time to after the original password’s expiry, and perform a
1479 # gensec logon with the original password.
1480 db_time
= series
.during_interval(1)
1481 self
.set_db_time(samdb
, db_time
)
1483 # Perform a gensec logon.
1484 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1486 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1487 token
= session
.security_token
1488 token_sids
= token
.sids
1489 self
.assertGreater(len(token_sids
), 0)
1491 # Ensure that they match.
1492 self
.assertEqual(security
.dom_sid(gmsa_sid
), token_sids
[0])
1494 # Set the time to not quite five minutes later, and perform a gensec
1495 # logon with the original password.
1496 self
.set_db_time(samdb
, NtTime(db_time
+ MAX_CLOCK_SKEW
- 1))
1498 # Perform a gensec logon.
1499 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1501 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1502 token
= session
.security_token
1503 token_sids
= token
.sids
1504 self
.assertGreater(len(token_sids
), 0)
1506 # Ensure that they match.
1507 self
.assertEqual(security
.dom_sid(gmsa_sid
), token_sids
[0])
1509 # Now set the time to exactly five minutes later, and try to perform a
1510 # gensec logon with the original password.
1511 self
.set_db_time(samdb
, NtTime(db_time
+ MAX_CLOCK_SKEW
))
1513 # Expect the gensec logon to fail.
1514 self
.gensec_ntlmssp_logon(creds
, samdb
, expect_success
=False)
1516 def test_gmsa_can_perform_netlogon(self
):
1517 self
._test
_samlogon
(
1518 self
.gmsa_account(kerberos_enabled
=False),
1519 netlogon
.NetlogonNetworkInformation
,
1520 validation_level
=netlogon
.NetlogonValidationSamInfo4
,
1523 def test_computer_cannot_perform_interactive_logon(self
):
1524 self
._test
_samlogon
(
1525 self
.get_mach_creds(),
1526 netlogon
.NetlogonInteractiveInformation
,
1527 expect_error
=ntstatus
.NT_STATUS_NO_SUCH_USER
,
1528 validation_level
=netlogon
.NetlogonValidationSamInfo4
,
1531 def test_gmsa_cannot_perform_interactive_logon(self
):
1532 self
._test
_samlogon
(
1533 self
.gmsa_account(kerberos_enabled
=False),
1534 netlogon
.NetlogonInteractiveInformation
,
1535 expect_error
=ntstatus
.NT_STATUS_NO_SUCH_USER
,
1536 validation_level
=netlogon
.NetlogonValidationSamInfo4
,
1539 def _gmsa_can_perform_as_req(self
, *, enctype
: kcrypto
.Enctype
) -> None:
1540 self
._as
_req
(self
.gmsa_account(), self
.get_service_creds(), enctype
)
1542 def test_gmsa_can_perform_as_req_with_aes256(self
):
1543 self
._gmsa
_can
_perform
_as
_req
(enctype
=kcrypto
.Enctype
.AES256
)
1545 def test_gmsa_can_perform_as_req_with_rc4(self
):
1546 self
._gmsa
_can
_perform
_as
_req
(enctype
=kcrypto
.Enctype
.RC4
)
1548 def _gmsa_can_authenticate_to_ldap(self
, *, with_kerberos
: bool) -> None:
1549 creds
= self
.gmsa_account(kerberos_enabled
=with_kerberos
)
1553 # Authenticate to LDAP.
1555 url
=f
"{protocol}://{self.dc_host}", credentials
=creds
, lp
=self
.get_lp()
1558 # Search for the user’s token groups.
1559 res
= samdb_user
.search("", scope
=ldb
.SCOPE_BASE
, attrs
=["tokenGroups"])
1560 self
.assertEqual(1, len(res
))
1562 token_groups
= res
[0].get("tokenGroups", idx
=0)
1563 self
.assertIsNotNone(token_groups
)
1565 # Ensure that the token SID matches.
1566 token_sid
= ndr_unpack(security
.dom_sid
, token_groups
)
1567 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sid
)
1569 def test_gmsa_can_authenticate_to_ldap_with_kerberos(self
):
1570 self
._gmsa
_can
_authenticate
_to
_ldap
(with_kerberos
=True)
1572 def test_gmsa_can_authenticate_to_ldap_without_kerberos(self
):
1573 self
._gmsa
_can
_authenticate
_to
_ldap
(with_kerberos
=False)
1576 if __name__
== "__main__":