2 # -*- coding: utf-8 -*-
8 sys
.path
.insert(0, "bin/python")
11 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
13 import samba
.getopt
as options
15 from samba
.auth
import system_session
16 from ldb
import SCOPE_BASE
, LdbError
, Message
, MessageElement
, Dn
, FLAG_MOD_ADD
, FLAG_MOD_DELETE
, FLAG_MOD_REPLACE
17 from ldb
import ERR_NO_SUCH_OBJECT
, ERR_NOT_ALLOWED_ON_NON_LEAF
, ERR_ENTRY_ALREADY_EXISTS
, ERR_ATTRIBUTE_OR_VALUE_EXISTS
18 from ldb
import ERR_UNWILLING_TO_PERFORM
, ERR_OPERATIONS_ERROR
19 from samba
.samdb
import SamDB
20 from samba
.tests
import delete_force
22 parser
= optparse
.OptionParser("deletetest.py [options] <host|file>")
23 sambaopts
= options
.SambaOptions(parser
)
24 parser
.add_option_group(sambaopts
)
25 parser
.add_option_group(options
.VersionOptions(parser
))
26 # use command line creds if available
27 credopts
= options
.CredentialsOptions(parser
)
28 parser
.add_option_group(credopts
)
29 subunitopts
= SubunitOptions(parser
)
30 parser
.add_option_group(subunitopts
)
31 opts
, args
= parser
.parse_args()
39 lp
= sambaopts
.get_loadparm()
40 creds
= credopts
.get_credentials(lp
)
42 class BaseDeleteTests(samba
.tests
.TestCase
):
44 def GUID_string(self
, guid
):
45 return self
.ldb
.schema_format_value("objectGUID", guid
)
48 super(BaseDeleteTests
, self
).setUp()
49 self
.ldb
= SamDB(host
, credentials
=creds
, session_info
=system_session(lp
), lp
=lp
)
51 self
.base_dn
= self
.ldb
.domain_dn()
52 self
.configuration_dn
= self
.ldb
.get_config_basedn().get_linearized()
54 def search_guid(self
, guid
):
55 print "SEARCH by GUID %s" % self
.GUID_string(guid
)
57 res
= self
.ldb
.search(base
="<GUID=%s>" % self
.GUID_string(guid
),
58 scope
=SCOPE_BASE
, controls
=["show_deleted:1"])
59 self
.assertEquals(len(res
), 1)
62 def search_dn(self
,dn
):
63 print "SEARCH by DN %s" % dn
65 res
= self
.ldb
.search(expression
="(objectClass=*)",
68 controls
=["show_deleted:1"])
69 self
.assertEquals(len(res
), 1)
73 class BasicDeleteTests(BaseDeleteTests
):
76 super(BasicDeleteTests
, self
).setUp()
78 def del_attr_values(self
, delObj
):
79 print "Checking attributes for %s" % delObj
["dn"]
81 self
.assertEquals(delObj
["isDeleted"][0],"TRUE")
82 self
.assertTrue(not("objectCategory" in delObj
))
83 self
.assertTrue(not("sAMAccountType" in delObj
))
85 def preserved_attributes_list(self
, liveObj
, delObj
):
86 print "Checking for preserved attributes list"
88 preserved_list
= ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
89 "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
90 "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID",
91 "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid",
92 "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName",
93 "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection",
94 "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"]
97 if a
in preserved_list
:
98 self
.assertTrue(a
in delObj
)
100 def check_rdn(self
, liveObj
, delObj
, rdnName
):
101 print "Checking for correct rDN"
102 rdn
=liveObj
[rdnName
][0]
103 rdn2
=delObj
[rdnName
][0]
104 name2
=delObj
[rdnName
][0]
105 guid
=liveObj
["objectGUID"][0]
106 self
.assertEquals(rdn2
, rdn
+ "\nDEL:" + self
.GUID_string(guid
))
107 self
.assertEquals(name2
, rdn
+ "\nDEL:" + self
.GUID_string(guid
))
109 def delete_deleted(self
, ldb
, dn
):
110 print "Testing the deletion of the already deleted dn %s" % dn
115 except LdbError
, (num
, _
):
116 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
118 def test_delete_protection(self
):
119 """Delete protection tests"""
123 delete_force(self
.ldb
, "cn=entry1,cn=ldaptestcontainer," + self
.base_dn
)
124 delete_force(self
.ldb
, "cn=entry2,cn=ldaptestcontainer," + self
.base_dn
)
125 delete_force(self
.ldb
, "cn=ldaptestcontainer," + self
.base_dn
)
128 "dn": "cn=ldaptestcontainer," + self
.base_dn
,
129 "objectclass": "container"})
131 "dn": "cn=entry1,cn=ldaptestcontainer," + self
.base_dn
,
132 "objectclass": "container"})
134 "dn": "cn=entry2,cn=ldaptestcontainer," + self
.base_dn
,
135 "objectclass": "container"})
138 self
.ldb
.delete("cn=ldaptestcontainer," + self
.base_dn
)
140 except LdbError
, (num
, _
):
141 self
.assertEquals(num
, ERR_NOT_ALLOWED_ON_NON_LEAF
)
143 self
.ldb
.delete("cn=ldaptestcontainer," + self
.base_dn
, ["tree_delete:1"])
146 res
= self
.ldb
.search("cn=ldaptestcontainer," + self
.base_dn
,
147 scope
=SCOPE_BASE
, attrs
=[])
149 except LdbError
, (num
, _
):
150 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
152 res
= self
.ldb
.search("cn=entry1,cn=ldaptestcontainer," + self
.base_dn
,
153 scope
=SCOPE_BASE
, attrs
=[])
155 except LdbError
, (num
, _
):
156 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
158 res
= self
.ldb
.search("cn=entry2,cn=ldaptestcontainer," + self
.base_dn
,
159 scope
=SCOPE_BASE
, attrs
=[])
161 except LdbError
, (num
, _
):
162 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
164 delete_force(self
.ldb
, "cn=entry1,cn=ldaptestcontainer," + self
.base_dn
)
165 delete_force(self
.ldb
, "cn=entry2,cn=ldaptestcontainer," + self
.base_dn
)
166 delete_force(self
.ldb
, "cn=ldaptestcontainer," + self
.base_dn
)
168 # Performs some protected object delete testing
170 res
= self
.ldb
.search(base
="", expression
="", scope
=SCOPE_BASE
,
171 attrs
=["dsServiceName", "dNSHostName"])
172 self
.assertEquals(len(res
), 1)
174 # Delete failing since DC's nTDSDSA object is protected
176 self
.ldb
.delete(res
[0]["dsServiceName"][0])
178 except LdbError
, (num
, _
):
179 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
181 res
= self
.ldb
.search(self
.base_dn
, attrs
=["rIDSetReferences"],
182 expression
="(&(objectClass=computer)(dNSHostName=" + res
[0]["dNSHostName"][0] + "))")
183 self
.assertEquals(len(res
), 1)
185 # Deletes failing since DC's rIDSet object is protected
187 self
.ldb
.delete(res
[0]["rIDSetReferences"][0])
189 except LdbError
, (num
, _
):
190 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
192 self
.ldb
.delete(res
[0]["rIDSetReferences"][0], ["tree_delete:1"])
194 except LdbError
, (num
, _
):
195 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
197 # Deletes failing since three main crossRef objects are protected
200 self
.ldb
.delete("cn=Enterprise Schema,cn=Partitions," + self
.configuration_dn
)
202 except LdbError
, (num
, _
):
203 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
205 self
.ldb
.delete("cn=Enterprise Schema,cn=Partitions," + self
.configuration_dn
, ["tree_delete:1"])
207 except LdbError
, (num
, _
):
208 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
211 self
.ldb
.delete("cn=Enterprise Configuration,cn=Partitions," + self
.configuration_dn
)
213 except LdbError
, (num
, _
):
214 self
.assertEquals(num
, ERR_NOT_ALLOWED_ON_NON_LEAF
)
216 self
.ldb
.delete("cn=Enterprise Configuration,cn=Partitions," + self
.configuration_dn
, ["tree_delete:1"])
218 except LdbError
, (num
, _
):
219 self
.assertEquals(num
, ERR_NOT_ALLOWED_ON_NON_LEAF
)
221 res
= self
.ldb
.search("cn=Partitions," + self
.configuration_dn
, attrs
=[],
222 expression
="(nCName=%s)" % self
.base_dn
)
223 self
.assertEquals(len(res
), 1)
226 self
.ldb
.delete(res
[0].dn
)
228 except LdbError
, (num
, _
):
229 self
.assertEquals(num
, ERR_NOT_ALLOWED_ON_NON_LEAF
)
231 self
.ldb
.delete(res
[0].dn
, ["tree_delete:1"])
233 except LdbError
, (num
, _
):
234 self
.assertEquals(num
, ERR_NOT_ALLOWED_ON_NON_LEAF
)
236 # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
238 self
.ldb
.delete("CN=Users," + self
.base_dn
)
240 except LdbError
, (num
, _
):
241 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
243 # Tree-delete failing since "isCriticalSystemObject"
245 self
.ldb
.delete("CN=Computers," + self
.base_dn
, ["tree_delete:1"])
247 except LdbError
, (num
, _
):
248 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
251 """Basic delete tests"""
255 # user current time in ms to make unique objects
257 marker
= str(int(round(time
.time()*1000)))
258 usr1_name
= "u_" + marker
259 usr2_name
= "u2_" + marker
260 grp_name
= "g1_" + marker
261 site_name
= "s1_" + marker
263 usr1
= "cn=%s,cn=users,%s" % (usr1_name
, self
.base_dn
)
264 usr2
= "cn=%s,cn=users,%s" % (usr2_name
, self
.base_dn
)
265 grp1
= "cn=%s,cn=users,%s" % (grp_name
, self
.base_dn
)
266 sit1
= "cn=%s,cn=sites,%s" % (site_name
, self
.configuration_dn
)
267 ss1
= "cn=NTDS Site Settings,cn=%s,cn=sites,%s" % (site_name
, self
.configuration_dn
)
268 srv1
= "cn=Servers,cn=%s,cn=sites,%s" % (site_name
, self
.configuration_dn
)
269 srv2
= "cn=TESTSRV,cn=Servers,cn=%s,cn=sites,%s" % (site_name
, self
.configuration_dn
)
271 delete_force(self
.ldb
, usr1
)
272 delete_force(self
.ldb
, usr2
)
273 delete_force(self
.ldb
, grp1
)
274 delete_force(self
.ldb
, ss1
)
275 delete_force(self
.ldb
, srv2
)
276 delete_force(self
.ldb
, srv1
)
277 delete_force(self
.ldb
, sit1
)
281 "objectclass": "user",
282 "description": "test user description",
283 "samaccountname": usr1_name
})
287 "objectclass": "user",
288 "description": "test user 2 description",
289 "samaccountname": usr2_name
})
293 "objectclass": "group",
294 "description": "test group",
295 "samaccountname": grp_name
,
296 "member": [ usr1
, usr2
],
297 "isDeleted": "FALSE" })
301 "objectclass": "site" })
305 "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"] })
309 "objectclass": "serversContainer" })
313 "objectClass": "server" })
315 objLive1
= self
.search_dn(usr1
)
316 guid1
=objLive1
["objectGUID"][0]
318 objLive2
= self
.search_dn(usr2
)
319 guid2
=objLive2
["objectGUID"][0]
321 objLive3
= self
.search_dn(grp1
)
322 guid3
=objLive3
["objectGUID"][0]
324 objLive4
= self
.search_dn(sit1
)
325 guid4
=objLive4
["objectGUID"][0]
327 objLive5
= self
.search_dn(ss1
)
328 guid5
=objLive5
["objectGUID"][0]
330 objLive6
= self
.search_dn(srv1
)
331 guid6
=objLive6
["objectGUID"][0]
333 objLive7
= self
.search_dn(srv2
)
334 guid7
=objLive7
["objectGUID"][0]
336 self
.ldb
.delete(usr1
)
337 self
.ldb
.delete(usr2
)
338 self
.ldb
.delete(grp1
)
339 self
.ldb
.delete(srv1
, ["tree_delete:1"])
340 self
.ldb
.delete(sit1
, ["tree_delete:1"])
342 objDeleted1
= self
.search_guid(guid1
)
343 objDeleted2
= self
.search_guid(guid2
)
344 objDeleted3
= self
.search_guid(guid3
)
345 objDeleted4
= self
.search_guid(guid4
)
346 objDeleted5
= self
.search_guid(guid5
)
347 objDeleted6
= self
.search_guid(guid6
)
348 objDeleted7
= self
.search_guid(guid7
)
350 self
.del_attr_values(objDeleted1
)
351 self
.del_attr_values(objDeleted2
)
352 self
.del_attr_values(objDeleted3
)
353 self
.del_attr_values(objDeleted4
)
354 self
.del_attr_values(objDeleted5
)
355 self
.del_attr_values(objDeleted6
)
356 self
.del_attr_values(objDeleted7
)
358 self
.preserved_attributes_list(objLive1
, objDeleted1
)
359 self
.preserved_attributes_list(objLive2
, objDeleted2
)
360 self
.preserved_attributes_list(objLive3
, objDeleted3
)
361 self
.preserved_attributes_list(objLive4
, objDeleted4
)
362 self
.preserved_attributes_list(objLive5
, objDeleted5
)
363 self
.preserved_attributes_list(objLive6
, objDeleted6
)
364 self
.preserved_attributes_list(objLive7
, objDeleted7
)
366 self
.check_rdn(objLive1
, objDeleted1
, "cn")
367 self
.check_rdn(objLive2
, objDeleted2
, "cn")
368 self
.check_rdn(objLive3
, objDeleted3
, "cn")
369 self
.check_rdn(objLive4
, objDeleted4
, "cn")
370 self
.check_rdn(objLive5
, objDeleted5
, "cn")
371 self
.check_rdn(objLive6
, objDeleted6
, "cn")
372 self
.check_rdn(objLive7
, objDeleted7
, "cn")
374 self
.delete_deleted(self
.ldb
, usr1
)
375 self
.delete_deleted(self
.ldb
, usr2
)
376 self
.delete_deleted(self
.ldb
, grp1
)
377 self
.delete_deleted(self
.ldb
, sit1
)
378 self
.delete_deleted(self
.ldb
, ss1
)
379 self
.delete_deleted(self
.ldb
, srv1
)
380 self
.delete_deleted(self
.ldb
, srv2
)
382 self
.assertTrue("CN=Deleted Objects" in str(objDeleted1
.dn
))
383 self
.assertTrue("CN=Deleted Objects" in str(objDeleted2
.dn
))
384 self
.assertTrue("CN=Deleted Objects" in str(objDeleted3
.dn
))
385 self
.assertFalse("CN=Deleted Objects" in str(objDeleted4
.dn
))
386 self
.assertTrue("CN=Deleted Objects" in str(objDeleted5
.dn
))
387 self
.assertFalse("CN=Deleted Objects" in str(objDeleted6
.dn
))
388 self
.assertFalse("CN=Deleted Objects" in str(objDeleted7
.dn
))
390 class BasicUndeleteTests(BaseDeleteTests
):
393 super(BasicUndeleteTests
, self
).setUp()
395 def enable_recycle_bin(self
):
398 msg
["enableOptionalFeature"] = MessageElement(
399 "CN=Partitions," + self
.configuration_dn
+ ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
400 FLAG_MOD_ADD
, "enableOptionalFeature")
403 except LdbError
, (num
, _
):
404 self
.assertEquals(num
, ERR_ATTRIBUTE_OR_VALUE_EXISTS
)
406 def undelete_deleted(self
, olddn
, newdn
, samldb
):
408 msg
.dn
= Dn(ldb
, olddn
)
409 msg
["isDeleted"] = MessageElement([], FLAG_MOD_DELETE
, "isDeleted")
410 msg
["distinguishedName"] = MessageElement([newdn
], FLAG_MOD_REPLACE
, "distinguishedName")
411 res
= samldb
.modify(msg
, ["show_deleted:1"])
413 def undelete_deleted_with_mod(self
, olddn
, newdn
):
415 msg
.dn
= Dn(ldb
, olddn
)
416 msg
["isDeleted"] = MessageElement([], FLAG_MOD_DELETE
, "isDeleted")
417 msg
["distinguishedName"] = MessageElement([newdn
], FLAG_MOD_REPLACE
, "distinguishedName")
418 msg
["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE
, "url")
419 res
= ldb
.modify(msg
, ["show_deleted:1"])
422 def test_undelete(self
):
423 print "Testing standard undelete operation"
424 usr1
="cn=testuser,cn=users," + self
.base_dn
425 delete_force(self
.ldb
, usr1
)
428 "objectclass": "user",
429 "description": "test user description",
430 "samaccountname": "testuser"})
431 objLive1
= self
.search_dn(usr1
)
432 guid1
=objLive1
["objectGUID"][0]
434 objDeleted1
= self
.search_guid(guid1
)
435 self
.undelete_deleted(str(objDeleted1
.dn
), usr1
, ldb
)
436 objLive2
= self
.search_dn(usr1
)
437 self
.assertEqual(str(objLive2
.dn
),str(objLive1
.dn
))
438 delete_force(self
.ldb
, usr1
)
440 def test_rename(self
):
441 print "Testing attempt to rename deleted object"
442 usr1
="cn=testuser,cn=users," + self
.base_dn
445 "objectclass": "user",
446 "description": "test user description",
447 "samaccountname": "testuser"})
448 objLive1
= self
.search_dn(usr1
)
449 guid1
=objLive1
["objectGUID"][0]
451 objDeleted1
= self
.search_guid(guid1
)
452 #just to make sure we get the correct error if the show deleted is missing
454 ldb
.rename(str(objDeleted1
.dn
), usr1
)
456 except LdbError
, (num
, _
):
457 self
.assertEquals(num
,ERR_NO_SUCH_OBJECT
)
460 ldb
.rename(str(objDeleted1
.dn
), usr1
, ["show_deleted:1"])
462 except LdbError
, (num
, _
):
463 self
.assertEquals(num
,ERR_UNWILLING_TO_PERFORM
)
465 def test_undelete_with_mod(self
):
466 print "Testing standard undelete operation with modification of additional attributes"
467 usr1
="cn=testuser,cn=users," + self
.base_dn
470 "objectclass": "user",
471 "description": "test user description",
472 "samaccountname": "testuser"})
473 objLive1
= self
.search_dn(usr1
)
474 guid1
=objLive1
["objectGUID"][0]
476 objDeleted1
= self
.search_guid(guid1
)
477 self
.undelete_deleted_with_mod(str(objDeleted1
.dn
), usr1
)
478 objLive2
= self
.search_dn(usr1
)
479 self
.assertEqual(objLive2
["url"][0],"www.samba.org")
480 delete_force(self
.ldb
, usr1
)
482 def test_undelete_newuser(self
):
483 print "Testing undelete user with a different dn"
484 usr1
="cn=testuser,cn=users," + self
.base_dn
485 usr2
="cn=testuser2,cn=users," + self
.base_dn
486 delete_force(self
.ldb
, usr1
)
489 "objectclass": "user",
490 "description": "test user description",
491 "samaccountname": "testuser"})
492 objLive1
= self
.search_dn(usr1
)
493 guid1
=objLive1
["objectGUID"][0]
495 objDeleted1
= self
.search_guid(guid1
)
496 self
.undelete_deleted(str(objDeleted1
.dn
), usr2
, ldb
)
497 objLive2
= self
.search_dn(usr2
)
498 delete_force(self
.ldb
, usr1
)
499 delete_force(self
.ldb
, usr2
)
501 def test_undelete_existing(self
):
502 print "Testing undelete user after a user with the same dn has been created"
503 usr1
="cn=testuser,cn=users," + self
.base_dn
506 "objectclass": "user",
507 "description": "test user description",
508 "samaccountname": "testuser"})
509 objLive1
= self
.search_dn(usr1
)
510 guid1
=objLive1
["objectGUID"][0]
514 "objectclass": "user",
515 "description": "test user description",
516 "samaccountname": "testuser"})
517 objDeleted1
= self
.search_guid(guid1
)
519 self
.undelete_deleted(str(objDeleted1
.dn
), usr1
, ldb
)
521 except LdbError
, (num
, _
):
522 self
.assertEquals(num
,ERR_ENTRY_ALREADY_EXISTS
)
524 def test_undelete_cross_nc(self
):
525 print "Cross NC undelete"
526 c1
= "cn=ldaptestcontainer," + self
.base_dn
527 c2
= "cn=ldaptestcontainer2," + self
.configuration_dn
528 c3
= "cn=ldaptestcontainer," + self
.configuration_dn
529 c4
= "cn=ldaptestcontainer2," + self
.base_dn
532 "objectclass": "container"})
535 "objectclass": "container"})
536 objLive1
= self
.search_dn(c1
)
537 objLive2
= self
.search_dn(c2
)
538 guid1
=objLive1
["objectGUID"][0]
539 guid2
=objLive2
["objectGUID"][0]
542 objDeleted1
= self
.search_guid(guid1
)
543 objDeleted2
= self
.search_guid(guid2
)
544 #try to undelete from base dn to config
546 self
.undelete_deleted(str(objDeleted1
.dn
), c3
, ldb
)
548 except LdbError
, (num
, _
):
549 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
550 #try to undelete from config to base dn
552 self
.undelete_deleted(str(objDeleted2
.dn
), c4
, ldb
)
554 except LdbError
, (num
, _
):
555 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
556 #assert undeletion will work in same nc
557 self
.undelete_deleted(str(objDeleted1
.dn
), c4
, ldb
)
558 self
.undelete_deleted(str(objDeleted2
.dn
), c3
, ldb
)
559 delete_force(self
.ldb
, c3
)
560 delete_force(self
.ldb
, c4
)
564 if not "://" in host
:
565 if os
.path
.isfile(host
):
566 host
= "tdb://%s" % host
568 host
= "ldap://%s" % host
570 TestProgram(module
=__name__
, opts
=subunitopts
)