docs: man locktest: Add missing meta data.
[Samba/gebeck_regimport.git] / source4 / torture / drs / python / getnc_exop.py
blob904c0133334719b8c2e1afaa09866fef7b7a78eb
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # Usage:
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 import drs_base
31 import samba.tests
33 from ldb import SCOPE_BASE
35 from samba.dcerpc import drsuapi, misc, drsblobs
36 from samba.drs_utils import drs_DsBind
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
    """Intended as a semi-black box test case for DsGetNCChanges
       implementation for extended operations. It should be testing
       how DsGetNCChanges handles different input params (mostly invalid).
       Final goal is to make DsGetNCChanges as binary compatible to
       Windows implementation as possible"""

    def setUp(self):
        super(DrsReplicaSyncTestCase, self).setUp()

    def tearDown(self):
        super(DrsReplicaSyncTestCase, self).tearDown()

    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop):
        """Build a DsGetNCChangesRequest8 for an extended operation.

        dest_dsa:      GUID string placed in destination_dsa_guid
        invocation_id: GUID string placed in source_dsa_invocation_id
        nc_dn_str:     DN of the naming context object the request targets
        exop:          value stored in extended_op (a DRSUAPI_EXOP_* constant)

        Every other field is set to a fixed zero/None value (max_ndr_size
        is a fixed constant), so the request carries no high-water mark,
        up-to-dateness vector, partial attribute set or prefix mappings.
        Returns the populated request object.
        """
        req8 = drsuapi.DsGetNCChangesRequest8()

        req8.destination_dsa_guid = misc.GUID(dest_dsa)
        req8.source_dsa_invocation_id = misc.GUID(invocation_id)
        req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
        req8.naming_context.dn = unicode(nc_dn_str)
        req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
        req8.highwatermark.tmp_highest_usn = 0
        req8.highwatermark.reserved_usn = 0
        req8.highwatermark.highest_usn = 0
        req8.uptodateness_vector = None
        req8.replica_flags = 0
        req8.max_object_count = 0
        req8.max_ndr_size = 402116
        req8.extended_op = exop
        req8.fsmo_info = 0
        req8.partial_attribute_set = None
        req8.partial_attribute_set_ex = None
        req8.mapping_ctr.num_mappings = 0
        req8.mapping_ctr.mappings = None

        return req8

    def _ds_bind(self, server_name):
        """Open a drsuapi connection to server_name and bind to it.

        The connection uses an ncacn_ip_tcp binding with the "print,seal"
        options together with this test case's loadparm and credentials.
        Returns a (drs, drs_handle) pair: the drsuapi connection and the
        bind handle obtained from drs_DsBind().
        """
        binding_str = "ncacn_ip_tcp:%s[print,seal]" % server_name

        drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
        # drs_DsBind() also returns the supported extensions; they are not
        # needed by these tests.
        (drs_handle, _) = drs_DsBind(drs)
        return (drs, drs_handle)

    def _determine_fSMORoleOwner(self, fsmo_obj_dn):
        """Returns (owner, not_owner) pair where:
             owner: info dict for the DC owning the FSMO role
             not_owner: info dict for the DC not owning the FSMO role
           Each info dict has the keys:
             "dns_name": dns name of the DC
             "invocation_id": value of get_invocation_id() for the DC
             "ntds_guid": value of get_ntds_GUID() for the DC
           The owner is read from the fSMORoleOwner attribute of
           fsmo_obj_dn as seen by DC1."""
        # collect info to return later
        fsmo_info_1 = {"dns_name": self.dnsname_dc1,
                       "invocation_id": self.ldb_dc1.get_invocation_id(),
                       "ntds_guid": self.ldb_dc1.get_ntds_GUID()}
        fsmo_info_2 = {"dns_name": self.dnsname_dc2,
                       "invocation_id": self.ldb_dc2.get_invocation_id(),
                       "ntds_guid": self.ldb_dc2.get_ntds_GUID()}
        # determine the owner dc
        res = self.ldb_dc1.search(fsmo_obj_dn,
                                  scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
        # Use a unittest assertion rather than a bare assert statement so
        # the check is still performed when Python runs with -O.
        self.assertEqual(len(res), 1,
                         "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn)
        fsmo_owner = res[0]["fSMORoleOwner"][0]
        if fsmo_owner == self.info_dc1["dsServiceName"][0]:
            return (fsmo_info_1, fsmo_info_2)
        # Anything other than DC1 is treated as "DC2 owns the role".
        return (fsmo_info_2, fsmo_info_1)

    def _check_exop_failed(self, ctr6, expected_failure):
        """Assert that ctr6 describes a failed extended operation.

        ctr6:             level 6 reply container from DsGetNCChanges
        expected_failure: value expected in ctr6.extended_ret

        Besides the extended return code, the reply must carry no further
        data, no linked attributes and a zero drs_error code.
        """
        self.assertEqual(ctr6.extended_ret, expected_failure)
        # NOTE(review): the object_count/first_object checks below were
        # already disabled in the original; the reason is not visible here.
        #self.assertEqual(ctr6.object_count, 0)
        #self.assertEqual(ctr6.first_object, None)
        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes, None)
        self.assertEqual(ctr6.drs_error[0], 0)

    def test_FSMONotOwner(self):
        """Test role transfer against a DC that is not the owner of the role"""
        fsmo_dn = self.ldb_dc1.get_schema_basedn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        # The request names the real owner as destination DSA but is sent
        # to (and carries the invocation id of) the DC not owning the role.
        req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
                               invocation_id=fsmo_not_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)

        (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))

    def test_InvalidDestDSA(self):
        """Test role transfer with invalid destination DSA guid"""
        fsmo_dn = self.ldb_dc1.get_schema_basedn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        # The request goes to the role owner, but uses a hard-coded GUID as
        # destination DSA instead of the GUID of either DC under test.
        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)

        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))