s4-dsdb-tests: Remove unused method get_ldap_connection()
[Samba.git] / source4 / dsdb / tests / python / deletetest.py
blobc4b38223a34fea2ff5a85252adc0f82c33f65388
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 import optparse
5 import sys
6 import os
8 sys.path.insert(0, "bin/python")
9 import samba
11 from samba.tests.subunitrun import SubunitOptions, TestProgram
13 import samba.getopt as options
15 from samba.auth import system_session
16 from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
17 from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS
18 from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR
19 from samba.samdb import SamDB
20 from samba.tests import delete_force
# Command-line handling: one positional argument (host or database file) is
# required; the option groups below are registered in this order.
parser = optparse.OptionParser("deletetest.py [options] <host|file>")

sambaopts = options.SambaOptions(parser)
_versionopts = options.VersionOptions(parser)
# use command line creds if available
credopts = options.CredentialsOptions(parser)
subunitopts = SubunitOptions(parser)
for _group in (sambaopts, _versionopts, credopts, subunitopts):
    parser.add_option_group(_group)

opts, args = parser.parse_args()

# Without a target there is nothing to test against: show usage and bail out.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

# Load parameters and credentials are shared by every test case below.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
class BaseDeleteTests(samba.tests.TestCase):
    """Common fixture and lookup helpers shared by the delete test cases."""

    def GUID_string(self, guid):
        """Return ``guid`` formatted by the connection as an objectGUID value."""
        return self.ldb.schema_format_value("objectGUID", guid)

    def setUp(self):
        """Connect to ``host`` and remember the domain and configuration DNs."""
        super(BaseDeleteTests, self).setUp()
        session = system_session(lp)
        self.ldb = SamDB(host, credentials=creds, session_info=session, lp=lp)

        self.base_dn = self.ldb.domain_dn()
        config_basedn = self.ldb.get_config_basedn()
        self.configuration_dn = config_basedn.get_linearized()

    def search_guid(self, guid):
        """Return the single entry found by a ``<GUID=...>`` base search.

        The search passes the ``show_deleted:1`` control; the test fails
        unless exactly one entry comes back.
        """
        guid_text = self.GUID_string(guid)
        print("SEARCH by GUID %s" % guid_text)

        matches = self.ldb.search(base="<GUID=%s>" % guid_text,
                                  scope=SCOPE_BASE,
                                  controls=["show_deleted:1"])
        self.assertEquals(len(matches), 1)
        return matches[0]

    def search_dn(self, dn):
        """Return the single entry found by a base search on ``dn``.

        The search passes the ``show_deleted:1`` control; the test fails
        unless exactly one entry comes back.
        """
        print("SEARCH by DN %s" % dn)

        matches = self.ldb.search(base=dn,
                                  scope=SCOPE_BASE,
                                  expression="(objectClass=*)",
                                  controls=["show_deleted:1"])
        self.assertEquals(len(matches), 1)
        return matches[0]
class BasicDeleteTests(BaseDeleteTests):
    """Delete tests: protected objects and the state of deleted objects."""

    # Attributes that preserved_attributes_list() requires on the deleted
    # object whenever they were present on the live one.
    PRESERVED_ATTRIBUTES = frozenset([
        "nTSecurityDescriptor", "attributeID", "attributeSyntax",
        "dNReferenceUpdate", "dNSHostName", "flatName", "governsID",
        "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
        "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN",
        "mS-DS-CreatorSID", "mSMQOwnerID", "nCName", "objectClass",
        "distinguishedName", "objectGUID", "objectSid", "oMSyntax",
        "proxiedObjectName", "name", "replPropertyMetaData",
        "sAMAccountName", "securityIdentifier", "sIDHistory", "subClassOf",
        "systemFlags", "trustPartner", "trustDirection", "trustType",
        "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated",
        "whenCreated",
    ])

    def _assert_ldb_error(self, expected, operation, *args, **kwargs):
        """Fail unless ``operation(*args, **kwargs)`` raises LdbError ``expected``.

        ``expected`` is compared with the first element of the exception's
        ``args`` (the numeric ldb error code).
        """
        try:
            operation(*args, **kwargs)
        except LdbError as e:
            self.assertEqual(e.args[0], expected)
        else:
            self.fail("expected LdbError %d, but the operation succeeded"
                      % expected)

    def _assert_delete_fails(self, dn, expected, controls=None):
        """Fail unless deleting ``dn`` (optionally with ``controls``) raises ``expected``."""
        if controls is None:
            self._assert_ldb_error(expected, self.ldb.delete, dn)
        else:
            self._assert_ldb_error(expected, self.ldb.delete, dn, controls)

    def del_attr_values(self, delObj):
        """Check that ``delObj`` is flagged deleted and lost the stripped attributes."""
        print("Checking attributes for %s" % delObj["dn"])

        self.assertEqual(delObj["isDeleted"][0], "TRUE")
        self.assertFalse("objectCategory" in delObj)
        self.assertFalse("sAMAccountType" in delObj)

    def preserved_attributes_list(self, liveObj, delObj):
        """Check that preserved attributes of ``liveObj`` still exist on ``delObj``."""
        print("Checking for preserved attributes list")

        for attr in liveObj:
            if attr in self.PRESERVED_ATTRIBUTES:
                self.assertTrue(attr in delObj,
                                "attribute %s was not preserved" % attr)

    def check_rdn(self, liveObj, delObj, rdnName):
        """Check the RDN attribute and ``name`` of ``delObj`` are delete-mangled.

        Both are expected to equal the live RDN value followed by
        ``"\\nDEL:"`` and the formatted objectGUID of the live object.
        """
        print("Checking for correct rDN")
        rdn = liveObj[rdnName][0]
        guid = liveObj["objectGUID"][0]
        mangled = rdn + "\nDEL:" + self.GUID_string(guid)

        self.assertEqual(delObj[rdnName][0], mangled)
        # The original read delObj[rdnName] a second time here, so the
        # "name" attribute was never actually verified.
        self.assertEqual(delObj["name"][0], mangled)

    def delete_deleted(self, ldb, dn):
        """Check that deleting ``dn`` again reports ERR_NO_SUCH_OBJECT."""
        print("Testing the deletion of the already deleted dn %s" % dn)

        self._assert_ldb_error(ERR_NO_SUCH_OBJECT, ldb.delete, dn)

    def test_delete_protection(self):
        """Delete protection tests"""

        print(self.base_dn)

        container = "cn=ldaptestcontainer," + self.base_dn
        entries = ["cn=entry1," + container, "cn=entry2," + container]

        # Children first, then the container, in case an earlier run left
        # them behind.
        for dn in entries + [container]:
            delete_force(self.ldb, dn)

        for dn in [container] + entries:
            self.ldb.add({
                "dn": dn,
                "objectclass": "container"})

        # A plain delete of a non-leaf object must be refused ...
        self._assert_delete_fails(container, ERR_NOT_ALLOWED_ON_NON_LEAF)

        # ... while a tree delete removes the container and its children.
        self.ldb.delete(container, ["tree_delete:1"])

        for dn in [container] + entries:
            self._assert_ldb_error(ERR_NO_SUCH_OBJECT, self.ldb.search,
                                   dn, scope=SCOPE_BASE, attrs=[])

        for dn in entries + [container]:
            delete_force(self.ldb, dn)

        # Performs some protected object delete testing

        res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
                              attrs=["dsServiceName", "dNSHostName"])
        self.assertEqual(len(res), 1)

        # Delete failing since DC's nTDSDSA object is protected
        self._assert_delete_fails(res[0]["dsServiceName"][0],
                                  ERR_UNWILLING_TO_PERFORM)

        res = self.ldb.search(self.base_dn, attrs=["rIDSetReferences"],
                              expression="(&(objectClass=computer)(dNSHostName=" + res[0]["dNSHostName"][0] + "))")
        self.assertEqual(len(res), 1)

        # Deletes failing since DC's rIDSet object is protected
        rid_set = res[0]["rIDSetReferences"][0]
        self._assert_delete_fails(rid_set, ERR_UNWILLING_TO_PERFORM)
        self._assert_delete_fails(rid_set, ERR_UNWILLING_TO_PERFORM,
                                  ["tree_delete:1"])

        # Deletes failing since three main crossRef objects are protected

        schema_ref = "cn=Enterprise Schema,cn=Partitions," + self.configuration_dn
        self._assert_delete_fails(schema_ref, ERR_UNWILLING_TO_PERFORM)
        self._assert_delete_fails(schema_ref, ERR_UNWILLING_TO_PERFORM,
                                  ["tree_delete:1"])

        config_ref = "cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn
        self._assert_delete_fails(config_ref, ERR_NOT_ALLOWED_ON_NON_LEAF)
        self._assert_delete_fails(config_ref, ERR_NOT_ALLOWED_ON_NON_LEAF,
                                  ["tree_delete:1"])

        res = self.ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],
                              expression="(nCName=%s)" % self.base_dn)
        self.assertEqual(len(res), 1)

        self._assert_delete_fails(res[0].dn, ERR_NOT_ALLOWED_ON_NON_LEAF)
        self._assert_delete_fails(res[0].dn, ERR_NOT_ALLOWED_ON_NON_LEAF,
                                  ["tree_delete:1"])

        # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
        self._assert_delete_fails("CN=Users," + self.base_dn,
                                  ERR_UNWILLING_TO_PERFORM)

        # Tree-delete failing since "isCriticalSystemObject"
        self._assert_delete_fails("CN=Computers," + self.base_dn,
                                  ERR_UNWILLING_TO_PERFORM,
                                  ["tree_delete:1"])

    def test_all(self):
        """Basic delete tests"""

        print(self.base_dn)

        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        grp1 = "cn=testdelgroup1,cn=users," + self.base_dn
        sit1 = "cn=testsite1,cn=sites," + self.configuration_dn
        ss1 = "cn=NTDS Site Settings,cn=testsite1,cn=sites," + self.configuration_dn
        srv1 = "cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn
        srv2 = "cn=TESTSRV,cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn

        # Each test object paired with whether its deleted form is expected
        # to have "CN=Deleted Objects" in its DN.
        cases = [
            (usr1, True),
            (usr2, True),
            (grp1, True),
            (sit1, False),
            (ss1, True),
            (srv1, False),
            (srv2, False),
        ]
        dns = [dn for dn, _ in cases]

        # Remove leftovers, children before their parents.
        for dn in (usr1, usr2, grp1, ss1, srv2, srv1, sit1):
            delete_force(self.ldb, dn)

        self.ldb.add({
            "dn": usr1,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})

        self.ldb.add({
            "dn": usr2,
            "objectclass": "user",
            "description": "test user 2 description",
            "samaccountname": "testuser2"})

        self.ldb.add({
            "dn": grp1,
            "objectclass": "group",
            "description": "test group",
            "samaccountname": "testdelgroup1",
            "member": [usr1, usr2],
            "isDeleted": "FALSE"})

        self.ldb.add({
            "dn": sit1,
            "objectclass": "site"})

        self.ldb.add({
            "dn": ss1,
            "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"]})

        self.ldb.add({
            "dn": srv1,
            "objectclass": "serversContainer"})

        self.ldb.add({
            "dn": srv2,
            "objectClass": "server"})

        # Snapshot the live objects; the GUIDs are the only handle that
        # still finds them after the delete.
        live_objs = [self.search_dn(dn) for dn in dns]
        guids = [obj["objectGUID"][0] for obj in live_objs]

        self.ldb.delete(usr1)
        self.ldb.delete(usr2)
        self.ldb.delete(grp1)
        # ss1 and srv2 are not deleted directly; they go with these tree deletes.
        self.ldb.delete(srv1, ["tree_delete:1"])
        self.ldb.delete(sit1, ["tree_delete:1"])

        deleted_objs = [self.search_guid(guid) for guid in guids]

        for deleted in deleted_objs:
            self.del_attr_values(deleted)

        for live, deleted in zip(live_objs, deleted_objs):
            self.preserved_attributes_list(live, deleted)

        for live, deleted in zip(live_objs, deleted_objs):
            self.check_rdn(live, deleted, "cn")

        for dn in dns:
            self.delete_deleted(self.ldb, dn)

        for (dn, expect_moved), deleted in zip(cases, deleted_objs):
            self.assertEqual("CN=Deleted Objects" in str(deleted.dn),
                             expect_moved,
                             "unexpected location %s for deleted %s"
                             % (deleted.dn, dn))
class BasicUndeleteTests(BaseDeleteTests):
    """Tests that restore deleted objects by modifying the deleted entry.

    Every directory operation goes through ``self.ldb``; the visible module
    defines no global ``ldb`` object, so the bare name would raise NameError.
    """

    def _assert_ldb_error(self, expected, operation, *args, **kwargs):
        """Fail unless ``operation(*args, **kwargs)`` raises LdbError ``expected``.

        ``expected`` is compared with the first element of the exception's
        ``args`` (the numeric ldb error code).
        """
        try:
            operation(*args, **kwargs)
        except LdbError as e:
            self.assertEqual(e.args[0], expected)
        else:
            self.fail("expected LdbError %d, but the operation succeeded"
                      % expected)

    def _add_test_user(self, dn):
        """Add the standard "testuser" user object at ``dn``."""
        self.ldb.add({
            "dn": dn,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})

    def _create_deleted_user(self, dn):
        """Create the test user at ``dn``, delete it, and return both views.

        Any object already at ``dn`` is removed first so a leftover from an
        earlier test cannot make the add fail.

        Returns a ``(live, deleted)`` pair: the entry as found by DN before
        the delete and as found by objectGUID afterwards.
        """
        delete_force(self.ldb, dn)
        self._add_test_user(dn)
        live = self.search_dn(dn)
        guid = live["objectGUID"][0]
        self.ldb.delete(dn)
        return live, self.search_guid(guid)

    def enable_recycle_bin(self):
        """Request the optional feature on the root entry.

        ERR_ATTRIBUTE_OR_VALUE_EXISTS is tolerated; any other LdbError code
        fails the test. Judging by the method name the GUID identifies the
        recycle bin feature - NOTE(review): confirm.
        """
        msg = Message()
        msg.dn = Dn(self.ldb, "")
        msg["enableOptionalFeature"] = MessageElement(
            "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
            FLAG_MOD_ADD, "enableOptionalFeature")
        try:
            self.ldb.modify(msg)
        except LdbError as e:
            self.assertEqual(e.args[0], ERR_ATTRIBUTE_OR_VALUE_EXISTS)

    def undelete_deleted(self, olddn, newdn, samldb):
        """Undelete ``olddn`` as ``newdn`` using the connection ``samldb``.

        One modify removes ``isDeleted`` and replaces ``distinguishedName``,
        sent with the ``show_deleted:1`` control. LdbError propagates to the
        caller.
        """
        msg = Message()
        msg.dn = Dn(samldb, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
        samldb.modify(msg, ["show_deleted:1"])

    def undelete_deleted_with_mod(self, olddn, newdn):
        """Undelete ``olddn`` as ``newdn`` and set ``url`` in the same modify."""
        msg = Message()
        msg.dn = Dn(self.ldb, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
        msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
        self.ldb.modify(msg, ["show_deleted:1"])

    def test_undelete(self):
        """Undelete a user back to its original DN."""
        print("Testing standard undelete operation")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        objLive1, objDeleted1 = self._create_deleted_user(usr1)
        self.undelete_deleted(str(objDeleted1.dn), usr1, self.ldb)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(str(objLive2.dn), str(objLive1.dn))
        delete_force(self.ldb, usr1)

    def test_rename(self):
        """A plain rename must not bring a deleted object back."""
        print("Testing attempt to rename deleted object")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        _, objDeleted1 = self._create_deleted_user(usr1)
        deleted_dn = str(objDeleted1.dn)
        # just to make sure we get the correct error if the show deleted is missing
        self._assert_ldb_error(ERR_NO_SUCH_OBJECT,
                               self.ldb.rename, deleted_dn, usr1)

        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.rename, deleted_dn, usr1,
                               ["show_deleted:1"])

    def test_undelete_with_mod(self):
        """Undelete a user while replacing another attribute in the same modify."""
        print("Testing standard undelete operation with modification of additional attributes")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        _, objDeleted1 = self._create_deleted_user(usr1)
        self.undelete_deleted_with_mod(str(objDeleted1.dn), usr1)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(objLive2["url"][0], "www.samba.org")
        delete_force(self.ldb, usr1)

    def test_undelete_newuser(self):
        """Undelete a user to a DN different from the original one."""
        print("Testing undelete user with a different dn")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        delete_force(self.ldb, usr2)
        _, objDeleted1 = self._create_deleted_user(usr1)
        self.undelete_deleted(str(objDeleted1.dn), usr2, self.ldb)
        # search_dn() itself asserts that the restored object exists.
        self.search_dn(usr2)
        delete_force(self.ldb, usr1)
        delete_force(self.ldb, usr2)

    def test_undelete_existing(self):
        """Undelete must fail when a live object already occupies the DN."""
        print("Testing undelete user after a user with the same dn has been created")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        _, objDeleted1 = self._create_deleted_user(usr1)
        # Re-create a live user at the DN the deleted one wants back.
        self._add_test_user(usr1)
        try:
            self._assert_ldb_error(ERR_ENTRY_ALREADY_EXISTS,
                                   self.undelete_deleted,
                                   str(objDeleted1.dn), usr1, self.ldb)
        finally:
            # Do not leave the re-created user behind for later tests.
            delete_force(self.ldb, usr1)

    def test_undelete_cross_nc(self):
        """Undelete across the domain and configuration DNs must be refused."""
        print("Cross NC undelete")
        c1 = "cn=ldaptestcontainer," + self.base_dn
        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
        c3 = "cn=ldaptestcontainer," + self.configuration_dn
        c4 = "cn=ldaptestcontainer2," + self.base_dn
        # Clear leftovers so neither the adds nor the final undeletes collide.
        for dn in (c1, c2, c3, c4):
            delete_force(self.ldb, dn)
        self.ldb.add({
            "dn": c1,
            "objectclass": "container"})
        self.ldb.add({
            "dn": c2,
            "objectclass": "container"})
        objLive1 = self.search_dn(c1)
        objLive2 = self.search_dn(c2)
        guid1 = objLive1["objectGUID"][0]
        guid2 = objLive2["objectGUID"][0]
        self.ldb.delete(c1)
        self.ldb.delete(c2)
        objDeleted1 = self.search_guid(guid1)
        objDeleted2 = self.search_guid(guid2)
        # try to undelete from base dn to config
        self._assert_ldb_error(ERR_OPERATIONS_ERROR, self.undelete_deleted,
                               str(objDeleted1.dn), c3, self.ldb)
        # try to undelete from config to base dn
        self._assert_ldb_error(ERR_OPERATIONS_ERROR, self.undelete_deleted,
                               str(objDeleted2.dn), c4, self.ldb)
        # assert undeletion will work in same nc
        self.undelete_deleted(str(objDeleted1.dn), c4, self.ldb)
        self.undelete_deleted(str(objDeleted2.dn), c3, self.ldb)
        delete_force(self.ldb, c3)
        delete_force(self.ldb, c4)
# A bare argument is turned into a URL: an existing file is opened as a tdb
# database, anything else is treated as an LDAP server name.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host

# Run the test cases defined in this module, with the subunit options
# parsed from the command line.
TestProgram(module=__name__, opts=subunitopts)