s4-dsdb/tests: Assert on expected set of attributes for restored objects
[Samba.git] / source4 / dsdb / tests / python / tombstone_reanimation.py
blob640727931e04f6a8c9a1bbb455ea5b8632ec571e
1 #!/usr/bin/env python
3 # Tombstone reanimation tests
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import sys
22 import unittest
24 sys.path.insert(0, "bin/python")
25 import samba
27 import samba.tests
28 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
29 MessageElement, LdbError,
30 ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
31 ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
    """Verify Samba restores required attributes when
    a user restores a Deleted object.

    Holds the SamDB connection and the search/compare helpers used by the
    reanimation test cases deriving from this class.
    """

    def setUp(self):
        """Connect to the test server and adjust password-related settings.

        The previous "dSHeuristics" and "minPwdAge" values are remembered so
        that tearDown() can put them back.
        """
        super(RestoredObjectAttributesBaseTestCase, self).setUp()
        self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
        self.base_dn = self.samdb.domain_dn()
        self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
        self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
        # Get the old "dSHeuristics" if it was set
        self.dsheuristics = self.samdb.get_dsheuristics()
        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
        self.samdb.set_dsheuristics("000000001")
        # Get the old "minPwdAge"
        self.minPwdAge = self.samdb.get_minPwdAge()
        # Set it temporary to "0"
        self.samdb.set_minPwdAge("0")

    def tearDown(self):
        """Restore the "dSHeuristics" and "minPwdAge" values saved by setUp()."""
        super(RestoredObjectAttributesBaseTestCase, self).tearDown()
        # Reset the "dSHeuristics" as they were before
        self.samdb.set_dsheuristics(self.dsheuristics)
        # Reset the "minPwdAge" as it was before
        self.samdb.set_minPwdAge(self.minPwdAge)

    def GUID_string(self, guid):
        """Format a raw objectGUID value using the schema's objectGUID syntax."""
        return self.samdb.schema_format_value("objectGUID", guid)

    def search_guid(self, guid):
        """Return the single object with the given GUID (deleted objects included).

        :param guid: raw objectGUID value
        """
        res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
                                scope=SCOPE_BASE, controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def search_dn(self, dn):
        """Return the single object stored at dn (recycled objects included).

        :param dn: DN of the object to fetch
        """
        res = self.samdb.search(expression="(objectClass=*)",
                                base=dn,
                                scope=SCOPE_BASE,
                                controls=["show_recycled:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def _create_object(self, msg):
        """:param msg: dict with dn and attributes to create an object from"""
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, msg['dn'])
        self.samdb.add(msg)
        return self.search_dn(msg['dn'])

    def _attr_to_ldif(self, attr, value):
        """Render a single attribute value as LDIF text so values can be compared."""
        # values that are not MessageElement already (e.g. the "dn" entry)
        # are wrapped via their string form
        if not isinstance(value, MessageElement):
            value = MessageElement(str(value), 0, attr)
        m = Message()
        m.add(value)
        return self.samdb.write_ldif(m, 0)

    def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
        """Check both objects have the same attribute names and equal values.

        :param obj_orig: object as it was before deletion
        :param attrs_orig: set of attribute names expected on the restored object
        :param obj_restored: object fetched after restore
        :param attrs_rest: set of attribute names found on the restored object

        Attributes missing from obj_orig are only checked by name. The
        caller's attrs_orig set is left unmodified.
        """
        self.assertEqual(attrs_orig, attrs_rest, "Actual object does not has expected attributes")
        # remove volatile attributes, they can't be equal; work on a copy so
        # the set passed in by the caller is not altered behind its back
        comparable = set(attrs_orig) - set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
        for attr in comparable:
            # convert original attr value to ldif
            orig_val = obj_orig.get(attr)
            if orig_val is None:
                continue
            orig_ldif = self._attr_to_ldif(attr, orig_val)
            # convert restored attr value to ldif
            rest_val = obj_restored.get(attr)
            self.assertFalse(rest_val is None)
            rest_ldif = self._attr_to_ldif(attr, rest_val)
            # compare generated ldif's
            self.assertEqual(orig_ldif.lower(), rest_ldif.lower())

    def assertAttributesExists(self, attr_expected, obj_msg):
        """Check object contains exactly the expected attributes
        :param attr_expected: dict of expected attributes with values. ** is any value
        :param obj_msg: Ldb.Message for the object under test
        """
        actual_names = set(obj_msg.keys())
        # Samba does not use 'dSCorePropagationData', so skip it
        actual_names -= set(['dSCorePropagationData'])
        self.assertEqual(set(attr_expected.keys()), actual_names, "Actual object does not has expected attributes")
        for name, expected_val in attr_expected.items():
            actual_val = obj_msg.get(name)
            self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
            if expected_val == "**":
                # "**" values means "any"
                continue
            # values are compared case-insensitively via their string form
            self.assertEqual(expected_val.lower(), str(actual_val).lower(),
                             "Unexpected value for '%s'" % name)

    @staticmethod
    def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
        """Restores a deleted object
        :param samdb: SamDB connection to SAM
        :param del_dn: str Deleted object DN
        :param new_dn: str Where to restore the object
        :param new_attrs: dict Additional attributes to set
        """
        msg = Message()
        msg.dn = Dn(samdb, str(del_dn))
        # the restore request: drop "isDeleted" and replace "distinguishedName"
        # in one modify sent with the show_deleted control
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
        if new_attrs is not None:
            assert isinstance(new_attrs, dict)
            for attr, value in new_attrs.items():
                msg[attr] = MessageElement(value, FLAG_MOD_REPLACE, attr)
        samdb.modify(msg, ["show_deleted:1"])
class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Basic undelete scenarios: plain restore, rename attempt, restore with
    extra modifications, restore to another DN, DN clash and cross-NC restore.
    """

    @staticmethod
    def _test_user_msg(usr_dn):
        """Return the add-message shared by the test-user scenarios below."""
        return {
            "dn": usr_dn,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"}

    def enable_recycle_bin(self):
        """Enable the optional feature 766ddcd8-acd0-445e-f3b9-a7f9b6744f2a.

        An ERR_ATTRIBUTE_OR_VALUE_EXISTS reply (feature already enabled) is
        accepted; any other LDB error fails the test.
        """
        msg = Message()
        msg.dn = Dn(self.samdb, "")
        msg["enableOptionalFeature"] = MessageElement(
            "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
            FLAG_MOD_ADD, "enableOptionalFeature")
        try:
            self.samdb.modify(msg)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

    def test_undelete(self):
        """Delete a user and restore it to its original DN."""
        print("Testing standard undelete operation")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # _create_object() removes any leftover object before adding
        objLive1 = self._create_object(self._test_user_msg(usr1))
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
        samba.tests.delete_force(self.samdb, usr1)

    def test_rename(self):
        """A plain rename must not be usable to bring a deleted object back."""
        print("Testing attempt to rename deleted object")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # clear leftovers first so this test does not depend on execution order
        objLive1 = self._create_object(self._test_user_msg(usr1))
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        # just to make sure we get the correct error if the show deleted is missing
        try:
            self.samdb.rename(str(objDeleted1.dn), usr1)
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

        try:
            self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)

    def test_undelete_with_mod(self):
        """Restore a user while setting an additional attribute in the same request."""
        print("Testing standard undelete operation with modification of additional attributes")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # clear leftovers first so this test does not depend on execution order
        objLive1 = self._create_object(self._test_user_msg(usr1))
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
        objLive2 = self.search_dn(usr1)
        self.assertEqual(objLive2["url"][0], "www.samba.org")
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_newuser(self):
        """Restore a deleted user under a different DN."""
        print("Testing undelete user with a different dn")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        # a leftover object at the target DN would make the restore fail
        samba.tests.delete_force(self.samdb, usr2)
        objLive1 = self._create_object(self._test_user_msg(usr1))
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
        # search_dn() asserts that exactly one object exists at the new DN
        self.search_dn(usr2)
        samba.tests.delete_force(self.samdb, usr1)
        samba.tests.delete_force(self.samdb, usr2)

    def test_undelete_existing(self):
        """Restoring onto a DN that is in use again must fail with ERR_ENTRY_ALREADY_EXISTS."""
        print("Testing undelete user after a user with the same dn has been created")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # clear leftovers first so this test does not depend on execution order
        objLive1 = self._create_object(self._test_user_msg(usr1))
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        # occupy the original DN with a brand new object
        self.samdb.add(self._test_user_msg(usr1))
        objDeleted1 = self.search_guid(guid1)
        try:
            self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_ENTRY_ALREADY_EXISTS)
        # do not leave the live user behind for the following tests
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_cross_nc(self):
        """Restoring into another naming context must fail; same-NC restore must work."""
        print("Cross NC undelete")
        c1 = "cn=ldaptestcontainer," + self.base_dn
        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
        c3 = "cn=ldaptestcontainer," + self.configuration_dn
        c4 = "cn=ldaptestcontainer2," + self.base_dn
        for leftover in (c1, c2, c3, c4):
            samba.tests.delete_force(self.samdb, leftover)
        self.samdb.add({
            "dn": c1,
            "objectclass": "container"})
        self.samdb.add({
            "dn": c2,
            "objectclass": "container"})
        objLive1 = self.search_dn(c1)
        objLive2 = self.search_dn(c2)
        guid1 = objLive1["objectGUID"][0]
        guid2 = objLive2["objectGUID"][0]
        self.samdb.delete(c1)
        self.samdb.delete(c2)
        objDeleted1 = self.search_guid(guid1)
        objDeleted2 = self.search_guid(guid2)
        # try to undelete from base dn to config
        try:
            self.restore_deleted_object(self.samdb, objDeleted1.dn, c3)
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        # try to undelete from config to base dn
        try:
            self.restore_deleted_object(self.samdb, objDeleted2.dn, c4)
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        # assert undeletion will work in same nc
        self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
        self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test cases for delete/reanimate user objects"""

    def _expected_user_attributes(self, username, user_dn, category):
        """Build the attribute map expected on a restored user.

        :param username: value expected for cn, name and sAMAccountName
        :param user_dn: value expected for dn and distinguishedName
        :param category: CN of the objectCategory entry under the schema DN
        :return: dict of attribute name -> expected value, '**' meaning any value
        """
        # attributes that only have to be present, whatever their value
        expected = dict.fromkeys(
            ['objectClass', 'whenCreated', 'whenChanged', 'uSNCreated',
             'uSNChanged', 'objectGUID', 'objectSid'],
            '**')
        # counters and timestamps expected to be zero
        expected.update(dict.fromkeys(
            ['badPwdCount', 'badPasswordTime', 'codePage', 'countryCode',
             'lastLogon', 'lastLogoff', 'pwdLastSet', 'operatorCount',
             'adminCount', 'logonCount'],
            '0'))
        expected.update({
            'dn': user_dn,
            'distinguishedName': user_dn,
            'cn': username,
            'name': username,
            'sAMAccountName': username,
            'instanceType': '4',
            'userAccountControl': '546',
            'primaryGroupID': '513',
            'accountExpires': '9223372036854775807',
            'sAMAccountType': '805306368',
            'lastKnownParent': 'CN=Users,%s' % self.base_dn,
            'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
        })
        return expected

    def test_restore_user(self):
        """Create, delete and restore a user, then check the restored attribute set."""
        print("Test restored user attributes")
        username = "restore_user"
        usr_dn = "cn=%s,cn=users,%s" % (username, self.base_dn)
        samba.tests.delete_force(self.samdb, usr_dn)
        self.samdb.add({
            "dn": usr_dn,
            "objectClass": "user",
            "sAMAccountName": username})
        created = self.search_dn(usr_dn)
        guid = created["objectGUID"][0]
        self.samdb.delete(usr_dn)
        tombstone = self.search_guid(guid)
        # restore the user and fetch what's restored
        self.restore_deleted_object(self.samdb, tombstone.dn, usr_dn)
        restored = self.search_guid(guid)
        # check original attributes and restored one are same;
        # windows restore more attributes that originally we have
        names_before = set(created.keys()) | set(['adminCount', 'operatorCount', 'lastKnownParent'])
        names_after = set(restored.keys())
        self.assertEqual(names_before, names_after, "Actual object does not has expected attributes")
        expected = self._expected_user_attributes(username, usr_dn, "Person")
        self.assertAttributesExists(expected, restored)
class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate group objects"""

    def _make_object_dn(self, name):
        """Return the DN of an object called name inside CN=Users."""
        return "cn=%s,cn=users,%s" % (name, self.base_dn)

    def _create_test_user(self, user_name):
        """Create (replacing any leftover) a user in CN=Users and return it.

        :param user_name: used for both the RDN and sAMAccountName
        """
        ldif = {
            "dn": self._make_object_dn(user_name),
            "objectClass": "user",
            "sAMAccountName": user_name,
        }
        # _create_object() deletes a leftover object, adds the new one
        # and returns what was stored
        return self._create_object(ldif)

    def _create_test_group(self, group_name, members=None):
        """Create (replacing any leftover) a group in CN=Users and return it.

        :param group_name: used for both the RDN and sAMAccountName
        :param members: optional iterable of member DNs; None means no
            "member" attribute is sent at all
        """
        ldif = {
            "dn": self._make_object_dn(group_name),
            "objectClass": "group",
            "sAMAccountName": group_name,
        }
        # explicit check instead of swallowing TypeError, which would also
        # hide genuine errors raised while converting the member DNs
        if members is not None:
            ldif["member"] = [str(usr_dn) for usr_dn in members]
        return self._create_object(ldif)

    def _expected_group_attributes(self, groupname, group_dn, category):
        """Build the attribute map expected on a restored group.

        :param groupname: value expected for cn, name and sAMAccountName
        :param group_dn: value expected for dn and distinguishedName
        :param category: CN of the objectCategory entry under the schema DN
        :return: dict of attribute name -> expected value, '**' meaning any value
        """
        return {'dn': group_dn,
                'groupType': '-2147483646',
                'distinguishedName': group_dn,
                'sAMAccountName': groupname,
                'name': groupname,
                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
                'objectClass': '**',
                'objectGUID': '**',
                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
                'whenChanged': '**',
                'sAMAccountType': '268435456',
                'objectSid': '**',
                'whenCreated': '**',
                'uSNCreated': '**',
                'operatorCount': '0',
                'uSNChanged': '**',
                'instanceType': '4',
                'adminCount': '0',
                'cn': groupname}

    def _delete_restore_and_check(self, obj, group_name, not_restored=()):
        """Delete obj, restore it to its original DN and verify its attributes.

        :param obj: the live group object as returned by _create_test_group()
        :param group_name: name the group was created with
        :param not_restored: attribute names present on obj that are expected
            to be absent after the restore
        """
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        # and does not restore following attributes; remove() raises KeyError
        # if the attribute was never on the original object
        for attr in not_restored:
            attr_orig.remove(attr)
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
        self.assertAttributesExists(self._expected_group_attributes(group_name, str(obj.dn), "Group"), obj_restore)

    def test_plain_group(self):
        """Restore a group that has no members."""
        print("Test restored Group attributes")
        # create test group
        obj = self._create_test_group("r_group")
        self._delete_restore_and_check(obj, "r_group")

    def test_group_with_members(self):
        """Restore a group that had members; "member" is expected to be gone."""
        print("Test restored Group with members attributes")
        # create test group
        usr1 = self._create_test_user("r_user_1")
        usr2 = self._create_test_user("r_user_2")
        obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
        self._delete_restore_and_check(obj, "r_group", not_restored=["member"])
class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate OU/container objects"""

    def _expected_container_attributes(self, rdn, name, dn, category):
        """Build the attribute map expected on a restored OU/container.

        :param rdn: RDN attribute name; 'ou' selects the domain DN as
            lastKnownParent, anything else selects CN=Users
        :param name: value expected for the name and RDN attributes
        :param dn: value expected for dn and distinguishedName
        :param category: CN of the objectCategory entry under the schema DN
        :return: dict of attribute name -> expected value, '**' meaning any value
        """
        parent = '%s' % self.base_dn if rdn == 'ou' else 'CN=Users,%s' % self.base_dn
        # attributes that only have to be present, whatever their value
        expected = dict.fromkeys(
            ['objectClass', 'objectGUID', 'whenChanged', 'whenCreated',
             'uSNCreated', 'uSNChanged'],
            '**')
        expected.update({
            'dn': dn,
            'distinguishedName': dn,
            'name': name,
            'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
            'lastKnownParent': parent,
            'instanceType': '4',
        })
        # the RDN attribute itself carries the object name
        expected[rdn] = name
        return expected

    def _create_test_ou(self, rdn, name=None, description=None):
        """Create (replacing any leftover) OU=<rdn> under the domain DN and return it."""
        target_dn = "OU=%s,%s" % (rdn, self.base_dn)
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, target_dn)
        # create ou and return created object
        self.samdb.create_ou(target_dn, name=name, description=description)
        return self.search_dn(target_dn)

    def test_ou_with_name_description(self):
        """Restore an OU created with a name and description."""
        print("Test OU reanimation")
        # create OU to test with
        created = self._create_test_ou(rdn="r_ou",
                                       name="r_ou name",
                                       description="r_ou description")
        guid = created["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(created.dn))
        tombstone = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, tombstone.dn, created.dn)
        restored = self.search_guid(guid)
        # check original attributes and restored one are same
        names_after = set(restored.keys())
        # windows restore more attributes that originally we have
        # and does not restore following attributes
        names_before = (set(created.keys()) | set(["lastKnownParent"])) - set(["description"])
        self.assertAttributesEqual(created, names_before, restored, names_after)
        expected = self._expected_container_attributes("ou", "r_ou", str(created.dn), "Organizational-Unit")
        self.assertAttributesExists(expected, restored)

    def test_container(self):
        """Restore a plain container created under CN=Users."""
        print("Test Container reanimation")
        # create test Container
        created = self._create_object({
            "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
            "objectClass": "container"
        })
        guid = created["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(created.dn))
        tombstone = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, tombstone.dn, created.dn)
        restored = self.search_guid(guid)
        # check original attributes and restored one are same
        names_after = set(restored.keys())
        # windows restore more attributes that originally we have
        # and does not restore following attributes
        names_before = (set(created.keys()) | set(["lastKnownParent"])) - set(["showInAdvancedViewOnly"])
        self.assertAttributesEqual(created, names_before, restored, names_after)
        expected = self._expected_container_attributes("cn", "r_container",
                                                       str(created.dn), "container")
        self.assertAttributesExists(expected, restored)
# Run every test case of this module when the file is executed as a script.
if "__main__" == __name__:
    unittest.main()