s3: torture: adjust SMB1 cli_splice() test sizes
[Samba.git] / python / samba / tests / py_credentials.py
blobc29afd1cf704c93230426a8716c05fa8d5b62483
1 # Integration tests for pycredentials
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.tests import TestCase, delete_force
19 import os
21 import samba
22 from samba.auth import system_session
23 from samba.credentials import (
24 Credentials,
25 CLI_CRED_NTLMv2_AUTH,
26 CLI_CRED_NTLM_AUTH,
27 DONT_USE_KERBEROS)
28 from samba.dcerpc import netlogon, ntlmssp, srvsvc
29 from samba.dcerpc.netlogon import (
30 netr_Authenticator,
31 netr_WorkstationInformation,
32 MSV1_0_ALLOW_MSVCHAPV2
34 from samba.dcerpc.misc import SEC_CHAN_WKSTA
35 from samba.dsdb import (
36 UF_WORKSTATION_TRUST_ACCOUNT,
37 UF_PASSWD_NOTREQD,
38 UF_NORMAL_ACCOUNT)
39 from samba.ndr import ndr_pack
40 from samba.samdb import SamDB
41 from samba import NTSTATUSError, ntstatus
42 import ctypes
44 """
45 Integration tests for pycredentials
46 """
# Name of the machine account the tests create; "$" is appended to form
# its sAMAccountName and it is also used as the workstation name.
MACHINE_NAME = "PCTM"
# Name (cn and sAMAccountName) of the user account the tests create.
USER_NAME = "PCTU"
class PyCredentialsTests(TestCase):
    """Integration tests for the pycredentials bindings.

    Each test runs against the server named by the SERVER, DOMAIN and
    SERVER_IP environment variables.  setUp() creates a machine account
    and a user account over LDAP; tearDown() removes them again.
    """

    def setUp(self):
        """Connect to the directory and create the two test accounts."""
        super(PyCredentialsTests, self).setUp()

        self.server = os.environ["SERVER"]
        self.domain = os.environ["DOMAIN"]
        self.host = os.environ["SERVER_IP"]
        self.lp = self.get_loadparm()

        self.credentials = self.get_credentials()

        self.session = system_session()
        self.ldb = SamDB(url="ldap://%s" % self.host,
                         session_info=self.session,
                         credentials=self.credentials,
                         lp=self.lp)

        self.create_machine_account()
        self.create_user_account()

    def tearDown(self):
        """Run the base-class teardown, then delete both test accounts."""
        super(PyCredentialsTests, self).tearDown()
        delete_force(self.ldb, self.machine_dn)
        delete_force(self.ldb, self.user_dn)

    def test_no_netlogon_connection(self):
        """new_client_authenticator raises ValueError without a connection.

        Until a successful netlogon connection has been established there
        is no valid authenticator associated with the credentials.
        """
        self.assertRaises(ValueError,
                          self.machine_creds.new_client_authenticator)

    def test_have_netlogon_connection(self):
        """new_client_authenticator returns a value once connected."""
        # The connection object itself is not used, but it is kept bound
        # for the duration of the test while the authenticator is fetched.
        c = self.get_netlogon_connection()  # noqa: F841
        self.assertIsNotNone(self.machine_creds.new_client_authenticator())

    def test_client_authenticator(self):
        """Use fresh authenticators on a sequence of authenticated calls."""
        c = self.get_netlogon_connection()
        (authenticator, subsequent) = self.get_authenticator(c)
        self.do_NetrLogonSamLogonWithFlags(c, authenticator, subsequent)
        # Every call needs a newly generated authenticator.
        for _ in range(3):
            (authenticator, subsequent) = self.get_authenticator(c)
            self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)

    def _logon_sam_logon_ex(self, c, logon):
        """Issue netr_LogonSamLogonEx with *logon* over connection *c*.

        A NT_STATUS_WRONG_PASSWORD reply is reported as a test failure;
        any other NTSTATUSError is re-raised to the caller.
        """
        try:
            c.netr_LogonSamLogonEx(
                self.server,
                self.user_creds.get_workstation(),
                netlogon.NetlogonNetworkTransitiveInformation,
                logon,
                netlogon.NetlogonValidationSamInfo4,
                0)
        except NTSTATUSError as e:
            # Normalise the status code to an unsigned 32-bit value before
            # comparing it with the ntstatus constant.
            status = ctypes.c_uint32(e.args[0]).value
            if status == ntstatus.NT_STATUS_WRONG_PASSWORD:
                self.fail("got wrong password error")
            raise

    def test_SamLogonEx(self):
        """SamLogonEx with the default (NTLMv2) response is accepted."""
        c = self.get_netlogon_connection()

        logon = samlogon_logon_info(self.domain,
                                    self.machine_name,
                                    self.user_creds)

        self._logon_sam_logon_ex(c, logon)

    def test_SamLogonEx_no_domain(self):
        """SamLogonEx succeeds when the user credentials have no domain.

        Unlike the sibling tests, any NTSTATUSError is turned into a test
        failure rather than being re-raised.
        """
        c = self.get_netlogon_connection()

        self.user_creds.set_domain('')

        logon = samlogon_logon_info(self.domain,
                                    self.machine_name,
                                    self.user_creds)

        try:
            self._logon_sam_logon_ex(c, logon)
        except NTSTATUSError as e:
            self.fail("got unexpected error: " + str(e))

    def test_SamLogonExNTLM(self):
        """SamLogonEx with a CLI_CRED_NTLM_AUTH response is accepted."""
        c = self.get_netlogon_connection()

        logon = samlogon_logon_info(self.domain,
                                    self.machine_name,
                                    self.user_creds,
                                    flags=CLI_CRED_NTLM_AUTH)

        self._logon_sam_logon_ex(c, logon)

    def test_SamLogonExMSCHAPv2(self):
        """SamLogonEx with MSV1_0_ALLOW_MSVCHAPV2 set is accepted."""
        c = self.get_netlogon_connection()

        logon = samlogon_logon_info(self.domain,
                                    self.machine_name,
                                    self.user_creds,
                                    flags=CLI_CRED_NTLM_AUTH)

        logon.identity_info.parameter_control = MSV1_0_ALLOW_MSVCHAPV2

        self._logon_sam_logon_ex(c, logon)

    def test_encrypt_netr_password(self):
        """Exercise Credentials.encrypt_netr_crypt_password.

        Performs a NetrServerPasswordSet2 and then logs on using the new
        password.
        """
        # Change the password
        self.do_Netr_ServerPasswordSet2()
        # Now use the new password to perform an operation
        srvsvc.srvsvc("ncacn_np:%s" % (self.server),
                      self.lp,
                      self.machine_creds)

    def do_Netr_ServerPasswordSet2(self):
        """Change the machine account password via netr_ServerPasswordSet2.

        On return self.machine_pass and self.machine_creds hold the new
        password.
        """
        c = self.get_netlogon_connection()
        (authenticator, _subsequent) = self.get_authenticator(c)
        PWD_LEN = 32
        DATA_LEN = 512
        newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
        encoded = newpass.encode('utf-16-le')
        pwd_len = len(encoded)
        # bytearray yields integers on both Python 2 and Python 3, unlike
        # ord() over a bytes object, which only works on Python 2.
        filler = list(bytearray(os.urandom(DATA_LEN - pwd_len)))
        pwd = netlogon.netr_CryptPassword()
        pwd.length = pwd_len
        # Random filler first, the encoded password at the end of the buffer.
        pwd.data = filler + list(bytearray(encoded))
        self.machine_creds.encrypt_netr_crypt_password(pwd)
        c.netr_ServerPasswordSet2(self.server,
                                  self.machine_creds.get_workstation(),
                                  SEC_CHAN_WKSTA,
                                  self.machine_name,
                                  authenticator,
                                  pwd)

        self.machine_pass = newpass
        self.machine_creds.set_password(newpass)

    def get_netlogon_connection(self):
        """Establish a sealed schannel netlogon connection over TCP/IP."""
        return netlogon.netlogon(
            "ncacn_ip_tcp:%s[schannel,seal]" % self.server,
            self.lp,
            self.machine_creds)

    def create_machine_account(self):
        """Create the machine account and matching self.machine_creds."""
        self.machine_pass = samba.generate_random_password(32, 32)
        self.machine_name = MACHINE_NAME
        self.machine_dn = "cn=%s,%s" % (self.machine_name,
                                        self.ldb.domain_dn())

        # remove the account if it exists, this will happen if a previous
        # test run failed
        delete_force(self.ldb, self.machine_dn)

        # The password is wrapped in double quotes and encoded as
        # UTF-16-LE for the unicodePwd attribute.
        utf16pw = ('"' + self.machine_pass + '"').encode('utf-16-le')
        self.ldb.add({
            "dn": self.machine_dn,
            "objectclass": "computer",
            "sAMAccountName": "%s$" % self.machine_name,
            "userAccountControl":
                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            "unicodePwd": utf16pw})

        self.machine_creds = Credentials()
        self.machine_creds.guess(self.get_loadparm())
        self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
        self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
        self.machine_creds.set_password(self.machine_pass)
        self.machine_creds.set_username(self.machine_name + "$")
        self.machine_creds.set_workstation(self.machine_name)

    def create_user_account(self):
        """Create the test user account and matching self.user_creds.

        Must run after create_machine_account(): the user credentials use
        self.machine_name as their workstation.
        """
        self.user_pass = samba.generate_random_password(32, 32)
        self.user_name = USER_NAME
        self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())

        # remove the account if it exists, this will happen if a previous
        # test run failed
        delete_force(self.ldb, self.user_dn)

        # The password is wrapped in double quotes and encoded as
        # UTF-16-LE for the unicodePwd attribute.
        utf16pw = ('"' + self.user_pass + '"').encode('utf-16-le')
        self.ldb.add({
            "dn": self.user_dn,
            "objectclass": "user",
            "sAMAccountName": "%s" % self.user_name,
            "userAccountControl": str(UF_NORMAL_ACCOUNT),
            "unicodePwd": utf16pw})

        self.user_creds = Credentials()
        self.user_creds.guess(self.get_loadparm())
        self.user_creds.set_password(self.user_pass)
        self.user_creds.set_username(self.user_name)
        self.user_creds.set_workstation(self.machine_name)

    def get_authenticator(self, c):
        """Get an authenticator pair from the machine credentials.

        *c* is accepted for symmetry with the other helpers but is not
        used.  Returns (current, subsequent): ``current`` is filled in
        from new_client_authenticator(), ``subsequent`` is a blank
        netr_Authenticator.
        """
        auth = self.machine_creds.new_client_authenticator()
        current = netr_Authenticator()
        # bytearray yields integers on both Python 2 and Python 3.
        current.cred.data = list(bytearray(auth["credential"]))
        current.timestamp = auth["timestamp"]

        subsequent = netr_Authenticator()
        return (current, subsequent)

    def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
        """Call netr_LogonSamLogonWithFlags for the test user."""
        logon = samlogon_logon_info(self.domain,
                                    self.machine_name,
                                    self.user_creds)

        logon_level = netlogon.NetlogonNetworkTransitiveInformation
        validation_level = netlogon.NetlogonValidationSamInfo4
        netr_flags = 0
        c.netr_LogonSamLogonWithFlags(self.server,
                                      self.user_creds.get_workstation(),
                                      current,
                                      subsequent,
                                      logon_level,
                                      logon,
                                      validation_level,
                                      netr_flags)

    def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
        """Call netr_LogonGetDomainInfo with an empty workstation query."""
        query = netr_WorkstationInformation()

        # NOTE(review): the extracted source lost one line between
        # "subsequent," and "query)"; the RPC takes a level argument at
        # that position.  Level 2 is assumed here -- confirm against the
        # upstream file.
        c.netr_LogonGetDomainInfo(self.server,
                                  self.user_creds.get_workstation(),
                                  current,
                                  subsequent,
                                  2,
                                  query)
# Build the logon data required by NetrLogonSamLogonWithFlags
def samlogon_logon_info(domain_name, computer_name, creds,
                        flags=CLI_CRED_NTLMv2_AUTH):
    """Return a netr_NetworkInfo describing a network logon for *creds*.

    domain_name and computer_name are used to build the target info blob
    passed to creds.get_ntlm_response(); flags selects the kind of NTLM
    response requested.  The challenge is a fixed 8-byte value.
    """
    target_info_blob = samlogon_target(domain_name, computer_name)

    challenge = b"abcdefgh"
    # User account under test
    response = creds.get_ntlm_response(flags=flags,
                                       challenge=challenge,
                                       target_info=target_info_blob)

    logon = netlogon.netr_NetworkInfo()

    # bytearray yields integers on both Python 2 and Python 3, whereas
    # ord() over a bytes object only works on Python 2.
    logon.challenge = list(bytearray(challenge))
    logon.nt = netlogon.netr_ChallengeResponse()
    logon.nt.length = len(response["nt_response"])
    logon.nt.data = list(bytearray(response["nt_response"]))
    logon.identity_info = netlogon.netr_IdentityInfo()

    (username, domain) = creds.get_ntlm_username_domain()
    logon.identity_info.domain_name.string = domain
    logon.identity_info.account_name.string = username
    logon.identity_info.workstation.string = creds.get_workstation()

    return logon
# Build the samlogon target info.
def samlogon_target(domain_name, computer_name):
    """Return the NDR-packed AV_PAIR_LIST for the given names.

    The list holds, in order, the NetBIOS domain name pair, the NetBIOS
    computer name pair and the end-of-list marker.
    """
    def make_pair(av_id, value=None):
        # The end-of-list pair carries no value, so Value is only set
        # when one is supplied.
        entry = ntlmssp.AV_PAIR()
        entry.AvId = av_id
        if value is not None:
            entry.Value = value
        return entry

    pairs = [
        make_pair(ntlmssp.MsvAvNbDomainName, domain_name),
        make_pair(ntlmssp.MsvAvNbComputerName, computer_name),
        make_pair(ntlmssp.MsvAvEOL),
    ]

    av_list = ntlmssp.AV_PAIR_LIST()
    av_list.count = 3
    av_list.pair = pairs

    return ndr_pack(av_list)