s4-dsdb-tests: Make unique object names to test with in deletetest
[Samba.git] / source4 / dsdb / tests / python / deletetest.py
blob1d0848bea965413082507f2e732ccbbcc7eecc65
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 import optparse
5 import sys
6 import os
8 sys.path.insert(0, "bin/python")
9 import samba
11 from samba.tests.subunitrun import SubunitOptions, TestProgram
13 import samba.getopt as options
15 from samba.auth import system_session
16 from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
17 from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS
18 from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR
19 from samba.samdb import SamDB
20 from samba.tests import delete_force
22 parser = optparse.OptionParser("deletetest.py [options] <host|file>")
23 sambaopts = options.SambaOptions(parser)
24 parser.add_option_group(sambaopts)
25 parser.add_option_group(options.VersionOptions(parser))
26 # use command line creds if available
27 credopts = options.CredentialsOptions(parser)
28 parser.add_option_group(credopts)
29 subunitopts = SubunitOptions(parser)
30 parser.add_option_group(subunitopts)
31 opts, args = parser.parse_args()
33 if len(args) < 1:
34 parser.print_usage()
35 sys.exit(1)
37 host = args[0]
39 lp = sambaopts.get_loadparm()
40 creds = credopts.get_credentials(lp)
42 class BaseDeleteTests(samba.tests.TestCase):
44 def GUID_string(self, guid):
45 return self.ldb.schema_format_value("objectGUID", guid)
47 def setUp(self):
48 super(BaseDeleteTests, self).setUp()
49 self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
51 self.base_dn = self.ldb.domain_dn()
52 self.configuration_dn = self.ldb.get_config_basedn().get_linearized()
54 def search_guid(self, guid):
55 print "SEARCH by GUID %s" % self.GUID_string(guid)
57 res = self.ldb.search(base="<GUID=%s>" % self.GUID_string(guid),
58 scope=SCOPE_BASE, controls=["show_deleted:1"])
59 self.assertEquals(len(res), 1)
60 return res[0]
62 def search_dn(self,dn):
63 print "SEARCH by DN %s" % dn
65 res = self.ldb.search(expression="(objectClass=*)",
66 base=dn,
67 scope=SCOPE_BASE,
68 controls=["show_deleted:1"])
69 self.assertEquals(len(res), 1)
70 return res[0]
73 class BasicDeleteTests(BaseDeleteTests):
75 def setUp(self):
76 super(BasicDeleteTests, self).setUp()
78 def del_attr_values(self, delObj):
79 print "Checking attributes for %s" % delObj["dn"]
81 self.assertEquals(delObj["isDeleted"][0],"TRUE")
82 self.assertTrue(not("objectCategory" in delObj))
83 self.assertTrue(not("sAMAccountType" in delObj))
85 def preserved_attributes_list(self, liveObj, delObj):
86 print "Checking for preserved attributes list"
88 preserved_list = ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
89 "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
90 "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID",
91 "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid",
92 "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName",
93 "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection",
94 "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"]
96 for a in liveObj:
97 if a in preserved_list:
98 self.assertTrue(a in delObj)
100 def check_rdn(self, liveObj, delObj, rdnName):
101 print "Checking for correct rDN"
102 rdn=liveObj[rdnName][0]
103 rdn2=delObj[rdnName][0]
104 name2=delObj[rdnName][0]
105 guid=liveObj["objectGUID"][0]
106 self.assertEquals(rdn2, rdn + "\nDEL:" + self.GUID_string(guid))
107 self.assertEquals(name2, rdn + "\nDEL:" + self.GUID_string(guid))
109 def delete_deleted(self, ldb, dn):
110 print "Testing the deletion of the already deleted dn %s" % dn
112 try:
113 ldb.delete(dn)
114 self.fail()
115 except LdbError, (num, _):
116 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
118 def test_delete_protection(self):
119 """Delete protection tests"""
121 print self.base_dn
123 delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
124 delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
125 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
127 self.ldb.add({
128 "dn": "cn=ldaptestcontainer," + self.base_dn,
129 "objectclass": "container"})
130 self.ldb.add({
131 "dn": "cn=entry1,cn=ldaptestcontainer," + self.base_dn,
132 "objectclass": "container"})
133 self.ldb.add({
134 "dn": "cn=entry2,cn=ldaptestcontainer," + self.base_dn,
135 "objectclass": "container"})
137 try:
138 self.ldb.delete("cn=ldaptestcontainer," + self.base_dn)
139 self.fail()
140 except LdbError, (num, _):
141 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
143 self.ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"])
145 try:
146 res = self.ldb.search("cn=ldaptestcontainer," + self.base_dn,
147 scope=SCOPE_BASE, attrs=[])
148 self.fail()
149 except LdbError, (num, _):
150 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
151 try:
152 res = self.ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn,
153 scope=SCOPE_BASE, attrs=[])
154 self.fail()
155 except LdbError, (num, _):
156 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
157 try:
158 res = self.ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn,
159 scope=SCOPE_BASE, attrs=[])
160 self.fail()
161 except LdbError, (num, _):
162 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
164 delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
165 delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
166 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
168 # Performs some protected object delete testing
170 res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
171 attrs=["dsServiceName", "dNSHostName"])
172 self.assertEquals(len(res), 1)
174 # Delete failing since DC's nTDSDSA object is protected
175 try:
176 self.ldb.delete(res[0]["dsServiceName"][0])
177 self.fail()
178 except LdbError, (num, _):
179 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
181 res = self.ldb.search(self.base_dn, attrs=["rIDSetReferences"],
182 expression="(&(objectClass=computer)(dNSHostName=" + res[0]["dNSHostName"][0] + "))")
183 self.assertEquals(len(res), 1)
185 # Deletes failing since DC's rIDSet object is protected
186 try:
187 self.ldb.delete(res[0]["rIDSetReferences"][0])
188 self.fail()
189 except LdbError, (num, _):
190 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
191 try:
192 self.ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"])
193 self.fail()
194 except LdbError, (num, _):
195 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
197 # Deletes failing since three main crossRef objects are protected
199 try:
200 self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn)
201 self.fail()
202 except LdbError, (num, _):
203 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
204 try:
205 self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
206 self.fail()
207 except LdbError, (num, _):
208 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
210 try:
211 self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn)
212 self.fail()
213 except LdbError, (num, _):
214 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
215 try:
216 self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
217 self.fail()
218 except LdbError, (num, _):
219 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
221 res = self.ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],
222 expression="(nCName=%s)" % self.base_dn)
223 self.assertEquals(len(res), 1)
225 try:
226 self.ldb.delete(res[0].dn)
227 self.fail()
228 except LdbError, (num, _):
229 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
230 try:
231 self.ldb.delete(res[0].dn, ["tree_delete:1"])
232 self.fail()
233 except LdbError, (num, _):
234 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
236 # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
237 try:
238 self.ldb.delete("CN=Users," + self.base_dn)
239 self.fail()
240 except LdbError, (num, _):
241 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
243 # Tree-delete failing since "isCriticalSystemObject"
244 try:
245 self.ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"])
246 self.fail()
247 except LdbError, (num, _):
248 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
250 def test_all(self):
251 """Basic delete tests"""
253 print self.base_dn
255 # user current time in ms to make unique objects
256 import time
257 marker = str(int(round(time.time()*1000)))
258 usr1_name = "u_" + marker
259 usr2_name = "u2_" + marker
260 grp_name = "g1_" + marker
261 site_name = "s1_" + marker
263 usr1 = "cn=%s,cn=users,%s" % (usr1_name, self.base_dn)
264 usr2 = "cn=%s,cn=users,%s" % (usr2_name, self.base_dn)
265 grp1 = "cn=%s,cn=users,%s" % (grp_name, self.base_dn)
266 sit1 = "cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
267 ss1 = "cn=NTDS Site Settings,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
268 srv1 = "cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
269 srv2 = "cn=TESTSRV,cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
271 delete_force(self.ldb, usr1)
272 delete_force(self.ldb, usr2)
273 delete_force(self.ldb, grp1)
274 delete_force(self.ldb, ss1)
275 delete_force(self.ldb, srv2)
276 delete_force(self.ldb, srv1)
277 delete_force(self.ldb, sit1)
279 self.ldb.add({
280 "dn": usr1,
281 "objectclass": "user",
282 "description": "test user description",
283 "samaccountname": usr1_name})
285 self.ldb.add({
286 "dn": usr2,
287 "objectclass": "user",
288 "description": "test user 2 description",
289 "samaccountname": usr2_name})
291 self.ldb.add({
292 "dn": grp1,
293 "objectclass": "group",
294 "description": "test group",
295 "samaccountname": grp_name,
296 "member": [ usr1, usr2 ],
297 "isDeleted": "FALSE" })
299 self.ldb.add({
300 "dn": sit1,
301 "objectclass": "site" })
303 self.ldb.add({
304 "dn": ss1,
305 "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"] })
307 self.ldb.add({
308 "dn": srv1,
309 "objectclass": "serversContainer" })
311 self.ldb.add({
312 "dn": srv2,
313 "objectClass": "server" })
315 objLive1 = self.search_dn(usr1)
316 guid1=objLive1["objectGUID"][0]
318 objLive2 = self.search_dn(usr2)
319 guid2=objLive2["objectGUID"][0]
321 objLive3 = self.search_dn(grp1)
322 guid3=objLive3["objectGUID"][0]
324 objLive4 = self.search_dn(sit1)
325 guid4=objLive4["objectGUID"][0]
327 objLive5 = self.search_dn(ss1)
328 guid5=objLive5["objectGUID"][0]
330 objLive6 = self.search_dn(srv1)
331 guid6=objLive6["objectGUID"][0]
333 objLive7 = self.search_dn(srv2)
334 guid7=objLive7["objectGUID"][0]
336 self.ldb.delete(usr1)
337 self.ldb.delete(usr2)
338 self.ldb.delete(grp1)
339 self.ldb.delete(srv1, ["tree_delete:1"])
340 self.ldb.delete(sit1, ["tree_delete:1"])
342 objDeleted1 = self.search_guid(guid1)
343 objDeleted2 = self.search_guid(guid2)
344 objDeleted3 = self.search_guid(guid3)
345 objDeleted4 = self.search_guid(guid4)
346 objDeleted5 = self.search_guid(guid5)
347 objDeleted6 = self.search_guid(guid6)
348 objDeleted7 = self.search_guid(guid7)
350 self.del_attr_values(objDeleted1)
351 self.del_attr_values(objDeleted2)
352 self.del_attr_values(objDeleted3)
353 self.del_attr_values(objDeleted4)
354 self.del_attr_values(objDeleted5)
355 self.del_attr_values(objDeleted6)
356 self.del_attr_values(objDeleted7)
358 self.preserved_attributes_list(objLive1, objDeleted1)
359 self.preserved_attributes_list(objLive2, objDeleted2)
360 self.preserved_attributes_list(objLive3, objDeleted3)
361 self.preserved_attributes_list(objLive4, objDeleted4)
362 self.preserved_attributes_list(objLive5, objDeleted5)
363 self.preserved_attributes_list(objLive6, objDeleted6)
364 self.preserved_attributes_list(objLive7, objDeleted7)
366 self.check_rdn(objLive1, objDeleted1, "cn")
367 self.check_rdn(objLive2, objDeleted2, "cn")
368 self.check_rdn(objLive3, objDeleted3, "cn")
369 self.check_rdn(objLive4, objDeleted4, "cn")
370 self.check_rdn(objLive5, objDeleted5, "cn")
371 self.check_rdn(objLive6, objDeleted6, "cn")
372 self.check_rdn(objLive7, objDeleted7, "cn")
374 self.delete_deleted(self.ldb, usr1)
375 self.delete_deleted(self.ldb, usr2)
376 self.delete_deleted(self.ldb, grp1)
377 self.delete_deleted(self.ldb, sit1)
378 self.delete_deleted(self.ldb, ss1)
379 self.delete_deleted(self.ldb, srv1)
380 self.delete_deleted(self.ldb, srv2)
382 self.assertTrue("CN=Deleted Objects" in str(objDeleted1.dn))
383 self.assertTrue("CN=Deleted Objects" in str(objDeleted2.dn))
384 self.assertTrue("CN=Deleted Objects" in str(objDeleted3.dn))
385 self.assertFalse("CN=Deleted Objects" in str(objDeleted4.dn))
386 self.assertTrue("CN=Deleted Objects" in str(objDeleted5.dn))
387 self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
388 self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
390 class BasicUndeleteTests(BaseDeleteTests):
392 def setUp(self):
393 super(BasicUndeleteTests, self).setUp()
395 def enable_recycle_bin(self):
396 msg = Message()
397 msg.dn = Dn(ldb, "")
398 msg["enableOptionalFeature"] = MessageElement(
399 "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
400 FLAG_MOD_ADD, "enableOptionalFeature")
401 try:
402 ldb.modify(msg)
403 except LdbError, (num, _):
404 self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
406 def undelete_deleted(self, olddn, newdn, samldb):
407 msg = Message()
408 msg.dn = Dn(ldb, olddn)
409 msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
410 msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
411 res = samldb.modify(msg, ["show_deleted:1"])
413 def undelete_deleted_with_mod(self, olddn, newdn):
414 msg = Message()
415 msg.dn = Dn(ldb, olddn)
416 msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
417 msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
418 msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
419 res = ldb.modify(msg, ["show_deleted:1"])
422 def test_undelete(self):
423 print "Testing standard undelete operation"
424 usr1="cn=testuser,cn=users," + self.base_dn
425 delete_force(self.ldb, usr1)
426 ldb.add({
427 "dn": usr1,
428 "objectclass": "user",
429 "description": "test user description",
430 "samaccountname": "testuser"})
431 objLive1 = self.search_dn(usr1)
432 guid1=objLive1["objectGUID"][0]
433 ldb.delete(usr1)
434 objDeleted1 = self.search_guid(guid1)
435 self.undelete_deleted(str(objDeleted1.dn), usr1, ldb)
436 objLive2 = self.search_dn(usr1)
437 self.assertEqual(str(objLive2.dn),str(objLive1.dn))
438 delete_force(self.ldb, usr1)
440 def test_rename(self):
441 print "Testing attempt to rename deleted object"
442 usr1="cn=testuser,cn=users," + self.base_dn
443 ldb.add({
444 "dn": usr1,
445 "objectclass": "user",
446 "description": "test user description",
447 "samaccountname": "testuser"})
448 objLive1 = self.search_dn(usr1)
449 guid1=objLive1["objectGUID"][0]
450 ldb.delete(usr1)
451 objDeleted1 = self.search_guid(guid1)
452 #just to make sure we get the correct error if the show deleted is missing
453 try:
454 ldb.rename(str(objDeleted1.dn), usr1)
455 self.fail()
456 except LdbError, (num, _):
457 self.assertEquals(num,ERR_NO_SUCH_OBJECT)
459 try:
460 ldb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
461 self.fail()
462 except LdbError, (num, _):
463 self.assertEquals(num,ERR_UNWILLING_TO_PERFORM)
465 def test_undelete_with_mod(self):
466 print "Testing standard undelete operation with modification of additional attributes"
467 usr1="cn=testuser,cn=users," + self.base_dn
468 ldb.add({
469 "dn": usr1,
470 "objectclass": "user",
471 "description": "test user description",
472 "samaccountname": "testuser"})
473 objLive1 = self.search_dn(usr1)
474 guid1=objLive1["objectGUID"][0]
475 ldb.delete(usr1)
476 objDeleted1 = self.search_guid(guid1)
477 self.undelete_deleted_with_mod(str(objDeleted1.dn), usr1)
478 objLive2 = self.search_dn(usr1)
479 self.assertEqual(objLive2["url"][0],"www.samba.org")
480 delete_force(self.ldb, usr1)
482 def test_undelete_newuser(self):
483 print "Testing undelete user with a different dn"
484 usr1="cn=testuser,cn=users," + self.base_dn
485 usr2="cn=testuser2,cn=users," + self.base_dn
486 delete_force(self.ldb, usr1)
487 ldb.add({
488 "dn": usr1,
489 "objectclass": "user",
490 "description": "test user description",
491 "samaccountname": "testuser"})
492 objLive1 = self.search_dn(usr1)
493 guid1=objLive1["objectGUID"][0]
494 ldb.delete(usr1)
495 objDeleted1 = self.search_guid(guid1)
496 self.undelete_deleted(str(objDeleted1.dn), usr2, ldb)
497 objLive2 = self.search_dn(usr2)
498 delete_force(self.ldb, usr1)
499 delete_force(self.ldb, usr2)
501 def test_undelete_existing(self):
502 print "Testing undelete user after a user with the same dn has been created"
503 usr1="cn=testuser,cn=users," + self.base_dn
504 ldb.add({
505 "dn": usr1,
506 "objectclass": "user",
507 "description": "test user description",
508 "samaccountname": "testuser"})
509 objLive1 = self.search_dn(usr1)
510 guid1=objLive1["objectGUID"][0]
511 ldb.delete(usr1)
512 ldb.add({
513 "dn": usr1,
514 "objectclass": "user",
515 "description": "test user description",
516 "samaccountname": "testuser"})
517 objDeleted1 = self.search_guid(guid1)
518 try:
519 self.undelete_deleted(str(objDeleted1.dn), usr1, ldb)
520 self.fail()
521 except LdbError, (num, _):
522 self.assertEquals(num,ERR_ENTRY_ALREADY_EXISTS)
524 def test_undelete_cross_nc(self):
525 print "Cross NC undelete"
526 c1 = "cn=ldaptestcontainer," + self.base_dn
527 c2 = "cn=ldaptestcontainer2," + self.configuration_dn
528 c3 = "cn=ldaptestcontainer," + self.configuration_dn
529 c4 = "cn=ldaptestcontainer2," + self.base_dn
530 ldb.add({
531 "dn": c1,
532 "objectclass": "container"})
533 ldb.add({
534 "dn": c2,
535 "objectclass": "container"})
536 objLive1 = self.search_dn(c1)
537 objLive2 = self.search_dn(c2)
538 guid1=objLive1["objectGUID"][0]
539 guid2=objLive2["objectGUID"][0]
540 ldb.delete(c1)
541 ldb.delete(c2)
542 objDeleted1 = self.search_guid(guid1)
543 objDeleted2 = self.search_guid(guid2)
544 #try to undelete from base dn to config
545 try:
546 self.undelete_deleted(str(objDeleted1.dn), c3, ldb)
547 self.fail()
548 except LdbError, (num, _):
549 self.assertEquals(num, ERR_OPERATIONS_ERROR)
550 #try to undelete from config to base dn
551 try:
552 self.undelete_deleted(str(objDeleted2.dn), c4, ldb)
553 self.fail()
554 except LdbError, (num, _):
555 self.assertEquals(num, ERR_OPERATIONS_ERROR)
556 #assert undeletion will work in same nc
557 self.undelete_deleted(str(objDeleted1.dn), c4, ldb)
558 self.undelete_deleted(str(objDeleted2.dn), c3, ldb)
559 delete_force(self.ldb, c3)
560 delete_force(self.ldb, c4)
564 if not "://" in host:
565 if os.path.isfile(host):
566 host = "tdb://%s" % host
567 else:
568 host = "ldap://%s" % host
570 TestProgram(module=__name__, opts=subunitopts)