# -*- coding: utf-8 -*-

import optparse
import os
import sys
import time

# Must run before the samba imports so the in-tree bindings are found.
sys.path.insert(0, "bin/python")
import samba

from samba.tests.subunitrun import SubunitOptions, TestProgram

import samba.getopt as options

from samba.auth import system_session
from ldb import SCOPE_BASE, LdbError
from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF
from ldb import ERR_UNWILLING_TO_PERFORM
from samba.samdb import SamDB
from samba.tests import delete_force
from samba import dsdb
from samba.common import get_string
# Command-line handling: standard Samba option groups plus one positional
# argument naming the server (or database file) to run the tests against.
parser = optparse.OptionParser("deletetest.py [options] <host|file>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

# The target is mandatory; without it there is nothing to connect to.
if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

# Read by BaseDeleteTests.setUp() and normalised into a URL at the end of
# the file, before the tests are started.
host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
class BaseDeleteTests(samba.tests.TestCase):
    """Common fixture and lookup helpers shared by the delete tests.

    setUp() opens a SamDB connection to the module-level ``host`` using the
    module-level ``creds`` and ``lp``. The search helpers always request the
    ``show_deleted`` control, so they can also find deleted objects.
    """

    def GUID_string(self, guid):
        """Return the objectGUID value ``guid`` in its schema string format."""
        return get_string(self.ldb.schema_format_value("objectGUID", guid))

    def setUp(self):
        """Connect to the directory and cache the domain and config DNs."""
        super(BaseDeleteTests, self).setUp()
        self.ldb = SamDB(host, credentials=creds,
                         session_info=system_session(lp), lp=lp)

        self.base_dn = self.ldb.domain_dn()
        self.configuration_dn = self.ldb.get_config_basedn().get_linearized()

    def search_guid(self, guid):
        """Look up one object by GUID and return its message.

        Asserts that exactly one entry matches. The returned message carries
        all attributes plus ``parentGUID``.
        """
        guid_str = self.GUID_string(guid)
        print("SEARCH by GUID %s" % guid_str)

        res = self.ldb.search(base="<GUID=%s>" % guid_str,
                              scope=SCOPE_BASE,
                              controls=["show_deleted:1"],
                              attrs=["*", "parentGUID"])
        self.assertEqual(len(res), 1)
        # Callers index the result directly (e.g. obj["objectGUID"][0]).
        return res[0]

    def search_dn(self, dn):
        """Look up one object by DN and return its message.

        Asserts that exactly one entry matches. The returned message carries
        all attributes plus ``parentGUID``.
        """
        print("SEARCH by DN %s" % dn)

        res = self.ldb.search(expression="(objectClass=*)",
                              base=dn,
                              scope=SCOPE_BASE,
                              controls=["show_deleted:1"],
                              attrs=["*", "parentGUID"])
        self.assertEqual(len(res), 1)
        # Callers index the result directly and read its .dn attribute.
        return res[0]
class BasicDeleteTests(BaseDeleteTests):
    """Checks on deleted objects and on objects that must resist deletion."""

    def setUp(self):
        super(BasicDeleteTests, self).setUp()

    def _expect_ldb_error(self, expected, operation, *args, **kwargs):
        """Call ``operation`` and require it to raise LdbError ``expected``.

        The test fails if the call succeeds, or if the error number carried
        as the first element of the exception's ``args`` differs from
        ``expected``.
        """
        try:
            operation(*args, **kwargs)
        except LdbError as e:
            self.assertEqual(e.args[0], expected)
        else:
            self.fail("operation succeeded, expected LdbError %s" % expected)

    def del_attr_values(self, delObj):
        """Assert ``delObj`` is flagged deleted and lost its category/type."""
        print("Checking attributes for %s" % delObj["dn"])

        self.assertEqual(str(delObj["isDeleted"][0]), "TRUE")
        self.assertTrue(not("objectCategory" in delObj))
        self.assertTrue(not("sAMAccountType" in delObj))

    def preserved_attributes_list(self, liveObj, delObj):
        """Assert every preserved attribute of ``liveObj`` is in ``delObj``.

        Only attributes that the live object actually had are required on
        the deleted object.
        """
        print("Checking for preserved attributes list")

        preserved_list = [
            "nTSecurityDescriptor", "attributeID", "attributeSyntax",
            "dNReferenceUpdate", "dNSHostName", "flatName", "governsID",
            "groupType", "instanceType", "lDAPDisplayName",
            "legacyExchangeDN", "isDeleted", "isRecycled", "lastKnownParent",
            "msDS-LastKnownRDN", "mS-DS-CreatorSID", "mSMQOwnerID", "nCName",
            "objectClass", "distinguishedName", "objectGUID", "objectSid",
            "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData",
            "sAMAccountName", "securityIdentifier", "sIDHistory",
            "subClassOf", "systemFlags", "trustPartner", "trustDirection",
            "trustType", "trustAttributes", "userAccountControl",
            "uSNChanged", "uSNCreated", "whenCreated"]

        for a in liveObj:
            if a in preserved_list:
                self.assertTrue(a in delObj)

    def check_rdn(self, liveObj, delObj, rdnName):
        """Assert the deleted object's RDN is '<old rdn>\\nDEL:<guid>'.

        The same value is expected in the ``rdnName`` attribute, in ``name``
        and as the RDN value of the deleted object's DN.
        """
        print("Checking for correct rDN")
        rdn = liveObj[rdnName][0]
        rdn2 = delObj[rdnName][0]
        name2 = delObj["name"][0]
        dn_rdn = delObj.dn.get_rdn_value()
        guid = liveObj["objectGUID"][0]
        expected = "%s\nDEL:%s" % (rdn, self.GUID_string(guid))
        self.assertEqual(str(rdn2), expected)
        self.assertEqual(str(name2), expected)
        self.assertEqual(str(name2), dn_rdn)

    def delete_deleted(self, ldb, dn):
        """Assert that deleting ``dn`` again fails with ERR_NO_SUCH_OBJECT."""
        print("Testing the deletion of the already deleted dn %s" % dn)

        self._expect_ldb_error(ERR_NO_SUCH_OBJECT, ldb.delete, dn)

    def test_delete_protection(self):
        """Delete protection tests"""

        container = "cn=ldaptestcontainer," + self.base_dn
        entry1 = "cn=entry1," + container
        entry2 = "cn=entry2," + container

        # Start from a clean slate, children first.
        for dn in (entry1, entry2, container):
            delete_force(self.ldb, dn)

        for dn in (container, entry1, entry2):
            self.ldb.add({
                "dn": dn,
                "objectclass": "container"})

        # A plain delete is refused while the container has children ...
        self._expect_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, container)

        # ... but a tree delete removes the container and both entries.
        self.ldb.delete(container, ["tree_delete:1"])

        for dn in (container, entry1, entry2):
            self._expect_ldb_error(ERR_NO_SUCH_OBJECT,
                                   self.ldb.search, dn,
                                   scope=SCOPE_BASE, attrs=[])

        for dn in (entry1, entry2, container):
            delete_force(self.ldb, dn)

        # Performs some protected object delete testing

        res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
                              attrs=["dsServiceName", "dNSHostName"])
        self.assertEqual(len(res), 1)

        # Delete failing since DC's nTDSDSA object is protected
        self._expect_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, res[0]["dsServiceName"][0])

        res = self.ldb.search(self.base_dn, attrs=["rIDSetReferences"],
                              expression="(&(objectClass=computer)(dNSHostName=" + str(res[0]["dNSHostName"][0]) + "))")
        self.assertEqual(len(res), 1)

        # Deletes failing since DC's rIDSet object is protected
        rid_set = res[0]["rIDSetReferences"][0]
        self._expect_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, rid_set)
        self._expect_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, rid_set, ["tree_delete:1"])

        # Deletes failing since three main crossRef objects are protected

        schema_ref = "cn=Enterprise Schema,cn=Partitions," + self.configuration_dn
        self._expect_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, schema_ref)
        self._expect_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, schema_ref, ["tree_delete:1"])

        config_ref = "cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn
        self._expect_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, config_ref)
        self._expect_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, config_ref, ["tree_delete:1"])

        # The third crossRef is looked up through its nCName, the domain DN.
        res = self.ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],
                              expression="(nCName=%s)" % self.base_dn)
        self.assertEqual(len(res), 1)

        self._expect_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, res[0].dn)
        self._expect_ldb_error(ERR_NOT_ALLOWED_ON_NON_LEAF,
                               self.ldb.delete, res[0].dn, ["tree_delete:1"])

        # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
        self._expect_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, "CN=Users," + self.base_dn)

        # Tree-delete failing since "isCriticalSystemObject"
        self._expect_ldb_error(ERR_UNWILLING_TO_PERFORM,
                               self.ldb.delete, "CN=Computers," + self.base_dn,
                               ["tree_delete:1"])
class BasicTreeDeleteTests(BasicDeleteTests):
    """Deletes a set of freshly created objects and checks the results.

    setUp() creates two users and a group in the domain partition, and a
    site holding a settings object, a servers container and a server in the
    configuration partition. Each test deletes them and then calls
    check_all().
    """

    def setUp(self):
        """Create the seven fixture objects and record their live state."""
        super(BasicTreeDeleteTests, self).setUp()

        # use current time in ms to make unique objects
        marker = str(int(round(time.time() * 1000)))
        usr1_name = "u_" + marker
        usr2_name = "u2_" + marker
        grp_name = "g1_" + marker
        site_name = "s1_" + marker

        self.usr1 = "cn=%s,cn=users,%s" % (usr1_name, self.base_dn)
        self.usr2 = "cn=%s,cn=users,%s" % (usr2_name, self.base_dn)
        self.grp1 = "cn=%s,cn=users,%s" % (grp_name, self.base_dn)
        self.sit1 = "cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
        self.ss1 = "cn=NTDS Site Settings,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
        self.srv1 = "cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
        self.srv2 = "cn=TESTSRV,cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)

        # Clear leftovers, children before their parents.
        for dn in (self.usr1, self.usr2, self.grp1,
                   self.ss1, self.srv2, self.srv1, self.sit1):
            delete_force(self.ldb, dn)

        self.ldb.add({
            "dn": self.usr1,
            "objectclass": "user",
            "description": "test user description",
            "samaccountname": usr1_name})

        self.ldb.add({
            "dn": self.usr2,
            "objectclass": "user",
            "description": "test user 2 description",
            "samaccountname": usr2_name})

        self.ldb.add({
            "dn": self.grp1,
            "objectclass": "group",
            "description": "test group",
            "samaccountname": grp_name,
            "member": [self.usr1, self.usr2],
            "isDeleted": "FALSE"})

        self.ldb.add({
            "dn": self.sit1,
            "objectclass": "site"})

        self.ldb.add({
            "dn": self.ss1,
            "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"]})

        self.ldb.add({
            "dn": self.srv1,
            "objectclass": "serversContainer"})

        self.ldb.add({
            "dn": self.srv2,
            "objectClass": "server"})

        # Snapshot each live object and its GUID; the GUID is how the object
        # is found again after deletion.
        self.objLive1 = self.search_dn(self.usr1)
        self.guid1 = self.objLive1["objectGUID"][0]

        self.objLive2 = self.search_dn(self.usr2)
        self.guid2 = self.objLive2["objectGUID"][0]

        self.objLive3 = self.search_dn(self.grp1)
        self.guid3 = self.objLive3["objectGUID"][0]

        self.objLive4 = self.search_dn(self.sit1)
        self.guid4 = self.objLive4["objectGUID"][0]

        self.objLive5 = self.search_dn(self.ss1)
        self.guid5 = self.objLive5["objectGUID"][0]

        self.objLive6 = self.search_dn(self.srv1)
        self.guid6 = self.objLive6["objectGUID"][0]

        self.objLive7 = self.search_dn(self.srv2)
        self.guid7 = self.objLive7["objectGUID"][0]

        self.deleted_objects_config_dn \
            = self.ldb.get_wellknown_dn(self.ldb.get_config_basedn(),
                                        dsdb.DS_GUID_DELETED_OBJECTS_CONTAINER)
        deleted_objects_config_obj \
            = self.search_dn(self.deleted_objects_config_dn)

        self.deleted_objects_config_guid \
            = deleted_objects_config_obj["objectGUID"][0]

        self.deleted_objects_domain_dn \
            = self.ldb.get_wellknown_dn(self.ldb.get_default_basedn(),
                                        dsdb.DS_GUID_DELETED_OBJECTS_CONTAINER)
        deleted_objects_domain_obj \
            = self.search_dn(self.deleted_objects_domain_dn)

        self.deleted_objects_domain_guid \
            = deleted_objects_domain_obj["objectGUID"][0]

        sites_obj = self.search_dn("cn=sites,%s"
                                   % self.ldb.get_config_basedn())
        self.sites_dn = sites_obj.dn
        self.sites_guid \
            = sites_obj["objectGUID"][0]

    def test_all(self):
        """Basic delete tests"""

        self.ldb.delete(self.usr1)
        self.ldb.delete(self.usr2)
        self.ldb.delete(self.grp1)
        self.ldb.delete(self.srv1, ["tree_delete:1"])
        self.ldb.delete(self.sit1, ["tree_delete:1"])

        self.check_all()

    def test_tree_delete(self):
        """Basic delete tests,
           but use just one tree delete for the config records
        """

        self.ldb.delete(self.usr1)
        self.ldb.delete(self.usr2)
        self.ldb.delete(self.grp1)
        self.ldb.delete(self.sit1, ["tree_delete:1"])

        self.check_all()

    def _search_and_check_deleted(self):
        """Re-read all seven objects by GUID and run the per-object checks.

        Runs, in this order for all objects: attribute checks, preserved
        attribute checks, RDN checks, then a repeated delete of every DN.
        Returns the deleted objects as a list ordered usr1, usr2, grp1,
        sit1, ss1, srv1, srv2.
        """
        live = [self.objLive1, self.objLive2, self.objLive3, self.objLive4,
                self.objLive5, self.objLive6, self.objLive7]
        guids = [self.guid1, self.guid2, self.guid3, self.guid4,
                 self.guid5, self.guid6, self.guid7]
        dns = [self.usr1, self.usr2, self.grp1, self.sit1,
               self.ss1, self.srv1, self.srv2]

        deleted = [self.search_guid(guid) for guid in guids]

        for del_obj in deleted:
            self.del_attr_values(del_obj)

        for live_obj, del_obj in zip(live, deleted):
            self.preserved_attributes_list(live_obj, del_obj)

        for live_obj, del_obj in zip(live, deleted):
            self.check_rdn(live_obj, del_obj, "cn")

        for dn in dns:
            self.delete_deleted(self.ldb, dn)

        return deleted

    def _check_in_deleted_objects(self, del_obj, container_dn, container_guid):
        """Assert ``del_obj`` now sits directly under the given container."""
        self.assertTrue("CN=Deleted Objects" in str(del_obj.dn))
        self.assertEqual(del_obj.dn.parent(),
                         container_dn)
        self.assertEqual(del_obj["parentGUID"][0],
                         container_guid)

    def check_all(self):
        """Verify the state of every fixture object after deletion.

        Users, the group and the site settings object are expected under a
        Deleted Objects container. The site, its servers container and the
        server are expected to keep DNs outside of it. All checks are then
        repeated a second time, without the parent checks for the three
        objects that stay in place.
        """
        (objDeleted1, objDeleted2, objDeleted3, objDeleted4,
         objDeleted5, objDeleted6, objDeleted7) = self._search_and_check_deleted()

        for del_obj in (objDeleted1, objDeleted2, objDeleted3):
            self._check_in_deleted_objects(del_obj,
                                           self.deleted_objects_domain_dn,
                                           self.deleted_objects_domain_guid)

        # NOTE(review): the expected parents for the site, servers container
        # and server were missing in the damaged source; they are filled in
        # to mirror the surviving parentGUID assertions. Confirm.
        self.assertFalse("CN=Deleted Objects" in str(objDeleted4.dn))
        self.assertEqual(objDeleted4.dn.parent(),
                         self.sites_dn)
        self.assertEqual(objDeleted4["parentGUID"][0],
                         self.sites_guid)

        self._check_in_deleted_objects(objDeleted5,
                                       self.deleted_objects_config_dn,
                                       self.deleted_objects_config_guid)

        self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
        self.assertEqual(objDeleted6.dn.parent(),
                         objDeleted4.dn)
        self.assertEqual(objDeleted6["parentGUID"][0],
                         objDeleted4["objectGUID"][0])

        self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
        self.assertEqual(objDeleted7.dn.parent(),
                         objDeleted6.dn)
        self.assertEqual(objDeleted7["parentGUID"][0],
                         objDeleted6["objectGUID"][0])

        # Second pass, run after the repeated delete attempts above.
        (objDeleted1, objDeleted2, objDeleted3, objDeleted4,
         objDeleted5, objDeleted6, objDeleted7) = self._search_and_check_deleted()

        for del_obj in (objDeleted1, objDeleted2, objDeleted3):
            self._check_in_deleted_objects(del_obj,
                                           self.deleted_objects_domain_dn,
                                           self.deleted_objects_domain_guid)
        self.assertFalse("CN=Deleted Objects" in str(objDeleted4.dn))
        self._check_in_deleted_objects(objDeleted5,
                                       self.deleted_objects_config_dn,
                                       self.deleted_objects_config_guid)
        self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
        self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
# Turn a bare target into a URL: an existing file is opened as a local tdb
# database, anything else is treated as an LDAP server name.
if "://" not in host:
    if os.path.isfile(host):
        host = "tdb://%s" % host
    else:
        host = "ldap://%s" % host

TestProgram(module=__name__, opts=subunitopts)