s4:dsdb/tests: improve the RestoreUserObjectTestCase test
[Samba.git] / source4 / dsdb / tests / python / tombstone_reanimation.py
blobcabde6ec357e9b25061f0d1648cede0b95372ecf
1 #!/usr/bin/env python
3 # Tombstone reanimation tests
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import sys
22 import unittest
24 sys.path.insert(0, "bin/python")
25 import samba
27 from samba.ndr import ndr_unpack, ndr_print
28 from samba.dcerpc import misc
29 from samba.dcerpc import security
30 from samba.dcerpc import drsblobs
31 from samba.dcerpc.drsuapi import *
33 import samba.tests
34 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
35 MessageElement, LdbError,
36 ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
37 ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
    """Verify Samba restores required attributes when
    a user restores a Deleted object.

    Base class for the reanimation tests: opens the SamDB connection in
    setUp() and provides search, comparison and restore helpers.
    """

    def setUp(self):
        """Connect to the SAM and relax the settings the tests depend on."""
        super(RestoredObjectAttributesBaseTestCase, self).setUp()
        self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
        self.base_dn = self.samdb.domain_dn()
        self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
        self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
        # Get the old "dSHeuristics" if it was set
        self.dsheuristics = self.samdb.get_dsheuristics()
        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
        self.samdb.set_dsheuristics("000000001")
        # Get the old "minPwdAge"
        self.minPwdAge = self.samdb.get_minPwdAge()
        # Set it temporary to "0"
        self.samdb.set_minPwdAge("0")

    def tearDown(self):
        """Put back the settings that setUp() overrode."""
        super(RestoredObjectAttributesBaseTestCase, self).tearDown()
        # Reset the "dSHeuristics" as they were before
        self.samdb.set_dsheuristics(self.dsheuristics)
        # Reset the "minPwdAge" as it was before
        self.samdb.set_minPwdAge(self.minPwdAge)

    def GUID_string(self, guid):
        """Format a raw objectGUID value using the schema's objectGUID syntax"""
        return self.samdb.schema_format_value("objectGUID", guid)

    def search_guid(self, guid, attrs=None):
        """Fetch exactly one object by GUID, deleted objects included
        :param guid: raw objectGUID value of the object
        :param attrs: list of attribute names to fetch, defaults to ["*"]
        :return: the single matching Ldb.Message
        """
        # a None default avoids sharing one mutable list between calls
        if attrs is None:
            attrs = ["*"]
        res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
                                scope=SCOPE_BASE, attrs=attrs,
                                controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def search_dn(self, dn):
        """Fetch exactly one object by DN using the show_recycled control
        :param dn: DN of the object to fetch
        :return: the single matching Ldb.Message
        """
        res = self.samdb.search(expression="(objectClass=*)",
                                base=dn,
                                scope=SCOPE_BASE,
                                controls=["show_recycled:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def _create_object(self, msg):
        """:param msg: dict with dn and attributes to create an object from"""
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, msg['dn'])
        self.samdb.add(msg)
        return self.search_dn(msg['dn'])

    def assertNamesEqual(self, attrs_expected, attrs_extra):
        """Assert two sets of attribute names are equal
        :param attrs_expected: set of expected attribute names
        :param attrs_extra: set of attribute names actually found
        """
        self.assertEqual(attrs_expected, attrs_extra,
                         "Actual object does not have expected attributes, missing from expected (%s), extra (%s)"
                         % (str(attrs_expected.difference(attrs_extra)), str(attrs_extra.difference(attrs_expected))))

    def _attr_to_ldif(self, attr, value):
        """Render a single attribute value as LDIF so two values can be compared as text"""
        if not isinstance(value, MessageElement):
            value = MessageElement(str(value), 0, attr)
        m = Message()
        m.add(value)
        return self.samdb.write_ldif(m, 0)

    def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
        """Assert the restored object carries the same attribute values as the original
        :param obj_orig: object as it was before deletion
        :param attrs_orig: set of attribute names expected on the restored object
        :param obj_restored: object as it is after restore
        :param attrs_rest: set of attribute names found on the restored object
        """
        self.assertNamesEqual(attrs_orig, attrs_rest)
        # remove volatile attributes, they can't be equal;
        # work on a copy so the caller's set is left untouched
        attrs_to_compare = set(attrs_orig) - set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
        for attr in attrs_to_compare:
            orig_val = obj_orig.get(attr)
            if orig_val is None:
                # attribute only expected on the restored object, nothing to compare
                continue
            rest_val = obj_restored.get(attr)
            self.assertFalse(rest_val is None)
            # compare generated ldif's
            self.assertEqual(self._attr_to_ldif(attr, orig_val),
                             self._attr_to_ldif(attr, rest_val))

    def assertAttributesExists(self, attr_expected, obj_msg):
        """Check object contains at least expected attributes
        :param attr_expected: dict of expected attributes with values. ** is any value
        :param obj_msg: Ldb.Message for the object under test
        """
        actual_names = set(obj_msg.keys())
        # Samba does not use 'dSCorePropagationData', so skip it
        actual_names -= set(['dSCorePropagationData'])
        expected_names = set(attr_expected.keys())
        self.assertNamesEqual(expected_names, actual_names)
        for name, expected_val in attr_expected.items():
            actual_val = obj_msg.get(name)
            self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
            if expected_val == "**":
                # "**" values means "any"
                continue
            self.assertEqual(expected_val, str(actual_val),
                             "Unexpected value (%s) for '%s', expected (%s)" % (
                             str(actual_val), name, expected_val))

    def _check_metadata(self, metadata, expected):
        """Check replPropertyMetaData entries against an expected, ordered list
        :param metadata: replPropertyMetaData attribute, only the first value is unpacked
        :param expected: list of (attid, version) tuples in the expected order;
                         a version of None means the version is not checked
        """
        repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(metadata[0]))

        repl_set = set((o.attid, o.version) for o in repl.ctr.array)
        expected_set = set(expected)
        self.assertEqual(len(repl_set), len(expected),
                         "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % (
                         str(expected_set.difference(repl_set)),
                         str(repl_set.difference(expected_set)),
                         ndr_print(repl)))

        for i, o in enumerate(repl.ctr.array):
            (attid, version) = expected[i]
            self.assertEqual(attid, o.attid,
                             "(LDAP) Wrong attid "
                             "for expected value %d, wanted 0x%08x got 0x%08x, "
                             "repl: \n%s"
                             % (i, attid, o.attid, ndr_print(repl)))
            # Allow version to be skipped when it does not matter
            if version is not None:
                self.assertEqual(o.version, version,
                                 "(LDAP) Wrong version for expected value %d, "
                                 "attid 0x%08x, "
                                 "wanted %d got %d, repl: \n%s"
                                 % (i, o.attid,
                                    version, o.version, ndr_print(repl)))

    @staticmethod
    def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
        """Restores a deleted object
        :param samdb: SamDB connection to SAM
        :param del_dn: str Deleted object DN
        :param new_dn: str Where to restore the object
        :param new_attrs: dict Additional attributes to set
        """
        msg = Message()
        msg.dn = Dn(samdb, str(del_dn))
        # the restore is a single modify: drop isDeleted and replace distinguishedName
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
        if new_attrs is not None:
            assert isinstance(new_attrs, dict)
            for attr in new_attrs:
                msg[attr] = MessageElement(new_attrs[attr], FLAG_MOD_REPLACE, attr)
        samdb.modify(msg, ["show_deleted:1"])
class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Basic undelete scenarios: plain restore, restore to a new DN,
    restore with extra modifications, and the expected failure cases.
    """

    def enable_recycle_bin(self):
        """Add the optional feature with GUID 766ddcd8-... on the Partitions
        container; an 'already exists' error is accepted.
        """
        msg = Message()
        msg.dn = Dn(self.samdb, "")
        msg["enableOptionalFeature"] = MessageElement(
            "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
            FLAG_MOD_ADD, "enableOptionalFeature")
        try:
            self.samdb.modify(msg)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

    def _add_test_user(self, user_dn):
        """Add the standard 'testuser' account at user_dn"""
        self.samdb.add({
            "dn": user_dn,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})

    def _assert_ldb_error(self, expected_num, operation, *args):
        """Call operation(*args) and require it to raise LdbError with expected_num"""
        try:
            operation(*args)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected_num)
        else:
            self.fail()

    def test_undelete(self):
        print("Testing standard undelete operation")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        samba.tests.delete_force(self.samdb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
        samba.tests.delete_force(self.samdb, usr1)

    def test_rename(self):
        print("Testing attempt to rename deleted object")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        # just to make sure we get the correct error if the show deleted is missing
        self._assert_ldb_error(ERR_NO_SUCH_OBJECT,
                               self.samdb.rename, str(objDeleted1.dn), usr1)
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.samdb.rename, str(objDeleted1.dn), usr1, ["show_deleted:1"])

    def test_undelete_with_mod(self):
        print("Testing standard undelete operation with modification of additional attributes")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
        objLive2 = self.search_dn(usr1)
        self.assertEqual(objLive2["url"][0], "www.samba.org")
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_newuser(self):
        print("Testing undelete user with a different dn")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        samba.tests.delete_force(self.samdb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
        # search_dn asserts the restored object is found at the new DN
        self.search_dn(usr2)
        samba.tests.delete_force(self.samdb, usr1)
        samba.tests.delete_force(self.samdb, usr2)

    def test_undelete_existing(self):
        print("Testing undelete user after a user with the same dn has been created")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.samdb.delete(usr1)
        # a new live object now occupies the DN the deleted one would return to
        self._add_test_user(usr1)
        objDeleted1 = self.search_guid(guid1)
        self._assert_ldb_error(ERR_ENTRY_ALREADY_EXISTS,
                               self.restore_deleted_object, self.samdb, objDeleted1.dn, usr1)
        # do not leave the second user behind for the other tests
        samba.tests.delete_force(self.samdb, usr1)

    def test_undelete_cross_nc(self):
        print("Cross NC undelete")
        c1 = "cn=ldaptestcontainer," + self.base_dn
        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
        c3 = "cn=ldaptestcontainer," + self.configuration_dn
        c4 = "cn=ldaptestcontainer2," + self.base_dn
        samba.tests.delete_force(self.samdb, c1)
        samba.tests.delete_force(self.samdb, c2)
        samba.tests.delete_force(self.samdb, c3)
        samba.tests.delete_force(self.samdb, c4)
        self.samdb.add({
            "dn": c1,
            "objectclass": "container"})
        self.samdb.add({
            "dn": c2,
            "objectclass": "container"})
        objLive1 = self.search_dn(c1)
        objLive2 = self.search_dn(c2)
        guid1 = objLive1["objectGUID"][0]
        guid2 = objLive2["objectGUID"][0]
        self.samdb.delete(c1)
        self.samdb.delete(c2)
        objDeleted1 = self.search_guid(guid1)
        objDeleted2 = self.search_guid(guid2)
        # try to undelete from base dn to config
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               self.restore_deleted_object, self.samdb, objDeleted1.dn, c3)
        # try to undelete from config to base dn
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               self.restore_deleted_object, self.samdb, objDeleted2.dn, c4)
        # assert undeletion will work in same nc
        self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
        self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
        # remove the restored containers again
        samba.tests.delete_force(self.samdb, c3)
        samba.tests.delete_force(self.samdb, c4)
class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test cases for delete/reanimate user objects"""

    def _expected_user_add_attributes(self, username, user_dn, category):
        """Expected attributes of a freshly added user ("**" means any value)"""
        return {'dn': user_dn,
                'objectClass': '**',
                'cn': username,
                'distinguishedName': user_dn,
                'instanceType': '4',
                'whenCreated': '**',
                'whenChanged': '**',
                'uSNCreated': '**',
                'uSNChanged': '**',
                'name': username,
                'objectGUID': '**',
                'userAccountControl': '546',
                'badPwdCount': '0',
                'badPasswordTime': '0',
                'codePage': '0',
                'countryCode': '0',
                'lastLogon': '0',
                'lastLogoff': '0',
                'pwdLastSet': '0',
                'primaryGroupID': '513',
                'objectSid': '**',
                'accountExpires': '9223372036854775807',
                'logonCount': '0',
                'sAMAccountName': username,
                'sAMAccountType': '805306368',
                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
                }

    def _expected_user_add_metadata(self):
        """Expected (attid, version) metadata of a freshly added user, in order"""
        return [
            (DRSUAPI_ATTID_objectClass, 1),
            (DRSUAPI_ATTID_cn, 1),
            (DRSUAPI_ATTID_instanceType, 1),
            (DRSUAPI_ATTID_whenCreated, 1),
            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
            (DRSUAPI_ATTID_name, 1),
            (DRSUAPI_ATTID_userAccountControl, None),
            (DRSUAPI_ATTID_codePage, 1),
            (DRSUAPI_ATTID_countryCode, 1),
            (DRSUAPI_ATTID_dBCSPwd, 1),
            (DRSUAPI_ATTID_logonHours, 1),
            (DRSUAPI_ATTID_unicodePwd, 1),
            (DRSUAPI_ATTID_ntPwdHistory, 1),
            (DRSUAPI_ATTID_pwdLastSet, 1),
            (DRSUAPI_ATTID_primaryGroupID, 1),
            (DRSUAPI_ATTID_objectSid, 1),
            (DRSUAPI_ATTID_accountExpires, 1),
            (DRSUAPI_ATTID_lmPwdHistory, 1),
            (DRSUAPI_ATTID_sAMAccountName, 1),
            (DRSUAPI_ATTID_sAMAccountType, 1),
            (DRSUAPI_ATTID_objectCategory, 1)]

    def _expected_user_del_attributes(self, username, _guid, _sid):
        """Expected attributes of the deleted user ("**" means any value)
        :param username: name the user was created with
        :param _guid: raw objectGUID value of the user
        :param _sid: raw objectSid value of the user
        """
        guid = ndr_unpack(misc.GUID, _guid)
        # the DN carries the escaped form (\0A) of the newline that is in cn/name
        dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
        cn = "%s\nDEL:%s" % (username, guid)
        return {'dn': dn,
                'objectClass': '**',
                'cn': cn,
                'distinguishedName': dn,
                'isDeleted': 'TRUE',
                'isRecycled': 'TRUE',
                'instanceType': '4',
                'whenCreated': '**',
                'whenChanged': '**',
                'uSNCreated': '**',
                'uSNChanged': '**',
                'name': cn,
                'objectGUID': _guid,
                'userAccountControl': '546',
                'objectSid': _sid,
                'sAMAccountName': username,
                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
                }

    def _expected_user_del_metadata(self):
        """Expected (attid, version) metadata of the deleted user, in order"""
        return [
            (DRSUAPI_ATTID_objectClass, 1),
            (DRSUAPI_ATTID_cn, 2),
            (DRSUAPI_ATTID_instanceType, 1),
            (DRSUAPI_ATTID_whenCreated, 1),
            (DRSUAPI_ATTID_isDeleted, 1),
            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
            (DRSUAPI_ATTID_name, 2),
            (DRSUAPI_ATTID_userAccountControl, None),
            (DRSUAPI_ATTID_codePage, 2),
            (DRSUAPI_ATTID_countryCode, 2),
            (DRSUAPI_ATTID_dBCSPwd, 1),
            (DRSUAPI_ATTID_logonHours, 1),
            (DRSUAPI_ATTID_unicodePwd, 1),
            (DRSUAPI_ATTID_ntPwdHistory, 1),
            (DRSUAPI_ATTID_pwdLastSet, 2),
            (DRSUAPI_ATTID_primaryGroupID, 2),
            (DRSUAPI_ATTID_objectSid, 1),
            (DRSUAPI_ATTID_accountExpires, 2),
            (DRSUAPI_ATTID_lmPwdHistory, 1),
            (DRSUAPI_ATTID_sAMAccountName, 1),
            (DRSUAPI_ATTID_sAMAccountType, 2),
            (DRSUAPI_ATTID_lastKnownParent, 1),
            (DRSUAPI_ATTID_objectCategory, 2),
            (DRSUAPI_ATTID_isRecycled, 1)]

    def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category):
        """Expected attributes of the restored user ("**" means any value)"""
        return {'dn': user_dn,
                'objectClass': '**',
                'cn': username,
                'distinguishedName': user_dn,
                'instanceType': '4',
                'whenCreated': '**',
                'whenChanged': '**',
                'uSNCreated': '**',
                'uSNChanged': '**',
                'name': username,
                'objectGUID': guid,
                'userAccountControl': '546',
                'badPwdCount': '0',
                'badPasswordTime': '0',
                'codePage': '0',
                'countryCode': '0',
                'lastLogon': '0',
                'lastLogoff': '0',
                'pwdLastSet': '0',
                'primaryGroupID': '513',
                'operatorCount': '0',
                'objectSid': sid,
                'adminCount': '0',
                'accountExpires': '0',
                'logonCount': '0',
                'sAMAccountName': username,
                'sAMAccountType': '805306368',
                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
                }

    def _expected_user_restore_metadata(self):
        """Expected (attid, version) metadata of the restored user, in order"""
        return [
            (DRSUAPI_ATTID_objectClass, 1),
            (DRSUAPI_ATTID_cn, 3),
            (DRSUAPI_ATTID_instanceType, 1),
            (DRSUAPI_ATTID_whenCreated, 1),
            (DRSUAPI_ATTID_isDeleted, 2),
            (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
            (DRSUAPI_ATTID_name, 3),
            (DRSUAPI_ATTID_userAccountControl, None),
            (DRSUAPI_ATTID_codePage, 3),
            (DRSUAPI_ATTID_countryCode, 3),
            (DRSUAPI_ATTID_dBCSPwd, 1),
            (DRSUAPI_ATTID_logonHours, 1),
            (DRSUAPI_ATTID_unicodePwd, 1),
            (DRSUAPI_ATTID_ntPwdHistory, 1),
            (DRSUAPI_ATTID_pwdLastSet, 3),
            (DRSUAPI_ATTID_primaryGroupID, 3),
            (DRSUAPI_ATTID_operatorCount, 1),
            (DRSUAPI_ATTID_objectSid, 1),
            (DRSUAPI_ATTID_adminCount, 1),
            (DRSUAPI_ATTID_accountExpires, 3),
            (DRSUAPI_ATTID_lmPwdHistory, 1),
            (DRSUAPI_ATTID_sAMAccountName, 1),
            (DRSUAPI_ATTID_sAMAccountType, 3),
            (DRSUAPI_ATTID_lastKnownParent, 1),
            (DRSUAPI_ATTID_objectCategory, 3),
            (DRSUAPI_ATTID_isRecycled, 2)]

    def test_restore_user(self):
        print("Test restored user attributes")
        username = "restore_user"
        usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
        samba.tests.delete_force(self.samdb, usr_dn)
        self.samdb.add({
            "dn": usr_dn,
            "objectClass": "user",
            "sAMAccountName": username})
        # check the freshly created user
        obj = self.search_dn(usr_dn)
        guid = obj["objectGUID"][0]
        sid = obj["objectSID"][0]
        obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
        self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj)
        self._check_metadata(obj_rmd["replPropertyMetaData"],
                             self._expected_user_add_metadata())
        # delete the user and check what is left of it
        self.samdb.delete(usr_dn)
        obj_del = self.search_guid(guid)
        obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
        self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del)
        self._check_metadata(obj_del_rmd["replPropertyMetaData"],
                             self._expected_user_del_metadata())
        # restore the user and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
        obj_restore = self.search_guid(guid)
        obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
        # the restored user is expected to carry more attributes than the
        # original one (adminCount, operatorCount, lastKnownParent)
        self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
        self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
                             self._expected_user_restore_metadata())
class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate group objects"""

    def _make_object_dn(self, name):
        """Build the DN of an object called name under CN=Users"""
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def _create_test_user(self, user_name):
        """Create (or re-create) a user and return it as found in the SAM"""
        user_dn = self._make_object_dn(user_name)
        ldif = {
            "dn": user_dn,
            "objectClass": "user",
            "sAMAccountName": user_name,
        }
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, user_dn)
        # finally, create the user
        self.samdb.add(ldif)
        return self.search_dn(user_dn)

    def _create_test_group(self, group_name, members=None):
        """Create (or re-create) a group and return it as found in the SAM
        :param group_name: name of the group, also used as sAMAccountName
        :param members: optional iterable of member DNs; None means no members
        """
        group_dn = self._make_object_dn(group_name)
        ldif = {
            "dn": group_dn,
            "objectClass": "group",
            "sAMAccountName": group_name,
        }
        # explicit None check instead of swallowing TypeError, which would
        # also hide a genuinely broken members argument
        if members is not None:
            ldif["member"] = [str(usr_dn) for usr_dn in members]
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, group_dn)
        # finally, create the group
        self.samdb.add(ldif)
        return self.search_dn(group_dn)

    def _expected_group_attributes(self, groupname, group_dn, category):
        """Expected attributes of a restored group ("**" means any value)"""
        return {'dn': group_dn,
                'groupType': '-2147483646',
                'distinguishedName': group_dn,
                'sAMAccountName': groupname,
                'name': groupname,
                'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
                'objectClass': '**',
                'objectGUID': '**',
                'lastKnownParent': 'CN=Users,%s' % self.base_dn,
                'whenChanged': '**',
                'sAMAccountType': '268435456',
                'objectSid': '**',
                'whenCreated': '**',
                'uSNCreated': '**',
                'operatorCount': '0',
                'uSNChanged': '**',
                'instanceType': '4',
                'adminCount': '0',
                'cn': groupname}

    def test_plain_group(self):
        print("Test restored Group attributes")
        # create test group
        obj = self._create_test_group("r_group")
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
        self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)

    def test_group_with_members(self):
        print("Test restored Group with members attributes")
        # create test group
        usr1 = self._create_test_user("r_user_1")
        usr2 = self._create_test_user("r_user_2")
        obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored one are same
        attr_orig = set(obj.keys())
        # windows restore more attributes that originally we have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        # and does not restore following attributes
        attr_orig.remove("member")
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
        self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Delete/reanimate scenarios for OU and container objects"""

    def _expected_container_attributes(self, rdn, name, dn, category):
        """Expected attributes of a restored OU/container ("**" means any value)
        :param rdn: RDN attribute name, 'OU' or 'CN'
        :param name: expected value of the name and RDN attributes
        :param dn: expected DN of the restored object
        :param category: CN of the expected objectCategory in the schema
        """
        # an OU is expected directly under the base DN, anything else under CN=Users
        parent_format = '%s' if rdn == 'OU' else 'CN=Users,%s'
        expected = {
            'dn': dn,
            'distinguishedName': dn,
            'name': name,
            'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
            'objectClass': '**',
            'objectGUID': '**',
            'lastKnownParent': parent_format % self.base_dn,
            'whenChanged': '**',
            'whenCreated': '**',
            'uSNCreated': '**',
            'uSNChanged': '**',
            'instanceType': '4',
        }
        # the RDN attribute itself ('ou' or 'cn') carries the name too
        expected[rdn.lower()] = name
        return expected

    def _create_test_ou(self, rdn, name=None, description=None):
        """Create (or re-create) an OU under the base DN and return it"""
        target_dn = "OU=%s,%s" % (rdn, self.base_dn)
        # get rid of whatever an earlier run may have left at this DN
        samba.tests.delete_force(self.samdb, target_dn)
        self.samdb.create_ou(target_dn, name=name, description=description)
        return self.search_dn(target_dn)

    def test_ou_with_name_description(self):
        print("Test OU reanimation")
        # the OU under test
        ou = self._create_test_ou(rdn="r_ou",
                                  name="r_ou name",
                                  description="r_ou description")
        ou_guid = ou["objectGUID"][0]
        # delete it, then look up the deleted object by GUID
        self.samdb.delete(str(ou.dn))
        tombstone = self.search_guid(ou_guid)
        # bring it back to its original DN and fetch the result
        self.restore_deleted_object(self.samdb, tombstone.dn, ou.dn)
        restored = self.search_guid(ou_guid)
        names_after = set(restored.keys())
        # expected names: the original ones plus lastKnownParent,
        # minus description which is not restored
        names_before = (set(ou.keys()) | set(["lastKnownParent"])) - set(["description"])
        self.assertAttributesEqual(ou, names_before, restored, names_after)
        self.assertAttributesExists(
            self._expected_container_attributes("OU", "r_ou", str(ou.dn), "Organizational-Unit"),
            restored)

    def test_container(self):
        print("Test Container reanimation")
        # the container under test
        container_dn = "CN=r_container,CN=Users,%s" % self.base_dn
        container = self._create_object({
            "dn": container_dn,
            "objectClass": "container"
        })
        container_guid = container["objectGUID"][0]
        # delete it, then look up the deleted object by GUID
        self.samdb.delete(str(container.dn))
        tombstone = self.search_guid(container_guid)
        # bring it back to its original DN and fetch the result
        self.restore_deleted_object(self.samdb, tombstone.dn, container.dn)
        restored = self.search_guid(container_guid)
        names_after = set(restored.keys())
        # expected names: the original ones plus lastKnownParent,
        # minus showInAdvancedViewOnly which is not restored
        names_before = (set(container.keys()) | set(["lastKnownParent"])) - set(["showInAdvancedViewOnly"])
        self.assertAttributesEqual(container, names_before, restored, names_after)
        self.assertAttributesExists(
            self._expected_container_attributes("CN", "r_container",
                                                str(container.dn), "Container"),
            restored)
if __name__ == "__main__":
    # run the test cases of this module when it is executed as a script
    unittest.main()