2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys
.path
.insert(0, 'bin/python')
24 os
.environ
['PYTHONUNBUFFERED'] = '1'
26 from concurrent
import futures
28 from functools
import partial
29 from multiprocessing
import Pipe
32 from cryptography
.hazmat
.backends
import default_backend
33 from cryptography
.hazmat
.primitives
.ciphers
.base
import Cipher
34 from cryptography
.hazmat
.primitives
.ciphers
import algorithms
41 generate_random_bytes
,
42 generate_random_password
,
47 from samba
.credentials
import DONT_USE_KERBEROS
, MUST_USE_KERBEROS
48 from samba
.crypto
import (
49 aead_aes_256_cbc_hmac_sha512_blob
,
54 from samba
.dcerpc
import lsa
, samr
55 from samba
.samdb
import SamDB
57 from samba
.tests
import connect_samdb
, env_get_var_value
, env_loadparm
59 from samba
.tests
.krb5
.as_req_tests
import AsReqBaseTest
60 from samba
.tests
.krb5
import kcrypto
61 from samba
.tests
.krb5
.kdc_tgs_tests
import KdcTgsBaseTests
62 from samba
.tests
.krb5
.raw_testcase
import KerberosCredentials
63 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
64 from samba
.tests
.krb5
.rfc4120_constants
import (
65 KDC_ERR_CLIENT_REVOKED
,
67 KDC_ERR_PREAUTH_FAILED
,
74 global_asn1_print
= False
75 global_hexdump
= False
78 class ConnectionResult(Enum
):
95 AsReqBaseTest
.setUpClass()
96 as_req_base
= AsReqBaseTest()
99 user_creds
= KerberosCredentials()
100 user_creds
.set_username(username
)
101 user_creds
.set_password(password
)
102 user_creds
.set_domain(domain
)
103 user_creds
.set_realm(realm
)
104 user_creds
.set_workstation(workstation
)
105 user_creds
.set_kerberos_state(DONT_USE_KERBEROS
)
107 user_name
= user_creds
.get_username()
108 cname
= as_req_base
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
109 names
=user_name
.split('/'))
111 krbtgt_creds
= as_req_base
.get_krbtgt_creds()
112 krbtgt_supported_etypes
= krbtgt_creds
.tgs_supported_enctypes
113 realm
= krbtgt_creds
.get_realm()
115 krbtgt_account
= krbtgt_creds
.get_username()
116 sname
= as_req_base
.PrincipalName_create(name_type
=NT_SRV_INST
,
117 names
=[krbtgt_account
, realm
])
119 expected_salt
= user_creds
.get_salt()
121 till
= as_req_base
.get_KerberosTime(offset
=36000)
123 kdc_options
= krb5_asn1
.KDCOptions('postdated')
125 preauth_key
= as_req_base
.PasswordKey_from_creds(user_creds
,
126 kcrypto
.Enctype
.AES256
)
128 ts_enc_padata
= as_req_base
.get_enc_timestamp_pa_data_from_key(preauth_key
)
129 padata
= [ts_enc_padata
]
131 krbtgt_decryption_key
= (
132 as_req_base
.TicketDecryptionKey_from_creds(krbtgt_creds
))
134 etypes
= as_req_base
.get_default_enctypes(user_creds
)
136 # Remove the LDAP connection.
137 del type(as_req_base
)._ldb
140 expected_error_modes
= (KDC_ERR_CLIENT_REVOKED
,
141 KDC_ERR_PREAUTH_FAILED
)
143 # Wrap generic_check_kdc_error() to expect an NTSTATUS code when the
144 # account is locked out.
145 def check_error_fn(kdc_exchange_dict
,
148 error_code
= rep
.get('error-code')
149 if error_code
== KDC_ERR_CLIENT_REVOKED
:
150 # The account was locked out.
151 kdc_exchange_dict
['expected_status'] = (
152 ntstatus
.NT_STATUS_ACCOUNT_LOCKED_OUT
)
155 # Expect to get a LOCKED_OUT NTSTATUS code.
156 kdc_exchange_dict
['expect_edata'] = True
157 kdc_exchange_dict
['expect_status'] = True
159 elif error_code
== KDC_ERR_PREAUTH_FAILED
:
160 # Just a wrong password: the account wasn’t locked out. Don’t
161 # expect an NTSTATUS code.
162 kdc_exchange_dict
['expect_status'] = False
164 # Continue with the generic error-checking logic.
165 return as_req_base
.generic_check_kdc_error(
172 expected_error_modes
= 0
174 check_error_fn
= None
175 check_rep_fn
= as_req_base
.generic_check_kdc_rep
177 def _generate_padata_copy(_kdc_exchange_dict
,
180 return padata
, req_body
182 kdc_exchange_dict
= as_req_base
.as_exchange_dict(
184 expected_crealm
=realm
,
185 expected_cname
=cname
,
186 expected_srealm
=realm
,
187 expected_sname
=sname
,
188 expected_account_name
=user_name
,
189 expected_supported_etypes
=krbtgt_supported_etypes
,
190 ticket_decryption_key
=krbtgt_decryption_key
,
191 generate_padata_fn
=_generate_padata_copy
,
192 check_error_fn
=check_error_fn
,
193 check_rep_fn
=check_rep_fn
,
194 check_kdc_private_fn
=as_req_base
.generic_check_kdc_private
,
195 expected_error_mode
=expected_error_modes
,
196 expected_salt
=expected_salt
,
197 preauth_key
=preauth_key
,
198 kdc_options
=str(kdc_options
),
201 # Indicate that we're ready. This ensures we hit the right transaction
203 pipe
.send_bytes(b
'0')
205 # Wait for the main process to take out a transaction lock.
206 if not pipe
.poll(timeout
=5):
207 raise AssertionError('main process failed to indicate readiness')
209 # Try making a Kerberos AS-REQ to the KDC. This might fail, either due to
210 # the user's account being locked out or due to using the wrong password.
211 as_rep
= as_req_base
._generic
_kdc
_exchange
(kdc_exchange_dict
,
218 as_req_base
.assertIsNotNone(as_rep
)
220 msg_type
= as_rep
['msg-type']
221 if expect_error
and msg_type
!= KRB_ERROR
or (
222 not expect_error
and msg_type
!= KRB_AS_REP
):
223 raise AssertionError(f
'wrong message type {msg_type}')
226 return ConnectionResult
.SUCCESS
228 error_code
= as_rep
['error-code']
229 if error_code
== KDC_ERR_CLIENT_REVOKED
:
230 return ConnectionResult
.LOCKED_OUT
231 elif error_code
== KDC_ERR_PREAUTH_FAILED
:
232 return ConnectionResult
.WRONG_PASSWORD
234 raise AssertionError(f
'wrong error code {error_code}')
237 def connect_ntlm(pipe
,
246 user_creds
= KerberosCredentials()
247 user_creds
.set_username(username
)
248 user_creds
.set_password(password
)
249 user_creds
.set_domain(domain
)
250 user_creds
.set_workstation(workstation
)
251 user_creds
.set_kerberos_state(DONT_USE_KERBEROS
)
253 # Indicate that we're ready. This ensures we hit the right transaction
255 pipe
.send_bytes(b
'0')
257 # Wait for the main process to take out a transaction lock.
258 if not pipe
.poll(timeout
=5):
259 raise AssertionError('main process failed to indicate readiness')
262 # Try connecting to SamDB. This should fail, either due to our
263 # account being locked out or due to using the wrong password.
265 credentials
=user_creds
,
267 except ldb
.LdbError
as err
:
270 if num
!= ldb
.ERR_INVALID_CREDENTIALS
:
271 raise AssertionError(f
'connection raised wrong error code '
274 if f
'data {werror.WERR_ACCOUNT_LOCKED_OUT:x},' in estr
:
275 return ConnectionResult
.LOCKED_OUT
276 elif f
'data {werror.WERR_LOGON_FAILURE:x},' in estr
:
277 return ConnectionResult
.WRONG_PASSWORD
279 raise AssertionError(f
'connection raised wrong error code '
282 return ConnectionResult
.SUCCESS
285 def connect_samr(pipe
,
294 # Get the user's NT hash.
295 user_creds
= KerberosCredentials()
296 user_creds
.set_password(password
)
297 nt_hash
= user_creds
.get_nt_hash()
299 # Generate a new UTF-16 password.
300 new_password
= generate_random_password(32, 32)
301 new_password
= new_password
.encode('utf-16le')
303 # Generate the MD4 hash of the password.
304 new_password_md4
= md4_hash_blob(new_password
)
306 # Prefix the password with padding so it is 512 bytes long.
307 new_password_len
= len(new_password
)
308 remaining_len
= 512 - new_password_len
309 new_password
= bytes(remaining_len
) + new_password
# Append the 32-bit length of the password.
312 new_password
+= int.to_bytes(new_password_len
,
316 # Encrypt the password with RC4 and the existing NT hash.
317 encryptor
= Cipher(algorithms
.ARC4(nt_hash
),
319 default_backend()).encryptor()
320 new_password
= encryptor
.update(new_password
)
322 # Create a key from the MD4 hash of the new password.
323 key
= new_password_md4
[:14]
325 # Encrypt the old NT hash with DES to obtain the verifier.
326 verifier
= des_crypt_blob_16(nt_hash
, key
)
328 server
= lsa
.String()
329 server
.string
= hostname
331 account
= lsa
.String()
332 account
.string
= username
334 nt_password
= samr
.CryptPassword()
335 nt_password
.data
= list(new_password
)
337 nt_verifier
= samr
.Password()
338 nt_verifier
.hash = list(verifier
)
340 conn
= samr
.samr(f
'ncacn_np:{hostname}[krb5,seal,smb2]')
342 # Indicate that we're ready. This ensures we hit the right transaction
344 pipe
.send_bytes(b
'0')
346 # Wait for the main process to take out a transaction lock.
347 if not pipe
.poll(timeout
=5):
348 raise AssertionError('main process failed to indicate readiness')
351 # Try changing the password. This should fail, either due to our
352 # account being locked out or due to using the wrong password.
353 conn
.ChangePasswordUser3(server
=server
,
355 nt_password
=nt_password
,
356 nt_verifier
=nt_verifier
,
361 except NTSTATUSError
as err
:
364 if num
== ntstatus
.NT_STATUS_ACCOUNT_LOCKED_OUT
:
365 return ConnectionResult
.LOCKED_OUT
366 elif num
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
367 return ConnectionResult
.WRONG_PASSWORD
369 raise AssertionError(f
'pwd change raised wrong error code '
372 return ConnectionResult
.SUCCESS
375 def connect_samr_aes(pipe
,
384 # Get the user's NT hash.
385 user_creds
= KerberosCredentials()
386 user_creds
.set_password(password
)
387 nt_hash
= user_creds
.get_nt_hash()
389 # Generate a new UTF-16 password.
390 new_password
= generate_random_password(32, 32)
391 new_password
= new_password
.encode('utf-16le')
# Prepend the 16-bit length of the password.
394 new_password_len
= int.to_bytes(len(new_password
),
397 new_password
= new_password_len
+ new_password
399 server
= lsa
.String()
400 server
.string
= hostname
402 account
= lsa
.String()
403 account
.string
= username
405 # Derive a key from the user's NT hash.
406 iv
= generate_random_bytes(16)
408 cek
= sha512_pbkdf2(nt_hash
, iv
, iterations
)
410 enc_key_salt
= (b
'Microsoft SAM encryption key '
411 b
'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
412 mac_key_salt
= (b
'Microsoft SAM MAC key '
413 b
'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
415 # Encrypt the new password.
416 ciphertext
, auth_data
= aead_aes_256_cbc_hmac_sha512_blob(new_password
,
422 # Create the new password structure
423 pwd_buf
= samr
.EncryptedPasswordAES()
424 pwd_buf
.auth_data
= list(auth_data
)
425 pwd_buf
.salt
= list(iv
)
426 pwd_buf
.cipher_len
= len(ciphertext
)
427 pwd_buf
.cipher
= list(ciphertext
)
428 pwd_buf
.PBKDF2Iterations
= iterations
430 conn
= samr
.samr(f
'ncacn_np:{hostname}[krb5,seal,smb2]')
432 # Indicate that we're ready. This ensures we hit the right transaction
434 pipe
.send_bytes(b
'0')
436 # Wait for the main process to take out a transaction lock.
437 if not pipe
.poll(timeout
=5):
438 raise AssertionError('main process failed to indicate readiness')
441 # Try changing the password. This should fail, either due to our
442 # account being locked out or due to using the wrong password.
443 conn
.ChangePasswordUser4(server
=server
,
446 except NTSTATUSError
as err
:
449 if num
== ntstatus
.NT_STATUS_ACCOUNT_LOCKED_OUT
:
450 return ConnectionResult
.LOCKED_OUT
451 elif num
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
452 return ConnectionResult
.WRONG_PASSWORD
454 raise AssertionError(f
'pwd change raised wrong error code '
457 return ConnectionResult
.SUCCESS
460 def ldap_pwd_change(pipe
,
471 admin_creds
= KerberosCredentials()
472 admin_creds
.guess(lp
)
473 admin_creds
.set_username(env_get_var_value('ADMIN_USERNAME'))
474 admin_creds
.set_password(env_get_var_value('ADMIN_PASSWORD'))
475 admin_creds
.set_kerberos_state(MUST_USE_KERBEROS
)
477 samdb
= SamDB(url
=url
,
478 credentials
=admin_creds
,
481 old_utf16pw
= f
'"{password}"'.encode('utf-16le')
483 new_password
= generate_random_password(32, 32)
484 new_utf16pw
= f
'"{new_password}"'.encode('utf-16le')
486 msg
= ldb
.Message(ldb
.Dn(samdb
, dn
))
487 msg
['0'] = ldb
.MessageElement(old_utf16pw
,
490 msg
['1'] = ldb
.MessageElement(new_utf16pw
,
494 # Indicate that we're ready. This ensures we hit the right transaction
496 pipe
.send_bytes(b
'0')
498 # Wait for the main process to take out a transaction lock.
499 if not pipe
.poll(timeout
=5):
500 raise AssertionError('main process failed to indicate readiness')
502 # Try changing the user's password. This should fail, either due to the
503 # user's account being locked out or due to specifying the wrong password.
506 except ldb
.LdbError
as err
:
508 if num
!= ldb
.ERR_CONSTRAINT_VIOLATION
:
509 raise AssertionError(f
'pwd change raised wrong error code ({err})')
511 if f
'<{werror.WERR_ACCOUNT_LOCKED_OUT:08X}:' in estr
:
512 return ConnectionResult
.LOCKED_OUT
513 elif f
'<{werror.WERR_INVALID_PASSWORD:08X}:' in estr
:
514 return ConnectionResult
.WRONG_PASSWORD
516 raise AssertionError(f
'pwd change raised wrong error code '
519 return ConnectionResult
.SUCCESS
522 class LockoutTests(KdcTgsBaseTests
):
526 self
.do_asn1_print
= global_asn1_print
527 self
.do_hexdump
= global_hexdump
529 samdb
= self
.get_samdb()
530 base_dn
= ldb
.Dn(samdb
, samdb
.domain_dn())
532 def modify_attr(attr
, value
):
535 flag
= ldb
.FLAG_MOD_DELETE
538 flag
= ldb
.FLAG_MOD_REPLACE
540 msg
= ldb
.Message(base_dn
)
541 msg
[attr
] = ldb
.MessageElement(
545 res
= samdb
.search(base_dn
,
546 scope
=ldb
.SCOPE_BASE
,
547 attrs
=['lockoutDuration',
549 'msDS-LogonTimeSyncInterval'])
550 self
.assertEqual(1, len(res
))
552 # Reset the lockout duration as it was before.
553 lockout_duration
= res
[0].get('lockoutDuration', idx
=0)
554 self
.addCleanup(modify_attr
, 'lockoutDuration', lockout_duration
)
556 # Set the new lockout duration: locked out accounts now stay locked
558 modify_attr('lockoutDuration', 0)
560 # Reset the lockout threshold as it was before.
561 lockout_threshold
= res
[0].get('lockoutThreshold', idx
=0)
562 self
.addCleanup(modify_attr
, 'lockoutThreshold', lockout_threshold
)
564 # Set the new lockout threshold.
565 self
.lockout_threshold
= 3
566 modify_attr('lockoutThreshold', self
.lockout_threshold
)
568 # Reset the logon time sync interval as it was before.
569 sync_interval
= res
[0].get('msDS-LogonTimeSyncInterval', idx
=0)
570 self
.addCleanup(modify_attr
,
571 'msDS-LogonTimeSyncInterval',
574 # Set the new logon time sync interval. Setting it to 0 eliminates the
575 # need for this attribute to be updated on logon, and thus the
576 # requirement to take out a transaction.
577 modify_attr('msDS-LogonTimeSyncInterval', 0)
579 # Get the old 'minPwdAge'.
580 minPwdAge
= samdb
.get_minPwdAge()
582 # Reset the 'minPwdAge' as it was before.
583 self
.addCleanup(samdb
.set_minPwdAge
, minPwdAge
)
585 # Set it temporarily to '0'.
586 samdb
.set_minPwdAge('0')
588 def wait_for_ready(self
, pipe
, future
):
589 if pipe
.poll(timeout
=5):
592 # We failed to read a response from the pipe, so see if the test raised
593 # an exception with more information.
595 exception
= future
.exception(timeout
=0)
596 if exception
is not None:
599 self
.fail('test failed to indicate readiness')
def test_lockout_transaction_kdc(self):
    # Lockout-inside-a-transaction scenario, driven with connect_kdc as
    # the connection function.
    scenario = self.do_lockout_transaction
    scenario(connect_kdc)
def test_lockout_transaction_kdc_ntstatus(self):
    # Same scenario as the plain KDC variant, but connect_kdc is
    # pre-bound with expect_status=True.
    scenario = self.do_lockout_transaction
    kdc_with_status = partial(connect_kdc, expect_status=True)
    scenario(kdc_with_status)
607 # Test that performing AS‐REQs with accounts in various states of
608 # unusability results in appropriate NTSTATUS and Kerberos error codes.
610 def test_lockout_status_disabled(self
):
611 self
._run
_lockout
_status
(
612 self
._get
_creds
_disabled
(),
613 expected_status
=ntstatus
.NT_STATUS_ACCOUNT_DISABLED
,
614 expected_error
=KDC_ERR_CLIENT_REVOKED
,
617 def test_lockout_status_locked_out(self
):
618 self
._run
_lockout
_status
(
619 self
._get
_creds
_locked
_out
(),
620 expected_status
=ntstatus
.NT_STATUS_ACCOUNT_LOCKED_OUT
,
621 expected_error
=KDC_ERR_CLIENT_REVOKED
,
624 def test_lockout_status_expired(self
):
625 self
._run
_lockout
_status
(
626 self
._get
_creds
_expired
(),
627 expected_status
=ntstatus
.NT_STATUS_ACCOUNT_EXPIRED
,
628 expected_error
=KDC_ERR_CLIENT_REVOKED
,
631 def test_lockout_status_must_change(self
):
632 self
._run
_lockout
_status
(
633 self
._get
_creds
_must
_change
(),
634 expected_status
=ntstatus
.NT_STATUS_PASSWORD_MUST_CHANGE
,
635 expected_error
=KDC_ERR_KEY_EXPIRED
,
638 def test_lockout_status_password_expired(self
):
639 self
._run
_lockout
_status
(
640 self
._get
_creds
_password
_expired
(),
641 expected_status
=ntstatus
.NT_STATUS_PASSWORD_EXPIRED
,
642 expected_error
=KDC_ERR_KEY_EXPIRED
,
645 # Test that performing the same AS‐REQs, this time with FAST, does not
646 # result in NTSTATUS codes.
648 def test_lockout_status_disabled_fast(self
):
649 self
._run
_lockout
_status
_fast
(
650 self
._get
_creds
_disabled
(), expected_error
=KDC_ERR_CLIENT_REVOKED
653 def test_lockout_status_locked_out_fast(self
):
654 self
._run
_lockout
_status
_fast
(
655 self
._get
_creds
_locked
_out
(), expected_error
=KDC_ERR_CLIENT_REVOKED
658 def test_lockout_status_expired_fast(self
):
659 self
._run
_lockout
_status
_fast
(
660 self
._get
_creds
_expired
(), expected_error
=KDC_ERR_CLIENT_REVOKED
663 def test_lockout_status_must_change_fast(self
):
664 self
._run
_lockout
_status
_fast
(
665 self
._get
_creds
_must
_change
(), expected_error
=KDC_ERR_KEY_EXPIRED
668 def test_lockout_status_password_expired_fast(self
):
669 self
._run
_lockout
_status
_fast
(
670 self
._get
_creds
_password
_expired
(), expected_error
=KDC_ERR_KEY_EXPIRED
673 def _get_creds_disabled(self
):
674 return self
.get_cached_creds(
675 account_type
=self
.AccountType
.USER
, opts
={"enabled": False}
678 def _get_creds_locked_out(self
) -> KerberosCredentials
:
679 samdb
= self
.get_samdb()
681 user_creds
= self
.get_cached_creds(
682 account_type
=self
.AccountType
.USER
, use_cache
=False
684 user_dn
= user_creds
.get_dn()
686 # Lock out the account.
688 old_utf16pw
= '"Secret007"'.encode("utf-16le") # invalid pwd
689 new_utf16pw
= '"Secret008"'.encode("utf-16le")
691 msg
= ldb
.Message(user_dn
)
692 msg
["0"] = ldb
.MessageElement(old_utf16pw
, ldb
.FLAG_MOD_DELETE
, "unicodePwd")
693 msg
["1"] = ldb
.MessageElement(new_utf16pw
, ldb
.FLAG_MOD_ADD
, "unicodePwd")
695 for _
in range(self
.lockout_threshold
):
698 except ldb
.LdbError
as err
:
701 # We get an error, but the bad password count should
703 self
.assertEqual(num
, ldb
.ERR_CONSTRAINT_VIOLATION
)
705 self
.fail("pwd change should have failed")
707 # Ensure the account is locked out.
710 user_dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["msDS-User-Account-Control-Computed"]
712 self
.assertEqual(1, len(res
))
714 uac
= int(res
[0].get("msDS-User-Account-Control-Computed", idx
=0))
715 self
.assertTrue(uac
& dsdb
.UF_LOCKOUT
)
719 def _get_creds_expired(self
) -> KerberosCredentials
:
720 return self
.get_cached_creds(
721 account_type
=self
.AccountType
.USER
,
722 opts
={"additional_details": self
.freeze({"accountExpires": "1"})},
725 def _get_creds_must_change(self
) -> KerberosCredentials
:
726 return self
.get_cached_creds(
727 account_type
=self
.AccountType
.USER
,
728 opts
={"additional_details": self
.freeze({"pwdLastSet": "0"})},
731 def _get_creds_password_expired(self
) -> KerberosCredentials
:
732 samdb
= self
.get_samdb()
733 self
.addCleanup(samdb
.set_maxPwdAge
, samdb
.get_maxPwdAge())
735 samdb
.set_maxPwdAge(low_pwd_age
)
737 return self
.get_cached_creds(account_type
=self
.AccountType
.USER
)
739 def _run_lockout_status(
741 user_creds
: KerberosCredentials
,
743 expected_status
: int,
746 user_name
= user_creds
.get_username()
747 cname
= self
.PrincipalName_create(
748 name_type
=NT_PRINCIPAL
, names
=user_name
.split("/")
751 krbtgt_creds
= self
.get_krbtgt_creds()
752 realm
= krbtgt_creds
.get_realm()
754 sname
= self
.get_krbtgt_sname()
756 preauth_key
= self
.PasswordKey_from_creds(user_creds
, kcrypto
.Enctype
.AES256
)
758 ts_enc_padata
= self
.get_enc_timestamp_pa_data_from_key(preauth_key
)
759 padata
= [ts_enc_padata
]
761 def _generate_padata_copy(_kdc_exchange_dict
, _callback_dict
, req_body
):
762 return padata
, req_body
764 kdc_exchange_dict
= self
.as_exchange_dict(
766 expected_crealm
=realm
,
767 expected_cname
=cname
,
768 expected_srealm
=realm
,
769 expected_sname
=sname
,
770 expected_account_name
=user_name
,
771 expected_supported_etypes
=krbtgt_creds
.tgs_supported_enctypes
,
774 expected_status
=expected_status
,
775 ticket_decryption_key
=self
.TicketDecryptionKey_from_creds(krbtgt_creds
),
776 generate_padata_fn
=_generate_padata_copy
,
777 check_error_fn
=self
.generic_check_kdc_error
,
779 check_kdc_private_fn
=self
.generic_check_kdc_private
,
780 expected_error_mode
=expected_error
,
781 expected_salt
=user_creds
.get_salt(),
782 preauth_key
=preauth_key
,
783 kdc_options
=str(krb5_asn1
.KDCOptions("postdated")),
787 # Try making a Kerberos AS-REQ to the KDC. This might fail, either due
# to the user's account being locked out or due to using the wrong
# password.
790 self
._generic
_kdc
_exchange
(
795 till_time
=self
.get_KerberosTime(offset
=36000),
796 etypes
=self
.get_default_enctypes(user_creds
),
799 def _run_lockout_status_fast(
800 self
, user_creds
: KerberosCredentials
, *, expected_error
: int
802 self
._armored
_as
_req
(
804 self
.get_krbtgt_creds(),
805 self
.get_tgt(self
.get_mach_creds()),
806 expected_error
=expected_error
,
807 expect_edata
=self
.expect_padata_outer
,
808 # FAST‐armored responses never contain an NTSTATUS code.
def test_lockout_transaction_ntlm(self):
    # Lockout-inside-a-transaction scenario, driven with connect_ntlm as
    # the connection function.
    scenario = self.do_lockout_transaction
    scenario(connect_ntlm)
def test_lockout_transaction_samr(self):
    # Lockout-inside-a-transaction scenario, driven with connect_samr as
    # the connection function.
    scenario = self.do_lockout_transaction
    scenario(connect_samr)
def test_lockout_transaction_samr_aes(self):
    # Lockout-inside-a-transaction scenario, driven with connect_samr_aes
    # as the connection function.
    scenario = self.do_lockout_transaction
    scenario(connect_samr_aes)
def test_lockout_transaction_ldap_pw_change(self):
    # Lockout-inside-a-transaction scenario, driven with ldap_pwd_change
    # as the connection function.
    scenario = self.do_lockout_transaction
    scenario(ldap_pwd_change)
824 # Tests to ensure we can handle the account being renamed. We do not test
825 # renames with SAMR password changes, because in that case the entire
826 # process happens inside a transaction, and the password change method only
827 # receives the account username. By the time it searches for the account,
# it will have already been renamed, and so it will always fail to find the
# account.
def test_lockout_transaction_rename_kdc(self):
    # Lockout-inside-a-transaction scenario via connect_kdc, additionally
    # requesting the account rename (rename=True).
    scenario = self.do_lockout_transaction
    scenario(connect_kdc, rename=True)
834 def test_lockout_transaction_rename_kdc_ntstatus(self
):
835 self
.do_lockout_transaction(partial(connect_kdc
, expect_status
=True),
def test_lockout_transaction_rename_ntlm(self):
    # Lockout-inside-a-transaction scenario via connect_ntlm, additionally
    # requesting the account rename (rename=True).
    scenario = self.do_lockout_transaction
    scenario(connect_ntlm, rename=True)
def test_lockout_transaction_rename_ldap_pw_change(self):
    # Lockout-inside-a-transaction scenario via ldap_pwd_change,
    # additionally requesting the account rename (rename=True).
    scenario = self.do_lockout_transaction
    scenario(ldap_pwd_change, rename=True)
def test_lockout_transaction_bad_pwd_kdc(self):
    # Lockout-inside-a-transaction scenario via connect_kdc, run with
    # correct_pw=False.
    scenario = self.do_lockout_transaction
    scenario(connect_kdc, correct_pw=False)
847 def test_lockout_transaction_bad_pwd_kdc_ntstatus(self
):
848 self
.do_lockout_transaction(partial(connect_kdc
, expect_status
=True),
def test_lockout_transaction_bad_pwd_ntlm(self):
    # Lockout-inside-a-transaction scenario via connect_ntlm, run with
    # correct_pw=False.
    scenario = self.do_lockout_transaction
    scenario(connect_ntlm, correct_pw=False)
def test_lockout_transaction_bad_pwd_samr(self):
    # Lockout-inside-a-transaction scenario via connect_samr, run with
    # correct_pw=False.
    scenario = self.do_lockout_transaction
    scenario(connect_samr, correct_pw=False)
def test_lockout_transaction_bad_pwd_samr_aes(self):
    # Lockout-inside-a-transaction scenario via connect_samr_aes, run with
    # correct_pw=False.
    scenario = self.do_lockout_transaction
    scenario(connect_samr_aes, correct_pw=False)
def test_lockout_transaction_bad_pwd_ldap_pw_change(self):
    # Lockout-inside-a-transaction scenario via ldap_pwd_change, run with
    # correct_pw=False.
    scenario = self.do_lockout_transaction
    scenario(ldap_pwd_change, correct_pw=False)
def test_bad_pwd_count_transaction_kdc(self):
    # Bad-password-count-inside-a-transaction scenario, driven with
    # connect_kdc as the connection function.
    scenario = self.do_bad_pwd_count_transaction
    scenario(connect_kdc)
def test_bad_pwd_count_transaction_ntlm(self):
    # Bad-password-count-inside-a-transaction scenario, driven with
    # connect_ntlm as the connection function.
    scenario = self.do_bad_pwd_count_transaction
    scenario(connect_ntlm)
def test_bad_pwd_count_transaction_samr(self):
    # Bad-password-count-inside-a-transaction scenario, driven with
    # connect_samr as the connection function.
    scenario = self.do_bad_pwd_count_transaction
    scenario(connect_samr)
def test_bad_pwd_count_transaction_samr_aes(self):
    # Bad-password-count-inside-a-transaction scenario, driven with
    # connect_samr_aes as the connection function.
    scenario = self.do_bad_pwd_count_transaction
    scenario(connect_samr_aes)
def test_bad_pwd_count_transaction_ldap_pw_change(self):
    # Bad-password-count-inside-a-transaction scenario, driven with
    # ldap_pwd_change as the connection function.
    scenario = self.do_bad_pwd_count_transaction
    scenario(ldap_pwd_change)
def test_bad_pwd_count_transaction_rename_kdc(self):
    # Bad-password-count scenario via connect_kdc, additionally
    # requesting the account rename (rename=True).
    scenario = self.do_bad_pwd_count_transaction
    scenario(connect_kdc, rename=True)
def test_bad_pwd_count_transaction_rename_ntlm(self):
    # Bad-password-count scenario via connect_ntlm, additionally
    # requesting the account rename (rename=True).
    scenario = self.do_bad_pwd_count_transaction
    scenario(connect_ntlm, rename=True)
def test_bad_pwd_count_transaction_rename_ldap_pw_change(self):
    # Bad-password-count scenario via ldap_pwd_change, additionally
    # requesting the account rename (rename=True).
    scenario = self.do_bad_pwd_count_transaction
    scenario(ldap_pwd_change, rename=True)
def test_lockout_race_kdc(self):
    # Simultaneous-logon lockout race, driven with connect_kdc as the
    # connection function.
    race = self.do_lockout_race
    race(connect_kdc)
def test_lockout_race_kdc_ntstatus(self):
    # Simultaneous-logon lockout race, with connect_kdc pre-bound with
    # expect_status=True.
    race = self.do_lockout_race
    kdc_with_status = partial(connect_kdc, expect_status=True)
    race(kdc_with_status)
def test_lockout_race_ntlm(self):
    # Simultaneous-logon lockout race, driven with connect_ntlm as the
    # connection function.
    race = self.do_lockout_race
    race(connect_ntlm)
def test_lockout_race_samr(self):
    # Simultaneous-logon lockout race, driven with connect_samr as the
    # connection function.
    race = self.do_lockout_race
    race(connect_samr)
def test_lockout_race_samr_aes(self):
    # Simultaneous-logon lockout race, driven with connect_samr_aes as
    # the connection function.
    race = self.do_lockout_race
    race(connect_samr_aes)
def test_lockout_race_ldap_pw_change(self):
    # Simultaneous-logon lockout race, driven with ldap_pwd_change as the
    # connection function.
    race = self.do_lockout_race
    race(ldap_pwd_change)
def test_logon_without_transaction_ntlm(self):
    # Hand connect_ntlm to do_logon_without_transaction() as the
    # connection function.
    scenario = self.do_logon_without_transaction
    scenario(connect_ntlm)
# Tests to ensure that the connection functions work correctly in the happy
# path.
def test_logon_kdc(self):
    # Plain logon check; connect_kdc is pre-bound with expect_error=False
    # before being handed to do_logon().
    logon = self.do_logon
    kdc_no_error = partial(connect_kdc, expect_error=False)
    logon(kdc_no_error)
def test_logon_ntlm(self):
    # Plain logon check with connect_ntlm as the connection function.
    logon = self.do_logon
    logon(connect_ntlm)
def test_logon_samr(self):
    # Plain logon check with connect_samr as the connection function.
    logon = self.do_logon
    logon(connect_samr)
def test_logon_samr_aes(self):
    # Plain logon check with connect_samr_aes as the connection function.
    logon = self.do_logon
    logon(connect_samr_aes)
def test_logon_ldap_pw_change(self):
    # Plain logon check with ldap_pwd_change as the connection function.
    logon = self.do_logon
    logon(ldap_pwd_change)
# Test that connection with a correct password works.
927 def do_logon(self
, connect_fn
):
928 # Create the user account for testing.
929 user_creds
= self
.get_cached_creds(account_type
=self
.AccountType
.USER
,
931 admin_creds
= self
.get_admin_creds()
934 # Get a connection to our local SamDB.
935 samdb
= connect_samdb(samdb_url
=lp
.samdb_url(), lp
=lp
,
936 credentials
=admin_creds
)
937 self
.assertLocalSamDB(samdb
)
938 user_dn
= ldb
.Dn(samdb
, str(user_creds
.get_dn()))
940 password
= user_creds
.get_password()
942 # Prepare to connect to the server with a valid password.
943 our_pipe
, their_pipe
= Pipe(duplex
=True)
945 # Inform the test function that it may proceed.
946 our_pipe
.send_bytes(b
'0')
948 result
= connect_fn(pipe
=their_pipe
,
949 url
=f
'ldap://{samdb.host_dns_name()}',
950 hostname
=samdb
.host_dns_name(),
951 username
=user_creds
.get_username(),
953 domain
=user_creds
.get_domain(),
954 realm
=user_creds
.get_realm(),
955 workstation
=user_creds
.get_workstation(),
958 # The connection should succeed.
959 self
.assertEqual(result
, ConnectionResult
.SUCCESS
)
961 # Lock out the account while holding a transaction lock, then release the
962 # lock. A logon attempt already in progress should reread the account
963 # details and recognise the account is locked out. The account can
964 # additionally be renamed within the transaction to ensure that, by using
965 # the GUID, rereading the account's details still succeeds.
966 def do_lockout_transaction(self
, connect_fn
,
969 # Create the user account for testing.
970 user_creds
= self
.get_cached_creds(account_type
=self
.AccountType
.USER
,
973 admin_creds
= self
.get_admin_creds()
976 # Get a connection to our local SamDB.
977 samdb
= connect_samdb(samdb_url
=lp
.samdb_url(), lp
=lp
,
978 credentials
=admin_creds
)
979 self
.assertLocalSamDB(samdb
)
981 user_dn
= ldb
.Dn(samdb
, str(user_creds
.get_dn()))
983 password
= user_creds
.get_password()
985 password
= password
[:-1]
987 # Prepare to connect to the server.
988 with futures
.ProcessPoolExecutor(max_workers
=1) as executor
:
989 our_pipe
, their_pipe
= Pipe(duplex
=True)
990 connect_future
= executor
.submit(
993 url
=f
'ldap://{samdb.host_dns_name()}',
994 hostname
=samdb
.host_dns_name(),
995 username
=user_creds
.get_username(),
997 domain
=user_creds
.get_domain(),
998 realm
=user_creds
.get_realm(),
999 workstation
=user_creds
.get_workstation(),
1002 # Wait until the test process indicates it's ready.
1003 self
.wait_for_ready(our_pipe
, connect_future
)
1005 # Take out a transaction.
1006 samdb
.transaction_start()
1008 # Lock out the account. We must do it using an actual password
1009 # change like so, rather than directly with a database
1010 # modification, so that the account is also added to the
1011 # auxiliary bad password database. Our goal is to get lockouts
1012 # to happen, i.e. password checking.
1014 old_utf16pw
= '"Secret007"'.encode('utf-16le') # invalid pwd
1015 new_utf16pw
= '"Secret008"'.encode('utf-16le')
1017 msg
= ldb
.Message(user_dn
)
1018 msg
['0'] = ldb
.MessageElement(old_utf16pw
,
1019 ldb
.FLAG_MOD_DELETE
,
1021 msg
['1'] = ldb
.MessageElement(new_utf16pw
,
1025 for i
in range(self
.lockout_threshold
):
1028 except ldb
.LdbError
as err
:
1029 num
, estr
= err
.args
1031 # We get an error, but the bad password count should
1033 self
.assertEqual(num
, ldb
.ERR_OPERATIONS_ERROR
)
1034 self
.assertEqual('Failed to obtain remote address for '
1035 'the LDAP client while changing the '
1039 self
.fail('pwd change should have failed')
1041 # Ensure the account is locked out.
1044 user_dn
, scope
=ldb
.SCOPE_BASE
,
1045 attrs
=['msDS-User-Account-Control-Computed'])
1046 self
.assertEqual(1, len(res
))
1048 uac
= int(res
[0].get('msDS-User-Account-Control-Computed',
1050 self
.assertTrue(uac
& dsdb
.UF_LOCKOUT
)
1052 # Now the bad password database has been updated, inform the
1053 # test process that it may proceed.
1054 our_pipe
.send_bytes(b
'0')
1056 # Wait one second to ensure the test process hits the
1061 # While we're at it, rename the account to ensure that is
1062 # also safe if a race occurs.
1063 msg
= ldb
.Message(user_dn
)
1064 new_username
= self
.get_new_username()
1065 msg
['sAMAccountName'] = ldb
.MessageElement(
1067 ldb
.FLAG_MOD_REPLACE
,
1072 samdb
.transaction_cancel()
1075 # Commit the local transaction.
1076 samdb
.transaction_commit()
1078 result
= connect_future
.result(timeout
=5)
1079 self
.assertEqual(result
, ConnectionResult
.LOCKED_OUT
)
1081 # Update the bad password count while holding a transaction lock, then
1082 # release the lock. A logon attempt already in progress should reread the
1083 # account details and ensure the bad password count is atomically
1084 # updated. The account can additionally be renamed within the transaction
# to ensure that, by using the GUID, rereading the account's details still
# succeeds.
1087 def do_bad_pwd_count_transaction(self
, connect_fn
, rename
=False):
# Start a logon attempt with an invalid password in a worker process and,
# while a local SamDB transaction is open, build a message that replaces
# the account's badPwdCount and badPasswordTime (and carries a
# sAMAccountName replacement). Inside the transaction the account must not
# be flagged as locked out; after the commit the worker must report
# WRONG_PASSWORD and badPwdCount must read 2.
#
# connect_fn -- NOTE(review): not referenced in the lines visible here;
#               presumably the callable handed to executor.submit() --
#               confirm.
# rename     -- NOTE(review): not referenced in the lines visible here;
#               presumably gates the sAMAccountName replacement -- confirm.
1088 # Create the user account for testing.
1089 user_creds
= self
.get_cached_creds(account_type
=self
.AccountType
.USER
,
1092 admin_creds
= self
.get_admin_creds()
1095 # Get a connection to our local SamDB.
1096 samdb
= connect_samdb(samdb_url
=lp
.samdb_url(), lp
=lp
,
1097 credentials
=admin_creds
)
# The test manipulates the database directly, so the connection has to pass
# the local-SamDB assertion before continuing.
1098 self
.assertLocalSamDB(samdb
)
1099 user_dn
= ldb
.Dn(samdb
, str(user_creds
.get_dn()))
1101 # Prepare to connect to the server with an invalid password.
# A single worker process carries out the logon; the duplex pipe is used to
# synchronise it with this process.
1102 with futures
.ProcessPoolExecutor(max_workers
=1) as executor
:
1103 our_pipe
, their_pipe
= Pipe(duplex
=True)
1104 connect_future
= executor
.submit(
1107 url
=f
'ldap://{samdb.host_dns_name()}',
1108 hostname
=samdb
.host_dns_name(),
1109 username
=user_creds
.get_username(),
# Dropping the last character of the real password makes it invalid.
1110 password
=user_creds
.get_password()[:-1], # invalid password
1111 domain
=user_creds
.get_domain(),
1112 realm
=user_creds
.get_realm(),
1113 workstation
=user_creds
.get_workstation(),
1116 # Wait until the test process indicates it's ready.
1117 self
.wait_for_ready(our_pipe
, connect_future
)
1119 # Take out a transaction.
1120 samdb
.transaction_start()
1122 # Inform the test process that it may proceed.
1123 our_pipe
.send_bytes(b
'0')
1125 # Wait one second to ensure the test process hits the
# transaction lock.  NOTE(review): sentence completed by inference; the
# waiting call itself is not among the lines visible here -- confirm.
1129 # Set badPwdCount to 1.
1130 msg
= ldb
.Message(user_dn
)
1131 now
= int(time
.time())
# presumably unix2nttime() converts Unix seconds to an NT timestamp --
# TODO confirm against its definition.
1132 bad_pwd_time
= unix2nttime(now
)
1133 msg
['badPwdCount'] = ldb
.MessageElement(
1135 ldb
.FLAG_MOD_REPLACE
,
1137 msg
['badPasswordTime'] = ldb
.MessageElement(
1139 ldb
.FLAG_MOD_REPLACE
,
1142 # While we're at it, rename the account to ensure that is
1143 # also safe if a race occurs.
1144 new_username
= self
.get_new_username()
1145 msg
['sAMAccountName'] = ldb
.MessageElement(
1147 ldb
.FLAG_MOD_REPLACE
,
1151 # Ensure the account is not yet locked out.
1154 user_dn
, scope
=ldb
.SCOPE_BASE
,
1155 attrs
=['msDS-User-Account-Control-Computed'])
# A base-scope lookup of the user's DN must yield exactly one entry.
1156 self
.assertEqual(1, len(res
))
1158 uac
= int(res
[0].get('msDS-User-Account-Control-Computed',
# The computed account-control value must not carry the lockout bit.
1160 self
.assertFalse(uac
& dsdb
.UF_LOCKOUT
)
# NOTE(review): the guard around this cancel is not visible here; presumably
# it only runs on a failure path, since a commit follows -- confirm.
1162 samdb
.transaction_cancel()
1165 # Commit the local transaction.
1166 samdb
.transaction_commit()
# Give the worker process up to five seconds to report its outcome.
1168 result
= connect_future
.result(timeout
=5)
# The result is also passed as the assertion's failure message.
1169 self
.assertEqual(result
, ConnectionResult
.WRONG_PASSWORD
, result
)
1171 # Check that badPwdCount has now increased to 2.
1173 res
= samdb
.search(user_dn
,
1174 scope
=ldb
.SCOPE_BASE
,
1175 attrs
=['badPwdCount'])
1176 self
.assertEqual(1, len(res
))
1178 bad_pwd_count
= int(res
[0].get('badPwdCount', idx
=0))
1179 self
.assertEqual(2, bad_pwd_count
)
1181 # Attempt to log in to the account with an incorrect password, using
1182 # lockoutThreshold+1 simultaneous attempts. We should get three 'wrong
1183 # password' errors and one 'locked out' error, showing that the bad
1184 # password count is checked and incremented atomically.
1185 def do_lockout_race(self
, connect_fn
):
# Launch lockout_threshold + 1 logon attempts with an invalid password, one
# worker process each, release them all while a local SamDB transaction is
# open, then check the tally of outcomes: lockout_threshold attempts must
# report WRONG_PASSWORD and the remainder LOCKED_OUT. Finally the account
# must show badPwdCount == lockout_threshold and the lockout bit set.
#
# connect_fn -- NOTE(review): not referenced in the lines visible here;
#               presumably the callable handed to executor.submit() --
#               confirm.
1186 # Create the user account for testing.
1187 user_creds
= self
.get_cached_creds(account_type
=self
.AccountType
.USER
,
1190 admin_creds
= self
.get_admin_creds()
1193 # Get a connection to our local SamDB.
1194 samdb
= connect_samdb(samdb_url
=lp
.samdb_url(), lp
=lp
,
1195 credentials
=admin_creds
)
1196 self
.assertLocalSamDB(samdb
)
1197 user_dn
= ldb
.Dn(samdb
, str(user_creds
.get_dn()))
1199 # Prepare to connect to the server with an invalid password, using four
1200 # simultaneous requests. Only three of those attempts should get
1201 # through before the account is locked out.
# NOTE(review): the 'four'/'three' wording above presumably assumes
# lockout_threshold == 3; the code itself uses lockout_threshold + 1.
1202 num_attempts
= self
.lockout_threshold
+ 1
# One worker per attempt, so every attempt can be in flight at once.
1203 with futures
.ProcessPoolExecutor(max_workers
=num_attempts
) as executor
:
1204 connect_futures
= []
# NOTE(review): the initialisation of our_pipes is not among the lines
# visible here -- confirm it precedes this loop.
1206 for i
in range(num_attempts
):
1207 our_pipe
, their_pipe
= Pipe(duplex
=True)
1208 our_pipes
.append(our_pipe
)
1210 connect_future
= executor
.submit(
1213 url
=f
'ldap://{samdb.host_dns_name()}',
1214 hostname
=samdb
.host_dns_name(),
1215 username
=user_creds
.get_username(),
# Dropping the last character of the real password makes it invalid.
1216 password
=user_creds
.get_password()[:-1], # invalid pw
1217 domain
=user_creds
.get_domain(),
1218 realm
=user_creds
.get_realm(),
1219 workstation
=user_creds
.get_workstation(),
1221 connect_futures
.append(connect_future
)
1223 # Wait until the test process indicates it's ready.
1224 self
.wait_for_ready(our_pipe
, connect_future
)
1226 # Take out a transaction.
1227 samdb
.transaction_start()
1229 # Inform the test processes that they may proceed.
1230 for our_pipe
in our_pipes
:
1231 our_pipe
.send_bytes(b
'0')
1233 # Wait one second to ensure the test processes hit the
# transaction lock.  NOTE(review): sentence completed by inference; the
# waiting call itself is not among the lines visible here -- confirm.
# NOTE(review): the guard around this cancel is not visible here; presumably
# it only runs on a failure path, since a commit follows -- confirm.
1237 samdb
.transaction_cancel()
1240 # Commit the local transaction.
1241 samdb
.transaction_commit()
# Tally the outcome reported by each worker, allowing each up to five
# seconds. NOTE(review): the initialisation of the lockouts and
# wrong_passwords counters is not among the lines visible here -- confirm.
1245 for i
, connect_future
in enumerate(connect_futures
):
1246 result
= connect_future
.result(timeout
=5)
1247 if result
== ConnectionResult
.LOCKED_OUT
:
1249 elif result
== ConnectionResult
.WRONG_PASSWORD
:
1250 wrong_passwords
+= 1
1252 self
.fail(f
'process {i} gave an unexpected result '
# Exactly lockout_threshold attempts may be rejected as wrong passwords;
# every remaining attempt must have been refused as locked out.
1255 self
.assertEqual(wrong_passwords
, self
.lockout_threshold
)
1256 self
.assertEqual(lockouts
, num_attempts
- self
.lockout_threshold
)
1258 # Ensure the account is now locked out.
1261 user_dn
, scope
=ldb
.SCOPE_BASE
,
1262 attrs
=['badPwdCount',
1263 'msDS-User-Account-Control-Computed'])
# A base-scope lookup of the user's DN must yield exactly one entry.
1264 self
.assertEqual(1, len(res
))
# The stored count must equal the threshold, not the number of attempts.
1266 bad_pwd_count
= int(res
[0].get('badPwdCount', idx
=0))
1267 self
.assertEqual(self
.lockout_threshold
, bad_pwd_count
)
1269 uac
= int(res
[0].get('msDS-User-Account-Control-Computed',
# The computed account-control value must now carry the lockout bit.
1271 self
.assertTrue(uac
& dsdb
.UF_LOCKOUT
)
1273 # Test that logon is possible even while we locally hold a transaction
1274 # lock. This test only works with NTLM authentication; Kerberos
1275 # authentication must take out a transaction to update the logonCount
1276 # attribute, and LDAP and SAMR password changes both take out a transaction
1277 # to effect the password change. NTLM is the only logon method that does
1278 # not require a transaction, and can thus be performed while we're holding
# the transaction lock.
1280 def do_logon_without_transaction(self
, connect_fn
):
# Start a logon with the account's valid password in a worker process and
# require it to report SUCCESS while the local SamDB transaction is still
# open; the transaction is committed only after the result is checked.
#
# connect_fn -- NOTE(review): not referenced in the lines visible here;
#               presumably the callable handed to executor.submit() --
#               confirm.
1281 # Create the user account for testing.
1282 user_creds
= self
.get_cached_creds(account_type
=self
.AccountType
.USER
,
1285 admin_creds
= self
.get_admin_creds()
1288 # Get a connection to our local SamDB.
1289 samdb
= connect_samdb(samdb_url
=lp
.samdb_url(), lp
=lp
,
1290 credentials
=admin_creds
)
1291 self
.assertLocalSamDB(samdb
)
1292 user_dn
= ldb
.Dn(samdb
, str(user_creds
.get_dn()))
# NOTE(review): `password` is not used in the lines visible here; presumably
# it is passed to submit() as password= -- confirm.
1293 password
= user_creds
.get_password()
1295 # Prepare to connect to the server with a valid password.
# A single worker process carries out the logon; the duplex pipe is used to
# synchronise it with this process.
1296 with futures
.ProcessPoolExecutor(max_workers
=1) as executor
:
1297 our_pipe
, their_pipe
= Pipe(duplex
=True)
1298 connect_future
= executor
.submit(
1301 url
=f
'ldap://{samdb.host_dns_name()}',
1302 hostname
=samdb
.host_dns_name(),
1303 username
=user_creds
.get_username(),
1305 domain
=user_creds
.get_domain(),
1306 realm
=user_creds
.get_realm(),
1307 workstation
=user_creds
.get_workstation(),
1310 # Wait until the test process indicates it's ready.
1311 self
.wait_for_ready(our_pipe
, connect_future
)
1313 # Take out a transaction.
1314 samdb
.transaction_start()
1316 # Inform the test process that it may proceed.
1317 our_pipe
.send_bytes(b
'0')
1319 # The connection should succeed, despite our holding a
# transaction.
# The result is collected (up to five seconds) before the commit below.
1321 result
= connect_future
.result(timeout
=5)
1322 self
.assertEqual(result
, ConnectionResult
.SUCCESS
)
# NOTE(review): the guard around this cancel is not visible here; presumably
# it only runs on a failure path, since a commit follows -- confirm.
1324 samdb
.transaction_cancel()
1327 # Commit the local transaction.
1328 samdb
.transaction_commit()
1331 if __name__
== '__main__':
1332 global_asn1_print
= False
1333 global_hexdump
= False