1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett 2012
5 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba
.tests
.samba_tool
.base
import SambaToolCmdTest
26 from samba
.netcmd
.gpo
import get_gpo_dn
, get_gpo_info
27 from samba
.param
import LoadParm
28 from samba
.tests
.gpo
import stage_file
, unstage_file
29 from samba
.dcerpc
import preg
30 from samba
.ndr
import ndr_pack
, ndr_unpack
31 from samba
.common
import get_string
32 from configparser
import ConfigParser
33 from io
import StringIO
34 import xml
.etree
.ElementTree
as etree
36 source_path
= os
.path
.abspath(os
.path
.join(os
.path
.dirname(__file__
), "../../../../.."))
38 def has_difference(path1
, path2
, binary
=True, xml
=True, sortlines
=False):
39 """Use this function to determine if the GPO backup differs from another.
41 xml=True checks whether any xml files are equal
42 binary=True checks whether any .SAMBABACKUP files are equal
44 if os
.path
.isfile(path1
):
46 file1
= open(path1
).readlines()
48 file2
= open(path1
).readlines()
53 elif open(path1
).read() != open(path2
).read():
64 dirlist
= os
.listdir(l_dir
)
65 dirlist_other
= os
.listdir(r_dir
)
69 if dirlist
!= dirlist_other
:
73 l_name
= os
.path
.join(l_dir
, e
)
74 r_name
= os
.path
.join(r_dir
, e
)
76 if os
.path
.isdir(l_name
):
80 if (l_name
.endswith('.xml') and xml
or
81 l_name
.endswith('.SAMBABACKUP') and binary
):
82 if open(l_name
, "rb").read() != open(r_name
, "rb").read():
88 class GpoCmdTestCase(SambaToolCmdTest
):
89 """Tests for samba-tool time subcommands"""
93 # This exists in the source tree to be restored
94 backup_gpo_guid
= "{1E1DC8EA-390C-4800-B327-98B56A0AEA5D}"
96 def test_gpo_list(self
):
97 """Run gpo list against the server and make sure it looks accurate"""
98 (result
, out
, err
) = self
.runsubcmd("gpo", "listall", "-H", "ldap://%s" % os
.environ
["SERVER"])
99 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo listall ran successfully")
101 def test_fetchfail(self
):
102 """Run against a non-existent GPO, and make sure it fails (this hard-coded UUID is very unlikely to exist"""
103 (result
, out
, err
) = self
.runsubcmd("gpo", "fetch", "c25cac17-a02a-4151-835d-fae17446ee43", "-H", "ldap://%s" % os
.environ
["SERVER"])
104 self
.assertCmdFail(result
, "check for result code")
106 def test_fetch(self
):
107 """Run against a real GPO, and make sure it passes"""
108 (result
, out
, err
) = self
.runsubcmd("gpo", "fetch", self
.gpo_guid
, "-H", "ldap://%s" % os
.environ
["SERVER"], "--tmpdir", self
.tempdir
)
109 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo fetched successfully")
110 shutil
.rmtree(os
.path
.join(self
.tempdir
, "policy"))
113 """Show a real GPO, and make sure it passes"""
114 (result
, out
, err
) = self
.runsubcmd("gpo", "show", self
.gpo_guid
, "-H", "ldap://%s" % os
.environ
["SERVER"])
115 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo fetched successfully")
117 def test_show_as_admin(self
):
118 """Show a real GPO, and make sure it passes"""
119 (result
, out
, err
) = self
.runsubcmd("gpo", "show", self
.gpo_guid
, "-H", "ldap://%s" % os
.environ
["SERVER"], "-U%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"]))
120 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo fetched successfully")
122 def test_aclcheck(self
):
123 """Check all the GPOs on the remote server have correct ACLs"""
124 (result
, out
, err
) = self
.runsubcmd("gpo", "aclcheck", "-H", "ldap://%s" % os
.environ
["SERVER"], "-U%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"]))
125 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo checked successfully")
127 def test_getlink_empty(self
):
128 self
.samdb
= self
.getSamDB("-H", "ldap://%s" % os
.environ
["DC_SERVER"],
129 "-U%s%%%s" % (os
.environ
["DC_USERNAME"],
130 os
.environ
["DC_PASSWORD"]))
132 container_dn
= 'OU=gpo_test_link,%s' % self
.samdb
.get_default_basedn()
136 'objectClass': 'organizationalUnit'
139 (result
, out
, err
) = self
.runsubcmd("gpo", "getlink", container_dn
,
140 "-H", "ldap://%s" % os
.environ
["SERVER"],
141 "-U%s%%%s" % (os
.environ
["USERNAME"],
142 os
.environ
["PASSWORD"]))
143 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo link fetched successfully")
145 # Microsoft appears to allow an empty space character after deletion of
146 # a GPO. We should be able to handle this.
148 m
.dn
= ldb
.Dn(self
.samdb
, container_dn
)
149 m
['gPLink'] = ldb
.MessageElement(' ', ldb
.FLAG_MOD_REPLACE
, 'gPLink')
152 (result
, out
, err
) = self
.runsubcmd("gpo", "getlink", container_dn
,
153 "-H", "ldap://%s" % os
.environ
["SERVER"],
154 "-U%s%%%s" % (os
.environ
["USERNAME"],
155 os
.environ
["PASSWORD"]))
156 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo link fetched successfully")
158 self
.samdb
.delete(container_dn
)
160 def test_backup_restore_compare_binary(self
):
161 """Restore from a static backup and compare the binary contents"""
163 static_path
= os
.path
.join(self
.backup_path
, 'policy',
164 self
.backup_gpo_guid
)
166 temp_path
= os
.path
.join(self
.tempdir
, 'temp')
169 new_path
= os
.path
.join(self
.tempdir
, 'new')
174 (result
, out
, err
) = self
.runsubcmd("gpo", "restore", "BACKUP_RESTORE1",
177 os
.environ
["SERVER"], "--tmpdir",
178 temp_path
, "--entities",
179 self
.entity_file
, "-U%s%%%s" %
180 (os
.environ
["USERNAME"],
181 os
.environ
["PASSWORD"]),
182 "--restore-metadata")
184 gpo_guid
= "{%s}" % out
.split("{")[1].split("}")[0]
186 (result
, out
, err
) = self
.runsubcmd("gpo", "backup", gpo_guid
,
188 os
.environ
["SERVER"],
189 "--tmpdir", new_path
)
191 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo fetched successfully")
193 # Compare the directories
194 self
.assertIsNone(has_difference(os
.path
.join(new_path
, 'policy',
196 static_path
, binary
=True,
200 (result
, out
, err
) = self
.runsubcmd("gpo", "del", gpo_guid
,
202 os
.environ
["SERVER"],
204 (os
.environ
["USERNAME"],
205 os
.environ
["PASSWORD"]))
206 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo deleted successfully")
208 shutil
.rmtree(temp_path
)
209 shutil
.rmtree(new_path
)
211 def test_backup_restore_no_entities_compare_binary(self
):
212 """Restore from a static backup (and use no entity file, resulting in
213 copy-restore fallback), and compare the binary contents"""
215 static_path
= os
.path
.join(self
.backup_path
, 'policy',
216 self
.backup_gpo_guid
)
218 temp_path
= os
.path
.join(self
.tempdir
, 'temp')
221 new_path
= os
.path
.join(self
.tempdir
, 'new')
228 (result
, out
, err
) = self
.runsubcmd("gpo", "restore", "BACKUP_RESTORE1",
231 os
.environ
["SERVER"], "--tmpdir",
232 temp_path
, "--entities",
233 self
.entity_file
, "-U%s%%%s" %
234 (os
.environ
["USERNAME"],
235 os
.environ
["PASSWORD"]),
236 "--restore-metadata")
238 gpo_guid
= "{%s}" % out
.split("{")[1].split("}")[0]
241 # Do not output entities file
242 (result
, out
, err
) = self
.runsubcmd("gpo", "backup", gpo_guid
,
244 os
.environ
["SERVER"],
245 "--tmpdir", new_path
,
248 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo fetched successfully")
250 # Do not use an entities file
251 (result
, out
, err
) = self
.runsubcmd("gpo", "restore", "BACKUP_RESTORE2",
252 os
.path
.join(new_path
, 'policy', gpo_guid1
),
254 os
.environ
["SERVER"], "--tmpdir",
255 temp_path
, "-U%s%%%s" %
256 (os
.environ
["USERNAME"],
257 os
.environ
["PASSWORD"]),
258 "--restore-metadata")
260 gpo_guid
= "{%s}" % out
.split("{")[1].split("}")[0]
263 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo restored successfully")
265 (result
, out
, err
) = self
.runsubcmd("gpo", "backup", gpo_guid
,
267 os
.environ
["SERVER"],
268 "--tmpdir", new_path
)
270 # Compare the directories
271 self
.assertIsNone(has_difference(os
.path
.join(new_path
, 'policy',
273 os
.path
.join(new_path
, 'policy',
275 binary
=True, xml
=False))
278 (result
, out
, err
) = self
.runsubcmd("gpo", "del", gpo_guid1
,
280 os
.environ
["SERVER"],
282 (os
.environ
["USERNAME"],
283 os
.environ
["PASSWORD"]))
284 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo deleted successfully")
287 (result
, out
, err
) = self
.runsubcmd("gpo", "del", gpo_guid2
,
289 os
.environ
["SERVER"],
291 (os
.environ
["USERNAME"],
292 os
.environ
["PASSWORD"]))
293 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo deleted successfully")
295 shutil
.rmtree(temp_path
)
296 shutil
.rmtree(new_path
)
298 def test_backup_restore_backup_compare_XML(self
):
299 """Restore from a static backup and backup to compare XML"""
300 static_path
= os
.path
.join(self
.backup_path
, 'policy',
301 self
.backup_gpo_guid
)
303 temp_path
= os
.path
.join(self
.tempdir
, 'temp')
306 new_path
= os
.path
.join(self
.tempdir
, 'new')
313 (result
, out
, err
) = self
.runsubcmd("gpo", "restore", "BACKUP_RESTORE1",
316 os
.environ
["SERVER"], "--tmpdir",
317 temp_path
, "--entities",
318 self
.entity_file
, "-U%s%%%s" %
319 (os
.environ
["USERNAME"],
320 os
.environ
["PASSWORD"]),
321 "--restore-metadata")
323 gpo_guid
= "{%s}" % out
.split("{")[1].split("}")[0]
326 (result
, out
, err
) = self
.runsubcmd("gpo", "backup", gpo_guid
,
328 os
.environ
["SERVER"],
329 "--tmpdir", new_path
)
331 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo fetched successfully")
333 (result
, out
, err
) = self
.runsubcmd("gpo", "restore", "BACKUP_RESTORE2",
334 os
.path
.join(new_path
, 'policy', gpo_guid1
),
336 os
.environ
["SERVER"], "--tmpdir",
337 temp_path
, "--entities",
338 self
.entity_file
, "-U%s%%%s" %
339 (os
.environ
["USERNAME"],
340 os
.environ
["PASSWORD"]),
341 "--restore-metadata")
343 gpo_guid
= "{%s}" % out
.split("{")[1].split("}")[0]
346 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo restored successfully")
348 (result
, out
, err
) = self
.runsubcmd("gpo", "backup", gpo_guid
,
350 os
.environ
["SERVER"],
351 "--tmpdir", new_path
)
353 # Compare the directories
354 self
.assertIsNone(has_difference(os
.path
.join(new_path
, 'policy',
356 os
.path
.join(new_path
, 'policy',
358 binary
=True, xml
=True))
361 (result
, out
, err
) = self
.runsubcmd("gpo", "del", gpo_guid1
,
363 os
.environ
["SERVER"],
365 (os
.environ
["USERNAME"],
366 os
.environ
["PASSWORD"]))
367 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo deleted successfully")
370 (result
, out
, err
) = self
.runsubcmd("gpo", "del", gpo_guid2
,
372 os
.environ
["SERVER"],
374 (os
.environ
["USERNAME"],
375 os
.environ
["PASSWORD"]))
376 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo deleted successfully")
378 shutil
.rmtree(temp_path
)
379 shutil
.rmtree(new_path
)
381 def test_backup_restore_generalize(self
):
382 """Restore from a static backup with different entities, generalize it
383 again, and compare the XML"""
384 static_path
= os
.path
.join(self
.backup_path
, 'policy',
385 self
.backup_gpo_guid
)
387 temp_path
= os
.path
.join(self
.tempdir
, 'temp')
390 new_path
= os
.path
.join(self
.tempdir
, 'new')
393 alt_entity_file
= os
.path
.join(new_path
, 'entities')
394 with
open(alt_entity_file
, 'wb') as f
:
395 f
.write(b
'''<!ENTITY SAMBA__NETWORK_PATH__82419dafed126a07d6b96c66fc943735__ "\\\\samdom.example.com">
396 <!ENTITY SAMBA__NETWORK_PATH__0484cd41ded45a0728333a9c5e5ef619__ "\\\\samdom">
397 <!ENTITY SAMBA____SDDL_ACL____4ce8277be3f630300cbcf80a80e21cf4__ "D:PAR(A;CI;KA;;;BA)(A;CIIO;KA;;;CO)(A;CI;KA;;;SY)(A;CI;KR;;;S-1-16-0)">
398 <!ENTITY SAMBA____USER_ID_____d0970f5a1e19cb803f916c203d5c39c4__ "*S-1-5-113">
399 <!ENTITY SAMBA____USER_ID_____7b7bc2512ee1fedcd76bdc68926d4f7b__ "Administrator">
400 <!ENTITY SAMBA____USER_ID_____a3069f5a7a6530293ad8df6abd32af3d__ "Foobaz">
401 <!ENTITY SAMBA____USER_ID_____fdf60b2473b319c8c341de5f62479a7d__ "*S-1-5-32-545">
402 <!ENTITY SAMBA____USER_ID_____adb831a7fdd83dd1e2a309ce7591dff8__ "Guest">
403 <!ENTITY SAMBA____USER_ID_____9fa835214b4fc8b6102c991f7d97c2f8__ "*S-1-5-32-547">
404 <!ENTITY SAMBA____USER_ID_____bf8caafa94a19a6262bad2e8b6d4bce6__ "*S-1-5-32-546">
405 <!ENTITY SAMBA____USER_ID_____a45da96d0bf6575970f2d27af22be28a__ "System">
406 <!ENTITY SAMBA____USER_ID_____171d33a63ebd67f856552940ed491ad3__ "s-1-5-32-545">
407 <!ENTITY SAMBA____USER_ID_____7140932fff16ce85cc64d3caab588d0d__ "s-1-1-0">
410 gen_entity_file
= os
.path
.join(temp_path
, 'entities')
414 (result
, out
, err
) = self
.runsubcmd("gpo", "restore", "BACKUP_RESTORE1",
417 os
.environ
["SERVER"], "--tmpdir",
418 temp_path
, "--entities",
419 alt_entity_file
, "-U%s%%%s" %
420 (os
.environ
["USERNAME"],
421 os
.environ
["PASSWORD"]),
422 "--restore-metadata")
424 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo restored successfully")
426 gpo_guid
= "{%s}" % out
.split("{")[1].split("}")[0]
428 (result
, out
, err
) = self
.runsubcmd("gpo", "backup", gpo_guid
,
430 os
.environ
["SERVER"],
431 "--tmpdir", new_path
,
432 "--generalize", "--entities",
435 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo fetched successfully")
437 # Assert entity files are identical (except for line order)
438 self
.assertIsNone(has_difference(alt_entity_file
,
442 # Compare the directories (XML)
443 self
.assertIsNone(has_difference(os
.path
.join(new_path
, 'policy',
445 static_path
, binary
=False,
449 (result
, out
, err
) = self
.runsubcmd("gpo", "del", gpo_guid
,
451 os
.environ
["SERVER"],
453 (os
.environ
["USERNAME"],
454 os
.environ
["PASSWORD"]))
455 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo deleted successfully")
457 shutil
.rmtree(temp_path
)
458 shutil
.rmtree(new_path
)
460 def test_backup_with_extension_attributes(self
):
461 self
.samdb
= self
.getSamDB("-H", "ldap://%s" % os
.environ
["DC_SERVER"],
462 "-U%s%%%s" % (os
.environ
["DC_USERNAME"],
463 os
.environ
["DC_PASSWORD"]))
465 temp_path
= os
.path
.join(self
.tempdir
, 'temp')
469 # Taken from "source4/setup/provision_group_policy.ldif" on domain
470 'gPCMachineExtensionNames': '[{35378EAC-683F-11D2-A89A-00C04FBBCFA2}{53D6AB1B-2488-11D1-A28C-00C04FB94F17}][{827D319E-6EAC-11D2-A4EA-00C04F79F83A}{803E14A0-B4FB-11D0-A0D0-00A0C90F574B}][{B1BE8D72-6EAC-11D2-A4EA-00C04F79F83A}{53D6AB1B-2488-11D1-A28C-00C04FB94F17}]',
471 'gPCUserExtensionNames': '[{3060E8D0-7020-11D2-842D-00C04FA372D4}{3060E8CE-7020-11D2-842D-00C04FA372D4}][{35378EAC-683F-11D2-A89A-00C04FBBCFA2}{0F6B957E-509E-11D1-A7CC-0000F87571E3}]'
474 gpo_dn
= get_gpo_dn(self
.samdb
, self
.gpo_guid
)
475 for ext
in extensions
:
476 data
= extensions
[ext
]
480 m
[ext
] = ldb
.MessageElement(data
, ldb
.FLAG_MOD_REPLACE
, ext
)
485 (result
, out
, err
) = self
.runsubcmd("gpo", "backup", self
.gpo_guid
,
487 os
.environ
["SERVER"],
488 "--tmpdir", temp_path
)
490 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo fetched successfully")
492 guid
= "{%s}" % out
.split("{")[1].split("}")[0]
494 temp_path
= os
.path
.join(temp_path
, 'policy', guid
)
496 (result
, out
, err
) = self
.runsubcmd("gpo", "restore", "RESTORE_EXT",
499 os
.environ
["SERVER"], "--tmpdir",
500 self
.tempdir
, "-U%s%%%s" %
501 (os
.environ
["USERNAME"],
502 os
.environ
["PASSWORD"]),
503 "--restore-metadata")
505 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo restored successfully")
507 gpo_guid
= "{%s}" % out
.split("{")[1].split("}")[0]
509 msg
= get_gpo_info(self
.samdb
, gpo_guid
)
510 self
.assertEqual(len(msg
), 1)
512 for ext
in extensions
:
513 self
.assertTrue(ext
in msg
[0])
514 self
.assertEqual(extensions
[ext
], str(msg
[0][ext
][0]))
518 (result
, out
, err
) = self
.runsubcmd("gpo", "del", gpo_guid
,
520 os
.environ
["SERVER"],
522 (os
.environ
["USERNAME"],
523 os
.environ
["PASSWORD"]))
524 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo deleted successfully")
526 shutil
.rmtree(os
.path
.join(self
.tempdir
, "policy"))
527 shutil
.rmtree(os
.path
.join(self
.tempdir
, 'temp'))
529 def test_admx_load(self
):
531 lp
.load(os
.environ
['SERVERCONFFILE'])
532 local_path
= lp
.get('path', 'sysvol')
533 admx_path
= os
.path
.join(local_path
, os
.environ
['REALM'].lower(),
534 'Policies', 'PolicyDefinitions')
535 (result
, out
, err
) = self
.runsubcmd("gpo", "admxload",
537 os
.environ
["SERVER"],
539 os
.path
.join(source_path
,
542 (os
.environ
["USERNAME"],
543 os
.environ
["PASSWORD"]))
544 self
.assertCmdSuccess(result
, out
, err
,
545 'Filling PolicyDefinitions failed')
546 self
.assertTrue(os
.path
.exists(admx_path
),
547 'PolicyDefinitions was not created')
548 self
.assertTrue(os
.path
.exists(os
.path
.join(admx_path
, 'samba.admx')),
549 'Filling PolicyDefinitions failed')
550 shutil
.rmtree(admx_path
)
552 def test_smb_conf_set(self
):
554 lp
.load(os
.environ
['SERVERCONFFILE'])
555 local_path
= lp
.get('path', 'sysvol')
556 reg_pol
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
557 self
.gpo_guid
, 'Machine/Registry.pol')
559 policy
= 'apply group policies'
560 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "smb_conf",
561 "set"), self
.gpo_guid
,
564 os
.environ
["SERVER"],
566 (os
.environ
["USERNAME"],
567 os
.environ
["PASSWORD"]))
568 self
.assertCmdSuccess(result
, out
, err
,
569 'Failed to set apply group policies')
571 self
.assertTrue(os
.path
.exists(reg_pol
),
572 'The Registry.pol does not exist')
573 reg_data
= ndr_unpack(preg
.file, open(reg_pol
, 'rb').read())
574 ret
= any([get_string(e
.valuename
) == policy
and e
.data
== 1 \
575 for e
in reg_data
.entries
])
576 self
.assertTrue(ret
, 'The sudoers entry was not added')
578 # Ensure an empty set command deletes the entry
579 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "smb_conf",
580 "set"), self
.gpo_guid
,
581 policy
, "-H", "ldap://%s" %
582 os
.environ
["SERVER"],
584 (os
.environ
["USERNAME"],
585 os
.environ
["PASSWORD"]))
586 self
.assertCmdSuccess(result
, out
, err
,
587 'Failed to unset apply group policies')
589 reg_data
= ndr_unpack(preg
.file, open(reg_pol
, 'rb').read())
590 ret
= not any([get_string(e
.valuename
) == policy
and e
.data
== 1 \
591 for e
in reg_data
.entries
])
592 self
.assertTrue(ret
, 'The sudoers entry was not removed')
594 def test_smb_conf_list(self
):
596 lp
.load(os
.environ
['SERVERCONFFILE'])
597 local_path
= lp
.get('path', 'sysvol')
598 reg_pol
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
599 self
.gpo_guid
, 'Machine/Registry.pol')
601 # Stage the Registry.pol file with test data
604 e
.keyname
= b
'Software\\Policies\\Samba\\smb_conf'
605 e
.valuename
= b
'apply group policies'
608 stage
.num_entries
= 1
610 ret
= stage_file(reg_pol
, ndr_pack(stage
))
611 self
.assertTrue(ret
, 'Could not create the target %s' % reg_pol
)
613 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "smb_conf",
614 "list"), self
.gpo_guid
,
616 os
.environ
["SERVER"],
618 (os
.environ
["USERNAME"],
619 os
.environ
["PASSWORD"]))
620 self
.assertIn('%s = True' % e
.valuename
, out
, 'The test entry was not found!')
622 # Unstage the Registry.pol file
623 unstage_file(reg_pol
)
625 def test_security_set(self
):
627 lp
.load(os
.environ
['SERVERCONFFILE'])
628 local_path
= lp
.get('path', 'sysvol')
629 inf_pol
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
630 self
.gpo_guid
, 'Machine/Microsoft/Windows NT/SecEdit/GptTmpl.inf')
632 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "security",
633 "set"), self
.gpo_guid
,
634 'MaxTicketAge', '10',
636 os
.environ
["SERVER"],
638 (os
.environ
["USERNAME"],
639 os
.environ
["PASSWORD"]))
640 self
.assertCmdSuccess(result
, out
, err
,
641 'Failed to set MaxTicketAge')
642 self
.assertTrue(os
.path
.exists(inf_pol
),
643 '%s was not created' % inf_pol
)
644 inf_pol_contents
= open(inf_pol
, 'r').read()
645 self
.assertIn('MaxTicketAge = 10', inf_pol_contents
,
646 'The test entry was not found!')
648 # Ensure an empty set command deletes the entry
649 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "security",
650 "set"), self
.gpo_guid
,
653 os
.environ
["SERVER"],
655 (os
.environ
["USERNAME"],
656 os
.environ
["PASSWORD"]))
657 self
.assertCmdSuccess(result
, out
, err
,
658 'Failed to unset MaxTicketAge')
659 inf_pol_contents
= open(inf_pol
, 'r').read()
660 self
.assertNotIn('MaxTicketAge = 10', inf_pol_contents
,
661 'The test entry was still found!')
663 def test_security_list(self
):
664 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "security",
665 "set"), self
.gpo_guid
,
666 'MaxTicketAge', '10',
668 os
.environ
["SERVER"],
670 (os
.environ
["USERNAME"],
671 os
.environ
["PASSWORD"]))
672 self
.assertCmdSuccess(result
, out
, err
,
673 'Failed to set MaxTicketAge')
675 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "security",
676 "list"), self
.gpo_guid
,
678 os
.environ
["SERVER"],
680 (os
.environ
["USERNAME"],
681 os
.environ
["PASSWORD"]))
682 self
.assertIn('MaxTicketAge = 10', out
, 'The test entry was not found!')
684 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "security",
685 "set"), self
.gpo_guid
,
688 os
.environ
["SERVER"],
690 (os
.environ
["USERNAME"],
691 os
.environ
["PASSWORD"]))
692 self
.assertCmdSuccess(result
, out
, err
,
693 'Failed to unset MaxTicketAge')
695 def test_sudoers_remove(self
):
697 lp
.load(os
.environ
['SERVERCONFFILE'])
698 local_path
= lp
.get('path', 'sysvol')
699 reg_pol
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
700 self
.gpo_guid
, 'Machine/Registry.pol')
702 # Stage the Registry.pol file with test data
705 e
.keyname
= b
'Software\\Policies\\Samba\\Unix Settings\\Sudo Rights'
706 e
.valuename
= b
'Software\\Policies\\Samba\\Unix Settings'
708 e
.data
= b
'fakeu ALL=(ALL) NOPASSWD: ALL'
709 stage
.num_entries
= 1
711 ret
= stage_file(reg_pol
, ndr_pack(stage
))
712 self
.assertTrue(ret
, 'Could not create the target %s' % reg_pol
)
714 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "sudoers",
715 "remove"), self
.gpo_guid
,
718 os
.environ
["SERVER"],
720 (os
.environ
["USERNAME"],
721 os
.environ
["PASSWORD"]))
722 self
.assertCmdSuccess(result
, out
, err
, 'Sudoers remove failed')
724 def test_sudoers_add(self
):
726 lp
.load(os
.environ
['SERVERCONFFILE'])
727 local_path
= lp
.get('path', 'sysvol')
728 reg_pol
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
729 self
.gpo_guid
, 'Machine/Registry.pol')
731 entry
= 'fakeu ALL=(ALL) NOPASSWD: ALL'
732 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "sudoers",
733 "add"), self
.gpo_guid
, entry
,
735 os
.environ
["SERVER"],
737 (os
.environ
["USERNAME"],
738 os
.environ
["PASSWORD"]))
739 self
.assertCmdSuccess(result
, out
, err
, 'Sudoers add failed')
741 self
.assertTrue(os
.path
.exists(reg_pol
),
742 'The Registry.pol does not exist')
743 reg_data
= ndr_unpack(preg
.file, open(reg_pol
, 'rb').read())
744 self
.assertTrue(any([get_string(e
.data
) == entry
for e
in reg_data
.entries
]),
745 'The sudoers entry was not added')
747 def test_sudoers_list(self
):
749 lp
.load(os
.environ
['SERVERCONFFILE'])
750 local_path
= lp
.get('path', 'sysvol')
751 reg_pol
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
752 self
.gpo_guid
, 'Machine/Registry.pol')
754 # Stage the Registry.pol file with test data
757 e
.keyname
= b
'Software\\Policies\\Samba\\Unix Settings\\Sudo Rights'
758 e
.valuename
= b
'Software\\Policies\\Samba\\Unix Settings'
760 e
.data
= b
'fakeu ALL=(ALL) NOPASSWD: ALL'
761 stage
.num_entries
= 1
763 ret
= stage_file(reg_pol
, ndr_pack(stage
))
764 self
.assertTrue(ret
, 'Could not create the target %s' % reg_pol
)
766 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage", "sudoers",
767 "list"), self
.gpo_guid
,
769 os
.environ
["SERVER"],
771 (os
.environ
["USERNAME"],
772 os
.environ
["PASSWORD"]))
773 self
.assertIn(e
.data
, out
, 'The test entry was not found!')
775 # Unstage the Registry.pol file
776 unstage_file(reg_pol
)
778 def test_symlink_list(self
):
780 lp
.load(os
.environ
['SERVERCONFFILE'])
781 local_path
= lp
.get('path', 'sysvol')
782 vgp_xml
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
783 self
.gpo_guid
, 'Machine/VGP/VTLA/Unix',
784 'Symlink/manifest.xml')
785 stage
= etree
.Element('vgppolicy')
786 policysetting
= etree
.SubElement(stage
, 'policysetting')
787 pv
= etree
.SubElement(policysetting
, 'version')
789 name
= etree
.SubElement(policysetting
, 'name')
790 name
.text
= 'Symlink Policy'
791 description
= etree
.SubElement(policysetting
, 'description')
792 description
.text
= 'Specifies symbolic link data'
793 apply_mode
= etree
.SubElement(policysetting
, 'apply_mode')
794 apply_mode
.text
= 'merge'
795 data
= etree
.SubElement(policysetting
, 'data')
796 file_properties
= etree
.SubElement(data
, 'file_properties')
797 source
= etree
.SubElement(file_properties
, 'source')
798 source
.text
= os
.path
.join(self
.tempdir
, 'test.source')
799 target
= etree
.SubElement(file_properties
, 'target')
800 target
.text
= os
.path
.join(self
.tempdir
, 'test.target')
801 ret
= stage_file(vgp_xml
, etree
.tostring(stage
, 'utf-8'))
802 self
.assertTrue(ret
, 'Could not create the target %s' % vgp_xml
)
804 symlink
= 'ln -s %s %s' % (source
.text
, target
.text
)
805 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
809 os
.environ
["SERVER"],
811 (os
.environ
["USERNAME"],
812 os
.environ
["PASSWORD"]))
813 self
.assertIn(symlink
, out
, 'The test entry was not found!')
815 # Unstage the manifest.xml file
816 unstage_file(vgp_xml
)
818 def test_symlink_add(self
):
819 source_text
= os
.path
.join(self
.tempdir
, 'test.source')
820 target_text
= os
.path
.join(self
.tempdir
, 'test.target')
821 symlink
= 'ln -s %s %s' % (source_text
, target_text
)
822 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
825 source_text
, target_text
,
827 os
.environ
["SERVER"],
829 (os
.environ
["USERNAME"],
830 os
.environ
["PASSWORD"]))
831 self
.assertCmdSuccess(result
, out
, err
, 'Symlink add failed')
833 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
837 os
.environ
["SERVER"],
839 (os
.environ
["USERNAME"],
840 os
.environ
["PASSWORD"]))
841 self
.assertIn(symlink
, out
, 'The test entry was not found!')
843 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
844 "symlink", "remove"),
846 source_text
, target_text
,
848 os
.environ
["SERVER"],
850 (os
.environ
["USERNAME"],
851 os
.environ
["PASSWORD"]))
852 self
.assertCmdSuccess(result
, out
, err
, 'Symlink remove failed')
854 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
858 os
.environ
["SERVER"],
860 (os
.environ
["USERNAME"],
861 os
.environ
["PASSWORD"]))
862 self
.assertNotIn(symlink
, out
, 'The test entry was not removed!')
864 def test_files_list(self
):
866 lp
.load(os
.environ
['SERVERCONFFILE'])
867 local_path
= lp
.get('path', 'sysvol')
868 vgp_xml
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
869 self
.gpo_guid
, 'Machine/VGP/VTLA/Unix',
870 'Files/manifest.xml')
871 source_file
= os
.path
.join(local_path
, lp
.get('realm').lower(),
872 'Policies', self
.gpo_guid
, 'Machine/VGP',
873 'VTLA/Unix/Files/test.source')
874 stage
= etree
.Element('vgppolicy')
875 policysetting
= etree
.SubElement(stage
, 'policysetting')
876 pv
= etree
.SubElement(policysetting
, 'version')
878 name
= etree
.SubElement(policysetting
, 'name')
880 description
= etree
.SubElement(policysetting
, 'description')
881 description
.text
= 'Represents file data to set/copy on clients'
882 data
= etree
.SubElement(policysetting
, 'data')
883 file_properties
= etree
.SubElement(data
, 'file_properties')
884 source
= etree
.SubElement(file_properties
, 'source')
885 source
.text
= source_file
886 target
= etree
.SubElement(file_properties
, 'target')
887 target
.text
= os
.path
.join(self
.tempdir
, 'test.target')
888 user
= etree
.SubElement(file_properties
, 'user')
889 user
.text
= pwd
.getpwuid(os
.getuid()).pw_name
890 group
= etree
.SubElement(file_properties
, 'group')
891 group
.text
= grp
.getgrgid(os
.getgid()).gr_name
893 # Request permissions of 755
894 permissions
= etree
.SubElement(file_properties
, 'permissions')
895 permissions
.set('type', 'user')
896 etree
.SubElement(permissions
, 'read')
897 etree
.SubElement(permissions
, 'write')
898 etree
.SubElement(permissions
, 'execute')
899 permissions
= etree
.SubElement(file_properties
, 'permissions')
900 permissions
.set('type', 'group')
901 etree
.SubElement(permissions
, 'read')
902 etree
.SubElement(permissions
, 'execute')
903 permissions
= etree
.SubElement(file_properties
, 'permissions')
904 permissions
.set('type', 'other')
905 etree
.SubElement(permissions
, 'read')
906 etree
.SubElement(permissions
, 'execute')
908 ret
= stage_file(vgp_xml
, etree
.tostring(stage
, 'utf-8'))
909 self
.assertTrue(ret
, 'Could not create the target %s' % vgp_xml
)
911 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
915 os
.environ
["SERVER"],
917 (os
.environ
["USERNAME"],
918 os
.environ
["PASSWORD"]))
919 self
.assertIn(target
.text
, out
, 'The test entry was not found!')
920 self
.assertIn('-rwxr-xr-x', out
,
921 'The test entry permissions were not found')
923 # Unstage the manifest.xml file
924 unstage_file(vgp_xml
)
926 def test_files_add(self
):
928 lp
.load(os
.environ
['SERVERCONFFILE'])
929 local_path
= lp
.get('path', 'sysvol')
930 sysvol_source
= os
.path
.join(local_path
, lp
.get('realm').lower(),
931 'Policies', self
.gpo_guid
, 'Machine/VGP',
932 'VTLA/Unix/Files/test.source')
933 source_file
= os
.path
.join(self
.tempdir
, 'test.source')
934 source_data
= '#!/bin/sh\necho hello world'
935 with
open(source_file
, 'w') as w
:
937 target_file
= os
.path
.join(self
.tempdir
, 'test.target')
938 user
= pwd
.getpwuid(os
.getuid()).pw_name
939 group
= grp
.getgrgid(os
.getgid()).gr_name
940 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
948 os
.environ
["SERVER"],
950 (os
.environ
["USERNAME"],
951 os
.environ
["PASSWORD"]))
952 self
.assertCmdSuccess(result
, out
, err
, 'File add failed')
953 self
.assertIn(source_data
, open(sysvol_source
, 'r').read(),
954 'Failed to find the source file on the sysvol')
956 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
960 os
.environ
["SERVER"],
962 (os
.environ
["USERNAME"],
963 os
.environ
["PASSWORD"]))
964 self
.assertIn(target_file
, out
, 'The test entry was not found!')
965 self
.assertIn('-rwxr-xr-x', out
,
966 'The test entry permissions were not found')
968 os
.unlink(source_file
)
970 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
975 os
.environ
["SERVER"],
977 (os
.environ
["USERNAME"],
978 os
.environ
["PASSWORD"]))
979 self
.assertCmdSuccess(result
, out
, err
, 'File remove failed')
981 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
985 os
.environ
["SERVER"],
987 (os
.environ
["USERNAME"],
988 os
.environ
["PASSWORD"]))
989 self
.assertNotIn(target_file
, out
, 'The test entry was still found!')
991 def test_vgp_openssh_list(self
):
993 lp
.load(os
.environ
['SERVERCONFFILE'])
994 local_path
= lp
.get('path', 'sysvol')
995 vgp_xml
= os
.path
.join(local_path
, lp
.get('realm').lower(), 'Policies',
996 self
.gpo_guid
, 'Machine/VGP/VTLA/SshCfg',
999 stage
= etree
.Element('vgppolicy')
1000 policysetting
= etree
.SubElement(stage
, 'policysetting')
1001 pv
= etree
.SubElement(policysetting
, 'version')
1003 name
= etree
.SubElement(policysetting
, 'name')
1004 name
.text
= 'Configuration File'
1005 description
= etree
.SubElement(policysetting
, 'description')
1006 description
.text
= 'Represents Unix configuration file settings'
1007 apply_mode
= etree
.SubElement(policysetting
, 'apply_mode')
1008 apply_mode
.text
= 'merge'
1009 data
= etree
.SubElement(policysetting
, 'data')
1010 configfile
= etree
.SubElement(data
, 'configfile')
1011 etree
.SubElement(configfile
, 'filename')
1012 configsection
= etree
.SubElement(configfile
, 'configsection')
1013 etree
.SubElement(configsection
, 'sectionname')
1014 opt
= etree
.SubElement(configsection
, 'keyvaluepair')
1015 key
= etree
.SubElement(opt
, 'key')
1016 key
.text
= 'KerberosAuthentication'
1017 value
= etree
.SubElement(opt
, 'value')
1019 ret
= stage_file(vgp_xml
, etree
.tostring(stage
, 'utf-8'))
1020 self
.assertTrue(ret
, 'Could not create the target %s' % vgp_xml
)
1022 openssh
= 'KerberosAuthentication Yes'
1023 (result
, out
, err
) = self
.runsublevelcmd("gpo", ("manage",
1025 self
.gpo_guid
, "-H",
1027 os
.environ
["SERVER"],
1029 (os
.environ
["USERNAME"],
1030 os
.environ
["PASSWORD"]))
1031 self
.assertIn(openssh
, out
, 'The test entry was not found!')
1033 # Unstage the manifest.xml file
1034 unstage_file(vgp_xml
)
1037 """set up a temporary GPO to work with"""
1038 super(GpoCmdTestCase
, self
).setUp()
1039 (result
, out
, err
) = self
.runsubcmd("gpo", "create", self
.gpo_name
,
1040 "-H", "ldap://%s" % os
.environ
["SERVER"],
1041 "-U%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"]),
1042 "--tmpdir", self
.tempdir
)
1043 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo created successfully")
1044 shutil
.rmtree(os
.path
.join(self
.tempdir
, "policy"))
1046 self
.gpo_guid
= "{%s}" % out
.split("{")[1].split("}")[0]
1048 self
.fail("Failed to find GUID in output: %s" % out
)
1050 self
.backup_path
= os
.path
.join(samba
.source_tree_topdir(), 'source4',
1051 'selftest', 'provisions',
1052 'generalized-gpo-backup')
1054 self
.entity_file
= os
.path
.join(self
.backup_path
, 'entities')
1057 """remove the temporary GPO to work with"""
1058 (result
, out
, err
) = self
.runsubcmd("gpo", "del", self
.gpo_guid
, "-H", "ldap://%s" % os
.environ
["SERVER"], "-U%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"]))
1059 self
.assertCmdSuccess(result
, out
, err
, "Ensuring gpo deleted successfully")
1060 super(GpoCmdTestCase
, self
).tearDown()