From b881da6584333e63737baaa8f90b518f0e0f639d Mon Sep 17 00:00:00 2001 From: Nadezhda Ivanova Date: Tue, 21 Oct 2014 16:35:30 +0300 Subject: [PATCH] s4-dsdb-tests: Some tests for deleted objects undelete operation Based on MS-ADTS 3.1.1.5.3.7.2 Signed-off-by: Nadezhda Ivanova Reviewed-by: Andrew Bartlett Reviewed-by: Garming Sam Change-Id: I650b315601fce574f9302435f812d1dd4b177e68 --- source4/dsdb/tests/python/deletetest.py | 203 +++++++++++++++++++++++++++++++- 1 file changed, 198 insertions(+), 5 deletions(-) diff --git a/source4/dsdb/tests/python/deletetest.py b/source4/dsdb/tests/python/deletetest.py index 370f56cdea4..cb08db404d3 100755 --- a/source4/dsdb/tests/python/deletetest.py +++ b/source4/dsdb/tests/python/deletetest.py @@ -13,9 +13,9 @@ from samba.tests.subunitrun import SubunitOptions, TestProgram import samba.getopt as options from samba.auth import system_session -from ldb import SCOPE_BASE, LdbError -from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF -from ldb import ERR_UNWILLING_TO_PERFORM +from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE +from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS +from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR from samba.samdb import SamDB from samba.tests import delete_force @@ -39,13 +39,13 @@ host = args[0] lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) -class BasicDeleteTests(samba.tests.TestCase): +class BaseDeleteTests(samba.tests.TestCase): def GUID_string(self, guid): return self.ldb.schema_format_value("objectGUID", guid) def setUp(self): - super(BasicDeleteTests, self).setUp() + super(BaseDeleteTests, self).setUp() self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) self.base_dn = self.ldb.domain_dn() @@ -69,6 +69,12 @@ class BasicDeleteTests(samba.tests.TestCase): self.assertEquals(len(res), 1) return res[0] + +class BasicDeleteTests(BaseDeleteTests): + + def setUp(self): + super(BasicDeleteTests, self).setUp() + def del_attr_values(self, delObj): print "Checking attributes for %s" % delObj["dn"] @@ -373,6 +379,193 @@ class BasicDeleteTests(samba.tests.TestCase): self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn)) self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn)) +class BasicUndeleteTests(BaseDeleteTests): + + def setUp(self): + super(BasicUndeleteTests, self).setUp() + + def enable_recycle_bin(self): + msg = Message() + msg.dn = Dn(ldb, "") + msg["enableOptionalFeature"] = MessageElement( + "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a", + FLAG_MOD_ADD, "enableOptionalFeature") + try: + ldb.modify(msg) + except LdbError, (num, _): + self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS) + + def get_ldb_connection(self, target_username, target_password): + creds_tmp = Credentials() + creds_tmp.set_username(target_username) + creds_tmp.set_password(target_password) + creds_tmp.set_domain(creds.get_domain()) + creds_tmp.set_realm(creds.get_realm()) + creds_tmp.set_workstation(creds.get_workstation()) + creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() + | gensec.FEATURE_SEAL) + creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop + ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp) + return ldb_target + + def undelete_deleted(self, olddn, newdn, samldb): + msg = Message() + msg.dn = Dn(ldb, olddn) + msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted") + msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName") + res = samldb.modify(msg, ["show_deleted:1"]) + + def undelete_deleted_with_mod(self, olddn, newdn): + msg = Message() + msg.dn = Dn(ldb, olddn) + msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted") + msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName") + msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url") + res = ldb.modify(msg, ["show_deleted:1"]) + + + def test_undelete(self): + print "Testing standard undelete operation" + usr1="cn=testuser,cn=users," + self.base_dn + delete_force(self.ldb, usr1) + ldb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1=objLive1["objectGUID"][0] + ldb.delete(usr1) + objDeleted1 = self.search_guid(guid1) + self.undelete_deleted(str(objDeleted1.dn), usr1, ldb) + objLive2 = self.search_dn(usr1) + self.assertEqual(str(objLive2.dn),str(objLive1.dn)) + delete_force(self.ldb, usr1) + + def test_rename(self): + print "Testing attempt to rename deleted object" + usr1="cn=testuser,cn=users," + self.base_dn + ldb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1=objLive1["objectGUID"][0] + ldb.delete(usr1) + objDeleted1 = self.search_guid(guid1) + #just to make sure we get the correct error if the show deleted is missing + try: + ldb.rename(str(objDeleted1.dn), usr1) + self.fail() + except LdbError, (num, _): + self.assertEquals(num,ERR_NO_SUCH_OBJECT) + + try: + ldb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"]) + self.fail() + except LdbError, (num, _): + self.assertEquals(num,ERR_UNWILLING_TO_PERFORM) + + def test_undelete_with_mod(self): + print "Testing standard undelete operation with modification of additional attributes" + usr1="cn=testuser,cn=users," + self.base_dn + ldb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1=objLive1["objectGUID"][0] + ldb.delete(usr1) + objDeleted1 = self.search_guid(guid1) + self.undelete_deleted_with_mod(str(objDeleted1.dn), usr1) + objLive2 = self.search_dn(usr1) + self.assertEqual(objLive2["url"][0],"www.samba.org") + delete_force(self.ldb, usr1) + + def test_undelete_newuser(self): + print "Testing undelete user with a different dn" + usr1="cn=testuser,cn=users," + self.base_dn + usr2="cn=testuser2,cn=users," + self.base_dn + delete_force(self.ldb, usr1) + ldb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1=objLive1["objectGUID"][0] + ldb.delete(usr1) + objDeleted1 = self.search_guid(guid1) + self.undelete_deleted(str(objDeleted1.dn), usr2, ldb) + objLive2 = self.search_dn(usr2) + delete_force(self.ldb, usr1) + delete_force(self.ldb, usr2) + + def test_undelete_existing(self): + print "Testing undelete user after a user with the same dn has been created" + usr1="cn=testuser,cn=users," + self.base_dn + ldb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objLive1 = self.search_dn(usr1) + guid1=objLive1["objectGUID"][0] + ldb.delete(usr1) + ldb.add({ + "dn": usr1, + "objectclass": "user", + "description": "test user description", + "samaccountname": "testuser"}) + objDeleted1 = self.search_guid(guid1) + try: + self.undelete_deleted(str(objDeleted1.dn), usr1, ldb) + self.fail() + except LdbError, (num, _): + self.assertEquals(num,ERR_ENTRY_ALREADY_EXISTS) + + def test_undelete_cross_nc(self): + print "Cross NC undelete" + c1 = "cn=ldaptestcontainer," + self.base_dn; + c2 = "cn=ldaptestcontainer2," + self.configuration_dn + c3 = "cn=ldaptestcontainer," + self.configuration_dn + c4 = "cn=ldaptestcontainer2," + self.base_dn; + ldb.add({ + "dn": c1, + "objectclass": "container"}) + ldb.add({ + "dn": c2, + "objectclass": "container"}) + objLive1 = self.search_dn(c1) + objLive2 = self.search_dn(c2) + guid1=objLive1["objectGUID"][0] + guid2=objLive2["objectGUID"][0] + ldb.delete(c1) + ldb.delete(c2) + objDeleted1 = self.search_guid(guid1) + objDeleted2 = self.search_guid(guid2) + #try to undelete from base dn to config + try: + self.undelete_deleted(str(objDeleted1.dn), c3, ldb) + self.fail() + except LdbError, (num, _): + self.assertEquals(num, ERR_OPERATIONS_ERROR) + #try to undelete from config to base dn + try: + self.undelete_deleted(str(objDeleted2.dn), c4, ldb) + self.fail() + except LdbError, (num, _): + self.assertEquals(num, ERR_OPERATIONS_ERROR) + #assert undeletion will work in same nc + self.undelete_deleted(str(objDeleted1.dn), c4, ldb) + self.undelete_deleted(str(objDeleted2.dn), c3, ldb) + delete_force(self.ldb, c3) + delete_force(self.ldb, c4) + + + if not "://" in host: if os.path.isfile(host): host = "tdb://%s" % host -- 2.11.4.GIT