s4-dsdb-test/reanimate: Fix whitespaces according to PEP8
[Samba.git] / source4 / dsdb / tests / python / tombstone_reanimation.py
blobfaa78bbdd1c6d8c35913160703e475bcfcab3a6f
1 #!/usr/bin/env python
3 # Tombstone reanimation tests
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import sys
22 import optparse
23 import unittest
25 sys.path.insert(0, "bin/python")
26 import samba
28 import samba.tests
29 import samba.getopt as options
30 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
31 MessageElement, LdbError,
32 ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
33 ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
    """Verify Samba restores required attributes when
    user restores a Deleted object.

    Holds the SamDB connection and the search/create/restore helpers
    used by the reanimation test cases.
    """

    def setUp(self):
        """Connect to the test server and adjust dSHeuristics/minPwdAge.

        The previous values are remembered so tearDown() can put them back.
        """
        super(RestoredObjectAttributesBaseTestCase, self).setUp()
        # load LoadParm
        lp = options.SambaOptions(optparse.OptionParser()).get_loadparm()
        self.samdb = samba.tests.connect_samdb_env(
            "TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD", lp=lp)
        self.base_dn = self.samdb.domain_dn()
        self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
        self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
        # Get the old "dSHeuristics" if it was set
        self.dsheuristics = self.samdb.get_dsheuristics()
        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
        self.samdb.set_dsheuristics("000000001")
        # Get the old "minPwdAge"
        self.minPwdAge = self.samdb.get_minPwdAge()
        # Set it temporary to "0"
        self.samdb.set_minPwdAge("0")

    def tearDown(self):
        """Put back the dSHeuristics and minPwdAge values saved by setUp()."""
        super(RestoredObjectAttributesBaseTestCase, self).tearDown()
        # Reset the "dSHeuristics" as they were before
        self.samdb.set_dsheuristics(self.dsheuristics)
        # Reset the "minPwdAge" as it was before
        self.samdb.set_minPwdAge(self.minPwdAge)

    def GUID_string(self, guid):
        """Format an objectGUID value through the schema formatter."""
        return self.samdb.schema_format_value("objectGUID", guid)

    def search_guid(self, guid):
        """Return the single object found under <GUID=...>.

        The search is done with the "show_deleted:1" control and the
        test fails unless exactly one entry is returned.
        """
        res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
                                scope=SCOPE_BASE,
                                controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def search_dn(self, dn):
        """Return the single object found at dn.

        The search is done with the "show_recycled:1" control and the
        test fails unless exactly one entry is returned.
        """
        res = self.samdb.search(expression="(objectClass=*)",
                                base=dn,
                                scope=SCOPE_BASE,
                                controls=["show_recycled:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def _create_object(self, msg):
        """:param msg: dict with dn and attributes to create an object from"""
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, msg['dn'])
        self.samdb.add(msg)
        return self.search_dn(msg['dn'])

    def _value_as_ldif(self, value, attr):
        """Render one attribute value as LDIF text.

        :param value: MessageElement, or a value that is wrapped via str()
        :param attr: attribute name used when a MessageElement is built
        """
        if not isinstance(value, MessageElement):
            value = MessageElement(str(value), 0, attr)
        m = Message()
        m.add(value)
        return self.samdb.write_ldif(m, 0)

    def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
        """Assert original and restored objects carry the same attributes.

        :param obj_orig: object as it was before deletion
        :param attrs_orig: set of attribute names expected after restore
        :param obj_restored: object as found after restore
        :param attrs_rest: set of attribute names found after restore

        The two name sets must be equal. Values are then compared
        case-insensitively through their LDIF form; names that the
        original object has no value for are skipped.
        """
        self.assertSetEqual(attrs_orig, attrs_rest)
        # remove volatile attributes, they can't be equal
        # (work on a copy so the caller's set is left untouched)
        comparable = set(attrs_orig) - set(
            ["uSNChanged", "dSCorePropagationData", "whenChanged"])
        for attr in comparable:
            # convert original attr value to ldif
            orig_val = obj_orig.get(attr)
            if orig_val is None:
                continue
            orig_ldif = self._value_as_ldif(orig_val, attr)
            # convert restored attr value to ldif
            rest_val = obj_restored.get(attr)
            self.assertIsNotNone(rest_val)
            rest_ldif = self._value_as_ldif(rest_val, attr)
            # compare generated ldif's
            self.assertEqual(orig_ldif.lower(), rest_ldif.lower())

    @staticmethod
    def restore_deleted_object(samdb, del_dn, new_dn):
        """Restores a deleted object

        The restore is one modify request that deletes "isDeleted" and
        replaces "distinguishedName", sent with the "show_deleted:1"
        control.

        :param samdb: SamDB connection to SAM
        :param del_dn: str Deleted object DN
        :param new_dn: str Where to restore the object
        """
        msg = Message()
        msg.dn = Dn(samdb, str(del_dn))
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement(
            [str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
        samdb.modify(msg, ["show_deleted:1"])
class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Undelete scenarios: plain restore, rename, restore to another DN,
    restore over an existing object and restore across naming contexts.
    """

    def setUp(self):
        super(BaseRestoreObjectTestCase, self).setUp()

    def enable_recycle_bin(self):
        """Add the optional feature 766ddcd8-... on CN=Partitions.

        An ERR_ATTRIBUTE_OR_VALUE_EXISTS answer is accepted; any other
        LDB error code fails the test.
        """
        msg = Message()
        msg.dn = Dn(self.samdb, "")
        msg["enableOptionalFeature"] = MessageElement(
            "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
            FLAG_MOD_ADD, "enableOptionalFeature")
        try:
            self.samdb.modify(msg)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

    def undelete_deleted(self, olddn, newdn, samldb):
        """Restore the deleted object olddn as newdn over connection samldb."""
        msg = Message()
        msg.dn = Dn(samldb, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
        samldb.modify(msg, ["show_deleted:1"])

    def undelete_deleted_with_mod(self, olddn, newdn):
        """Restore olddn as newdn and replace "url" in the same request."""
        msg = Message()
        msg.dn = Dn(self.samdb, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
        msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
        self.samdb.modify(msg, ["show_deleted:1"])

    def _assert_ldb_error(self, expected_num, func, *args):
        """Call func(*args) and require an LdbError with code expected_num."""
        try:
            func(*args)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected_num)
        else:
            self.fail("LdbError %d expected, but the call succeeded" % expected_num)

    def _create_test_user(self, dn):
        """Create the "testuser" account at dn, removing any leftover first."""
        return self._create_object({
            "dn": dn,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})

    def test_undelete(self):
        print("Testing standard undelete operation")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_test_user(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.undelete_deleted(str(objDeleted1.dn), usr1, self.samdb)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
        samba.tests.delete_force(self.samdb, usr1)

    def test_rename(self):
        print("Testing attempt to rename deleted object")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_test_user(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        # just to make sure we get the correct error if the show deleted is missing
        self._assert_ldb_error(ERR_NO_SUCH_OBJECT,
                               self.samdb.rename, str(objDeleted1.dn), usr1)
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.samdb.rename, str(objDeleted1.dn), usr1,
                               ["show_deleted:1"])

    def test_undelete_with_mod(self):
        print("Testing standard undelete operation with modification of additional attributes")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_test_user(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.undelete_deleted_with_mod(str(objDeleted1.dn), usr1)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(objLive2["url"][0], "www.samba.org")
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_newuser(self):
        print("Testing undelete user with a different dn")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        # the restore target must be free as well
        samba.tests.delete_force(self.samdb, usr2)
        objLive1 = self._create_test_user(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.undelete_deleted(str(objDeleted1.dn), usr2, self.samdb)
        # search_dn() fails the test unless the restored object exists
        self.search_dn(usr2)
        samba.tests.delete_force(self.samdb, usr1)
        samba.tests.delete_force(self.samdb, usr2)

    def test_undelete_existing(self):
        print("Testing undelete user after a user with the same dn has been created")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1 = self._create_test_user(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        # occupy the original DN with a new object
        self.samdb.add({
            "dn": usr1,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})
        objDeleted1 = self.search_guid(guid1)
        self._assert_ldb_error(ERR_ENTRY_ALREADY_EXISTS,
                               self.undelete_deleted,
                               str(objDeleted1.dn), usr1, self.samdb)
        # do not leave the live user behind for the other tests
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_cross_nc(self):
        print("Cross NC undelete")
        c1 = "cn=ldaptestcontainer," + self.base_dn
        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
        c3 = "cn=ldaptestcontainer," + self.configuration_dn
        c4 = "cn=ldaptestcontainer2," + self.base_dn
        # the restore targets must be free
        samba.tests.delete_force(self.samdb, c3)
        samba.tests.delete_force(self.samdb, c4)
        objLive1 = self._create_object({
            "dn": c1,
            "objectclass": "container"})
        objLive2 = self._create_object({
            "dn": c2,
            "objectclass": "container"})
        guid1 = objLive1["objectGUID"][0]
        guid2 = objLive2["objectGUID"][0]
        self.samdb.delete(c1)
        self.samdb.delete(c2)
        objDeleted1 = self.search_guid(guid1)
        objDeleted2 = self.search_guid(guid2)
        # try to undelete from base dn to config
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               self.undelete_deleted,
                               str(objDeleted1.dn), c3, self.samdb)
        # try to undelete from config to base dn
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               self.undelete_deleted,
                               str(objDeleted2.dn), c4, self.samdb)
        # assert undeletion will work in same nc
        self.undelete_deleted(str(objDeleted1.dn), c4, self.samdb)
        self.undelete_deleted(str(objDeleted2.dn), c3, self.samdb)
        samba.tests.delete_force(self.samdb, c3)
        samba.tests.delete_force(self.samdb, c4)
class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test cases for delete/reanimate user objects"""

    def test_restore_user(self):
        """Delete and restore a user, then compare the attribute name sets."""
        print("Test restored user attributes")
        username = "restore_user"
        usr_dn = "cn=%s,cn=users,%s" % (username, self.base_dn)
        # _create_object() removes a leftover object before adding
        obj = self._create_object({
            "dn": usr_dn,
            "objectClass": "user",
            "sAMAccountName": username})
        guid = obj["objectGUID"][0]
        self.samdb.delete(usr_dn)
        obj_del = self.search_guid(guid)
        # restore the user and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        orig_attrs = set(obj.keys())
        # windows restore more attributes that originally we have
        orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        rest_attrs = set(obj_restore.keys())
        self.assertSetEqual(orig_attrs, rest_attrs)
class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate group objects"""

    def _make_object_dn(self, name):
        """Return the DN of object `name` under cn=users of the domain."""
        return "cn=%s,cn=users,%s" % (name, self.base_dn)

    def _create_test_user(self, user_name):
        """Create user `user_name` under cn=users and return the object."""
        user_dn = self._make_object_dn(user_name)
        ldif = {
            "dn": user_dn,
            "objectClass": "user",
            "sAMAccountName": user_name,
        }
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, user_dn)
        # finally, create the user
        self.samdb.add(ldif)
        return self.search_dn(user_dn)

    def _create_test_group(self, group_name, members=None):
        """Create group `group_name` under cn=users and return the object.

        :param group_name: used for both the cn and the sAMAccountName
        :param members: optional iterable of member DNs; each is
            converted with str() and stored in "member"
        """
        group_dn = self._make_object_dn(group_name)
        ldif = {
            "dn": group_dn,
            "objectClass": "group",
            "sAMAccountName": group_name,
        }
        if members is not None:
            ldif["member"] = [str(usr_dn) for usr_dn in members]
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, group_dn)
        # finally, create the group
        self.samdb.add(ldif)
        return self.search_dn(group_dn)

    def test_plain_group(self):
        print("Test restored Group attributes")
        # create test group
        obj = self._create_test_group("r_group")
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)

    def test_group_with_members(self):
        print("Test restored Group with members attributes")
        # create test group
        usr1 = self._create_test_user("r_user_1")
        usr2 = self._create_test_user("r_user_2")
        obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        # and does not restore following attributes
        attr_orig.remove("member")
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate OU/container objects"""

    def _create_test_ou(self, rdn, name=None, description=None):
        """Create OU=<rdn> directly under the domain DN and return the object.

        :param rdn: value of the OU component of the new DN
        :param name: passed through to create_ou()
        :param description: passed through to create_ou()
        """
        ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, ou_dn)
        # create ou and return created object
        self.samdb.create_ou(ou_dn, name=name, description=description)
        return self.search_dn(ou_dn)

    def test_ou_with_name_description(self):
        print("Test OU reanimation")
        # create OU to test with
        obj = self._create_test_ou(rdn="r_ou",
                                   name="r_ou name",
                                   description="r_ou description")
        guid = obj["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        attr_rest = set(obj_restore.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(["lastKnownParent"])
        # and does not restore following attributes
        attr_orig -= {"description"}
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)

    def test_container(self):
        print("Test Container reanimation")
        # create test Container
        obj = self._create_object({
            "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
            "objectClass": "container"
        })
        guid = obj["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        attr_rest = set(obj_restore.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(["lastKnownParent"])
        # and does not restore following attributes
        attr_orig -= {"showInAdvancedViewOnly"}
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
# Hand control to the unittest runner when executed as a script.
if __name__ == "__main__":
    unittest.main()