2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN replica_sync -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 from __future__
import print_function
37 SCOPE_BASE
, LdbError
, ERR_NO_SUCH_OBJECT
)
40 class DrsReplicaSyncTestCase(drs_base
.DrsBaseTestCase
):
41 """Intended as a black box test case for DsReplicaSync
42 implementation. It should test the behavior of this
43 case in cases when inbound replication is disabled"""
46 super(DrsReplicaSyncTestCase
, self
).setUp()
48 # This OU avoids this test conflicting with anything
49 # that may already be in the DB
50 self
.top_ou
= samba
.tests
.create_test_ou(self
.ldb_dc1
,
52 self
._net
_drs
_replicate
(DC
=self
.dnsname_dc2
, fromDC
=self
.dnsname_dc1
, forced
=True)
53 self
._net
_drs
_replicate
(DC
=self
.dnsname_dc1
, fromDC
=self
.dnsname_dc2
, forced
=True)
58 self
._cleanup
_object
(self
.ou1
)
59 self
._cleanup
_object
(self
.ou2
)
60 self
._cleanup
_dn
(self
.top_ou
)
62 # re-enable replication
63 self
._enable
_inbound
_repl
(self
.dnsname_dc1
)
64 self
._enable
_inbound
_repl
(self
.dnsname_dc2
)
66 super(DrsReplicaSyncTestCase
, self
).tearDown()
def _cleanup_dn(self, dn):
    """Tree-delete ``dn`` on DC2 and then on DC1, tolerating a missing object.

    :param dn: DN (or ``<GUID=...>`` form) of the subtree to remove.

    A failed delete is accepted only when the server reports
    ERR_NO_SUCH_OBJECT; any other LDB error fails the test.
    """
    # DC2 is cleaned before DC1, matching the original call order.
    for samdb in (self.ldb_dc2, self.ldb_dc1):
        try:
            samdb.delete(dn, ["tree_delete:1"])
        except LdbError as err:
            # NOTE(review): assumes LdbError.args is (error_number, message),
            # as in the python ldb bindings - confirm.
            num = err.args[0]
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)
80 def _cleanup_object(self
, guid
):
81 """Cleans up a test object, if it still exists"""
83 self
._cleanup
_dn
('<GUID=%s>' % guid
)
def test_ReplEnabled(self):
    """Tests we can replicate when replication is enabled"""
    # DC1 is the destination: enable its inbound replication, then pull
    # from DC2 without forcing.
    dest_dc = self.dnsname_dc1
    self._enable_inbound_repl(dest_dc)
    self._net_drs_replicate(DC=dest_dc,
                            fromDC=self.dnsname_dc2,
                            forced=False)
def test_ReplDisabled(self):
    """Tests we can't replicate when replication is disabled"""
    self._disable_inbound_repl(self.dnsname_dc1)

    # Tunnel the command line credentials down to the
    # subcommand to avoid a new kinit
    cmdline_auth = "--krb5-ccache=%s" % self.get_creds_ccache_name()

    # bin/samba-tool drs replicate <cmdline_auth> <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
    cmd_list = ["drs", "replicate", cmdline_auth,
                self.dnsname_dc1, self.dnsname_dc2, self.domain_dn]

    # stdout is not inspected; only the exit status and stderr matter here.
    (result, _out, err) = self.runsubcmd(*cmd_list)
    self.assertCmdFail(result)
    # assertIn reports the actual stderr text when the marker is missing.
    self.assertIn('WERR_DS_DRA_SINK_DISABLED', err)
def test_ReplDisabledForced(self):
    """Tests we can force replicate when replication is disabled"""
    self._disable_inbound_repl(self.dnsname_dc1)
    # The call's result is not inspected here, so it is not bound to a name.
    self._net_drs_replicate(DC=self.dnsname_dc1,
                            fromDC=self.dnsname_dc2,
                            forced=True)
def test_ReplLocal(self):
    """Tests we can replicate direct to the local db"""
    target_dc = self.dnsname_dc1
    self._enable_inbound_repl(target_dc)

    # Unforced, local, full-sync pull from DC2 into DC1.
    repl_args = dict(DC=target_dc,
                     fromDC=self.dnsname_dc2,
                     forced=False,
                     local=True,
                     full_sync=True)
    self._net_drs_replicate(**repl_args)
121 def _create_ou(self
, samdb
, name
):
124 objectClass: organizationalUnit
125 """ % (name
, self
.top_ou
)
127 res
= samdb
.search(base
="%s,%s" % (name
, self
.top_ou
),
128 scope
=SCOPE_BASE
, attrs
=["objectGUID"])
129 return self
._GUID
_string
(res
[0]["objectGUID"][0])
def _check_deleted(self, sam_ldb, guid):
    """Assert that the OU identified by ``guid`` is a deleted object in ``sam_ldb``.

    :param sam_ldb: database connection to search.
    :param guid: GUID string of the OU to look up.

    The object must be found exactly once (with deleted objects shown),
    have isDeleted == TRUE, carry no objectCategory and live under the
    Deleted Objects container.
    """
    # search the OU by guid as it may be deleted
    res = sam_ldb.search(base='<GUID=%s>' % guid,
                         controls=["show_deleted:1"],
                         attrs=["isDeleted", "objectCategory", "ou"])
    self.assertEqual(len(res), 1)
    ou_cur = res[0]

    # Deleted Object base DN
    dodn = self._deleted_objects_dn(sam_ldb)
    # now check properties of the OU
    name_cur = ou_cur["ou"][0]
    self.assertEqual(ou_cur["isDeleted"][0], b"TRUE")
    self.assertNotIn(b"objectCategory", ou_cur)
    self.assertIn(dodn, str(ou_cur["dn"]),
                  "OU %s is deleted but it is not located under %s!" % (name_cur, dodn))
def test_ReplConflictsFullSync(self):
    """Tests that objects created in conflict become conflict DNs (honour full sync override)"""
    import time

    # First confirm local replication (so when we test against windows,
    # this fails fast without creating objects)
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            local=True, forced=True, full_sync=True)

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Full Sync")
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Full Sync")

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            local=True, forced=True, full_sync=True)

    # Check that DC2 got the DC1 object, and OU1 was made into a conflict
    res1 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertNotIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc2, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC2
    self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc2.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=True)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRemoteWin(self):
    """Tests that objects created in conflict become conflict DNs"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsLocalWin(self):
    """Tests that objects created in conflict become conflict DNs"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC2 object created first
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Local Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Local Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]),
                  "Got %s for %s" % (str(res2[0]["name"][0]), self.ou2))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRemoteWin_with_child(self):
    """Tests that objects created in conflict become conflict DNs"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, with DC1 object created first
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Conflict")
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Conflict")
    # Create a child under each parent, on DC1 and DC2 respectively
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Conflict")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects (and their children) by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check parents deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check parents deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)

    # Check children deleted on DC1
    self._check_deleted(self.ldb_dc1, ou1_child)
    self._check_deleted(self.ldb_dc1, ou2_child)
    # Check children deleted on DC2
    self._check_deleted(self.ldb_dc2, ou1_child)
    self._check_deleted(self.ldb_dc2, ou2_child)
def test_ReplConflictsRenamedVsNewRemoteWin(self):
    """Tests resolving a DN conflict between a renamed object and a new object"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create an OU and rename it on DC1
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict orig")
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1,
                        "OU=Test Remote Rename Conflict,%s" % self.top_ou)

    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)

    # create a conflicting object with the same DN on DC2
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRenamedVsNewLocalWin(self):
    """Tests resolving a DN conflict between a renamed object and a new object"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create conflicting objects on DC1 and DC2, where the DC2 object has been renamed
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict orig")
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                        "OU=Test Rename Local Conflict,%s" % self.top_ou)
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRenameRemoteWin(self):
    """Tests that objects renamed into conflict become conflict DNs"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create non-conflicting objects on DC1 and DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Rename both to the same DN, with the DC1 rename done first
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1,
                        "OU=Test Remote Rename Conflict 3,%s" % self.top_ou)
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                        "OU=Test Remote Rename Conflict 3,%s" % self.top_ou)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplConflictsRenameRemoteWin_with_child(self):
    """Tests that parent objects renamed into conflict become conflict DNs"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create non-conflicting parent objects on DC1 and DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Rename Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Rename Conflict 2")
    # Create a child under each parent, on DC1 and DC2 respectively
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Rename Conflict")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Rename Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Rename both parents to the same DN, with the DC1 rename done first
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1,
                        "OU=Test Parent Remote Rename Conflict 3,%s" % self.top_ou)
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                        "OU=Test Parent Remote Rename Conflict 3,%s" % self.top_ou)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects (and their children) by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check parents deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check parents deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)

    # Check children deleted on DC1
    self._check_deleted(self.ldb_dc1, ou1_child)
    self._check_deleted(self.ldb_dc1, ou2_child)
    # Check children deleted on DC2
    self._check_deleted(self.ldb_dc2, ou1_child)
    self._check_deleted(self.ldb_dc2, ou2_child)
def test_ReplConflictsRenameLocalWin(self):
    """Tests that objects renamed into conflict become conflict DNs"""
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create non-conflicting objects on DC1 and DC2
    self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict 2")

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Rename both to the same DN, with the DC2 rename done first
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                        "OU=Test Rename Local Conflict 3,%s" % self.top_ou)
    # We have to sleep to ensure that the two objects have different timestamps
    time.sleep(1)
    self.ldb_dc1.rename("<GUID=%s>" % self.ou1,
                        "OU=Test Rename Local Conflict 3,%s" % self.top_ou)

    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 object, and OU2 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))
    # Neither object may have been moved to lostAndFound
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertNotIn(lost_and_found, str(res1[0].dn))
    self.assertNotIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
def test_ReplLostAndFound(self):
    """Tests that objects created under an OU deleted elsewhere end up in lostAndFound"""
    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create two OUs on DC2
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Deleted parent")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Deleted parent 2")

    # replicate them from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    # Create children on DC2, where the parents still exist
    ou1_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent")
    ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent 2")

    # Replicate the children from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check on DC1 that the sub-OUs are now in lostAndFound and that one
    # or other of them was made into a conflict DN
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertTrue('CNF:%s' % ou1_child in str(res1[0]["name"][0]) or
                    'CNF:%s' % ou2_child in str(res2[0]["name"][0]))
    lost_and_found = self._lost_and_found_dn(self.ldb_dc1, self.domain_dn)
    self.assertIn(lost_and_found, str(res1[0].dn))
    self.assertIn(lost_and_found, str(res2[0].dn))
    # The name attribute must agree with the RDN of the DN
    self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
    self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

    # Delete the remaining (child) objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
    self.ldb_dc1.delete('<GUID=%s>' % ou2_child)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check all deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    self._check_deleted(self.ldb_dc1, ou1_child)
    self._check_deleted(self.ldb_dc1, ou2_child)
    # Check all deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
    self._check_deleted(self.ldb_dc2, ou1_child)
    self._check_deleted(self.ldb_dc2, ou2_child)
def test_ReplRenames(self):
    """Tests that renames of parents (on DC2) and of children (on DC1) replicate to consistent DNs on both DCs"""

    def check_children(samdb):
        """Assert every child has its final name and DN in ``samdb``."""
        expected = (
            (ou1_child, 'Test Child 2',
             "OU=Test Child 2,OU=Original parent 3,%s" % self.top_ou),
            (ou2_child, 'Test Child',
             "OU=Test Child,OU=Original parent 3,%s" % self.top_ou),
            (ou3_child, 'Test CASE Child',
             "OU=Test CASE Child,OU=Original parent 2,%s" % self.top_ou),
        )
        # All searches first, then all name checks, then all DN checks,
        # so failures are reported in the same order as before.
        found = [samdb.search(base="<GUID=%s>" % guid,
                              scope=SCOPE_BASE, attrs=["name"])
                 for (guid, _name, _dn) in expected]
        for res, (_guid, name, _dn) in zip(found, expected):
            self.assertEqual(name, str(res[0]["name"][0]))
        for res, (_guid, _name, dn) in zip(found, expected):
            self.assertEqual(str(res[0].dn), dn)

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # Create two OUs on DC2
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Original parent")
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Original parent 2")

    # replicate them from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Create children on DC1
    ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Original parent")
    ou2_child = self._create_ou(self.ldb_dc1, "OU=Test Child 2,OU=Original parent")
    ou3_child = self._create_ou(self.ldb_dc1, "OU=Test Case Child,OU=Original parent")

    # replicate them from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # On DC1: move the children under "Original parent 2", swapping the
    # names of the first two via a temporary name, and change only the
    # case of the third child's name.
    self.ldb_dc1.rename("<GUID=%s>" % ou2_child,
                        "OU=Test Child 3,OU=Original parent 2,%s" % self.top_ou)
    self.ldb_dc1.rename("<GUID=%s>" % ou1_child,
                        "OU=Test Child 2,OU=Original parent 2,%s" % self.top_ou)
    self.ldb_dc1.rename("<GUID=%s>" % ou2_child,
                        "OU=Test Child,OU=Original parent 2,%s" % self.top_ou)
    self.ldb_dc1.rename("<GUID=%s>" % ou3_child,
                        "OU=Test CASE Child,OU=Original parent,%s" % self.top_ou)
    # On DC2: shift the parents along (2 -> 3, then 1 -> 2)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou2,
                        "OU=Original parent 3,%s" % self.top_ou)
    self.ldb_dc2.rename("<GUID=%s>" % self.ou1,
                        "OU=Original parent 2,%s" % self.top_ou)

    # replicate them from DC1 to DC2
    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check the sub-OUs are now under Original parent 3 (Original
    # parent 2 for Test CASE Child), and all have the right names

    # Check that DC2 got the DC1 changes, and the renames are all correct
    check_children(self.ldb_dc2)

    # replicate them from DC2 to DC1
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check that DC1 got the DC2 changes, and the renames are all correct
    check_children(self.ldb_dc1)

    # Delete all objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
    self.ldb_dc1.delete('<GUID=%s>' % ou2_child)
    self.ldb_dc1.delete('<GUID=%s>' % ou3_child)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check all deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    self._check_deleted(self.ldb_dc1, ou1_child)
    self._check_deleted(self.ldb_dc1, ou2_child)
    self._check_deleted(self.ldb_dc1, ou3_child)
    # Check all deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)
    self._check_deleted(self.ldb_dc2, ou1_child)
    self._check_deleted(self.ldb_dc2, ou2_child)
    self._check_deleted(self.ldb_dc2, ou3_child)
689 def reanimate_object(self
, samdb
, guid
, new_dn
):
690 """Re-animates a deleted object"""
691 res
= samdb
.search(base
="<GUID=%s>" % guid
, attrs
=["isDeleted"],
692 controls
=['show_deleted:1'], scope
=SCOPE_BASE
)
698 msg
["isDeleted"] = ldb
.MessageElement([], ldb
.FLAG_MOD_DELETE
, "isDeleted")
699 msg
["distinguishedName"] = ldb
.MessageElement([new_dn
], ldb
.FLAG_MOD_REPLACE
, "distinguishedName")
700 samdb
.modify(msg
, ["show_deleted:1"])
def test_ReplReanimationConflict(self):
    """
    Checks that if a reanimated object conflicts with a new object, then
    the conflict is resolved correctly.
    """
    import time

    self._disable_inbound_repl(self.dnsname_dc1)
    self._disable_inbound_repl(self.dnsname_dc2)

    # create an object, "accidentally" delete it, and replicate the changes to both DCs
    self.ou1 = self._create_ou(self.ldb_dc2, "OU=Conflict object")
    self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Now pretend that the admin for one DC resolves the problem by
    # re-animating the object...
    self.reanimate_object(self.ldb_dc1, self.ou1,
                          "OU=Conflict object,%s" % self.top_ou)

    # ...whereas another admin just creates an OU with the same name
    # again on a different DC.
    # The check below expects the later change to win, so make sure the
    # two changes cannot share a timestamp (as the sibling conflict
    # tests require).
    time.sleep(1)
    self.ou2 = self._create_ou(self.ldb_dc2, "OU=Conflict object")

    # Now sync the DCs to resolve the conflict
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            forced=True, full_sync=False)

    # Check the latest change won and self.ou1 was made into a conflict
    res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                               scope=SCOPE_BASE, attrs=["name"])
    res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                               scope=SCOPE_BASE, attrs=["name"])
    print(res1[0]["name"][0])
    print(res2[0]["name"][0])
    self.assertIn('CNF:%s' % self.ou1, str(res1[0]["name"][0]))
    self.assertNotIn('CNF:%s' % self.ou2, str(res2[0]["name"][0]))

    # Delete both objects by GUID on DC1
    self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
    self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

    self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1,
                            forced=True, full_sync=False)

    # Check deleted on DC1
    self._check_deleted(self.ldb_dc1, self.ou1)
    self._check_deleted(self.ldb_dc1, self.ou2)
    # Check deleted on DC2
    self._check_deleted(self.ldb_dc2, self.ou1)
    self._check_deleted(self.ldb_dc2, self.ou2)