s3:ntlm_auth: make logs more consistent with length check
[Samba.git] / python / samba / tests / gkdi.py
blob1fec624248b1ec482b6395832b0ad4321e15a979
# Helper classes for testing the Group Key Distribution Service.
#
# Copyright (C) Catalyst.Net Ltd 2023
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
20 import sys
21 import os
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
26 import datetime
27 import secrets
28 from typing import NewType, Optional, Tuple, Union
30 import ldb
32 from cryptography.hazmat.backends import default_backend
33 from cryptography.hazmat.primitives import hashes
34 from cryptography.hazmat.primitives.kdf.kbkdf import CounterLocation, KBKDFHMAC, Mode
36 from samba import (
37 ntstatus,
38 NTSTATUSError,
39 werror,
41 from samba.credentials import Credentials
42 from samba.dcerpc import gkdi, misc
43 from samba.gkdi import (
44 Algorithm,
45 Gkid,
46 GkidType,
47 GroupKey,
48 KEY_CYCLE_DURATION,
49 KEY_LEN_BYTES,
50 MAX_CLOCK_SKEW,
51 SeedKeyPair,
53 from samba.hresult import (
54 HRES_E_INVALIDARG,
55 HRES_NTE_BAD_KEY,
56 HRES_NTE_NO_KEY,
58 from samba.ndr import ndr_pack, ndr_unpack
59 from samba.nt_time import (
60 datetime_from_nt_time,
61 nt_time_from_datetime,
62 NtTime,
63 NtTimeDelta,
64 timedelta_from_nt_time_delta,
66 from samba.param import LoadParm
67 from samba.samdb import SamDB
69 from samba.tests import delete_force, TestCase
72 HResult = NewType("HResult", int)
73 RootKey = NewType("RootKey", ldb.Message)
76 ROOT_KEY_START_TIME = NtTime(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
78 DSDB_GMSA_TIME_OPAQUE = "dsdb_gmsa_time_opaque"
class GetKeyError(Exception):
    """Failure of a GetKey request, carrying a status code and a message.

    Both values are handed straight to ``Exception``, so they are available
    to callers as ``args[0]`` (the status) and ``args[1]`` (the message).
    """

    def __init__(self, status: HResult, message: str):
        super(GetKeyError, self).__init__(status, message)
class GkdiBaseTest(TestCase):
    """Shared helpers for Group Key Distribution Service tests.

    Provides control over the time stored on a SamDB connection, access to
    a server's GetKey RPC, a local emulation of root key selection and seed
    key derivation, and creation of root key objects.
    """

    # NDR-encoded form of the security descriptor O:SYD:(A;;FRFW;;;S-1-5-9).
    gmsa_sd = (
        b"\x01\x00\x04\x800\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
        b"\x14\x00\x00\x00\x02\x00\x1c\x00\x01\x00\x00\x00\x00\x00\x14\x00"
        b"\x9f\x01\x12\x00\x01\x01\x00\x00\x00\x00\x00\x05\t\x00\x00\x00"
        b"\x01\x01\x00\x00\x00\x00\x00\x05\x12\x00\x00\x00"
    )

    def set_db_time(self, samdb: SamDB, time: Optional[NtTime]) -> None:
        """Store *time* on *samdb* under ``DSDB_GMSA_TIME_OPAQUE``."""
        samdb.set_opaque(DSDB_GMSA_TIME_OPAQUE, time)

    def get_db_time(self, samdb: SamDB) -> Optional[NtTime]:
        """Return whatever *samdb* holds under ``DSDB_GMSA_TIME_OPAQUE``."""
        return samdb.get_opaque(DSDB_GMSA_TIME_OPAQUE)

    def current_time(
        self, samdb: SamDB, *, offset: Optional[datetime.timedelta] = None
    ) -> datetime.datetime:
        """Return the time to treat as "now", optionally shifted by *offset*.

        The time stored on *samdb* (see get_db_time()) wins when present;
        otherwise the real UTC wall-clock time is used.
        """
        stored = self.get_db_time(samdb)
        moment = (
            datetime.datetime.now(tz=datetime.timezone.utc)
            if stored is None
            else datetime_from_nt_time(stored)
        )
        return moment if offset is None else moment + offset

    def current_nt_time(
        self, samdb: SamDB, *, offset: Optional[datetime.timedelta] = None
    ) -> NtTime:
        """Return current_time() converted to an NtTime."""
        moment = self.current_time(samdb, offset=offset)
        return nt_time_from_datetime(moment)

    def current_gkid(
        self, samdb: SamDB, *, offset: Optional[datetime.timedelta] = None
    ) -> Gkid:
        """Return the Gkid that corresponds to current_nt_time()."""
        nt_now = self.current_nt_time(samdb, offset=offset)
        return Gkid.from_nt_time(nt_now)

    def gkdi_connect(
        self, host: str, lp: LoadParm, server_creds: Credentials
    ) -> gkdi.gkdi:
        """Open a sealed ncacn_ip_tcp GKDI connection to *host*.

        An unreachable port fails the test with a hint about starting the
        Microsoft Key Distribution Service; any other NTSTATUSError is
        re-raised unchanged.
        """
        binding = f"ncacn_ip_tcp:{host}[seal]"
        try:
            return gkdi.gkdi(binding, lp, server_creds)
        except NTSTATUSError as err:
            port_unreachable = err.args[0] == ntstatus.NT_STATUS_PORT_UNREACHABLE
            if port_unreachable:
                self.fail(
                    "Try starting the Microsoft Key Distribution Service (KdsSvc).\n"
                    "In PowerShell, run:\n\tStart-Service -Name KdsSvc"
                )

            raise

    def rpc_get_key(
        self,
        conn: gkdi.gkdi,
        target_sd: bytes,
        root_key_id: Optional[misc.GUID],
        gkid: Gkid,
    ) -> SeedKeyPair:
        """Call GetKey() over *conn* and unpack the returned envelope.

        Raises GetKeyError when the server reports anything other than
        ``(0, None)``. Fails the test outright when no root key was named
        and the low 16 bits of the result code equal
        ``WERR_TOO_MANY_OPEN_FILES``.
        """
        sd_octets = list(target_sd)
        out_len, out, result = conn.GetKey(
            sd_octets, root_key_id, gkid.l0_idx, gkid.l1_idx, gkid.l2_idx
        )
        result_code, result_string = result

        server_gave_up = (
            root_key_id is None
            and result_code & 0xFFFF == werror.WERR_TOO_MANY_OPEN_FILES
        )
        if server_gave_up:
            self.fail(
                "The server has given up selecting a root key because there are too"
                " many keys (more than 1000) in the Master Root Keys container. Delete"
                " some root keys and try again."
            )
        if result != (0, None):
            raise GetKeyError(result_code, result_string)
        self.assertEqual(len(out), out_len, "output len mismatch")

        envelope = ndr_unpack(gkdi.GroupKeyEnvelope, bytes(out))

        returned_gkid = Gkid(
            envelope.l0_index, envelope.l1_index, envelope.l2_index
        )
        # Either key may be absent from the envelope; report that as None.
        l1_key = bytes(envelope.l1_key) if envelope.l1_key else None
        l2_key = bytes(envelope.l2_key) if envelope.l2_key else None

        hash_algorithm = Algorithm.from_kdf_parameters(bytes(envelope.kdf_parameters))

        return SeedKeyPair(
            l1_key, l2_key, returned_gkid, hash_algorithm, envelope.root_key_id
        )

    def get_root_key_object(
        self, samdb: SamDB, root_key_id: Optional[misc.GUID], gkid: Gkid
    ) -> Tuple[RootKey, misc.GUID]:
        """Look up a root key object and return it together with its GUID.

        When *root_key_id* is given, exactly that object is fetched. When it
        is ``None``, the object chosen is the most recently created one whose
        start time is not after the time indicated by *gkid*.

        Note that the Microsoft Key Distribution Service caches root keys,
        so Windows will not necessarily choose the most recently created
        key.

        Raises GetKeyError if no suitable key exists, if the key data has
        the wrong length, if the start time is 0, or if an explicitly
        requested key is not valid at the time indicated by *gkid*.
        """

        def creation_time(candidate: RootKey) -> NtTime:
            # Objects lacking a creation time rank as NtTime(0).
            raw = candidate.get("msKds-CreateTime", idx=0)
            return NtTime(0) if raw is None else NtTime(int(raw))

        wanted_attrs = [
            "cn",
            "msKds-CreateTime",
            "msKds-KDFAlgorithmID",
            "msKds-KDFParam",
            "msKds-RootKeyData",
            "msKds-UseStartTime",
            "msKds-Version",
        ]

        interval_start = gkid.start_nt_time()
        search_base = self.get_root_key_container_dn(samdb)

        pinned = root_key_id is not None
        if pinned:
            search_base.add_child(f"CN={root_key_id}")

            try:
                matches = samdb.search(
                    search_base, scope=ldb.SCOPE_BASE, attrs=wanted_attrs
                )
            except ldb.LdbError as err:
                if err.args[0] == ldb.ERR_NO_SUCH_OBJECT:
                    raise GetKeyError(HRES_NTE_NO_KEY, "no such root key exists")

                raise

            chosen = matches[0]
        else:
            candidates = samdb.search(
                search_base,
                scope=ldb.SCOPE_SUBTREE,
                expression=f"(msKds-UseStartTime<={interval_start})",
                attrs=wanted_attrs,
            )
            if not candidates:
                raise GetKeyError(
                    HRES_NTE_NO_KEY, "no root keys exist at specified time"
                )

            chosen = max(candidates, key=creation_time)

        # The GUID is always taken from the object's own cn.
        cn = chosen.get("cn", idx=0)
        self.assertIsNotNone(cn)
        found_id = misc.GUID(cn)

        key_data = chosen.get("msKds-RootKeyData", idx=0)
        self.assertIsNotNone(key_data)
        if len(key_data) != KEY_LEN_BYTES:
            raise GetKeyError(
                HRES_NTE_BAD_KEY, f"root key data must be {KEY_LEN_BYTES} bytes"
            )

        declared_start = NtTime(int(chosen.get("msKds-UseStartTime", idx=0)))
        if declared_start == 0:
            raise GetKeyError(HRES_NTE_BAD_KEY, "root key effective time is 0")
        # Validity is judged against the declared start time moved earlier
        # by one key cycle plus the permitted clock skew.
        effective_start = NtTime(
            declared_start - NtTimeDelta(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
        )

        if pinned:
            usable = 0 <= effective_start <= interval_start
            if not usable:
                raise GetKeyError(HRES_E_INVALIDARG, "root key is not yet valid")

        return chosen, found_id

    def validate_get_key_request(
        self, gkid: Gkid, current_time: NtTime, root_key_specified: bool
    ) -> None:
        """Reject requests that the key emulation does not serve.

        Raises GetKeyError for a *gkid* lying in the future or naming
        anything other than an L2 seed key, and NotImplementedError when
        the default (latest) group key is requested.
        """
        # Keys from the future are refused, but a little clock skew is
        # tolerated so that the next managed password can be computed
        # before the current one expires.
        newest_allowed = Gkid.from_nt_time(NtTime(current_time + MAX_CLOCK_SKEW))
        if gkid > newest_allowed:
            raise GetKeyError(
                HRES_E_INVALIDARG,
                f"invalid request for a key from the future: {gkid} > {newest_allowed}",
            )

        kind = gkid.gkid_type()
        if kind is GkidType.DEFAULT:
            if root_key_specified:
                derived_from = " derived from the specified root key"
            else:
                derived_from = ""
            raise NotImplementedError(
                f"The latest group key{derived_from} is being requested."
            )

        if kind is not GkidType.L2_SEED_KEY:
            raise GetKeyError(
                HRES_E_INVALIDARG, f"invalid request for {kind.description()}"
            )

    def get_key(
        self,
        samdb: SamDB,
        target_sd: bytes,
        root_key_id: Optional[misc.GUID],
        gkid: Gkid,
        *,
        root_key_id_hint: Optional[misc.GUID] = None,
        current_time: Optional[NtTime] = None,
    ) -> SeedKeyPair:
        """Emulate the ISDKey.GetKey() RPC method.

        *target_sd* is an NDR-encoded valid security descriptor in
        self-relative format.

        Given a NULL root key ID, GetKey() may use a cached root key instead
        of the most recently created applicable key that the documentation
        implies. To reproduce the result Windows arrives at, name the root
        key to use via *root_key_id_hint*. Supplying both *root_key_id* and
        *root_key_id_hint* fails the test.
        """
        if current_time is None:
            current_time = self.current_nt_time(samdb)

        root_key_specified = root_key_id is not None
        if root_key_specified:
            self.assertIsNone(
                root_key_id_hint, "don’t provide both root key ID parameters"
            )

        self.validate_get_key_request(gkid, current_time, root_key_specified)

        lookup_id = root_key_id if root_key_specified else root_key_id_hint
        root_key_object, root_key_id = self.get_root_key_object(
            samdb, lookup_id, gkid
        )

        if root_key_specified:
            present_gkid = Gkid.from_nt_time(current_time)
            if gkid.l0_idx < present_gkid.l0_idx:
                # Every seed key whose L0 index precedes the current one is
                # from the past and therefore safe to return. For such a
                # request, hand back the L1 seed key (L0, 31, −1), from
                # which any L1 or L2 seed key with that L0 index can be
                # derived.
                past_l1 = self.compute_seed_key(
                    target_sd,
                    root_key_id,
                    root_key_object,
                    Gkid(gkid.l0_idx, 31, -1),
                )
                return SeedKeyPair(
                    past_l1.key,
                    None,
                    Gkid(gkid.l0_idx, 31, 31),
                    past_l1.hash_algorithm,
                    root_key_id,
                )

            # Earlier seed keys sharing the current L0 index can all be
            # derived from the current seed key or from the next older L1
            # seed key, so answer with the current Gkid instead.
            gkid = present_gkid

        if gkid.l2_idx == 31:
            # The current seed key, and every previous one with the same L0
            # index, can be derived from the L1 seed key (L0, L1, 31).
            whole_l1 = self.compute_seed_key(
                target_sd,
                root_key_id,
                root_key_object,
                Gkid(gkid.l0_idx, gkid.l1_idx, -1),
            )
            return SeedKeyPair(
                whole_l1.key, None, gkid, whole_l1.hash_algorithm, root_key_id
            )

        # Otherwise the answer is built around the L2 seed key itself.
        l2_seed = self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid)

        older_l1_key = None
        if gkid.l1_idx != 0:
            # From the current seed key can be derived only those seed keys
            # that share its L1 and L2 indices. To reach seed keys with
            # older L1 indices the caller also needs the next older L1 seed
            # key.
            older_l1_key = self.compute_seed_key(
                target_sd,
                root_key_id,
                root_key_object,
                Gkid(gkid.l0_idx, gkid.l1_idx - 1, -1),
            ).key

        return SeedKeyPair(
            older_l1_key,
            l2_seed.key,
            gkid,
            l2_seed.hash_algorithm,
            root_key_id,
        )

    def get_key_exact(
        self,
        samdb: SamDB,
        target_sd: bytes,
        root_key_id: Optional[misc.GUID],
        gkid: Gkid,
        current_time: Optional[NtTime] = None,
    ) -> GroupKey:
        """Compute the seed key for exactly *gkid*.

        *target_sd* is an NDR-encoded valid security descriptor in
        self-relative format. The request is validated and the root key
        looked up as in get_key(), but *gkid* is never substituted.
        """
        now = self.current_nt_time(samdb) if current_time is None else current_time

        self.validate_get_key_request(gkid, now, root_key_id is not None)

        root_key_object, root_key_id = self.get_root_key_object(
            samdb, root_key_id, gkid
        )

        return self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid)

    def get_root_key_data(self, root_key: RootKey) -> Tuple[bytes, Algorithm]:
        """Return the key data and hash algorithm held by *root_key*.

        Asserts that the object declares version ``1`` and the
        ``SP800_108_CTR_HMAC`` KDF, and that its key data is ``bytes``.
        """
        self.assertEqual(b"1", root_key.get("msKds-Version", idx=0))

        self.assertEqual(
            b"SP800_108_CTR_HMAC", root_key.get("msKds-KDFAlgorithmID", idx=0)
        )

        kdf_param = root_key.get("msKds-KDFParam", idx=0)
        algorithm = Algorithm.from_kdf_parameters(kdf_param)

        key_data = root_key.get("msKds-RootKeyData", idx=0)
        self.assertIsInstance(key_data, bytes)

        return key_data, algorithm

    def compute_seed_key(
        self,
        target_sd: bytes,
        root_key_id: misc.GUID,
        root_key: RootKey,
        target_gkid: Gkid,
    ) -> GroupKey:
        """Derive the L1 or L2 seed key named by *target_gkid*.

        Derivation starts from the root key data, passes through the L0
        seed key for the target's L0 index, and then steps through L1 and
        L2 seed keys until *target_gkid* is reached. *target_sd* is mixed
        in only at the first L1 derivation.
        """
        wanted_kind = target_gkid.gkid_type()
        self.assertIn(
            wanted_kind,
            (GkidType.L1_SEED_KEY, GkidType.L2_SEED_KEY),
            f"unexpected attempt to compute {wanted_kind.description()}",
        )

        root_key_data, algorithm = self.get_root_key_data(root_key)
        id_bytes = ndr_pack(root_key_id)

        digest = algorithm.algorithm()

        # L0 seed key, derived directly from the root key data.
        step = Gkid.l0_seed_key(target_gkid.l0_idx)
        material = self.derive_key(root_key_data, id_bytes, digest, step)

        # First L1 seed key; this is the only step that takes the security
        # descriptor as extra context.
        step = step.derive_l1_seed_key()
        material = self.derive_key(
            material, id_bytes, digest, step, target_sd=target_sd
        )

        # Remaining L1 steps, until the target's L1 index is reached.
        while step.l1_idx != target_gkid.l1_idx:
            step = step.derive_l1_seed_key()
            material = self.derive_key(material, id_bytes, digest, step)

        # L2 steps, until the target itself is reached.
        while step != target_gkid:
            step = step.derive_l2_seed_key()
            material = self.derive_key(material, id_bytes, digest, step)

        return GroupKey(material, step, algorithm, root_key_id)

    def derive_key(
        self,
        key: bytes,
        root_key_id_bytes: bytes,
        hash_algorithm: hashes.HashAlgorithm,
        gkid: Gkid,
        *,
        target_sd: Optional[bytes] = None,
    ) -> bytes:
        """Run one KDF step from *key* for the position given by *gkid*.

        The KDF context is the root key ID followed by the three Gkid
        indices, each as a little-endian 32-bit value, with *target_sd*
        appended when supplied.
        """

        def le32(value: int) -> bytes:
            # Masking lets negative indices (such as -1) encode as 32 bits.
            return (value & 0xFFFF_FFFF).to_bytes(4, "little")

        indices = (gkid.l0_idx, gkid.l1_idx, gkid.l2_idx)
        context = root_key_id_bytes + b"".join(le32(index) for index in indices)
        if target_sd is not None:
            context = context + target_sd
        return self.kdf(hash_algorithm, key, context)

    def kdf(
        self,
        hash_algorithm: hashes.HashAlgorithm,
        key: bytes,
        context: bytes,
        *,
        label="KDS service",
        len_in_bytes=KEY_LEN_BYTES,
    ) -> bytes:
        """Derive *len_in_bytes* bytes from *key* with counter-mode KBKDF-HMAC.

        *label* is encoded as UTF-16-LE and followed by two zero bytes
        before being passed to the KDF.
        """
        encoded_label = label.encode("utf-16-le") + b"\x00\x00"
        derivation = KBKDFHMAC(
            algorithm=hash_algorithm,
            mode=Mode.CounterMode,
            length=len_in_bytes,
            rlen=4,
            llen=4,
            location=CounterLocation.BeforeFixed,
            label=encoded_label,
            context=context,
            fixed=None,
            backend=default_backend(),
        )
        return derivation.derive(key)

    def get_config_dn(self, samdb: SamDB, dn: str) -> ldb.Dn:
        """Return *dn* placed beneath the configuration base DN of *samdb*."""
        full_dn = samdb.get_config_basedn()
        full_dn.add_child(dn)
        return full_dn

    def get_server_config_dn(self, samdb: SamDB) -> ldb.Dn:
        """Return the DN of the GKDS Server Configuration object."""
        # [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution
        # Service”, and “CN=SID Key Server Configuration” for “CN=Group Key
        # Distribution Service Server Configuration”.
        relative_dn = (
            "CN=Group Key Distribution Service Server Configuration,"
            "CN=Server Configuration,"
            "CN=Group Key Distribution Service,"
            "CN=Services"
        )
        return self.get_config_dn(samdb, relative_dn)

    def get_root_key_container_dn(self, samdb: SamDB) -> ldb.Dn:
        """Return the DN of the Master Root Keys container."""
        # [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution Service”.
        relative_dn = (
            "CN=Master Root Keys,CN=Group Key Distribution Service,CN=Services"
        )
        return self.get_config_dn(samdb, relative_dn)

    def create_root_key(
        self,
        samdb: SamDB,
        domain_dn: ldb.Dn,
        *,
        use_start_time: Optional[Union[datetime.datetime, NtTime]] = None,
        hash_algorithm: Optional[Algorithm] = Algorithm.SHA512,
        guid: Optional[misc.GUID] = None,
        data: Optional[bytes] = None,
    ) -> misc.GUID:
        """Create a root key object and return its GUID.

        The work itself is done by the module-level create_root_key()
        function, which is kept separate so that the samba-tool tests can
        borrow it. This wrapper supplies the current time (advanced by the
        permitted clock skew) and, when a specific *guid* is requested,
        registers the new object for deletion at cleanup.
        """
        creation_nt_time = self.current_nt_time(
            samdb,
            # Allow for clock skew.
            offset=timedelta_from_nt_time_delta(MAX_CLOCK_SKEW),
        )

        root_key_guid, root_key_dn = create_root_key(
            samdb,
            domain_dn,
            current_nt_time=creation_nt_time,
            use_start_time=use_start_time,
            hash_algorithm=hash_algorithm,
            guid=guid,
            data=data,
        )

        if guid is not None:
            # A fixed GUID is requested for reproducible results; make sure
            # such keys do not outlive the test.
            self.addCleanup(delete_force, samdb, root_key_dn)
            self.assertEqual(guid, root_key_guid)

        return root_key_guid
def create_root_key(
    samdb: SamDB,
    domain_dn: ldb.Dn,
    *,
    current_nt_time: NtTime,
    use_start_time: Optional[Union[datetime.datetime, NtTime]] = None,
    hash_algorithm: Optional[Algorithm] = Algorithm.SHA512,
    guid: Optional[misc.GUID] = None,
    data: Optional[bytes] = None,
) -> Tuple[misc.GUID, ldb.Dn]:
    """Add a ``msKds-ProvRootKey`` object to *samdb*.

    *current_nt_time* is recorded as the creation time. A missing *guid* or
    *data* is replaced with random bytes; a missing *use_start_time* is
    replaced with the start of the current key interval plus one key cycle
    and the maximum clock skew. Passing ``hash_algorithm=None`` omits the
    ``msKds-KDFParam`` attribute.

    Returns the key's GUID and the DN of the object that was added. Raises
    ValueError when *use_start_time* is neither a datetime nor an int.
    """
    # [MS-GKDI] 3.1.4.1.1, “Creating a New Root Key”, states that if the
    # server receives a GetKey request and the root keys container in Active
    # Directory is empty, the server must create a new root key object based
    # on the default Server Configuration object. Additional root keys are to
    # be created based on either the default Server Configuration object or
    # an updated one specifying optional configuration values.

    if guid is None:
        guid = misc.GUID(secrets.token_bytes(16))

    if data is None:
        data = secrets.token_bytes(KEY_LEN_BYTES)

    if use_start_time is None:
        # Root keys created by Windows without the ‘-EffectiveImmediately’
        # parameter have an effective time of exactly ten days in the
        # future, presumably to allow time for replication.
        #
        # Microsoft’s documentation on creating a KDS root key, located at
        # https://learn.microsoft.com/en-us/windows-server/security/group-managed-service-accounts/create-the-key-distribution-services-kds-root-key,
        # claims to the contrary that domain controllers will only wait up
        # to ten hours before allowing Group Managed Service Accounts to be
        # created.
        #
        # The same page includes instructions for creating a root key with
        # an effective time of ten hours in the past (for testing purposes),
        # but it is unclear why — the KDS will consider a key valid for use
        # immediately after its start time has passed, without bothering to
        # wait ten hours first. In fact, it will consider a key to be valid
        # a full ten hours (plus clock skew) *before* its declared start
        # time — intentional, or (conceivably) the result of an accidental
        # negation?
        interval_start = Gkid.from_nt_time(current_nt_time).start_nt_time()
        use_start_time = NtTime(
            interval_start + KEY_CYCLE_DURATION + MAX_CLOCK_SKEW
        )

    if isinstance(use_start_time, datetime.datetime):
        start_nt_time = nt_time_from_datetime(use_start_time)
    elif isinstance(use_start_time, int):
        start_nt_time = use_start_time
    else:
        raise ValueError("use_start_time should be a datetime or int")

    packed_kdf_params = None
    if hash_algorithm is not None:
        kdf_params = gkdi.KdfParameters()
        kdf_params.hash_algorithm = hash_algorithm.value
        packed_kdf_params = ndr_pack(kdf_params)

    # These are the encoded p and g values, respectively, of the “2048-bit
    # MODP Group with 256-bit Prime Order Subgroup” from RFC 5114 section
    # 2.3.
    field_order = (
        b"\x87\xa8\xe6\x1d\xb4\xb6f<\xff\xbb\xd1\x9ce\x19Y\x99\x8c\xee\xf6\x08"
        b"f\r\xd0\xf2],\xee\xd4C^;\x00\xe0\r\xf8\xf1\xd6\x19W\xd4\xfa\xf7\xdfE"
        b"a\xb2\xaa0\x16\xc3\xd9\x114\to\xaa;\xf4)m\x83\x0e\x9a|"
        b" \x9e\x0cd\x97Qz\xbd"
        b'Z\x8a\x9d0k\xcfg\xed\x91\xf9\xe6r[GX\xc0"\xe0\xb1\xefBu\xbf{l[\xfc\x11'
        b"\xd4_\x90\x88\xb9A\xf5N\xb1\xe5\x9b\xb8\xbc9\xa0\xbf\x120\x7f\\O\xdbp\xc5"
        b"\x81\xb2?v\xb6:\xca\xe1\xca\xa6\xb7\x90-RRg5H\x8a\x0e\xf1<m\x9aQ\xbf\xa4\xab"
        b":\xd84w\x96RM\x8e\xf6\xa1g\xb5\xa4\x18%\xd9g\xe1D\xe5\x14\x05d%"
        b"\x1c\xca\xcb\x83\xe6\xb4\x86\xf6\xb3\xca?yqP`&\xc0\xb8W\xf6\x89\x96(V"
        b"\xde\xd4\x01\n\xbd\x0b\xe6!\xc3\xa3\x96\nT\xe7\x10\xc3u\xf2cu\xd7\x01A\x03"
        b"\xa4\xb5C0\xc1\x98\xaf\x12a\x16\xd2'n\x11q_i8w\xfa\xd7\xef\t\xca\xdb\tJ\xe9"
        b"\x1e\x1a\x15\x97"
    )
    generator = (
        b"?\xb3,\x9bs\x13M\x0b.wPf`\xed\xbdHL\xa7\xb1\x8f!\xef T\x07\xf4y:"
        b"\x1a\x0b\xa1%\x10\xdb\xc1Pw\xbeF?\xffO\xedJ\xac\x0b\xb5U\xbe:l\x1b\x0ckG\xb1"
        b"\xbc7s\xbf~\x8cob\x90\x12(\xf8\xc2\x8c\xbb\x18\xa5Z\xe3\x13A\x00\ne"
        b"\x01\x96\xf91\xc7zW\xf2\xdd\xf4c\xe5\xe9\xec\x14Kw}\xe6*\xaa\xb8\xa8b"
        b"\x8a\xc3v\xd2\x82\xd6\xed8d\xe6y\x82B\x8e\xbc\x83\x1d\x144\x8fo/\x91\x93"
        b"\xb5\x04Z\xf2vqd\xe1\xdf\xc9g\xc1\xfb?.U\xa4\xbd\x1b\xff\xe8;\x9c\x80"
        b"\xd0R\xb9\x85\xd1\x82\xea\n\xdb*;s\x13\xd3\xfe\x14\xc8HK\x1e\x05%\x88\xb9"
        b"\xb7\xd2\xbb\xd2\xdf\x01a\x99\xec\xd0n\x15W\xcd\t\x15\xb35;\xbbd\xe0\xec7"
        b"\x7f\xd0(7\r\xf9+R\xc7\x89\x14(\xcd\xc6~\xb6\x18KR=\x1d\xb2F\xc3/c\x07\x84"
        b"\x90\xf0\x0e\xf8\xd6G\xd1H\xd4yTQ^#'\xcf\xef\x98\xc5\x82fKL\x0fl\xc4\x16Y"
    )
    # Both values must have the same encoded length.
    assert len(field_order) == len(generator)

    dh_params = gkdi.FfcDhParameters()
    dh_params.field_order = list(field_order)
    dh_params.generator = list(generator)
    dh_params.key_length = len(field_order)
    packed_dh_params = ndr_pack(dh_params)

    root_key_dn = samdb.get_config_basedn()
    root_key_dn.add_child(
        "CN=Master Root Keys,CN=Group Key Distribution Service,CN=Services"
    )
    root_key_dn.add_child(f"CN={guid}")

    # Avoid deleting root key objects without subsequently restarting the
    # Microsoft Key Distribution Service. This service will keep its root
    # key cached even after the corresponding AD object has been deleted,
    # breaking later tests that try to look up the root key object.

    details = {
        "dn": root_key_dn,
        "objectClass": "msKds-ProvRootKey",
        "msKds-RootKeyData": data,
        "msKds-CreateTime": str(current_nt_time),
        "msKds-UseStartTime": str(start_nt_time),
        "msKds-DomainID": str(domain_dn),
        # Every value from here on comes from the Server Configuration
        # object.
        "msKds-Version": "1",
        "msKds-KDFAlgorithmID": "SP800_108_CTR_HMAC",
        "msKds-SecretAgreementAlgorithmID": "DH",
        "msKds-SecretAgreementParam": packed_dh_params,
        "msKds-PublicKeyLength": "2048",
        # [MS-GKDI] claims this defaults to ‘256’.
        "msKds-PrivateKeyLength": "512",
    }
    if packed_kdf_params is not None:
        # Also comes from the Server Configuration object.
        details["msKds-KDFParam"] = packed_kdf_params

    samdb.add(details)

    return (guid, root_key_dn)