docs-xml: "cluster addresses" dns registration
[Samba.git] / python / samba / tests / auth_log.py
blobdaf088f2f32dd916702126d97e7f46a01b47d9be
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
20 """
21 import samba.tests
22 from samba.dcerpc import srvsvc, dnsserver
23 import os
24 from samba.samba3 import libsmb_samba_internal as libsmb
25 from samba.samba3 import param as s3param
26 from samba.samdb import SamDB
27 import samba.tests.auth_log_base
28 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
29 from samba import NTSTATUSError
30 from subprocess import call
31 from ldb import LdbError
32 from samba.dcerpc.windows_event_ids import (
33 EVT_ID_SUCCESSFUL_LOGON,
34 EVT_ID_UNSUCCESSFUL_LOGON,
35 EVT_LOGON_NETWORK,
36 EVT_LOGON_INTERACTIVE,
37 EVT_LOGON_NETWORK_CLEAR_TEXT
39 import re
42 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
def setUp(self):
    """Run the base-class set-up and remember the client address.

    The address is read from the ``CLIENT_IP`` environment variable and
    stored on ``self.remoteAddress``; a missing variable raises KeyError.
    """
    super(AuthLogTests, self).setUp()
    client_ip = os.environ["CLIENT_IP"]
    self.remoteAddress = client_ip
def tearDown(self):
    """Delegate clean-up to the base class; nothing extra is released."""
    base = super(AuthLogTests, self)
    base.tearDown()
def smb_connection(self, creds, use_spnego="yes", ntlmv2_auth="yes",
                   force_smb1=False):
    """Open an SMB connection to the "sysvol" share on ``self.server``.

    creds: credentials handed to ``libsmb.Conn``.
    use_spnego: value set for the "client use spnego" option.
    ntlmv2_auth: value set for the "client ntlmv2 auth" option.
    force_smb1: forwarded unchanged to ``libsmb.Conn``.

    Returns the object produced by ``libsmb.Conn``.
    """
    # The SMB bindings rely on having a s3 loadparm, so load the test's
    # configuration file into the s3 parameter context.
    base_lp = self.get_loadparm()
    s3_ctx = s3param.get_context()
    s3_ctx.load(base_lp.configfile)

    # Allow the testcase to skip SPNEGO or use NTLMv1
    client_options = (
        ("client use spnego", use_spnego),
        ("client ntlmv2 auth", ntlmv2_auth),
    )
    for option, value in client_options:
        s3_ctx.set(option, value)

    connection = libsmb.Conn(self.server, "sysvol", lp=s3_ctx, creds=creds,
                             force_smb1=force_smb1)
    return connection
def _test_rpc_ncacn_np(self, authTypes, creds, service,
                       binding, protection, checkFunction):
    """Bind to an RPC service over ncacn_np and check the logged messages.

    authTypes: expected auth strings; index 0 is the authType of the
        Authorization message that ends the wait.
    creds: credentials used for the RPC connection.
    service: "dnsserver" or "srvsvc"; any other value fails the test.
    binding: binding options without brackets ("" for none).
    protection: transportProtection expected on the final message.
    checkFunction: called as
        checkFunction(messages, authTypes, service, binding, protection)
        where binding is the bracketed form.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] in ("DCE/RPC", service) and
                authz["authType"] == authTypes[0] and
                authz["transportProtection"] == protection)

    if binding:
        binding = "[%s]" % binding

    if service == "dnsserver":
        connect = dnsserver.dnsserver
    elif service == "srvsvc":
        connect = srvsvc.srvsvc
    else:
        # Previously this fell through and hit a NameError on the
        # unbound connection variable below.
        self.fail("Unsupported RPC service: %r" % (service,))

    x = connect("ncacn_np:%s%s" % (self.server, binding),
                self.get_loadparm(),
                creds)

    # The connection is passed to ensure the server
    # messaging context stays up until all the messages have been received.
    messages = self.waitForMessages(isLastExpectedMessage, x)
    checkFunction(messages, authTypes, service, binding, protection)
def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
    """Assert that serviceDescription matches the SMB dialect in binding.

    binding: a binding option string such as "[smb2,sign]" or "".
    serviceDescription: the value taken from a log message.

    An explicit "smb2" option requires "SMB2", an explicit "smb1" option
    requires "SMB"; otherwise either value is accepted.
    """
    # Turn "[foo,bar]" into ["foo", "bar"].  Splitting on the brackets
    # and commas yields empty strings (also for binding == ""), which
    # are dropped.  The pattern is a raw string so that "\[" and "\]"
    # are not treated as (invalid) string escapes.
    binding_list = [opt for opt in re.split(r'[\[,\]]', binding) if opt]

    # Handle explicit smb2, smb1 or auto negotiation
    if "smb2" in binding_list:
        self.assertEqual(serviceDescription, "SMB2")
    elif "smb1" in binding_list:
        self.assertEqual(serviceDescription, "SMB")
    else:
        self.assertIn(serviceDescription, ["SMB", "SMB2"])
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
                            binding, protection):
    """Check the messages logged for an NTLM ncacn_np connection.

    messages: the received log messages, in order.
    authTypes: expected auth strings; its length is the expected number
        of messages.  Index 1 is compared with the first message, index 2
        with the second and, when four messages are expected, index 3
        with the third.
    service: accepted, alongside "DCE/RPC", as the serviceDescription of
        the third message.
    binding: bracketed binding string, used to decide which SMB
        serviceDescription values are acceptable.
    protection: not used; transportProtection is compared with "SMB".
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    authn = msg["Authentication"]
    self.assertEqual("NT_STATUS_OK", authn["status"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
    self._assert_ncacn_np_serviceDescription(
        binding, authn["serviceDescription"])
    self.assertEqual(authTypes[1], authn["authDescription"])

    # Check the second message it should be an Authorization
    msg = messages[1]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self._assert_ncacn_np_serviceDescription(
        binding, authz["serviceDescription"])
    self.assertEqual(authTypes[2], authz["authType"])
    self.assertEqual("SMB", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))

    # Check the third message it should be an Authentication
    # if we are expecting 4 messages
    if expected_messages == 4:
        msg = messages[2]
        self.assertEqual("Authentication", msg["type"])
        authn = msg["Authentication"]
        self.assertEqual("NT_STATUS_OK", authn["status"])
        self.assertIn(authn["serviceDescription"], ("DCE/RPC", service))
        self.assertEqual(authTypes[3], authn["authDescription"])
        self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
        self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def rpc_ncacn_np_krb5_check(
        self,
        messages,
        authTypes,
        service,
        binding,
        protection):
    """Check the messages logged for a Kerberos ncacn_np connection.

    messages: the received log messages, in order.
    authTypes: expected auth strings; its length is the expected number
        of messages.  Indexes 1 and 2 are compared with the two
        "Kerberos KDC" Authentication messages, index 3 with the
        Authorization message.
    service: not used by this check.
    binding: bracketed binding string, used to decide which SMB
        serviceDescription values are acceptable.
    protection: not used; transportProtection is compared with "SMB".
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first two messages should both be Authentications.
    # The first is almost certainly Authentication over UDP, and is
    # probably returning message too big; the second is the TCP
    # Authentication in response to that message too big response.
    for index in (0, 1):
        msg = messages[index]
        self.assertEqual("Authentication", msg["type"])
        authn = msg["Authentication"]
        self.assertEqual("NT_STATUS_OK", authn["status"])
        self.assertEqual("Kerberos KDC", authn["serviceDescription"])
        self.assertEqual(authTypes[index + 1], authn["authDescription"])
        self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
        self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])

    # Check the third message it should be an Authorization
    msg = messages[2]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self._assert_ncacn_np_serviceDescription(
        binding, authz["serviceDescription"])
    self.assertEqual(authTypes[3], authz["authType"])
    self.assertEqual("SMB", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))
def test_rpc_ncacn_np_ntlm_dns_sign(self):
    """dnsserver over ncacn_np, Kerberos disabled, "sign" binding."""
    auth_types = ["NTLMSSP"] * 4
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "dnsserver", "sign", "SIGN",
                            self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_srv_sign(self):
    """srvsvc over ncacn_np, Kerberos disabled, "sign" binding."""
    auth_types = ["NTLMSSP"] * 4
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "srvsvc", "sign", "SIGN",
                            self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_dns(self):
    """dnsserver over ncacn_np, Kerberos disabled, no binding options."""
    auth_types = ["ncacn_np", "NTLMSSP", "NTLMSSP"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "dnsserver", "", "SMB",
                            self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_srv(self):
    """srvsvc over ncacn_np, Kerberos disabled, no binding options."""
    auth_types = ["ncacn_np", "NTLMSSP", "NTLMSSP"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "srvsvc", "", "SMB",
                            self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_krb_dns_sign(self):
    """dnsserver over ncacn_np, Kerberos required, "sign" binding."""
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["krb5", preauth, preauth, "krb5"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "dnsserver", "sign", "SIGN",
                            self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_srv_sign(self):
    """srvsvc over ncacn_np, Kerberos required, "sign" binding."""
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["krb5", preauth, preauth, "krb5"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "srvsvc", "sign", "SIGN",
                            self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_dns(self):
    """dnsserver over ncacn_np, Kerberos required, no binding options."""
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["ncacn_np", preauth, preauth, "krb5"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "dnsserver", "", "SMB",
                            self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_dns_smb2(self):
    """dnsserver over ncacn_np, Kerberos required, "smb2" binding."""
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["ncacn_np", preauth, preauth, "krb5"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "dnsserver", "smb2", "SMB",
                            self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_srv(self):
    """srvsvc over ncacn_np, Kerberos required, no binding options."""
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["ncacn_np", preauth, preauth, "krb5"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_np(auth_types, creds, "srvsvc", "", "SMB",
                            self.rpc_ncacn_np_krb5_check)
def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
                           binding, protection, checkFunction):
    """Bind to an RPC service over ncacn_ip_tcp and check the log messages.

    authTypes: expected auth strings; index 0 is the authType of the
        Authorization message that ends the wait.
    creds: credentials used for the RPC connection.
    service: "dnsserver" or "srvsvc"; any other value fails the test.
    binding: binding options without brackets ("" for none).
    protection: transportProtection expected on the final message.
    checkFunction: called as
        checkFunction(messages, authTypes, service, binding, protection)
        where binding is the bracketed form.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "DCE/RPC" and
                authz["authType"] == authTypes[0] and
                authz["transportProtection"] == protection)

    if binding:
        binding = "[%s]" % binding

    if service == "dnsserver":
        connect = dnsserver.dnsserver
    elif service == "srvsvc":
        connect = srvsvc.srvsvc
    else:
        # Previously this fell through and hit a NameError on the
        # unbound connection variable below.
        self.fail("Unsupported RPC service: %r" % (service,))

    conn = connect("ncacn_ip_tcp:%s%s" % (self.server, binding),
                   self.get_loadparm(),
                   creds)

    # The connection object is handed to waitForMessages together with
    # the predicate that recognises the final message.
    messages = self.waitForMessages(isLastExpectedMessage, conn)
    checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the messages logged for an NTLM ncacn_ip_tcp connection.

    messages: the received log messages, in order.
    authTypes: expected auth strings; its length is the expected number
        of messages.  Index 1 is compared with the first (Authorization)
        message, index 2 with the second (Authentication) message.
    service, binding, protection: not used by this check.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authorization
    msg = messages[0]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self.assertEqual("DCE/RPC", authz["serviceDescription"])
    self.assertEqual(authTypes[1], authz["authType"])
    self.assertEqual("NONE", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))

    # Check the second message it should be an Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    authn = msg["Authentication"]
    self.assertEqual("NT_STATUS_OK", authn["status"])
    self.assertEqual("DCE/RPC", authn["serviceDescription"])
    self.assertEqual(authTypes[2], authn["authDescription"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the messages logged for a Kerberos ncacn_ip_tcp connection.

    messages: the received log messages, in order.
    authTypes: expected auth strings; its length is the expected number
        of messages.  Index 1 is compared with the first (Authorization)
        message, indexes 2 and 3 with the second and third
        (Authentication) messages.
    service, binding, protection: not used by this check.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authorization
    msg = messages[0]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self.assertEqual("DCE/RPC", authz["serviceDescription"])
    self.assertEqual(authTypes[1], authz["authType"])
    self.assertEqual("NONE", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))

    # The second and third messages should both be Kerberos KDC
    # Authentications.  The third was previously compared with
    # authTypes[2] again, which left authTypes[3] unchecked.
    for index in (1, 2):
        msg = messages[index]
        self.assertEqual("Authentication", msg["type"])
        authn = msg["Authentication"]
        self.assertEqual("NT_STATUS_OK", authn["status"])
        self.assertEqual("Kerberos KDC", authn["serviceDescription"])
        self.assertEqual(authTypes[index + 1], authn["authDescription"])
        self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
        self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
    """dnsserver over ncacn_ip_tcp, Kerberos disabled, "sign" binding."""
    auth_types = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self._test_rpc_ncacn_ip_tcp(auth_types, creds,
                                "dnsserver", "sign", "SIGN",
                                self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
    """dnsserver over ncacn_ip_tcp, Kerberos required, "sign" binding."""
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["krb5", "ncacn_ip_tcp", preauth, preauth]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_ip_tcp(auth_types, creds,
                                "dnsserver", "sign", "SIGN",
                                self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
    """dnsserver over ncacn_ip_tcp, Kerberos disabled, no binding options.

    The final Authorization is still expected to report "SIGN".
    """
    auth_types = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self._test_rpc_ncacn_ip_tcp(auth_types, creds,
                                "dnsserver", "", "SIGN",
                                self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns(self):
    """dnsserver over ncacn_ip_tcp, Kerberos required, no binding options.

    The final Authorization is still expected to report "SIGN".
    """
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["krb5", "ncacn_ip_tcp", preauth, preauth]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_ip_tcp(auth_types, creds,
                                "dnsserver", "", "SIGN",
                                self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
    """dnsserver over ncacn_ip_tcp, Kerberos disabled, "connect" binding."""
    auth_types = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self._test_rpc_ncacn_ip_tcp(auth_types, creds,
                                "dnsserver", "connect", "NONE",
                                self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
    """dnsserver over ncacn_ip_tcp, Kerberos required, "connect" binding."""
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["krb5", "ncacn_ip_tcp", preauth, preauth]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_ip_tcp(auth_types, creds,
                                "dnsserver", "connect", "NONE",
                                self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
    """dnsserver over ncacn_ip_tcp, Kerberos disabled, "seal" binding."""
    auth_types = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self._test_rpc_ncacn_ip_tcp(auth_types, creds,
                                "dnsserver", "seal", "SEAL",
                                self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
    """dnsserver over ncacn_ip_tcp, Kerberos required, "seal" binding."""
    preauth = "ENC-TS Pre-authentication"
    auth_types = ["krb5", "ncacn_ip_tcp", preauth, preauth]
    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=MUST_USE_KERBEROS)
    self._test_rpc_ncacn_ip_tcp(auth_types, creds,
                                "dnsserver", "seal", "SEAL",
                                self.rpc_ncacn_ip_tcp_krb5_check)
def test_ldap(self):
    """Connect to ldap://$SERVER and check the logged messages.

    Waits for an LDAP Authorization with authType "krb5" and
    transportProtection "SIGN", expects three messages in total and
    checks that the first two are successful "Kerberos KDC"
    Authentications.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "SIGN" and
                authz["authType"] == "krb5")

    self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
                       lp=self.get_loadparm(),
                       credentials=self.get_credentials())

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first and second messages should both be Authentications
    for msg in messages[:2]:
        self.assertEqual("Authentication", msg["type"])
        authn = msg["Authentication"]
        self.assertEqual("NT_STATUS_OK", authn["status"])
        self.assertEqual("Kerberos KDC", authn["serviceDescription"])
        self.assertEqual("ENC-TS Pre-authentication",
                         authn["authDescription"])
        self.assertGreater(authn["duration"], 0)
        self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
        self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def test_ldap_ntlm(self):
    """Connect to ldap://$SERVER_IP and check the logged messages.

    Waits for an LDAP Authorization with authType "NTLMSSP" and
    transportProtection "SEAL", expects two messages in total and checks
    that the first is a successful LDAP NTLMSSP Authentication.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "SEAL" and
                authz["authType"] == "NTLMSSP")

    self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
                       lp=self.get_loadparm(),
                       credentials=self.get_credentials())

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    authn = msg["Authentication"]
    self.assertEqual("NT_STATUS_OK", authn["status"])
    self.assertEqual("LDAP", authn["serviceDescription"])
    self.assertEqual("NTLMSSP", authn["authDescription"])
    self.assertGreater(authn["duration"], 0)
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def test_ldap_simple_bind(self):
    """Simple bind to ldaps://$SERVER and check the logged messages.

    The bind DN is set to "<domain>\\<username>".  Waits for an LDAP
    Authorization with authType "simple bind" over "TLS", expects two
    messages and checks that the first is a successful clear-text
    network logon Authentication.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "TLS" and
                authz["authType"] == "simple bind")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
                                  creds.get_username()))

    self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                       lp=self.get_loadparm(),
                       credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    authn = msg["Authentication"]
    self.assertEqual("NT_STATUS_OK", authn["status"])
    self.assertEqual("LDAP", authn["serviceDescription"])
    self.assertEqual("simple bind", authn["authDescription"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK_CLEAR_TEXT, authn["logonType"])
def test_ldap_simple_bind_bad_password(self):
    """Simple bind with a wrong password must fail and be logged.

    Expects SamDB() to raise LdbError and exactly one message: an LDAP
    "simple bind" Authentication with NT_STATUS_WRONG_PASSWORD.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_WRONG_PASSWORD" and
                authn["authDescription"] == "simple bind" and
                authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                authn["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT)

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_password("badPassword")
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
                                  creds.get_username()))

    # The connection attempt itself has to be rejected.
    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_ldap_simple_bind_bad_user(self):
    """Simple bind as "<domain>\\badUser" must fail and be logged.

    Expects SamDB() to raise LdbError and exactly one message: an LDAP
    "simple bind" Authentication with NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "simple bind" and
                authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                authn["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT)

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))

    # The connection attempt itself has to be rejected.
    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_ldap_simple_bind_unparseable_user(self):
    """Simple bind as "<domain>\\abdcef" must fail and be logged.

    Expects SamDB() to raise LdbError and exactly one message: an LDAP
    "simple bind" Authentication with NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "simple bind" and
                authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                authn["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT)

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))

    # The connection attempt itself has to be rejected.
    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
669 # Note: as this test does not expect any messages it will
670 # time out in the call to self.waitForMessages.
671 # This is expected, but it will slow this test.
def test_ldap_anonymous_access_bind_only(self):
    """An anonymous bind on its own must not produce any log message."""
    # Should be no logging for anonymous bind
    # so receiving any message indicates a failure.
    def isLastExpectedMessage(msg):
        return True

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_anonymous()

    self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                       lp=self.get_loadparm(),
                       credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(0,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_ldap_anonymous_access(self):
    """An anonymous search must fail and log a "no bind" Authorization.

    Expects the search on the domain DN to raise LdbError and exactly
    one message: an LDAP Authorization over "TLS" for the account
    "ANONYMOUS LOGON" with authType "no bind".
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "TLS" and
                authz["account"] == "ANONYMOUS LOGON" and
                authz["authType"] == "no bind")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_anonymous()

    self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                       lp=self.get_loadparm(),
                       credentials=creds)

    # The search is expected to be refused.
    with self.assertRaises(LdbError):
        self.samdb.search(base=self.samdb.domain_dn())

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb(self):
    """Open an SMB connection and check the logged messages.

    Waits for an Authorization whose serviceDescription contains "SMB",
    with authType "krb5" and transportProtection "SMB", expects three
    messages and checks that the first two are successful
    "Kerberos KDC" Authentications.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return ("SMB" in authz["serviceDescription"] and
                authz["authType"] == "krb5" and
                authz["transportProtection"] == "SMB")

    creds = self.insta_creds(template=self.get_credentials())
    self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first and second messages should both be Authentications
    for msg in messages[:2]:
        self.assertEqual("Authentication", msg["type"])
        authn = msg["Authentication"]
        self.assertEqual("NT_STATUS_OK", authn["status"])
        self.assertEqual("Kerberos KDC", authn["serviceDescription"])
        self.assertEqual("ENC-TS Pre-authentication",
                         authn["authDescription"])
        self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
        self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def test_smb_bad_password(self):
    """A Kerberos SMB connection with a wrong password must fail.

    Expects smb_connection() to raise NTSTATUSError and exactly one
    message: a "Kerberos KDC" Authentication with
    NT_STATUS_WRONG_PASSWORD.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "Kerberos KDC" and
                authn["status"] == "NT_STATUS_WRONG_PASSWORD" and
                authn["authDescription"] == "ENC-TS Pre-authentication")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_kerberos_state(MUST_USE_KERBEROS)
    creds.set_password("badPassword")

    # The connection attempt itself has to be rejected.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_bad_user(self):
    """A Kerberos SMB connection as "badUser" must fail and be logged.

    Expects smb_connection() to raise NTSTATUSError and exactly one
    message: a "Kerberos KDC" Authentication with
    NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "Kerberos KDC" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "ENC-TS Pre-authentication" and
                authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                authn["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_kerberos_state(MUST_USE_KERBEROS)
    creds.set_username("badUser")

    # The connection attempt itself has to be rejected.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb1_anonymous(self):
    """Run smbclient -N -mNT1 against IPC$ and check the logged messages.

    Waits for an "SMB" Authorization for "ANONYMOUS LOGON" and expects
    three messages: a failed NTLMSSP Authentication
    (NT_STATUS_NO_SUCH_USER), a successful one that became
    "ANONYMOUS LOGON", then the Authorization.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "SMB" and
                authz["authType"] == "NTLMSSP" and
                authz["account"] == "ANONYMOUS LOGON" and
                authz["transportProtection"] == "SMB")

    server = os.environ["SERVER"]

    path = "//%s/IPC$" % server
    auth = "-N"
    # The exit status of smbclient is not inspected; only the log
    # messages it triggers are checked.
    call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    authn = msg["Authentication"]
    self.assertEqual("NT_STATUS_NO_SUCH_USER", authn["status"])
    self.assertEqual("SMB", authn["serviceDescription"])
    self.assertEqual("NTLMSSP", authn["authDescription"])
    self.assertEqual("No-Password", authn["passwordType"])
    self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])

    # Check the second message it should be an Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    authn = msg["Authentication"]
    self.assertEqual("NT_STATUS_OK", authn["status"])
    self.assertEqual("SMB", authn["serviceDescription"])
    self.assertEqual("NTLMSSP", authn["authDescription"])
    self.assertEqual("No-Password", authn["passwordType"])
    self.assertEqual("ANONYMOUS LOGON", authn["becameAccount"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def test_smb2_anonymous(self):
    """Run smbclient -N -mSMB3 against IPC$ and check the logged messages.

    Waits for an "SMB2" Authorization for "ANONYMOUS LOGON" and expects
    three messages: a failed NTLMSSP Authentication
    (NT_STATUS_NO_SUCH_USER), a successful one that became
    "ANONYMOUS LOGON", then the Authorization.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "SMB2" and
                authz["authType"] == "NTLMSSP" and
                authz["account"] == "ANONYMOUS LOGON" and
                authz["transportProtection"] == "SMB")

    server = os.environ["SERVER"]

    path = "//%s/IPC$" % server
    auth = "-N"
    # The exit status of smbclient is not inspected; only the log
    # messages it triggers are checked.
    call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    authn = msg["Authentication"]
    self.assertEqual("NT_STATUS_NO_SUCH_USER", authn["status"])
    self.assertEqual("SMB2", authn["serviceDescription"])
    self.assertEqual("NTLMSSP", authn["authDescription"])
    self.assertEqual("No-Password", authn["passwordType"])
    self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])

    # Check the second message it should be an Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    authn = msg["Authentication"]
    self.assertEqual("NT_STATUS_OK", authn["status"])
    self.assertEqual("SMB2", authn["serviceDescription"])
    self.assertEqual("NTLMSSP", authn["authDescription"])
    self.assertEqual("No-Password", authn["passwordType"])
    self.assertEqual("ANONYMOUS LOGON", authn["becameAccount"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def test_smb_no_krb_spnego(self):
    """Check the messages logged for a successful NTLMSSP SMB logon.

    Connects through self.smb_connection() with credentials whose
    kerberos_state is DONT_USE_KERBEROS.  Two messages are expected;
    the first must be a successful NTLMv2 Authentication over either
    SMB or SMB2.
    """
    def isLastExpectedMessage(msg):
        # The chained "and" short-circuits, so the "Authorization"
        # payload is only read from messages of that type.
        return (msg["type"] == "Authorization" and
                "SMB" in msg["Authorization"]["serviceDescription"] and
                msg["Authorization"]["authType"] == "NTLMSSP" and
                msg["Authorization"]["transportProtection"] == "SMB")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first message should be the Authentication.
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    details = msg["Authentication"]
    self.assertEqual("NT_STATUS_OK", details["status"])
    self.assertIn(details["serviceDescription"], ["SMB", "SMB2"])
    self.assertEqual("NTLMSSP", details["authDescription"])
    self.assertEqual("NTLMv2", details["passwordType"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, details["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, details["logonType"])
def test_smb_no_krb_spnego_bad_password(self):
    """Check the message logged for an NTLMSSP SMB logon with a bad password.

    The connection attempt must raise NTSTATUSError, and exactly one
    message is expected: an NTLMv2 Authentication failing with
    NT_STATUS_WRONG_PASSWORD.
    """
    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return ("SMB" in auth["serviceDescription"] and
                auth["authDescription"] == "NTLMSSP" and
                auth["passwordType"] == "NTLMv2" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password("badPassword")

    # The server is expected to reject the connection.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_spnego_bad_user(self):
    """Check the message logged for an NTLMSSP SMB logon with a bad user.

    The connection attempt must raise NTSTATUSError, and exactly one
    message is expected: an NTLMv2 Authentication failing with
    NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return ("SMB" in auth["serviceDescription"] and
                auth["authDescription"] == "NTLMSSP" and
                auth["passwordType"] == "NTLMv2" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username("badUser")

    # The server is expected to reject the connection.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2(self):
    """Check the messages logged for a successful bare-NTLM SMB1 logon.

    Connects through self.smb_connection() with force_smb1=True,
    ntlmv2_auth="no" and use_spnego="no".  Two messages are expected;
    the first must be a successful "bare-NTLM" Authentication whose
    passwordType is NTLMv1.
    """
    def isLastExpectedMessage(msg):
        # The chained "and" short-circuits, so the "Authorization"
        # payload is only read from messages of that type.
        return (msg["type"] == "Authorization" and
                msg["Authorization"]["serviceDescription"] == "SMB" and
                msg["Authorization"]["authType"] == "bare-NTLM" and
                msg["Authorization"]["transportProtection"] == "SMB")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    self.smb_connection(creds,
                        force_smb1=True,
                        ntlmv2_auth="no",
                        use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first message should be the Authentication.
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    details = msg["Authentication"]
    self.assertEqual("NT_STATUS_OK", details["status"])
    self.assertEqual("SMB", details["serviceDescription"])
    self.assertEqual("bare-NTLM", details["authDescription"])
    self.assertEqual("NTLMv1", details["passwordType"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, details["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, details["logonType"])
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
    """Check the message logged for a bare-NTLM SMB1 logon with a bad password.

    The connection attempt (force_smb1=True, ntlmv2_auth="no",
    use_spnego="no") must raise NTSTATUSError, and exactly one message
    is expected: an NTLMv1 Authentication failing with
    NT_STATUS_WRONG_PASSWORD.
    """
    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SMB" and
                auth["authDescription"] == "bare-NTLM" and
                auth["passwordType"] == "NTLMv1" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password("badPassword")

    # The server is expected to reject the connection.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds,
                            force_smb1=True,
                            ntlmv2_auth="no",
                            use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
    """Check the message logged for a bare-NTLM SMB1 logon with a bad user.

    The connection attempt (force_smb1=True, ntlmv2_auth="no",
    use_spnego="no") must raise NTSTATUSError, and exactly one message
    is expected: an NTLMv1 Authentication failing with
    NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SMB" and
                auth["authDescription"] == "bare-NTLM" and
                auth["passwordType"] == "NTLMv1" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username("badUser")

    # The server is expected to reject the connection.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds,
                            force_smb1=True,
                            ntlmv2_auth="no",
                            use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_samlogon_interactive(self):
    """Check the logging of a successful interactive SamLogon.

    Issues ``samlogon <user> <password> <workstation> 1`` through
    bin/rpcclient, taking the user and password from $USERNAME and
    $PASSWORD.  The terminating message is a successful "interactive"
    SamLogon Authentication naming the workstation.  After the list
    has been passed through remove_netlogon_messages(), 5 or 6
    messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_password(self):
    """Check the logging of an interactive SamLogon with a bad password.

    Issues ``samlogon <user> badPassword <workstation> 1`` through
    bin/rpcclient, taking the user from $USERNAME.  The terminating
    message is an "interactive" SamLogon Authentication failing with
    NT_STATUS_WRONG_PASSWORD.  After the list has been passed through
    remove_netlogon_messages(), 5 or 6 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_user(self):
    """Check the logging of an interactive SamLogon for an unknown user.

    Issues ``samlogon badUser <password> <workstation> 1`` through
    bin/rpcclient, taking the password from $PASSWORD.  The
    terminating message is an "interactive" SamLogon Authentication
    failing with NT_STATUS_NO_SUCH_USER.  After the list has been
    passed through remove_netlogon_messages(), 5 or 6 messages are
    accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network(self):
    """Check the logging of a successful network SamLogon.

    Issues ``samlogon <user> <password> <workstation> 2`` through
    bin/rpcclient, taking the user and password from $USERNAME and
    $PASSWORD.  The terminating message is a successful "network"
    SamLogon Authentication naming the workstation.  After the list
    has been passed through remove_netlogon_messages(), 5 or 6
    messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_password(self):
    """Check the logging of a network SamLogon with a bad password.

    Issues ``samlogon <user> badPassword <workstation> 2`` through
    bin/rpcclient, taking the user from $USERNAME.  The terminating
    message is a "network" SamLogon Authentication failing with
    NT_STATUS_WRONG_PASSWORD.  After the list has been passed through
    remove_netlogon_messages(), 5 or 6 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_user(self):
    """Check the logging of a network SamLogon for an unknown user.

    Issues ``samlogon badUser <password> <workstation> 2`` through
    bin/rpcclient, taking the password from $PASSWORD.  The
    terminating message is a "network" SamLogon Authentication failing
    with NT_STATUS_NO_SUCH_USER.  After the list has been passed
    through remove_netlogon_messages(), 5 or 6 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap(self):
    """Check the logging of a successful network SamLogon using MSCHAPv2.

    Issues ``samlogon <user> <password> <workstation> 2 0x00010000``
    through bin/rpcclient, taking the user and password from $USERNAME
    and $PASSWORD.  The trailing 0x00010000 argument presumably selects
    MSCHAPv2 (the expected passwordType) -- confirm against rpcclient.
    The terminating message is a successful "network" SamLogon
    Authentication.  After the list has been passed through
    remove_netlogon_messages(), 5 or 6 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_password(self):
    """Check the logging of an MSCHAPv2 network SamLogon with a bad password.

    Issues ``samlogon <user> badPassword <workstation> 2 0x00010000``
    through bin/rpcclient, taking the user from $USERNAME.  The
    terminating message is a "network" SamLogon Authentication with
    passwordType MSCHAPv2 failing with NT_STATUS_WRONG_PASSWORD.
    After the list has been passed through remove_netlogon_messages(),
    5 or 6 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_user(self):
    """Check the logging of an MSCHAPv2 network SamLogon for an unknown user.

    Issues ``samlogon badUser <password> <workstation> 2 0x00010000``
    through bin/rpcclient, taking the password from $PASSWORD.  The
    terminating message is a "network" SamLogon Authentication with
    passwordType MSCHAPv2 failing with NT_STATUS_NO_SUCH_USER.  After
    the list has been passed through remove_netlogon_messages(), 5 or
    6 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_schannel_seal(self):
    """Check the logging of a SamLogon issued over a sealed schannel.

    Issues ``schannel;samlogon <user> <password> <workstation>``
    through bin/rpcclient, taking the user and password from $USERNAME
    and $PASSWORD.  After the list has been passed through
    remove_netlogon_messages(), 5 or 6 messages are accepted, and the
    second to last one must be a DCE/RPC Authorization with authType
    "schannel", transportProtection "SEAL" and a GUID sessionId.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")

    # The second to last message should be the Authorization.
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self.assertEqual("DCE/RPC", authz["serviceDescription"])
    self.assertEqual("schannel", authz["authType"])
    self.assertEqual("SEAL", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))
1437 # Signed logons get promoted to sealed, this test ensures that
1438 # this behaviour is not removed accidentally
def test_samlogon_schannel_sign(self):
    """Check the logging of a SamLogon issued over a signed schannel.

    Issues ``schannelsign;samlogon <user> <password> <workstation>``
    through bin/rpcclient, taking the user and password from $USERNAME
    and $PASSWORD.  After the list has been passed through
    remove_netlogon_messages(), 5 or 6 messages are accepted, and the
    second to last one must be a DCE/RPC Authorization with authType
    "schannel" and a GUID sessionId.  Its transportProtection is
    expected to be "SEAL" even though signing was requested.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication messages carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == r"\\%s" % workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannelsign;samlogon %s %s %s" % (
        user, password, workstation)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages),
                  (5, 6),
                  "Did not receive the expected number of messages")

    # The second to last message should be the Authorization.
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self.assertEqual("DCE/RPC", authz["serviceDescription"])
    self.assertEqual("schannel", authz["authType"])
    self.assertEqual("SEAL", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))