From f56692a56b5a500268d9daf9d5bc4ee55d465536 Mon Sep 17 00:00:00 2001 From: Eduardo Lima Date: Wed, 31 Mar 2010 11:54:09 -0300 Subject: [PATCH] s4-drs: additional delete test cases --- source4/lib/ldb/tests/python/deletetest.py | 602 +++++++++++++++++++++++++---- 1 file changed, 530 insertions(+), 72 deletions(-) diff --git a/source4/lib/ldb/tests/python/deletetest.py b/source4/lib/ldb/tests/python/deletetest.py index eff92c5f339..8e6ef2a2557 100755 --- a/source4/lib/ldb/tests/python/deletetest.py +++ b/source4/lib/ldb/tests/python/deletetest.py @@ -12,9 +12,10 @@ sys.path.append("../lib/testtools") import samba.getopt as options +import ldb from samba.auth import system_session -from ldb import SCOPE_BASE, LdbError -from ldb import ERR_NO_SUCH_OBJECT +from ldb import SCOPE_SUBTREE, SCOPE_BASE, LdbError +from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS from samba import Ldb from subunit.run import SubunitTestRunner @@ -40,30 +41,30 @@ creds = credopts.get_credentials(lp) class BasicDeleteTests(unittest.TestCase): - def delete_force(self, ldb, dn): + def delete_force(self, ldb_ctx, dn): try: - ldb.delete(dn) + ldb_ctx.delete(dn) except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) def GUID_string(self, guid): - return self.ldb.schema_format_value("objectGUID", guid) + return self.ldb_ctx.schema_format_value("objectGUID", guid) - def find_basedn(self, ldb): - res = ldb.search(base="", expression="", scope=SCOPE_BASE, + def find_basedn(self, ldb_ctx): + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["defaultNamingContext"]) self.assertEquals(len(res), 1) return res[0]["defaultNamingContext"][0] def setUp(self): - self.ldb = ldb - self.base_dn = self.find_basedn(ldb) + self.ldb_ctx = ldb_ctx + self.base_dn = self.find_basedn(ldb_ctx) def search_guid(self,guid): print "SEARCH by GUID %s" % self.GUID_string(guid) expression = "(objectGUID=%s)" % self.GUID_string(guid) - res = ldb.search(expression=expression, + res = ldb_ctx.search(expression=expression, controls=["show_deleted:1"]) self.assertEquals(len(res), 1) return res[0] @@ -71,7 +72,7 @@ class BasicDeleteTests(unittest.TestCase): def search_dn(self,dn): print "SEARCH by DN %s" % dn - res = ldb.search(expression="(objectClass=*)", + res = ldb_ctx.search(expression="(objectClass=*)", base=dn, scope=SCOPE_BASE, controls=["show_deleted:1"]) @@ -108,94 +109,551 @@ class BasicDeleteTests(unittest.TestCase): guid=liveObj["objectGUID"][0] self.assertEquals(rdn2, rdn + "\nDEL:" + self.GUID_string(guid)) self.assertEquals(name2, rdn + "\nDEL:" + self.GUID_string(guid)) - - def delete_deleted(self, ldb, dn): + + def delete_deleted(self, ldb_ctx, dn): print "Testing the deletion of the already deleted dn %s" % dn try: - ldb.delete(dn) + ldb_ctx.delete(dn) self.fail() except LdbError, (num, _): self.assertEquals(num, ERR_NO_SUCH_OBJECT) + + def find_domain_func_level(self,ldb_ctx): + print "Searching for the domain functional level" + + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["domainControllerFunctionality"]) + self.assertEquals(len(res), 1) + return res[0]["domainControllerFunctionality"][0] + + def find_recycle_bin_msg(self,ldb_ctx): + print "Searching for recycle bin msg" + + res = ldb_ctx.search(base="", + expression="(&(objectClass=msDS-OptionalFeature)(msDS-OptionalFeatureGUID=766ddcd8-acd0-445e-f3b9-a7f9b6744f2a))", + scope=SCOPE_SUBTREE, attrs=["*"],controls=["search_options:1:2"]) + self.assertEquals(len(res), 1) + return res[0] + + def find_ntds_settings(self,ldb_ctx): + print "Searching for ntds settings dn" + + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["dsServiceName"]) + self.assertEquals(len(res), 1) + return res[0]["dsServiceName"][0] + + def get_configuration_nc(self, ldb_ctx): + print "Searching for the configuration nc" + + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["configurationNamingContext"]) + rootDse = res[0] + + return rootDse["configurationNamingContext"] + + def check_recyclebin_disabled(self, ldb_ctx): + print "Testing if recycle bin is disabled" + + ret = True + + recycle_bin = self.find_recycle_bin_msg(ldb_ctx) + ntds_settings = self.find_ntds_settings(ldb_ctx) + + configbase = self.get_configuration_nc(ldb_ctx) + partition = "CN=Partitions," + str(configbase) + + ntds_msg = self.search_dn(ntds_settings) + partition_msg = self.search_dn(partition) + recycle_bin_dn = str(recycle_bin["dn"]) + + try: + self.assertTrue(ntds_msg["msDS-EnabledFeature"][0] == recycle_bin_dn) + self.fail() + except KeyError, error: + try: + self.assertEquals(error[0], 'No such element') + except: + ret = False + except AssertionError: + ret = False + + try: + self.assertTrue(partition_msg["msDS-EnabledFeature"][0] == recycle_bin_dn) + self.fail() + except KeyError, error: + try: + self.assertEquals(error[0], 'No such element') + except: + ret = False + except AssertionError: + ret = False + + try: + count = 0 + for item in recycle_bin["msDS-EnabledFeatureBL"]: + if (item == partition) or (item == ntds_settings): + count += 1 + self.assertEquals(count, 0) + except KeyError, error: + try: + self.assertEquals(error[0], 'No such element') + except: + ret = False + except AssertionError: + ret = False + + return ret def test_all(self): """Basic delete tests""" - + print self.base_dn + + recycle_disabled = self.check_recyclebin_disabled(self.ldb_ctx) + if (recycle_disabled == False): + print "Recycle Bin is already enabled" + else: + dn1="cn=testuser,cn=users," + self.base_dn + dn2="cn=testuser2,cn=users," + self.base_dn + grp1="cn=testdelgroup1,cn=users," + self.base_dn + + self.delete_force(self.ldb_ctx, dn1) + self.delete_force(self.ldb_ctx, dn2) + self.delete_force(self.ldb_ctx, grp1) + + ldb_ctx.add({ + "dn": dn1, + "objectclass": "user", + "cn": "testuser", + "description": "test user description", + "samaccountname": "testuser"}) + + ldb_ctx.add({ + "dn": dn2, + "objectclass": "user", + "cn": "testuser2", + "description": "test user 2 description", + "samaccountname": "testuser2"}) + + ldb_ctx.add({ + "dn": grp1, + "objectclass": "group", + "cn": "testdelgroup1", + "description": "test group", + "samaccountname": "testdelgroup1", + "member": [ dn1, dn2 ] }) + + objLive1 = self.search_dn(dn1) + guid1=objLive1["objectGUID"][0] + + objLive2 = self.search_dn(dn2) + guid2=objLive2["objectGUID"][0] + + objLive3 = self.search_dn(grp1) + guid3=objLive3["objectGUID"][0] + + ldb_ctx.delete(dn1) + ldb_ctx.delete(dn2) + ldb_ctx.delete(grp1) + + objDeleted1 = self.search_guid(guid1) + objDeleted2 = self.search_guid(guid2) + objDeleted3 = self.search_guid(guid3) + + self.del_attr_values(objDeleted1) + self.del_attr_values(objDeleted2) + self.del_attr_values(objDeleted3) + + self.preserved_attributes_list(objLive1, objDeleted1) + self.preserved_attributes_list(objLive2, objDeleted2) + + self.check_rdn(objLive1, objDeleted1, "cn") + self.check_rdn(objLive2, objDeleted2, "cn") + self.check_rdn(objLive3, objDeleted3, "cn") + + self.delete_deleted(ldb_ctx, dn1) + self.delete_deleted(ldb_ctx, dn2) + self.delete_deleted(ldb_ctx, grp1) + + + +class RecycleBinActivationTests(unittest.TestCase): + + def find_basedn(self, ldb_ctx): + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, + attrs=["defaultNamingContext"]) + self.assertEquals(len(res), 1) + return res[0]["defaultNamingContext"][0] + + def setUp(self): + self.ldb_ctx = ldb_ctx + self.base_dn = self.find_basedn(ldb_ctx) + + def find_recycle_bin_msg(self,ldb_ctx): + print "Searching for recycle bin msg" + + res = ldb_ctx.search(base="", + expression="(&(objectClass=msDS-OptionalFeature)(msDS-OptionalFeatureGUID=766ddcd8-acd0-445e-f3b9-a7f9b6744f2a))", + scope=SCOPE_SUBTREE, attrs=["*"],controls=["search_options:1:2"]) + self.assertEquals(len(res), 1) + return res[0] + + def find_ntds_settings(self,ldb_ctx): + print "Searching for ntds settings dn" + + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["dsServiceName"]) + self.assertEquals(len(res), 1) + return res[0]["dsServiceName"][0] + + def find_domain_func_level(self,ldb_ctx): + print "Searching for the domain functional level" + + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["domainControllerFunctionality"]) + self.assertEquals(len(res), 1) + return res[0]["domainControllerFunctionality"][0] + + def search_dn(self,dn): + print "Searching by DN %s" % dn - dn1="cn=testuser,cn=users," + self.base_dn - dn2="cn=testuser2,cn=users," + self.base_dn - grp1="cn=testdelgroup1,cn=users," + self.base_dn - - self.delete_force(self.ldb, dn1) - self.delete_force(self.ldb, dn2) - self.delete_force(self.ldb, grp1) - - ldb.add({ - "dn": dn1, - "objectclass": "user", - "cn": "testuser", - "description": "test user description", - "samaccountname": "testuser"}) - - ldb.add({ - "dn": dn2, - "objectclass": "user", - "cn": "testuser2", - "description": "test user 2 description", - "samaccountname": "testuser2"}) - - ldb.add({ - "dn": grp1, - "objectclass": "group", - "cn": "testdelgroup1", - "description": "test group", - "samaccountname": "testdelgroup1", - "member": [ dn1, dn2 ] }) - - objLive1 = self.search_dn(dn1) - guid1=objLive1["objectGUID"][0] - - objLive2 = self.search_dn(dn2) - guid2=objLive2["objectGUID"][0] - - objLive3 = self.search_dn(grp1) - guid3=objLive3["objectGUID"][0] + res = ldb_ctx.search(expression="(objectClass=*)", + base=dn, + scope=SCOPE_BASE) + self.assertEquals(len(res), 1) + return res[0] + + def get_configuration_nc(self, ldb_ctx): + print "Searching for the configuration nc" + + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["configurationNamingContext"]) + rootDse = res[0] + + return rootDse["configurationNamingContext"] + + def enablerecyclebin(self, ldb_ctx, partition): + print "Enabling the recycle bin optional feature" + + msg = ldb.Message() + msg.dn = ldb.Dn(ldb_ctx, "") + msg["enableOptionalFeature"] = ldb.MessageElement( + partition + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a", + ldb.FLAG_MOD_ADD, "enableOptionalFeature") + res = ldb_ctx.modify(msg) + + print "Recycle Bin feature enabled" + + def check_recyclebin_disabled(self, ldb_ctx): + print "Testing if recycle bin is disabled" + + ret = True + + recycle_bin = self.find_recycle_bin_msg(ldb_ctx) + ntds_settings = self.find_ntds_settings(ldb_ctx) + + configbase = self.get_configuration_nc(ldb_ctx) + partition = "CN=Partitions," + str(configbase) + + ntds_msg = self.search_dn(ntds_settings) + partition_msg = self.search_dn(partition) + recycle_bin_dn = str(recycle_bin["dn"]) + + try: + self.assertTrue(ntds_msg["msDS-EnabledFeature"][0] == recycle_bin_dn) + self.fail() + except KeyError, error: + try: + self.assertEquals(error[0], 'No such element') + except: + ret = False + except AssertionError: + ret = False + + try: + self.assertTrue(partition_msg["msDS-EnabledFeature"][0] == recycle_bin_dn) + self.fail() + except KeyError, error: + try: + self.assertEquals(error[0], 'No such element') + except: + ret = False + except AssertionError: + ret = False + + try: + count = 0 + for item in recycle_bin["msDS-EnabledFeatureBL"]: + if (item == partition) or (item == ntds_settings): + count += 1 + self.assertEquals(count, 0) + except KeyError, error: + try: + self.assertEquals(error[0], 'No such element') + except: + ret = False + except AssertionError: + ret = False + + return ret + + def test_all(self): + + """Recycle Bin activation tests""" + + print self.base_dn + + print "Testing domain's functional level" + func_level = self.find_domain_func_level(ldb_ctx) + recycle_bin = self.find_recycle_bin_msg(ldb_ctx) + required_func_level = recycle_bin["msDS-RequiredForestBehaviorVersion"][0] + + recycle_disabled = self.check_recyclebin_disabled(self.ldb_ctx) + if recycle_disabled == False: + print "Recycle Bin is already enabled" + + elif func_level < required_func_level: + print "Functional level should be at least %s to perform this test" %required_func_level + + else: + ntds_settings = self.find_ntds_settings(ldb_ctx) + + configbase = self.get_configuration_nc(ldb_ctx) + partition = "CN=Partitions," + str(configbase) + + ntds_msg = self.search_dn(ntds_settings) + partition_msg = self.search_dn(partition) + recycle_bin_dn = str(recycle_bin["dn"]) + + self.enablerecyclebin(ldb_ctx, partition) + + print "Testing if recycle bin was enabled successfully" + + ntds_msg = self.search_dn(ntds_settings) + partition_msg = self.search_dn(partition) + recycle_bin = self.find_recycle_bin_msg(ldb_ctx) + + self.assertTrue(ntds_msg["msDS-EnabledFeature"][0] == recycle_bin_dn) + self.assertTrue(partition_msg["msDS-EnabledFeature"][0] == recycle_bin_dn) + + print "Testing the Recycle Bin msg backlinks" + count = 0 + for item in recycle_bin["msDS-EnabledFeatureBL"]: + if (item == partition) or (item == ntds_settings): + count += 1 + self.assertEquals(count, 2) + + print "Testing the activation of a previously activated recycle bin" + try: + self.enablerecyclebin(ldb_ctx, partition) + self.fail() + except LdbError, (num, _): + self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + +class ThreeStageDeleteTests(unittest.TestCase): + + def delete_force(self, ldb_ctx, dn): + try: + ldb_ctx.delete(dn) + except LdbError, (num, _): + self.assertEquals(num, ERR_NO_SUCH_OBJECT) + + def find_basedn(self, ldb_ctx): + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, + attrs=["defaultNamingContext"]) + self.assertEquals(len(res), 1) + return res[0]["defaultNamingContext"][0] + + def setUp(self): + self.ldb_ctx = ldb_ctx + self.base_dn = self.find_basedn(ldb_ctx) - ldb.delete(dn1) - ldb.delete(dn2) - ldb.delete(grp1) + def GUID_string(self, guid): + return self.ldb_ctx.schema_format_value("objectGUID", guid) - objDeleted1 = self.search_guid(guid1) - objDeleted2 = self.search_guid(guid2) - objDeleted3 = self.search_guid(guid3) + def search_dn(self,dn): + print "SEARCH by DN %s" % dn - self.del_attr_values(objDeleted1) - self.del_attr_values(objDeleted2) - self.del_attr_values(objDeleted3) + res = ldb_ctx.search(expression="(objectClass=*)", + base=dn, + scope=SCOPE_BASE, + controls=["show_deleted:1"]) + self.assertEquals(len(res), 1) + return res[0] + + def search_guid(self,guid): + print "SEARCH by GUID %s" % self.GUID_string(guid) - self.preserved_attributes_list(objLive1, objDeleted1) - self.preserved_attributes_list(objLive2, objDeleted2) + expression = "(objectGUID=%s)" % self.GUID_string(guid) + res = ldb_ctx.search(expression=expression, + controls=["show_deleted:1"]) + self.assertEquals(len(res), 1) + return res[0] + + def del_attr_values(self, delObj): + print "Checking attributes for %s" % delObj["dn"] - self.check_rdn(objLive1, objDeleted1, "cn") - self.check_rdn(objLive2, objDeleted2, "cn") - self.check_rdn(objLive3, objDeleted3, "cn") + self.assertEquals(delObj["isDeleted"][0],"TRUE") + self.assertTrue(not("objectCategory" in delObj)) + self.assertTrue(not("sAMAccountType" in delObj)) + + def check_rdn(self, liveObj, delObj, rdnName): + print "Checking for correct rDN" + rdn=liveObj[rdnName][0] + rdn2=delObj[rdnName][0] + name2=delObj[rdnName][0] + guid=liveObj["objectGUID"][0] + self.assertEquals(rdn2, rdn + "\nDEL:" + self.GUID_string(guid)) + self.assertEquals(name2, rdn + "\nDEL:" + self.GUID_string(guid)) + + def get_configuration_nc(self, ldb_ctx): + print "Searching for the configuration nc" + + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["configurationNamingContext"]) + rootDse = res[0] + + return rootDse["configurationNamingContext"] + + def check_lastknownparent(self, liveObj, delObj, rdnName): + print "Checking for correct lastKnownParent" + liveObjCN=liveObj[rdnName][0] + liveObjDN=str(liveObj["dn"]) + delObjLastKnownParent=delObj["lastKnownParent"][0] + + self.assertEquals(liveObjDN, rdnName + "=" + liveObjCN + "," + delObjLastKnownParent) + + def check_lastknownrdn(self, liveObj, delObj, rdnName): + print "Checking for correct msDS-LastKnownRDN" + liveObjRDN=liveObj[rdnName][0] + delObjLastKnownRDN=delObj["msDS-LastKnownRDN"][0] + + self.assertEquals(liveObjRDN,delObjLastKnownRDN) + + def preserved_live_attributes(self, liveObj, delObj): + print "Checking if all the attributes in the liveObj are in the delObj" + + removed_attrs = ["objectCategory", "sAMAccountType"] + + for a in liveObj: + if a not in removed_attrs: + self.assertTrue(a in delObj) + + def preserved_attributes_list(self, liveObj, delObj): + print "Checking for preserved attributes list" - self.delete_deleted(ldb, dn1) - self.delete_deleted(ldb, dn2) - self.delete_deleted(ldb, grp1) + preserved_list = ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName", + "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN", + "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID", + "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid", + "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName", + "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection", + "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"] + for a in liveObj: + if a in preserved_list: + self.assertTrue(a in delObj) + + def check_isRecycled(self, delObj): + print "Checking isRecycled attribute for %s" % delObj["dn"] + + self.assertEquals(delObj["isRecycled"][0],"TRUE") + + def enablerecyclebin(self, ldb_ctx, partition): + print "Enabling the recycle bin optional feature" + + msg = ldb.Message() + msg.dn = ldb.Dn(ldb_ctx, "") + msg["enableOptionalFeature"] = ldb.MessageElement( + partition + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a", + ldb.FLAG_MOD_ADD, "enableOptionalFeature") + res = ldb_ctx.modify(msg) + + print "Recycle Bin feature enabled" + + def find_domain_func_level(self,ldb_ctx): + print "Searching for the domain functional level" + + res = ldb_ctx.search(base="", expression="", scope=SCOPE_BASE, attrs=["domainControllerFunctionality"]) + self.assertEquals(len(res), 1) + return res[0]["domainControllerFunctionality"][0] + + def find_recycle_bin_msg(self,ldb_ctx): + print "Searching for recycle bin msg" + + res = ldb_ctx.search(base="", + expression="(&(objectClass=msDS-OptionalFeature)(msDS-OptionalFeatureGUID=766ddcd8-acd0-445e-f3b9-a7f9b6744f2a))", + scope=SCOPE_SUBTREE, attrs=["*"],controls=["search_options:1:2"]) + self.assertEquals(len(res), 1) + return res[0] + + def test_all(self): + + print "Testing domain's functional level" + func_level = self.find_domain_func_level(ldb_ctx) + recycle_bin = self.find_recycle_bin_msg(ldb_ctx) + required_func_level = recycle_bin["msDS-RequiredForestBehaviorVersion"][0] + + if func_level < required_func_level: + print "Functional level should be at least %s to perform this test" %required_func_level + + else: + configbase = self.get_configuration_nc(ldb_ctx) + partition = "CN=Partitions," + str(configbase) + + try: + self.enablerecyclebin(ldb_ctx, partition) + except LdbError, (num, _): + self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + print "Recycle Bin already enabled" + + dn1="cn=testuser1,cn=users," + self.base_dn + + self.delete_force(self.ldb_ctx, dn1) + + ldb_ctx.add({ + "dn": dn1, + "objectclass": "user", + "cn": "testuser1", + "description": "test user description", + "samaccountname": "testuser1"}) + + objLive1 = self.search_dn(dn1) + guid1=objLive1["objectGUID"][0] + + ldb_ctx.delete(dn1) + + objDeleted1 = self.search_guid(guid1) + + temp = objDeleted1 + + self.check_rdn(objLive1, objDeleted1, "cn") + self.check_lastknownparent(objLive1, objDeleted1, "CN") + self.check_lastknownrdn(objLive1, objDeleted1, "CN") + self.del_attr_values(objDeleted1) + self.preserved_live_attributes(objLive1, objDeleted1) + + ldb_ctx.delete(objDeleted1["dn"]) + + objDeleted1 = self.search_guid(guid1) + + self.check_rdn(objLive1, objDeleted1, "cn") + self.check_lastknownparent(objLive1, objDeleted1, "CN") + self.check_lastknownrdn(objLive1, objDeleted1, "CN") + self.preserved_attributes_list(objLive1, objDeleted1) + self.del_attr_values(objDeleted1) + self.check_isRecycled(objDeleted1) + if not "://" in host: if os.path.isfile(host): host = "tdb://%s" % host else: host = "ldap://%s" % host -ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp) +ldb_ctx = Ldb(host, credentials=creds, session_info=system_session(), lp=lp) runner = SubunitTestRunner() rc = 0 if not runner.run(unittest.makeSuite(BasicDeleteTests)).wasSuccessful(): rc = 1 +if not runner.run(unittest.makeSuite(RecycleBinActivationTests)).wasSuccessful(): + rc = 1 +if not runner.run(unittest.makeSuite(ThreeStageDeleteTests)).wasSuccessful(): + rc = 1 -sys.exit(rc) +sys.exit(rc) \ No newline at end of file -- 2.11.4.GIT