From 055544321389de5d5cea8e129e0923369e9f558b Mon Sep 17 00:00:00 2001 From: Garming Sam Date: Tue, 26 Jul 2016 16:57:24 +1200 Subject: [PATCH] msds_intid: Add test for schema linked attributes This test only covers the forward link case. NOTE: We can't confirm this against Windows because they prevent us from modifying the schema for the schema classes. Signed-off-by: Garming Sam Reviewed-by: Andrew Bartlett --- selftest/knownfail | 1 + source4/torture/drs/python/repl_schema.py | 87 ++++++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/selftest/knownfail b/selftest/knownfail index 397e53c1f08..71b6fcfc864 100644 --- a/selftest/knownfail +++ b/selftest/knownfail @@ -286,3 +286,4 @@ ^samba4.krb5.kdc.*as-req-aes.*fl2000dc # nt4_member and ad_member don't support ntlmv1 ^samba3.blackbox.smbclient_auth.plain.*_member.*option=clientntlmv2auth=no.member.creds.*as.user +^samba4.drs.repl_schema.python.*repl_schema.DrsReplSchemaTestCase.test_classWithCustomLinkAttribute diff --git a/source4/torture/drs/python/repl_schema.py b/source4/torture/drs/python/repl_schema.py index eb508cb0c54..ff22adbf593 100644 --- a/source4/torture/drs/python/repl_schema.py +++ b/source4/torture/drs/python/repl_schema.py @@ -41,7 +41,8 @@ from ldb import ( import ldb import drs_base - +from samba.dcerpc import drsuapi, misc +from samba.drs_utils import drs_DsBind class DrsReplSchemaTestCase(drs_base.DrsBaseTestCase): @@ -50,6 +51,38 @@ class DrsReplSchemaTestCase(drs_base.DrsBaseTestCase): # current Class or Attribute object id obj_id = 0 + def _ds_bind(self, server_name): + binding_str = "ncacn_ip_tcp:%s[seal]" % server_name + + drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials()) + (drs_handle, supported_extensions) = drs_DsBind(drs) + return (drs, drs_handle) + + def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop, + replica_flags=0, max_objects=0): + req8 = drsuapi.DsGetNCChangesRequest8() + + req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID() + req8.source_dsa_invocation_id = misc.GUID(invocation_id) + req8.naming_context = drsuapi.DsReplicaObjectIdentifier() + req8.naming_context.dn = unicode(nc_dn_str) + req8.highwatermark = drsuapi.DsReplicaHighWaterMark() + req8.highwatermark.tmp_highest_usn = 0 + req8.highwatermark.reserved_usn = 0 + req8.highwatermark.highest_usn = 0 + req8.uptodateness_vector = None + req8.replica_flags = replica_flags + req8.max_object_count = max_objects + req8.max_ndr_size = 402116 + req8.extended_op = exop + req8.fsmo_info = 0 + req8.partial_attribute_set = None + req8.partial_attribute_set_ex = None + req8.mapping_ctr.num_mappings = 0 + req8.mapping_ctr.mappings = None + + return req8 + def setUp(self): super(DrsReplSchemaTestCase, self).setUp() @@ -202,6 +235,58 @@ class DrsReplSchemaTestCase(drs_base.DrsBaseTestCase): self._check_object(c_dn) self._check_object(a_dn) + def test_classWithCustomLinkAttribute(self): + """Create new Attribute and a Class, + that has value for newly created attribute. + This should check code path that searches for + AttributeID_id in Schema cache""" + # add new attributeSchema object + (a_ldn, a_dn) = self._schema_new_attr(self.ldb_dc1, "attr-Link-X", 1, + attrs={'linkID':"99990", + "attributeSyntax": "2.5.5.1", + "omSyntax": "127"}) + # add a base classSchema class so we can use our new + # attribute in class definition in a sibling class + (c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-Link-Y", 7, + 1, + {"systemMayContain": a_ldn, + "subClassOf": "classSchema"}) + # add new classSchema object with value for a_ldb attribute + (c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-Link-Z", 8, + 1, + {"objectClass": ["top", "classSchema", c_ldn], + a_ldn: self.schema_dn}) + # force replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, nc_dn=self.schema_dn, forced=True) + # check objects are replicated + self._check_object(c_dn) + self._check_object(a_dn) + + res = self.ldb_dc1.search(base=a_dn, + scope=SCOPE_BASE, + attrs=["msDS-IntId"]) + self.assertEqual(1, len(res)) + self.assertTrue("msDS-IntId" in res[0]) + int_id = int(res[0]["msDS-IntId"][0]) + if int_id < 0: + int_id += (1 << 32) + + dc_guid_1 = self.ldb_dc1.get_invocation_id() + + drs, drs_handle = self._ds_bind(self.dnsname_dc1) + + req8 = self._exop_req8(dest_dsa=None, + invocation_id=dc_guid_1, + nc_dn_str=c_dn, + exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ, + replica_flags=drsuapi.DRSUAPI_DRS_SYNC_FORCED) + + (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8) + + for link in ctr.linked_attributes: + self.assertTrue(link.attid != int_id, + 'Got %d for both' % link.attid) + def test_attribute(self): """Simple test for attributeSchema replication""" # add new attributeSchema object -- 2.11.4.GIT