2 # -*- coding: utf-8 -*-
9 sys
.path
.append("bin/python")
10 sys
.path
.append("../lib/subunit/python")
11 sys
.path
.append("../lib/testtools")
13 import samba
.getopt
as options
16 from samba
.auth
import system_session
17 from ldb
import SCOPE_SUBTREE
, SCOPE_BASE
, LdbError
18 from ldb
import ERR_NO_SUCH_OBJECT
, ERR_ATTRIBUTE_OR_VALUE_EXISTS
21 from subunit
.run
import SubunitTestRunner
24 parser
= optparse
.OptionParser("deletetest.py [options] <host|file>")
25 sambaopts
= options
.SambaOptions(parser
)
26 parser
.add_option_group(sambaopts
)
27 parser
.add_option_group(options
.VersionOptions(parser
))
28 # use command line creds if available
29 credopts
= options
.CredentialsOptions(parser
)
30 parser
.add_option_group(credopts
)
31 opts
, args
= parser
.parse_args()
39 lp
= sambaopts
.get_loadparm()
40 creds
= credopts
.get_credentials(lp
)
42 class BasicDeleteTests(unittest
.TestCase
):
44 def delete_force(self
, ldb_ctx
, dn
):
47 except LdbError
, (num
, _
):
48 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
def GUID_string(self, guid):
    """Return *guid* rendered through the LDB connection's schema formatter.

    The value is handed to ``self.ldb_ctx.schema_format_value`` under the
    attribute name ``objectGUID`` and the formatter's result is returned
    unchanged.
    """
    format_value = self.ldb_ctx.schema_format_value
    return format_value("objectGUID", guid)
def find_basedn(self, ldb_ctx):
    """Return the first ``defaultNamingContext`` value of the rootDSE.

    Runs a base-scope search with an empty base DN and an empty filter on
    *ldb_ctx*, asserts that exactly one entry is returned, and yields the
    first value of that entry's ``defaultNamingContext`` attribute.
    """
    res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["defaultNamingContext"])
    # assertEqual is the supported spelling; the assertEquals alias is
    # deprecated and no longer exists in recent unittest releases.
    self.assertEqual(len(res), 1)
    return res[0]["defaultNamingContext"][0]
60 self
.ldb_ctx
= ldb_ctx
61 self
.base_dn
= self
.find_basedn(ldb_ctx
)
63 def search_guid(self
,guid
):
64 print "SEARCH by GUID %s" % self
.GUID_string(guid
)
66 expression
= "(objectGUID=%s)" % self
.GUID_string(guid
)
67 res
= ldb_ctx
.search(expression
=expression
,
68 controls
=["show_deleted:1"])
69 self
.assertEquals(len(res
), 1)
72 def search_dn(self
,dn
):
73 print "SEARCH by DN %s" % dn
75 res
= ldb_ctx
.search(expression
="(objectClass=*)",
78 controls
=["show_deleted:1"])
79 self
.assertEquals(len(res
), 1)
def del_attr_values(self, delObj):
    """Check the attribute state expected on a deleted object.

    Asserts that the first ``isDeleted`` value of *delObj* is ``"TRUE"``
    and that neither ``objectCategory`` nor ``sAMAccountType`` is present
    on it.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Checking attributes for %s" % delObj["dn"])

    self.assertEqual(delObj["isDeleted"][0], "TRUE")
    for stripped in ("objectCategory", "sAMAccountType"):
        self.assertTrue(stripped not in delObj,
                        "%s is still present on the deleted object" % stripped)
89 def preserved_attributes_list(self
, liveObj
, delObj
):
90 print "Checking for preserved attributes list"
92 preserved_list
= ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
93 "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
94 "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID",
95 "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid",
96 "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName",
97 "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection",
98 "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"]
101 if a
in preserved_list
:
102 self
.assertTrue(a
in delObj
)
def check_rdn(self, liveObj, delObj, rdnName):
    """Verify that the deleted object's RDN and name are delete-mangled.

    The expected value is the live object's first *rdnName* value followed
    by ``"\\nDEL:"`` and the formatted ``objectGUID`` of the live object.
    Both ``delObj[rdnName][0]`` and ``delObj["name"][0]`` must equal it.
    """
    print("Checking for correct rDN")
    guid = liveObj["objectGUID"][0]
    expected = liveObj[rdnName][0] + "\nDEL:" + self.GUID_string(guid)

    self.assertEqual(delObj[rdnName][0], expected)
    # Previously "name2" was read from delObj[rdnName] again, so this second
    # assertion merely repeated the first and "name" was never verified.
    self.assertEqual(delObj["name"][0], expected)
113 def delete_deleted(self
, ldb_ctx
, dn
):
114 print "Testing the deletion of the already deleted dn %s" % dn
119 except LdbError
, (num
, _
):
120 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
def find_domain_func_level(self, ldb_ctx):
    """Return the first ``domainControllerFunctionality`` rootDSE value.

    Runs a base-scope search with an empty base DN on *ldb_ctx* and asserts
    that exactly one entry is returned. The value is handed back exactly as
    the search result provides it; no numeric conversion happens here.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Searching for the domain functional level")

    res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["domainControllerFunctionality"])
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(len(res), 1)
    return res[0]["domainControllerFunctionality"][0]
129 def find_recycle_bin_msg(self
,ldb_ctx
):
130 print "Searching for recycle bin msg"
132 res
= ldb_ctx
.search(base
="",
133 expression
="(&(objectClass=msDS-OptionalFeature)(msDS-OptionalFeatureGUID=766ddcd8-acd0-445e-f3b9-a7f9b6744f2a))",
134 scope
=SCOPE_SUBTREE
, attrs
=["*"],controls
=["search_options:1:2"])
135 self
.assertEquals(len(res
), 1)
def find_ntds_settings(self, ldb_ctx):
    """Return the first ``dsServiceName`` value of the rootDSE.

    Runs a base-scope search with an empty base DN on *ldb_ctx* and asserts
    that exactly one entry is returned before reading the attribute.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Searching for ntds settings dn")

    res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["dsServiceName"])
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(len(res), 1)
    return res[0]["dsServiceName"][0]
145 def get_configuration_nc(self
, ldb_ctx
):
146 print "Searching for the configuration nc"
148 res
= ldb_ctx
.search(base
="", expression
="", scope
=SCOPE_BASE
, attrs
=["configurationNamingContext"])
151 return rootDse
["configurationNamingContext"]
153 def check_recyclebin_disabled(self
, ldb_ctx
):
154 print "Testing if recycle bin is disabled"
158 recycle_bin
= self
.find_recycle_bin_msg(ldb_ctx
)
159 ntds_settings
= self
.find_ntds_settings(ldb_ctx
)
161 configbase
= self
.get_configuration_nc(ldb_ctx
)
162 partition
= "CN=Partitions," + str(configbase
)
164 ntds_msg
= self
.search_dn(ntds_settings
)
165 partition_msg
= self
.search_dn(partition
)
166 recycle_bin_dn
= str(recycle_bin
["dn"])
169 self
.assertTrue(ntds_msg
["msDS-EnabledFeature"][0] == recycle_bin_dn
)
171 except KeyError, error
:
173 self
.assertEquals(error
[0], 'No such element')
176 except AssertionError:
180 self
.assertTrue(partition_msg
["msDS-EnabledFeature"][0] == recycle_bin_dn
)
182 except KeyError, error
:
184 self
.assertEquals(error
[0], 'No such element')
187 except AssertionError:
192 for item
in recycle_bin
["msDS-EnabledFeatureBL"]:
193 if (item
== partition
) or (item
== ntds_settings
):
195 self
.assertEquals(count
, 0)
196 except KeyError, error
:
198 self
.assertEquals(error
[0], 'No such element')
201 except AssertionError:
207 """Basic delete tests"""
211 recycle_disabled
= self
.check_recyclebin_disabled(self
.ldb_ctx
)
212 if (recycle_disabled
== False):
213 print "Recycle Bin is already enabled"
215 dn1
="cn=testuser,cn=users," + self
.base_dn
216 dn2
="cn=testuser2,cn=users," + self
.base_dn
217 grp1
="cn=testdelgroup1,cn=users," + self
.base_dn
219 self
.delete_force(self
.ldb_ctx
, dn1
)
220 self
.delete_force(self
.ldb_ctx
, dn2
)
221 self
.delete_force(self
.ldb_ctx
, grp1
)
225 "objectclass": "user",
227 "description": "test user description",
228 "samaccountname": "testuser"})
232 "objectclass": "user",
234 "description": "test user 2 description",
235 "samaccountname": "testuser2"})
239 "objectclass": "group",
240 "cn": "testdelgroup1",
241 "description": "test group",
242 "samaccountname": "testdelgroup1",
243 "member": [ dn1
, dn2
] })
245 objLive1
= self
.search_dn(dn1
)
246 guid1
=objLive1
["objectGUID"][0]
248 objLive2
= self
.search_dn(dn2
)
249 guid2
=objLive2
["objectGUID"][0]
251 objLive3
= self
.search_dn(grp1
)
252 guid3
=objLive3
["objectGUID"][0]
258 objDeleted1
= self
.search_guid(guid1
)
259 objDeleted2
= self
.search_guid(guid2
)
260 objDeleted3
= self
.search_guid(guid3
)
262 self
.del_attr_values(objDeleted1
)
263 self
.del_attr_values(objDeleted2
)
264 self
.del_attr_values(objDeleted3
)
266 self
.preserved_attributes_list(objLive1
, objDeleted1
)
267 self
.preserved_attributes_list(objLive2
, objDeleted2
)
269 self
.check_rdn(objLive1
, objDeleted1
, "cn")
270 self
.check_rdn(objLive2
, objDeleted2
, "cn")
271 self
.check_rdn(objLive3
, objDeleted3
, "cn")
273 self
.delete_deleted(ldb_ctx
, dn1
)
274 self
.delete_deleted(ldb_ctx
, dn2
)
275 self
.delete_deleted(ldb_ctx
, grp1
)
279 class RecycleBinActivationTests(unittest
.TestCase
):
def find_basedn(self, ldb_ctx):
    """Return the first ``defaultNamingContext`` value of the rootDSE.

    Runs a base-scope search with an empty base DN and an empty filter on
    *ldb_ctx*, asserts that exactly one entry is returned, and yields the
    first value of that entry's ``defaultNamingContext`` attribute.
    """
    res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["defaultNamingContext"])
    # assertEqual is the supported spelling; the assertEquals alias is
    # deprecated and no longer exists in recent unittest releases.
    self.assertEqual(len(res), 1)
    return res[0]["defaultNamingContext"][0]
288 self
.ldb_ctx
= ldb_ctx
289 self
.base_dn
= self
.find_basedn(ldb_ctx
)
291 def find_recycle_bin_msg(self
,ldb_ctx
):
292 print "Searching for recycle bin msg"
294 res
= ldb_ctx
.search(base
="",
295 expression
="(&(objectClass=msDS-OptionalFeature)(msDS-OptionalFeatureGUID=766ddcd8-acd0-445e-f3b9-a7f9b6744f2a))",
296 scope
=SCOPE_SUBTREE
, attrs
=["*"],controls
=["search_options:1:2"])
297 self
.assertEquals(len(res
), 1)
def find_ntds_settings(self, ldb_ctx):
    """Return the first ``dsServiceName`` value of the rootDSE.

    Runs a base-scope search with an empty base DN on *ldb_ctx* and asserts
    that exactly one entry is returned before reading the attribute.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Searching for ntds settings dn")

    res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["dsServiceName"])
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(len(res), 1)
    return res[0]["dsServiceName"][0]
def find_domain_func_level(self, ldb_ctx):
    """Return the first ``domainControllerFunctionality`` rootDSE value.

    Runs a base-scope search with an empty base DN on *ldb_ctx* and asserts
    that exactly one entry is returned. The value is handed back exactly as
    the search result provides it; no numeric conversion happens here.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Searching for the domain functional level")

    res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["domainControllerFunctionality"])
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(len(res), 1)
    return res[0]["domainControllerFunctionality"][0]
314 def search_dn(self
,dn
):
315 print "Searching by DN %s" % dn
317 res
= ldb_ctx
.search(expression
="(objectClass=*)",
320 self
.assertEquals(len(res
), 1)
323 def get_configuration_nc(self
, ldb_ctx
):
324 print "Searching for the configuration nc"
326 res
= ldb_ctx
.search(base
="", expression
="", scope
=SCOPE_BASE
, attrs
=["configurationNamingContext"])
329 return rootDse
["configurationNamingContext"]
331 def enablerecyclebin(self
, ldb_ctx
, partition
):
332 print "Enabling the recycle bin optional feature"
335 msg
.dn
= ldb
.Dn(ldb_ctx
, "")
336 msg
["enableOptionalFeature"] = ldb
.MessageElement(
337 partition
+ ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
338 ldb
.FLAG_MOD_ADD
, "enableOptionalFeature")
339 res
= ldb_ctx
.modify(msg
)
341 print "Recycle Bin feature enabled"
343 def check_recyclebin_disabled(self
, ldb_ctx
):
344 print "Testing if recycle bin is disabled"
348 recycle_bin
= self
.find_recycle_bin_msg(ldb_ctx
)
349 ntds_settings
= self
.find_ntds_settings(ldb_ctx
)
351 configbase
= self
.get_configuration_nc(ldb_ctx
)
352 partition
= "CN=Partitions," + str(configbase
)
354 ntds_msg
= self
.search_dn(ntds_settings
)
355 partition_msg
= self
.search_dn(partition
)
356 recycle_bin_dn
= str(recycle_bin
["dn"])
359 self
.assertTrue(ntds_msg
["msDS-EnabledFeature"][0] == recycle_bin_dn
)
361 except KeyError, error
:
363 self
.assertEquals(error
[0], 'No such element')
366 except AssertionError:
370 self
.assertTrue(partition_msg
["msDS-EnabledFeature"][0] == recycle_bin_dn
)
372 except KeyError, error
:
374 self
.assertEquals(error
[0], 'No such element')
377 except AssertionError:
382 for item
in recycle_bin
["msDS-EnabledFeatureBL"]:
383 if (item
== partition
) or (item
== ntds_settings
):
385 self
.assertEquals(count
, 0)
386 except KeyError, error
:
388 self
.assertEquals(error
[0], 'No such element')
391 except AssertionError:
398 """Recycle Bin activation tests"""
402 print "Testing domain's functional level"
403 func_level
= self
.find_domain_func_level(ldb_ctx
)
404 recycle_bin
= self
.find_recycle_bin_msg(ldb_ctx
)
405 required_func_level
= recycle_bin
["msDS-RequiredForestBehaviorVersion"][0]
407 recycle_disabled
= self
.check_recyclebin_disabled(self
.ldb_ctx
)
408 if recycle_disabled
== False:
409 print "Recycle Bin is already enabled"
411 elif func_level
< required_func_level
:
412 print "Functional level should be at least %s to perform this test" %required
_func
_level
415 ntds_settings
= self
.find_ntds_settings(ldb_ctx
)
417 configbase
= self
.get_configuration_nc(ldb_ctx
)
418 partition
= "CN=Partitions," + str(configbase
)
420 ntds_msg
= self
.search_dn(ntds_settings
)
421 partition_msg
= self
.search_dn(partition
)
422 recycle_bin_dn
= str(recycle_bin
["dn"])
424 self
.enablerecyclebin(ldb_ctx
, partition
)
426 print "Testing if recycle bin was enabled successfully"
428 ntds_msg
= self
.search_dn(ntds_settings
)
429 partition_msg
= self
.search_dn(partition
)
430 recycle_bin
= self
.find_recycle_bin_msg(ldb_ctx
)
432 self
.assertTrue(ntds_msg
["msDS-EnabledFeature"][0] == recycle_bin_dn
)
433 self
.assertTrue(partition_msg
["msDS-EnabledFeature"][0] == recycle_bin_dn
)
435 print "Testing the Recycle Bin msg backlinks"
437 for item
in recycle_bin
["msDS-EnabledFeatureBL"]:
438 if (item
== partition
) or (item
== ntds_settings
):
440 self
.assertEquals(count
, 2)
442 print "Testing the activation of a previously activated recycle bin"
444 self
.enablerecyclebin(ldb_ctx
, partition
)
446 except LdbError
, (num
, _
):
447 self
.assertEquals(num
, ERR_ATTRIBUTE_OR_VALUE_EXISTS
)
449 class ThreeStageDeleteTests(unittest
.TestCase
):
451 def delete_force(self
, ldb_ctx
, dn
):
454 except LdbError
, (num
, _
):
455 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
def find_basedn(self, ldb_ctx):
    """Return the first ``defaultNamingContext`` value of the rootDSE.

    Runs a base-scope search with an empty base DN and an empty filter on
    *ldb_ctx*, asserts that exactly one entry is returned, and yields the
    first value of that entry's ``defaultNamingContext`` attribute.
    """
    res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["defaultNamingContext"])
    # assertEqual is the supported spelling; the assertEquals alias is
    # deprecated and no longer exists in recent unittest releases.
    self.assertEqual(len(res), 1)
    return res[0]["defaultNamingContext"][0]
464 self
.ldb_ctx
= ldb_ctx
465 self
.base_dn
= self
.find_basedn(ldb_ctx
)
def GUID_string(self, guid):
    """Return *guid* rendered through the LDB connection's schema formatter.

    The value is handed to ``self.ldb_ctx.schema_format_value`` under the
    attribute name ``objectGUID`` and the formatter's result is returned
    unchanged.
    """
    format_value = self.ldb_ctx.schema_format_value
    return format_value("objectGUID", guid)
470 def search_dn(self
,dn
):
471 print "SEARCH by DN %s" % dn
473 res
= ldb_ctx
.search(expression
="(objectClass=*)",
476 controls
=["show_deleted:1"])
477 self
.assertEquals(len(res
), 1)
480 def search_guid(self
,guid
):
481 print "SEARCH by GUID %s" % self
.GUID_string(guid
)
483 expression
= "(objectGUID=%s)" % self
.GUID_string(guid
)
484 res
= ldb_ctx
.search(expression
=expression
,
485 controls
=["show_deleted:1"])
486 self
.assertEquals(len(res
), 1)
def del_attr_values(self, delObj):
    """Check the attribute state expected on a deleted object.

    Asserts that the first ``isDeleted`` value of *delObj* is ``"TRUE"``
    and that neither ``objectCategory`` nor ``sAMAccountType`` is present
    on it.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Checking attributes for %s" % delObj["dn"])

    self.assertEqual(delObj["isDeleted"][0], "TRUE")
    for stripped in ("objectCategory", "sAMAccountType"):
        self.assertTrue(stripped not in delObj,
                        "%s is still present on the deleted object" % stripped)
def check_rdn(self, liveObj, delObj, rdnName):
    """Verify that the deleted object's RDN and name are delete-mangled.

    The expected value is the live object's first *rdnName* value followed
    by ``"\\nDEL:"`` and the formatted ``objectGUID`` of the live object.
    Both ``delObj[rdnName][0]`` and ``delObj["name"][0]`` must equal it.
    """
    print("Checking for correct rDN")
    guid = liveObj["objectGUID"][0]
    expected = liveObj[rdnName][0] + "\nDEL:" + self.GUID_string(guid)

    self.assertEqual(delObj[rdnName][0], expected)
    # Previously "name2" was read from delObj[rdnName] again, so this second
    # assertion merely repeated the first and "name" was never verified.
    self.assertEqual(delObj["name"][0], expected)
505 def get_configuration_nc(self
, ldb_ctx
):
506 print "Searching for the configuration nc"
508 res
= ldb_ctx
.search(base
="", expression
="", scope
=SCOPE_BASE
, attrs
=["configurationNamingContext"])
511 return rootDse
["configurationNamingContext"]
def check_lastknownparent(self, liveObj, delObj, rdnName):
    """Verify ``lastKnownParent`` of the deleted object against the live DN.

    Asserts that ``str(liveObj["dn"])`` equals
    ``rdnName + "=" + liveObj[rdnName][0] + "," + delObj["lastKnownParent"][0]``.
    This is a plain string comparison, so the case of *rdnName* must match
    the spelling used in the live DN.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Checking for correct lastKnownParent")

    live_dn = str(liveObj["dn"])
    last_parent = delObj["lastKnownParent"][0]
    rebuilt_dn = rdnName + "=" + liveObj[rdnName][0] + "," + last_parent

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(live_dn, rebuilt_dn)
def check_lastknownrdn(self, liveObj, delObj, rdnName):
    """Verify ``msDS-LastKnownRDN`` of the deleted object.

    Asserts that the first *rdnName* value of *liveObj* equals the first
    ``msDS-LastKnownRDN`` value of *delObj*.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Checking for correct msDS-LastKnownRDN")

    live_rdn = liveObj[rdnName][0]
    last_known_rdn = delObj["msDS-LastKnownRDN"][0]

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(live_rdn, last_known_rdn)
528 def preserved_live_attributes(self
, liveObj
, delObj
):
529 print "Checking if all the attributes in the liveObj are in the delObj"
531 removed_attrs
= ["objectCategory", "sAMAccountType"]
534 if a
not in removed_attrs
:
535 self
.assertTrue(a
in delObj
)
537 def preserved_attributes_list(self
, liveObj
, delObj
):
538 print "Checking for preserved attributes list"
540 preserved_list
= ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
541 "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
542 "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID",
543 "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid",
544 "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName",
545 "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection",
546 "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"]
549 if a
in preserved_list
:
550 self
.assertTrue(a
in delObj
)
def check_isRecycled(self, delObj):
    """Assert that the first ``isRecycled`` value of *delObj* is ``"TRUE"``."""
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Checking isRecycled attribute for %s" % delObj["dn"])

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(delObj["isRecycled"][0], "TRUE")
557 def enablerecyclebin(self
, ldb_ctx
, partition
):
558 print "Enabling the recycle bin optional feature"
561 msg
.dn
= ldb
.Dn(ldb_ctx
, "")
562 msg
["enableOptionalFeature"] = ldb
.MessageElement(
563 partition
+ ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
564 ldb
.FLAG_MOD_ADD
, "enableOptionalFeature")
565 res
= ldb_ctx
.modify(msg
)
567 print "Recycle Bin feature enabled"
def find_domain_func_level(self, ldb_ctx):
    """Return the first ``domainControllerFunctionality`` rootDSE value.

    Runs a base-scope search with an empty base DN on *ldb_ctx* and asserts
    that exactly one entry is returned. The value is handed back exactly as
    the search result provides it; no numeric conversion happens here.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Searching for the domain functional level")

    res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["domainControllerFunctionality"])
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(len(res), 1)
    return res[0]["domainControllerFunctionality"][0]
576 def find_recycle_bin_msg(self
,ldb_ctx
):
577 print "Searching for recycle bin msg"
579 res
= ldb_ctx
.search(base
="",
580 expression
="(&(objectClass=msDS-OptionalFeature)(msDS-OptionalFeatureGUID=766ddcd8-acd0-445e-f3b9-a7f9b6744f2a))",
581 scope
=SCOPE_SUBTREE
, attrs
=["*"],controls
=["search_options:1:2"])
582 self
.assertEquals(len(res
), 1)
587 print "Testing domain's functional level"
588 func_level
= self
.find_domain_func_level(ldb_ctx
)
589 recycle_bin
= self
.find_recycle_bin_msg(ldb_ctx
)
590 required_func_level
= recycle_bin
["msDS-RequiredForestBehaviorVersion"][0]
592 if func_level
< required_func_level
:
593 print "Functional level should be at least %s to perform this test" %required
_func
_level
596 configbase
= self
.get_configuration_nc(ldb_ctx
)
597 partition
= "CN=Partitions," + str(configbase
)
600 self
.enablerecyclebin(ldb_ctx
, partition
)
601 except LdbError
, (num
, _
):
602 self
.assertEquals(num
, ERR_ATTRIBUTE_OR_VALUE_EXISTS
)
603 print "Recycle Bin already enabled"
605 dn1
="cn=testuser1,cn=users," + self
.base_dn
607 self
.delete_force(self
.ldb_ctx
, dn1
)
611 "objectclass": "user",
613 "description": "test user description",
614 "samaccountname": "testuser1"})
616 objLive1
= self
.search_dn(dn1
)
617 guid1
=objLive1
["objectGUID"][0]
621 objDeleted1
= self
.search_guid(guid1
)
625 self
.check_rdn(objLive1
, objDeleted1
, "cn")
626 self
.check_lastknownparent(objLive1
, objDeleted1
, "CN")
627 self
.check_lastknownrdn(objLive1
, objDeleted1
, "CN")
628 self
.del_attr_values(objDeleted1
)
629 self
.preserved_live_attributes(objLive1
, objDeleted1
)
631 ldb_ctx
.delete(objDeleted1
["dn"])
633 objDeleted1
= self
.search_guid(guid1
)
635 self
.check_rdn(objLive1
, objDeleted1
, "cn")
636 self
.check_lastknownparent(objLive1
, objDeleted1
, "CN")
637 self
.check_lastknownrdn(objLive1
, objDeleted1
, "CN")
638 self
.preserved_attributes_list(objLive1
, objDeleted1
)
639 self
.del_attr_values(objDeleted1
)
640 self
.check_isRecycled(objDeleted1
)
642 if not "://" in host
:
643 if os
.path
.isfile(host
):
644 host
= "tdb://%s" % host
646 host
= "ldap://%s" % host
648 ldb_ctx
= Ldb(host
, credentials
=creds
, session_info
=system_session(), lp
=lp
)
650 runner
= SubunitTestRunner()
652 if not runner
.run(unittest
.makeSuite(BasicDeleteTests
)).wasSuccessful():
654 if not runner
.run(unittest
.makeSuite(RecycleBinActivationTests
)).wasSuccessful():
656 if not runner
.run(unittest
.makeSuite(ThreeStageDeleteTests
)).wasSuccessful():