2 # -*- coding: utf-8 -*-
3 from __future__
import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
6 How does it work when the password is changed on the RWDC?
17 sys
.path
.insert(0, "bin/python")
21 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
22 import samba
.getopt
as options
24 from samba
.auth
import system_session
25 from samba
.samdb
import SamDB
26 from samba
.credentials
import Credentials
, DONT_USE_KERBEROS
, MUST_USE_KERBEROS
27 from samba
import gensec
, dsdb
28 from ldb
import SCOPE_BASE
, LdbError
, ERR_INVALID_CREDENTIALS
29 from samba
.dcerpc
import security
, samr
32 import password_lockout_base
34 def adjust_cmd_for_py_version(parts
):
35 if os
.getenv("PYTHON", None):
36 parts
.insert(0, os
.environ
["PYTHON"])
39 def passwd_encode(pw
):
40 return base64
.b64encode(('"%s"' % pw
).encode('utf-16-le')).decode('utf8')
43 class RodcRwdcTestException(Exception):
47 def make_creds(username
, password
, kerberos_state
=None):
48 # use the global CREDS as a template
50 c
.set_username(username
)
51 c
.set_password(password
)
52 c
.set_domain(CREDS
.get_domain())
53 c
.set_realm(CREDS
.get_realm())
54 c
.set_workstation(CREDS
.get_workstation())
56 if kerberos_state
is None:
57 kerberos_state
= CREDS
.get_kerberos_state()
58 c
.set_kerberos_state(kerberos_state
)
61 if kerberos_state
== MUST_USE_KERBEROS
:
62 print("we seem to be using kerberos for %s %s" % (username
, password
))
63 elif kerberos_state
== DONT_USE_KERBEROS
:
64 print("NOT using kerberos for %s %s" % (username
, password
))
66 print("kerberos state is %s" % kerberos_state
)
68 c
.set_gensec_features(c
.get_gensec_features() |
73 def set_auto_replication(dc
, allow
):
74 credstring
= '-U%s%%%s' % (CREDS
.get_username(),
77 on_or_off
= '-' if allow
else '+'
79 for opt
in ['DISABLE_INBOUND_REPL',
80 'DISABLE_OUTBOUND_REPL']:
81 cmd
= adjust_cmd_for_py_version(['bin/samba-tool',
84 "--dsa-option=%s%s" % (on_or_off
, opt
)])
86 p
= subprocess
.Popen(cmd
,
87 stderr
=subprocess
.PIPE
,
88 stdout
=subprocess
.PIPE
)
89 stdout
, stderr
= p
.communicate()
91 if b
'LDAP_REFERRAL' not in stderr
:
92 raise RodcRwdcTestException()
93 print("ignoring +%s REFERRAL error; assuming %s is RODC" %
97 def preload_rodc_user(user_dn
):
98 credstring
= '-U%s%%%s' % (CREDS
.get_username(),
101 set_auto_replication(RWDC
, True)
102 cmd
= adjust_cmd_for_py_version(['bin/samba-tool',
109 subprocess
.check_call(cmd
)
110 set_auto_replication(RWDC
, False)
113 def get_server_ref_from_samdb(samdb
):
114 server_name
= samdb
.get_serverName()
115 res
= samdb
.search(server_name
,
116 scope
=ldb
.SCOPE_BASE
,
117 attrs
=['serverReference'])
119 return res
[0]['serverReference'][0]
122 class RodcRwdcCachedTests(password_lockout_base
.BasePasswordTestCase
):
124 def _check_account_initial(self
, dn
):
125 self
.force_replication()
126 return super(RodcRwdcCachedTests
, self
)._check
_account
_initial
(dn
)
128 def _check_account(self
, dn
,
130 badPasswordTime
=None,
133 lastLogonTimestamp
=None,
135 userAccountControl
=None,
136 msDSUserAccountControlComputed
=None,
137 effective_bad_password_count
=None,
139 badPwdCountOnly
=False):
140 # Wait for the RWDC to get any delayed messages
141 # e.g. SendToSam or KRB5 bad passwords via winbindd
142 if (self
.kerberos
and isinstance(badPasswordTime
, tuple) or
146 return super(RodcRwdcCachedTests
,
147 self
)._check
_account
(dn
, badPwdCount
, badPasswordTime
,
148 logonCount
, lastLogon
,
149 lastLogonTimestamp
, lockoutTime
,
151 msDSUserAccountControlComputed
,
152 effective_bad_password_count
, msg
,
155 def force_replication(self
, base
=None):
159 # XXX feels like a horrendous way to do it.
160 credstring
= '-U%s%%%s' % (CREDS
.get_username(),
161 CREDS
.get_password())
162 cmd
= adjust_cmd_for_py_version(['bin/samba-tool',
168 p
= subprocess
.Popen(cmd
,
169 stderr
=subprocess
.PIPE
,
170 stdout
=subprocess
.PIPE
)
171 stdout
, stderr
= p
.communicate()
173 print("failed with code %s" % p
.returncode
)
179 raise RodcRwdcTestException()
181 def _change_password(self
, user_dn
, old_password
, new_password
):
182 self
.rwdc_db
.modify_ldif(
184 "changetype: modify\n"
185 "delete: userPassword\n"
187 "add: userPassword\n"
188 "userPassword: %s\n" % (user_dn
, old_password
, new_password
))
191 super(RodcRwdcCachedTests
, self
).tearDown()
192 set_auto_replication(RWDC
, True)
195 self
.kerberos
= False # To be set later
197 self
.rodc_db
= SamDB('ldap://%s' % RODC
, credentials
=CREDS
,
198 session_info
=system_session(LP
), lp
=LP
)
200 self
.rwdc_db
= SamDB('ldap://%s' % RWDC
, credentials
=CREDS
,
201 session_info
=system_session(LP
), lp
=LP
)
203 # Define variables for BasePasswordTestCase
205 self
.global_creds
= CREDS
207 self
.host_url
= 'ldap://%s' % RWDC
208 self
.ldb
= SamDB(url
='ldap://%s' % RWDC
, session_info
=system_session(self
.lp
),
209 credentials
=self
.global_creds
, lp
=self
.lp
)
211 super(RodcRwdcCachedTests
, self
).setUp()
212 self
.host_url
= 'ldap://%s' % RODC
214 self
.samr
= samr
.samr("ncacn_ip_tcp:%s[seal]" % self
.host
, self
.lp
, self
.global_creds
)
215 self
.samr_handle
= self
.samr
.Connect2(None, security
.SEC_FLAG_MAXIMUM_ALLOWED
)
216 self
.samr_domain
= self
.samr
.OpenDomain(self
.samr_handle
, security
.SEC_FLAG_MAXIMUM_ALLOWED
, self
.domain_sid
)
218 self
.base_dn
= self
.rwdc_db
.domain_dn()
220 root
= self
.rodc_db
.search(base
='', scope
=ldb
.SCOPE_BASE
,
221 attrs
=['dsServiceName'])
222 self
.service
= root
[0]['dsServiceName'][0]
223 self
.tag
= uuid
.uuid4().hex
225 self
.rwdc_dsheuristics
= self
.rwdc_db
.get_dsheuristics()
226 self
.rwdc_db
.set_dsheuristics("000000001")
228 set_auto_replication(RWDC
, False)
230 # make sure DCs are synchronized before the test
231 self
.force_replication()
233 def delete_ldb_connections(self
):
234 super(RodcRwdcCachedTests
, self
).delete_ldb_connections()
238 def test_cache_and_flush_password(self
):
239 username
= self
.lockout1krb5_creds
.get_username()
240 userpass
= self
.lockout1krb5_creds
.get_password()
241 userdn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
243 ldb_system
= SamDB(session_info
=system_session(self
.lp
),
244 credentials
=self
.global_creds
, lp
=self
.lp
)
246 res
= ldb_system
.search(userdn
, attrs
=['unicodePwd'])
247 self
.assertFalse('unicodePwd' in res
[0])
249 preload_rodc_user(userdn
)
251 res
= ldb_system
.search(userdn
, attrs
=['unicodePwd'])
252 self
.assertTrue('unicodePwd' in res
[0])
254 # force replication here to flush any pending preloads (this
256 self
.force_replication()
258 newpass
= userpass
+ '!'
260 # Forcing replication should blank out password (when changed)
261 self
._change
_password
(userdn
, userpass
, newpass
)
262 self
.force_replication()
264 res
= ldb_system
.search(userdn
, attrs
=['unicodePwd'])
265 self
.assertFalse('unicodePwd' in res
[0])
267 def test_login_lockout_krb5(self
):
268 username
= self
.lockout1krb5_creds
.get_username()
269 userpass
= self
.lockout1krb5_creds
.get_password()
270 userdn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
272 preload_rodc_user(userdn
)
276 self
.rodc_dn
= get_server_ref_from_samdb(self
.rodc_db
)
278 res
= self
.rodc_db
.search(self
.rodc_dn
,
279 scope
=ldb
.SCOPE_BASE
,
280 attrs
=['msDS-RevealOnDemandGroup'])
282 group
= res
[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
285 m
.dn
= ldb
.Dn(self
.rwdc_db
, group
)
286 m
['member'] = ldb
.MessageElement(userdn
, ldb
.FLAG_MOD_ADD
, 'member')
287 self
.rwdc_db
.modify(m
)
290 m
.dn
= ldb
.Dn(self
.ldb
, self
.base_dn
)
292 self
.account_lockout_duration
= 15
293 account_lockout_duration_ticks
= -int(self
.account_lockout_duration
* (1e7
))
295 m
["lockoutDuration"] = ldb
.MessageElement(str(account_lockout_duration_ticks
),
296 ldb
.FLAG_MOD_REPLACE
,
299 self
.lockout_observation_window
= 15
300 lockout_observation_window_ticks
= -int(self
.lockout_observation_window
* (1e7
))
302 m
["lockOutObservationWindow"] = ldb
.MessageElement(str(lockout_observation_window_ticks
),
303 ldb
.FLAG_MOD_REPLACE
,
304 "lockOutObservationWindow")
306 self
.rwdc_db
.modify(m
)
307 self
.force_replication()
309 self
._test
_login
_lockout
_rodc
_rwdc
(self
.lockout1krb5_creds
, userdn
)
311 def test_login_lockout_ntlm(self
):
312 username
= self
.lockout1ntlm_creds
.get_username()
313 userpass
= self
.lockout1ntlm_creds
.get_password()
314 userdn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
316 preload_rodc_user(userdn
)
318 self
.kerberos
= False
320 self
.rodc_dn
= get_server_ref_from_samdb(self
.rodc_db
)
322 res
= self
.rodc_db
.search(self
.rodc_dn
,
323 scope
=ldb
.SCOPE_BASE
,
324 attrs
=['msDS-RevealOnDemandGroup'])
326 group
= res
[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
329 m
.dn
= ldb
.Dn(self
.rwdc_db
, group
)
330 m
['member'] = ldb
.MessageElement(userdn
, ldb
.FLAG_MOD_ADD
, 'member')
331 self
.rwdc_db
.modify(m
)
333 self
._test
_login
_lockout
_rodc
_rwdc
(self
.lockout1ntlm_creds
, userdn
)
335 def test_login_lockout_not_revealed(self
):
336 '''Test that SendToSam is restricted by preloaded users/groups'''
338 username
= self
.lockout1ntlm_creds
.get_username()
339 userpass
= self
.lockout1ntlm_creds
.get_password()
340 userdn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
342 # Preload but do not add to revealed group
343 preload_rodc_user(userdn
)
345 self
.kerberos
= False
347 creds
= self
.lockout1ntlm_creds
349 # Open a second LDB connection with the user credentials. Use the
350 # command line credentials for information like the domain, the realm
351 # and the workstation.
352 creds_lockout
= self
.insta_creds(creds
)
355 creds_lockout
.set_password("thatsAcomplPASS1x")
357 self
.assertLoginFailure(self
.host_url
, creds_lockout
, self
.lp
)
362 lastLogonTimestamp
= 0
363 logoncount_relation
= ''
364 lastlogon_relation
= ''
366 res
= self
._check
_account
(userdn
,
368 badPasswordTime
=("greater", badPasswordTime
),
369 logonCount
=logonCount
,
371 lastLogonTimestamp
=lastLogonTimestamp
,
372 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
373 msDSUserAccountControlComputed
=0,
374 msg
='lastlogontimestamp with wrong password')
375 badPasswordTime
= int(res
[0]["badPasswordTime"][0])
377 # BadPwdCount on RODC increases alongside RWDC
378 res
= self
.rodc_db
.search(userdn
, attrs
=['badPwdCount'])
379 self
.assertTrue('badPwdCount' in res
[0])
380 self
.assertEqual(int(res
[0]['badPwdCount'][0]), 1)
382 # Correct old password
383 creds_lockout
.set_password(userpass
)
385 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
387 # Wait for potential SendToSam...
390 # BadPwdCount on RODC decreases, but not the RWDC
391 res
= self
._check
_account
(userdn
,
393 badPasswordTime
=badPasswordTime
,
394 logonCount
=(logoncount_relation
, logonCount
),
395 lastLogon
=('greater', lastLogon
),
396 lastLogonTimestamp
=lastLogonTimestamp
,
397 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
398 msDSUserAccountControlComputed
=0,
399 msg
='badPwdCount not reset on RWDC')
401 res
= self
.rodc_db
.search(userdn
, attrs
=['badPwdCount'])
402 self
.assertTrue('badPwdCount' in res
[0])
403 self
.assertEqual(int(res
[0]['badPwdCount'][0]), 0)
405 def _test_login_lockout_rodc_rwdc(self
, creds
, userdn
):
406 username
= creds
.get_username()
407 userpass
= creds
.get_password()
409 # Open a second LDB connection with the user credentials. Use the
410 # command line credentials for information like the domain, the realm
411 # and the workstation.
412 creds_lockout
= self
.insta_creds(creds
)
415 creds_lockout
.set_password("thatsAcomplPASS1x")
417 self
.assertLoginFailure(self
.host_url
, creds_lockout
, self
.lp
)
422 lastLogonTimestamp
= 0
423 logoncount_relation
= ''
424 lastlogon_relation
= ''
426 res
= self
._check
_account
(userdn
,
428 badPasswordTime
=("greater", badPasswordTime
),
429 logonCount
=logonCount
,
431 lastLogonTimestamp
=lastLogonTimestamp
,
432 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
433 msDSUserAccountControlComputed
=0,
434 msg
='lastlogontimestamp with wrong password')
435 badPasswordTime
= int(res
[0]["badPasswordTime"][0])
437 # Correct old password
438 creds_lockout
.set_password(userpass
)
440 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
442 # lastLogonTimestamp should not change
443 # lastLogon increases if badPwdCount is non-zero (!)
444 res
= self
._check
_account
(userdn
,
446 badPasswordTime
=badPasswordTime
,
447 logonCount
=(logoncount_relation
, logonCount
),
448 lastLogon
=('greater', lastLogon
),
449 lastLogonTimestamp
=lastLogonTimestamp
,
450 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
451 msDSUserAccountControlComputed
=0,
452 msg
='LLTimestamp is updated to lastlogon')
454 logonCount
= int(res
[0]["logonCount"][0])
455 lastLogon
= int(res
[0]["lastLogon"][0])
458 creds_lockout
.set_password("thatsAcomplPASS1x")
460 self
.assertLoginFailure(self
.host_url
, creds_lockout
, self
.lp
)
462 res
= self
._check
_account
(userdn
,
464 badPasswordTime
=("greater", badPasswordTime
),
465 logonCount
=logonCount
,
467 lastLogonTimestamp
=lastLogonTimestamp
,
468 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
469 msDSUserAccountControlComputed
=0)
470 badPasswordTime
= int(res
[0]["badPasswordTime"][0])
473 creds_lockout
.set_password("thatsAcomplPASS1x")
476 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
479 except LdbError
as e1
:
481 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
483 res
= self
._check
_account
(userdn
,
485 badPasswordTime
=("greater", badPasswordTime
),
486 logonCount
=logonCount
,
488 lastLogonTimestamp
=lastLogonTimestamp
,
489 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
490 msDSUserAccountControlComputed
=0)
491 badPasswordTime
= int(res
[0]["badPasswordTime"][0])
493 print("two failed password change")
496 creds_lockout
.set_password("thatsAcomplPASS1x")
499 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
502 except LdbError
as e2
:
504 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
506 res
= self
._check
_account
(userdn
,
508 badPasswordTime
=("greater", badPasswordTime
),
509 logonCount
=logonCount
,
511 lastLogonTimestamp
=lastLogonTimestamp
,
512 lockoutTime
=("greater", badPasswordTime
),
513 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
514 msDSUserAccountControlComputed
=dsdb
.UF_LOCKOUT
)
515 badPasswordTime
= int(res
[0]["badPasswordTime"][0])
516 lockoutTime
= int(res
[0]["lockoutTime"][0])
519 creds_lockout
.set_password("thatsAcomplPASS1x")
521 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
523 except LdbError
as e3
:
525 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
527 res
= self
._check
_account
(userdn
,
529 badPasswordTime
=badPasswordTime
,
530 logonCount
=logonCount
,
532 lastLogonTimestamp
=lastLogonTimestamp
,
533 lockoutTime
=lockoutTime
,
534 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
535 msDSUserAccountControlComputed
=dsdb
.UF_LOCKOUT
)
538 creds_lockout
.set_password("thatsAcomplPASS1x")
540 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
542 except LdbError
as e4
:
544 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
546 res
= self
._check
_account
(userdn
,
548 badPasswordTime
=badPasswordTime
,
549 logonCount
=logonCount
,
551 lastLogonTimestamp
=lastLogonTimestamp
,
552 lockoutTime
=lockoutTime
,
553 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
554 msDSUserAccountControlComputed
=dsdb
.UF_LOCKOUT
)
556 # The correct password, but we are locked out
557 creds_lockout
.set_password(userpass
)
559 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
561 except LdbError
as e5
:
563 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
565 res
= self
._check
_account
(userdn
,
567 badPasswordTime
=badPasswordTime
,
568 logonCount
=logonCount
,
570 lastLogonTimestamp
=lastLogonTimestamp
,
571 lockoutTime
=lockoutTime
,
572 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
573 msDSUserAccountControlComputed
=dsdb
.UF_LOCKOUT
)
575 # wait for the lockout to end
576 time
.sleep(self
.account_lockout_duration
+ 1)
577 print(self
.account_lockout_duration
+ 1)
579 res
= self
._check
_account
(userdn
,
580 badPwdCount
=3, effective_bad_password_count
=0,
581 badPasswordTime
=badPasswordTime
,
582 logonCount
=logonCount
,
583 lockoutTime
=lockoutTime
,
585 lastLogonTimestamp
=lastLogonTimestamp
,
586 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
587 msDSUserAccountControlComputed
=0)
589 # The correct password after letting the timeout expire
591 creds_lockout
.set_password(userpass
)
593 creds_lockout2
= self
.insta_creds(creds_lockout
)
595 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout2
, lp
=self
.lp
)
598 res
= self
._check
_account
(userdn
,
600 badPasswordTime
=badPasswordTime
,
601 logonCount
=(logoncount_relation
, logonCount
),
602 lastLogon
=(lastlogon_relation
, lastLogon
),
603 lastLogonTimestamp
=lastLogonTimestamp
,
604 lockoutTime
=lockoutTime
,
605 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
606 msDSUserAccountControlComputed
=0,
607 msg
="lastLogon is way off")
610 creds_lockout
.set_password("thatsAcomplPASS1x")
612 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
614 except LdbError
as e6
:
616 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
618 res
= self
._check
_account
(userdn
,
620 badPasswordTime
=("greater", badPasswordTime
),
621 logonCount
=logonCount
,
622 lockoutTime
=lockoutTime
,
624 lastLogonTimestamp
=lastLogonTimestamp
,
625 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
626 msDSUserAccountControlComputed
=0)
627 badPasswordTime
= int(res
[0]["badPasswordTime"][0])
630 creds_lockout
.set_password("thatsAcomplPASS1x")
632 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
634 except LdbError
as e7
:
636 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
638 res
= self
._check
_account
(userdn
,
640 badPasswordTime
=("greater", badPasswordTime
),
641 logonCount
=logonCount
,
642 lockoutTime
=lockoutTime
,
644 lastLogonTimestamp
=lastLogonTimestamp
,
645 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
646 msDSUserAccountControlComputed
=0)
647 badPasswordTime
= int(res
[0]["badPasswordTime"][0])
649 time
.sleep(self
.lockout_observation_window
+ 1)
651 res
= self
._check
_account
(userdn
,
652 badPwdCount
=2, effective_bad_password_count
=0,
653 badPasswordTime
=badPasswordTime
,
654 logonCount
=logonCount
,
655 lockoutTime
=lockoutTime
,
657 lastLogonTimestamp
=lastLogonTimestamp
,
658 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
659 msDSUserAccountControlComputed
=0)
662 creds_lockout
.set_password("thatsAcomplPASS1x")
664 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
666 except LdbError
as e8
:
668 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
670 res
= self
._check
_account
(userdn
,
672 badPasswordTime
=("greater", badPasswordTime
),
673 logonCount
=logonCount
,
674 lockoutTime
=lockoutTime
,
676 lastLogonTimestamp
=lastLogonTimestamp
,
677 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
678 msDSUserAccountControlComputed
=0)
679 badPasswordTime
= int(res
[0]["badPasswordTime"][0])
681 # The correct password without letting the timeout expire
682 creds_lockout
.set_password(userpass
)
683 ldb_lockout
= SamDB(url
=self
.host_url
, credentials
=creds_lockout
, lp
=self
.lp
)
685 res
= self
._check
_account
(userdn
,
687 badPasswordTime
=badPasswordTime
,
688 logonCount
=(logoncount_relation
, logonCount
),
689 lockoutTime
=lockoutTime
,
690 lastLogon
=("greater", lastLogon
),
691 lastLogonTimestamp
=lastLogonTimestamp
,
692 userAccountControl
=dsdb
.UF_NORMAL_ACCOUNT
,
693 msDSUserAccountControlComputed
=0)
696 class RodcRwdcTests(password_lockout_base
.BasePasswordTestCase
):
697 counter
= itertools
.count(1, 1)
699 def force_replication(self
, base
=None):
703 # XXX feels like a horrendous way to do it.
704 credstring
= '-U%s%%%s' % (CREDS
.get_username(),
705 CREDS
.get_password())
706 cmd
= adjust_cmd_for_py_version(['bin/samba-tool',
712 p
= subprocess
.Popen(cmd
,
713 stderr
=subprocess
.PIPE
,
714 stdout
=subprocess
.PIPE
)
715 stdout
, stderr
= p
.communicate()
717 print("failed with code %s" % p
.returncode
)
723 raise RodcRwdcTestException()
725 def _check_account_initial(self
, dn
):
726 self
.force_replication()
727 return super(RodcRwdcTests
, self
)._check
_account
_initial
(dn
)
730 super(RodcRwdcTests
, self
).tearDown()
731 self
.rwdc_db
.set_dsheuristics(self
.rwdc_dsheuristics
)
732 CREDS
.set_kerberos_state(DONT_USE_KERBEROS
)
733 set_auto_replication(RWDC
, True)
736 self
.rodc_db
= SamDB('ldap://%s' % RODC
, credentials
=CREDS
,
737 session_info
=system_session(LP
), lp
=LP
)
739 self
.rwdc_db
= SamDB('ldap://%s' % RWDC
, credentials
=CREDS
,
740 session_info
=system_session(LP
), lp
=LP
)
742 # Define variables for BasePasswordTestCase
744 self
.global_creds
= CREDS
746 self
.host_url
= 'ldap://%s' % RWDC
747 self
.ldb
= SamDB(url
='ldap://%s' % RWDC
, session_info
=system_session(self
.lp
),
748 credentials
=self
.global_creds
, lp
=self
.lp
)
750 super(RodcRwdcTests
, self
).setUp()
752 self
.host_url
= 'ldap://%s' % RODC
753 self
.ldb
= SamDB(url
='ldap://%s' % RODC
, session_info
=system_session(self
.lp
),
754 credentials
=self
.global_creds
, lp
=self
.lp
)
756 self
.samr
= samr
.samr("ncacn_ip_tcp:%s[seal]" % self
.host
, self
.lp
, self
.global_creds
)
757 self
.samr_handle
= self
.samr
.Connect2(None, security
.SEC_FLAG_MAXIMUM_ALLOWED
)
758 self
.samr_domain
= self
.samr
.OpenDomain(self
.samr_handle
, security
.SEC_FLAG_MAXIMUM_ALLOWED
, self
.domain_sid
)
760 self
.base_dn
= self
.rwdc_db
.domain_dn()
762 root
= self
.rodc_db
.search(base
='', scope
=ldb
.SCOPE_BASE
,
763 attrs
=['dsServiceName'])
764 self
.service
= root
[0]['dsServiceName'][0]
765 self
.tag
= uuid
.uuid4().hex
767 self
.rwdc_dsheuristics
= self
.rwdc_db
.get_dsheuristics()
768 self
.rwdc_db
.set_dsheuristics("000000001")
770 set_auto_replication(RWDC
, False)
772 # make sure DCs are synchronized before the test
773 self
.force_replication()
774 self
.rwdc_dn
= get_server_ref_from_samdb(self
.rwdc_db
)
775 self
.rodc_dn
= get_server_ref_from_samdb(self
.rodc_db
)
777 def delete_ldb_connections(self
):
778 super(RodcRwdcTests
, self
).delete_ldb_connections()
782 def assertReferral(self
, fn
, *args
, **kwargs
):
785 self
.fail("failed to raise ldap referral")
786 except ldb
.LdbError
as e9
:
787 (code
, msg
) = e9
.args
788 self
.assertEqual(code
, ldb
.ERR_REFERRAL
,
789 "expected referral, got %s %s" % (code
, msg
))
791 def _test_rodc_dsheuristics(self
):
792 d
= self
.rodc_db
.get_dsheuristics()
793 self
.assertReferral(self
.rodc_db
.set_dsheuristics
, "000000001")
794 self
.assertReferral(self
.rodc_db
.set_dsheuristics
, d
)
796 def TEST_rodc_heuristics_kerberos(self
):
797 CREDS
.set_kerberos_state(MUST_USE_KERBEROS
)
798 self
._test
_rodc
_dsheuristics
()
800 def TEST_rodc_heuristics_ntlm(self
):
801 CREDS
.set_kerberos_state(DONT_USE_KERBEROS
)
802 self
._test
_rodc
_dsheuristics
()
804 def _test_add(self
, objects
, cross_ncs
=False):
808 base
= str(self
.rwdc_db
.get_config_basedn())
809 controls
= ["search_options:1:2"]
810 cn
= dn
.split(',', 1)[0]
811 expression
= '(%s)' % cn
818 res
= self
.rodc_db
.search(base
,
819 expression
=expression
,
820 scope
=ldb
.SCOPE_SUBTREE
,
823 self
.assertEqual(len(res
), 0)
824 except ldb
.LdbError
as e
:
825 if e
.args
[0] != ldb
.ERR_NO_SUCH_OBJECT
:
830 except ldb
.LdbError
as e
:
831 (ecode
, emsg
) = e
.args
832 self
.fail("Failed to add %s to rwdc: ldb error: %s %s" %
836 self
.force_replication(base
=base
)
838 self
.force_replication()
841 res
= self
.rodc_db
.search(base
,
842 expression
=expression
,
843 scope
=ldb
.SCOPE_SUBTREE
,
846 self
.assertEqual(len(res
), 1)
847 except ldb
.LdbError
as e
:
848 self
.assertNotEqual(e
.args
[0], ldb
.ERR_NO_SUCH_OBJECT
,
849 "replication seems to have failed")
851 def _test_add_replicated_objects(self
, mode
):
852 tag
= "%s%s" % (self
.tag
, mode
)
855 'dn': "ou=%s1,%s" % (tag
, self
.base_dn
),
856 "objectclass": "organizationalUnit"
859 'dn': "cn=%s2,%s" % (tag
, self
.base_dn
),
860 "objectclass": "user"
863 'dn': "cn=%s3,%s" % (tag
, self
.base_dn
),
864 "objectclass": "group"
867 self
.rwdc_db
.delete("ou=%s1,%s" % (tag
, self
.base_dn
))
868 self
.rwdc_db
.delete("cn=%s2,%s" % (tag
, self
.base_dn
))
869 self
.rwdc_db
.delete("cn=%s3,%s" % (tag
, self
.base_dn
))
871 def test_add_replicated_objects_kerberos(self
):
872 CREDS
.set_kerberos_state(MUST_USE_KERBEROS
)
873 self
._test
_add
_replicated
_objects
('kerberos')
875 def test_add_replicated_objects_ntlm(self
):
876 CREDS
.set_kerberos_state(DONT_USE_KERBEROS
)
877 self
._test
_add
_replicated
_objects
('ntlm')
879 def _test_add_replicated_connections(self
, mode
):
880 tag
= "%s%s" % (self
.tag
, mode
)
883 'dn': "cn=%sfoofoofoo,%s" % (tag
, self
.service
),
884 "objectclass": "NTDSConnection",
885 'enabledConnection': 'TRUE',
886 'fromServer': self
.base_dn
,
890 self
.rwdc_db
.delete("cn=%sfoofoofoo,%s" % (tag
, self
.service
))
892 def test_add_replicated_connections_kerberos(self
):
893 CREDS
.set_kerberos_state(MUST_USE_KERBEROS
)
894 self
._test
_add
_replicated
_connections
('kerberos')
896 def test_add_replicated_connections_ntlm(self
):
897 CREDS
.set_kerberos_state(DONT_USE_KERBEROS
)
898 self
._test
_add
_replicated
_connections
('ntlm')
900 def _test_modify_replicated_attributes(self
):
901 dn
= 'CN=Guest,CN=Users,' + self
.base_dn
903 for attr
in ['carLicense', 'middleName']:
905 m
.dn
= ldb
.Dn(self
.rwdc_db
, dn
)
906 m
[attr
] = ldb
.MessageElement(value
,
907 ldb
.FLAG_MOD_REPLACE
,
910 self
.rwdc_db
.modify(m
)
911 except ldb
.LdbError
as e
:
912 self
.fail("Failed to modify %s %s on RWDC %s with %s" %
915 self
.force_replication()
918 res
= self
.rodc_db
.search(dn
,
919 scope
=ldb
.SCOPE_SUBTREE
,
921 results
= [str(x
[attr
][0]) for x
in res
]
922 self
.assertEqual(results
, [value
])
923 except ldb
.LdbError
as e
:
924 self
.assertNotEqual(e
.args
[0], ldb
.ERR_NO_SUCH_OBJECT
,
925 "replication seems to have failed")
927 def test_modify_replicated_attributes_kerberos(self
):
928 CREDS
.set_kerberos_state(MUST_USE_KERBEROS
)
929 self
._test
_modify
_replicated
_attributes
()
931 def test_modify_replicated_attributes_ntlm(self
):
932 CREDS
.set_kerberos_state(DONT_USE_KERBEROS
)
933 self
._test
_modify
_replicated
_attributes
()
935 def _test_add_modify_delete(self
):
936 dn
= "cn=%s_add_modify,%s" % (self
.tag
, self
.base_dn
)
937 values
= ["%s%s" % (i
, self
.tag
) for i
in range(3)]
942 "objectclass": "user",
946 self
.force_replication()
947 for value
in values
[1:]:
950 m
.dn
= ldb
.Dn(self
.rwdc_db
, dn
)
951 m
[attr
] = ldb
.MessageElement(value
,
952 ldb
.FLAG_MOD_REPLACE
,
955 self
.rwdc_db
.modify(m
)
956 except ldb
.LdbError
as e
:
957 self
.fail("Failed to modify %s %s on RWDC %s with %s" %
960 self
.force_replication()
963 res
= self
.rodc_db
.search(dn
,
964 scope
=ldb
.SCOPE_SUBTREE
,
966 results
= [str(x
[attr
][0]) for x
in res
]
967 self
.assertEqual(results
, [value
])
968 except ldb
.LdbError
as e
:
969 self
.assertNotEqual(e
.args
[0], ldb
.ERR_NO_SUCH_OBJECT
,
970 "replication seems to have failed")
972 self
.rwdc_db
.delete(dn
)
973 self
.force_replication()
975 res
= self
.rodc_db
.search(dn
,
976 scope
=ldb
.SCOPE_SUBTREE
,
979 self
.fail("Failed to delete %s" % (dn
))
980 except ldb
.LdbError
as e
:
981 self
.assertEqual(e
.args
[0], ldb
.ERR_NO_SUCH_OBJECT
,
982 "Failed to delete %s" % (dn
))
984 def test_add_modify_delete_kerberos(self
):
985 CREDS
.set_kerberos_state(MUST_USE_KERBEROS
)
986 self
._test
_add
_modify
_delete
()
988 def test_add_modify_delete_ntlm(self
):
989 CREDS
.set_kerberos_state(DONT_USE_KERBEROS
)
990 self
._test
_add
_modify
_delete
()
993 username
= "u%sX%s" % (self
.tag
[:12], next(self
.counter
))
994 password
= 'password#1'
995 dn
= 'CN=%s,CN=Users,%s' % (username
, self
.base_dn
)
998 "objectclass": "user",
999 'sAMAccountName': username
,
1003 except ldb
.LdbError
as e
:
1004 self
.fail("Failed to add %s to rwdc: ldb error: %s" % (o
, e
))
1006 self
.rwdc_db
.modify_ldif("dn: %s\n"
1007 "changetype: modify\n"
1008 "delete: userPassword\n"
1009 "add: userPassword\n"
1010 "userPassword: %s\n" % (dn
, password
))
1011 self
.rwdc_db
.enable_account("(sAMAccountName=%s)" % username
)
1012 return (dn
, username
, password
)
1014 def _change_password(self
, user_dn
, old_password
, new_password
):
1015 self
.rwdc_db
.modify_ldif(
1017 "changetype: modify\n"
1018 "delete: userPassword\n"
1019 "userPassword: %s\n"
1020 "add: userPassword\n"
1021 "userPassword: %s\n" % (user_dn
, old_password
, new_password
))
1023 def try_ldap_logon(self
, server
, creds
, errno
=None):
1025 tmpdb
= SamDB('ldap://%s' % server
, credentials
=creds
,
1026 session_info
=system_session(LP
), lp
=LP
)
1027 if errno
is not None:
1028 self
.fail("logon failed to fail with ldb error %s" % errno
)
1029 except ldb
.LdbError
as e10
:
1030 (code
, msg
) = e10
.args
1033 self
.fail("logon incorrectly raised ldb error (code=%s)" %
1036 self
.fail("logon failed to raise correct ldb error"
1037 "Expected: %s Got: %s" %
1040 def zero_min_password_age(self
):
1041 min_pwd_age
= int(self
.rwdc_db
.get_minPwdAge())
1042 if min_pwd_age
!= 0:
1043 self
.rwdc_db
.set_minPwdAge('0')
1045 def _test_ldap_change_password(self
, errno
=None):
1046 self
.zero_min_password_age()
1048 dn
, username
, password
= self
._new
_user
()
1049 creds1
= make_creds(username
, password
)
1051 # With NTLM, this should fail on RODC before replication,
1052 # because the user isn't known.
1053 self
.try_ldap_logon(RODC
, creds1
, ldb
.ERR_INVALID_CREDENTIALS
)
1054 self
.force_replication()
1056 # Now the user is replicated to RODC, so logon should work
1057 self
.try_ldap_logon(RODC
, creds1
)
1059 passwords
= ['password#%s' % i
for i
in range(1, 6)]
1060 for prev
, password
in zip(passwords
[:-1], passwords
[1:]):
1061 self
._change
_password
(dn
, prev
, password
)
1063 # The password has changed enough times to make the old
1064 # password invalid (though with kerberos that doesn't matter).
1065 # For NTLM, the old creds should always fail
1066 self
.try_ldap_logon(RODC
, creds1
, errno
)
1067 self
.try_ldap_logon(RWDC
, creds1
, errno
)
1069 creds2
= make_creds(username
, password
)
1071 # new creds work straight away with NTLM, because although it
1072 # doesn't have the password, it knows the user and forwards
1074 self
.try_ldap_logon(RODC
, creds2
)
1075 self
.try_ldap_logon(RWDC
, creds2
)
1077 self
.force_replication()
1079 # After another replication check RODC still works and fails,
1080 # as appropriate to various creds
1081 self
.try_ldap_logon(RODC
, creds2
)
1082 self
.try_ldap_logon(RODC
, creds1
, errno
)
1085 password
= 'password#6'
1086 self
._change
_password
(dn
, prev
, password
)
1087 creds3
= make_creds(username
, password
)
1089 # previous password should still work.
1090 self
.try_ldap_logon(RWDC
, creds2
)
1091 self
.try_ldap_logon(RODC
, creds2
)
1093 # new password should still work.
1094 self
.try_ldap_logon(RWDC
, creds3
)
1095 self
.try_ldap_logon(RODC
, creds3
)
1097 # old password should still fail (but not on kerberos).
1098 self
.try_ldap_logon(RWDC
, creds1
, errno
)
1099 self
.try_ldap_logon(RODC
, creds1
, errno
)
1101 def test_ldap_change_password_kerberos(self
):
1102 CREDS
.set_kerberos_state(MUST_USE_KERBEROS
)
1103 self
._test
_ldap
_change
_password
()
1105 def test_ldap_change_password_ntlm(self
):
1106 CREDS
.set_kerberos_state(DONT_USE_KERBEROS
)
1107 self
._test
_ldap
_change
_password
(ldb
.ERR_INVALID_CREDENTIALS
)
1109 def _test_ldap_change_password_reveal_on_demand(self
, errno
=None):
1110 self
.zero_min_password_age()
1112 res
= self
.rodc_db
.search(self
.rodc_dn
,
1113 scope
=ldb
.SCOPE_BASE
,
1114 attrs
=['msDS-RevealOnDemandGroup'])
1116 group
= res
[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1118 user_dn
, username
, password
= self
._new
_user
()
1119 creds1
= make_creds(username
, password
)
1122 m
.dn
= ldb
.Dn(self
.rwdc_db
, group
)
1123 m
['member'] = ldb
.MessageElement(user_dn
, ldb
.FLAG_MOD_ADD
, 'member')
1124 self
.rwdc_db
.modify(m
)
1126 # Against Windows, this will just forward if no account exists on the KDC
1127 # Therefore, this does not error on Windows.
1128 self
.try_ldap_logon(RODC
, creds1
, ldb
.ERR_INVALID_CREDENTIALS
)
1130 self
.force_replication()
1133 self
.try_ldap_logon(RODC
, creds1
)
1134 preload_rodc_user(user_dn
)
1136 # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1137 self
.try_ldap_logon(RODC
, creds1
)
1139 passwords
= ['password#%s' % i
for i
in range(1, 6)]
1140 for prev
, password
in zip(passwords
[:-1], passwords
[1:]):
1141 self
._change
_password
(user_dn
, prev
, password
)
1143 # The password has changed enough times to make the old
1144 # password invalid, but the RODC shouldn't know that.
1145 self
.try_ldap_logon(RODC
, creds1
)
1146 self
.try_ldap_logon(RWDC
, creds1
, errno
)
1148 creds2
= make_creds(username
, password
)
1149 self
.try_ldap_logon(RWDC
, creds2
)
1150 # We can forward WRONG_PASSWORD over NTLM.
1151 # This SHOULD succeed.
1152 self
.try_ldap_logon(RODC
, creds2
)
1154 def test_change_password_reveal_on_demand_ntlm(self
):
1155 CREDS
.set_kerberos_state(DONT_USE_KERBEROS
)
1156 self
._test
_ldap
_change
_password
_reveal
_on
_demand
(ldb
.ERR_INVALID_CREDENTIALS
)
1158 def test_change_password_reveal_on_demand_kerberos(self
):
1159 CREDS
.set_kerberos_state(MUST_USE_KERBEROS
)
1160 self
._test
_ldap
_change
_password
_reveal
_on
_demand
()
1162 def test_login_lockout_krb5(self
):
1163 username
= self
.lockout1krb5_creds
.get_username()
1164 userpass
= self
.lockout1krb5_creds
.get_password()
1165 userdn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
1167 preload_rodc_user(userdn
)
1169 use_kerberos
= self
.lockout1krb5_creds
.get_kerberos_state()
1170 fail_creds
= self
.insta_creds(self
.template_creds
,
1172 userpass
=userpass
+ "X",
1173 kerberos_state
=use_kerberos
)
1176 ldb
= SamDB(url
=self
.host_url
, credentials
=fail_creds
, lp
=self
.lp
)
1178 except LdbError
as e11
:
1179 (num
, msg
) = e11
.args
1180 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
1182 # Succeed to reset everything to 0
1183 success_creds
= self
.insta_creds(self
.template_creds
,
1186 kerberos_state
=use_kerberos
)
1188 ldb
= SamDB(url
=self
.host_url
, credentials
=success_creds
, lp
=self
.lp
)
1190 self
._test
_login
_lockout
(self
.lockout1krb5_creds
)
1192 def test_login_lockout_ntlm(self
):
1193 username
= self
.lockout1ntlm_creds
.get_username()
1194 userpass
= self
.lockout1ntlm_creds
.get_password()
1195 userdn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
1197 preload_rodc_user(userdn
)
1199 use_kerberos
= self
.lockout1ntlm_creds
.get_kerberos_state()
1200 fail_creds
= self
.insta_creds(self
.template_creds
,
1202 userpass
=userpass
+ "X",
1203 kerberos_state
=use_kerberos
)
1206 ldb
= SamDB(url
=self
.host_url
, credentials
=fail_creds
, lp
=self
.lp
)
1208 except LdbError
as e12
:
1209 (num
, msg
) = e12
.args
1210 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
1212 # Succeed to reset everything to 0
1213 ldb
= SamDB(url
=self
.host_url
, credentials
=self
.lockout1ntlm_creds
, lp
=self
.lp
)
1215 self
._test
_login
_lockout
(self
.lockout1ntlm_creds
)
1217 def test_multiple_logon_krb5(self
):
1218 username
= self
.lockout1krb5_creds
.get_username()
1219 userpass
= self
.lockout1krb5_creds
.get_password()
1220 userdn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
1222 preload_rodc_user(userdn
)
1224 use_kerberos
= self
.lockout1krb5_creds
.get_kerberos_state()
1225 fail_creds
= self
.insta_creds(self
.template_creds
,
1227 userpass
=userpass
+ "X",
1228 kerberos_state
=use_kerberos
)
1231 ldb
= SamDB(url
=self
.host_url
, credentials
=fail_creds
, lp
=self
.lp
)
1233 except LdbError
as e13
:
1234 (num
, msg
) = e13
.args
1235 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
1237 # Succeed to reset everything to 0
1238 success_creds
= self
.insta_creds(self
.template_creds
,
1241 kerberos_state
=use_kerberos
)
1243 ldb
= SamDB(url
=self
.host_url
, credentials
=success_creds
, lp
=self
.lp
)
1245 self
._test
_multiple
_logon
(self
.lockout1krb5_creds
)
1247 def test_multiple_logon_ntlm(self
):
1248 username
= self
.lockout1ntlm_creds
.get_username()
1249 userdn
= "cn=%s,cn=users,%s" % (username
, self
.base_dn
)
1250 userpass
= self
.lockout1ntlm_creds
.get_password()
1252 preload_rodc_user(userdn
)
1254 use_kerberos
= self
.lockout1ntlm_creds
.get_kerberos_state()
1255 fail_creds
= self
.insta_creds(self
.template_creds
,
1257 userpass
=userpass
+ "X",
1258 kerberos_state
=use_kerberos
)
1261 ldb
= SamDB(url
=self
.host_url
, credentials
=fail_creds
, lp
=self
.lp
)
1263 except LdbError
as e14
:
1264 (num
, msg
) = e14
.args
1265 self
.assertEqual(num
, ERR_INVALID_CREDENTIALS
)
1267 # Succeed to reset everything to 0
1268 ldb
= SamDB(url
=self
.host_url
, credentials
=self
.lockout1ntlm_creds
, lp
=self
.lp
)
1270 self
._test
_multiple
_logon
(self
.lockout1ntlm_creds
)
1274 global RODC
, RWDC
, CREDS
, LP
1275 parser
= optparse
.OptionParser(
1276 "rodc_rwdc.py [options] <rodc host> <rwdc host>")
1278 sambaopts
= options
.SambaOptions(parser
)
1279 versionopts
= options
.VersionOptions(parser
)
1280 credopts
= options
.CredentialsOptions(parser
)
1281 subunitopts
= SubunitOptions(parser
)
1283 parser
.add_option_group(sambaopts
)
1284 parser
.add_option_group(versionopts
)
1285 parser
.add_option_group(credopts
)
1286 parser
.add_option_group(subunitopts
)
1288 opts
, args
= parser
.parse_args()
1290 LP
= sambaopts
.get_loadparm()
1291 CREDS
= credopts
.get_credentials(LP
)
1292 CREDS
.set_gensec_features(CREDS
.get_gensec_features() |
1293 gensec
.FEATURE_SEAL
)
1298 parser
.print_usage()
1301 set_auto_replication(RWDC
, True)
1303 TestProgram(module
=__name__
, opts
=subunitopts
)
1305 set_auto_replication(RWDC
, True)