selftest: Increase account lockout windows to make test more reliable
[Samba.git] / source4 / dsdb / tests / python / rodc_rwdc.py
blob0340042e19d675fb4887e0d685b6c2b12130b602
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
6 How does it work when the password is changed on the RWDC?
7 """
9 import optparse
10 import sys
11 import base64
12 import uuid
13 import subprocess
14 import itertools
15 import time
17 sys.path.insert(0, "bin/python")
18 import samba
19 import ldb
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
30 import os
32 import password_lockout_base
def adjust_cmd_for_py_version(parts):
    """Prefix a command line with the interpreter named in $PYTHON.

    When the PYTHON environment variable holds a non-empty value, that
    value is inserted at the front of ``parts`` (the list is modified in
    place).  The same list object is returned in either case.
    """
    interpreter = os.getenv("PYTHON")
    if not interpreter:
        return parts
    parts.insert(0, interpreter)
    return parts
def passwd_encode(pw):
    """Return ``pw`` in double quotes, UTF-16-LE encoded, as base64 text."""
    quoted = '"%s"' % pw
    raw = quoted.encode('utf-16-le')
    return base64.b64encode(raw).decode('utf8')
class RodcRwdcTestException(Exception):
    """Raised when a samba-tool command run by these tests fails."""
def make_creds(username, password, kerberos_state=None):
    """Build a Credentials object for ``username`` / ``password``.

    Domain, realm and workstation are copied from the module-level CREDS.
    When ``kerberos_state`` is None the Kerberos state of CREDS is used.
    The chosen state is printed (together with the user name and password)
    and gensec.FEATURE_SEAL is added to the gensec features before the new
    credentials are returned.
    """
    creds = Credentials()
    creds.set_username(username)
    creds.set_password(password)

    # use the global CREDS as a template
    creds.set_domain(CREDS.get_domain())
    creds.set_realm(CREDS.get_realm())
    creds.set_workstation(CREDS.get_workstation())

    state = kerberos_state
    if state is None:
        state = CREDS.get_kerberos_state()
    creds.set_kerberos_state(state)

    print('-' * 73)
    if state == MUST_USE_KERBEROS:
        report = "we seem to be using kerberos for %s %s" % (username, password)
    elif state == DONT_USE_KERBEROS:
        report = "NOT using kerberos for %s %s" % (username, password)
    else:
        report = "kerberos state is %s" % state
    print(report)

    sealed = creds.get_gensec_features() | gensec.FEATURE_SEAL
    creds.set_gensec_features(sealed)
    return creds
def set_auto_replication(dc, allow):
    """Switch automatic DRS replication on ``dc`` on or off.

    Runs ``samba-tool drs options`` once for DISABLE_INBOUND_REPL and once
    for DISABLE_OUTBOUND_REPL, passing ``-OPTION`` when ``allow`` is true
    and ``+OPTION`` otherwise.

    A failing command whose stderr mentions LDAP_REFERRAL is tolerated (the
    target is assumed to be an RODC).  Any other failure raises
    RodcRwdcTestException carrying the option, exit code and stderr.
    """
    credstring = '-U%s%%%s' % (CREDS.get_username(),
                               CREDS.get_password())

    # NOTE(review): '-' presumably clears the DISABLE_* flag and '+' sets
    # it; confirm against the samba-tool documentation.
    on_or_off = '-' if allow else '+'

    for opt in ['DISABLE_INBOUND_REPL',
                'DISABLE_OUTBOUND_REPL']:
        cmd = adjust_cmd_for_py_version(['bin/samba-tool',
                                         'drs', 'options',
                                         credstring, dc,
                                         "--dsa-option=%s%s" % (on_or_off, opt)])

        p = subprocess.Popen(cmd,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if not p.returncode:
            continue

        if b'LDAP_REFERRAL' not in stderr:
            # The command line is deliberately left out of the message:
            # it embeds the administrator password.
            raise RodcRwdcTestException(
                "samba-tool drs options %s%s on %s failed with code %s: %r" %
                (on_or_off, opt, dc, p.returncode, stderr))
        print("ignoring %s%s REFERRAL error; assuming %s is RODC" %
              (on_or_off, opt, dc))
def preload_rodc_user(user_dn):
    """Run ``samba-tool rodc preload`` for ``user_dn`` against RWDC.

    Automatic replication on RWDC is switched on before the command runs
    and switched off again once it has succeeded.  The command line is
    printed first; a non-zero exit raises subprocess.CalledProcessError.
    """
    admin = '-U%s%%%s' % (CREDS.get_username(), CREDS.get_password())

    set_auto_replication(RWDC, True)

    argv = ['bin/samba-tool', 'rodc', 'preload',
            user_dn, admin, '--server', RWDC]
    argv = adjust_cmd_for_py_version(argv)
    print(' '.join(argv))
    subprocess.check_call(argv)

    set_auto_replication(RWDC, False)
def get_server_ref_from_samdb(samdb):
    """Return the first serverReference value of ``samdb``'s server object."""
    matches = samdb.search(samdb.get_serverName(),
                           scope=ldb.SCOPE_BASE,
                           attrs=['serverReference'])
    server_object = matches[0]
    return server_object['serverReference'][0]
122 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
def _check_account_initial(self, dn):
    """Force a replication, then run the base-class initial check on ``dn``."""
    self.force_replication()
    parent = super(RodcRwdcCachedTests, self)
    return parent._check_account_initial(dn)
def _check_account(self, dn,
                   badPwdCount=None,
                   badPasswordTime=None,
                   logonCount=None,
                   lastLogon=None,
                   lastLogonTimestamp=None,
                   lockoutTime=None,
                   userAccountControl=None,
                   msDSUserAccountControlComputed=None,
                   effective_bad_password_count=None,
                   msg=None,
                   badPwdCountOnly=False):
    """Delegate to the base-class check, pausing first when needed.

    Sleeps five seconds before checking when either the test runs in
    Kerberos mode and ``badPasswordTime`` is a tuple, or ``badPwdCount``
    is 0.  All expectations are forwarded positionally to the base class.
    The ``badPwdCountOnly`` argument is not forwarded: True is always
    passed in its place.
    """
    # Wait for the RWDC to get any delayed messages
    # e.g. SendToSam or KRB5 bad passwords via winbindd
    krb5_bad_password = self.kerberos and isinstance(badPasswordTime, tuple)
    if krb5_bad_password or badPwdCount == 0:
        time.sleep(5)

    parent = super(RodcRwdcCachedTests, self)
    return parent._check_account(dn, badPwdCount, badPasswordTime,
                                 logonCount, lastLogon,
                                 lastLogonTimestamp, lockoutTime,
                                 userAccountControl,
                                 msDSUserAccountControlComputed,
                                 effective_bad_password_count, msg,
                                 True)
def force_replication(self, base=None):
    """Run ``samba-tool drs replicate RODC RWDC <base> --sync-forced``.

    ``base`` defaults to self.base_dn.  On a non-zero exit the exit code,
    command line, stdout and stderr are printed and RodcRwdcTestException
    is raised.
    """
    naming_context = self.base_dn if base is None else base

    # XXX feels like a horrendous way to do it.
    admin = '-U%s%%%s' % (CREDS.get_username(), CREDS.get_password())
    argv = adjust_cmd_for_py_version(['bin/samba-tool',
                                      'drs', 'replicate',
                                      RODC, RWDC, naming_context,
                                      admin,
                                      '--sync-forced'])

    proc = subprocess.Popen(argv,
                            stderr=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    out, err = proc.communicate()
    if not proc.returncode:
        return

    report = ("failed with code %s" % proc.returncode,
              ' '.join(argv),
              "stdout", out,
              "stderr", err)
    for item in report:
        print(item)
    raise RodcRwdcTestException()
def _change_password(self, user_dn, old_password, new_password):
    """Swap userPassword on the RWDC: delete the old value, add the new one."""
    ldif_template = ("dn: %s\n"
                     "changetype: modify\n"
                     "delete: userPassword\n"
                     "userPassword: %s\n"
                     "add: userPassword\n"
                     "userPassword: %s\n")
    ldif = ldif_template % (user_dn, old_password, new_password)
    self.rwdc_db.modify_ldif(ldif)
def tearDown(self):
    """Undo the environment changes made by setUp.

    After the base-class teardown, the dSHeuristics value that setUp saved
    in self.rwdc_dsheuristics (before overwriting it with "000000001") is
    written back to the RWDC, and automatic replication on RWDC is
    switched back on.  This mirrors RodcRwdcTests.tearDown.
    """
    super(RodcRwdcCachedTests, self).tearDown()
    self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
    set_auto_replication(RWDC, True)
def setUp(self):
    """Connect to both DCs, run the base setUp against RWDC, then sync.

    The base-class setUp runs with host/host_url/ldb pointing at RWDC;
    afterwards host_url is switched to the RODC.  dSHeuristics on the RWDC
    is saved and set to "000000001", automatic replication on RWDC is
    turned off and one replication is forced.
    """
    self.kerberos = False  # To be set later

    rodc_url = 'ldap://%s' % RODC
    rwdc_url = 'ldap://%s' % RWDC

    self.rodc_db = SamDB(rodc_url, credentials=CREDS,
                         session_info=system_session(LP), lp=LP)
    self.rwdc_db = SamDB(rwdc_url, credentials=CREDS,
                         session_info=system_session(LP), lp=LP)

    # Define variables for BasePasswordTestCase
    self.lp = LP
    self.global_creds = CREDS
    self.host = RWDC
    self.host_url = rwdc_url
    self.ldb = SamDB(url=rwdc_url,
                     session_info=system_session(self.lp),
                     credentials=self.global_creds,
                     lp=self.lp)

    super(RodcRwdcCachedTests, self).setUp()

    # Only host_url moves to the RODC; self.host still names the RWDC.
    self.host_url = rodc_url

    self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host,
                          self.lp, self.global_creds)
    self.samr_handle = self.samr.Connect2(
        None, security.SEC_FLAG_MAXIMUM_ALLOWED)
    self.samr_domain = self.samr.OpenDomain(
        self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)

    self.base_dn = self.rwdc_db.domain_dn()

    root_dse = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
                                   attrs=['dsServiceName'])
    self.service = root_dse[0]['dsServiceName'][0]
    self.tag = uuid.uuid4().hex

    self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
    self.rwdc_db.set_dsheuristics("000000001")

    set_auto_replication(RWDC, False)

    # make sure DCs are synchronized before the test
    self.force_replication()
def delete_ldb_connections(self):
    """Drop the base-class connections, then rwdc_db and rodc_db."""
    super(RodcRwdcCachedTests, self).delete_ldb_connections()
    del self.rwdc_db, self.rodc_db
def test_cache_and_flush_password(self):
    # unicodePwd must be absent before the preload, present after it, and
    # absent again once the password was changed on the RWDC and replicated.
    creds = self.lockout1krb5_creds
    username = creds.get_username()
    userpass = creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    ldb_system = SamDB(session_info=system_session(self.lp),
                       credentials=self.global_creds, lp=self.lp)

    def has_unicode_pwd():
        found = ldb_system.search(userdn, attrs=['unicodePwd'])
        return 'unicodePwd' in found[0]

    self.assertFalse(has_unicode_pwd())

    preload_rodc_user(userdn)

    self.assertTrue(has_unicode_pwd())

    # force replication here to flush any pending preloads (this
    # was a racy test).
    self.force_replication()

    # Forcing replication should blank out password (when changed)
    self._change_password(userdn, userpass, userpass + '!')
    self.force_replication()

    self.assertFalse(has_unicode_pwd())
def test_login_lockout_krb5(self):
    # Kerberos variant of the lockout scenario: preload the user, add it to
    # the RODC's msDS-RevealOnDemandGroup, set both lockout windows to 15
    # seconds on the domain object, replicate, then run the shared scenario.
    username = self.lockout1krb5_creds.get_username()
    userpass = self.lockout1krb5_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    self.kerberos = True

    self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)

    found = self.rodc_db.search(self.rodc_dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=['msDS-RevealOnDemandGroup'])
    group = found[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')

    membership = ldb.Message()
    membership.dn = ldb.Dn(self.rwdc_db, group)
    membership['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD,
                                              'member')
    self.rwdc_db.modify(membership)

    self.account_lockout_duration = 15
    self.lockout_observation_window = 15

    policy = ldb.Message()
    policy.dn = ldb.Dn(self.ldb, self.base_dn)
    windows = (("lockoutDuration", self.account_lockout_duration),
               ("lockOutObservationWindow", self.lockout_observation_window))
    for attr, seconds in windows:
        # Written as a negative count of 1e7 ticks per second.
        ticks = -int(seconds * (1e7))
        policy[attr] = ldb.MessageElement(str(ticks),
                                          ldb.FLAG_MOD_REPLACE,
                                          attr)

    self.rwdc_db.modify(policy)
    self.force_replication()

    self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
def test_login_lockout_ntlm(self):
    # NTLM variant of the lockout scenario: preload the user, add it to the
    # RODC's msDS-RevealOnDemandGroup, then run the shared scenario.
    username = self.lockout1ntlm_creds.get_username()
    userpass = self.lockout1ntlm_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    self.kerberos = False

    self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)

    found = self.rodc_db.search(self.rodc_dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=['msDS-RevealOnDemandGroup'])
    group = found[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')

    membership = ldb.Message()
    membership.dn = ldb.Dn(self.rwdc_db, group)
    membership['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD,
                                              'member')
    self.rwdc_db.modify(membership)

    self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
def test_login_lockout_not_revealed(self):
    '''Test that SendToSam is restricted by preloaded users/groups'''

    creds = self.lockout1ntlm_creds
    username = creds.get_username()
    userpass = creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    # Preload but do not add to revealed group
    preload_rodc_user(userdn)

    self.kerberos = False

    # Initial expectations: nothing has been recorded for the account yet.
    bad_password_time = 0
    logon_count = 0
    last_logon = 0
    last_logon_timestamp = 0
    logoncount_relation = ''

    def rodc_bad_pwd_count():
        # badPwdCount as seen on the RODC; the attribute must be present.
        found = self.rodc_db.search(userdn, attrs=['badPwdCount'])
        self.assertTrue('badPwdCount' in found[0])
        return int(found[0]['badPwdCount'][0])

    # Open a second LDB connection with the user credentials. Use the
    # command line credentials for information like the domain, the realm
    # and the workstation.
    creds_lockout = self.insta_creds(creds)

    # The wrong password
    creds_lockout.set_password("thatsAcomplPASS1x")
    self.assertLoginFailure(self.host_url, creds_lockout, self.lp)

    res = self._check_account(
        userdn,
        badPwdCount=1,
        badPasswordTime=("greater", bad_password_time),
        logonCount=logon_count,
        lastLogon=last_logon,
        lastLogonTimestamp=last_logon_timestamp,
        userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
        msDSUserAccountControlComputed=0,
        msg='lastlogontimestamp with wrong password')
    bad_password_time = int(res[0]["badPasswordTime"][0])

    # BadPwdCount on RODC increases alongside RWDC
    self.assertEqual(rodc_bad_pwd_count(), 1)

    # Correct old password
    creds_lockout.set_password(userpass)

    # The connection object is not used further; only the login matters.
    ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout,
                        lp=self.lp)

    # Wait for potential SendToSam...
    time.sleep(5)

    # BadPwdCount on RODC decreases, but not the RWDC
    self._check_account(
        userdn,
        badPwdCount=1,
        badPasswordTime=bad_password_time,
        logonCount=(logoncount_relation, logon_count),
        lastLogon=('greater', last_logon),
        lastLogonTimestamp=last_logon_timestamp,
        userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
        msDSUserAccountControlComputed=0,
        msg='badPwdCount not reset on RWDC')

    self.assertEqual(rodc_bad_pwd_count(), 0)
def _test_login_lockout_rodc_rwdc(self, creds, userdn):
    """Walk one account through lockout and recovery via self.host_url.

    Logs in with wrong and correct passwords and, after every attempt,
    verifies the account attributes through self._check_account.  The
    sequence covers: counting bad passwords, resetting the count on a good
    login, the account being flagged UF_LOCKOUT at the third bad password,
    the correct password being rejected while locked out, and recovery
    after self.account_lockout_duration and
    self.lockout_observation_window have elapsed.
    """
    wrong_password = "thatsAcomplPASS1x"

    username = creds.get_username()
    userpass = creds.get_password()

    # Open a second LDB connection with the user credentials. Use the
    # command line credentials for information like the domain, the realm
    # and the workstation.
    creds_lockout = self.insta_creds(creds)

    # lastLogonTimestamp is expected to be 0 at every check below.
    last_logon_timestamp = 0
    logoncount_relation = ''
    lastlogon_relation = ''

    def check(**expected):
        # Every check shares the DN, lastLogonTimestamp and
        # userAccountControl expectations; the rest varies per step.
        return self._check_account(
            userdn,
            lastLogonTimestamp=last_logon_timestamp,
            userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
            **expected)

    def expect_rejected_login():
        # A login with the current creds_lockout must fail with
        # ERR_INVALID_CREDENTIALS.
        try:
            SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()
        except LdbError as err:
            (num, _) = err.args
            self.assertEqual(num, ERR_INVALID_CREDENTIALS)

    # The wrong password
    creds_lockout.set_password(wrong_password)
    self.assertLoginFailure(self.host_url, creds_lockout, self.lp)

    bad_password_time = 0
    logon_count = 0
    last_logon = 0

    res = check(badPwdCount=1,
                badPasswordTime=("greater", bad_password_time),
                logonCount=logon_count,
                lastLogon=last_logon,
                msDSUserAccountControlComputed=0,
                msg='lastlogontimestamp with wrong password')
    bad_password_time = int(res[0]["badPasswordTime"][0])

    # Correct old password
    creds_lockout.set_password(userpass)

    # The connection object itself is not used below; only the login matters.
    ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout,
                        lp=self.lp)

    # lastLogonTimestamp should not change
    # lastLogon increases if badPwdCount is non-zero (!)
    res = check(badPwdCount=0,
                badPasswordTime=bad_password_time,
                logonCount=(logoncount_relation, logon_count),
                lastLogon=('greater', last_logon),
                msDSUserAccountControlComputed=0,
                msg='LLTimestamp is updated to lastlogon')
    logon_count = int(res[0]["logonCount"][0])
    last_logon = int(res[0]["lastLogon"][0])

    # The wrong password
    creds_lockout.set_password(wrong_password)
    self.assertLoginFailure(self.host_url, creds_lockout, self.lp)

    res = check(badPwdCount=1,
                badPasswordTime=("greater", bad_password_time),
                logonCount=logon_count,
                lastLogon=last_logon,
                msDSUserAccountControlComputed=0)
    bad_password_time = int(res[0]["badPasswordTime"][0])

    # The wrong password
    creds_lockout.set_password(wrong_password)
    expect_rejected_login()

    res = check(badPwdCount=2,
                badPasswordTime=("greater", bad_password_time),
                logonCount=logon_count,
                lastLogon=last_logon,
                msDSUserAccountControlComputed=0)
    bad_password_time = int(res[0]["badPasswordTime"][0])

    print("two failed password change")

    # The wrong password: this third failure locks the account
    creds_lockout.set_password(wrong_password)
    expect_rejected_login()

    res = check(badPwdCount=3,
                badPasswordTime=("greater", bad_password_time),
                logonCount=logon_count,
                lastLogon=last_logon,
                lockoutTime=("greater", bad_password_time),
                msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
    bad_password_time = int(res[0]["badPasswordTime"][0])
    lockout_time = int(res[0]["lockoutTime"][0])

    # Two more wrong passwords and then the correct one: while we are
    # locked out every attempt is rejected and nothing is updated.
    for attempt in (wrong_password, wrong_password, userpass):
        creds_lockout.set_password(attempt)
        expect_rejected_login()

        check(badPwdCount=3,
              badPasswordTime=bad_password_time,
              logonCount=logon_count,
              lastLogon=last_logon,
              lockoutTime=lockout_time,
              msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)

    # wait for the lockout to end
    time.sleep(self.account_lockout_duration + 1)
    print(self.account_lockout_duration + 1)

    check(badPwdCount=3, effective_bad_password_count=0,
          badPasswordTime=bad_password_time,
          logonCount=logon_count,
          lockoutTime=lockout_time,
          lastLogon=last_logon,
          msDSUserAccountControlComputed=0)

    # The correct password after letting the timeout expire
    creds_lockout.set_password(userpass)

    creds_lockout2 = self.insta_creds(creds_lockout)

    ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2,
                        lp=self.lp)
    time.sleep(3)

    check(badPwdCount=0,
          badPasswordTime=bad_password_time,
          logonCount=(logoncount_relation, logon_count),
          lastLogon=(lastlogon_relation, last_logon),
          lockoutTime=lockout_time,
          msDSUserAccountControlComputed=0,
          msg="lastLogon is way off")

    # Two wrong passwords in a row: the count goes to 1, then 2
    for expected_count in (1, 2):
        creds_lockout.set_password(wrong_password)
        expect_rejected_login()

        res = check(badPwdCount=expected_count,
                    badPasswordTime=("greater", bad_password_time),
                    logonCount=logon_count,
                    lockoutTime=lockout_time,
                    lastLogon=last_logon,
                    msDSUserAccountControlComputed=0)
        bad_password_time = int(res[0]["badPasswordTime"][0])

    time.sleep(self.lockout_observation_window + 1)

    check(badPwdCount=2, effective_bad_password_count=0,
          badPasswordTime=bad_password_time,
          logonCount=logon_count,
          lockoutTime=lockout_time,
          lastLogon=last_logon,
          msDSUserAccountControlComputed=0)

    # The wrong password
    creds_lockout.set_password(wrong_password)
    expect_rejected_login()

    res = check(badPwdCount=1,
                badPasswordTime=("greater", bad_password_time),
                logonCount=logon_count,
                lockoutTime=lockout_time,
                lastLogon=last_logon,
                msDSUserAccountControlComputed=0)
    bad_password_time = int(res[0]["badPasswordTime"][0])

    # The correct password without letting the timeout expire
    creds_lockout.set_password(userpass)
    ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout,
                        lp=self.lp)

    check(badPwdCount=0,
          badPasswordTime=bad_password_time,
          logonCount=(logoncount_relation, logon_count),
          lockoutTime=lockout_time,
          lastLogon=("greater", last_logon),
          msDSUserAccountControlComputed=0)
696 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
697 counter = itertools.count(1, 1)
def force_replication(self, base=None):
    """Run ``samba-tool drs replicate RODC RWDC <base> --sync-forced``.

    ``base`` defaults to self.base_dn.  When the command exits non-zero,
    its exit code, command line, stdout and stderr are printed and
    RodcRwdcTestException is raised.
    """
    target = base
    if target is None:
        target = self.base_dn

    # XXX feels like a horrendous way to do it.
    user_arg = '-U%s%%%s' % (CREDS.get_username(), CREDS.get_password())
    command = adjust_cmd_for_py_version(
        ['bin/samba-tool', 'drs', 'replicate',
         RODC, RWDC, target,
         user_arg,
         '--sync-forced'])

    child = subprocess.Popen(command,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
    captured_out, captured_err = child.communicate()
    if child.returncode:
        print("failed with code %s" % child.returncode)
        print(' '.join(command))
        for label, data in (("stdout", captured_out),
                            ("stderr", captured_err)):
            print(label)
            print(data)
        raise RodcRwdcTestException()
def _check_account_initial(self, dn):
    """Force a replication, then run the base-class initial check on ``dn``."""
    self.force_replication()
    base_impl = super(RodcRwdcTests, self)._check_account_initial
    return base_impl(dn)
def tearDown(self):
    """Run the base teardown, then restore the state changed for the test.

    Writes the dSHeuristics value saved by setUp back to the RWDC, sets
    CREDS to DONT_USE_KERBEROS and switches automatic replication on RWDC
    back on.
    """
    super(RodcRwdcTests, self).tearDown()

    saved_heuristics = self.rwdc_dsheuristics
    self.rwdc_db.set_dsheuristics(saved_heuristics)

    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    set_auto_replication(RWDC, True)
def setUp(self):
    """Connect to both DCs, run the base setUp against RWDC, then sync.

    The base-class setUp runs with host/host_url/ldb pointing at RWDC;
    afterwards all three are switched to the RODC.  dSHeuristics on the
    RWDC is saved and set to "000000001", automatic replication on RWDC is
    turned off, one replication is forced and the server references of
    both DCs are looked up.
    """
    rodc_url = 'ldap://%s' % RODC
    rwdc_url = 'ldap://%s' % RWDC

    self.rodc_db = SamDB(rodc_url, credentials=CREDS,
                         session_info=system_session(LP), lp=LP)
    self.rwdc_db = SamDB(rwdc_url, credentials=CREDS,
                         session_info=system_session(LP), lp=LP)

    # Define variables for BasePasswordTestCase
    self.lp = LP
    self.global_creds = CREDS
    self.host = RWDC
    self.host_url = rwdc_url
    self.ldb = SamDB(url=rwdc_url,
                     session_info=system_session(self.lp),
                     credentials=self.global_creds,
                     lp=self.lp)

    super(RodcRwdcTests, self).setUp()

    # Re-point the base-class variables at the RODC.
    self.host = RODC
    self.host_url = rodc_url
    self.ldb = SamDB(url=rodc_url,
                     session_info=system_session(self.lp),
                     credentials=self.global_creds,
                     lp=self.lp)

    self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host,
                          self.lp, self.global_creds)
    self.samr_handle = self.samr.Connect2(
        None, security.SEC_FLAG_MAXIMUM_ALLOWED)
    self.samr_domain = self.samr.OpenDomain(
        self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)

    self.base_dn = self.rwdc_db.domain_dn()

    root_dse = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
                                   attrs=['dsServiceName'])
    self.service = root_dse[0]['dsServiceName'][0]
    self.tag = uuid.uuid4().hex

    self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
    self.rwdc_db.set_dsheuristics("000000001")

    set_auto_replication(RWDC, False)

    # make sure DCs are synchronized before the test
    self.force_replication()
    self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
    self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
def delete_ldb_connections(self):
    """Drop the base-class connections, then rwdc_db and rodc_db."""
    super(RodcRwdcTests, self).delete_ldb_connections()
    del self.rwdc_db, self.rodc_db
def assertReferral(self, fn, *args, **kwargs):
    """Assert that ``fn(*args, **kwargs)`` raises LdbError ERR_REFERRAL.

    The test fails when the call returns normally or when the LdbError
    carries any other code.
    """
    try:
        fn(*args, **kwargs)
    except ldb.LdbError as err:
        (code, msg) = err.args
        self.assertEqual(code, ldb.ERR_REFERRAL,
                         "expected referral, got %s %s" % (code, msg))
    else:
        self.fail("failed to raise ldap referral")
def _test_rodc_dsheuristics(self):
    """Check that setting dSHeuristics through the RODC yields a referral.

    Tried with a new value and with the value currently read from the RODC.
    """
    current = self.rodc_db.get_dsheuristics()
    for value in ("000000001", current):
        self.assertReferral(self.rodc_db.set_dsheuristics, value)
def TEST_rodc_heuristics_kerberos(self):
    # dSHeuristics referral check with CREDS forced to Kerberos.
    # NOTE(review): the upper-case TEST_ prefix presumably keeps the default
    # unittest loader from collecting this method; confirm it is intentional.
    required_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(required_state)
    self._test_rodc_dsheuristics()
def TEST_rodc_heuristics_ntlm(self):
    # dSHeuristics referral check with Kerberos disabled on CREDS.
    # NOTE(review): the upper-case TEST_ prefix presumably keeps the default
    # unittest loader from collecting this method; confirm it is intentional.
    required_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(required_state)
    self._test_rodc_dsheuristics()
def _test_add(self, objects, cross_ncs=False):
    """Add each object on the RWDC and check that it reaches the RODC.

    For every dict in ``objects`` (each must carry a 'dn' key):

    1. search the RODC and require zero results (ERR_NO_SUCH_OBJECT is
       also accepted),
    2. add the object on the RWDC,
    3. force a replication,
    4. search the RODC again and require exactly one result.

    With ``cross_ncs`` the searches run from the configuration base DN
    with the "search_options:1:2" control, filtering on the first RDN of
    the object's DN, and that base is also the one replicated.
    """
    for entry in objects:
        dn = entry['dn']
        if not cross_ncs:
            base, controls, expression = dn, [], None
        else:
            base = str(self.rwdc_db.get_config_basedn())
            controls = ["search_options:1:2"]
            expression = '(%s)' % dn.split(',', 1)[0]

        query = dict(expression=expression,
                     scope=ldb.SCOPE_SUBTREE,
                     attrs=['dn'],
                     controls=controls)

        try:
            before = self.rodc_db.search(base, **query)
            self.assertEqual(len(before), 0)
        except ldb.LdbError as e:
            if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
                raise

        try:
            self.rwdc_db.add(entry)
        except ldb.LdbError as e:
            (ecode, emsg) = e.args
            self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
                      (entry, ecode, emsg))

        if cross_ncs:
            self.force_replication(base=base)
        else:
            self.force_replication()

        try:
            after = self.rodc_db.search(base, **query)
            self.assertEqual(len(after), 1)
        except ldb.LdbError as e:
            # NOTE(review): an LdbError other than ERR_NO_SUCH_OBJECT
            # passes this assertion and is therefore ignored; confirm
            # that is intended.
            self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                                "replication seems to have failed")
def _test_add_replicated_objects(self, mode):
    """Replicate a new OU, user and group, then delete them on the RWDC.

    ``mode`` is appended to self.tag to build the object names.
    """
    tag = "%s%s" % (self.tag, mode)
    entries = [
        ("ou=%s1,%s" % (tag, self.base_dn), "organizationalUnit"),
        ("cn=%s2,%s" % (tag, self.base_dn), "user"),
        ("cn=%s3,%s" % (tag, self.base_dn), "group"),
    ]

    self._test_add([{'dn': dn, "objectclass": objectclass}
                    for dn, objectclass in entries])

    for dn, _ in entries:
        self.rwdc_db.delete(dn)
def test_add_replicated_objects_kerberos(self):
    # Object replication scenario with CREDS forced to Kerberos.
    mode = 'kerberos'
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_add_replicated_objects(mode)
def test_add_replicated_objects_ntlm(self):
    # Object replication scenario with Kerberos disabled on CREDS.
    mode = 'ntlm'
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_add_replicated_objects(mode)
def _test_add_replicated_connections(self, mode):
    """Replicate a new NTDSConnection under self.service, then delete it.

    ``mode`` is appended to self.tag to build the connection name.  The
    add is checked with cross_ncs=True.
    """
    tag = "%s%s" % (self.tag, mode)
    dn = "cn=%sfoofoofoo,%s" % (tag, self.service)
    connection = {
        'dn': dn,
        "objectclass": "NTDSConnection",
        'enabledConnection': 'TRUE',
        'fromServer': self.base_dn,
        'options': '0',
    }

    self._test_add([connection], cross_ncs=True)
    self.rwdc_db.delete(dn)
def test_add_replicated_connections_kerberos(self):
    # Connection replication scenario with CREDS forced to Kerberos.
    mode = 'kerberos'
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_add_replicated_connections(mode)
def test_add_replicated_connections_ntlm(self):
    # Connection replication scenario with Kerberos disabled on CREDS.
    mode = 'ntlm'
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_add_replicated_connections(mode)
def _test_modify_replicated_attributes(self):
    """Change attributes of Guest on the RWDC and expect them on the RODC.

    carLicense and middleName are each replaced with self.tag; after a
    forced replication the RODC must return exactly that value.
    """
    dn = 'CN=Guest,CN=Users,' + self.base_dn
    value = self.tag

    for attr in ('carLicense', 'middleName'):
        change = ldb.Message()
        change.dn = ldb.Dn(self.rwdc_db, dn)
        change[attr] = ldb.MessageElement(value,
                                          ldb.FLAG_MOD_REPLACE,
                                          attr)
        try:
            self.rwdc_db.modify(change)
        except ldb.LdbError as e:
            self.fail("Failed to modify %s %s on RWDC %s with %s" %
                      (dn, attr, RWDC, e))

        self.force_replication()

        try:
            found = self.rodc_db.search(dn,
                                        scope=ldb.SCOPE_SUBTREE,
                                        attrs=[attr])
            seen = []
            for record in found:
                seen.append(str(record[attr][0]))
            self.assertEqual(seen, [value])
        except ldb.LdbError as e:
            # NOTE(review): an LdbError other than ERR_NO_SUCH_OBJECT
            # passes this assertion and is therefore ignored; confirm
            # that is intended.
            self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                                "replication seems to have failed")
def test_modify_replicated_attributes_kerberos(self):
    # Attribute replication scenario with CREDS forced to Kerberos.
    required_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(required_state)
    self._test_modify_replicated_attributes()
def test_modify_replicated_attributes_ntlm(self):
    # Attribute replication scenario with Kerberos disabled on CREDS.
    required_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(required_state)
    self._test_modify_replicated_attributes()
def _test_add_modify_delete(self):
    """Add, modify twice and delete a user, checking the RODC each time.

    The user is created with a carLicense value, which is then replaced
    twice on the RWDC; after each forced replication the RODC must show
    the new value.  Finally the user is deleted on the RWDC and must be
    gone from the RODC after replication.
    """
    attr = "carLicense"
    dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
    values = ["%s%s" % (i, self.tag) for i in range(3)]
    initial, later_values = values[0], values[1:]

    self._test_add([
        {
            'dn': dn,
            "objectclass": "user",
            attr: initial
        }
    ])
    self.force_replication()

    for value in later_values:
        change = ldb.Message()
        change.dn = ldb.Dn(self.rwdc_db, dn)
        change[attr] = ldb.MessageElement(value,
                                          ldb.FLAG_MOD_REPLACE,
                                          attr)
        try:
            self.rwdc_db.modify(change)
        except ldb.LdbError as e:
            self.fail("Failed to modify %s %s on RWDC %s with %s" %
                      (dn, attr, RWDC, e))

        self.force_replication()

        try:
            found = self.rodc_db.search(dn,
                                        scope=ldb.SCOPE_SUBTREE,
                                        attrs=[attr])
            seen = []
            for record in found:
                seen.append(str(record[attr][0]))
            self.assertEqual(seen, [value])
        except ldb.LdbError as e:
            # NOTE(review): an LdbError other than ERR_NO_SUCH_OBJECT
            # passes this assertion and is therefore ignored; confirm
            # that is intended.
            self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                                "replication seems to have failed")

    self.rwdc_db.delete(dn)
    self.force_replication()

    not_deleted = "Failed to delete %s" % (dn)
    try:
        leftover = self.rodc_db.search(dn,
                                       scope=ldb.SCOPE_SUBTREE,
                                       attrs=[attr])
        if len(leftover) > 0:
            self.fail(not_deleted)
    except ldb.LdbError as e:
        self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                         not_deleted)
def test_add_modify_delete_kerberos(self):
    # Add/modify/delete scenario with CREDS forced to Kerberos.
    required_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(required_state)
    self._test_add_modify_delete()
def test_add_modify_delete_ntlm(self):
    # Add/modify/delete scenario with Kerberos disabled on CREDS.
    required_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(required_state)
    self._test_add_modify_delete()
def _new_user(self):
    """Create and enable a fresh user on the RWDC.

    The account name combines the first 12 characters of self.tag with the
    next value of the class-wide counter; the password is 'password#1'.

    Returns a ``(dn, username, password)`` tuple.
    """
    password = 'password#1'
    username = "u%sX%s" % (self.tag[:12], next(self.counter))
    dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)

    user = {
        'dn': dn,
        "objectclass": "user",
        'sAMAccountName': username,
    }
    try:
        self.rwdc_db.add(user)
    except ldb.LdbError as e:
        self.fail("Failed to add %s to rwdc: ldb error: %s" % (user, e))

    set_password_ldif = ("dn: %s\n"
                         "changetype: modify\n"
                         "delete: userPassword\n"
                         "add: userPassword\n"
                         "userPassword: %s\n" % (dn, password))
    self.rwdc_db.modify_ldif(set_password_ldif)
    self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
    return (dn, username, password)
def _change_password(self, user_dn, old_password, new_password):
    """Swap userPassword on the RWDC: delete the old value, add the new one."""
    ldif_lines = ["dn: %s\n",
                  "changetype: modify\n",
                  "delete: userPassword\n",
                  "userPassword: %s\n",
                  "add: userPassword\n",
                  "userPassword: %s\n"]
    ldif = "".join(ldif_lines) % (user_dn, old_password, new_password)
    self.rwdc_db.modify_ldif(ldif)
def try_ldap_logon(self, server, creds, errno=None):
    """Attempt an LDAP logon to ``server`` and check the outcome.

    With ``errno`` None the logon must succeed.  Otherwise it must raise
    an LdbError whose code equals ``errno``.  Any other outcome fails the
    test with a message naming the expected and actual result.
    """
    try:
        SamDB('ldap://%s' % server, credentials=creds,
              session_info=system_session(LP), lp=LP)
    except ldb.LdbError as e10:
        (code, msg) = e10.args
        if code == errno:
            return
        if errno is None:
            self.fail("logon incorrectly raised ldb error (code=%s)" %
                      code)
        # The two halves of this message used to run together as
        # "...ldb errorExpected: ..."; they are now separated.
        self.fail("logon failed to raise correct ldb error. "
                  "Expected: %s Got: %s" %
                  (errno, code))
    else:
        if errno is not None:
            self.fail("logon failed to fail with ldb error %s" % errno)
def zero_min_password_age(self):
    """Set minPwdAge to '0' via the RWDC connection unless it is already zero."""
    current_age = self.rwdc_db.get_minPwdAge()
    if int(current_age) == 0:
        return
    self.rwdc_db.set_minPwdAge('0')
def _test_ldap_change_password(self, errno=None):
    """Check LDAP logons on both DCs across a series of password changes.

    errno -- the ldb error code expected when logging on with the
             original password after it has been changed several
             times, or None if that logon is expected to succeed.
    """
    self.zero_min_password_age()

    dn, username, password = self._new_user()
    creds1 = make_creds(username, password)

    # With NTLM, this should fail on RODC before replication,
    # because the user isn't known.
    self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
    self.force_replication()

    # Now the user is replicated to RODC, so logon should work
    self.try_ldap_logon(RODC, creds1)

    # Step through password#1 .. password#5, one change at a time.
    history = ['password#%s' % i for i in range(1, 6)]
    for old_pw, new_pw in zip(history, history[1:]):
        self._change_password(dn, old_pw, new_pw)
    password = history[-1]

    # The password has changed enough times to make the old
    # password invalid (though with kerberos that doesn't matter).
    # For NTLM, the old creds should always fail
    for server in (RODC, RWDC):
        self.try_ldap_logon(server, creds1, errno)

    creds2 = make_creds(username, password)

    # new creds work straight away with NTLM, because although it
    # doesn't have the password, it knows the user and forwards
    # the query.
    for server in (RODC, RWDC):
        self.try_ldap_logon(server, creds2)

    self.force_replication()

    # After another replication check RODC still works and fails,
    # as appropriate to various creds
    self.try_ldap_logon(RODC, creds2)
    self.try_ldap_logon(RODC, creds1, errno)

    previous = password
    password = 'password#6'
    self._change_password(dn, previous, password)
    creds3 = make_creds(username, password)

    # The previous password should still work, and so should the new
    # one; each is tried against the RWDC first, then the RODC.
    for working_creds in (creds2, creds3):
        for server in (RWDC, RODC):
            self.try_ldap_logon(server, working_creds)

    # old password should still fail (but not on kerberos).
    for server in (RWDC, RODC):
        self.try_ldap_logon(server, creds1, errno)
def test_ldap_change_password_kerberos(self):
    # Kerberos variant: no error code is passed, so every logon in the
    # shared scenario that takes errno is expected to succeed.
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_ldap_change_password()
def test_ldap_change_password_ntlm(self):
    # NTLM variant: logons with the outdated password are expected to
    # be rejected with ERR_INVALID_CREDENTIALS.
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    expected_error = ldb.ERR_INVALID_CREDENTIALS
    self._test_ldap_change_password(expected_error)
def _test_ldap_change_password_reveal_on_demand(self, errno=None):
    """Check logons for a user that is in the RODC's reveal-on-demand group.

    errno -- the ldb error code expected when logging on to the RWDC
             with the original password after several password
             changes, or None if that logon is expected to succeed.
    """
    self.zero_min_password_age()

    # Look up the first msDS-RevealOnDemandGroup value on the RODC object.
    rodc_entry = self.rodc_db.search(self.rodc_dn,
                                     scope=ldb.SCOPE_BASE,
                                     attrs=['msDS-RevealOnDemandGroup'])
    group = rodc_entry[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')

    user_dn, username, password = self._new_user()
    creds1 = make_creds(username, password)

    # Add the new user to that group through the RWDC connection.
    membership = ldb.Message()
    membership.dn = ldb.Dn(self.rwdc_db, group)
    membership['member'] = ldb.MessageElement(user_dn,
                                              ldb.FLAG_MOD_ADD,
                                              'member')
    self.rwdc_db.modify(membership)

    # Against Windows, this will just forward if no account exists on the KDC
    # Therefore, this does not error on Windows.
    self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)

    self.force_replication()

    # The proxy case
    self.try_ldap_logon(RODC, creds1)
    preload_rodc_user(user_dn)

    # Now the user AND password are replicated to RODC, so logon should
    # work (not proxy case)
    self.try_ldap_logon(RODC, creds1)

    # Step through password#1 .. password#5, one change at a time.
    history = ['password#%s' % i for i in range(1, 6)]
    for old_pw, new_pw in zip(history, history[1:]):
        self._change_password(user_dn, old_pw, new_pw)
    password = history[-1]

    # The password has changed enough times to make the old
    # password invalid, but the RODC shouldn't know that.
    self.try_ldap_logon(RODC, creds1)
    self.try_ldap_logon(RWDC, creds1, errno)

    creds2 = make_creds(username, password)
    self.try_ldap_logon(RWDC, creds2)
    # We can forward WRONG_PASSWORD over NTLM.
    # This SHOULD succeed.
    self.try_ldap_logon(RODC, creds2)
def test_change_password_reveal_on_demand_ntlm(self):
    # NTLM variant: the RWDC is expected to reject the outdated
    # password with ERR_INVALID_CREDENTIALS.
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    expected_error = ldb.ERR_INVALID_CREDENTIALS
    self._test_ldap_change_password_reveal_on_demand(expected_error)
def test_change_password_reveal_on_demand_kerberos(self):
    # Kerberos variant: no error code is passed to the shared scenario.
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_ldap_change_password_reveal_on_demand()
def test_login_lockout_krb5(self):
    # Preload the lockout1krb5 user on the RODC, do one failing and one
    # successful connection, then run the shared login-lockout scenario.
    base_creds = self.lockout1krb5_creds
    username = base_creds.get_username()
    userpass = base_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = base_creds.get_kerberos_state()
    bad_password = userpass + "X"
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=bad_password,
                                  kerberos_state=use_kerberos)

    # A connection with the wrong password must be rejected.
    try:
        samdb = SamDB(url=self.host_url, credentials=fail_creds,
                      lp=self.lp)
        self.fail()
    except LdbError as err:
        (num, _msg) = err.args
        self.assertEqual(num, ERR_INVALID_CREDENTIALS)

    # Succeed to reset everything to 0
    success_creds = self.insta_creds(self.template_creds,
                                     username=username,
                                     userpass=userpass,
                                     kerberos_state=use_kerberos)

    samdb = SamDB(url=self.host_url, credentials=success_creds,
                  lp=self.lp)

    self._test_login_lockout(self.lockout1krb5_creds)
def test_login_lockout_ntlm(self):
    # Preload the lockout1ntlm user on the RODC, do one failing and one
    # successful connection, then run the shared login-lockout scenario.
    base_creds = self.lockout1ntlm_creds
    username = base_creds.get_username()
    userpass = base_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = base_creds.get_kerberos_state()
    bad_password = userpass + "X"
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=bad_password,
                                  kerberos_state=use_kerberos)

    # A connection with the wrong password must be rejected.
    try:
        samdb = SamDB(url=self.host_url, credentials=fail_creds,
                      lp=self.lp)
        self.fail()
    except LdbError as err:
        (num, _msg) = err.args
        self.assertEqual(num, ERR_INVALID_CREDENTIALS)

    # Succeed to reset everything to 0
    samdb = SamDB(url=self.host_url,
                  credentials=self.lockout1ntlm_creds,
                  lp=self.lp)

    self._test_login_lockout(self.lockout1ntlm_creds)
def test_multiple_logon_krb5(self):
    # Preload the lockout1krb5 user on the RODC, do one failing and one
    # successful connection, then run the shared multiple-logon scenario.
    base_creds = self.lockout1krb5_creds
    username = base_creds.get_username()
    userpass = base_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = base_creds.get_kerberos_state()
    bad_password = userpass + "X"
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=bad_password,
                                  kerberos_state=use_kerberos)

    # A connection with the wrong password must be rejected.
    try:
        samdb = SamDB(url=self.host_url, credentials=fail_creds,
                      lp=self.lp)
        self.fail()
    except LdbError as err:
        (num, _msg) = err.args
        self.assertEqual(num, ERR_INVALID_CREDENTIALS)

    # Succeed to reset everything to 0
    success_creds = self.insta_creds(self.template_creds,
                                     username=username,
                                     userpass=userpass,
                                     kerberos_state=use_kerberos)

    samdb = SamDB(url=self.host_url, credentials=success_creds,
                  lp=self.lp)

    self._test_multiple_logon(self.lockout1krb5_creds)
def test_multiple_logon_ntlm(self):
    # Preload the lockout1ntlm user on the RODC, do one failing and one
    # successful connection, then run the shared multiple-logon scenario.
    base_creds = self.lockout1ntlm_creds
    username = base_creds.get_username()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
    userpass = base_creds.get_password()

    preload_rodc_user(userdn)

    use_kerberos = base_creds.get_kerberos_state()
    bad_password = userpass + "X"
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=bad_password,
                                  kerberos_state=use_kerberos)

    # A connection with the wrong password must be rejected.
    try:
        samdb = SamDB(url=self.host_url, credentials=fail_creds,
                      lp=self.lp)
        self.fail()
    except LdbError as err:
        (num, _msg) = err.args
        self.assertEqual(num, ERR_INVALID_CREDENTIALS)

    # Succeed to reset everything to 0
    samdb = SamDB(url=self.host_url,
                  credentials=self.lockout1ntlm_creds,
                  lp=self.lp)

    self._test_multiple_logon(self.lockout1ntlm_creds)
def main():
    """Parse the command line, fill in the module globals and run the tests.

    Expects exactly two positional arguments, the RODC host and the
    RWDC host; otherwise the usage text is printed and the process
    exits with status 1.  Sets the globals RODC, RWDC, CREDS and LP.
    """
    global RODC, RWDC, CREDS, LP
    parser = optparse.OptionParser(
        "rodc_rwdc.py [options] <rodc host> <rwdc host>")

    sambaopts = options.SambaOptions(parser)
    versionopts = options.VersionOptions(parser)
    credopts = options.CredentialsOptions(parser)
    subunitopts = SubunitOptions(parser)

    for option_group in (sambaopts, versionopts, credopts, subunitopts):
        parser.add_option_group(option_group)

    _opts, args = parser.parse_args()

    LP = sambaopts.get_loadparm()
    CREDS = credopts.get_credentials(LP)
    # Add FEATURE_SEAL on top of whatever gensec features are already set.
    features = CREDS.get_gensec_features() | gensec.FEATURE_SEAL
    CREDS.set_gensec_features(features)

    if len(args) != 2:
        parser.print_usage()
        sys.exit(1)
    RODC, RWDC = args

    set_auto_replication(RWDC, True)
    try:
        TestProgram(module=__name__, opts=subunitopts)
    finally:
        # Runs whether or not the test program raised.
        set_auto_replication(RWDC, True)


main()