2 # -*- coding: utf-8 -*-
8 sys
.path
.insert(0, "bin/python")
11 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
13 import samba
.getopt
as options
15 from samba
.auth
import system_session
16 from ldb
import SCOPE_BASE
, LdbError
, Message
, MessageElement
, Dn
, FLAG_MOD_ADD
, FLAG_MOD_DELETE
, FLAG_MOD_REPLACE
17 from ldb
import ERR_NO_SUCH_OBJECT
, ERR_NOT_ALLOWED_ON_NON_LEAF
, ERR_ENTRY_ALREADY_EXISTS
, ERR_ATTRIBUTE_OR_VALUE_EXISTS
18 from ldb
import ERR_UNWILLING_TO_PERFORM
, ERR_OPERATIONS_ERROR
19 from samba
.samdb
import SamDB
20 from samba
.tests
import delete_force
# Command-line handling: optparse extended with Samba's standard option
# groups (smb.conf, version, credentials) and the subunit output options.
parser = optparse.OptionParser("deletetest.py [options] <host|file>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

# The single positional argument is the server or database file to test
# against; the test classes and the URL normalisation at the bottom of the
# file read it through the module-level name `host`.
if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
class BaseDeleteTests(samba.tests.TestCase):
    """Common fixture and lookup helpers for the delete/undelete tests."""

    def GUID_string(self, guid):
        """Return `guid` formatted by the schema as an objectGUID value."""
        return self.ldb.schema_format_value("objectGUID", guid)

    def setUp(self):
        """Connect to `host` and cache the domain and configuration DNs."""
        super(BaseDeleteTests, self).setUp()
        self.ldb = SamDB(host, credentials=creds,
                         session_info=system_session(lp), lp=lp)

        self.base_dn = self.ldb.domain_dn()
        self.configuration_dn = self.ldb.get_config_basedn().get_linearized()

    def search_guid(self, guid):
        """Look up exactly one object by GUID, including deleted objects.

        The search is a base-scope search on ``<GUID=...>`` with the
        ``show_deleted`` control. The test fails unless exactly one entry
        comes back; that entry is returned.
        """
        guid_str = self.GUID_string(guid)
        print("SEARCH by GUID %s" % guid_str)

        res = self.ldb.search(base="<GUID=%s>" % guid_str,
                              scope=SCOPE_BASE,
                              controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        # Callers index the result and read its .dn, so hand back the entry.
        return res[0]

    def search_dn(self, dn):
        """Look up exactly one object by DN, including deleted objects.

        The test fails unless exactly one entry comes back; that entry is
        returned.
        """
        print("SEARCH by DN %s" % dn)

        # Anchor the search on `dn` with base scope so that "exactly one
        # result" refers to the requested object, as in search_guid().
        res = self.ldb.search(expression="(objectClass=*)",
                              base=dn,
                              scope=SCOPE_BASE,
                              controls=["show_deleted:1"])
        self.assertEqual(len(res), 1)
        return res[0]
class BasicDeleteTests(BaseDeleteTests):
    """Tests for deleting objects and for what is left behind afterwards."""

    # Attributes that, when present on the live object, must still be
    # present on the object after deletion.
    PRESERVED_ATTRIBUTES = (
        "nTSecurityDescriptor", "attributeID", "attributeSyntax",
        "dNReferenceUpdate", "dNSHostName", "flatName", "governsID",
        "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
        "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN",
        "mS-DS-CreatorSID", "mSMQOwnerID", "nCName", "objectClass",
        "distinguishedName", "objectGUID", "objectSid", "oMSyntax",
        "proxiedObjectName", "name", "replPropertyMetaData",
        "sAMAccountName", "securityIdentifier", "sIDHistory", "subClassOf",
        "systemFlags", "trustPartner", "trustDirection", "trustType",
        "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated",
        "whenCreated",
    )

    def setUp(self):
        super(BasicDeleteTests, self).setUp()

    def _assert_ldb_error(self, expected, operation, *args, **kwargs):
        """Call `operation` and require it to raise LdbError `expected`.

        The test fails if the operation succeeds or raises an LdbError
        carrying a different error number.
        """
        try:
            operation(*args, **kwargs)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected)
        else:
            self.fail("operation unexpectedly succeeded, expected LDB "
                      "error %s" % expected)

    def del_attr_values(self, delObj):
        """Check the deletion markers on the deleted object `delObj`.

        isDeleted must be "TRUE"; objectCategory and sAMAccountType must
        be absent.
        """
        print("Checking attributes for %s" % delObj["dn"])

        self.assertEqual(delObj["isDeleted"][0], "TRUE")
        self.assertFalse("objectCategory" in delObj)
        self.assertFalse("sAMAccountType" in delObj)

    def preserved_attributes_list(self, liveObj, delObj):
        """Check that preserved attributes of `liveObj` survive in `delObj`.

        Only attributes the live object actually carries are checked.
        """
        print("Checking for preserved attributes list")

        for attr in liveObj:
            if attr in self.PRESERVED_ATTRIBUTES:
                self.assertTrue(
                    attr in delObj,
                    "preserved attribute %s missing after delete" % attr)

    def check_rdn(self, liveObj, delObj, rdnName):
        """Check that the deleted object's RDN and name were mangled.

        Both the `rdnName` attribute and the `name` attribute of `delObj`
        must equal the live RDN value followed by "\\nDEL:" and the
        object's GUID string.
        """
        print("Checking for correct rDN")
        live_rdn = liveObj[rdnName][0]
        deleted_rdn = delObj[rdnName][0]
        # Read the real "name" attribute; reading rdnName again would only
        # repeat the assertion above.
        deleted_name = delObj["name"][0]
        guid = liveObj["objectGUID"][0]
        expected = live_rdn + "\nDEL:" + self.GUID_string(guid)
        self.assertEqual(deleted_rdn, expected)
        self.assertEqual(deleted_name, expected)

    def delete_deleted(self, ldb, dn):
        """Check that deleting the already deleted `dn` fails.

        The delete on `ldb` must raise ERR_NO_SUCH_OBJECT.
        """
        print("Testing the deletion of the already deleted dn %s" % dn)

        self._assert_ldb_error(ERR_NO_SUCH_OBJECT, ldb.delete, dn)

    def test_delete_protection(self):
        """Delete protection tests"""
        tree_delete = ["tree_delete:1"]
        container = "cn=ldaptestcontainer," + self.base_dn
        entry1 = "cn=entry1,cn=ldaptestcontainer," + self.base_dn
        entry2 = "cn=entry2,cn=ldaptestcontainer," + self.base_dn

        # Children first, so leftovers from an earlier run can be removed.
        for dn in (entry1, entry2, container):
            delete_force(self.ldb, dn)

        for dn in (container, entry1, entry2):
            self.ldb.add({
                "dn": dn,
                "objectclass": "container"})

        # A plain delete of a non-leaf must fail; a tree delete removes
        # the container together with its children.
        self._assert_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, container)
        self.ldb.delete(container, tree_delete)

        for dn in (container, entry1, entry2):
            self._assert_ldb_error(ERR_NO_SUCH_OBJECT,
                                   self.ldb.search, dn,
                                   scope=SCOPE_BASE, attrs=[])

        for dn in (entry1, entry2, container):
            delete_force(self.ldb, dn)

        # Performs some protected object delete testing

        res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
                              attrs=["dsServiceName", "dNSHostName"])
        self.assertEqual(len(res), 1)

        # Delete failing since DC's nTDSDSA object is protected
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, res[0]["dsServiceName"][0])

        res = self.ldb.search(
            self.base_dn, attrs=["rIDSetReferences"],
            expression="(&(objectClass=computer)(dNSHostName=" +
                       res[0]["dNSHostName"][0] + "))")
        self.assertEqual(len(res), 1)

        # Deletes failing since DC's rIDSet object is protected
        rid_set = res[0]["rIDSetReferences"][0]
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, rid_set)
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, rid_set, tree_delete)

        # Deletes failing since three main crossRef objects are protected
        schema_ref = ("cn=Enterprise Schema,cn=Partitions," +
                      self.configuration_dn)
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, schema_ref)
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, schema_ref, tree_delete)

        config_ref = ("cn=Enterprise Configuration,cn=Partitions," +
                      self.configuration_dn)
        self._assert_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, config_ref)
        self._assert_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, config_ref, tree_delete)

        res = self.ldb.search("cn=Partitions," + self.configuration_dn,
                              attrs=[],
                              expression="(nCName=%s)" % self.base_dn)
        self.assertEqual(len(res), 1)

        self._assert_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, res[0].dn)
        self._assert_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, res[0].dn, tree_delete)

        # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, "CN=Users," + self.base_dn)

        # Tree-delete failing since "isCriticalSystemObject"
        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete,
                               "CN=Computers," + self.base_dn, tree_delete)

    # NOTE(review): the "def" line of this method was not visible in the
    # reviewed excerpt; the name test_all is assumed - confirm.
    def test_all(self):
        """Basic delete tests"""
        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        grp1 = "cn=testdelgroup1,cn=users," + self.base_dn
        sit1 = "cn=testsite1,cn=sites," + self.configuration_dn
        ss1 = ("cn=NTDS Site Settings,cn=testsite1,cn=sites," +
               self.configuration_dn)
        srv1 = "cn=Servers,cn=testsite1,cn=sites," + self.configuration_dn
        srv2 = ("cn=TESTSRV,cn=Servers,cn=testsite1,cn=sites," +
                self.configuration_dn)

        # Children before parents, so leftovers can actually be removed.
        for dn in (usr1, usr2, grp1, ss1, srv2, srv1, sit1):
            delete_force(self.ldb, dn)

        self.ldb.add({
            "dn": usr1,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})

        self.ldb.add({
            "dn": usr2,
            "objectclass": "user",
            "description": "test user 2 description",
            "samaccountname": "testuser2"})

        self.ldb.add({
            "dn": grp1,
            "objectclass": "group",
            "description": "test group",
            "samaccountname": "testdelgroup1",
            "member": [usr1, usr2],
            "isDeleted": "FALSE"})

        self.ldb.add({
            "dn": sit1,
            "objectclass": "site"})

        self.ldb.add({
            "dn": ss1,
            "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"]})

        self.ldb.add({
            "dn": srv1,
            "objectclass": "serversContainer"})

        self.ldb.add({
            "dn": srv2,
            "objectClass": "server"})

        # Every per-object check below walks the objects in this order.
        dns = (usr1, usr2, grp1, sit1, ss1, srv1, srv2)

        live_objs = [self.search_dn(dn) for dn in dns]
        guids = [obj["objectGUID"][0] for obj in live_objs]

        self.ldb.delete(usr1)
        self.ldb.delete(usr2)
        self.ldb.delete(grp1)
        # The tree deletes also remove srv2 (under srv1) and ss1 (under sit1).
        self.ldb.delete(srv1, ["tree_delete:1"])
        self.ldb.delete(sit1, ["tree_delete:1"])

        deleted_objs = [self.search_guid(guid) for guid in guids]

        for deleted in deleted_objs:
            self.del_attr_values(deleted)

        for live, deleted in zip(live_objs, deleted_objs):
            self.preserved_attributes_list(live, deleted)

        for live, deleted in zip(live_objs, deleted_objs):
            self.check_rdn(live, deleted, "cn")

        for dn in dns:
            self.delete_deleted(self.ldb, dn)

        # Users, the group and the site settings object must end up under
        # "CN=Deleted Objects"; the site, the servers container and the
        # server must not.
        in_deleted_objects = (True, True, True, False, True, False, False)
        for deleted, expected in zip(deleted_objs, in_deleted_objects):
            self.assertEqual("CN=Deleted Objects" in str(deleted.dn),
                             expected)
class BasicUndeleteTests(BaseDeleteTests):
    """Tests for restoring (undeleting) previously deleted objects."""

    def setUp(self):
        super(BasicUndeleteTests, self).setUp()

    def _assert_ldb_error(self, expected, operation, *args, **kwargs):
        """Call `operation` and require it to raise LdbError `expected`.

        The test fails if the operation succeeds or raises an LdbError
        carrying a different error number.
        """
        try:
            operation(*args, **kwargs)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected)
        else:
            self.fail("operation unexpectedly succeeded, expected LDB "
                      "error %s" % expected)

    def _add_test_user(self, dn):
        """Create the user object shared by the undelete tests at `dn`."""
        self.ldb.add({
            "dn": dn,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": "testuser"})

    def enable_recycle_bin(self):
        """Request the optional feature named by the GUID below.

        An ERR_ATTRIBUTE_OR_VALUE_EXISTS reply is tolerated; any other
        LdbError fails the test.
        """
        msg = Message()
        # NOTE(review): the target DN was not visible in the reviewed
        # excerpt; the empty DN (rootDSE) is assumed - confirm.
        msg.dn = Dn(self.ldb, "")
        msg["enableOptionalFeature"] = MessageElement(
            "CN=Partitions," + self.configuration_dn +
            ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
            FLAG_MOD_ADD, "enableOptionalFeature")
        try:
            self.ldb.modify(msg)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)

    def undelete_deleted(self, olddn, newdn, samldb):
        """Undelete `olddn` as `newdn` through the connection `samldb`.

        A single modify removes isDeleted and replaces distinguishedName,
        sent with the show_deleted control. Any LdbError is propagated to
        the caller.
        """
        msg = Message()
        # Build the DN against the connection that performs the modify.
        msg.dn = Dn(samldb, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement(
            [newdn], FLAG_MOD_REPLACE, "distinguishedName")
        samldb.modify(msg, ["show_deleted:1"])

    def undelete_deleted_with_mod(self, olddn, newdn):
        """Undelete `olddn` as `newdn` and set "url" in the same modify."""
        msg = Message()
        msg.dn = Dn(self.ldb, olddn)
        msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = MessageElement(
            [newdn], FLAG_MOD_REPLACE, "distinguishedName")
        msg["url"] = MessageElement(
            ["www.samba.org"], FLAG_MOD_REPLACE, "url")
        self.ldb.modify(msg, ["show_deleted:1"])

    def test_undelete(self):
        print("Testing standard undelete operation")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        delete_force(self.ldb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.ldb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.undelete_deleted(str(objDeleted1.dn), usr1, self.ldb)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(str(objLive2.dn), str(objLive1.dn))
        delete_force(self.ldb, usr1)

    def test_rename(self):
        print("Testing attempt to rename deleted object")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # Remove leftovers so the add below cannot collide with them.
        delete_force(self.ldb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.ldb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        # just to make sure we get the correct error if the show deleted
        # is missing
        self._assert_ldb_error(ERR_NO_SUCH_OBJECT,
                               self.ldb.rename, str(objDeleted1.dn), usr1)

        self._assert_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.rename, str(objDeleted1.dn), usr1,
                               ["show_deleted:1"])

    def test_undelete_with_mod(self):
        print("Testing standard undelete operation with modification of additional attributes")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # Remove leftovers so the add below cannot collide with them.
        delete_force(self.ldb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.ldb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.undelete_deleted_with_mod(str(objDeleted1.dn), usr1)
        objLive2 = self.search_dn(usr1)
        self.assertEqual(objLive2["url"][0], "www.samba.org")
        delete_force(self.ldb, usr1)

    def test_undelete_newuser(self):
        print("Testing undelete user with a different dn")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        usr2 = "cn=testuser2,cn=users," + self.base_dn
        delete_force(self.ldb, usr1)
        # Also clear the restore target so a leftover cannot block it.
        delete_force(self.ldb, usr2)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.ldb.delete(usr1)
        objDeleted1 = self.search_guid(guid1)
        self.undelete_deleted(str(objDeleted1.dn), usr2, self.ldb)
        # search_dn() itself asserts that the restored object exists.
        self.search_dn(usr2)
        delete_force(self.ldb, usr1)
        delete_force(self.ldb, usr2)

    def test_undelete_existing(self):
        print("Testing undelete user after a user with the same dn has been created")
        usr1 = "cn=testuser,cn=users," + self.base_dn
        # Remove leftovers so the add below cannot collide with them.
        delete_force(self.ldb, usr1)
        self._add_test_user(usr1)
        objLive1 = self.search_dn(usr1)
        guid1 = objLive1["objectGUID"][0]
        self.ldb.delete(usr1)
        # Occupy the original DN again so the undelete has to collide.
        self._add_test_user(usr1)
        objDeleted1 = self.search_guid(guid1)
        self._assert_ldb_error(ERR_ENTRY_ALREADY_EXISTS,
                               self.undelete_deleted,
                               str(objDeleted1.dn), usr1, self.ldb)
        # Do not leave the second, live user behind for the next test.
        delete_force(self.ldb, usr1)

    def test_undelete_cross_nc(self):
        print("Cross NC undelete")
        c1 = "cn=ldaptestcontainer," + self.base_dn
        c2 = "cn=ldaptestcontainer2," + self.configuration_dn
        c3 = "cn=ldaptestcontainer," + self.configuration_dn
        c4 = "cn=ldaptestcontainer2," + self.base_dn
        # Remove leftovers so neither the adds nor the restores collide.
        for dn in (c1, c2, c3, c4):
            delete_force(self.ldb, dn)
        self.ldb.add({
            "dn": c1,
            "objectclass": "container"})
        self.ldb.add({
            "dn": c2,
            "objectclass": "container"})
        objLive1 = self.search_dn(c1)
        objLive2 = self.search_dn(c2)
        guid1 = objLive1["objectGUID"][0]
        guid2 = objLive2["objectGUID"][0]
        self.ldb.delete(c1)
        self.ldb.delete(c2)
        objDeleted1 = self.search_guid(guid1)
        objDeleted2 = self.search_guid(guid2)
        # try to undelete from base dn to config
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               self.undelete_deleted,
                               str(objDeleted1.dn), c3, self.ldb)
        # try to undelete from config to base dn
        self._assert_ldb_error(ERR_OPERATIONS_ERROR,
                               self.undelete_deleted,
                               str(objDeleted2.dn), c4, self.ldb)
        # assert undeletion will work in same nc
        self.undelete_deleted(str(objDeleted1.dn), c4, self.ldb)
        self.undelete_deleted(str(objDeleted2.dn), c3, self.ldb)
        delete_force(self.ldb, c3)
        delete_force(self.ldb, c4)
# Turn a bare argument into a URL: an existing local file is opened as a
# tdb database, anything else is treated as an LDAP server name. The two
# cases are exclusive, so a file path must not also get the ldap:// prefix.
if "://" not in host:
    if os.path.isfile(host):
        host = "tdb://%s" % host
    else:
        host = "ldap://%s" % host

TestProgram(module=__name__, opts=subunitopts)