python:tests: Store keys as bytes rather than as tuples
[Samba.git] / python / samba / tests / dckeytab.py
bloba4ae38a8c0bc786cb69f45381966c40140f02388
1 # Tests for source4/libnet/py_net_dckeytab.c
3 # Copyright (C) David Mulder <dmulder@suse.com> 2018
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import os
20 import subprocess
21 from samba.net import Net
22 from samba import enable_net_export_keytab
24 from samba import credentials, dsdb, ntstatus, NTSTATUSError
25 from samba.dcerpc import krb5ccache, security
26 from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT
27 from samba.ndr import ndr_unpack, ndr_pack
28 from samba.param import LoadParm
29 from samba.samdb import SamDB
30 from samba.tests import TestCaseInTempDir, delete_force
32 from ldb import SCOPE_BASE
34 enable_net_export_keytab()
37 class DCKeytabTests(TestCaseInTempDir):
38 def setUp(self):
39 super().setUp()
40 self.lp = LoadParm()
41 self.lp.load_default()
42 self.creds = self.insta_creds(template=self.get_credentials())
43 self.samdb = SamDB(url=f"ldap://{os.environ.get('SERVER')}",
44 credentials=self.creds,
45 lp=self.lp)
47 self.ktfile = os.path.join(self.tempdir, 'test.keytab')
48 self.principal = self.creds.get_principal()
50 def tearDown(self):
51 super().tearDown()
53 def keytab_as_set(self, keytab_bytes):
54 def entry_to_tuple(entry):
55 principal = '/'.join(entry.principal.components) + f"@{entry.principal.realm}"
56 enctype = entry.enctype
57 kvno = entry.key_version
58 key = bytes(entry.key.data)
59 return (principal, enctype, kvno, key)
61 keytab = ndr_unpack(krb5ccache.KEYTAB, keytab_bytes)
62 entry = keytab.entry
64 keytab_as_set = set()
66 entry_as_tuple = entry_to_tuple(entry)
67 keytab_as_set.add(entry_as_tuple)
69 keytab_bytes = keytab.further_entry
70 while True:
71 multiple_entry = ndr_unpack(krb5ccache.MULTIPLE_KEYTAB_ENTRIES, keytab_bytes)
72 entry = multiple_entry.entry
73 entry_as_tuple = entry_to_tuple(entry)
74 self.assertNotIn(entry_as_tuple, keytab_as_set)
75 keytab_as_set.add(entry_as_tuple)
77 keytab_bytes = multiple_entry.further_entry
78 if not keytab_bytes:
79 break
81 return keytab_as_set
83 def test_export_keytab(self):
84 net = Net(None, self.lp)
85 self.addCleanup(self.rm_files, self.ktfile)
86 net.export_keytab(keytab=self.ktfile, principal=self.principal)
87 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
89 # Parse the first entry in the keytab
90 with open(self.ktfile, 'rb') as bytes_kt:
91 keytab_bytes = bytes_kt.read()
93 # confirm only this principal was exported
94 for entry in self.keytab_as_set(keytab_bytes):
95 (principal, enctype, kvno, key) = entry
96 self.assertEqual(principal, self.principal)
98 def test_export_keytab_all(self):
99 net = Net(None, self.lp)
100 self.addCleanup(self.rm_files, self.ktfile)
101 net.export_keytab(keytab=self.ktfile)
102 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
104 with open(self.ktfile, 'rb') as bytes_kt:
105 keytab_bytes = bytes_kt.read()
107 # Parse the keytab
108 keytab_as_set = self.keytab_as_set(keytab_bytes)
110 # confirm many principals were exported
111 self.assertGreater(len(keytab_as_set), 10)
113 def test_export_keytab_all_keep_stale(self):
114 net = Net(None, self.lp)
115 self.addCleanup(self.rm_files, self.ktfile)
116 net.export_keytab(keytab=self.ktfile)
118 new_principal=f"keytab_testuser@{self.creds.get_realm()}"
119 self.samdb.newuser("keytab_testuser", "4rfvBGT%")
120 self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
122 net.export_keytab(keytab=self.ktfile, keep_stale_entries=True)
124 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
126 with open(self.ktfile, 'rb') as bytes_kt:
127 keytab_bytes = bytes_kt.read()
129 # confirm many principals were exported
130 # self.keytab_as_set() will also check we only got it
131 # each entry once
132 keytab_as_set = self.keytab_as_set(keytab_bytes)
134 self.assertGreater(len(keytab_as_set), 10)
136 # Look for the new principal, showing this was updated
137 found = False
138 for entry in keytab_as_set:
139 (principal, enctype, kvno, key) = entry
140 if principal == new_principal:
141 found = True
143 self.assertTrue(found)
145 def test_export_keytab_nochange_update(self):
146 new_principal=f"keytab_testuser@{self.creds.get_realm()}"
147 self.samdb.newuser("keytab_testuser", "4rfvBGT%")
148 self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
150 net = Net(None, self.lp)
151 self.addCleanup(self.rm_files, self.ktfile)
152 net.export_keytab(keytab=self.ktfile, principal=new_principal)
153 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
155 cmd = ['klist', '-K', '-C', '-t', '-k', self.ktfile]
156 keytab_orig_content = subprocess.Popen(
157 cmd,
158 shell=False,
159 stdout=subprocess.PIPE,
160 stderr=subprocess.STDOUT,
161 ).communicate()[0]
163 with open(self.ktfile, 'rb') as bytes_kt:
164 keytab_orig_bytes = bytes_kt.read()
166 net.export_keytab(keytab=self.ktfile, principal=new_principal)
167 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
169 keytab_content = subprocess.Popen(
170 cmd,
171 shell=False,
172 stdout=subprocess.PIPE,
173 stderr=subprocess.STDOUT,
174 ).communicate()[0]
176 self.assertEqual(keytab_orig_content, keytab_content)
178 # Parse the first entry in the keytab
179 with open(self.ktfile, 'rb') as bytes_kt:
180 keytab_bytes = bytes_kt.read()
182 self.assertEqual(keytab_orig_bytes, keytab_bytes)
184 # confirm only this principal was exported.
185 # self.keytab_as_set() will also check we only got it
186 # once
187 for entry in self.keytab_as_set(keytab_bytes):
188 (principal, enctype, kvno, key) = entry
189 self.assertEqual(principal, new_principal)
191 def test_export_keytab_change_update(self):
192 new_principal=f"keytab_testuser@{self.creds.get_realm()}"
193 self.samdb.newuser("keytab_testuser", "4rfvBGT%")
194 self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
196 net = Net(None, self.lp)
197 self.addCleanup(self.rm_files, self.ktfile)
198 net.export_keytab(keytab=self.ktfile, principal=new_principal)
199 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
201 # Parse the first entry in the keytab
202 with open(self.ktfile, 'rb') as bytes_kt:
203 keytab_orig_bytes = bytes_kt.read()
205 self.samdb.setpassword(f"(userPrincipalName={new_principal})", "5rfvBGT%")
207 net.export_keytab(keytab=self.ktfile, principal=new_principal)
209 with open(self.ktfile, 'rb') as bytes_kt:
210 keytab_change_bytes = bytes_kt.read()
212 self.assertNotEqual(keytab_orig_bytes, keytab_change_bytes)
214 # We can't parse it as the parser is simple and doesn't
215 # understand holes in the file.
217 def test_export_keytab_change2_update(self):
218 new_principal=f"keytab_testuser@{self.creds.get_realm()}"
219 self.samdb.newuser("keytab_testuser", "4rfvBGT%")
220 self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
222 net = Net(None, self.lp)
223 self.addCleanup(self.rm_files, self.ktfile)
224 net.export_keytab(keytab=self.ktfile, principal=new_principal)
225 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
227 # Parse the first entry in the keytab
228 with open(self.ktfile, 'rb') as bytes_kt:
229 keytab_orig_bytes = bytes_kt.read()
231 # intended to trigger the pruning code for old keys
232 self.samdb.setpassword(f"(userPrincipalName={new_principal})", "5rfvBGT%")
233 self.samdb.setpassword(f"(userPrincipalName={new_principal})", "6rfvBGT%")
235 net.export_keytab(keytab=self.ktfile, principal=new_principal)
237 with open(self.ktfile, 'rb') as bytes_kt:
238 keytab_change_bytes = bytes_kt.read()
240 self.assertNotEqual(keytab_orig_bytes, keytab_change_bytes)
242 # We can't parse it as the parser is simple and doesn't
243 # understand holes in the file.
245 def test_export_keytab_change3_update_keep(self):
246 new_principal=f"keytab_testuser@{self.creds.get_realm()}"
247 self.samdb.newuser("keytab_testuser", "4rfvBGT%")
248 self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
249 net = Net(None, self.lp)
250 self.addCleanup(self.rm_files, self.ktfile)
251 net.export_keytab(keytab=self.ktfile, principal=new_principal)
252 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
254 # Parse the first entry in the keytab
255 with open(self.ktfile, 'rb') as bytes_kt:
256 keytab_orig_bytes = bytes_kt.read()
258 # By changing the password three times, we allow Samba to fill
259 # out current, old, older from supplementalCredentials and
260 # still have one password that must still be from the original
261 # keytab
262 self.samdb.setpassword(f"(userPrincipalName={new_principal})", "5rfvBGT%")
263 self.samdb.setpassword(f"(userPrincipalName={new_principal})", "6rfvBGT%")
264 self.samdb.setpassword(f"(userPrincipalName={new_principal})", "6rfvBGT%")
266 net.export_keytab(keytab=self.ktfile, principal=new_principal, keep_stale_entries=True)
268 with open(self.ktfile, 'rb') as bytes_kt:
269 keytab_change_bytes = bytes_kt.read()
271 self.assertNotEqual(keytab_orig_bytes, keytab_change_bytes)
273 # self.keytab_as_set() will also check we got each entry
274 # exactly once
275 keytab_as_set = self.keytab_as_set(keytab_change_bytes)
277 # Look for the new principal, showing this was updated but the old kept
278 found = 0
279 for entry in keytab_as_set:
280 (principal, enctype, kvno, key) = entry
281 if principal == new_principal and enctype == credentials.ENCTYPE_AES128_CTS_HMAC_SHA1_96:
282 found += 1
284 # Samba currently does not export the previous keys into the keytab, but could.
285 self.assertEqual(found, 4)
287 # confirm at least 12 keys (4 changes, 1 in orig export and 3
288 # history in 2nd export, 3 enctypes) were exported
289 self.assertGreaterEqual(len(keytab_as_set), 12)
291 def test_export_keytab_change2_export2_update_keep(self):
292 new_principal=f"keytab_testuser@{self.creds.get_realm()}"
293 self.samdb.newuser("keytab_testuser", "4rfvBGT%")
294 self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
295 net = Net(None, self.lp)
296 self.addCleanup(self.rm_files, self.ktfile)
297 net.export_keytab(keytab=self.ktfile, principal=new_principal)
298 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
300 # Parse the first entry in the keytab
301 with open(self.ktfile, 'rb') as bytes_kt:
302 keytab_orig_bytes = bytes_kt.read()
304 self.samdb.setpassword(f"(userPrincipalName={new_principal})", "5rfvBGT%")
306 net.export_keytab(keytab=self.ktfile, principal=new_principal, keep_stale_entries=True)
308 self.samdb.setpassword(f"(userPrincipalName={new_principal})", "6rfvBGT%")
310 net.export_keytab(keytab=self.ktfile, principal=new_principal, keep_stale_entries=True)
312 with open(self.ktfile, 'rb') as bytes_kt:
313 keytab_change_bytes = bytes_kt.read()
315 self.assertNotEqual(keytab_orig_bytes, keytab_change_bytes)
317 # self.keytab_as_set() will also check we got each entry
318 # exactly once
319 keytab_as_set = self.keytab_as_set(keytab_change_bytes)
321 # Look for the new principal, showing this was updated but the old kept
322 found = 0
323 for entry in keytab_as_set:
324 (principal, enctype, kvno, key) = entry
325 if principal == new_principal and enctype == credentials.ENCTYPE_AES128_CTS_HMAC_SHA1_96:
326 found += 1
328 # This covers the simple case, one export per password change
329 self.assertEqual(found, 3)
331 # confirm at least 9 keys (3 exports, 3 enctypes) were exported
332 self.assertGreaterEqual(len(keytab_as_set), 9)
334 def test_export_keytab_not_a_dir(self):
335 net = Net(None, self.lp)
336 with open(self.ktfile, mode='w') as f:
337 f.write("NOT A KEYTAB")
338 self.addCleanup(self.rm_files, self.ktfile)
340 try:
341 net.export_keytab(keytab=self.ktfile + "/f")
342 self.fail("Expected failure to write to an existing file")
343 except NTSTATUSError as err:
344 num, _ = err.args
345 self.assertEqual(num, ntstatus.NT_STATUS_NOT_A_DIRECTORY)
347 def test_export_keytab_existing(self):
348 net = Net(None, self.lp)
349 with open(self.ktfile, mode='w') as f:
350 f.write("NOT A KEYTAB")
351 self.addCleanup(self.rm_files, self.ktfile)
353 try:
354 net.export_keytab(keytab=self.ktfile)
355 self.fail(f"Expected failure to write to an existing file {self.ktfile}")
356 except NTSTATUSError as err:
357 num, _ = err.args
358 self.assertEqual(num, ntstatus.NT_STATUS_OBJECT_NAME_EXISTS)
360 def test_export_keytab_gmsa(self):
362 # Create gMSA account
363 gmsa_username = "GMSA_K5KeytabTest$"
364 gmsa_principal = f"{gmsa_username}@{self.samdb.domain_dns_name().upper()}"
365 gmsa_base_dn = self.samdb.get_wellknown_dn(
366 self.samdb.get_default_basedn(),
367 dsdb.DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER,
369 gmsa_user_dn = f"CN={gmsa_username},{gmsa_base_dn}"
371 msg = self.samdb.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
372 connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
374 domain_sid = security.dom_sid(self.samdb.get_domain_sid())
375 allow_sddl = f"O:SYD:(A;;RP;;;{connecting_user_sid})"
376 allow_sd = ndr_pack(security.descriptor.from_sddl(allow_sddl, domain_sid))
378 details = {
379 "dn": str(gmsa_user_dn),
380 "objectClass": "msDS-GroupManagedServiceAccount",
381 "msDS-ManagedPasswordInterval": "1",
382 "msDS-GroupMSAMembership": allow_sd,
383 "sAMAccountName": gmsa_username,
384 "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT),
387 delete_force(self.samdb, gmsa_user_dn)
388 self.samdb.add(details)
389 self.addCleanup(delete_force, self.samdb, gmsa_user_dn)
391 # Export keytab of gMSA account remotely
392 net = Net(None, self.lp)
393 try:
394 net.export_keytab(samdb=self.samdb, keytab=self.ktfile, principal=gmsa_principal)
395 except RuntimeError as e:
396 self.fail(e)
398 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
400 # Parse the first entry in the keytab
401 with open(self.ktfile, 'rb') as bytes_kt:
402 keytab_bytes = bytes_kt.read()
404 remote_keytab = ndr_unpack(krb5ccache.KEYTAB, keytab_bytes)
406 self.rm_files('test.keytab')
408 # Export keytab of gMSA account locally
409 try:
410 net.export_keytab(keytab=self.ktfile, principal=gmsa_principal)
411 except RuntimeError as e:
412 self.fail(e)
414 self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
416 # Parse the first entry in the keytab
417 with open(self.ktfile, 'rb') as bytes_kt:
418 keytab_bytes = bytes_kt.read()
420 self.rm_files('test.keytab')
422 local_keytab = ndr_unpack(krb5ccache.KEYTAB, keytab_bytes)
424 # Confirm that the principal is as expected
426 principal_parts = gmsa_principal.split('@')
428 self.assertEqual(local_keytab.entry.principal.component_count, 1)
429 self.assertEqual(local_keytab.entry.principal.realm, principal_parts[1])
430 self.assertEqual(local_keytab.entry.principal.components[0], principal_parts[0])
432 self.assertEqual(remote_keytab.entry.principal.component_count, 1)
433 self.assertEqual(remote_keytab.entry.principal.realm, principal_parts[1])
434 self.assertEqual(remote_keytab.entry.principal.components[0], principal_parts[0])
436 # Put all keys from each into a dictionary, and confirm all remote keys are in local keytab
438 remote_keys = {}
440 while True:
441 remote_keys[remote_keytab.entry.enctype] = remote_keytab.entry.key.data
442 keytab_bytes = remote_keytab.further_entry
443 if not keytab_bytes:
444 break
446 remote_keytab = ndr_unpack(krb5ccache.MULTIPLE_KEYTAB_ENTRIES, keytab_bytes)
448 local_keys = {}
450 while True:
451 local_keys[local_keytab.entry.enctype] = local_keytab.entry.key.data
452 keytab_bytes = local_keytab.further_entry
453 if not keytab_bytes:
454 break
455 local_keytab = ndr_unpack(krb5ccache.MULTIPLE_KEYTAB_ENTRIES, keytab_bytes)
457 # Check that the gMSA keys are in the local keys
458 remote_enctypes = set(remote_keys.keys())
460 # Check that at least the AES keys were generated
461 self.assertLessEqual({credentials.ENCTYPE_AES256_CTS_HMAC_SHA1_96,
462 credentials.ENCTYPE_AES128_CTS_HMAC_SHA1_96},
463 remote_enctypes)
465 local_enctypes = set(local_keys.keys())
467 self.assertLessEqual(remote_enctypes, local_enctypes)
469 common_enctypes = remote_enctypes & local_enctypes
471 for enctype in common_enctypes:
472 self.assertEqual(remote_keys[enctype], local_keys[enctype])