PY3: change shebang to python3 in source4/torture dir
[Samba.git] / source4 / torture / drs / python / replica_sync.py
blob03292b5780584b6dea70da71130be4666cea7c5b
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # Tests various DsReplicaSync replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # Usage:
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN replica_sync -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 from __future__ import print_function
31 import drs_base
32 import samba.tests
33 import time
34 import ldb
36 from ldb import (
37 SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT)
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
    """Intended as a black box test case for DsReplicaSync
       implementation. It should test the behavior of this
       case in cases when inbound replication is disabled.

       Every test works underneath a dedicated test OU (self.top_ou) and
       records the GUIDs of the objects it creates in self.ou1 / self.ou2
       so that tearDown() can remove them again."""

    def setUp(self):
        """Create the test OU on DC1 and bring both DCs in sync."""
        super(DrsReplicaSyncTestCase, self).setUp()

        # This OU avoids this test conflicting with anything
        # that may already be in the DB
        self.top_ou = samba.tests.create_test_ou(self.ldb_dc1,
                                                 "replica_sync")
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True)

        # GUID strings of the objects created by the individual tests
        self.ou1 = None
        self.ou2 = None

    def tearDown(self):
        """Remove the test objects and re-enable inbound replication."""
        try:
            self._cleanup_object(self.ou1)
            self._cleanup_object(self.ou2)
            self._cleanup_dn(self.top_ou)
        finally:
            # re-enable replication, even when the cleanup above failed,
            # so that a failing test does not leave the DCs with inbound
            # replication disabled for whatever runs next
            self._enable_inbound_repl(self.dnsname_dc1)
            self._enable_inbound_repl(self.dnsname_dc2)

            super(DrsReplicaSyncTestCase, self).tearDown()

    def _cleanup_dn(self, dn):
        """Tree-deletes dn on DC2 and then on DC1.

        An object that is already gone (ERR_NO_SUCH_OBJECT) is fine; any
        other LDB error fails the test.
        """
        for samdb in (self.ldb_dc2, self.ldb_dc1):
            try:
                samdb.delete(dn, ["tree_delete:1"])
            except LdbError as e:
                (num, _) = e.args
                self.assertEqual(num, ERR_NO_SUCH_OBJECT)

    def _cleanup_object(self, guid):
        """Cleans up a test object, if it still exists"""
        if guid is not None:
            self._cleanup_dn('<GUID=%s>' % guid)

    def test_ReplEnabled(self):
        """Tests we can replicate when replication is enabled"""
        self._enable_inbound_repl(self.dnsname_dc1)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=False)

    def test_ReplDisabled(self):
        """Tests we cannot replicate when replication is disabled"""
        self._disable_inbound_repl(self.dnsname_dc1)

        ccache_name = self.get_creds_ccache_name()

        # Tunnel the command line credentials down to the
        # subcommand to avoid a new kinit
        cmdline_auth = "--krb5-ccache=%s" % ccache_name

        # bin/samba-tool drs <drs_command> <cmdline_auth>
        cmd_list = ["drs", "replicate", cmdline_auth]

        nc_dn = self.domain_dn
        # bin/samba-tool drs replicate <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
        cmd_list += [self.dnsname_dc1, self.dnsname_dc2, nc_dn]

        (result, out, err) = self.runsubcmd(*cmd_list)
        self.assertCmdFail(result)
        self.assertIn('WERR_DS_DRA_SINK_DISABLED', err)

    def test_ReplDisabledForced(self):
        """Tests we can force replicate when replication is disabled"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True)

    def test_ReplLocal(self):
        """Tests we can replicate direct to the local db"""
        self._enable_inbound_repl(self.dnsname_dc1)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=False, local=True, full_sync=True)

    def _create_ou(self, samdb, name):
        """Creates an organizationalUnit below self.top_ou.

        samdb is the database to add the object to and name is the
        relative DN of the new OU (e.g. "OU=Child,OU=Parent"); it is
        joined to self.top_ou with a comma.

        Returns the objectGUID of the new object, converted with
        self._GUID_string().
        """
        ldif = """
dn: %s,%s
objectClass: organizationalUnit
""" % (name, self.top_ou)
        samdb.add_ldif(ldif)
        res = samdb.search(base="%s,%s" % (name, self.top_ou),
                           scope=SCOPE_BASE, attrs=["objectGUID"])
        return self._GUID_string(res[0]["objectGUID"][0])

    def _check_deleted(self, sam_ldb, guid):
        """Asserts that the object with the given GUID is deleted in sam_ldb.

        The object must still be found with the show_deleted control, have
        isDeleted set to TRUE, have no objectCategory and be located under
        the DN returned by self._deleted_objects_dn().
        """
        # search the object by guid as it may be deleted
        res = sam_ldb.search(base='<GUID=%s>' % guid,
                             controls=["show_deleted:1"],
                             attrs=["isDeleted", "objectCategory", "ou"])
        self.assertEqual(len(res), 1)
        ou_cur = res[0]
        # Deleted Object base DN
        dodn = self._deleted_objects_dn(sam_ldb)
        # now check properties of the object
        name_cur = ou_cur["ou"][0]
        self.assertEqual(ou_cur["isDeleted"][0], b"TRUE")
        self.assertNotIn(b"objectCategory", ou_cur)
        self.assertIn(dodn, str(ou_cur["dn"]),
                      "OU %s is deleted but it is not located under %s!" % (name_cur, dodn))

    def test_ReplConflictsFullSync(self):
        """Tests that objects created in conflict become conflict DNs (honour full sync override)"""

        # First confirm local replication (so when we test against windows,
        # this fails fast without creating objects)
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                local=True, forced=True, full_sync=True)

        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, with DC1 object created first
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Full Sync")
        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Full Sync")

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                local=True, forced=True, full_sync=True)

        # Check that DC2 got the DC1 object, and OU1 was made into a conflict
        res1 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertNotIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
        self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc2, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC2
        self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc2.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=True)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRemoteWin(self):
        """Tests that objects created in conflict become conflict DNs"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, with DC1 object created first
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Conflict")
        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and OU1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsLocalWin(self):
        """Tests that objects created in conflict become conflict DNs"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, with DC2 object created first
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Local Conflict")
        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Local Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and OU2 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]),
                      "Got %s for %s" % (str(res2[0]["name"][0]), self.ou2))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRemoteWin_with_child(self):
        """Tests that objects created in conflict become conflict DNs, when each has a child"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, with DC1 object created first
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Conflict")
        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Conflict")
        # Create a child under each parent, on the DC holding that parent
        ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Conflict")
        ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects (and their children) by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check parents deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check parents deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

        # Check children deleted on DC1
        self._check_deleted(self.ldb_dc1, ou1_child)
        self._check_deleted(self.ldb_dc1, ou2_child)
        # Check children deleted on DC2
        self._check_deleted(self.ldb_dc2, ou1_child)
        self._check_deleted(self.ldb_dc2, ou2_child)

    def test_ReplConflictsRenamedVsNewRemoteWin(self):
        """Tests resolving a DN conflict between a renamed object and a new object"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create an OU and rename it on DC1
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict orig")
        self.ldb_dc1.rename("<GUID=%s>" % self.ou1,
                            "OU=Test Remote Rename Conflict,%s" % self.top_ou)

        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)

        # create a conflicting object with the same DN on DC2
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRenamedVsNewLocalWin(self):
        """Tests resolving a DN conflict between a renamed object and a new object"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, where the DC2 object has been renamed
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict orig")
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                            "OU=Test Rename Local Conflict,%s" % self.top_ou)
        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and OU2 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRenameRemoteWin(self):
        """Tests that objects renamed into conflict become conflict DNs"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two differently named objects, one on DC1 and one on DC2
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict 2")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Rename both objects to the same DN, the DC1 object first
        self.ldb_dc1.rename("<GUID=%s>" % self.ou1,
                            "OU=Test Remote Rename Conflict 3,%s" % self.top_ou)
        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                            "OU=Test Remote Rename Conflict 3,%s" % self.top_ou)

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 rename, and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRenameRemoteWin_with_child(self):
        """Tests that objects renamed into conflict become conflict DNs, when each has a child"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two differently named objects, one on DC1 and one on DC2
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Rename Conflict")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Rename Conflict 2")
        # Create a child under each parent, on the DC holding that parent
        ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Rename Conflict")
        ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Rename Conflict 2")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Rename both parents to the same DN, the DC1 object first
        self.ldb_dc1.rename("<GUID=%s>" % self.ou1,
                            "OU=Test Parent Remote Rename Conflict 3,%s" % self.top_ou)
        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                            "OU=Test Parent Remote Rename Conflict 3,%s" % self.top_ou)

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 rename, and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects (and their children) by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check parents deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check parents deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

        # Check children deleted on DC1
        self._check_deleted(self.ldb_dc1, ou1_child)
        self._check_deleted(self.ldb_dc1, ou2_child)
        # Check children deleted on DC2
        self._check_deleted(self.ldb_dc2, ou1_child)
        self._check_deleted(self.ldb_dc2, ou2_child)

    def test_ReplConflictsRenameLocalWin(self):
        """Tests that objects renamed into conflict become conflict DNs"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two differently named objects, one on DC1 and one on DC2
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict 2")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Rename both objects to the same DN, the DC2 object first
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                            "OU=Test Rename Local Conflict 3,%s" % self.top_ou)
        # We have to sleep to ensure that the two objects have different timestamps
        time.sleep(1)
        self.ldb_dc1.rename("<GUID=%s>" % self.ou1,
                            "OU=Test Rename Local Conflict 3,%s" % self.top_ou)

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 rename, and OU2 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertNotIn(lost_found_dn, str(res1[0].dn))
        self.assertNotIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplLostAndFound(self):
        """Tests that objects created under an OU deleted elsewhere end up in lostAndFound"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two OUs on DC2
        self.ou1 = self._create_ou(self.ldb_dc2, "OU=Deleted parent")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Deleted parent 2")

        # replicate them from DC2 to DC1
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        # Create children on DC2
        ou1_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent")
        ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent 2")

        # Replicate from DC2
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check the sub-OUs are now in lostAndFound on DC1, and that one or
        # other of them was made into a conflict DN (both are "Test Child")
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % ou1_child in str(res1[0]["name"][0]) or
                        'CNF:%s' % ou2_child in str(res2[0]["name"][0]))
        lost_found_dn = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
        self.assertIn(lost_found_dn, str(res1[0].dn))
        self.assertIn(lost_found_dn, str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete the children by GUID on DC1 (the parents were deleted above)
        self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
        self.ldb_dc1.delete('<GUID=%s>' % ou2_child)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check all deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        self._check_deleted(self.ldb_dc1, ou1_child)
        self._check_deleted(self.ldb_dc1, ou2_child)
        # Check all deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)
        self._check_deleted(self.ldb_dc2, ou1_child)
        self._check_deleted(self.ldb_dc2, ou2_child)

    def test_ReplRenames(self):
        """Tests that renames (re-parenting, name swaps and a case-only
        rename) made on both DCs replicate correctly in both directions"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two OUs on DC2
        self.ou1 = self._create_ou(self.ldb_dc2, "OU=Original parent")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Original parent 2")

        # replicate them from DC2 to DC1
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Create children on DC1
        ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Original parent")
        ou2_child = self._create_ou(self.ldb_dc1, "OU=Test Child 2,OU=Original parent")
        ou3_child = self._create_ou(self.ldb_dc1, "OU=Test Case Child,OU=Original parent")

        # replicate them from DC1 to DC2
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # On DC1: move the first two children under the second parent,
        # swapping their names on the way (via a temporary name), and
        # change only the case of the third child's name
        self.ldb_dc1.rename("<GUID=%s>" % ou2_child,
                            "OU=Test Child 3,OU=Original parent 2,%s" % self.top_ou)
        self.ldb_dc1.rename("<GUID=%s>" % ou1_child,
                            "OU=Test Child 2,OU=Original parent 2,%s" % self.top_ou)
        self.ldb_dc1.rename("<GUID=%s>" % ou2_child,
                            "OU=Test Child,OU=Original parent 2,%s" % self.top_ou)
        self.ldb_dc1.rename("<GUID=%s>" % ou3_child,
                            "OU=Test CASE Child,OU=Original parent,%s" % self.top_ou)
        # On DC2: shift the names of the two parents along by one
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                            "OU=Original parent 3,%s" % self.top_ou)
        self.ldb_dc2.rename("<GUID=%s>" % self.ou1,
                            "OU=Original parent 2,%s" % self.top_ou)

        # replicate them from DC1 to DC2
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check the sub-OUs are now under Original Parent 3 (original
        # parent 2 for Test CASE Child), and both have the right names

        # Check that DC2 got the DC1 object, and the renames are all correct
        res1 = self.ldb_dc2.search(base="<GUID=%s>" % ou1_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc2.search(base="<GUID=%s>" % ou2_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res3 = self.ldb_dc2.search(base="<GUID=%s>" % ou3_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0].dn)
        print(res2[0].dn)
        print(res3[0].dn)
        self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
        self.assertEqual('Test Child', str(res2[0]["name"][0]))
        self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
        self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.top_ou)
        self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.top_ou)
        self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.top_ou)

        # replicate them from DC2 to DC1
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and the renames are all correct
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res3 = self.ldb_dc1.search(base="<GUID=%s>" % ou3_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0].dn)
        print(res2[0].dn)
        print(res3[0].dn)
        self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
        self.assertEqual('Test Child', str(res2[0]["name"][0]))
        self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
        self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.top_ou)
        self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.top_ou)
        self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.top_ou)

        # Delete all objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
        self.ldb_dc1.delete('<GUID=%s>' % ou2_child)
        self.ldb_dc1.delete('<GUID=%s>' % ou3_child)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check all deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        self._check_deleted(self.ldb_dc1, ou1_child)
        self._check_deleted(self.ldb_dc1, ou2_child)
        self._check_deleted(self.ldb_dc1, ou3_child)
        # Check all deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)
        self._check_deleted(self.ldb_dc2, ou1_child)
        self._check_deleted(self.ldb_dc2, ou2_child)
        self._check_deleted(self.ldb_dc2, ou3_child)

    def reanimate_object(self, samdb, guid, new_dn):
        """Re-animates a deleted object

        Looks the object up by GUID (including deleted objects) and, in a
        single modify, removes its isDeleted attribute and replaces its
        distinguishedName with new_dn. Does nothing when the search does
        not return exactly one object.
        """
        res = samdb.search(base="<GUID=%s>" % guid, attrs=["isDeleted"],
                           controls=['show_deleted:1'], scope=SCOPE_BASE)
        if len(res) != 1:
            return

        msg = ldb.Message()
        msg.dn = res[0].dn
        msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
        samdb.modify(msg, ["show_deleted:1"])

    def test_ReplReanimationConflict(self):
        """
        Checks that if a reanimated object conflicts with a new object, then
        the conflict is resolved correctly.
        """

        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # create an object, "accidentally" delete it, and replicate the changes to both DCs
        self.ou1 = self._create_ou(self.ldb_dc2, "OU=Conflict object")
        self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Now pretend that the admin for one DC resolves the problem by
        # re-animating the object...
        self.reanimate_object(self.ldb_dc1, self.ou1, "OU=Conflict object,%s" % self.top_ou)

        # ...whereas another admin just creates an object with the same name
        # again on a different DC
        time.sleep(1)
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Conflict object")

        # Now sync the DCs to resolve the conflict
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                forced=True, full_sync=False)

        # Check the latest change won and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
        self.assertNotIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                                forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)