s4-selftest/drs Allow some DRS tests to operate against an IP
[Samba.git] / source4 / torture / drs / python / repl_move.py
blob3827c7cb39f9ce6e8f660cc9d4d91313fd9499e5
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
6 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # Usage:
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_move -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 import time
31 import uuid
32 import samba.tests
34 from samba.ndr import ndr_unpack
35 from samba.dcerpc import drsblobs
36 from samba.dcerpc import misc
37 from samba.drs_utils import drs_DsBind
39 from ldb import (
40 SCOPE_BASE,
41 SCOPE_SUBTREE,
44 import drs_base
45 import ldb
46 from samba.dcerpc.drsuapi import *
49 class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
    """Prepare both DCs and the OU tree used by the move tests.

    Automatic replication is disabled on DC1 and DC2, both DCs are
    replicated against each other, and a test OU ("replica_move") with
    the children OU=DrsOU1 and OU=DrsOU2 is added through ldb_dc1 and
    then replicated to DC2.  The invocation ids of both DCs and the
    results of _ds_bind() for both DCs are stored on the test case.
    """
    super(DrsMoveObjectTestCase, self).setUp()

    # disable automatic replication temporarily
    for dc_name in (self.dnsname_dc1, self.dnsname_dc2):
        self._disable_all_repl(dc_name)

    # make sure DCs are synchronized before the test
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    self.top_ou = samba.tests.create_test_ou(self.ldb_dc1,
                                             "replica_move")

    # first child OU: the users are created in here
    self.ou1_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU1")
    self.ou1_dn.add_base(self.top_ou)
    self.ldb_dc1.add({
        "dn": self.ou1_dn,
        "objectclass": "organizationalUnit",
        "ou": self.ou1_dn.get_component_value(0),
    })

    # second child OU: the users are moved into this one
    self.ou2_dn = ldb.Dn(self.ldb_dc1, "OU=DrsOU2")
    self.ou2_dn.add_base(self.top_ou)
    self.ldb_dc1.add({
        "dn": self.ou2_dn,
        "objectclass": "organizationalUnit",
        "ou": self.ou2_dn.get_component_value(0),
    })

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    self.dc1_guid = self.ldb_dc1.get_invocation_id()
    self.dc2_guid = self.ldb_dc2.get_invocation_id()

    # later unpacked as (connection, handle) by _check_metadata()
    self.drs_dc1 = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
    self.drs_dc2 = self._ds_bind(self.dnsname_dc2, ip=self.url_dc2)
def tearDown(self):
    """Delete the test OU tree on DC1 and re-enable replication on both DCs."""
    try:
        self.ldb_dc1.delete(self.top_ou, ["tree_delete:1"])
    except ldb.LdbError as err:
        (err_code, _err_text) = err.args
        # A missing OU simply means there is nothing left to clean up.
        # NOTE(review): any other LdbError code is dropped here as well,
        # nothing is re-raised -- confirm that this is intended.
        if err_code == ldb.ERR_NO_SUCH_OBJECT:
            pass

    for dc_name in (self.dnsname_dc1, self.dnsname_dc2):
        self._enable_all_repl(dc_name)
    super(DrsMoveObjectTestCase, self).tearDown()
100 def _make_username(self):
101 return "DrsMoveU_" + time.strftime("%s", time.gmtime())
def _check_metadata(self, user_dn, sam_ldb, drs, metadata, expected):
    """Compare an object's replication metadata with the expected values.

    metadata[0] is unpacked as a replPropertyMetaDataBlob and every
    entry is compared, in order, with the corresponding entry of
    expected.  Each expected entry is a tuple
    (attid, originating invocation id, version); a version of None
    skips the version comparison for that entry.

    When drs is not None it is unpacked as (connection, handle) and a
    DsGetNCChanges (level 8, DRSUAPI_EXOP_REPL_OBJ) request for user_dn
    is issued, using the invocation id of sam_ldb as source DSA.  The
    reply must carry exactly one object with len(expected) - 1 metadata
    entries (the RDN attribute is not sent over DRS).
    """
    repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, metadata[0])

    self.assertEqual(len(repl.ctr.array), len(expected))

    # the lengths were just asserted to be equal, so walk both in lockstep
    for idx, (found, wanted) in enumerate(zip(repl.ctr.array, expected)):
        (attid, orig_dsa, version) = wanted
        self.assertEqual(attid, found.attid,
                         "(LDAP) Wrong attid "
                         "for expected value %d, wanted 0x%08x got 0x%08x"
                         % (idx, attid, found.attid))
        self.assertEqual(found.originating_invocation_id,
                         misc.GUID(orig_dsa),
                         "(LDAP) Wrong originating_invocation_id "
                         "for expected value %d, attid 0x%08x, wanted %s got %s"
                         % (idx, found.attid,
                            misc.GUID(orig_dsa),
                            found.originating_invocation_id))
        # Allow version to be skipped when it does not matter
        if version is not None:
            self.assertEqual(found.version, version,
                             "(LDAP) Wrong version for expected value %d, "
                             "attid 0x%08x, "
                             "wanted %d got %d"
                             % (idx, found.attid,
                                version, found.version))

    if drs is None:
        return

    req8 = DsGetNCChangesRequest8()

    req8.source_dsa_invocation_id = misc.GUID(sam_ldb.get_invocation_id())
    req8.naming_context = DsReplicaObjectIdentifier()
    req8.naming_context.dn = str(user_dn)
    req8.highwatermark = DsReplicaHighWaterMark()
    req8.highwatermark.tmp_highest_usn = 0
    req8.highwatermark.reserved_usn = 0
    req8.highwatermark.highest_usn = 0
    req8.uptodateness_vector = None
    req8.replica_flags = DRSUAPI_DRS_SYNC_FORCED
    req8.max_object_count = 1
    req8.max_ndr_size = 402116
    req8.extended_op = DRSUAPI_EXOP_REPL_OBJ
    req8.fsmo_info = 0
    req8.partial_attribute_set = None
    req8.partial_attribute_set_ex = None
    req8.mapping_ctr.num_mappings = 0
    req8.mapping_ctr.mappings = None

    (drs_conn, drs_handle) = drs

    (level, drs_ctr) = drs_conn.DsGetNCChanges(drs_handle, 8, req8)
    self.assertEqual(level, 6)
    self.assertEqual(drs_ctr.object_count, 1)

    self.assertEqual(len(drs_ctr.first_object.meta_data_ctr.meta_data), len(expected) - 1)

    # NOTE(review): the expected index starts from 0 on every pass and
    # the loop is left after the first entry whose version is checked,
    # so in practice only the first DRS entry is compared.  The nesting
    # of the "break" could not be recovered from the source this was
    # restored from -- confirm against the repository before relying on
    # it.
    for att_idx, found in enumerate(drs_ctr.first_object.meta_data_ctr.meta_data):
        exp_idx = 0
        drs_attid = drs_ctr.first_object.object.attribute_ctr.attributes[att_idx]
        (attid, orig_dsa, version) = expected[exp_idx]

        # Skip the RDN from the expected set, it is not sent over DRS
        rdn_name = user_dn.get_rdn_name().upper()
        if ((rdn_name == "CN" and attid == DRSUAPI_ATTID_cn)
                or (rdn_name == "OU" and attid == DRSUAPI_ATTID_ou)):
            exp_idx = exp_idx + 1
            (attid, orig_dsa, version) = expected[exp_idx]

        self.assertEqual(attid, drs_attid.attid,
                         "(DRS) Wrong attid "
                         "for expected value %d, wanted 0x%08x got 0x%08x"
                         % (exp_idx, attid, drs_attid.attid))

        self.assertEqual(found.originating_invocation_id,
                         misc.GUID(orig_dsa),
                         "(DRS) Wrong originating_invocation_id "
                         "for expected value %d, attid 0x%08x, wanted %s got %s"
                         % (exp_idx, attid,
                            misc.GUID(orig_dsa),
                            found.originating_invocation_id))
        # Allow version to be skipped when it does not matter
        if version is not None:
            self.assertEqual(found.version, version,
                             "(DRS) Wrong version for expected value %d, "
                             "attid 0x%08x, "
                             "wanted %d got %d"
                             % (exp_idx, attid, version, found.version))
            break
202 # now also used to check the group
def _check_obj(self, sam_ldb, obj_orig, is_deleted, expected_metadata=None, drs=None):
    """Fetch an object by GUID from sam_ldb and compare it with obj_orig.

    The object is searched with the show_deleted control, so it is
    found whether or not it has been deleted.  For a live object the
    RDN value, name, DN and (when obj_orig carries one) parentGUID must
    equal those of obj_orig.  For a deleted object "isDeleted" must be
    present and only the part of RDN/name before the first newline is
    compared.  When expected_metadata is given, the object's
    replPropertyMetaData is passed on to _check_metadata() together
    with drs.

    Returns the search result entry of the current object.
    """
    # search the user by guid as it may be deleted
    guid_str = self._GUID_string(obj_orig["objectGUID"][0])
    res = sam_ldb.search(base='<GUID=%s>' % guid_str,
                         controls=["show_deleted:1"],
                         attrs=["*", "parentGUID",
                                "replPropertyMetaData"])
    self.assertEqual(len(res), 1)
    user_cur = res[0]

    rdn_attr = user_cur.dn.get_rdn_name()
    rdn_orig = str(obj_orig[rdn_attr][0])
    rdn_cur = str(user_cur[rdn_attr][0])
    name_orig = str(obj_orig["name"][0])
    name_cur = str(user_cur["name"][0])
    dn_orig = obj_orig["dn"]
    dn_cur = user_cur["dn"]

    # now check properties of the user
    if not is_deleted:
        self.assertFalse("isDeleted" in user_cur)
        self.assertEqual(rdn_cur, rdn_orig)
        self.assertEqual(name_cur, name_orig)
        self.assertEqual(dn_cur, dn_orig)
        self.assertEqual(name_cur, rdn_cur)
        parent_cur = user_cur["parentGUID"][0]
        try:
            parent_orig = obj_orig["parentGUID"][0]
            self.assertEqual(parent_orig, parent_cur)
        except KeyError:
            # obj_orig was fetched without parentGUID, nothing to compare
            pass
    else:
        self.assertTrue("isDeleted" in user_cur)
        # only the text before the first newline is compared with the
        # original RDN/name
        self.assertEqual(rdn_cur.split('\n')[0], rdn_orig)
        self.assertEqual(name_cur.split('\n')[0], name_orig)
        self.assertEqual(dn_cur.get_rdn_value().split('\n')[0],
                         dn_orig.get_rdn_value())
        self.assertEqual(name_cur, rdn_cur)

    self.assertEqual(name_cur, user_cur.dn.get_rdn_value())

    if expected_metadata is not None:
        self._check_metadata(dn_cur, sam_ldb, drs, user_cur["replPropertyMetaData"],
                             expected_metadata)

    return user_cur
def test_ReplicateMoveObject1(self):
    """Verifies how a moved, then deleted and modified user is replicated between two DCs.
       This test should verify that:
        - the user is created under OU1 on DC1
        - the user is moved to OU2 on DC1
        - We verify that after replication,
          that the user has the correct DN (under OU2) on DC2
        - the user is deleted on DC1
        - the user's description is modified on DC2
        - We verify that after replication,
          that the user has the correct DN (deleted) and has no description
    """
    dc1 = self.dc1_guid
    dc2 = self.dc2_guid

    def live_metadata(cn=(dc1, 1), name_version=1, description=None):
        """Expected metadata of the user while it is not deleted.

        cn and description are (originating invocation id, version)
        pairs; no description entry is produced when it is None.
        """
        head = [(DRSUAPI_ATTID_objectClass, dc1, 1),
                (DRSUAPI_ATTID_cn,) + cn]
        if description is not None:
            head.append((DRSUAPI_ATTID_description,) + description)
        return head + [
            (DRSUAPI_ATTID_instanceType, dc1, 1),
            (DRSUAPI_ATTID_whenCreated, dc1, 1),
            (DRSUAPI_ATTID_ntSecurityDescriptor, dc1, 1),
            (DRSUAPI_ATTID_name, dc1, name_version),
            (DRSUAPI_ATTID_userAccountControl, dc1, None),
            (DRSUAPI_ATTID_codePage, dc1, 1),
            (DRSUAPI_ATTID_countryCode, dc1, 1),
            (DRSUAPI_ATTID_dBCSPwd, dc1, 1),
            (DRSUAPI_ATTID_logonHours, dc1, 1),
            (DRSUAPI_ATTID_unicodePwd, dc1, 1),
            (DRSUAPI_ATTID_ntPwdHistory, dc1, 1),
            (DRSUAPI_ATTID_pwdLastSet, dc1, 1),
            (DRSUAPI_ATTID_primaryGroupID, dc1, 1),
            (DRSUAPI_ATTID_objectSid, dc1, 1),
            (DRSUAPI_ATTID_accountExpires, dc1, 1),
            (DRSUAPI_ATTID_lmPwdHistory, dc1, 1),
            (DRSUAPI_ATTID_sAMAccountName, dc1, 1),
            (DRSUAPI_ATTID_sAMAccountType, dc1, 1),
            (DRSUAPI_ATTID_userPrincipalName, dc1, 1),
            (DRSUAPI_ATTID_objectCategory, dc1, 1)]

    def deleted_metadata(cn, description=None):
        """Expected metadata of the user after it was deleted on DC1.

        cn and description are (originating invocation id, version)
        pairs; no description entry is produced when it is None.
        """
        head = [(DRSUAPI_ATTID_objectClass, dc1, 1),
                (DRSUAPI_ATTID_cn,) + cn]
        if description is not None:
            head.append((DRSUAPI_ATTID_description,) + description)
        return head + [
            (DRSUAPI_ATTID_instanceType, dc1, 1),
            (DRSUAPI_ATTID_whenCreated, dc1, 1),
            (DRSUAPI_ATTID_isDeleted, dc1, 1),
            (DRSUAPI_ATTID_ntSecurityDescriptor, dc1, 1),
            (DRSUAPI_ATTID_name, dc1, 3),
            (DRSUAPI_ATTID_userAccountControl, dc1, None),
            (DRSUAPI_ATTID_codePage, dc1, 2),
            (DRSUAPI_ATTID_countryCode, dc1, 2),
            (DRSUAPI_ATTID_dBCSPwd, dc1, 1),
            (DRSUAPI_ATTID_logonHours, dc1, 1),
            (DRSUAPI_ATTID_unicodePwd, dc1, 1),
            (DRSUAPI_ATTID_ntPwdHistory, dc1, 1),
            (DRSUAPI_ATTID_pwdLastSet, dc1, 2),
            (DRSUAPI_ATTID_primaryGroupID, dc1, 2),
            (DRSUAPI_ATTID_objectSid, dc1, 1),
            (DRSUAPI_ATTID_accountExpires, dc1, 2),
            (DRSUAPI_ATTID_lmPwdHistory, dc1, 1),
            (DRSUAPI_ATTID_sAMAccountName, dc1, 1),
            (DRSUAPI_ATTID_sAMAccountType, dc1, 2),
            (DRSUAPI_ATTID_userPrincipalName, dc1, 2),
            (DRSUAPI_ATTID_lastKnownParent, dc1, 1),
            (DRSUAPI_ATTID_objectCategory, dc1, 2),
            (DRSUAPI_ATTID_isRecycled, dc1, 1)]

    # work-out unique username to test with
    username = self._make_username()

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
    initial_metadata = live_metadata()

    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_orig, is_deleted=False,
                    expected_metadata=initial_metadata)

    # move the user from OU1 to OU2 on DC1
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)

    user_moved_orig = ldb_res[0]

    # the move bumps the version of 'name' only
    moved_metadata = live_metadata(name_version=2)

    # check user info on DC1 after rename - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=False,
                    expected_metadata=moved_metadata)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # on DC2 the RDN attribute is expected to originate from DC2 itself
    moved_metadata_dc2 = live_metadata(cn=(dc2, 1), name_version=2)

    # check user info on DC2 - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                    obj_orig=user_moved_orig,
                    is_deleted=False,
                    expected_metadata=moved_metadata_dc2)

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
    deleted_metadata_dc1 = deleted_metadata(cn=(dc1, 2))

    # no DRS binding is passed here, so only the LDAP view is checked
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig,
                    is_deleted=True,
                    expected_metadata=deleted_metadata_dc1)

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    msg = ldb.Message()
    msg.dn = new_dn
    msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    modified_metadata = live_metadata(cn=(dc2, 1), name_version=2,
                                      description=(dc2, 1))

    self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                    obj_orig=user_moved_orig,
                    is_deleted=False,
                    expected_metadata=modified_metadata)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    deleted_modified_metadata_dc2 = deleted_metadata(cn=(dc2, 2),
                                                     description=(dc2, 2))

    # check user info on DC2 - should be deleted user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                               obj_orig=user_moved_orig,
                               is_deleted=True,
                               expected_metadata=deleted_modified_metadata_dc2)
    self.assertFalse("description" in user_cur)

    # trigger replication from DC2 to DC1, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    deleted_modified_metadata_dc1 = deleted_metadata(cn=(dc1, 2),
                                                     description=(dc2, 2))

    # check user info on DC1 - should be deleted user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                               obj_orig=user_moved_orig,
                               is_deleted=True,
                               expected_metadata=deleted_modified_metadata_dc1)
    self.assertFalse("description" in user_cur)
def test_ReplicateMoveObject2(self):
    """Verifies how a user that is moved and deleted before any
       replication took place is replicated between two DCs.
       This test should verify that:
        - the moved user is not visible on DC2 as long as no
          replication is triggered
        - after the user is deleted on DC1 and replication is
          triggered, the user is a deleted object on both DCs
    """
    dc1 = self.dc1_guid
    dc2 = self.dc2_guid

    def live_metadata(name_version=1):
        """Expected metadata of the user on DC1 while it is not deleted."""
        return [
            (DRSUAPI_ATTID_objectClass, dc1, 1),
            (DRSUAPI_ATTID_cn, dc1, 1),
            (DRSUAPI_ATTID_instanceType, dc1, 1),
            (DRSUAPI_ATTID_whenCreated, dc1, 1),
            (DRSUAPI_ATTID_ntSecurityDescriptor, dc1, 1),
            (DRSUAPI_ATTID_name, dc1, name_version),
            (DRSUAPI_ATTID_userAccountControl, dc1, None),
            (DRSUAPI_ATTID_codePage, dc1, 1),
            (DRSUAPI_ATTID_countryCode, dc1, 1),
            (DRSUAPI_ATTID_dBCSPwd, dc1, 1),
            (DRSUAPI_ATTID_logonHours, dc1, 1),
            (DRSUAPI_ATTID_unicodePwd, dc1, 1),
            (DRSUAPI_ATTID_ntPwdHistory, dc1, 1),
            (DRSUAPI_ATTID_pwdLastSet, dc1, 1),
            (DRSUAPI_ATTID_primaryGroupID, dc1, 1),
            (DRSUAPI_ATTID_objectSid, dc1, 1),
            (DRSUAPI_ATTID_accountExpires, dc1, 1),
            (DRSUAPI_ATTID_lmPwdHistory, dc1, 1),
            (DRSUAPI_ATTID_sAMAccountName, dc1, 1),
            (DRSUAPI_ATTID_sAMAccountType, dc1, 1),
            (DRSUAPI_ATTID_userPrincipalName, dc1, 1),
            (DRSUAPI_ATTID_objectCategory, dc1, 1)]

    def deleted_metadata(cn):
        """Expected metadata of the deleted user.

        cn is the (originating invocation id, version) pair expected
        for the RDN attribute.
        """
        return [
            (DRSUAPI_ATTID_objectClass, dc1, 1),
            (DRSUAPI_ATTID_cn,) + cn,
            (DRSUAPI_ATTID_instanceType, dc1, 1),
            (DRSUAPI_ATTID_whenCreated, dc1, 1),
            (DRSUAPI_ATTID_isDeleted, dc1, 1),
            (DRSUAPI_ATTID_ntSecurityDescriptor, dc1, 1),
            (DRSUAPI_ATTID_name, dc1, 3),
            (DRSUAPI_ATTID_userAccountControl, dc1, None),
            (DRSUAPI_ATTID_codePage, dc1, 2),
            (DRSUAPI_ATTID_countryCode, dc1, 2),
            (DRSUAPI_ATTID_dBCSPwd, dc1, 1),
            (DRSUAPI_ATTID_logonHours, dc1, 1),
            (DRSUAPI_ATTID_unicodePwd, dc1, 1),
            (DRSUAPI_ATTID_ntPwdHistory, dc1, 1),
            (DRSUAPI_ATTID_pwdLastSet, dc1, 2),
            (DRSUAPI_ATTID_primaryGroupID, dc1, 2),
            (DRSUAPI_ATTID_objectSid, dc1, 1),
            (DRSUAPI_ATTID_accountExpires, dc1, 2),
            (DRSUAPI_ATTID_lmPwdHistory, dc1, 1),
            (DRSUAPI_ATTID_sAMAccountName, dc1, 1),
            (DRSUAPI_ATTID_sAMAccountType, dc1, 2),
            (DRSUAPI_ATTID_userPrincipalName, dc1, 2),
            (DRSUAPI_ATTID_lastKnownParent, dc1, 1),
            (DRSUAPI_ATTID_objectCategory, dc1, 2),
            (DRSUAPI_ATTID_isRecycled, dc1, 1)]

    # work-out unique username to test with
    username = self._make_username()

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
    initial_metadata = live_metadata()

    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_orig, is_deleted=False,
                    expected_metadata=initial_metadata)

    # move the user from OU1 to OU2 on DC1
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_moved_orig = ldb_res[0]

    # the move bumps the version of 'name' only
    moved_metadata = live_metadata(name_version=2)

    # check user info on DC1 after rename - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=False,
                    expected_metadata=moved_metadata)

    # check user info on DC2 - should not be there, we have not done replication
    ldb_res = self.ldb_dc2.search(base=self.ou2_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 0)

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    deleted_metadata_dc1 = deleted_metadata(cn=(dc1, 2))

    # check user info on DC1 - should be deleted user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=True,
                    expected_metadata=deleted_metadata_dc1)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # on DC2 the RDN attribute is expected to originate from DC2 itself
    deleted_metadata_dc2 = deleted_metadata(cn=(dc2, 1))

    # check user info on DC2 - should be deleted user
    self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                    obj_orig=user_moved_orig,
                    is_deleted=True,
                    expected_metadata=deleted_metadata_dc2)

    # trigger replication from DC2 to DC1, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    # check user info on DC1 - should be deleted user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=True,
                    expected_metadata=deleted_metadata_dc1)
706 def test_ReplicateMoveObject3(self):
707 """Verifies how a moved container with a user inside is replicated between two DCs.
708 This test should verify that:
709 - the OU is created on DC1
710 - the OU is renamed on DC1
711 - We verify that after replication,
712 that the user has the correct DN (under OU2).
715 # work-out unique username to test with
716 username = self._make_username()
718 # create user on DC1
719 self.ldb_dc1.newuser(username=username,
720 userou="ou=%s,ou=%s"
721 % (self.ou1_dn.get_component_value(0),
722 self.top_ou.get_component_value(0)),
723 password=None, setpassword=False)
724 ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
725 scope=SCOPE_SUBTREE,
726 expression="(samAccountName=%s)" % username,
727 attrs=["*", "parentGUID"])
728 self.assertEqual(len(ldb_res), 1)
729 user_orig = ldb_res[0]
730 user_dn = ldb_res[0]["dn"]
732 # check user info on DC1
733 print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
734 initial_metadata = [
735 (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
736 (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
737 (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
738 (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
739 (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
740 (DRSUAPI_ATTID_name, self.dc1_guid, 1),
741 (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
742 (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
743 (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
744 (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
745 (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
746 (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
747 (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
748 (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
749 (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
750 (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
751 (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
752 (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
753 (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
754 (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
755 (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
756 (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
758 self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
759 obj_orig=user_orig, is_deleted=False,
760 expected_metadata=initial_metadata)
762 new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
763 new_dn.add_base(self.ou2_dn)
764 self.ldb_dc1.rename(user_dn, new_dn)
765 ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
766 scope=SCOPE_SUBTREE,
767 expression="(samAccountName=%s)" % username,
768 attrs=["*", "parentGUID"])
769 self.assertEqual(len(ldb_res), 1)
771 user_moved_orig = ldb_res[0]
772 user_moved_dn = ldb_res[0]["dn"]
774 # trigger replication from DC1 to DC2
775 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
776 moved_metadata = [
777 (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
778 (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
779 (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
780 (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
781 (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
782 (DRSUAPI_ATTID_name, self.dc1_guid, 2),
783 (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
784 (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
785 (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
786 (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
787 (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
788 (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
789 (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
790 (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
791 (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
792 (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
793 (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
794 (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
795 (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
796 (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
797 (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
798 (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]
800 # check user info on DC1 after rename - should be valid user
801 user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
802 obj_orig=user_moved_orig,
803 is_deleted=False,
804 expected_metadata=moved_metadata)
806 # delete user on DC1
807 self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
808 deleted_metadata_dc1 = [
809 (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
810 (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
811 (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
812 (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
813 (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
814 (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
815 (DRSUAPI_ATTID_name, self.dc1_guid, 3),
816 (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
817 (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
818 (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
819 (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
820 (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
821 (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
822 (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
823 (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
824 (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
825 (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
826 (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
827 (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
828 (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
829 (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
830 (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
831 (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
832 (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
833 (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
835 # check user info on DC1 - should be deleted user
836 user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
837 obj_orig=user_moved_orig,
838 is_deleted=True,
839 expected_metadata=deleted_metadata_dc1)
841 # trigger replication from DC2 to DC1
842 self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
844 # check user info on DC1 - should be deleted user
845 user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
846 obj_orig=user_moved_orig,
847 is_deleted=True,
848 expected_metadata=deleted_metadata_dc1)
850 # trigger replication from DC1 to DC2, for cleanup
851 self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
853 deleted_metadata_dc2 = [
854 (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
855 (DRSUAPI_ATTID_cn, self.dc2_guid, 2),
856 (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
857 (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
858 (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
859 (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
860 (DRSUAPI_ATTID_name, self.dc1_guid, 3),
861 (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
862 (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
863 (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
864 (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
865 (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
866 (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
867 (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
868 (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
869 (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
870 (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
871 (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
872 (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
873 (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
874 (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
875 (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
876 (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
877 (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
878 (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
880 # check user info on DC2 - should be deleted user
881 user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
882 obj_orig=user_moved_orig,
883 is_deleted=True,
884 expected_metadata=deleted_metadata_dc2)
def test_ReplicateMoveObject3b(self):
    """Verify how a user moved and then deleted on DC1 is replicated.

    Steps exercised by this test:
     - a user is created on DC1 under OU1
     - the user is moved on DC1 to be under OU2
     - replication DC2 -> DC1 is triggered (DC2 has never seen the
       object) and the user on DC1 is checked against the moved entry
     - the user is deleted on DC1
     - replication DC2 -> DC1 and then DC1 -> DC2 is triggered and the
       object is checked to be a deleted object on both DCs

    Every check passes the expected (attid, originating DSA GUID,
    version) tuples listed below to self._check_obj().
    """
    # work-out unique username to test with
    username = self._make_username()

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s"
          % (username, self._GUID_string(user_orig["objectGUID"][0])))
    initial_metadata = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]

    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_orig, is_deleted=False,
                    expected_metadata=initial_metadata)

    # move the user on DC1 so that it lives under OU2
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)

    user_moved_orig = ldb_res[0]

    # trigger replication from DC2 (Which has never seen the object) to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # only the 'name' entry differs from initial_metadata (version 2)
    moved_metadata = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 2),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]

    # check user info on DC1 after rename - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=False,
                    expected_metadata=moved_metadata)

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
    deleted_metadata_dc1 = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 3),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
        (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]

    # check user info on DC1 - should be deleted user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=True,
                    expected_metadata=deleted_metadata_dc1)

    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    # check user info on DC1 - should be deleted user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=True,
                    expected_metadata=deleted_metadata_dc1)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # same as deleted_metadata_dc1 except for the 'cn' entry, which is
    # expected with DC2 as originating DSA and version 1
    deleted_metadata_dc2 = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 3),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
        (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]

    # check user info on DC2 - should be deleted user
    self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                    obj_orig=user_moved_orig,
                    is_deleted=True,
                    expected_metadata=deleted_metadata_dc2)

def test_ReplicateMoveObject4(self):
    """Verify a move on DC1 combined with a modification made on DC2.

    Steps exercised by this test:
     - a user is created on DC1 under OU1 and replicated to DC2
     - the user is moved on DC1 to be under OU2
     - the user's description is set on DC2 (via the old DN)
     - after replication DC1 -> DC2 the user on DC2 is checked against
       the moved entry and must carry the description
     - the user is deleted on DC1; after replication in both
       directions it is checked to be a deleted object without a
       description on both DCs

    Every check passes the expected (attid, originating DSA GUID,
    version) tuples listed below to self._check_obj().
    """
    # work-out unique username to test with
    username = self._make_username()

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s"
          % (username, self._GUID_string(user_orig["objectGUID"][0])))
    initial_metadata = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]

    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_orig, is_deleted=False,
                    expected_metadata=initial_metadata)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # same as initial_metadata except that 'cn' is expected with DC2
    # as originating DSA
    initial_metadata_dc2 = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]

    # check user info on DC2 - should still be valid user
    self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                    obj_orig=user_orig, is_deleted=False,
                    expected_metadata=initial_metadata_dc2)

    # move the user on DC1 so that it lives under OU2
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)

    user_moved_orig = ldb_res[0]

    moved_metadata = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc1_guid, 1),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 2),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]

    # check user info on DC1 after rename - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=False,
                    expected_metadata=moved_metadata)

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    # The old DN is used on purpose: DC2 has not seen the rename yet.
    msg = ldb.Message()
    msg.dn = user_dn
    msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    modified_metadata = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc2_guid, 1),
        (DRSUAPI_ATTID_description, self.dc2_guid, 1),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]

    # check user info on DC2 - still compared against the un-moved entry
    self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                    obj_orig=user_orig,
                    is_deleted=False,
                    expected_metadata=modified_metadata)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    modified_renamed_metadata = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc2_guid, 2),
        (DRSUAPI_ATTID_description, self.dc2_guid, 1),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 2),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 1),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 1),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 1),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 1),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 1)]

    # check user info on DC2 - should still be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                               obj_orig=user_moved_orig,
                               is_deleted=False,
                               expected_metadata=modified_renamed_metadata)

    self.assertIn("description", user_cur)

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
    deleted_metadata_dc1 = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 3),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
        (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]

    # check user info on DC1 - should be deleted user
    self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                    obj_orig=user_moved_orig,
                    is_deleted=True,
                    expected_metadata=deleted_metadata_dc1)

    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check user info on DC2 - should still be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                               obj_orig=user_moved_orig,
                               is_deleted=False,
                               expected_metadata=modified_renamed_metadata)

    self.assertIn("description", user_cur)

    # Expected metadata on DC1 once it has replicated from DC2: the
    # table above plus an entry for 'description'.
    deleted_metadata_dc1_with_description = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc1_guid, 2),
        (DRSUAPI_ATTID_description, self.dc1_guid, 2),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 3),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
        (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]

    # check user info on DC1 - should be deleted user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
                               obj_orig=user_moved_orig,
                               is_deleted=True,
                               expected_metadata=deleted_metadata_dc1_with_description)

    self.assertNotIn("description", user_cur)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    deleted_metadata_dc2 = [
        (DRSUAPI_ATTID_objectClass, self.dc1_guid, 1),
        (DRSUAPI_ATTID_cn, self.dc2_guid, 3),
        (DRSUAPI_ATTID_description, self.dc1_guid, 2),
        (DRSUAPI_ATTID_instanceType, self.dc1_guid, 1),
        (DRSUAPI_ATTID_whenCreated, self.dc1_guid, 1),
        (DRSUAPI_ATTID_isDeleted, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntSecurityDescriptor, self.dc1_guid, 1),
        (DRSUAPI_ATTID_name, self.dc1_guid, 3),
        (DRSUAPI_ATTID_userAccountControl, self.dc1_guid, None),
        (DRSUAPI_ATTID_codePage, self.dc1_guid, 2),
        (DRSUAPI_ATTID_countryCode, self.dc1_guid, 2),
        (DRSUAPI_ATTID_dBCSPwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_logonHours, self.dc1_guid, 1),
        (DRSUAPI_ATTID_unicodePwd, self.dc1_guid, 1),
        (DRSUAPI_ATTID_ntPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_pwdLastSet, self.dc1_guid, 2),
        (DRSUAPI_ATTID_primaryGroupID, self.dc1_guid, 2),
        (DRSUAPI_ATTID_objectSid, self.dc1_guid, 1),
        (DRSUAPI_ATTID_accountExpires, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lmPwdHistory, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountName, self.dc1_guid, 1),
        (DRSUAPI_ATTID_sAMAccountType, self.dc1_guid, 2),
        (DRSUAPI_ATTID_userPrincipalName, self.dc1_guid, 2),
        (DRSUAPI_ATTID_lastKnownParent, self.dc1_guid, 1),
        (DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
        (DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]

    # check user info on DC2 - should be deleted user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, drs=self.drs_dc2,
                               obj_orig=user_moved_orig,
                               is_deleted=True,
                               expected_metadata=deleted_metadata_dc2)

    self.assertNotIn("description", user_cur)

def test_ReplicateMoveObject5(self):
    """Verify a move on DC1 combined with a modification made on DC2.

    Steps exercised by this test:
     - a user is created on DC1 under OU1 and replicated to DC2
     - the user is moved on DC1 to be under OU2
     - the user's description is set on DC2 (via the old DN)
     - after replication DC2 -> DC1, and then DC1 -> DC2, the user is
       checked on each DC against the moved entry and must carry the
       description
     - the user is deleted on DC2 and, after replication DC2 -> DC1,
       checked to be a deleted object without a description on DC1
    """
    # work-out unique username to test with
    username = self._make_username()

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should still be valid user
    # (this check used to query DC1, which left the replicated copy
    # on DC2 unverified and duplicated the DC1 check just below)
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)

    # check user info on DC1
    print("Testing for %s with GUID %s"
          % (username, self._GUID_string(user_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)

    # move the user on DC1 so that it lives under OU2
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)

    user_moved_orig = ldb_res[0]

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    # The old DN is used on purpose: DC2 has not seen the rename yet.
    msg = ldb.Message()
    msg.dn = user_dn
    msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check user info on DC1 - should still be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False)
    self.assertIn("description", user_cur)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user under the new DN
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False)
    self.assertIn("description", user_cur)

    # delete user on DC2
    self.ldb_dc2.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
    # trigger replication from DC2 to DC1 for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    # check user info on DC1 - should be deleted user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
    self.assertNotIn("description", user_cur)

def test_ReplicateMoveObject6(self):
    """Verify how a moved container is replicated between two DCs.

    Steps exercised by this test:
     - OU1 is looked up on DC1 and replicated to DC2
     - OU1 is moved on DC1 to be under OU2
     - OU1's description is set on DC2 (via the old DN)
     - after replication DC1 -> DC2, OU1 on DC2 is checked against
       the moved entry and must carry the description
     - OU1 is deleted on DC1; after replication in both directions it
       is checked to be a deleted object without a description on
       both DCs
    """
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_orig = ldb_res[0]
    ou_dn = ldb_res[0]["dn"]

    # check OU info on DC1
    print("Testing for %s with GUID %s"
          % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should still be valid OU
    # (this check used to query DC1, which left the replicated copy
    # on DC2 unverified and duplicated the DC1 check just above)
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=False)

    # move OU1 on DC1 so that it lives under OU2
    new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0))
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(ou_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=new_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)

    ou_moved_orig = ldb_res[0]

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    # The old DN is used on purpose: DC2 has not seen the rename yet.
    msg = ldb.Message()
    msg.dn = ou_dn
    msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should still be valid OU
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False)
    self.assertIn("description", ou_cur)

    # delete OU on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check OU info on DC1 - should be deleted OU
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # check OU info on DC2 - should be deleted OU
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)

def test_ReplicateMoveObject7(self):
    """Verify an OU move on DC1 survives an incoming modify from DC2.

    Steps exercised:
     - OU1 is looked up on DC1 and replicated to DC2
     - OU1 is renamed on DC1 so that it lives under OU2
     - the description of OU1 is modified on DC2 (addressed by its old DN)
     - after replication DC2 -> DC1, OU1 on DC1 must keep the new DN
       (under OU2) and have gained the description
     - OU1 is then deleted on DC1; after replicating in both directions
       it must be a deleted object without a description on both DCs
    """
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_orig = ldb_res[0]
    ou_dn = ou_orig["dn"]
    ou_guid_dn = '<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])

    # check OU info on DC1
    print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should be a valid OU.  This check previously
    # queried DC1 again, so the replication to DC2 was never verified.
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=False)

    # move OU1 under OU2 on DC1, keeping its RDN value
    new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0))
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(ou_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=new_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_moved_orig = ldb_res[0]

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    msg = ldb.Message()
    msg.dn = ou_dn
    msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check OU info on DC1 - should be a valid OU at the moved DN
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False)
    self.assertIn("description", ou_cur)

    # delete OU on DC1
    self.ldb_dc1.delete(ou_guid_dn)
    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check OU info on DC1 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # check OU info on DC2 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)
def test_ReplicateMoveObject8(self):
    """Verify replication of an in-place OU rename racing a remote modify.

    Steps exercised:
     - OU1 is looked up on DC1 and replicated to DC2
     - OU1 is renamed on DC1 to "<OU1>-renamed" under the same parent
     - the description of OU1 is modified on DC2 (addressed by its old DN)
     - after replication DC1 -> DC2, OU1 on DC2 must have the renamed DN
       and still carry the description
     - OU1 is then deleted on DC1; after replicating in both directions
       it must be a deleted object without a description on both DCs
    """
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_orig = ldb_res[0]
    ou_dn = ou_orig["dn"]
    ou_guid_dn = '<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])

    # check OU info on DC1
    print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should be a valid OU.  This check previously
    # queried DC1 again, so the replication to DC2 was never verified.
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=False)

    # rename OU1 on DC1, leaving it under its current parent
    new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0))
    new_dn.add_base(self.ou1_dn.parent())
    self.ldb_dc1.rename(ou_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=new_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_moved_orig = ldb_res[0]

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    msg = ldb.Message()
    msg.dn = ou_dn
    msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should be a valid OU at the renamed DN
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False)
    self.assertIn("description", ou_cur)

    # delete OU on DC1
    self.ldb_dc1.delete(ou_guid_dn)
    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check OU info on DC1 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # check OU info on DC2 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)
def test_ReplicateMoveObject9(self):
    """Verify an in-place OU rename on DC1 survives a modify from DC2.

    Steps exercised:
     - OU1 is looked up on DC1 and replicated to DC2
     - OU1 is renamed on DC1 to "<OU1>-renamed" under the same parent
     - the description of OU1 is modified on DC2 (addressed by its old DN)
     - after replication DC2 -> DC1, OU1 on DC1 must keep the renamed DN
       and have gained the description
     - OU1 is then deleted on DC1; after replicating in both directions
       it must be a deleted object without a description on both DCs
    """
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_orig = ldb_res[0]
    ou_dn = ou_orig["dn"]
    ou_guid_dn = '<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])

    # check OU info on DC1
    print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should be a valid OU.  This check previously
    # queried DC1 again, so the replication to DC2 was never verified.
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=False)

    # rename OU1 on DC1, leaving it under its current parent
    new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0))
    new_dn.add_base(self.ou1_dn.parent())
    self.ldb_dc1.rename(ou_dn, new_dn)
    ldb_res = self.ldb_dc1.search(base=new_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_moved_orig = ldb_res[0]

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    msg = ldb.Message()
    msg.dn = ou_dn
    msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check OU info on DC1 - should be a valid OU at the renamed DN
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False)
    self.assertIn("description", ou_cur)

    # delete OU on DC1
    self.ldb_dc1.delete(ou_guid_dn)
    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check OU info on DC1 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # check OU info on DC2 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)
def test_ReplicateMoveObject10(self):
    """Verify that a delete on DC1 wins over a modify made on DC2.

    Steps exercised:
     - OU1 is looked up on DC1 and replicated to DC2
     - the description of OU1 is modified on DC2
     - OU1 is deleted on DC1
     - after replication DC1 -> DC2, OU1 on DC2 must be deleted and the
       description must be gone
     - after replication DC2 -> DC1, OU1 on DC1 must still be deleted
       and have no description
    """
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_orig = ldb_res[0]
    ou_dn = ou_orig["dn"]

    # check OU info on DC1
    print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should be a valid OU.  This check previously
    # queried DC1 again, so the replication to DC2 was never verified.
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=False)

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    msg = ldb.Message()
    msg.dn = ou_dn
    msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    # delete OU on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)

    # trigger replication from DC2 to DC1, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    # check OU info on DC1 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)
def test_ReplicateMoveObject11(self):
    """Verify a delete on DC1 survives an incoming modify from DC2.

    Steps exercised:
     - OU1 is looked up on DC1 and replicated to DC2
     - the description of OU1 is modified on DC2
     - OU1 is deleted on DC1
     - after replication DC2 -> DC1, OU1 on DC1 must still be deleted
       and must not have picked up the description
     - after replication DC1 -> DC2, OU1 on DC2 must be deleted and the
       description must be gone
    """
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_BASE,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    ou_orig = ldb_res[0]
    ou_dn = ou_orig["dn"]

    # check OU info on DC1
    print("Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check OU info on DC2 - should be a valid OU.  This check previously
    # queried DC1 again, so the replication to DC2 was never verified.
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=False)

    # Modify description on DC2.  This triggers a replication, but
    # not of 'name' and so a bug in Samba regarding the DN.
    msg = ldb.Message()
    msg.dn = ou_dn
    msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(msg)

    # delete OU on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check OU info on DC1 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # check OU info on DC2 - should be a deleted OU without description
    ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True)
    self.assertNotIn("description", ou_cur)
1823 class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
    """Prepare both DCs and describe the OU tree used by the tests.

    Automatic replication is disabled on both DCs, a fresh top-level
    test OU is created on DC1 and the DCs are synchronized in both
    directions.  The DNs (``self.ouN_dn``) and add-records (``self.ouN``)
    for the nested OUs are only built here; nothing below the top OU is
    added to the directory by this method.
    """
    super(DrsMoveBetweenTreeOfObjectTestCase, self).setUp()
    # disable automatic replication temporary
    for dc_name in (self.dnsname_dc1, self.dnsname_dc2):
        self._disable_all_repl(dc_name)

    self.top_ou = samba.tests.create_test_ou(self.ldb_dc1,
                                             "replica_move")

    # make sure DCs are synchronized before the test
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    def dn_below_top(relative_dn):
        # Build a DN on DC1's ldb and root it at the per-test top OU.
        dn = ldb.Dn(self.ldb_dc1, relative_dn)
        dn.add_base(self.top_ou)
        return dn

    def ou_record(dn):
        # Attributes needed to add an organizationalUnit at ``dn``.
        return {
            "dn": dn,
            "objectclass": "organizationalUnit",
            "ou": dn.get_component_value(0),
        }

    self.ou1_dn = dn_below_top("OU=DrsOU1")
    self.ou1 = ou_record(self.ou1_dn)

    self.ou2_dn = dn_below_top("OU=DrsOU2,OU=DrsOU1")
    self.ou2 = ou_record(self.ou2_dn)

    self.ou2b_dn = dn_below_top("OU=DrsOU2B,OU=DrsOU1")
    self.ou2b = ou_record(self.ou2b_dn)

    # OU2C only exists as a DN; no add-record is prepared for it.
    self.ou2c_dn = dn_below_top("OU=DrsOU2C,OU=DrsOU1")

    self.ou3_dn = dn_below_top("OU=DrsOU3,OU=DrsOU2,OU=DrsOU1")
    self.ou3 = ou_record(self.ou3_dn)

    self.ou4_dn = dn_below_top("OU=DrsOU4,OU=DrsOU3,OU=DrsOU2,OU=DrsOU1")
    self.ou4 = ou_record(self.ou4_dn)

    self.ou5_dn = dn_below_top("OU=DrsOU5,OU=DrsOU4,OU=DrsOU3,OU=DrsOU2,OU=DrsOU1")
    self.ou5 = ou_record(self.ou5_dn)

    self.ou6_dn = dn_below_top("OU=DrsOU6,OU=DrsOU5,OU=DrsOU4,OU=DrsOU3,OU=DrsOU2,OU=DrsOU1")
    self.ou6 = ou_record(self.ou6_dn)
def tearDown(self):
    """Remove the test OU tree and re-enable replication on both DCs.

    The tree delete is attempted first.  Replication is re-enabled and
    the base-class tearDown is run even if that delete raises, so a
    failed cleanup does not leave the DCs with replication disabled for
    the tests that follow.  Any exception from the delete still
    propagates.
    """
    try:
        self.ldb_dc1.delete(self.top_ou, ["tree_delete:1"])
    finally:
        self._enable_all_repl(self.dnsname_dc1)
        self._enable_all_repl(self.dnsname_dc2)
        super(DrsMoveBetweenTreeOfObjectTestCase, self).tearDown()
1896 def _make_username(self):
1897 return "DrsTreeU_" + time.strftime("%s", time.gmtime())
1899 # now also used to check the group
def _check_obj(self, sam_ldb, obj_orig, is_deleted):
    """Compare the object stored in ``sam_ldb`` against ``obj_orig``.

    The object is looked up by the objectGUID of ``obj_orig`` with the
    show_deleted control, and exactly one match is required.

    When ``is_deleted`` is true the object must carry ``isDeleted`` and
    the first line of its cn, name and RDN value must equal the
    originals.  Otherwise it must not carry ``isDeleted`` and its cn,
    name and DN must equal the originals exactly.  In both cases name
    and cn must agree with each other.

    Returns the message found in ``sam_ldb``.
    """
    # search by GUID as the object may be deleted
    guid = self._GUID_string(obj_orig["objectGUID"][0])
    matches = sam_ldb.search(base='<GUID=%s>' % guid,
                             controls=["show_deleted:1"],
                             attrs=["*", "parentGUID"])
    self.assertEqual(len(matches), 1)
    found = matches[0]

    expected_cn = str(obj_orig["cn"][0])
    actual_cn = str(found["cn"][0])
    expected_name = str(obj_orig["name"][0])
    actual_name = str(found["name"][0])
    expected_dn = obj_orig["dn"]
    actual_dn = found["dn"]

    if not is_deleted:
        self.assertFalse("isDeleted" in found)
        self.assertEqual(actual_cn, expected_cn)
        self.assertEqual(actual_name, expected_name)
        self.assertEqual(actual_dn, expected_dn)
        self.assertEqual(actual_name, actual_cn)
        self.assertEqual(actual_name, found.dn.get_rdn_value())
        return found

    # Only the text before the first newline of the stored values is
    # compared with the originals for a deleted object.
    self.assertTrue("isDeleted" in found)
    self.assertEqual(actual_cn.split('\n')[0], expected_cn)
    self.assertEqual(actual_name.split('\n')[0], expected_name)
    self.assertEqual(actual_dn.get_rdn_value().split('\n')[0],
                     expected_dn.get_rdn_value())
    self.assertEqual(actual_name, actual_cn)
    return found
def test_ReplicateMoveInTree1(self):
    """Check that a user moved deep into a new OU tree replicates.

    A user is created under OU1 on DC1, the chain OU2/OU3/OU4/OU5 is
    added, and the user is renamed into OU5.  After DC1 -> DC2
    replication the user on DC2 must match the moved user on DC1.  The
    user is finally deleted on DC1 and that is replicated for cleanup.
    """
    # work-out unique username to test with
    username = self._make_username()
    account_filter = "(samAccountName=%s)" % username

    self.ldb_dc1.add(self.ou1)

    # create user on DC1
    user_ou = "ou=%s,ou=%s" % (self.ou1_dn.get_component_value(0),
                               self.top_ou.get_component_value(0))
    self.ldb_dc1.newuser(username=username,
                         userou=user_ou,
                         password=None, setpassword=False)
    found = self.ldb_dc1.search(base=self.ou1_dn,
                                scope=SCOPE_SUBTREE,
                                expression=account_filter)
    self.assertEqual(len(found), 1)
    user_orig = found[0]
    user_dn = user_orig["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)

    # build the nested OU chain, parents first
    for ou_record in (self.ou2, self.ou3, self.ou4, self.ou5):
        self.ldb_dc1.add(ou_record)

    # move the user to the bottom of the chain
    target_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    target_dn.add_base(self.ou5_dn)
    self.ldb_dc1.rename(user_dn, target_dn)
    found = self.ldb_dc1.search(base=self.ou2_dn,
                                scope=SCOPE_SUBTREE,
                                expression=account_filter)
    self.assertEqual(len(found), 1)
    user_moved = found[0]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved, is_deleted=False)

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
def test_ReplicateMoveInTree2(self):
    """Check a user that is moved into a nested OU and then back out.

    Sequence exercised:
     - a user is created under OU1 on DC1 and renamed into OU3, and OU3
       itself is then renamed from under OU2 to under OU2B
     - after DC1 -> DC2 replication the user on DC2 must match DC1
     - the user is renamed back directly under OU1 on DC1 while its
       description is modified on DC2; after DC1 -> DC2 replication the
       user on DC2 must match the new DC1 state and have a description
     - the user is deleted on DC1; after replicating DC2 -> DC1 and then
       DC1 -> DC2 it must be a deleted object without a description on
       both DCs
    """
    # work-out unique username to test with
    username = self._make_username()

    def find_user_on_dc1(search_base):
        # Exactly one account with the test name must exist below the base.
        hits = self.ldb_dc1.search(base=search_base,
                                   scope=SCOPE_SUBTREE,
                                   expression="(samAccountName=%s)" % username)
        self.assertEqual(len(hits), 1)
        return hits[0]

    self.ldb_dc1.add(self.ou1)

    # create user on DC1
    user_ou = "ou=%s,ou=%s" % (self.ou1_dn.get_component_value(0),
                               self.top_ou.get_component_value(0))
    self.ldb_dc1.newuser(username=username,
                         userou=user_ou,
                         password=None, setpassword=False)
    user_orig = find_user_on_dc1(self.ou1_dn)
    user_dn = user_orig["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)

    for ou_record in (self.ou2, self.ou2b, self.ou3):
        self.ldb_dc1.add(ou_record)

    # put the user into OU3 ...
    user_in_ou3_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    user_in_ou3_dn.add_base(self.ou3_dn)
    self.ldb_dc1.rename(user_dn, user_in_ou3_dn)

    # ... and then move OU3 itself below OU2B
    ou3_below_ou2b = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou3_dn.get_component_value(0))
    ou3_below_ou2b.add_base(self.ou2b_dn)
    self.ldb_dc1.rename(self.ou3_dn, ou3_below_ou2b)

    user_nested = find_user_on_dc1(ou3_below_ou2b)
    user_nested_dn = user_nested["dn"]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_nested, is_deleted=False)

    # Rename on DC1
    user_below_ou1_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    user_below_ou1_dn.add_base(self.ou1_dn)
    self.ldb_dc1.rename(user_nested_dn, user_below_ou1_dn)

    # Modify description on DC2
    description_change = ldb.Message()
    description_change.dn = user_nested_dn
    description_change["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc2.modify(description_change)

    user_returned = find_user_on_dc1(self.ou1_dn)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_returned, is_deleted=False)
    self.assertTrue("description" in user_cur)

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
    # check user info on DC1 - should be deleted user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_returned, is_deleted=True)
    self.assertFalse("description" in user_cur)

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be deleted user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_returned, is_deleted=True)
    self.assertFalse("description" in user_cur)
def test_ReplicateMoveInTree3(self):
    """Check replication of a user whose parent OUs swap names.

    Sequence exercised:
     - a user is created under OU1 on DC1 and renamed into OU3, and OU3
       itself is then renamed from under OU2 to under OU2B
     - after DC1 -> DC2 replication the user on DC2 must match DC1
     - the user is renamed into OU2, then OU2 and OU2B exchange their
       names on DC1 by way of the temporary name OU2C
     - after DC1 -> DC2 replication the user on DC2 must match DC1,
       including its parentGUID
     - the user is deleted on DC1 and that is replicated for cleanup
    """
    # work-out unique username to test with
    username = self._make_username()
    account_filter = "(samAccountName=%s)" % username

    self.ldb_dc1.add(self.ou1)

    # create user on DC1
    user_ou = "ou=%s,ou=%s" % (self.ou1_dn.get_component_value(0),
                               self.top_ou.get_component_value(0))
    self.ldb_dc1.newuser(username=username,
                         userou=user_ou,
                         password=None, setpassword=False)
    found = self.ldb_dc1.search(base=self.ou1_dn,
                                scope=SCOPE_SUBTREE,
                                expression=account_filter)
    self.assertEqual(len(found), 1)
    user_orig = found[0]
    user_dn = user_orig["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)

    for ou_record in (self.ou2, self.ou2b, self.ou3):
        self.ldb_dc1.add(ou_record)

    # put the user into OU3 ...
    user_in_ou3_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    user_in_ou3_dn.add_base(self.ou3_dn)
    self.ldb_dc1.rename(user_dn, user_in_ou3_dn)

    # ... and then move OU3 itself below OU2B
    ou3_below_ou2b = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou3_dn.get_component_value(0))
    ou3_below_ou2b.add_base(self.ou2b_dn)
    self.ldb_dc1.rename(self.ou3_dn, ou3_below_ou2b)

    found = self.ldb_dc1.search(base=ou3_below_ou2b,
                                scope=SCOPE_SUBTREE,
                                expression=account_filter)
    self.assertEqual(len(found), 1)
    user_nested = found[0]
    user_nested_dn = user_nested["dn"]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_nested, is_deleted=False)

    # move the user directly below OU2
    user_in_ou2_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    user_in_ou2_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_nested_dn, user_in_ou2_dn)

    # swap the names of OU2 and OU2B, using OU2C as the spare name
    for old_dn, renamed_dn in ((self.ou2_dn, self.ou2c_dn),
                               (self.ou2b_dn, self.ou2_dn),
                               (self.ou2c_dn, self.ou2b_dn)):
        self.ldb_dc1.rename(old_dn, renamed_dn)

    found = self.ldb_dc1.search(base=self.ou1_dn,
                                scope=SCOPE_SUBTREE,
                                expression=account_filter,
                                attrs=["*", "parentGUID"])
    self.assertEqual(len(found), 1)
    user_after_swap = found[0]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_after_swap, is_deleted=False)

    self.assertEqual(user_cur["parentGUID"], user_after_swap["parentGUID"])

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
def test_ReplicateMoveInTree3b(self):
    """Check replication of an OU name shuffle mixed with other changes.

    Sequence exercised:
     - a user is created under OU1 on DC1, renamed into OU2 and
       replicated to DC2
     - on DC1 the user's description is modified, OU2 and OU2B exchange
       their names by way of OU2C (one second apart), the user is
       renamed by GUID into the OU now called OU2, and the descriptions
       of both OUs are modified
     - the intent stated by the original author is that these unrelated
       changes make the objects go out of order when sorted by
       usnChanged
     - after DC1 -> DC2 replication the user on DC2 must match DC1,
       including its parentGUID
     - the user is deleted on DC1 and that is replicated for cleanup
    """
    # work-out unique username to test with
    username = self._make_username()
    account_filter = "(samAccountName=%s)" % username

    def replace_description_on_dc1(target_dn, text):
        # Issue a replace of the description attribute against DC1.
        change = ldb.Message()
        change.dn = target_dn
        change["description"] = ldb.MessageElement(text, ldb.FLAG_MOD_REPLACE, "description")
        self.ldb_dc1.modify(change)

    self.ldb_dc1.add(self.ou1)

    # create user on DC1
    user_ou = "ou=%s,ou=%s" % (self.ou1_dn.get_component_value(0),
                               self.top_ou.get_component_value(0))
    self.ldb_dc1.newuser(username=username,
                         userou=user_ou,
                         password=None, setpassword=False)
    found = self.ldb_dc1.search(base=self.ou1_dn,
                                scope=SCOPE_SUBTREE,
                                expression=account_filter)
    self.assertEqual(len(found), 1)
    user_orig = found[0]
    user_dn = user_orig["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)

    for ou_record in (self.ou2, self.ou2b, self.ou3):
        self.ldb_dc1.add(ou_record)

    # move the user below OU2
    user_in_ou2_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    user_in_ou2_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, user_in_ou2_dn)

    found = self.ldb_dc1.search(base=self.ou2_dn,
                                scope=SCOPE_SUBTREE,
                                expression=account_filter)
    self.assertEqual(len(found), 1)
    user_first_move = found[0]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_first_move, is_deleted=False)

    replace_description_on_dc1(user_in_ou2_dn, "User Description")

    # The sleep(1) calls here ensure that the name objects get a
    # new 1-sec based timestamp, and so we select how the conflict
    # resolution resolves.
    self.ldb_dc1.rename(self.ou2_dn, self.ou2c_dn)
    time.sleep(1)
    self.ldb_dc1.rename(self.ou2b_dn, self.ou2_dn)
    time.sleep(1)
    self.ldb_dc1.rename(self.ou2c_dn, self.ou2b_dn)

    # address the user by GUID, as its DN changed with the OU renames
    user_after_swap_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    user_after_swap_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]), user_after_swap_dn)

    replace_description_on_dc1(self.ou2_dn, "OU2 Description")
    replace_description_on_dc1(self.ou2b_dn, "OU2b Description")

    found = self.ldb_dc1.search(base=self.ou2_dn,
                                scope=SCOPE_SUBTREE,
                                expression=account_filter,
                                attrs=["*", "parentGUID"])
    self.assertEqual(len(found), 1)
    user_after_swap = found[0]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_after_swap, is_deleted=False)
    self.assertEqual(user_cur["parentGUID"][0], user_after_swap["parentGUID"][0])

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
def test_ReplicateMoveInTree4(self):
    """Verifies how an object is replicated between two DCs.
       This test should verify that:
        - an OU and user can be replicated correctly, even after the user
          has been moved into a newly created OU
        - The creation of the OU has been combined with unrelated changes to the object,
          The aim here is to send the objects out-of-order when sorted by usnChanged.
        - That is, the OU will be sorted by usnChanged after the user that is within that OU.
        - That will cause the client to need to get the OU first, by use of the GET_ANC flag
        - the user ends up with the same parentGUID on DC2 as on DC1
       """
    # work-out unique username to test with
    username = self._make_username()

    self.ldb_dc1.add(self.ou1)

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username)
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # check user info on DC1
    print("Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])))
    self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)

    # OU2 is only created now, i.e. after the user already exists
    self.ldb_dc1.add(self.ou2)

    # move the user underneath OU2
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)

    # Touch OU2 after the user was moved, so that the OU carries the
    # more recent change of the two objects.
    msg = ldb.Message()
    msg.dn = self.ou2_dn
    msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc1.modify(msg)

    # parentGUID is requested explicitly so that the parent can be
    # compared after replication, as the sibling tests do.
    ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)

    user_moved_orig = ldb_res[0]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False)
    self.assertEqual(user_cur["parentGUID"][0], user_moved_orig["parentGUID"][0])

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
def test_ReplicateAddInOU(self):
    """Verifies how an object is replicated between two DCs.
       This test should verify that:
        - an OU and user can be replicated correctly
        - The creation of the OU has been combined with unrelated changes to the object,
          The aim here is to send the objects out-of-order when sorted by usnChanged.
        - That is, the OU will be sorted by usnChanged after the user that is within that OU.
        - That will cause the client to need to get the OU first, by use of the GET_ANC flag
       """
    # work-out unique username to test with
    username = self._make_username()

    self.ldb_dc1.add(self.ou1)

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]

    # Touch OU1 after the user was created, so that the OU carries the
    # more recent change of the two objects.
    msg = ldb.Message()
    msg.dn = self.ou1_dn
    msg["description"] = ldb.MessageElement("OU1 Description", ldb.FLAG_MOD_REPLACE, "description")
    self.ldb_dc1.modify(msg)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=False)

    # the user must hang off the same parent object on both DCs
    self.assertEqual(user_cur["parentGUID"], user_orig["parentGUID"])

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
def test_ReplicateAddInMovedOU(self):
    """Verifies how an object is replicated between two DCs.
       This test should verify that:
        - an OU and user can be replicated correctly
        - the user is created in OU1, moved into OU2, and OU2 is then
          renamed to OU2b, all on DC1 before any replication happens
        - The aim here is to send the objects out-of-order when sorted by usnChanged.
        - That is, the OU will be sorted by usnChanged after the user that is within that OU.
        - That will cause the client to need to get the OU first, by use of the GET_ANC flag
       """
    # work-out unique username to test with
    username = self._make_username()

    self.ldb_dc1.add(self.ou1)
    self.ldb_dc1.add(self.ou2)

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # move the user underneath OU2 ...
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)

    # ... and only afterwards rename OU2 itself
    self.ldb_dc1.rename(self.ou2_dn, self.ou2b_dn)

    # NOTE(review): the moved user is still expected in a subtree search
    # of OU1, so OU2/OU2b presumably live beneath OU1 - confirm in setUp.
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_moved = ldb_res[0]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be valid user
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved, is_deleted=False)

    # the user must hang off the same parent object on both DCs
    self.assertEqual(user_cur["parentGUID"], user_moved["parentGUID"])

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
def test_ReplicateAddInConflictOU_time(self):
    """Verifies how an object is replicated between two DCs, when created in an ambiguous location
       This test should verify that:
        - Without replication, two conflicting objects can be created
        - force the conflict resolution algorithm so we know which copy will win
          (by sleeping while creating the objects, therefore increasing that timestamp on 'name')
        - confirm that the user object, created on DC1, ends up in the right place on DC2
        - therefore confirm that the conflict algorithm worked correctly, and that parentGUID was used.
       """

    # work-out unique username to test with
    username = self._make_username()

    self.ldb_dc1.add(self.ou1)

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # Now create two, conflicting objects.  This gives the user
    # object something to be under on both DCs.

    # We sleep between the two adds so that DC1 adds second, and
    # so wins the conflict resolution due to a later creation time
    # (modification timestamp on the name attribute).
    self.ldb_dc2.add(self.ou2)
    time.sleep(1)
    self.ldb_dc1.add(self.ou2)

    # move the user underneath DC1's copy of OU2
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)

    # Now that we have renamed the user (and so bumped the
    # usnChanged), bump the value on the OUs: first on DC1, then on DC2.
    for sam_ldb in (self.ldb_dc1, self.ldb_dc2):
        msg = ldb.Message()
        msg.dn = self.ou2_dn
        msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description")
        sam_ldb.modify(msg)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # NOTE(review): the moved user is still expected in a subtree search
    # of OU1, so OU2 presumably lives beneath OU1 - confirm in setUp.
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_moved = ldb_res[0]

    # trigger replication from DC1 to DC2 (second pass, kept from the
    # original sequence)
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be under the OU2 from DC1
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved, is_deleted=False)

    self.assertEqual(user_cur["parentGUID"], user_moved["parentGUID"])

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
def test_ReplicateAddInConflictOU2(self):
    """Verifies how an object is replicated between two DCs, when created in an ambiguous location
       This test should verify that:
        - Without replication, two conflicting objects can be created
        - force the conflict resolution algorithm so we know which copy will win
          (by creating the OU on DC1 one second after creating it on DC2)
        - confirm that the user object, created on DC1, ends up in the right place on DC2
        - therefore confirm that the conflict algorithm worked correctly, and that parentGUID was used.

       NOTE(review): an earlier version of this docstring said the winner was
       forced "by changing the description twice, therefore increasing that
       version count"; the code below does not do that - the description is
       replaced once on each DC - so the text now describes the sleep-based
       ordering that is actually performed.
       """
    # work-out unique username to test with
    username = self._make_username()

    self.ldb_dc1.add(self.ou1)

    # create user on DC1
    self.ldb_dc1.newuser(username=username,
                         userou="ou=%s,ou=%s"
                         % (self.ou1_dn.get_component_value(0),
                            self.top_ou.get_component_value(0)),
                         password=None, setpassword=False)
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_orig = ldb_res[0]
    user_dn = ldb_res[0]["dn"]

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)

    # Now create two, conflicting objects.  This gives the user
    # object something to be under on both DCs.  We create it on
    # DC1 1sec later so that it will win the conflict resolution.

    self.ldb_dc2.add(self.ou2)
    time.sleep(1)
    self.ldb_dc1.add(self.ou2)

    # move the user underneath DC1's copy of OU2
    new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
    new_dn.add_base(self.ou2_dn)
    self.ldb_dc1.rename(user_dn, new_dn)

    # Now that we have renamed the user (and so bumped the
    # usnChanged), bump the value on the OUs: first on DC1, then on DC2.
    for sam_ldb in (self.ldb_dc1, self.ldb_dc2):
        msg = ldb.Message()
        msg.dn = self.ou2_dn
        msg["description"] = ldb.MessageElement("OU2 Description", ldb.FLAG_MOD_REPLACE, "description")
        sam_ldb.modify(msg)

    # trigger replication from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # NOTE(review): the moved user is still expected in a subtree search
    # of OU1, so OU2 presumably lives beneath OU1 - confirm in setUp.
    ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
                                  scope=SCOPE_SUBTREE,
                                  expression="(samAccountName=%s)" % username,
                                  attrs=["*", "parentGUID"])
    self.assertEqual(len(ldb_res), 1)
    user_moved = ldb_res[0]

    # trigger replication from DC1 to DC2 (second pass, kept from the
    # original sequence)
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
    # check user info on DC2 - should be under the OU2 from DC1
    user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved, is_deleted=False)

    self.assertEqual(user_cur["parentGUID"], user_moved["parentGUID"])

    # delete user on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))

    # trigger replication from DC1 to DC2, for cleanup
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)