1 # Integration tests for pycredentials
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba
.tests
import TestCase
, delete_force
22 from samba
.auth
import system_session
23 from samba
.credentials
import (
28 from samba
.dcerpc
import netlogon
, ntlmssp
, srvsvc
29 from samba
.dcerpc
.netlogon
import (
31 netr_WorkstationInformation
,
32 MSV1_0_ALLOW_MSVCHAPV2
34 from samba
.dcerpc
.misc
import SEC_CHAN_WKSTA
35 from samba
.dsdb
import (
36 UF_WORKSTATION_TRUST_ACCOUNT
,
39 from samba
.ndr
import ndr_pack
40 from samba
.samdb
import SamDB
41 from samba
import NTSTATUSError
, ntstatus
45 Integration tests for pycredentials
51 class PyCredentialsTests(TestCase
):
54 super(PyCredentialsTests
, self
).setUp()
56 self
.server
= os
.environ
["SERVER"]
57 self
.domain
= os
.environ
["DOMAIN"]
58 self
.host
= os
.environ
["SERVER_IP"]
59 self
.lp
= self
.get_loadparm()
61 self
.credentials
= self
.get_credentials()
63 self
.session
= system_session()
64 self
.ldb
= SamDB(url
="ldap://%s" % self
.host
,
65 session_info
=self
.session
,
66 credentials
=self
.credentials
,
69 self
.create_machine_account()
70 self
.create_user_account()
74 super(PyCredentialsTests
, self
).tearDown()
75 delete_force(self
.ldb
, self
.machine_dn
)
76 delete_force(self
.ldb
, self
.user_dn
)
78 # Until a successful netlogon connection has been established there will
79 # not be a valid authenticator associated with the credentials
80 # and new_client_authenticator should throw a ValueError
81 def test_no_netlogon_connection(self
):
82 self
.assertRaises(ValueError,
83 self
.machine_creds
.new_client_authenticator
)
85 # Once a netlogon connection has been established,
86 # new_client_authenticator should return a value
88 def test_have_netlogon_connection(self
):
89 c
= self
.get_netlogon_connection()
90 a
= self
.machine_creds
.new_client_authenticator()
91 self
.assertIsNotNone(a
)
93 # Get an authenticator and use it on a sequence of operations requiring
95 def test_client_authenticator(self
):
96 c
= self
.get_netlogon_connection()
97 (authenticator
, subsequent
) = self
.get_authenticator(c
)
98 self
.do_NetrLogonSamLogonWithFlags(c
, authenticator
, subsequent
)
99 (authenticator
, subsequent
) = self
.get_authenticator(c
)
100 self
.do_NetrLogonGetDomainInfo(c
, authenticator
, subsequent
)
101 (authenticator
, subsequent
) = self
.get_authenticator(c
)
102 self
.do_NetrLogonGetDomainInfo(c
, authenticator
, subsequent
)
103 (authenticator
, subsequent
) = self
.get_authenticator(c
)
104 self
.do_NetrLogonGetDomainInfo(c
, authenticator
, subsequent
)
107 def test_SamLogonEx(self
):
108 c
= self
.get_netlogon_connection()
110 logon
= samlogon_logon_info(self
.domain
,
114 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
115 validation_level
= netlogon
.NetlogonValidationSamInfo4
119 c
.netr_LogonSamLogonEx(self
.server
,
120 self
.user_creds
.get_workstation(),
125 except NTSTATUSError
as e
:
126 enum
= ctypes
.c_uint32(e
.args
[0]).value
127 if enum
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
128 self
.fail("got wrong password error")
132 def test_SamLogonEx_no_domain(self
):
133 c
= self
.get_netlogon_connection()
135 self
.user_creds
.set_domain('')
137 logon
= samlogon_logon_info(self
.domain
,
141 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
142 validation_level
= netlogon
.NetlogonValidationSamInfo4
146 c
.netr_LogonSamLogonEx(self
.server
,
147 self
.user_creds
.get_workstation(),
152 except NTSTATUSError
as e
:
153 enum
= ctypes
.c_uint32(e
.args
[0]).value
154 if enum
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
155 self
.fail("got wrong password error")
157 self
.fail("got unexpected error" + str(e
))
159 def test_SamLogonExNTLM(self
):
160 c
= self
.get_netlogon_connection()
162 logon
= samlogon_logon_info(self
.domain
,
165 flags
=CLI_CRED_NTLM_AUTH
)
167 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
168 validation_level
= netlogon
.NetlogonValidationSamInfo4
172 c
.netr_LogonSamLogonEx(self
.server
,
173 self
.user_creds
.get_workstation(),
178 except NTSTATUSError
as e
:
179 enum
= ctypes
.c_uint32(e
.args
[0]).value
180 if enum
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
181 self
.fail("got wrong password error")
185 def test_SamLogonExMSCHAPv2(self
):
186 c
= self
.get_netlogon_connection()
188 logon
= samlogon_logon_info(self
.domain
,
191 flags
=CLI_CRED_NTLM_AUTH
)
193 logon
.identity_info
.parameter_control
= MSV1_0_ALLOW_MSVCHAPV2
195 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
196 validation_level
= netlogon
.NetlogonValidationSamInfo4
200 c
.netr_LogonSamLogonEx(self
.server
,
201 self
.user_creds
.get_workstation(),
206 except NTSTATUSError
as e
:
207 enum
= ctypes
.c_uint32(e
.args
[0]).value
208 if enum
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
209 self
.fail("got wrong password error")
214 # Test Credentials.encrypt_netr_crypt_password
215 # By performing a NetrServerPasswordSet2
216 # And the logging on using the new password.
217 def test_encrypt_netr_password(self
):
218 # Change the password
219 self
.do_Netr_ServerPasswordSet2()
220 # Now use the new password to perform an operation
221 srvsvc
.srvsvc("ncacn_np:%s" % (self
.server
),
226 # Change the current machine account password with a
227 # netr_ServerPasswordSet2 call.
229 def do_Netr_ServerPasswordSet2(self
):
230 c
= self
.get_netlogon_connection()
231 (authenticator
, subsequent
) = self
.get_authenticator(c
)
234 newpass
= samba
.generate_random_password(PWD_LEN
, PWD_LEN
)
235 encoded
= newpass
.encode('utf-16-le')
236 pwd_len
= len(encoded
)
237 filler
= [ord(x
) for x
in os
.urandom(DATA_LEN
-pwd_len
)]
238 pwd
= netlogon
.netr_CryptPassword()
240 pwd
.data
= filler
+ [ord(x
) for x
in encoded
]
241 self
.machine_creds
.encrypt_netr_crypt_password(pwd
)
242 c
.netr_ServerPasswordSet2(self
.server
,
243 self
.machine_creds
.get_workstation(),
249 self
.machine_pass
= newpass
250 self
.machine_creds
.set_password(newpass
)
252 # Establish sealed schannel netlogon connection over TCP/IP
254 def get_netlogon_connection(self
):
255 return netlogon
.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self
.server
,
260 # Create the machine account
261 def create_machine_account(self
):
262 self
.machine_pass
= samba
.generate_random_password(32, 32)
263 self
.machine_name
= MACHINE_NAME
264 self
.machine_dn
= "cn=%s,%s" % (self
.machine_name
, self
.ldb
.domain_dn())
266 # remove the account if it exists, this will happen if a previous test
268 delete_force(self
.ldb
, self
.machine_dn
)
271 '"' + self
.machine_pass
.encode('utf-8') + '"', 'utf-8'
272 ).encode('utf-16-le')
274 "dn": self
.machine_dn
,
275 "objectclass": "computer",
276 "sAMAccountName": "%s$" % self
.machine_name
,
277 "userAccountControl":
278 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD
),
279 "unicodePwd": utf16pw
})
281 self
.machine_creds
= Credentials()
282 self
.machine_creds
.guess(self
.get_loadparm())
283 self
.machine_creds
.set_secure_channel_type(SEC_CHAN_WKSTA
)
284 self
.machine_creds
.set_kerberos_state(DONT_USE_KERBEROS
)
285 self
.machine_creds
.set_password(self
.machine_pass
)
286 self
.machine_creds
.set_username(self
.machine_name
+ "$")
287 self
.machine_creds
.set_workstation(self
.machine_name
)
290 # Create a test user account
291 def create_user_account(self
):
292 self
.user_pass
= samba
.generate_random_password(32, 32)
293 self
.user_name
= USER_NAME
294 self
.user_dn
= "cn=%s,%s" % (self
.user_name
, self
.ldb
.domain_dn())
296 # remove the account if it exists, this will happen if a previous test
298 delete_force(self
.ldb
, self
.user_dn
)
301 '"' + self
.user_pass
.encode('utf-8') + '"', 'utf-8'
302 ).encode('utf-16-le')
305 "objectclass": "user",
306 "sAMAccountName": "%s" % self
.user_name
,
307 "userAccountControl": str(UF_NORMAL_ACCOUNT
),
308 "unicodePwd": utf16pw
})
310 self
.user_creds
= Credentials()
311 self
.user_creds
.guess(self
.get_loadparm())
312 self
.user_creds
.set_password(self
.user_pass
)
313 self
.user_creds
.set_username(self
.user_name
)
314 self
.user_creds
.set_workstation(self
.machine_name
)
318 # Get the authenticator from the machine creds.
319 def get_authenticator(self
, c
):
320 auth
= self
.machine_creds
.new_client_authenticator();
321 current
= netr_Authenticator()
322 current
.cred
.data
= [ord(x
) for x
in auth
["credential"]]
323 current
.timestamp
= auth
["timestamp"]
325 subsequent
= netr_Authenticator()
326 return (current
, subsequent
)
328 def do_NetrLogonSamLogonWithFlags(self
, c
, current
, subsequent
):
329 logon
= samlogon_logon_info(self
.domain
,
333 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
334 validation_level
= netlogon
.NetlogonValidationSamInfo4
336 c
.netr_LogonSamLogonWithFlags(self
.server
,
337 self
.user_creds
.get_workstation(),
345 def do_NetrLogonGetDomainInfo(self
, c
, current
, subsequent
):
346 query
= netr_WorkstationInformation()
348 c
.netr_LogonGetDomainInfo(self
.server
,
349 self
.user_creds
.get_workstation(),
356 # Build the logon data required by NetrLogonSamLogonWithFlags
357 def samlogon_logon_info(domain_name
, computer_name
, creds
,
358 flags
=CLI_CRED_NTLMv2_AUTH
):
360 target_info_blob
= samlogon_target(domain_name
, computer_name
)
362 challenge
= b
"abcdefgh"
363 # User account under test
364 response
= creds
.get_ntlm_response(flags
=flags
,
366 target_info
=target_info_blob
)
368 logon
= netlogon
.netr_NetworkInfo()
370 logon
.challenge
= [ord(x
) for x
in challenge
]
371 logon
.nt
= netlogon
.netr_ChallengeResponse()
372 logon
.nt
.length
= len(response
["nt_response"])
373 logon
.nt
.data
= [ord(x
) for x
in response
["nt_response"]]
374 logon
.identity_info
= netlogon
.netr_IdentityInfo()
376 (username
, domain
) = creds
.get_ntlm_username_domain()
377 logon
.identity_info
.domain_name
.string
= domain
378 logon
.identity_info
.account_name
.string
= username
379 logon
.identity_info
.workstation
.string
= creds
.get_workstation()
384 # Build the samlogon target info.
385 def samlogon_target(domain_name
, computer_name
):
386 target_info
= ntlmssp
.AV_PAIR_LIST()
387 target_info
.count
= 3
388 computername
= ntlmssp
.AV_PAIR()
389 computername
.AvId
= ntlmssp
.MsvAvNbComputerName
390 computername
.Value
= computer_name
392 domainname
= ntlmssp
.AV_PAIR()
393 domainname
.AvId
= ntlmssp
.MsvAvNbDomainName
394 domainname
.Value
= domain_name
396 eol
= ntlmssp
.AV_PAIR()
397 eol
.AvId
= ntlmssp
.MsvAvEOL
398 target_info
.pair
= [domainname
, computername
, eol
]
400 return ndr_pack(target_info
)