s4-dsdb-test: Initial implementation for Tombstone restore test suite
[Samba.git] / source4 / dsdb / tests / python / tombstone_reanimation.py
blob21df42d291466c74db805cea16259e4bb93281ee
1 #!/usr/bin/env python
3 # Tombstone reanimation tests
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import sys
21 import optparse
22 import unittest
24 sys.path.insert(0, "bin/python")
25 import samba
27 import samba.tests
28 import samba.getopt as options
29 from ldb import (SCOPE_BASE, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message, MessageElement)
class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
    """Verify Samba restores required attributes when a user restores
    a Deleted object.

    This base class connects to the SAM database named by the TEST_SERVER,
    TEST_USERNAME and TEST_PASSWORD environment variables and offers helpers
    to create, look up, restore and compare directory objects.
    """

    # Attributes whose values are never compared between the original and
    # the restored object: they are volatile, so they can't be equal.
    _VOLATILE_ATTRS = frozenset(["uSNChanged", "dSCorePropagationData", "whenChanged"])

    def setUp(self):
        """Connect to the SAM database and save the settings tearDown() restores."""
        super(RestoredObjectAttributesBaseTestCase, self).setUp()
        # load LoadParm
        lp = options.SambaOptions(optparse.OptionParser()).get_loadparm()
        self.samdb = samba.tests.connect_samdb_env(
            "TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD", lp=lp)
        self.base_dn = self.samdb.domain_dn()
        self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
        # Get the old "dSHeuristics" if it was set
        self.dsheuristics = self.samdb.get_dsheuristics()
        # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
        self.samdb.set_dsheuristics("000000001")
        # Get the old "minPwdAge"
        self.minPwdAge = self.samdb.get_minPwdAge()
        # Set it temporarily to "0"
        self.samdb.set_minPwdAge("0")

    def tearDown(self):
        """Put "dSHeuristics" and "minPwdAge" back to the values saved in setUp()."""
        super(RestoredObjectAttributesBaseTestCase, self).tearDown()
        try:
            # Reset the "dSHeuristics" as they were before
            self.samdb.set_dsheuristics(self.dsheuristics)
        finally:
            # Reset the "minPwdAge" as it was before, even when resetting
            # "dSHeuristics" failed, so one failure does not leak both settings
            self.samdb.set_minPwdAge(self.minPwdAge)

    def GUID_string(self, guid):
        """Return `guid` formatted by the schema as an objectGUID value."""
        return self.samdb.schema_format_value("objectGUID", guid)

    def search_guid(self, guid):
        """Return the single object found by a base search on <GUID=...>.

        The search is done with the "show_deleted:1" control.
        :param guid: objectGUID value as read from a search result
        """
        res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
                                scope=SCOPE_BASE, controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def search_dn(self, dn):
        """Return the single object found by a base search on `dn`.

        The search is done with the "show_recycled:1" control.
        :param dn: DN of the object to fetch
        """
        res = self.samdb.search(expression="(objectClass=*)",
                                base=dn,
                                scope=SCOPE_BASE,
                                controls=["show_recycled:1"])
        self.assertEqual(len(res), 1)
        return res[0]

    def _create_object(self, msg):
        """:param msg: dict with dn and attributes to create an object from"""
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, msg['dn'])
        self.samdb.add(msg)
        return self.search_dn(msg['dn'])

    def _attr_to_ldif(self, attr, value):
        """Render a single attribute value as LDIF text.

        :param attr: attribute name
        :param value: MessageElement, or a value that is wrapped into one
        """
        if not isinstance(value, MessageElement):
            value = MessageElement(str(value), 0, attr)
        msg = Message()
        msg.add(value)
        return self.samdb.write_ldif(msg, 0)

    def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
        """Assert both objects carry the same attributes with the same values.

        The two attribute-name sets must be equal. Values are then compared
        case-insensitively through their LDIF rendering, skipping volatile
        attributes and attributes the original object has no value for.

        :param obj_orig: object as it was before deletion
        :param attrs_orig: set of attribute names expected after restore
        :param obj_restored: object as fetched after restore
        :param attrs_rest: set of attribute names found on the restored object
        """
        self.assertSetEqual(attrs_orig, attrs_rest)
        # work on a copy so the set owned by the caller is left untouched
        comparable = set(attrs_orig) - self._VOLATILE_ATTRS
        # sorted() makes the first reported mismatch reproducible between runs
        for attr in sorted(comparable):
            orig_val = obj_orig.get(attr)
            if orig_val is None:
                # expected only on the restored object, nothing to compare with
                continue
            rest_val = obj_restored.get(attr)
            self.assertIsNotNone(
                rest_val, "restored object has no value for attribute %r" % attr)
            # compare generated ldif's
            orig_ldif = self._attr_to_ldif(attr, orig_val)
            rest_ldif = self._attr_to_ldif(attr, rest_val)
            self.assertEqual(orig_ldif.lower(), rest_ldif.lower())

    @staticmethod
    def restore_deleted_object(samdb, del_dn, new_dn):
        """Restores a deleted object

        A single modify request deletes "isDeleted" and replaces
        "distinguishedName"; it is sent with the "show_deleted:1" control.

        :param samdb: SamDB connection to SAM
        :param del_dn: str Deleted object DN
        :param new_dn: str Where to restore the object
        """
        msg = Message()
        msg.dn = Dn(samdb, str(del_dn))
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
        samdb.modify(msg, ["show_deleted:1"])
class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test cases for delete/reanimate user objects"""

    def test_restore_user(self):
        """Delete a user, restore it and compare the attribute names present."""
        # single-argument call form prints the same on Python 2 and Python 3
        print("Test restored user attributes")
        username = "restore_user"
        usr_dn = "cn=%s,cn=users,%s" % (username, self.base_dn)
        # _create_object() removes any leftover from a previous run, adds the
        # object and returns it as read back from the directory
        obj = self._create_object({
            "dn": usr_dn,
            "objectClass": "user",
            "sAMAccountName": username})
        guid = obj["objectGUID"][0]
        self.samdb.delete(usr_dn)
        # the DN changed on deletion, so look the object up by its GUID
        obj_del = self.search_guid(guid)
        # restore the user and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored ones are the same
        orig_attrs = set(obj.keys())
        # Windows restores more attributes than we originally have
        orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        rest_attrs = set(obj_restore.keys())
        self.assertSetEqual(orig_attrs, rest_attrs)
class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate group objects"""

    def _make_object_dn(self, name):
        """Return the DN of an object called `name` under cn=users."""
        return "cn=%s,cn=users,%s" % (name, self.base_dn)

    def _create_test_user(self, user_name):
        """Create user `user_name` under cn=users and return it as read back."""
        ldif = {
            "dn": self._make_object_dn(user_name),
            "objectClass": "user",
            "sAMAccountName": user_name,
        }
        # deletes any leftover from a previous test, then creates the user
        return self._create_object(ldif)

    def _create_test_group(self, group_name, members=None):
        """Create group `group_name` under cn=users and return it as read back.

        :param group_name: name used for both the RDN and sAMAccountName
        :param members: optional iterable of member DNs; None means the
            "member" attribute is not supplied at all
        """
        ldif = {
            "dn": self._make_object_dn(group_name),
            "objectClass": "group",
            "sAMAccountName": group_name,
        }
        # test for None explicitly rather than catching TypeError, which
        # would also hide a TypeError raised while converting a member DN
        if members is not None:
            ldif["member"] = [str(usr_dn) for usr_dn in members]
        # deletes any leftover from a previous test, then creates the group
        return self._create_object(ldif)

    def test_plain_group(self):
        """Delete and restore a group without members, then compare attributes."""
        # single-argument call form prints the same on Python 2 and Python 3
        print("Test restored Group attributes")
        # create test group
        obj = self._create_test_group("r_group")
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored ones are the same
        attr_orig = set(obj.keys())
        # Windows restores more attributes than we originally have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)

    def test_group_with_members(self):
        """Delete and restore a group with two members, then compare attributes."""
        print("Test restored Group with members attributes")
        # create test group
        usr1 = self._create_test_user("r_user_1")
        usr2 = self._create_test_user("r_user_2")
        obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
        guid = obj["objectGUID"][0]
        # delete the group
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Group and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored ones are the same
        attr_orig = set(obj.keys())
        # Windows restores more attributes than we originally have
        attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
        # and does not restore following attributes
        attr_orig.remove("member")
        attr_rest = set(obj_restore.keys())
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
    """Test different scenarios for delete/reanimate OU/container objects"""

    def _create_test_ou(self, rdn, name=None, description=None):
        """Create OU=`rdn` directly under the domain DN and return it as read back.

        :param rdn: value of the OU's RDN
        :param name: passed through to create_ou()
        :param description: passed through to create_ou()
        """
        ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
        # delete an object if leftover from previous test
        samba.tests.delete_force(self.samdb, ou_dn)
        # create ou and return created object
        self.samdb.create_ou(ou_dn, name=name, description=description)
        return self.search_dn(ou_dn)

    def test_ou_with_name_description(self):
        """Delete and restore an OU that has a name and a description."""
        # single-argument call form prints the same on Python 2 and Python 3
        print("Test OU reanimation")
        # create OU to test with
        obj = self._create_test_ou(rdn="r_ou",
                                   name="r_ou name",
                                   description="r_ou description")
        guid = obj["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored ones are the same
        attr_orig = set(obj.keys())
        attr_rest = set(obj_restore.keys())
        # Windows restores more attributes than we originally have
        attr_orig.update(["lastKnownParent"])
        # and does not restore following attributes
        attr_orig -= {"description"}
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)

    def test_container(self):
        """Delete and restore a plain container created under CN=Users."""
        print("Test Container reanimation")
        # create test Container
        obj = self._create_object({
            "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
            "objectClass": "container"
        })
        guid = obj["objectGUID"][0]
        # delete the object
        self.samdb.delete(str(obj.dn))
        obj_del = self.search_guid(guid)
        # restore the Object and fetch what's restored
        self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
        obj_restore = self.search_guid(guid)
        # check original attributes and restored ones are the same
        attr_orig = set(obj.keys())
        attr_rest = set(obj_restore.keys())
        # Windows restores more attributes than we originally have
        attr_orig.update(["lastKnownParent"])
        # and does not restore following attributes
        attr_orig -= {"showInAdvancedViewOnly"}
        self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
# Run the test cases defined above only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    unittest.main()