1 # -*- coding: utf-8 -*-
2 # Originally based on tests for samba.kcc.ldif_import_export.
3 # Copyright (C) Andrew Bartlett 2015, 2018
5 # by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Tests for samba-tool visualize using the vampire DC and promoted DC
21 environments. For most tests we assume we can't assert much about what
22 state they are in, so we mainly check for command failure, but for
23 others we try to grasp control of replication and make more specific
assertions.
"""
27 from __future__
import print_function
32 from samba
.tests
.samba_tool
.base
import SambaToolCmdTest
# Mapping from a test-environment name ('promoted_dc', 'vampire_dc') to a
# list of two server DNs under CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com.
# NOTE(review): the assignment that opens this dict literal and its closing
# brace are not visible in this extract (presumably `ENV_DSAS = {` ... `}`);
# confirm against the upstream file.
37 'promoted_dc': ['CN=PROMOTEDVDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com',
38 'CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
39 'vampire_dc': ['CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com',
40 'CN=LOCALVAMPIREDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
# Switch automatic replication on or off for `dc`.
#
# For each of DISABLE_INBOUND_REPL and DISABLE_OUTBOUND_REPL this builds a
# bin/samba-tool command line ending in --dsa-option=<sign><option> and runs
# it with subprocess.check_call, which raises CalledProcessError if the
# command exits non-zero.
#
#   dc    -- the DC to act on (how it is passed to samba-tool is not
#            visible here, see the note below).
#   allow -- true: the sign is '-', i.e. the DISABLE_* options are removed;
#            false: the sign is '+', i.e. the DISABLE_* options are set.
#
# Credentials come from the USERNAME and PASSWORD environment variables,
# formatted as -U<user>%<password>; a KeyError is raised if either is unset.
#
# NOTE(review): the embedded line numbers jump from 50 to 53, so the middle
# of the `cmd` list is missing from this extract. Presumably it holds the
# samba-tool subcommand plus `credstring` and `dc` -- restore from upstream.
44 def set_auto_replication(dc
, allow
):
45 credstring
= '-U%s%%%s' % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
# '-' clears a DSA option, '+' sets it (the options are DISABLE_* flags,
# so allowing replication means clearing them).
46 on_or_off
= '-' if allow
else '+'
48 for opt
in ['DISABLE_INBOUND_REPL',
49 'DISABLE_OUTBOUND_REPL']:
50 cmd
= ['bin/samba-tool',
53 "--dsa-option=%s%s" % (on_or_off
, opt
)]
55 subprocess
.check_call(cmd
)
# Run a bin/samba-tool command (via subprocess.check_call, which raises
# CalledProcessError on a non-zero exit) that is meant to force replication
# between two DCs.
#
#   src, dest -- the two DCs; callers in this file pass them in that order,
#                e.g. force_replication(dc2, dc1, domain_dn).
#   base      -- callers pass the domain DN here.
#
# Credentials come from the USERNAME and PASSWORD environment variables,
# formatted as -U<user>%<password>.
#
# NOTE(review): the embedded line numbers jump from 60 to 66, so everything
# in the `cmd` list after 'bin/samba-tool' is missing from this extract; how
# src, dest, base and credstring are placed on the command line cannot be
# confirmed from here -- restore from upstream.
58 def force_replication(src
, dest
, base
):
59 credstring
= '-U%s%%%s' % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
60 cmd
= ['bin/samba-tool',
66 subprocess
.check_call(cmd
)
def get_utf8_matrix(s):
    """Parse the UTF-8 matrix printed by ``samba-tool visualize``.

    This parses the graphical table *just* well enough for our tests.

    :param s: the command output, possibly containing ANSI colour
        escape sequences.
    :return: a dict of dicts, ``d[row_name][column_name] -> value``.
        Names are the lower-cased first CN component of each row label
        (the FOO in 'CN=FOO+...' or 'CN=FOO,CN=x,DC=...').  Cells that
        are not plain digits (e.g. overflow markers) become infinity;
        the '·' on the diagonal becomes 0.
    :raises AttributeError: if a line containing '·' does not start
        with a 'CN=' label.
    """
    # decolourise first: drop ANSI escape sequences such as ESC[32m
    s = re.sub("\033" r"\[[^m]+m", '', s)
    lines = s.split('\n')
    # matrix rows have '·' on the diagonal
    rows = [x.strip().replace('·', '0') for x in lines if '·' in x]
    names = []
    values = []
    for r in rows:
        # The matrix is square, so at most len(rows) whitespace-separated
        # cells trail the label; splitting from the right keeps any
        # spaces inside the label intact.
        parts = r.rsplit(None, len(rows))
        k, v = parts[0], parts[1:]
        # we want the FOO in 'CN=FOO+' or 'CN=FOO,CN=x,DC=...'
        k = re.match(r'cn=([^+,]+)', k.lower()).group(1)
        names.append(k)
        if len(v) == 1:  # this is a single-digit matrix, no spaces
            v = list(v[0])
        values.append([int(x) if x.isdigit() else 1e999 for x in v])

    d = {}
    for n1, row in zip(names, values):
        d[n1] = {}
        for n2, v in zip(names, row):
            d[n1][n2] = v

    return d
# Test case for `samba-tool visualize`: each test runs a visualize
# subcommand through self.runsubcmd() and checks the outcome with
# self.assertCmdSuccess(); the uptodateness tests additionally parse the
# printed matrix with get_utf8_matrix().
#
# Connection details are read from the environment: SERVER and DC_SERVER
# name the DCs, USERNAME and PASSWORD are joined as "user%password".
#
# NOTE(review): this listing is a damaged extract. Statements are split
# across lines and the embedded line numbers skip, so some `def` headers,
# call arguments and setup statements are not visible. Gaps are flagged
# below; restore them from the upstream file before relying on this code.
97 class SambaToolVisualizeDrsTest(SambaToolCmdTest
):
# NOTE(review): presumably the body of setUp(); its `def` line (embedded
# line 98) is not visible in this extract.
99 super(SambaToolVisualizeDrsTest
, self
).setUp()
# Run `visualize ntdsconn` and assert that the command succeeded.
# NOTE(review): embedded lines 105-107 (the remaining runsubcmd arguments)
# are missing; `server` and `creds` are presumably passed there.
101 def test_ntdsconn(self
):
102 server
= "ldap://%s" % os
.environ
["SERVER"]
103 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
104 (result
, out
, err
) = self
.runsubcmd("visualize", "ntdsconn",
108 self
.assertCmdSuccess(result
, out
, err
)
# Run `visualize ntdsconn` with '--color=no', '-S', '-r' and assert success.
# NOTE(review): embedded lines 114-115 (arguments) are missing.
110 def test_ntdsconn_remote(self
):
111 server
= "ldap://%s" % os
.environ
["SERVER"]
112 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
113 (result
, out
, err
) = self
.runsubcmd("visualize", "ntdsconn",
116 '--color=no', '-S', '-r')
117 self
.assertCmdSuccess(result
, out
, err
)
# Run `visualize reps` and assert success.
# NOTE(review): the `def` line for this method (embedded line 119,
# presumably test_reps) and argument lines 123-125 are missing.
120 server
= "ldap://%s" % os
.environ
["SERVER"]
121 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
122 (result
, out
, err
) = self
.runsubcmd("visualize", "reps",
126 self
.assertCmdSuccess(result
, out
, err
)
# Run `visualize uptodateness` once against each of SERVER and DC_SERVER
# and assert that both invocations succeed; no output is compared.
# NOTE(review): several argument lines of each runsubcmd call are missing.
128 def test_uptodateness_all_partitions(self
):
129 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
130 dc1
= os
.environ
["SERVER"]
131 dc2
= os
.environ
["DC_SERVER"]
132 # We will check that the visualisation works for the two
133 # stopped DCs, but we can't make assertions that the output
134 # will be the same because there may be replication between
135 # the two calls. Stopping the replication on these ones is not
136 # enough because there are other DCs about.
137 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
139 '-H', "ldap://%s" % dc1
,
142 self
.assertCmdSuccess(result
, out
, err
)
144 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
146 '-H', "ldap://%s" % dc2
,
149 self
.assertCmdSuccess(result
, out
, err
)
# Run `visualize uptodateness` against SERVER for each partition name in a
# list and assert success each time.
# NOTE(review): the list is cut off after "CONFIGURATION" (embedded lines
# 155-158 missing), as are some runsubcmd arguments.
151 def test_uptodateness_partitions(self
):
152 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
153 dc1
= os
.environ
["SERVER"]
154 for part
in ["CONFIGURATION",
159 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
161 '-H', "ldap://%s" % dc1
,
165 self
.assertCmdSuccess(result
, out
, err
)
# Sanity-check a matrix as returned by get_utf8_matrix(): `dc` must be a
# key of the matrix, and every row's diagonal entry (row[k] for row key k)
# must be 0.
# NOTE(review): embedded line 168 is missing -- presumably `for dc in dcs:`.
# get_utf8_matrix lower-cases its keys, so this assumes the names in `dcs`
# are already lower case -- confirm.
167 def assert_matrix_validity(self
, matrix
, dcs
=()):
169 self
.assertIn(dc
, matrix
)
170 for k
, row
in matrix
.items():
171 self
.assertEqual(row
[k
], 0)
# End-to-end check of `visualize uptodateness --partition DOMAIN` while
# replication is controlled by hand: auto-replication is switched off on
# dc1, replication is forced in each direction with force_replication(),
# and the parsed matrix is checked after each step. Cleanups registered
# up front call set_auto_replication(dc, True) for both DCs.
# NOTE(review): many lines are missing from this method (the samdb add()
# calls, loop headers and several runsubcmd arguments).
173 def test_uptodateness_stop_replication_domain(self
):
174 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
175 dc1
= os
.environ
["SERVER"]
176 dc2
= os
.environ
["DC_SERVER"]
177 self
.addCleanup(set_auto_replication
, dc1
, True)
178 self
.addCleanup(set_auto_replication
, dc2
, True)
# Local helper that prints a heading banner for a stage of the test.
# NOTE(review): embedded lines 181 and 183 are missing, so any guard
# around the print and any printing of `out` is not visible.
180 def display(heading
, out
):
182 print("========", heading
, "=========")
185 samdb1
= self
.getSamDB("-H", "ldap://%s" % dc1
, "-U", creds
)
186 samdb2
= self
.getSamDB("-H", "ldap://%s" % dc2
, "-U", creds
)
188 domain_dn
= samdb1
.domain_dn()
189 self
.assertTrue(domain_dn
== samdb2
.domain_dn(),
190 "We expected the same domain_dn across DCs")
# OU DNs with a random hex suffix below the domain DN (presumably so that
# repeated runs do not collide).
192 ou1
= "OU=dc1.%x,%s" % (random
.randrange(1 << 64), domain_dn
)
193 ou2
= "OU=dc2.%x,%s" % (random
.randrange(1 << 64), domain_dn
)
# NOTE(review): the two lines below are remnants of dict literals whose
# surrounding calls are missing (presumably the OUs being added).
196 "objectclass": "organizationalUnit"
200 "objectclass": "organizationalUnit"
# allow=False: set_auto_replication uses the '+' sign, i.e. it sets the
# DISABLE_* replication options for dc1.
203 set_auto_replication(dc1
, False)
204 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
206 '-H', "ldap://%s" % dc1
,
210 '--partition', 'DOMAIN')
211 display("dc1 replication is now off", out
)
212 self
.assertCmdSuccess(result
, out
, err
)
213 matrix
= get_utf8_matrix(out
)
214 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
# Force replication with src=dc2, dest=dc1, then expect dc1's entry for
# dc2 to be 0.
216 force_replication(dc2
, dc1
, domain_dn
)
217 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
219 '-H', "ldap://%s" % dc1
,
223 '--partition', 'DOMAIN')
224 display("forced replication %s -> %s" % (dc2
, dc1
), out
)
225 self
.assertCmdSuccess(result
, out
, err
)
226 matrix
= get_utf8_matrix(out
)
227 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
228 self
.assertEqual(matrix
[dc1
][dc2
], 0)
# Now the other direction: src=dc1, dest=dc2.
230 force_replication(dc1
, dc2
, domain_dn
)
231 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
233 '-H', "ldap://%s" % dc1
,
237 '--partition', 'DOMAIN')
# NOTE(review): this heading formats (dc2, dc1) although the call just
# above replicated from dc1 to dc2; the later stages that follow
# force_replication(dc1, dc2, ...) print (dc1, dc2). The heading looks
# swapped (display text only; the assertions are unaffected).
238 display("forced replication %s -> %s" % (dc2
, dc1
), out
)
239 self
.assertCmdSuccess(result
, out
, err
)
240 matrix
= get_utf8_matrix(out
)
241 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
242 self
.assertEqual(matrix
[dc2
][dc1
], 0)
# DN templates: the doubled %% leaves a literal %d in the result, to be
# filled in with a number later.
244 dn1
= 'cn=u1.%%d,%s' % (ou1
)
245 dn2
= 'cn=u2.%%d,%s' % (ou2
)
# NOTE(review): remnant of a dict literal; the loop and call that add the
# users (heading below says 10 users on dc1) are missing.
250 "objectclass": "user"
253 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
255 '-H', "ldap://%s" % dc1
,
259 '--partition', 'DOMAIN')
260 display("added 10 users on %s" % dc1
, out
)
261 self
.assertCmdSuccess(result
, out
, err
)
262 matrix
= get_utf8_matrix(out
)
263 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
264 # dc2's view of dc1 should now be 10 changes out of date
265 self
.assertEqual(matrix
[dc2
][dc1
], 10)
# NOTE(review): remnant of a dict literal; the loop and call that add the
# users on dc2 are missing.
270 "objectclass": "user"
273 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
275 '-H', "ldap://%s" % dc1
,
279 '--partition', 'DOMAIN')
280 display("added 10 users on %s" % dc2
, out
)
281 self
.assertCmdSuccess(result
, out
, err
)
282 matrix
= get_utf8_matrix(out
)
283 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
284 # dc1's view of dc2 is probably 11 changes out of date
285 self
.assertGreaterEqual(matrix
[dc1
][dc2
], 10)
# range(10, 101) yields 91 values, matching the "added 91 users" heading.
# NOTE(review): the loop body is reduced to two dict-literal remnants.
287 for i
in range(10, 101):
290 "objectclass": "user"
294 "objectclass": "user"
297 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
299 '-H', "ldap://%s" % dc1
,
303 '--partition', 'DOMAIN')
304 display("added 91 users on both", out
)
305 self
.assertCmdSuccess(result
, out
, err
)
306 matrix
= get_utf8_matrix(out
)
307 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
308 # the difference here should be ~101.
309 self
.assertGreaterEqual(matrix
[dc1
][dc2
], 100)
310 self
.assertGreaterEqual(matrix
[dc2
][dc1
], 100)
# NOTE(review): the final argument line of the next call (embedded line
# 319, presumably the --max-digits option named in the heading) is missing.
312 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
314 '-H', "ldap://%s" % dc1
,
318 '--partition', 'DOMAIN',
320 display("with --max-digits 2", out
)
321 self
.assertCmdSuccess(result
, out
, err
)
322 matrix
= get_utf8_matrix(out
)
323 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
324 # visualising with 2 digits mean these overflow into infinity
# get_utf8_matrix maps non-digit cells to 1e999 (infinity), which is >= 1e99.
325 self
.assertGreaterEqual(matrix
[dc1
][dc2
], 1e99
)
326 self
.assertGreaterEqual(matrix
[dc2
][dc1
], 1e99
)
# NOTE(review): the final argument line (embedded line 335) is missing here
# too.
328 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
330 '-H', "ldap://%s" % dc1
,
334 '--partition', 'DOMAIN',
336 display("with --max-digits 1", out
)
337 self
.assertCmdSuccess(result
, out
, err
)
338 matrix
= get_utf8_matrix(out
)
339 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
340 # visualising with 1 digit means these overflow into infinity
341 self
.assertGreaterEqual(matrix
[dc1
][dc2
], 1e99
)
342 self
.assertGreaterEqual(matrix
[dc2
][dc1
], 1e99
)
# Bring dc1 up to date from dc2 and expect dc1's entry for dc2 to drop to 0.
344 force_replication(dc2
, dc1
, samdb1
.domain_dn())
345 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
347 '-H', "ldap://%s" % dc1
,
351 '--partition', 'DOMAIN')
353 display("forced replication %s -> %s" % (dc2
, dc1
), out
)
354 self
.assertCmdSuccess(result
, out
, err
)
355 matrix
= get_utf8_matrix(out
)
356 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
357 self
.assertEqual(matrix
[dc1
][dc2
], 0)
# And the reverse: bring dc2 up to date from dc1.
359 force_replication(dc1
, dc2
, samdb2
.domain_dn())
360 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
362 '-H', "ldap://%s" % dc1
,
366 '--partition', 'DOMAIN')
368 display("forced replication %s -> %s" % (dc1
, dc2
), out
)
369 self
.assertCmdSuccess(result
, out
, err
)
370 matrix
= get_utf8_matrix(out
)
371 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
372 self
.assertEqual(matrix
[dc2
][dc1
], 0)
# Delete each OU (with the tree_delete:1 control) through its own DC's
# connection; the test then expects each DC to be at least 100 changes
# behind the other.
374 samdb1
.delete(ou1
, ['tree_delete:1'])
375 samdb2
.delete(ou2
, ['tree_delete:1'])
377 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
379 '-H', "ldap://%s" % dc1
,
383 '--partition', 'DOMAIN')
384 display("tree delete both ous on %s" % (dc1
,), out
)
385 self
.assertCmdSuccess(result
, out
, err
)
386 matrix
= get_utf8_matrix(out
)
387 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
388 self
.assertGreaterEqual(matrix
[dc1
][dc2
], 100)
389 self
.assertGreaterEqual(matrix
[dc2
][dc1
], 100)
# Re-enable automatic replication on dc1; from here on only matrix
# validity is checked, not specific values.
391 set_auto_replication(dc1
, True)
392 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
394 '-H', "ldap://%s" % dc1
,
398 '--partition', 'DOMAIN')
399 display("replication is now on", out
)
400 self
.assertCmdSuccess(result
, out
, err
)
401 matrix
= get_utf8_matrix(out
)
402 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
403 # We can't assert actual values after this because
404 # auto-replication is on and things will change underneath us.
# Same query, but asked of dc2 instead of dc1.
406 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
408 '-H', "ldap://%s" % dc2
,
412 '--partition', 'DOMAIN')
414 display("%s's view" % dc2
, out
)
415 self
.assertCmdSuccess(result
, out
, err
)
416 matrix
= get_utf8_matrix(out
)
417 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
419 force_replication(dc1
, dc2
, samdb2
.domain_dn())
420 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
422 '-H', "ldap://%s" % dc1
,
426 '--partition', 'DOMAIN')
428 display("forced replication %s -> %s" % (dc1
, dc2
), out
)
429 self
.assertCmdSuccess(result
, out
, err
)
430 matrix
= get_utf8_matrix(out
)
431 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
433 force_replication(dc2
, dc1
, samdb2
.domain_dn())
434 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
436 '-H', "ldap://%s" % dc1
,
440 '--partition', 'DOMAIN')
441 display("forced replication %s -> %s" % (dc2
, dc1
), out
)
442 self
.assertCmdSuccess(result
, out
, err
)
443 matrix
= get_utf8_matrix(out
)
444 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
446 (result
, out
, err
) = self
.runsubcmd("visualize", "uptodateness",
448 '-H', "ldap://%s" % dc2
,
452 '--partition', 'DOMAIN')
453 display("%s's view" % dc2
, out
)
455 self
.assertCmdSuccess(result
, out
, err
)
456 matrix
= get_utf8_matrix(out
)
457 self
.assert_matrix_validity(matrix
, [dc1
, dc2
])
# Run `visualize reps` with '--color=no', '-S', '-r' and assert success.
# NOTE(review): embedded lines 463-464 (arguments) are missing.
459 def test_reps_remote(self
):
460 server
= "ldap://%s" % os
.environ
["SERVER"]
461 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
462 (result
, out
, err
) = self
.runsubcmd("visualize", "reps",
465 '--color=no', '-S', '-r')
466 self
.assertCmdSuccess(result
, out
, err
)
# Run `visualize ntdsconn` with '-U', creds and '--dot' and assert success.
# NOTE(review): embedded lines 472 and 474 (arguments) are missing.
468 def test_ntdsconn_dot(self
):
469 server
= "ldap://%s" % os
.environ
["SERVER"]
470 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
471 (result
, out
, err
) = self
.runsubcmd("visualize", "ntdsconn",
473 '-U', creds
, '--dot',
475 self
.assertCmdSuccess(result
, out
, err
)
# Run `visualize ntdsconn` with '--dot' plus '--color=no', '-S', '-r' and
# assert success.
# NOTE(review): embedded line 481 (an argument line) is missing.
477 def test_ntdsconn_remote_dot(self
):
478 server
= "ldap://%s" % os
.environ
["SERVER"]
479 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
480 (result
, out
, err
) = self
.runsubcmd("visualize", "ntdsconn",
482 '-U', creds
, '--dot',
483 '--color=no', '-S', '-r')
484 self
.assertCmdSuccess(result
, out
, err
)
# Run `visualize reps` with '-U', creds and '--dot' and assert success.
# NOTE(review): embedded lines 490 and 492 (arguments) are missing.
486 def test_reps_dot(self
):
487 server
= "ldap://%s" % os
.environ
["SERVER"]
488 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
489 (result
, out
, err
) = self
.runsubcmd("visualize", "reps",
491 '-U', creds
, '--dot',
493 self
.assertCmdSuccess(result
, out
, err
)
# Run `visualize reps` with '--dot' plus '--color=no', '-S', '-r' and
# assert success.
# NOTE(review): embedded line 499 (an argument line) is missing.
495 def test_reps_remote_dot(self
):
496 server
= "ldap://%s" % os
.environ
["SERVER"]
497 creds
= "%s%%%s" % (os
.environ
["USERNAME"], os
.environ
["PASSWORD"])
498 (result
, out
, err
) = self
.runsubcmd("visualize", "reps",
500 '-U', creds
, '--dot',
501 '--color=no', '-S', '-r')
502 self
.assertCmdSuccess(result
, out
, err
)