3 # Tombstone reanimation tests
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 sys
.path
.insert(0, "bin/python")
28 from ldb
import (SCOPE_BASE
, FLAG_MOD_ADD
, FLAG_MOD_DELETE
, FLAG_MOD_REPLACE
, Dn
, Message
,
29 MessageElement
, LdbError
,
30 ERR_ATTRIBUTE_OR_VALUE_EXISTS
, ERR_NO_SUCH_OBJECT
, ERR_ENTRY_ALREADY_EXISTS
,
31 ERR_OPERATIONS_ERROR
, ERR_UNWILLING_TO_PERFORM
)
34 class RestoredObjectAttributesBaseTestCase(samba
.tests
.TestCase
):
35 """ verify Samba restores required attributes when
36 user restores a Deleted object
40 super(RestoredObjectAttributesBaseTestCase
, self
).setUp()
41 self
.samdb
= samba
.tests
.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
42 self
.base_dn
= self
.samdb
.domain_dn()
43 self
.schema_dn
= self
.samdb
.get_schema_basedn().get_linearized()
44 self
.configuration_dn
= self
.samdb
.get_config_basedn().get_linearized()
45 # Get the old "dSHeuristics" if it was set
46 self
.dsheuristics
= self
.samdb
.get_dsheuristics()
47 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
48 self
.samdb
.set_dsheuristics("000000001")
49 # Get the old "minPwdAge"
50 self
.minPwdAge
= self
.samdb
.get_minPwdAge()
51 # Set it temporarily to "0"
52 self
.samdb
.set_minPwdAge("0")
55 super(RestoredObjectAttributesBaseTestCase
, self
).tearDown()
56 # Reset the "dSHeuristics" as they were before
57 self
.samdb
.set_dsheuristics(self
.dsheuristics
)
58 # Reset the "minPwdAge" as it was before
59 self
.samdb
.set_minPwdAge(self
.minPwdAge
)
def GUID_string(self, guid):
    """Format a raw objectGUID value via the SamDB schema formatter.

    :param guid: objectGUID value, e.g. obj["objectGUID"][0] from a search
    :return: result of samdb.schema_format_value() for "objectGUID"
    """
    return self.samdb.schema_format_value("objectGUID", guid)
64 def search_guid(self
, guid
):
65 res
= self
.samdb
.search(base
="<GUID=%s>" % self
.GUID_string(guid
),
66 scope
=SCOPE_BASE
, controls
=["show_deleted:1"])
67 self
.assertEquals(len(res
), 1)
70 def search_dn(self
, dn
):
71 res
= self
.samdb
.search(expression
="(objectClass=*)",
74 controls
=["show_recycled:1"])
75 self
.assertEquals(len(res
), 1)
78 def _create_object(self
, msg
):
79 """:param msg: dict with dn and attributes to create an object from"""
80 # delete an object if leftover from previous test
81 samba
.tests
.delete_force(self
.samdb
, msg
['dn'])
83 return self
.search_dn(msg
['dn'])
85 def assertAttributesEqual(self
, obj_orig
, attrs_orig
, obj_restored
, attrs_rest
):
86 self
.assertEqual(attrs_orig
, attrs_rest
, "Actual object does not has expected attributes")
87 # remove volatile attributes, they can't be equal
88 attrs_orig
-= set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
89 for attr
in attrs_orig
:
90 # convert original attr value to ldif
91 orig_val
= obj_orig
.get(attr
)
94 if not isinstance(orig_val
, MessageElement
):
95 orig_val
= MessageElement(str(orig_val
), 0, attr
)
98 orig_ldif
= self
.samdb
.write_ldif(m
, 0)
99 # convert restored attr value to ldif
100 rest_val
= obj_restored
.get(attr
)
101 self
.assertFalse(rest_val
is None)
103 if not isinstance(rest_val
, MessageElement
):
104 rest_val
= MessageElement(str(rest_val
), 0, attr
)
106 rest_ldif
= self
.samdb
.write_ldif(m
, 0)
107 # compare generated ldif's
108 self
.assertEqual(orig_ldif
.lower(), rest_ldif
.lower())
110 def assertAttributesExists(self
, attr_expected
, obj_msg
):
111 """Check object contains at least expected attributes
112 :param attr_expected: dict of expected attributes with values. ** is any value
113 :param obj_msg: Ldb.Message for the object under test
115 actual_names
= set(obj_msg
.keys())
116 # Samba does not use 'dSCorePropagationData', so skip it
117 actual_names
-= set(['dSCorePropagationData'])
118 self
.assertEqual(set(attr_expected
.keys()), actual_names
, "Actual object does not has expected attributes")
119 for name
in attr_expected
.keys():
120 expected_val
= attr_expected
[name
]
121 actual_val
= obj_msg
.get(name
)
122 self
.assertFalse(actual_val
is None, "No value for attribute '%s'" % name
)
123 if expected_val
== "**":
124 # "**" values means "any"
126 self
.assertEqual(expected_val
.lower(), str(actual_val
).lower(),
127 "Unexpected value for '%s'" % name
)
130 def restore_deleted_object(samdb
, del_dn
, new_dn
, new_attrs
=None):
131 """Restores a deleted object
132 :param samdb: SamDB connection to SAM
133 :param del_dn: str Deleted object DN
134 :param new_dn: str Where to restore the object
135 :param new_attrs: dict Additional attributes to set
138 msg
.dn
= Dn(samdb
, str(del_dn
))
139 msg
["isDeleted"] = MessageElement([], FLAG_MOD_DELETE
, "isDeleted")
140 msg
["distinguishedName"] = MessageElement([str(new_dn
)], FLAG_MOD_REPLACE
, "distinguishedName")
141 if new_attrs
is not None:
142 assert isinstance(new_attrs
, dict)
143 for attr
in new_attrs
:
144 msg
[attr
] = MessageElement(new_attrs
[attr
], FLAG_MOD_REPLACE
, attr
)
145 samdb
.modify(msg
, ["show_deleted:1"])
148 class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase
):
150 super(BaseRestoreObjectTestCase
, self
).setUp()
152 def enable_recycle_bin(self
):
154 msg
.dn
= Dn(self
.samdb
, "")
155 msg
["enableOptionalFeature"] = MessageElement(
156 "CN=Partitions," + self
.configuration_dn
+ ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
157 FLAG_MOD_ADD
, "enableOptionalFeature")
159 self
.samdb
.modify(msg
)
160 except LdbError
, (num
, _
):
161 self
.assertEquals(num
, ERR_ATTRIBUTE_OR_VALUE_EXISTS
)
163 def test_undelete(self
):
164 print "Testing standard undelete operation"
165 usr1
= "cn=testuser,cn=users," + self
.base_dn
166 samba
.tests
.delete_force(self
.samdb
, usr1
)
169 "objectclass": "user",
170 "description": "test user description",
171 "samaccountname": "testuser"})
172 objLive1
= self
.search_dn(usr1
)
173 guid1
= objLive1
["objectGUID"][0]
174 self
.samdb
.delete(usr1
)
175 objDeleted1
= self
.search_guid(guid1
)
176 self
.restore_deleted_object(self
.samdb
, objDeleted1
.dn
, usr1
)
177 objLive2
= self
.search_dn(usr1
)
178 self
.assertEqual(str(objLive2
.dn
).lower(), str(objLive1
.dn
).lower())
179 samba
.tests
.delete_force(self
.samdb
, usr1
)
181 def test_rename(self
):
182 print "Testing attempt to rename deleted object"
183 usr1
= "cn=testuser,cn=users," + self
.base_dn
186 "objectclass": "user",
187 "description": "test user description",
188 "samaccountname": "testuser"})
189 objLive1
= self
.search_dn(usr1
)
190 guid1
= objLive1
["objectGUID"][0]
191 self
.samdb
.delete(usr1
)
192 objDeleted1
= self
.search_guid(guid1
)
193 # just to make sure we get the correct error if the show deleted is missing
195 self
.samdb
.rename(str(objDeleted1
.dn
), usr1
)
197 except LdbError
, (num
, _
):
198 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
201 self
.samdb
.rename(str(objDeleted1
.dn
), usr1
, ["show_deleted:1"])
203 except LdbError
, (num
, _
):
204 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
206 def test_undelete_with_mod(self
):
207 print "Testing standard undelete operation with modification of additional attributes"
208 usr1
= "cn=testuser,cn=users," + self
.base_dn
211 "objectclass": "user",
212 "description": "test user description",
213 "samaccountname": "testuser"})
214 objLive1
= self
.search_dn(usr1
)
215 guid1
= objLive1
["objectGUID"][0]
216 self
.samdb
.delete(usr1
)
217 objDeleted1
= self
.search_guid(guid1
)
218 self
.restore_deleted_object(self
.samdb
, objDeleted1
.dn
, usr1
, {"url": "www.samba.org"})
219 objLive2
= self
.search_dn(usr1
)
220 self
.assertEqual(objLive2
["url"][0], "www.samba.org")
221 samba
.tests
.delete_force(self
.samdb
, usr1
)
223 def test_undelete_newuser(self
):
224 print "Testing undelete user with a different dn"
225 usr1
= "cn=testuser,cn=users," + self
.base_dn
226 usr2
= "cn=testuser2,cn=users," + self
.base_dn
227 samba
.tests
.delete_force(self
.samdb
, usr1
)
230 "objectclass": "user",
231 "description": "test user description",
232 "samaccountname": "testuser"})
233 objLive1
= self
.search_dn(usr1
)
234 guid1
= objLive1
["objectGUID"][0]
235 self
.samdb
.delete(usr1
)
236 objDeleted1
= self
.search_guid(guid1
)
237 self
.restore_deleted_object(self
.samdb
, objDeleted1
.dn
, usr2
)
238 objLive2
= self
.search_dn(usr2
)
239 samba
.tests
.delete_force(self
.samdb
, usr1
)
240 samba
.tests
.delete_force(self
.samdb
, usr2
)
242 def test_undelete_existing(self
):
243 print "Testing undelete user after a user with the same dn has been created"
244 usr1
= "cn=testuser,cn=users," + self
.base_dn
247 "objectclass": "user",
248 "description": "test user description",
249 "samaccountname": "testuser"})
250 objLive1
= self
.search_dn(usr1
)
251 guid1
= objLive1
["objectGUID"][0]
252 self
.samdb
.delete(usr1
)
255 "objectclass": "user",
256 "description": "test user description",
257 "samaccountname": "testuser"})
258 objDeleted1
= self
.search_guid(guid1
)
260 self
.restore_deleted_object(self
.samdb
, objDeleted1
.dn
, usr1
)
262 except LdbError
, (num
, _
):
263 self
.assertEquals(num
, ERR_ENTRY_ALREADY_EXISTS
)
265 def test_undelete_cross_nc(self
):
266 print "Cross NC undelete"
267 c1
= "cn=ldaptestcontainer," + self
.base_dn
268 c2
= "cn=ldaptestcontainer2," + self
.configuration_dn
269 c3
= "cn=ldaptestcontainer," + self
.configuration_dn
270 c4
= "cn=ldaptestcontainer2," + self
.base_dn
271 samba
.tests
.delete_force(self
.samdb
, c1
)
272 samba
.tests
.delete_force(self
.samdb
, c2
)
273 samba
.tests
.delete_force(self
.samdb
, c3
)
274 samba
.tests
.delete_force(self
.samdb
, c4
)
277 "objectclass": "container"})
280 "objectclass": "container"})
281 objLive1
= self
.search_dn(c1
)
282 objLive2
= self
.search_dn(c2
)
283 guid1
= objLive1
["objectGUID"][0]
284 guid2
= objLive2
["objectGUID"][0]
285 self
.samdb
.delete(c1
)
286 self
.samdb
.delete(c2
)
287 objDeleted1
= self
.search_guid(guid1
)
288 objDeleted2
= self
.search_guid(guid2
)
289 # try to undelete from base dn to config
291 self
.restore_deleted_object(self
.samdb
, objDeleted1
.dn
, c3
)
293 except LdbError
, (num
, _
):
294 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
295 #try to undelete from config to base dn
297 self
.restore_deleted_object(self
.samdb
, objDeleted2
.dn
, c4
)
299 except LdbError
, (num
, _
):
300 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
301 #assert undeletion will work in same nc
302 self
.restore_deleted_object(self
.samdb
, objDeleted1
.dn
, c4
)
303 self
.restore_deleted_object(self
.samdb
, objDeleted2
.dn
, c3
)
306 class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase
):
307 """Test cases for delete/reanimate user objects"""
309 def _expected_user_attributes(self
, username
, user_dn
, category
):
310 return {'dn': user_dn
,
313 'distinguishedName': user_dn
,
321 'userAccountControl': '546',
323 'badPasswordTime': '0',
329 'primaryGroupID': '513',
330 'operatorCount': '0',
333 'accountExpires': '9223372036854775807',
335 'sAMAccountName': username
,
336 'sAMAccountType': '805306368',
337 'lastKnownParent': 'CN=Users,%s' % self
.base_dn
,
338 'objectCategory': 'CN=%s,%s' % (category
, self
.schema_dn
)
341 def test_restore_user(self
):
342 print "Test restored user attributes"
343 username
= "restore_user"
344 usr_dn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
345 samba
.tests
.delete_force(self
.samdb
, usr_dn
)
348 "objectClass": "user",
349 "sAMAccountName": username
})
350 obj
= self
.search_dn(usr_dn
)
351 guid
= obj
["objectGUID"][0]
352 self
.samdb
.delete(usr_dn
)
353 obj_del
= self
.search_guid(guid
)
354 # restore the user and fetch what's restored
355 self
.restore_deleted_object(self
.samdb
, obj_del
.dn
, usr_dn
)
356 obj_restore
= self
.search_guid(guid
)
357 # check original attributes and restored one are same
358 orig_attrs
= set(obj
.keys())
359 # Windows restores more attributes than we originally had
360 orig_attrs
.update(['adminCount', 'operatorCount', 'lastKnownParent'])
361 rest_attrs
= set(obj_restore
.keys())
362 self
.assertEqual(orig_attrs
, rest_attrs
, "Actual object does not has expected attributes")
363 self
.assertAttributesExists(self
._expected
_user
_attributes
(username
, usr_dn
, "Person"), obj_restore
)
366 class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase
):
367 """Test different scenarios for delete/reanimate group objects"""
369 def _make_object_dn(self
, name
):
370 return "cn=%s,cn=users,%s" % (name
, self
.base_dn
)
372 def _create_test_user(self
, user_name
):
373 user_dn
= self
._make
_object
_dn
(user_name
)
376 "objectClass": "user",
377 "sAMAccountName": user_name
,
379 # delete an object if leftover from previous test
380 samba
.tests
.delete_force(self
.samdb
, user_dn
)
381 # finally, create the user
383 return self
.search_dn(user_dn
)
385 def _create_test_group(self
, group_name
, members
=None):
386 group_dn
= self
._make
_object
_dn
(group_name
)
389 "objectClass": "group",
390 "sAMAccountName": group_name
,
393 ldif
["member"] = [str(usr_dn
) for usr_dn
in members
]
396 # delete an object if leftover from previous test
397 samba
.tests
.delete_force(self
.samdb
, group_dn
)
398 # finally, create the group
400 return self
.search_dn(group_dn
)
402 def _expected_group_attributes(self
, groupname
, group_dn
, category
):
403 return {'dn': group_dn
,
404 'groupType': '-2147483646',
405 'distinguishedName': group_dn
,
406 'sAMAccountName': groupname
,
408 'objectCategory': 'CN=%s,%s' % (category
, self
.schema_dn
),
411 'lastKnownParent': 'CN=Users,%s' % self
.base_dn
,
413 'sAMAccountType': '268435456',
417 'operatorCount': '0',
423 def test_plain_group(self
):
424 print "Test restored Group attributes"
426 obj
= self
._create
_test
_group
("r_group")
427 guid
= obj
["objectGUID"][0]
429 self
.samdb
.delete(str(obj
.dn
))
430 obj_del
= self
.search_guid(guid
)
431 # restore the Group and fetch what's restored
432 self
.restore_deleted_object(self
.samdb
, obj_del
.dn
, obj
.dn
)
433 obj_restore
= self
.search_guid(guid
)
434 # check original attributes and restored one are same
435 attr_orig
= set(obj
.keys())
436 # Windows restores more attributes than we originally had
437 attr_orig
.update(['adminCount', 'operatorCount', 'lastKnownParent'])
438 attr_rest
= set(obj_restore
.keys())
439 self
.assertAttributesEqual(obj
, attr_orig
, obj_restore
, attr_rest
)
440 self
.assertAttributesExists(self
._expected
_group
_attributes
("r_group", str(obj
.dn
), "Group"), obj_restore
)
442 def test_group_with_members(self
):
443 print "Test restored Group with members attributes"
445 usr1
= self
._create
_test
_user
("r_user_1")
446 usr2
= self
._create
_test
_user
("r_user_2")
447 obj
= self
._create
_test
_group
("r_group", [usr1
.dn
, usr2
.dn
])
448 guid
= obj
["objectGUID"][0]
450 self
.samdb
.delete(str(obj
.dn
))
451 obj_del
= self
.search_guid(guid
)
452 # restore the Group and fetch what's restored
453 self
.restore_deleted_object(self
.samdb
, obj_del
.dn
, obj
.dn
)
454 obj_restore
= self
.search_guid(guid
)
455 # check original attributes and restored one are same
456 attr_orig
= set(obj
.keys())
457 # Windows restores more attributes than we originally had
458 attr_orig
.update(['adminCount', 'operatorCount', 'lastKnownParent'])
459 # and does not restore following attributes
460 attr_orig
.remove("member")
461 attr_rest
= set(obj_restore
.keys())
462 self
.assertAttributesEqual(obj
, attr_orig
, obj_restore
, attr_rest
)
463 self
.assertAttributesExists(self
._expected
_group
_attributes
("r_group", str(obj
.dn
), "Group"), obj_restore
)
466 class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase
):
467 """Test different scenarios for delete/reanimate OU/container objects"""
469 def _expected_container_attributes(self
, rdn
, name
, dn
, category
):
471 lastKnownParent
= '%s' % self
.base_dn
473 lastKnownParent
= 'CN=Users,%s' % self
.base_dn
475 'distinguishedName': dn
,
477 'objectCategory': 'CN=%s,%s' % (category
, self
.schema_dn
),
480 'lastKnownParent': lastKnownParent
,
def _create_test_ou(self, rdn, name=None, description=None):
    """(Re)create a test OU directly under the base DN and return it.

    :param rdn: value of the OU RDN; the DN becomes "OU=<rdn>,<self.base_dn>"
    :param name: passed through unchanged to samdb.create_ou()
    :param description: passed through unchanged to samdb.create_ou()
    :return: whatever self.search_dn() returns for the new OU's DN
    """
    ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
    # delete an object if leftover from previous test
    samba.tests.delete_force(self.samdb, ou_dn)
    # create ou and return created object
    self.samdb.create_ou(ou_dn, name=name, description=description)
    return self.search_dn(ou_dn)
496 def test_ou_with_name_description(self
):
497 print "Test OU reanimation"
498 # create OU to test with
499 obj
= self
._create
_test
_ou
(rdn
="r_ou",
501 description
="r_ou description")
502 guid
= obj
["objectGUID"][0]
504 self
.samdb
.delete(str(obj
.dn
))
505 obj_del
= self
.search_guid(guid
)
506 # restore the Object and fetch what's restored
507 self
.restore_deleted_object(self
.samdb
, obj_del
.dn
, obj
.dn
)
508 obj_restore
= self
.search_guid(guid
)
509 # check original attributes and restored one are same
510 attr_orig
= set(obj
.keys())
511 attr_rest
= set(obj_restore
.keys())
512 # Windows restores more attributes than we originally had
513 attr_orig
.update(["lastKnownParent"])
514 # and does not restore following attributes
515 attr_orig
-= set(["description"])
516 self
.assertAttributesEqual(obj
, attr_orig
, obj_restore
, attr_rest
)
517 expected_attrs
= self
._expected
_container
_attributes
("ou", "r_ou", str(obj
.dn
), "Organizational-Unit")
518 self
.assertAttributesExists(expected_attrs
, obj_restore
)
520 def test_container(self
):
521 print "Test Container reanimation"
522 # create test Container
523 obj
= self
._create
_object
({
524 "dn": "CN=r_container,CN=Users,%s" % self
.base_dn
,
525 "objectClass": "container"
527 guid
= obj
["objectGUID"][0]
529 self
.samdb
.delete(str(obj
.dn
))
530 obj_del
= self
.search_guid(guid
)
531 # restore the Object and fetch what's restored
532 self
.restore_deleted_object(self
.samdb
, obj_del
.dn
, obj
.dn
)
533 obj_restore
= self
.search_guid(guid
)
534 # check original attributes and restored one are same
535 attr_orig
= set(obj
.keys())
536 attr_rest
= set(obj_restore
.keys())
537 # Windows restores more attributes than we originally had
538 attr_orig
.update(["lastKnownParent"])
539 # and does not restore following attributes
540 attr_orig
-= set(["showInAdvancedViewOnly"])
541 self
.assertAttributesEqual(obj
, attr_orig
, obj_restore
, attr_rest
)
542 expected_attrs
= self
._expected
_container
_attributes
("cn", "r_container",
543 str(obj
.dn
), "container")
544 self
.assertAttributesExists(expected_attrs
, obj_restore
)
547 if __name__
== '__main__':