samba-tool: Test gpo manage openssh list command
[Samba.git] / python / samba / tests / samba_tool / gpo.py
blob8c6f25929863aaca60e934e11b842d828fd45fd4
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett 2012
4 # based on time.py:
5 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import os, pwd, grp
22 import ldb
23 import samba
24 from samba.tests.samba_tool.base import SambaToolCmdTest
25 import shutil
26 from samba.netcmd.gpo import get_gpo_dn, get_gpo_info
27 from samba.param import LoadParm
28 from samba.tests.gpo import stage_file, unstage_file
29 from samba.dcerpc import preg
30 from samba.ndr import ndr_pack, ndr_unpack
31 from samba.common import get_string
32 from configparser import ConfigParser
33 from io import StringIO
34 import xml.etree.ElementTree as etree
36 source_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../.."))
38 def has_difference(path1, path2, binary=True, xml=True, sortlines=False):
39 """Use this function to determine if the GPO backup differs from another.
41 xml=True checks whether any xml files are equal
42 binary=True checks whether any .SAMBABACKUP files are equal
43 """
44 if os.path.isfile(path1):
45 if sortlines:
46 file1 = open(path1).readlines()
47 file1.sort()
48 file2 = open(path1).readlines()
49 file2.sort()
50 if file1 != file2:
51 return path1
53 elif open(path1).read() != open(path2).read():
54 return path1
56 return None
58 l_dirs = [ path1 ]
59 r_dirs = [ path2 ]
60 while l_dirs:
61 l_dir = l_dirs.pop()
62 r_dir = r_dirs.pop()
64 dirlist = os.listdir(l_dir)
65 dirlist_other = os.listdir(r_dir)
67 dirlist.sort()
68 dirlist_other.sort()
69 if dirlist != dirlist_other:
70 return dirlist
72 for e in dirlist:
73 l_name = os.path.join(l_dir, e)
74 r_name = os.path.join(r_dir, e)
76 if os.path.isdir(l_name):
77 l_dirs.append(l_name)
78 r_dirs.append(r_name)
79 else:
80 if (l_name.endswith('.xml') and xml or
81 l_name.endswith('.SAMBABACKUP') and binary):
82 if open(l_name, "rb").read() != open(r_name, "rb").read():
83 return l_name
85 return None
88 class GpoCmdTestCase(SambaToolCmdTest):
89 """Tests for samba-tool time subcommands"""
91 gpo_name = "testgpo"
93 # This exists in the source tree to be restored
94 backup_gpo_guid = "{1E1DC8EA-390C-4800-B327-98B56A0AEA5D}"
96 def test_gpo_list(self):
97 """Run gpo list against the server and make sure it looks accurate"""
98 (result, out, err) = self.runsubcmd("gpo", "listall", "-H", "ldap://%s" % os.environ["SERVER"])
99 self.assertCmdSuccess(result, out, err, "Ensuring gpo listall ran successfully")
101 def test_fetchfail(self):
102 """Run against a non-existent GPO, and make sure it fails (this hard-coded UUID is very unlikely to exist"""
103 (result, out, err) = self.runsubcmd("gpo", "fetch", "c25cac17-a02a-4151-835d-fae17446ee43", "-H", "ldap://%s" % os.environ["SERVER"])
104 self.assertCmdFail(result, "check for result code")
106 def test_fetch(self):
107 """Run against a real GPO, and make sure it passes"""
108 (result, out, err) = self.runsubcmd("gpo", "fetch", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "--tmpdir", self.tempdir)
109 self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
110 shutil.rmtree(os.path.join(self.tempdir, "policy"))
112 def test_show(self):
113 """Show a real GPO, and make sure it passes"""
114 (result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"])
115 self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
117 def test_show_as_admin(self):
118 """Show a real GPO, and make sure it passes"""
119 (result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
120 self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
122 def test_aclcheck(self):
123 """Check all the GPOs on the remote server have correct ACLs"""
124 (result, out, err) = self.runsubcmd("gpo", "aclcheck", "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
125 self.assertCmdSuccess(result, out, err, "Ensuring gpo checked successfully")
127 def test_getlink_empty(self):
128 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
129 "-U%s%%%s" % (os.environ["DC_USERNAME"],
130 os.environ["DC_PASSWORD"]))
132 container_dn = 'OU=gpo_test_link,%s' % self.samdb.get_default_basedn()
134 self.samdb.add({
135 'dn': container_dn,
136 'objectClass': 'organizationalUnit'
139 (result, out, err) = self.runsubcmd("gpo", "getlink", container_dn,
140 "-H", "ldap://%s" % os.environ["SERVER"],
141 "-U%s%%%s" % (os.environ["USERNAME"],
142 os.environ["PASSWORD"]))
143 self.assertCmdSuccess(result, out, err, "Ensuring gpo link fetched successfully")
145 # Microsoft appears to allow an empty space character after deletion of
146 # a GPO. We should be able to handle this.
147 m = ldb.Message()
148 m.dn = ldb.Dn(self.samdb, container_dn)
149 m['gPLink'] = ldb.MessageElement(' ', ldb.FLAG_MOD_REPLACE, 'gPLink')
150 self.samdb.modify(m)
152 (result, out, err) = self.runsubcmd("gpo", "getlink", container_dn,
153 "-H", "ldap://%s" % os.environ["SERVER"],
154 "-U%s%%%s" % (os.environ["USERNAME"],
155 os.environ["PASSWORD"]))
156 self.assertCmdSuccess(result, out, err, "Ensuring gpo link fetched successfully")
158 self.samdb.delete(container_dn)
160 def test_backup_restore_compare_binary(self):
161 """Restore from a static backup and compare the binary contents"""
163 static_path = os.path.join(self.backup_path, 'policy',
164 self.backup_gpo_guid)
166 temp_path = os.path.join(self.tempdir, 'temp')
167 os.mkdir(temp_path)
169 new_path = os.path.join(self.tempdir, 'new')
170 os.mkdir(new_path)
172 gpo_guid = None
173 try:
174 (result, out, err) = self.runsubcmd("gpo", "restore", "BACKUP_RESTORE1",
175 static_path,
176 "-H", "ldap://%s" %
177 os.environ["SERVER"], "--tmpdir",
178 temp_path, "--entities",
179 self.entity_file, "-U%s%%%s" %
180 (os.environ["USERNAME"],
181 os.environ["PASSWORD"]),
182 "--restore-metadata")
184 gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
186 (result, out, err) = self.runsubcmd("gpo", "backup", gpo_guid,
187 "-H", "ldap://%s" %
188 os.environ["SERVER"],
189 "--tmpdir", new_path)
191 self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
193 # Compare the directories
194 self.assertIsNone(has_difference(os.path.join(new_path, 'policy',
195 gpo_guid),
196 static_path, binary=True,
197 xml=False))
198 finally:
199 if gpo_guid:
200 (result, out, err) = self.runsubcmd("gpo", "del", gpo_guid,
201 "-H", "ldap://%s" %
202 os.environ["SERVER"],
203 "-U%s%%%s" %
204 (os.environ["USERNAME"],
205 os.environ["PASSWORD"]))
206 self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
208 shutil.rmtree(temp_path)
209 shutil.rmtree(new_path)
211 def test_backup_restore_no_entities_compare_binary(self):
212 """Restore from a static backup (and use no entity file, resulting in
213 copy-restore fallback), and compare the binary contents"""
215 static_path = os.path.join(self.backup_path, 'policy',
216 self.backup_gpo_guid)
218 temp_path = os.path.join(self.tempdir, 'temp')
219 os.mkdir(temp_path)
221 new_path = os.path.join(self.tempdir, 'new')
222 os.mkdir(new_path)
224 gpo_guid = None
225 gpo_guid1 = None
226 gpo_guid2 = None
227 try:
228 (result, out, err) = self.runsubcmd("gpo", "restore", "BACKUP_RESTORE1",
229 static_path,
230 "-H", "ldap://%s" %
231 os.environ["SERVER"], "--tmpdir",
232 temp_path, "--entities",
233 self.entity_file, "-U%s%%%s" %
234 (os.environ["USERNAME"],
235 os.environ["PASSWORD"]),
236 "--restore-metadata")
238 gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
239 gpo_guid1 = gpo_guid
241 # Do not output entities file
242 (result, out, err) = self.runsubcmd("gpo", "backup", gpo_guid,
243 "-H", "ldap://%s" %
244 os.environ["SERVER"],
245 "--tmpdir", new_path,
246 "--generalize")
248 self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
250 # Do not use an entities file
251 (result, out, err) = self.runsubcmd("gpo", "restore", "BACKUP_RESTORE2",
252 os.path.join(new_path, 'policy', gpo_guid1),
253 "-H", "ldap://%s" %
254 os.environ["SERVER"], "--tmpdir",
255 temp_path, "-U%s%%%s" %
256 (os.environ["USERNAME"],
257 os.environ["PASSWORD"]),
258 "--restore-metadata")
260 gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
261 gpo_guid2 = gpo_guid
263 self.assertCmdSuccess(result, out, err, "Ensuring gpo restored successfully")
265 (result, out, err) = self.runsubcmd("gpo", "backup", gpo_guid,
266 "-H", "ldap://%s" %
267 os.environ["SERVER"],
268 "--tmpdir", new_path)
270 # Compare the directories
271 self.assertIsNone(has_difference(os.path.join(new_path, 'policy',
272 gpo_guid1),
273 os.path.join(new_path, 'policy',
274 gpo_guid2),
275 binary=True, xml=False))
276 finally:
277 if gpo_guid1:
278 (result, out, err) = self.runsubcmd("gpo", "del", gpo_guid1,
279 "-H", "ldap://%s" %
280 os.environ["SERVER"],
281 "-U%s%%%s" %
282 (os.environ["USERNAME"],
283 os.environ["PASSWORD"]))
284 self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
286 if gpo_guid2:
287 (result, out, err) = self.runsubcmd("gpo", "del", gpo_guid2,
288 "-H", "ldap://%s" %
289 os.environ["SERVER"],
290 "-U%s%%%s" %
291 (os.environ["USERNAME"],
292 os.environ["PASSWORD"]))
293 self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
295 shutil.rmtree(temp_path)
296 shutil.rmtree(new_path)
298 def test_backup_restore_backup_compare_XML(self):
299 """Restore from a static backup and backup to compare XML"""
300 static_path = os.path.join(self.backup_path, 'policy',
301 self.backup_gpo_guid)
303 temp_path = os.path.join(self.tempdir, 'temp')
304 os.mkdir(temp_path)
306 new_path = os.path.join(self.tempdir, 'new')
307 os.mkdir(new_path)
309 gpo_guid = None
310 gpo_guid1 = None
311 gpo_guid2 = None
312 try:
313 (result, out, err) = self.runsubcmd("gpo", "restore", "BACKUP_RESTORE1",
314 static_path,
315 "-H", "ldap://%s" %
316 os.environ["SERVER"], "--tmpdir",
317 temp_path, "--entities",
318 self.entity_file, "-U%s%%%s" %
319 (os.environ["USERNAME"],
320 os.environ["PASSWORD"]),
321 "--restore-metadata")
323 gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
324 gpo_guid1 = gpo_guid
326 (result, out, err) = self.runsubcmd("gpo", "backup", gpo_guid,
327 "-H", "ldap://%s" %
328 os.environ["SERVER"],
329 "--tmpdir", new_path)
331 self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
333 (result, out, err) = self.runsubcmd("gpo", "restore", "BACKUP_RESTORE2",
334 os.path.join(new_path, 'policy', gpo_guid1),
335 "-H", "ldap://%s" %
336 os.environ["SERVER"], "--tmpdir",
337 temp_path, "--entities",
338 self.entity_file, "-U%s%%%s" %
339 (os.environ["USERNAME"],
340 os.environ["PASSWORD"]),
341 "--restore-metadata")
343 gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
344 gpo_guid2 = gpo_guid
346 self.assertCmdSuccess(result, out, err, "Ensuring gpo restored successfully")
348 (result, out, err) = self.runsubcmd("gpo", "backup", gpo_guid,
349 "-H", "ldap://%s" %
350 os.environ["SERVER"],
351 "--tmpdir", new_path)
353 # Compare the directories
354 self.assertIsNone(has_difference(os.path.join(new_path, 'policy',
355 gpo_guid1),
356 os.path.join(new_path, 'policy',
357 gpo_guid2),
358 binary=True, xml=True))
359 finally:
360 if gpo_guid1:
361 (result, out, err) = self.runsubcmd("gpo", "del", gpo_guid1,
362 "-H", "ldap://%s" %
363 os.environ["SERVER"],
364 "-U%s%%%s" %
365 (os.environ["USERNAME"],
366 os.environ["PASSWORD"]))
367 self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
369 if gpo_guid2:
370 (result, out, err) = self.runsubcmd("gpo", "del", gpo_guid2,
371 "-H", "ldap://%s" %
372 os.environ["SERVER"],
373 "-U%s%%%s" %
374 (os.environ["USERNAME"],
375 os.environ["PASSWORD"]))
376 self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
378 shutil.rmtree(temp_path)
379 shutil.rmtree(new_path)
381 def test_backup_restore_generalize(self):
382 """Restore from a static backup with different entities, generalize it
383 again, and compare the XML"""
384 static_path = os.path.join(self.backup_path, 'policy',
385 self.backup_gpo_guid)
387 temp_path = os.path.join(self.tempdir, 'temp')
388 os.mkdir(temp_path)
390 new_path = os.path.join(self.tempdir, 'new')
391 os.mkdir(new_path)
393 alt_entity_file = os.path.join(new_path, 'entities')
394 with open(alt_entity_file, 'wb') as f:
395 f.write(b'''<!ENTITY SAMBA__NETWORK_PATH__82419dafed126a07d6b96c66fc943735__ "\\\\samdom.example.com">
396 <!ENTITY SAMBA__NETWORK_PATH__0484cd41ded45a0728333a9c5e5ef619__ "\\\\samdom">
397 <!ENTITY SAMBA____SDDL_ACL____4ce8277be3f630300cbcf80a80e21cf4__ "D:PAR(A;CI;KA;;;BA)(A;CIIO;KA;;;CO)(A;CI;KA;;;SY)(A;CI;KR;;;S-1-16-0)">
398 <!ENTITY SAMBA____USER_ID_____d0970f5a1e19cb803f916c203d5c39c4__ "*S-1-5-113">
399 <!ENTITY SAMBA____USER_ID_____7b7bc2512ee1fedcd76bdc68926d4f7b__ "Administrator">
400 <!ENTITY SAMBA____USER_ID_____a3069f5a7a6530293ad8df6abd32af3d__ "Foobaz">
401 <!ENTITY SAMBA____USER_ID_____fdf60b2473b319c8c341de5f62479a7d__ "*S-1-5-32-545">
402 <!ENTITY SAMBA____USER_ID_____adb831a7fdd83dd1e2a309ce7591dff8__ "Guest">
403 <!ENTITY SAMBA____USER_ID_____9fa835214b4fc8b6102c991f7d97c2f8__ "*S-1-5-32-547">
404 <!ENTITY SAMBA____USER_ID_____bf8caafa94a19a6262bad2e8b6d4bce6__ "*S-1-5-32-546">
405 <!ENTITY SAMBA____USER_ID_____a45da96d0bf6575970f2d27af22be28a__ "System">
406 <!ENTITY SAMBA____USER_ID_____171d33a63ebd67f856552940ed491ad3__ "s-1-5-32-545">
407 <!ENTITY SAMBA____USER_ID_____7140932fff16ce85cc64d3caab588d0d__ "s-1-1-0">
408 ''')
410 gen_entity_file = os.path.join(temp_path, 'entities')
412 gpo_guid = None
413 try:
414 (result, out, err) = self.runsubcmd("gpo", "restore", "BACKUP_RESTORE1",
415 static_path,
416 "-H", "ldap://%s" %
417 os.environ["SERVER"], "--tmpdir",
418 temp_path, "--entities",
419 alt_entity_file, "-U%s%%%s" %
420 (os.environ["USERNAME"],
421 os.environ["PASSWORD"]),
422 "--restore-metadata")
424 self.assertCmdSuccess(result, out, err, "Ensuring gpo restored successfully")
426 gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
428 (result, out, err) = self.runsubcmd("gpo", "backup", gpo_guid,
429 "-H", "ldap://%s" %
430 os.environ["SERVER"],
431 "--tmpdir", new_path,
432 "--generalize", "--entities",
433 gen_entity_file)
435 self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
437 # Assert entity files are identical (except for line order)
438 self.assertIsNone(has_difference(alt_entity_file,
439 gen_entity_file,
440 sortlines=True))
442 # Compare the directories (XML)
443 self.assertIsNone(has_difference(os.path.join(new_path, 'policy',
444 gpo_guid),
445 static_path, binary=False,
446 xml=True))
447 finally:
448 if gpo_guid:
449 (result, out, err) = self.runsubcmd("gpo", "del", gpo_guid,
450 "-H", "ldap://%s" %
451 os.environ["SERVER"],
452 "-U%s%%%s" %
453 (os.environ["USERNAME"],
454 os.environ["PASSWORD"]))
455 self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
457 shutil.rmtree(temp_path)
458 shutil.rmtree(new_path)
460 def test_backup_with_extension_attributes(self):
461 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
462 "-U%s%%%s" % (os.environ["DC_USERNAME"],
463 os.environ["DC_PASSWORD"]))
465 temp_path = os.path.join(self.tempdir, 'temp')
466 os.mkdir(temp_path)
468 extensions = {
469 # Taken from "source4/setup/provision_group_policy.ldif" on domain
470 'gPCMachineExtensionNames': '[{35378EAC-683F-11D2-A89A-00C04FBBCFA2}{53D6AB1B-2488-11D1-A28C-00C04FB94F17}][{827D319E-6EAC-11D2-A4EA-00C04F79F83A}{803E14A0-B4FB-11D0-A0D0-00A0C90F574B}][{B1BE8D72-6EAC-11D2-A4EA-00C04F79F83A}{53D6AB1B-2488-11D1-A28C-00C04FB94F17}]',
471 'gPCUserExtensionNames': '[{3060E8D0-7020-11D2-842D-00C04FA372D4}{3060E8CE-7020-11D2-842D-00C04FA372D4}][{35378EAC-683F-11D2-A89A-00C04FBBCFA2}{0F6B957E-509E-11D1-A7CC-0000F87571E3}]'
474 gpo_dn = get_gpo_dn(self.samdb, self.gpo_guid)
475 for ext in extensions:
476 data = extensions[ext]
478 m = ldb.Message()
479 m.dn = gpo_dn
480 m[ext] = ldb.MessageElement(data, ldb.FLAG_MOD_REPLACE, ext)
482 self.samdb.modify(m)
484 try:
485 (result, out, err) = self.runsubcmd("gpo", "backup", self.gpo_guid,
486 "-H", "ldap://%s" %
487 os.environ["SERVER"],
488 "--tmpdir", temp_path)
490 self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
492 guid = "{%s}" % out.split("{")[1].split("}")[0]
494 temp_path = os.path.join(temp_path, 'policy', guid)
496 (result, out, err) = self.runsubcmd("gpo", "restore", "RESTORE_EXT",
497 temp_path,
498 "-H", "ldap://%s" %
499 os.environ["SERVER"], "--tmpdir",
500 self.tempdir, "-U%s%%%s" %
501 (os.environ["USERNAME"],
502 os.environ["PASSWORD"]),
503 "--restore-metadata")
505 self.assertCmdSuccess(result, out, err, "Ensuring gpo restored successfully")
507 gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
509 msg = get_gpo_info(self.samdb, gpo_guid)
510 self.assertEqual(len(msg), 1)
512 for ext in extensions:
513 self.assertTrue(ext in msg[0])
514 self.assertEqual(extensions[ext], str(msg[0][ext][0]))
516 finally:
517 if gpo_guid:
518 (result, out, err) = self.runsubcmd("gpo", "del", gpo_guid,
519 "-H", "ldap://%s" %
520 os.environ["SERVER"],
521 "-U%s%%%s" %
522 (os.environ["USERNAME"],
523 os.environ["PASSWORD"]))
524 self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
526 shutil.rmtree(os.path.join(self.tempdir, "policy"))
527 shutil.rmtree(os.path.join(self.tempdir, 'temp'))
529 def test_admx_load(self):
530 lp = LoadParm()
531 lp.load(os.environ['SERVERCONFFILE'])
532 local_path = lp.get('path', 'sysvol')
533 admx_path = os.path.join(local_path, os.environ['REALM'].lower(),
534 'Policies', 'PolicyDefinitions')
535 (result, out, err) = self.runsubcmd("gpo", "admxload",
536 "-H", "ldap://%s" %
537 os.environ["SERVER"],
538 "--admx-dir=%s" %
539 os.path.join(source_path,
540 'libgpo/admx'),
541 "-U%s%%%s" %
542 (os.environ["USERNAME"],
543 os.environ["PASSWORD"]))
544 self.assertCmdSuccess(result, out, err,
545 'Filling PolicyDefinitions failed')
546 self.assertTrue(os.path.exists(admx_path),
547 'PolicyDefinitions was not created')
548 self.assertTrue(os.path.exists(os.path.join(admx_path, 'samba.admx')),
549 'Filling PolicyDefinitions failed')
550 shutil.rmtree(admx_path)
552 def test_smb_conf_set(self):
553 lp = LoadParm()
554 lp.load(os.environ['SERVERCONFFILE'])
555 local_path = lp.get('path', 'sysvol')
556 reg_pol = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
557 self.gpo_guid, 'Machine/Registry.pol')
559 policy = 'apply group policies'
560 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "smb_conf",
561 "set"), self.gpo_guid,
562 policy, "yes",
563 "-H", "ldap://%s" %
564 os.environ["SERVER"],
565 "-U%s%%%s" %
566 (os.environ["USERNAME"],
567 os.environ["PASSWORD"]))
568 self.assertCmdSuccess(result, out, err,
569 'Failed to set apply group policies')
571 self.assertTrue(os.path.exists(reg_pol),
572 'The Registry.pol does not exist')
573 reg_data = ndr_unpack(preg.file, open(reg_pol, 'rb').read())
574 ret = any([get_string(e.valuename) == policy and e.data == 1 \
575 for e in reg_data.entries])
576 self.assertTrue(ret, 'The sudoers entry was not added')
578 # Ensure an empty set command deletes the entry
579 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "smb_conf",
580 "set"), self.gpo_guid,
581 policy, "-H", "ldap://%s" %
582 os.environ["SERVER"],
583 "-U%s%%%s" %
584 (os.environ["USERNAME"],
585 os.environ["PASSWORD"]))
586 self.assertCmdSuccess(result, out, err,
587 'Failed to unset apply group policies')
589 reg_data = ndr_unpack(preg.file, open(reg_pol, 'rb').read())
590 ret = not any([get_string(e.valuename) == policy and e.data == 1 \
591 for e in reg_data.entries])
592 self.assertTrue(ret, 'The sudoers entry was not removed')
594 def test_smb_conf_list(self):
595 lp = LoadParm()
596 lp.load(os.environ['SERVERCONFFILE'])
597 local_path = lp.get('path', 'sysvol')
598 reg_pol = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
599 self.gpo_guid, 'Machine/Registry.pol')
601 # Stage the Registry.pol file with test data
602 stage = preg.file()
603 e = preg.entry()
604 e.keyname = b'Software\\Policies\\Samba\\smb_conf'
605 e.valuename = b'apply group policies'
606 e.type = 4
607 e.data = 1
608 stage.num_entries = 1
609 stage.entries = [e]
610 ret = stage_file(reg_pol, ndr_pack(stage))
611 self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
613 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "smb_conf",
614 "list"), self.gpo_guid,
615 "-H", "ldap://%s" %
616 os.environ["SERVER"],
617 "-U%s%%%s" %
618 (os.environ["USERNAME"],
619 os.environ["PASSWORD"]))
620 self.assertIn('%s = True' % e.valuename, out, 'The test entry was not found!')
622 # Unstage the Registry.pol file
623 unstage_file(reg_pol)
625 def test_security_set(self):
626 lp = LoadParm()
627 lp.load(os.environ['SERVERCONFFILE'])
628 local_path = lp.get('path', 'sysvol')
629 inf_pol = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
630 self.gpo_guid, 'Machine/Microsoft/Windows NT/SecEdit/GptTmpl.inf')
632 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "security",
633 "set"), self.gpo_guid,
634 'MaxTicketAge', '10',
635 "-H", "ldap://%s" %
636 os.environ["SERVER"],
637 "-U%s%%%s" %
638 (os.environ["USERNAME"],
639 os.environ["PASSWORD"]))
640 self.assertCmdSuccess(result, out, err,
641 'Failed to set MaxTicketAge')
642 self.assertTrue(os.path.exists(inf_pol),
643 '%s was not created' % inf_pol)
644 inf_pol_contents = open(inf_pol, 'r').read()
645 self.assertIn('MaxTicketAge = 10', inf_pol_contents,
646 'The test entry was not found!')
648 # Ensure an empty set command deletes the entry
649 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "security",
650 "set"), self.gpo_guid,
651 'MaxTicketAge',
652 "-H", "ldap://%s" %
653 os.environ["SERVER"],
654 "-U%s%%%s" %
655 (os.environ["USERNAME"],
656 os.environ["PASSWORD"]))
657 self.assertCmdSuccess(result, out, err,
658 'Failed to unset MaxTicketAge')
659 inf_pol_contents = open(inf_pol, 'r').read()
660 self.assertNotIn('MaxTicketAge = 10', inf_pol_contents,
661 'The test entry was still found!')
663 def test_security_list(self):
664 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "security",
665 "set"), self.gpo_guid,
666 'MaxTicketAge', '10',
667 "-H", "ldap://%s" %
668 os.environ["SERVER"],
669 "-U%s%%%s" %
670 (os.environ["USERNAME"],
671 os.environ["PASSWORD"]))
672 self.assertCmdSuccess(result, out, err,
673 'Failed to set MaxTicketAge')
675 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "security",
676 "list"), self.gpo_guid,
677 "-H", "ldap://%s" %
678 os.environ["SERVER"],
679 "-U%s%%%s" %
680 (os.environ["USERNAME"],
681 os.environ["PASSWORD"]))
682 self.assertIn('MaxTicketAge = 10', out, 'The test entry was not found!')
684 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "security",
685 "set"), self.gpo_guid,
686 'MaxTicketAge',
687 "-H", "ldap://%s" %
688 os.environ["SERVER"],
689 "-U%s%%%s" %
690 (os.environ["USERNAME"],
691 os.environ["PASSWORD"]))
692 self.assertCmdSuccess(result, out, err,
693 'Failed to unset MaxTicketAge')
695 def test_sudoers_remove(self):
696 lp = LoadParm()
697 lp.load(os.environ['SERVERCONFFILE'])
698 local_path = lp.get('path', 'sysvol')
699 reg_pol = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
700 self.gpo_guid, 'Machine/Registry.pol')
702 # Stage the Registry.pol file with test data
703 stage = preg.file()
704 e = preg.entry()
705 e.keyname = b'Software\\Policies\\Samba\\Unix Settings\\Sudo Rights'
706 e.valuename = b'Software\\Policies\\Samba\\Unix Settings'
707 e.type = 1
708 e.data = b'fakeu ALL=(ALL) NOPASSWD: ALL'
709 stage.num_entries = 1
710 stage.entries = [e]
711 ret = stage_file(reg_pol, ndr_pack(stage))
712 self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
714 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "sudoers",
715 "remove"), self.gpo_guid,
716 get_string(e.data),
717 "-H", "ldap://%s" %
718 os.environ["SERVER"],
719 "-U%s%%%s" %
720 (os.environ["USERNAME"],
721 os.environ["PASSWORD"]))
722 self.assertCmdSuccess(result, out, err, 'Sudoers remove failed')
724 def test_sudoers_add(self):
725 lp = LoadParm()
726 lp.load(os.environ['SERVERCONFFILE'])
727 local_path = lp.get('path', 'sysvol')
728 reg_pol = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
729 self.gpo_guid, 'Machine/Registry.pol')
731 entry = 'fakeu ALL=(ALL) NOPASSWD: ALL'
732 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "sudoers",
733 "add"), self.gpo_guid, entry,
734 "-H", "ldap://%s" %
735 os.environ["SERVER"],
736 "-U%s%%%s" %
737 (os.environ["USERNAME"],
738 os.environ["PASSWORD"]))
739 self.assertCmdSuccess(result, out, err, 'Sudoers add failed')
741 self.assertTrue(os.path.exists(reg_pol),
742 'The Registry.pol does not exist')
743 reg_data = ndr_unpack(preg.file, open(reg_pol, 'rb').read())
744 self.assertTrue(any([get_string(e.data) == entry for e in reg_data.entries]),
745 'The sudoers entry was not added')
747 def test_sudoers_list(self):
748 lp = LoadParm()
749 lp.load(os.environ['SERVERCONFFILE'])
750 local_path = lp.get('path', 'sysvol')
751 reg_pol = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
752 self.gpo_guid, 'Machine/Registry.pol')
754 # Stage the Registry.pol file with test data
755 stage = preg.file()
756 e = preg.entry()
757 e.keyname = b'Software\\Policies\\Samba\\Unix Settings\\Sudo Rights'
758 e.valuename = b'Software\\Policies\\Samba\\Unix Settings'
759 e.type = 1
760 e.data = b'fakeu ALL=(ALL) NOPASSWD: ALL'
761 stage.num_entries = 1
762 stage.entries = [e]
763 ret = stage_file(reg_pol, ndr_pack(stage))
764 self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
766 (result, out, err) = self.runsublevelcmd("gpo", ("manage", "sudoers",
767 "list"), self.gpo_guid,
768 "-H", "ldap://%s" %
769 os.environ["SERVER"],
770 "-U%s%%%s" %
771 (os.environ["USERNAME"],
772 os.environ["PASSWORD"]))
773 self.assertIn(e.data, out, 'The test entry was not found!')
775 # Unstage the Registry.pol file
776 unstage_file(reg_pol)
778 def test_symlink_list(self):
779 lp = LoadParm()
780 lp.load(os.environ['SERVERCONFFILE'])
781 local_path = lp.get('path', 'sysvol')
782 vgp_xml = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
783 self.gpo_guid, 'Machine/VGP/VTLA/Unix',
784 'Symlink/manifest.xml')
785 stage = etree.Element('vgppolicy')
786 policysetting = etree.SubElement(stage, 'policysetting')
787 pv = etree.SubElement(policysetting, 'version')
788 pv.text = '1'
789 name = etree.SubElement(policysetting, 'name')
790 name.text = 'Symlink Policy'
791 description = etree.SubElement(policysetting, 'description')
792 description.text = 'Specifies symbolic link data'
793 apply_mode = etree.SubElement(policysetting, 'apply_mode')
794 apply_mode.text = 'merge'
795 data = etree.SubElement(policysetting, 'data')
796 file_properties = etree.SubElement(data, 'file_properties')
797 source = etree.SubElement(file_properties, 'source')
798 source.text = os.path.join(self.tempdir, 'test.source')
799 target = etree.SubElement(file_properties, 'target')
800 target.text = os.path.join(self.tempdir, 'test.target')
801 ret = stage_file(vgp_xml, etree.tostring(stage, 'utf-8'))
802 self.assertTrue(ret, 'Could not create the target %s' % vgp_xml)
804 symlink = 'ln -s %s %s' % (source.text, target.text)
805 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
806 "symlink", "list"),
807 self.gpo_guid, "-H",
808 "ldap://%s" %
809 os.environ["SERVER"],
810 "-U%s%%%s" %
811 (os.environ["USERNAME"],
812 os.environ["PASSWORD"]))
813 self.assertIn(symlink, out, 'The test entry was not found!')
815 # Unstage the manifest.xml file
816 unstage_file(vgp_xml)
818 def test_symlink_add(self):
819 source_text = os.path.join(self.tempdir, 'test.source')
820 target_text = os.path.join(self.tempdir, 'test.target')
821 symlink = 'ln -s %s %s' % (source_text, target_text)
822 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
823 "symlink", "add"),
824 self.gpo_guid,
825 source_text, target_text,
826 "-H", "ldap://%s" %
827 os.environ["SERVER"],
828 "-U%s%%%s" %
829 (os.environ["USERNAME"],
830 os.environ["PASSWORD"]))
831 self.assertCmdSuccess(result, out, err, 'Symlink add failed')
833 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
834 "symlink", "list"),
835 self.gpo_guid, "-H",
836 "ldap://%s" %
837 os.environ["SERVER"],
838 "-U%s%%%s" %
839 (os.environ["USERNAME"],
840 os.environ["PASSWORD"]))
841 self.assertIn(symlink, out, 'The test entry was not found!')
843 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
844 "symlink", "remove"),
845 self.gpo_guid,
846 source_text, target_text,
847 "-H", "ldap://%s" %
848 os.environ["SERVER"],
849 "-U%s%%%s" %
850 (os.environ["USERNAME"],
851 os.environ["PASSWORD"]))
852 self.assertCmdSuccess(result, out, err, 'Symlink remove failed')
854 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
855 "symlink", "list"),
856 self.gpo_guid, "-H",
857 "ldap://%s" %
858 os.environ["SERVER"],
859 "-U%s%%%s" %
860 (os.environ["USERNAME"],
861 os.environ["PASSWORD"]))
862 self.assertNotIn(symlink, out, 'The test entry was not removed!')
864 def test_files_list(self):
865 lp = LoadParm()
866 lp.load(os.environ['SERVERCONFFILE'])
867 local_path = lp.get('path', 'sysvol')
868 vgp_xml = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
869 self.gpo_guid, 'Machine/VGP/VTLA/Unix',
870 'Files/manifest.xml')
871 source_file = os.path.join(local_path, lp.get('realm').lower(),
872 'Policies', self.gpo_guid, 'Machine/VGP',
873 'VTLA/Unix/Files/test.source')
874 stage = etree.Element('vgppolicy')
875 policysetting = etree.SubElement(stage, 'policysetting')
876 pv = etree.SubElement(policysetting, 'version')
877 pv.text = '1'
878 name = etree.SubElement(policysetting, 'name')
879 name.text = 'Files'
880 description = etree.SubElement(policysetting, 'description')
881 description.text = 'Represents file data to set/copy on clients'
882 data = etree.SubElement(policysetting, 'data')
883 file_properties = etree.SubElement(data, 'file_properties')
884 source = etree.SubElement(file_properties, 'source')
885 source.text = source_file
886 target = etree.SubElement(file_properties, 'target')
887 target.text = os.path.join(self.tempdir, 'test.target')
888 user = etree.SubElement(file_properties, 'user')
889 user.text = pwd.getpwuid(os.getuid()).pw_name
890 group = etree.SubElement(file_properties, 'group')
891 group.text = grp.getgrgid(os.getgid()).gr_name
893 # Request permissions of 755
894 permissions = etree.SubElement(file_properties, 'permissions')
895 permissions.set('type', 'user')
896 etree.SubElement(permissions, 'read')
897 etree.SubElement(permissions, 'write')
898 etree.SubElement(permissions, 'execute')
899 permissions = etree.SubElement(file_properties, 'permissions')
900 permissions.set('type', 'group')
901 etree.SubElement(permissions, 'read')
902 etree.SubElement(permissions, 'execute')
903 permissions = etree.SubElement(file_properties, 'permissions')
904 permissions.set('type', 'other')
905 etree.SubElement(permissions, 'read')
906 etree.SubElement(permissions, 'execute')
908 ret = stage_file(vgp_xml, etree.tostring(stage, 'utf-8'))
909 self.assertTrue(ret, 'Could not create the target %s' % vgp_xml)
911 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
912 "files", "list"),
913 self.gpo_guid, "-H",
914 "ldap://%s" %
915 os.environ["SERVER"],
916 "-U%s%%%s" %
917 (os.environ["USERNAME"],
918 os.environ["PASSWORD"]))
919 self.assertIn(target.text, out, 'The test entry was not found!')
920 self.assertIn('-rwxr-xr-x', out,
921 'The test entry permissions were not found')
923 # Unstage the manifest.xml file
924 unstage_file(vgp_xml)
926 def test_files_add(self):
927 lp = LoadParm()
928 lp.load(os.environ['SERVERCONFFILE'])
929 local_path = lp.get('path', 'sysvol')
930 sysvol_source = os.path.join(local_path, lp.get('realm').lower(),
931 'Policies', self.gpo_guid, 'Machine/VGP',
932 'VTLA/Unix/Files/test.source')
933 source_file = os.path.join(self.tempdir, 'test.source')
934 source_data = '#!/bin/sh\necho hello world'
935 with open(source_file, 'w') as w:
936 w.write(source_data)
937 target_file = os.path.join(self.tempdir, 'test.target')
938 user = pwd.getpwuid(os.getuid()).pw_name
939 group = grp.getgrgid(os.getgid()).gr_name
940 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
941 "files", "add"),
942 self.gpo_guid,
943 source_file,
944 target_file,
945 user, group,
946 '755', "-H",
947 "ldap://%s" %
948 os.environ["SERVER"],
949 "-U%s%%%s" %
950 (os.environ["USERNAME"],
951 os.environ["PASSWORD"]))
952 self.assertCmdSuccess(result, out, err, 'File add failed')
953 self.assertIn(source_data, open(sysvol_source, 'r').read(),
954 'Failed to find the source file on the sysvol')
956 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
957 "files", "list"),
958 self.gpo_guid, "-H",
959 "ldap://%s" %
960 os.environ["SERVER"],
961 "-U%s%%%s" %
962 (os.environ["USERNAME"],
963 os.environ["PASSWORD"]))
964 self.assertIn(target_file, out, 'The test entry was not found!')
965 self.assertIn('-rwxr-xr-x', out,
966 'The test entry permissions were not found')
968 os.unlink(source_file)
970 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
971 "files", "remove"),
972 self.gpo_guid,
973 target_file, "-H",
974 "ldap://%s" %
975 os.environ["SERVER"],
976 "-U%s%%%s" %
977 (os.environ["USERNAME"],
978 os.environ["PASSWORD"]))
979 self.assertCmdSuccess(result, out, err, 'File remove failed')
981 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
982 "files", "list"),
983 self.gpo_guid, "-H",
984 "ldap://%s" %
985 os.environ["SERVER"],
986 "-U%s%%%s" %
987 (os.environ["USERNAME"],
988 os.environ["PASSWORD"]))
989 self.assertNotIn(target_file, out, 'The test entry was still found!')
991 def test_vgp_openssh_list(self):
992 lp = LoadParm()
993 lp.load(os.environ['SERVERCONFFILE'])
994 local_path = lp.get('path', 'sysvol')
995 vgp_xml = os.path.join(local_path, lp.get('realm').lower(), 'Policies',
996 self.gpo_guid, 'Machine/VGP/VTLA/SshCfg',
997 'SshD/manifest.xml')
999 stage = etree.Element('vgppolicy')
1000 policysetting = etree.SubElement(stage, 'policysetting')
1001 pv = etree.SubElement(policysetting, 'version')
1002 pv.text = '1'
1003 name = etree.SubElement(policysetting, 'name')
1004 name.text = 'Configuration File'
1005 description = etree.SubElement(policysetting, 'description')
1006 description.text = 'Represents Unix configuration file settings'
1007 apply_mode = etree.SubElement(policysetting, 'apply_mode')
1008 apply_mode.text = 'merge'
1009 data = etree.SubElement(policysetting, 'data')
1010 configfile = etree.SubElement(data, 'configfile')
1011 etree.SubElement(configfile, 'filename')
1012 configsection = etree.SubElement(configfile, 'configsection')
1013 etree.SubElement(configsection, 'sectionname')
1014 opt = etree.SubElement(configsection, 'keyvaluepair')
1015 key = etree.SubElement(opt, 'key')
1016 key.text = 'KerberosAuthentication'
1017 value = etree.SubElement(opt, 'value')
1018 value.text = 'Yes'
1019 ret = stage_file(vgp_xml, etree.tostring(stage, 'utf-8'))
1020 self.assertTrue(ret, 'Could not create the target %s' % vgp_xml)
1022 openssh = 'KerberosAuthentication Yes'
1023 (result, out, err) = self.runsublevelcmd("gpo", ("manage",
1024 "openssh", "list"),
1025 self.gpo_guid, "-H",
1026 "ldap://%s" %
1027 os.environ["SERVER"],
1028 "-U%s%%%s" %
1029 (os.environ["USERNAME"],
1030 os.environ["PASSWORD"]))
1031 self.assertIn(openssh, out, 'The test entry was not found!')
1033 # Unstage the manifest.xml file
1034 unstage_file(vgp_xml)
1036 def setUp(self):
1037 """set up a temporary GPO to work with"""
1038 super(GpoCmdTestCase, self).setUp()
1039 (result, out, err) = self.runsubcmd("gpo", "create", self.gpo_name,
1040 "-H", "ldap://%s" % os.environ["SERVER"],
1041 "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]),
1042 "--tmpdir", self.tempdir)
1043 self.assertCmdSuccess(result, out, err, "Ensuring gpo created successfully")
1044 shutil.rmtree(os.path.join(self.tempdir, "policy"))
1045 try:
1046 self.gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
1047 except IndexError:
1048 self.fail("Failed to find GUID in output: %s" % out)
1050 self.backup_path = os.path.join(samba.source_tree_topdir(), 'source4',
1051 'selftest', 'provisions',
1052 'generalized-gpo-backup')
1054 self.entity_file = os.path.join(self.backup_path, 'entities')
1056 def tearDown(self):
1057 """remove the temporary GPO to work with"""
1058 (result, out, err) = self.runsubcmd("gpo", "del", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
1059 self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
1060 super(GpoCmdTestCase, self).tearDown()