Describe implication of upstream ICU-22610
[Samba.git] / python / samba / tests / krb5 / lockout_tests.py
blobe33d9acb4a87f24e506f0301e4badc1f87354630
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import sys
21 import os
23 sys.path.insert(0, 'bin/python')
24 os.environ['PYTHONUNBUFFERED'] = '1'
26 from concurrent import futures
27 from enum import Enum
28 from functools import partial
29 from multiprocessing import Pipe
30 import time
32 from cryptography.hazmat.backends import default_backend
33 from cryptography.hazmat.primitives.ciphers.base import Cipher
34 from cryptography.hazmat.primitives.ciphers import algorithms
36 import ldb
38 from samba import (
39 NTSTATUSError,
40 dsdb,
41 generate_random_bytes,
42 generate_random_password,
43 ntstatus,
44 unix2nttime,
45 werror,
47 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
48 from samba.crypto import (
49 aead_aes_256_cbc_hmac_sha512_blob,
50 des_crypt_blob_16,
51 md4_hash_blob,
52 sha512_pbkdf2,
54 from samba.dcerpc import lsa, samr
55 from samba.samdb import SamDB
57 from samba.tests import connect_samdb, env_get_var_value, env_loadparm
59 from samba.tests.krb5.as_req_tests import AsReqBaseTest
60 from samba.tests.krb5 import kcrypto
61 from samba.tests.krb5.kdc_tgs_tests import KdcTgsBaseTests
62 from samba.tests.krb5.raw_testcase import KerberosCredentials
63 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
64 from samba.tests.krb5.rfc4120_constants import (
65 KDC_ERR_CLIENT_REVOKED,
66 KDC_ERR_KEY_EXPIRED,
67 KDC_ERR_PREAUTH_FAILED,
68 KRB_AS_REP,
69 KRB_ERROR,
70 NT_PRINCIPAL,
71 NT_SRV_INST,
74 global_asn1_print = False
75 global_hexdump = False
78 class ConnectionResult(Enum):
79 LOCKED_OUT = 1
80 WRONG_PASSWORD = 2
81 SUCCESS = 3
84 def connect_kdc(pipe,
85 url,
86 hostname,
87 username,
88 password,
89 domain,
90 realm,
91 workstation,
92 dn,
93 expect_error=True,
94 expect_status=None):
95 AsReqBaseTest.setUpClass()
96 as_req_base = AsReqBaseTest()
97 as_req_base.setUp()
99 user_creds = KerberosCredentials()
100 user_creds.set_username(username)
101 user_creds.set_password(password)
102 user_creds.set_domain(domain)
103 user_creds.set_realm(realm)
104 user_creds.set_workstation(workstation)
105 user_creds.set_kerberos_state(DONT_USE_KERBEROS)
107 user_name = user_creds.get_username()
108 cname = as_req_base.PrincipalName_create(name_type=NT_PRINCIPAL,
109 names=user_name.split('/'))
111 krbtgt_creds = as_req_base.get_krbtgt_creds()
112 krbtgt_supported_etypes = krbtgt_creds.tgs_supported_enctypes
113 realm = krbtgt_creds.get_realm()
115 krbtgt_account = krbtgt_creds.get_username()
116 sname = as_req_base.PrincipalName_create(name_type=NT_SRV_INST,
117 names=[krbtgt_account, realm])
119 expected_salt = user_creds.get_salt()
121 till = as_req_base.get_KerberosTime(offset=36000)
123 kdc_options = krb5_asn1.KDCOptions('postdated')
125 preauth_key = as_req_base.PasswordKey_from_creds(user_creds,
126 kcrypto.Enctype.AES256)
128 ts_enc_padata = as_req_base.get_enc_timestamp_pa_data_from_key(preauth_key)
129 padata = [ts_enc_padata]
131 krbtgt_decryption_key = (
132 as_req_base.TicketDecryptionKey_from_creds(krbtgt_creds))
134 etypes = as_req_base.get_default_enctypes(user_creds)
136 # Remove the LDAP connection.
137 del type(as_req_base)._ldb
139 if expect_error:
140 expected_error_modes = (KDC_ERR_CLIENT_REVOKED,
141 KDC_ERR_PREAUTH_FAILED)
143 # Wrap generic_check_kdc_error() to expect an NTSTATUS code when the
144 # account is locked out.
145 def check_error_fn(kdc_exchange_dict,
146 callback_dict,
147 rep):
148 error_code = rep.get('error-code')
149 if error_code == KDC_ERR_CLIENT_REVOKED:
150 # The account was locked out.
151 kdc_exchange_dict['expected_status'] = (
152 ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT)
154 if expect_status:
155 # Expect to get a LOCKED_OUT NTSTATUS code.
156 kdc_exchange_dict['expect_edata'] = True
157 kdc_exchange_dict['expect_status'] = True
159 elif error_code == KDC_ERR_PREAUTH_FAILED:
160 # Just a wrong password: the account wasn’t locked out. Don’t
161 # expect an NTSTATUS code.
162 kdc_exchange_dict['expect_status'] = False
164 # Continue with the generic error-checking logic.
165 return as_req_base.generic_check_kdc_error(
166 kdc_exchange_dict,
167 callback_dict,
168 rep)
170 check_rep_fn = None
171 else:
172 expected_error_modes = 0
174 check_error_fn = None
175 check_rep_fn = as_req_base.generic_check_kdc_rep
177 def _generate_padata_copy(_kdc_exchange_dict,
178 _callback_dict,
179 req_body):
180 return padata, req_body
182 kdc_exchange_dict = as_req_base.as_exchange_dict(
183 creds=user_creds,
184 expected_crealm=realm,
185 expected_cname=cname,
186 expected_srealm=realm,
187 expected_sname=sname,
188 expected_account_name=user_name,
189 expected_supported_etypes=krbtgt_supported_etypes,
190 ticket_decryption_key=krbtgt_decryption_key,
191 generate_padata_fn=_generate_padata_copy,
192 check_error_fn=check_error_fn,
193 check_rep_fn=check_rep_fn,
194 check_kdc_private_fn=as_req_base.generic_check_kdc_private,
195 expected_error_mode=expected_error_modes,
196 expected_salt=expected_salt,
197 preauth_key=preauth_key,
198 kdc_options=str(kdc_options),
199 pac_request=True)
201 # Indicate that we're ready. This ensures we hit the right transaction
202 # lock.
203 pipe.send_bytes(b'0')
205 # Wait for the main process to take out a transaction lock.
206 if not pipe.poll(timeout=5):
207 raise AssertionError('main process failed to indicate readiness')
209 # Try making a Kerberos AS-REQ to the KDC. This might fail, either due to
210 # the user's account being locked out or due to using the wrong password.
211 as_rep = as_req_base._generic_kdc_exchange(kdc_exchange_dict,
212 cname=cname,
213 realm=realm,
214 sname=sname,
215 till_time=till,
216 etypes=etypes)
218 as_req_base.assertIsNotNone(as_rep)
220 msg_type = as_rep['msg-type']
221 if expect_error and msg_type != KRB_ERROR or (
222 not expect_error and msg_type != KRB_AS_REP):
223 raise AssertionError(f'wrong message type {msg_type}')
225 if not expect_error:
226 return ConnectionResult.SUCCESS
228 error_code = as_rep['error-code']
229 if error_code == KDC_ERR_CLIENT_REVOKED:
230 return ConnectionResult.LOCKED_OUT
231 elif error_code == KDC_ERR_PREAUTH_FAILED:
232 return ConnectionResult.WRONG_PASSWORD
233 else:
234 raise AssertionError(f'wrong error code {error_code}')
237 def connect_ntlm(pipe,
238 url,
239 hostname,
240 username,
241 password,
242 domain,
243 realm,
244 workstation,
245 dn):
246 user_creds = KerberosCredentials()
247 user_creds.set_username(username)
248 user_creds.set_password(password)
249 user_creds.set_domain(domain)
250 user_creds.set_workstation(workstation)
251 user_creds.set_kerberos_state(DONT_USE_KERBEROS)
253 # Indicate that we're ready. This ensures we hit the right transaction
254 # lock.
255 pipe.send_bytes(b'0')
257 # Wait for the main process to take out a transaction lock.
258 if not pipe.poll(timeout=5):
259 raise AssertionError('main process failed to indicate readiness')
261 try:
262 # Try connecting to SamDB. This should fail, either due to our
263 # account being locked out or due to using the wrong password.
264 SamDB(url=url,
265 credentials=user_creds,
266 lp=env_loadparm())
267 except ldb.LdbError as err:
268 num, estr = err.args
270 if num != ldb.ERR_INVALID_CREDENTIALS:
271 raise AssertionError(f'connection raised wrong error code '
272 f'({err})')
274 if f'data {werror.WERR_ACCOUNT_LOCKED_OUT:x},' in estr:
275 return ConnectionResult.LOCKED_OUT
276 elif f'data {werror.WERR_LOGON_FAILURE:x},' in estr:
277 return ConnectionResult.WRONG_PASSWORD
278 else:
279 raise AssertionError(f'connection raised wrong error code '
280 f'({estr})')
281 else:
282 return ConnectionResult.SUCCESS
285 def connect_samr(pipe,
286 url,
287 hostname,
288 username,
289 password,
290 domain,
291 realm,
292 workstation,
293 dn):
294 # Get the user's NT hash.
295 user_creds = KerberosCredentials()
296 user_creds.set_password(password)
297 nt_hash = user_creds.get_nt_hash()
299 # Generate a new UTF-16 password.
300 new_password = generate_random_password(32, 32)
301 new_password = new_password.encode('utf-16le')
303 # Generate the MD4 hash of the password.
304 new_password_md4 = md4_hash_blob(new_password)
306 # Prefix the password with padding so it is 512 bytes long.
307 new_password_len = len(new_password)
308 remaining_len = 512 - new_password_len
309 new_password = bytes(remaining_len) + new_password
311 # Append the 32-bit length of the password..
312 new_password += int.to_bytes(new_password_len,
313 length=4,
314 byteorder='little')
316 # Encrypt the password with RC4 and the existing NT hash.
317 encryptor = Cipher(algorithms.ARC4(nt_hash),
318 None,
319 default_backend()).encryptor()
320 new_password = encryptor.update(new_password)
322 # Create a key from the MD4 hash of the new password.
323 key = new_password_md4[:14]
325 # Encrypt the old NT hash with DES to obtain the verifier.
326 verifier = des_crypt_blob_16(nt_hash, key)
328 server = lsa.String()
329 server.string = hostname
331 account = lsa.String()
332 account.string = username
334 nt_password = samr.CryptPassword()
335 nt_password.data = list(new_password)
337 nt_verifier = samr.Password()
338 nt_verifier.hash = list(verifier)
340 conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
342 # Indicate that we're ready. This ensures we hit the right transaction
343 # lock.
344 pipe.send_bytes(b'0')
346 # Wait for the main process to take out a transaction lock.
347 if not pipe.poll(timeout=5):
348 raise AssertionError('main process failed to indicate readiness')
350 try:
351 # Try changing the password. This should fail, either due to our
352 # account being locked out or due to using the wrong password.
353 conn.ChangePasswordUser3(server=server,
354 account=account,
355 nt_password=nt_password,
356 nt_verifier=nt_verifier,
357 lm_change=True,
358 lm_password=None,
359 lm_verifier=None,
360 password3=None)
361 except NTSTATUSError as err:
362 num, estr = err.args
364 if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
365 return ConnectionResult.LOCKED_OUT
366 elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
367 return ConnectionResult.WRONG_PASSWORD
368 else:
369 raise AssertionError(f'pwd change raised wrong error code '
370 f'({num:08X})')
371 else:
372 return ConnectionResult.SUCCESS
375 def connect_samr_aes(pipe,
376 url,
377 hostname,
378 username,
379 password,
380 domain,
381 realm,
382 workstation,
383 dn):
384 # Get the user's NT hash.
385 user_creds = KerberosCredentials()
386 user_creds.set_password(password)
387 nt_hash = user_creds.get_nt_hash()
389 # Generate a new UTF-16 password.
390 new_password = generate_random_password(32, 32)
391 new_password = new_password.encode('utf-16le')
393 # Prepend the 16-bit length of the password..
394 new_password_len = int.to_bytes(len(new_password),
395 length=2,
396 byteorder='little')
397 new_password = new_password_len + new_password
399 server = lsa.String()
400 server.string = hostname
402 account = lsa.String()
403 account.string = username
405 # Derive a key from the user's NT hash.
406 iv = generate_random_bytes(16)
407 iterations = 5555
408 cek = sha512_pbkdf2(nt_hash, iv, iterations)
410 enc_key_salt = (b'Microsoft SAM encryption key '
411 b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
412 mac_key_salt = (b'Microsoft SAM MAC key '
413 b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
415 # Encrypt the new password.
416 ciphertext, auth_data = aead_aes_256_cbc_hmac_sha512_blob(new_password,
417 cek,
418 enc_key_salt,
419 mac_key_salt,
422 # Create the new password structure
423 pwd_buf = samr.EncryptedPasswordAES()
424 pwd_buf.auth_data = list(auth_data)
425 pwd_buf.salt = list(iv)
426 pwd_buf.cipher_len = len(ciphertext)
427 pwd_buf.cipher = list(ciphertext)
428 pwd_buf.PBKDF2Iterations = iterations
430 conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
432 # Indicate that we're ready. This ensures we hit the right transaction
433 # lock.
434 pipe.send_bytes(b'0')
436 # Wait for the main process to take out a transaction lock.
437 if not pipe.poll(timeout=5):
438 raise AssertionError('main process failed to indicate readiness')
440 try:
441 # Try changing the password. This should fail, either due to our
442 # account being locked out or due to using the wrong password.
443 conn.ChangePasswordUser4(server=server,
444 account=account,
445 password=pwd_buf)
446 except NTSTATUSError as err:
447 num, estr = err.args
449 if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
450 return ConnectionResult.LOCKED_OUT
451 elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
452 return ConnectionResult.WRONG_PASSWORD
453 else:
454 raise AssertionError(f'pwd change raised wrong error code '
455 f'({num:08X})')
456 else:
457 return ConnectionResult.SUCCESS
460 def ldap_pwd_change(pipe,
461 url,
462 hostname,
463 username,
464 password,
465 domain,
466 realm,
467 workstation,
468 dn):
469 lp = env_loadparm()
471 admin_creds = KerberosCredentials()
472 admin_creds.guess(lp)
473 admin_creds.set_username(env_get_var_value('ADMIN_USERNAME'))
474 admin_creds.set_password(env_get_var_value('ADMIN_PASSWORD'))
475 admin_creds.set_kerberos_state(MUST_USE_KERBEROS)
477 samdb = SamDB(url=url,
478 credentials=admin_creds,
479 lp=lp)
481 old_utf16pw = f'"{password}"'.encode('utf-16le')
483 new_password = generate_random_password(32, 32)
484 new_utf16pw = f'"{new_password}"'.encode('utf-16le')
486 msg = ldb.Message(ldb.Dn(samdb, dn))
487 msg['0'] = ldb.MessageElement(old_utf16pw,
488 ldb.FLAG_MOD_DELETE,
489 'unicodePwd')
490 msg['1'] = ldb.MessageElement(new_utf16pw,
491 ldb.FLAG_MOD_ADD,
492 'unicodePwd')
494 # Indicate that we're ready. This ensures we hit the right transaction
495 # lock.
496 pipe.send_bytes(b'0')
498 # Wait for the main process to take out a transaction lock.
499 if not pipe.poll(timeout=5):
500 raise AssertionError('main process failed to indicate readiness')
502 # Try changing the user's password. This should fail, either due to the
503 # user's account being locked out or due to specifying the wrong password.
504 try:
505 samdb.modify(msg)
506 except ldb.LdbError as err:
507 num, estr = err.args
508 if num != ldb.ERR_CONSTRAINT_VIOLATION:
509 raise AssertionError(f'pwd change raised wrong error code ({err})')
511 if f'<{werror.WERR_ACCOUNT_LOCKED_OUT:08X}:' in estr:
512 return ConnectionResult.LOCKED_OUT
513 elif f'<{werror.WERR_INVALID_PASSWORD:08X}:' in estr:
514 return ConnectionResult.WRONG_PASSWORD
515 else:
516 raise AssertionError(f'pwd change raised wrong error code '
517 f'({estr})')
518 else:
519 return ConnectionResult.SUCCESS
522 class LockoutTests(KdcTgsBaseTests):
524 def setUp(self):
525 super().setUp()
526 self.do_asn1_print = global_asn1_print
527 self.do_hexdump = global_hexdump
529 samdb = self.get_samdb()
530 base_dn = ldb.Dn(samdb, samdb.domain_dn())
532 def modify_attr(attr, value):
533 if value is None:
534 value = []
535 flag = ldb.FLAG_MOD_DELETE
536 else:
537 value = str(value)
538 flag = ldb.FLAG_MOD_REPLACE
540 msg = ldb.Message(base_dn)
541 msg[attr] = ldb.MessageElement(
542 value, flag, attr)
543 samdb.modify(msg)
545 res = samdb.search(base_dn,
546 scope=ldb.SCOPE_BASE,
547 attrs=['lockoutDuration',
548 'lockoutThreshold',
549 'msDS-LogonTimeSyncInterval'])
550 self.assertEqual(1, len(res))
552 # Reset the lockout duration as it was before.
553 lockout_duration = res[0].get('lockoutDuration', idx=0)
554 self.addCleanup(modify_attr, 'lockoutDuration', lockout_duration)
556 # Set the new lockout duration: locked out accounts now stay locked
557 # out.
558 modify_attr('lockoutDuration', 0)
560 # Reset the lockout threshold as it was before.
561 lockout_threshold = res[0].get('lockoutThreshold', idx=0)
562 self.addCleanup(modify_attr, 'lockoutThreshold', lockout_threshold)
564 # Set the new lockout threshold.
565 self.lockout_threshold = 3
566 modify_attr('lockoutThreshold', self.lockout_threshold)
568 # Reset the logon time sync interval as it was before.
569 sync_interval = res[0].get('msDS-LogonTimeSyncInterval', idx=0)
570 self.addCleanup(modify_attr,
571 'msDS-LogonTimeSyncInterval',
572 sync_interval)
574 # Set the new logon time sync interval. Setting it to 0 eliminates the
575 # need for this attribute to be updated on logon, and thus the
576 # requirement to take out a transaction.
577 modify_attr('msDS-LogonTimeSyncInterval', 0)
579 # Get the old 'minPwdAge'.
580 minPwdAge = samdb.get_minPwdAge()
582 # Reset the 'minPwdAge' as it was before.
583 self.addCleanup(samdb.set_minPwdAge, minPwdAge)
585 # Set it temporarily to '0'.
586 samdb.set_minPwdAge('0')
588 def wait_for_ready(self, pipe, future):
589 if pipe.poll(timeout=5):
590 return
592 # We failed to read a response from the pipe, so see if the test raised
593 # an exception with more information.
594 if future.done():
595 exception = future.exception(timeout=0)
596 if exception is not None:
597 raise exception
599 self.fail('test failed to indicate readiness')
601 def test_lockout_transaction_kdc(self):
602 self.do_lockout_transaction(connect_kdc)
604 def test_lockout_transaction_kdc_ntstatus(self):
605 self.do_lockout_transaction(partial(connect_kdc, expect_status=True))
607 # Test that performing AS‐REQs with accounts in various states of
608 # unusability results in appropriate NTSTATUS and Kerberos error codes.
610 def test_lockout_status_disabled(self):
611 self._run_lockout_status(
612 self._get_creds_disabled(),
613 expected_status=ntstatus.NT_STATUS_ACCOUNT_DISABLED,
614 expected_error=KDC_ERR_CLIENT_REVOKED,
617 def test_lockout_status_locked_out(self):
618 self._run_lockout_status(
619 self._get_creds_locked_out(),
620 expected_status=ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT,
621 expected_error=KDC_ERR_CLIENT_REVOKED,
624 def test_lockout_status_expired(self):
625 self._run_lockout_status(
626 self._get_creds_expired(),
627 expected_status=ntstatus.NT_STATUS_ACCOUNT_EXPIRED,
628 expected_error=KDC_ERR_CLIENT_REVOKED,
631 def test_lockout_status_must_change(self):
632 self._run_lockout_status(
633 self._get_creds_must_change(),
634 expected_status=ntstatus.NT_STATUS_PASSWORD_MUST_CHANGE,
635 expected_error=KDC_ERR_KEY_EXPIRED,
638 def test_lockout_status_password_expired(self):
639 self._run_lockout_status(
640 self._get_creds_password_expired(),
641 expected_status=ntstatus.NT_STATUS_PASSWORD_EXPIRED,
642 expected_error=KDC_ERR_KEY_EXPIRED,
645 # Test that performing the same AS‐REQs, this time with FAST, does not
646 # result in NTSTATUS codes.
648 def test_lockout_status_disabled_fast(self):
649 self._run_lockout_status_fast(
650 self._get_creds_disabled(), expected_error=KDC_ERR_CLIENT_REVOKED
653 def test_lockout_status_locked_out_fast(self):
654 self._run_lockout_status_fast(
655 self._get_creds_locked_out(), expected_error=KDC_ERR_CLIENT_REVOKED
658 def test_lockout_status_expired_fast(self):
659 self._run_lockout_status_fast(
660 self._get_creds_expired(), expected_error=KDC_ERR_CLIENT_REVOKED
663 def test_lockout_status_must_change_fast(self):
664 self._run_lockout_status_fast(
665 self._get_creds_must_change(), expected_error=KDC_ERR_KEY_EXPIRED
668 def test_lockout_status_password_expired_fast(self):
669 self._run_lockout_status_fast(
670 self._get_creds_password_expired(), expected_error=KDC_ERR_KEY_EXPIRED
673 def _get_creds_disabled(self):
674 return self.get_cached_creds(
675 account_type=self.AccountType.USER, opts={"enabled": False}
678 def _get_creds_locked_out(self) -> KerberosCredentials:
679 samdb = self.get_samdb()
681 user_creds = self.get_cached_creds(
682 account_type=self.AccountType.USER, use_cache=False
684 user_dn = user_creds.get_dn()
686 # Lock out the account.
688 old_utf16pw = '"Secret007"'.encode("utf-16le") # invalid pwd
689 new_utf16pw = '"Secret008"'.encode("utf-16le")
691 msg = ldb.Message(user_dn)
692 msg["0"] = ldb.MessageElement(old_utf16pw, ldb.FLAG_MOD_DELETE, "unicodePwd")
693 msg["1"] = ldb.MessageElement(new_utf16pw, ldb.FLAG_MOD_ADD, "unicodePwd")
695 for _ in range(self.lockout_threshold):
696 try:
697 samdb.modify(msg)
698 except ldb.LdbError as err:
699 num, _ = err.args
701 # We get an error, but the bad password count should
702 # still be updated.
703 self.assertEqual(num, ldb.ERR_CONSTRAINT_VIOLATION)
704 else:
705 self.fail("pwd change should have failed")
707 # Ensure the account is locked out.
709 res = samdb.search(
710 user_dn, scope=ldb.SCOPE_BASE, attrs=["msDS-User-Account-Control-Computed"]
712 self.assertEqual(1, len(res))
714 uac = int(res[0].get("msDS-User-Account-Control-Computed", idx=0))
715 self.assertTrue(uac & dsdb.UF_LOCKOUT)
717 return user_creds
719 def _get_creds_expired(self) -> KerberosCredentials:
720 return self.get_cached_creds(
721 account_type=self.AccountType.USER,
722 opts={"additional_details": self.freeze({"accountExpires": "1"})},
725 def _get_creds_must_change(self) -> KerberosCredentials:
726 return self.get_cached_creds(
727 account_type=self.AccountType.USER,
728 opts={"additional_details": self.freeze({"pwdLastSet": "0"})},
731 def _get_creds_password_expired(self) -> KerberosCredentials:
732 samdb = self.get_samdb()
733 self.addCleanup(samdb.set_maxPwdAge, samdb.get_maxPwdAge())
734 low_pwd_age = -2
735 samdb.set_maxPwdAge(low_pwd_age)
737 return self.get_cached_creds(account_type=self.AccountType.USER)
739 def _run_lockout_status(
740 self,
741 user_creds: KerberosCredentials,
743 expected_status: int,
744 expected_error: int,
745 ) -> None:
746 user_name = user_creds.get_username()
747 cname = self.PrincipalName_create(
748 name_type=NT_PRINCIPAL, names=user_name.split("/")
751 krbtgt_creds = self.get_krbtgt_creds()
752 realm = krbtgt_creds.get_realm()
754 sname = self.get_krbtgt_sname()
756 preauth_key = self.PasswordKey_from_creds(user_creds, kcrypto.Enctype.AES256)
758 ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)
759 padata = [ts_enc_padata]
761 def _generate_padata_copy(_kdc_exchange_dict, _callback_dict, req_body):
762 return padata, req_body
764 kdc_exchange_dict = self.as_exchange_dict(
765 creds=user_creds,
766 expected_crealm=realm,
767 expected_cname=cname,
768 expected_srealm=realm,
769 expected_sname=sname,
770 expected_account_name=user_name,
771 expected_supported_etypes=krbtgt_creds.tgs_supported_enctypes,
772 expect_edata=True,
773 expect_status=True,
774 expected_status=expected_status,
775 ticket_decryption_key=self.TicketDecryptionKey_from_creds(krbtgt_creds),
776 generate_padata_fn=_generate_padata_copy,
777 check_error_fn=self.generic_check_kdc_error,
778 check_rep_fn=None,
779 check_kdc_private_fn=self.generic_check_kdc_private,
780 expected_error_mode=expected_error,
781 expected_salt=user_creds.get_salt(),
782 preauth_key=preauth_key,
783 kdc_options=str(krb5_asn1.KDCOptions("postdated")),
784 pac_request=True,
787 # Try making a Kerberos AS-REQ to the KDC. This might fail, either due
788 # to the user's account being locked out or due to using the wrong
789 # password.
790 self._generic_kdc_exchange(
791 kdc_exchange_dict,
792 cname=cname,
793 realm=realm,
794 sname=sname,
795 till_time=self.get_KerberosTime(offset=36000),
796 etypes=self.get_default_enctypes(user_creds),
799 def _run_lockout_status_fast(
800 self, user_creds: KerberosCredentials, *, expected_error: int
801 ) -> None:
802 self._armored_as_req(
803 user_creds,
804 self.get_krbtgt_creds(),
805 self.get_tgt(self.get_mach_creds()),
806 expected_error=expected_error,
807 expect_edata=self.expect_padata_outer,
808 # FAST‐armored responses never contain an NTSTATUS code.
809 expect_status=False,
812 def test_lockout_transaction_ntlm(self):
813 self.do_lockout_transaction(connect_ntlm)
815 def test_lockout_transaction_samr(self):
816 self.do_lockout_transaction(connect_samr)
818 def test_lockout_transaction_samr_aes(self):
819 self.do_lockout_transaction(connect_samr_aes)
821 def test_lockout_transaction_ldap_pw_change(self):
822 self.do_lockout_transaction(ldap_pwd_change)
824 # Tests to ensure we can handle the account being renamed. We do not test
825 # renames with SAMR password changes, because in that case the entire
826 # process happens inside a transaction, and the password change method only
827 # receives the account username. By the time it searches for the account,
828 # it will have already been renamed, and so it will always fail to find the
829 # account.
831 def test_lockout_transaction_rename_kdc(self):
832 self.do_lockout_transaction(connect_kdc, rename=True)
834 def test_lockout_transaction_rename_kdc_ntstatus(self):
835 self.do_lockout_transaction(partial(connect_kdc, expect_status=True),
836 rename=True)
838 def test_lockout_transaction_rename_ntlm(self):
839 self.do_lockout_transaction(connect_ntlm, rename=True)
841 def test_lockout_transaction_rename_ldap_pw_change(self):
842 self.do_lockout_transaction(ldap_pwd_change, rename=True)
844 def test_lockout_transaction_bad_pwd_kdc(self):
845 self.do_lockout_transaction(connect_kdc, correct_pw=False)
847 def test_lockout_transaction_bad_pwd_kdc_ntstatus(self):
848 self.do_lockout_transaction(partial(connect_kdc, expect_status=True),
849 correct_pw=False)
851 def test_lockout_transaction_bad_pwd_ntlm(self):
852 self.do_lockout_transaction(connect_ntlm, correct_pw=False)
854 def test_lockout_transaction_bad_pwd_samr(self):
855 self.do_lockout_transaction(connect_samr, correct_pw=False)
857 def test_lockout_transaction_bad_pwd_samr_aes(self):
858 self.do_lockout_transaction(connect_samr_aes, correct_pw=False)
860 def test_lockout_transaction_bad_pwd_ldap_pw_change(self):
861 self.do_lockout_transaction(ldap_pwd_change, correct_pw=False)
863 def test_bad_pwd_count_transaction_kdc(self):
864 self.do_bad_pwd_count_transaction(connect_kdc)
866 def test_bad_pwd_count_transaction_ntlm(self):
867 self.do_bad_pwd_count_transaction(connect_ntlm)
869 def test_bad_pwd_count_transaction_samr(self):
870 self.do_bad_pwd_count_transaction(connect_samr)
872 def test_bad_pwd_count_transaction_samr_aes(self):
873 self.do_bad_pwd_count_transaction(connect_samr_aes)
875 def test_bad_pwd_count_transaction_ldap_pw_change(self):
876 self.do_bad_pwd_count_transaction(ldap_pwd_change)
878 def test_bad_pwd_count_transaction_rename_kdc(self):
879 self.do_bad_pwd_count_transaction(connect_kdc, rename=True)
881 def test_bad_pwd_count_transaction_rename_ntlm(self):
882 self.do_bad_pwd_count_transaction(connect_ntlm, rename=True)
884 def test_bad_pwd_count_transaction_rename_ldap_pw_change(self):
885 self.do_bad_pwd_count_transaction(ldap_pwd_change, rename=True)
887 def test_lockout_race_kdc(self):
888 self.do_lockout_race(connect_kdc)
890 def test_lockout_race_kdc_ntstatus(self):
891 self.do_lockout_race(partial(connect_kdc, expect_status=True))
893 def test_lockout_race_ntlm(self):
894 self.do_lockout_race(connect_ntlm)
896 def test_lockout_race_samr(self):
897 self.do_lockout_race(connect_samr)
899 def test_lockout_race_samr_aes(self):
900 self.do_lockout_race(connect_samr_aes)
902 def test_lockout_race_ldap_pw_change(self):
903 self.do_lockout_race(ldap_pwd_change)
905 def test_logon_without_transaction_ntlm(self):
906 self.do_logon_without_transaction(connect_ntlm)
908 # Tests to ensure that the connection functions work correctly in the happy
909 # path.
911 def test_logon_kdc(self):
912 self.do_logon(partial(connect_kdc, expect_error=False))
914 def test_logon_ntlm(self):
915 self.do_logon(connect_ntlm)
917 def test_logon_samr(self):
918 self.do_logon(connect_samr)
920 def test_logon_samr_aes(self):
921 self.do_logon(connect_samr_aes)
923 def test_logon_ldap_pw_change(self):
924 self.do_logon(ldap_pwd_change)
926 # Test that connection without a correct password works.
927 def do_logon(self, connect_fn):
928 # Create the user account for testing.
929 user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
930 use_cache=False)
931 admin_creds = self.get_admin_creds()
932 lp = self.get_lp()
934 # Get a connection to our local SamDB.
935 samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
936 credentials=admin_creds)
937 self.assertLocalSamDB(samdb)
938 user_dn = ldb.Dn(samdb, str(user_creds.get_dn()))
940 password = user_creds.get_password()
942 # Prepare to connect to the server with a valid password.
943 our_pipe, their_pipe = Pipe(duplex=True)
945 # Inform the test function that it may proceed.
946 our_pipe.send_bytes(b'0')
948 result = connect_fn(pipe=their_pipe,
949 url=f'ldap://{samdb.host_dns_name()}',
950 hostname=samdb.host_dns_name(),
951 username=user_creds.get_username(),
952 password=password,
953 domain=user_creds.get_domain(),
954 realm=user_creds.get_realm(),
955 workstation=user_creds.get_workstation(),
956 dn=str(user_dn))
958 # The connection should succeed.
959 self.assertEqual(result, ConnectionResult.SUCCESS)
961 # Lock out the account while holding a transaction lock, then release the
962 # lock. A logon attempt already in progress should reread the account
963 # details and recognise the account is locked out. The account can
964 # additionally be renamed within the transaction to ensure that, by using
965 # the GUID, rereading the account's details still succeeds.
966 def do_lockout_transaction(self, connect_fn,
967 rename=False,
968 correct_pw=True):
969 # Create the user account for testing.
970 user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
971 use_cache=False)
973 admin_creds = self.get_admin_creds()
974 lp = self.get_lp()
976 # Get a connection to our local SamDB.
977 samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
978 credentials=admin_creds)
979 self.assertLocalSamDB(samdb)
981 user_dn = ldb.Dn(samdb, str(user_creds.get_dn()))
983 password = user_creds.get_password()
984 if not correct_pw:
985 password = password[:-1]
987 # Prepare to connect to the server.
988 with futures.ProcessPoolExecutor(max_workers=1) as executor:
989 our_pipe, their_pipe = Pipe(duplex=True)
990 connect_future = executor.submit(
991 connect_fn,
992 pipe=their_pipe,
993 url=f'ldap://{samdb.host_dns_name()}',
994 hostname=samdb.host_dns_name(),
995 username=user_creds.get_username(),
996 password=password,
997 domain=user_creds.get_domain(),
998 realm=user_creds.get_realm(),
999 workstation=user_creds.get_workstation(),
1000 dn=str(user_dn))
1002 # Wait until the test process indicates it's ready.
1003 self.wait_for_ready(our_pipe, connect_future)
1005 # Take out a transaction.
1006 samdb.transaction_start()
1007 try:
1008 # Lock out the account. We must do it using an actual password
1009 # change like so, rather than directly with a database
1010 # modification, so that the account is also added to the
1011 # auxiliary bad password database. Our goal is to get lockouts
1012 # to happen, i.e. password checking.
1014 old_utf16pw = '"Secret007"'.encode('utf-16le') # invalid pwd
1015 new_utf16pw = '"Secret008"'.encode('utf-16le')
1017 msg = ldb.Message(user_dn)
1018 msg['0'] = ldb.MessageElement(old_utf16pw,
1019 ldb.FLAG_MOD_DELETE,
1020 'unicodePwd')
1021 msg['1'] = ldb.MessageElement(new_utf16pw,
1022 ldb.FLAG_MOD_ADD,
1023 'unicodePwd')
1025 for i in range(self.lockout_threshold):
1026 try:
1027 samdb.modify(msg)
1028 except ldb.LdbError as err:
1029 num, estr = err.args
1031 # We get an error, but the bad password count should
1032 # still be updated.
1033 self.assertEqual(num, ldb.ERR_OPERATIONS_ERROR)
1034 self.assertEqual('Failed to obtain remote address for '
1035 'the LDAP client while changing the '
1036 'password',
1037 estr)
1038 else:
1039 self.fail('pwd change should have failed')
1041 # Ensure the account is locked out.
1043 res = samdb.search(
1044 user_dn, scope=ldb.SCOPE_BASE,
1045 attrs=['msDS-User-Account-Control-Computed'])
1046 self.assertEqual(1, len(res))
1048 uac = int(res[0].get('msDS-User-Account-Control-Computed',
1049 idx=0))
1050 self.assertTrue(uac & dsdb.UF_LOCKOUT)
1052 # Now the bad password database has been updated, inform the
1053 # test process that it may proceed.
1054 our_pipe.send_bytes(b'0')
1056 # Wait one second to ensure the test process hits the
1057 # transaction lock.
1058 time.sleep(1)
1060 if rename:
1061 # While we're at it, rename the account to ensure that is
1062 # also safe if a race occurs.
1063 msg = ldb.Message(user_dn)
1064 new_username = self.get_new_username()
1065 msg['sAMAccountName'] = ldb.MessageElement(
1066 new_username,
1067 ldb.FLAG_MOD_REPLACE,
1068 'sAMAccountName')
1069 samdb.modify(msg)
1071 except Exception:
1072 samdb.transaction_cancel()
1073 raise
1075 # Commit the local transaction.
1076 samdb.transaction_commit()
1078 result = connect_future.result(timeout=5)
1079 self.assertEqual(result, ConnectionResult.LOCKED_OUT)
1081 # Update the bad password count while holding a transaction lock, then
1082 # release the lock. A logon attempt already in progress should reread the
1083 # account details and ensure the bad password count is atomically
1084 # updated. The account can additionally be renamed within the transaction
1085 # to ensure that, by using the GUID, rereading the account's details still
1086 # succeeds.
1087 def do_bad_pwd_count_transaction(self, connect_fn, rename=False):
1088 # Create the user account for testing.
1089 user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
1090 use_cache=False)
1092 admin_creds = self.get_admin_creds()
1093 lp = self.get_lp()
1095 # Get a connection to our local SamDB.
1096 samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
1097 credentials=admin_creds)
1098 self.assertLocalSamDB(samdb)
1099 user_dn = ldb.Dn(samdb, str(user_creds.get_dn()))
1101 # Prepare to connect to the server with an invalid password.
1102 with futures.ProcessPoolExecutor(max_workers=1) as executor:
1103 our_pipe, their_pipe = Pipe(duplex=True)
1104 connect_future = executor.submit(
1105 connect_fn,
1106 pipe=their_pipe,
1107 url=f'ldap://{samdb.host_dns_name()}',
1108 hostname=samdb.host_dns_name(),
1109 username=user_creds.get_username(),
1110 password=user_creds.get_password()[:-1], # invalid password
1111 domain=user_creds.get_domain(),
1112 realm=user_creds.get_realm(),
1113 workstation=user_creds.get_workstation(),
1114 dn=str(user_dn))
1116 # Wait until the test process indicates it's ready.
1117 self.wait_for_ready(our_pipe, connect_future)
1119 # Take out a transaction.
1120 samdb.transaction_start()
1121 try:
1122 # Inform the test process that it may proceed.
1123 our_pipe.send_bytes(b'0')
1125 # Wait one second to ensure the test process hits the
1126 # transaction lock.
1127 time.sleep(1)
1129 # Set badPwdCount to 1.
1130 msg = ldb.Message(user_dn)
1131 now = int(time.time())
1132 bad_pwd_time = unix2nttime(now)
1133 msg['badPwdCount'] = ldb.MessageElement(
1134 '1',
1135 ldb.FLAG_MOD_REPLACE,
1136 'badPwdCount')
1137 msg['badPasswordTime'] = ldb.MessageElement(
1138 str(bad_pwd_time),
1139 ldb.FLAG_MOD_REPLACE,
1140 'badPasswordTime')
1141 if rename:
1142 # While we're at it, rename the account to ensure that is
1143 # also safe if a race occurs.
1144 new_username = self.get_new_username()
1145 msg['sAMAccountName'] = ldb.MessageElement(
1146 new_username,
1147 ldb.FLAG_MOD_REPLACE,
1148 'sAMAccountName')
1149 samdb.modify(msg)
1151 # Ensure the account is not yet locked out.
1153 res = samdb.search(
1154 user_dn, scope=ldb.SCOPE_BASE,
1155 attrs=['msDS-User-Account-Control-Computed'])
1156 self.assertEqual(1, len(res))
1158 uac = int(res[0].get('msDS-User-Account-Control-Computed',
1159 idx=0))
1160 self.assertFalse(uac & dsdb.UF_LOCKOUT)
1161 except Exception:
1162 samdb.transaction_cancel()
1163 raise
1165 # Commit the local transaction.
1166 samdb.transaction_commit()
1168 result = connect_future.result(timeout=5)
1169 self.assertEqual(result, ConnectionResult.WRONG_PASSWORD, result)
1171 # Check that badPwdCount has now increased to 2.
1173 res = samdb.search(user_dn,
1174 scope=ldb.SCOPE_BASE,
1175 attrs=['badPwdCount'])
1176 self.assertEqual(1, len(res))
1178 bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
1179 self.assertEqual(2, bad_pwd_count)
1181 # Attempt to log in to the account with an incorrect password, using
1182 # lockoutThreshold+1 simultaneous attempts. We should get three 'wrong
1183 # password' errors and one 'locked out' error, showing that the bad
1184 # password count is checked and incremented atomically.
1185 def do_lockout_race(self, connect_fn):
1186 # Create the user account for testing.
1187 user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
1188 use_cache=False)
1190 admin_creds = self.get_admin_creds()
1191 lp = self.get_lp()
1193 # Get a connection to our local SamDB.
1194 samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
1195 credentials=admin_creds)
1196 self.assertLocalSamDB(samdb)
1197 user_dn = ldb.Dn(samdb, str(user_creds.get_dn()))
1199 # Prepare to connect to the server with an invalid password, using four
1200 # simultaneous requests. Only three of those attempts should get
1201 # through before the account is locked out.
1202 num_attempts = self.lockout_threshold + 1
1203 with futures.ProcessPoolExecutor(max_workers=num_attempts) as executor:
1204 connect_futures = []
1205 our_pipes = []
1206 for i in range(num_attempts):
1207 our_pipe, their_pipe = Pipe(duplex=True)
1208 our_pipes.append(our_pipe)
1210 connect_future = executor.submit(
1211 connect_fn,
1212 pipe=their_pipe,
1213 url=f'ldap://{samdb.host_dns_name()}',
1214 hostname=samdb.host_dns_name(),
1215 username=user_creds.get_username(),
1216 password=user_creds.get_password()[:-1], # invalid pw
1217 domain=user_creds.get_domain(),
1218 realm=user_creds.get_realm(),
1219 workstation=user_creds.get_workstation(),
1220 dn=str(user_dn))
1221 connect_futures.append(connect_future)
1223 # Wait until the test process indicates it's ready.
1224 self.wait_for_ready(our_pipe, connect_future)
1226 # Take out a transaction.
1227 samdb.transaction_start()
1228 try:
1229 # Inform the test processes that they may proceed.
1230 for our_pipe in our_pipes:
1231 our_pipe.send_bytes(b'0')
1233 # Wait one second to ensure the test processes hit the
1234 # transaction lock.
1235 time.sleep(1)
1236 except Exception:
1237 samdb.transaction_cancel()
1238 raise
1240 # Commit the local transaction.
1241 samdb.transaction_commit()
1243 lockouts = 0
1244 wrong_passwords = 0
1245 for i, connect_future in enumerate(connect_futures):
1246 result = connect_future.result(timeout=5)
1247 if result == ConnectionResult.LOCKED_OUT:
1248 lockouts += 1
1249 elif result == ConnectionResult.WRONG_PASSWORD:
1250 wrong_passwords += 1
1251 else:
1252 self.fail(f'process {i} gave an unexpected result '
1253 f'{result}')
1255 self.assertEqual(wrong_passwords, self.lockout_threshold)
1256 self.assertEqual(lockouts, num_attempts - self.lockout_threshold)
1258 # Ensure the account is now locked out.
1260 res = samdb.search(
1261 user_dn, scope=ldb.SCOPE_BASE,
1262 attrs=['badPwdCount',
1263 'msDS-User-Account-Control-Computed'])
1264 self.assertEqual(1, len(res))
1266 bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
1267 self.assertEqual(self.lockout_threshold, bad_pwd_count)
1269 uac = int(res[0].get('msDS-User-Account-Control-Computed',
1270 idx=0))
1271 self.assertTrue(uac & dsdb.UF_LOCKOUT)
1273 # Test that logon is possible even while we locally hold a transaction
1274 # lock. This test only works with NTLM authentication; Kerberos
1275 # authentication must take out a transaction to update the logonCount
1276 # attribute, and LDAP and SAMR password changes both take out a transaction
1277 # to effect the password change. NTLM is the only logon method that does
1278 # not require a transaction, and can thus be performed while we're holding
1279 # the lock.
1280 def do_logon_without_transaction(self, connect_fn):
1281 # Create the user account for testing.
1282 user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
1283 use_cache=False)
1285 admin_creds = self.get_admin_creds()
1286 lp = self.get_lp()
1288 # Get a connection to our local SamDB.
1289 samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
1290 credentials=admin_creds)
1291 self.assertLocalSamDB(samdb)
1292 user_dn = ldb.Dn(samdb, str(user_creds.get_dn()))
1293 password = user_creds.get_password()
1295 # Prepare to connect to the server with a valid password.
1296 with futures.ProcessPoolExecutor(max_workers=1) as executor:
1297 our_pipe, their_pipe = Pipe(duplex=True)
1298 connect_future = executor.submit(
1299 connect_fn,
1300 pipe=their_pipe,
1301 url=f'ldap://{samdb.host_dns_name()}',
1302 hostname=samdb.host_dns_name(),
1303 username=user_creds.get_username(),
1304 password=password,
1305 domain=user_creds.get_domain(),
1306 realm=user_creds.get_realm(),
1307 workstation=user_creds.get_workstation(),
1308 dn=str(user_dn))
1310 # Wait until the test process indicates it's ready.
1311 self.wait_for_ready(our_pipe, connect_future)
1313 # Take out a transaction.
1314 samdb.transaction_start()
1315 try:
1316 # Inform the test process that it may proceed.
1317 our_pipe.send_bytes(b'0')
1319 # The connection should succeed, despite our holding a
1320 # transaction.
1321 result = connect_future.result(timeout=5)
1322 self.assertEqual(result, ConnectionResult.SUCCESS)
1323 except Exception:
1324 samdb.transaction_cancel()
1325 raise
1327 # Commit the local transaction.
1328 samdb.transaction_commit()
1331 if __name__ == '__main__':
1332 global_asn1_print = False
1333 global_hexdump = False
1334 import unittest
1335 unittest.main()