1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__
import print_function
19 """Tests for the Auth and AuthZ logging.
22 from samba
.dcerpc
import srvsvc
, dnsserver
24 from samba
.samba3
import libsmb_samba_internal
as libsmb
25 from samba
.samba3
import param
as s3param
26 from samba
.samdb
import SamDB
27 import samba
.tests
.auth_log_base
28 from samba
.credentials
import DONT_USE_KERBEROS
, MUST_USE_KERBEROS
29 from samba
import NTSTATUSError
30 from subprocess
import call
31 from ldb
import LdbError
32 from samba
.dcerpc
.windows_event_ids
import (
33 EVT_ID_SUCCESSFUL_LOGON
,
34 EVT_ID_UNSUCCESSFUL_LOGON
,
36 EVT_LOGON_INTERACTIVE
,
37 EVT_LOGON_NETWORK_CLEAR_TEXT
42 class AuthLogTests(samba
.tests
.auth_log_base
.AuthLogTestBase
):
45 super(AuthLogTests
, self
).setUp()
46 self
.remoteAddress
= os
.environ
["CLIENT_IP"]
49 super(AuthLogTests
, self
).tearDown()
51 def smb_connection(self
, creds
, use_spnego
="yes", ntlmv2_auth
="yes",
53 # the SMB bindings rely on having a s3 loadparm
54 lp
= self
.get_loadparm()
55 s3_lp
= s3param
.get_context()
56 s3_lp
.load(lp
.configfile
)
58 # Allow the testcase to skip SPNEGO or use NTLMv1
59 s3_lp
.set("client use spnego", use_spnego
)
60 s3_lp
.set("client ntlmv2 auth", ntlmv2_auth
)
62 return libsmb
.Conn(self
.server
, "sysvol", lp
=s3_lp
, creds
=creds
,
63 force_smb1
=force_smb1
)
65 def _test_rpc_ncacn_np(self
, authTypes
, creds
, service
,
66 binding
, protection
, checkFunction
):
67 def isLastExpectedMessage(msg
):
68 return (msg
["type"] == "Authorization" and
69 (msg
["Authorization"]["serviceDescription"] == "DCE/RPC" or
70 msg
["Authorization"]["serviceDescription"] == service
) and
71 msg
["Authorization"]["authType"] == authTypes
[0] and
72 msg
["Authorization"]["transportProtection"] == protection
)
75 binding
= "[%s]" % binding
77 if service
== "dnsserver":
78 x
= dnsserver
.dnsserver("ncacn_np:%s%s" % (self
.server
, binding
),
81 elif service
== "srvsvc":
82 x
= srvsvc
.srvsvc("ncacn_np:%s%s" % (self
.server
, binding
),
86 # The connection is passed to ensure the server
87 # messaging context stays up until all the messages have been received.
88 messages
= self
.waitForMessages(isLastExpectedMessage
, x
)
89 checkFunction(messages
, authTypes
, service
, binding
, protection
)
def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
    """Assert that the logged SMB service matches the requested binding.

    binding is the bracketed binding option string handed to the RPC
    client, for example "[smb2,sign]", "[smb1]", "[]" or "".
    serviceDescription is the "serviceDescription" value read from the
    log message under test.

    An explicit "smb2" option requires "SMB2", an explicit "smb1" option
    requires "SMB", and with neither option present either dialect is
    accepted.  A mismatch fails the test through the usual assert* helpers.
    """
    # Turn "[foo,bar]" into ["foo", "bar"].  Splitting on the brackets and
    # commas leaves empty strings behind (and "" or "[]" produce nothing
    # else), so every empty field is dropped.
    # The pattern is a raw string: "\[" and "\]" are not valid escapes in
    # an ordinary string literal.
    binding_list = [opt for opt in re.split(r'[\[,\]]', binding) if opt]

    # Handle explicit smb2, smb1 or auto negotiation
    if "smb2" in binding_list:
        self.assertEqual(serviceDescription, "SMB2")
    elif "smb1" in binding_list:
        self.assertEqual(serviceDescription, "SMB")
    else:
        self.assertIn(serviceDescription, ["SMB", "SMB2"])
106 def rpc_ncacn_np_ntlm_check(self
, messages
, authTypes
, service
,
107 binding
, protection
):
109 expected_messages
= len(authTypes
)
110 self
.assertEquals(expected_messages
,
112 "Did not receive the expected number of messages")
114 # Check the first message it should be an Authentication
116 self
.assertEquals("Authentication", msg
["type"])
117 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
119 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
121 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
122 self
._assert
_ncacn
_np
_serviceDescription
(
123 binding
, msg
["Authentication"]["serviceDescription"])
124 self
.assertEquals(authTypes
[1],
125 msg
["Authentication"]["authDescription"])
127 # Check the second message it should be an Authorization
129 self
.assertEquals("Authorization", msg
["type"])
130 self
._assert
_ncacn
_np
_serviceDescription
(
131 binding
, msg
["Authorization"]["serviceDescription"])
132 self
.assertEquals(authTypes
[2], msg
["Authorization"]["authType"])
133 self
.assertEquals("SMB", msg
["Authorization"]["transportProtection"])
134 self
.assertTrue(self
.is_guid(msg
["Authorization"]["sessionId"]))
136 # Check the third message it should be an Authentication
137 # if we are expecting 4 messages
138 if expected_messages
== 4:
139 def checkServiceDescription(desc
):
140 return (desc
== "DCE/RPC" or desc
== service
)
143 self
.assertEquals("Authentication", msg
["type"])
144 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
146 checkServiceDescription(
147 msg
["Authentication"]["serviceDescription"]))
149 self
.assertEquals(authTypes
[3],
150 msg
["Authentication"]["authDescription"])
152 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
154 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
156 def rpc_ncacn_np_krb5_check(
164 expected_messages
= len(authTypes
)
165 self
.assertEquals(expected_messages
,
167 "Did not receive the expected number of messages")
169 # Check the first message, it should be an Authentication.
170 # This is almost certainly Authentication over UDP, and is probably
171 # returning message too big.
173 self
.assertEquals("Authentication", msg
["type"])
174 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
175 self
.assertEquals("Kerberos KDC",
176 msg
["Authentication"]["serviceDescription"])
177 self
.assertEquals(authTypes
[1],
178 msg
["Authentication"]["authDescription"])
180 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
182 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
184 # Check the second message it should be an Authentication
185 # This is the TCP Authentication, made in reply to the message too big
186 # response to the UDP Authentication
188 self
.assertEquals("Authentication", msg
["type"])
189 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
190 self
.assertEquals("Kerberos KDC",
191 msg
["Authentication"]["serviceDescription"])
192 self
.assertEquals(authTypes
[2],
193 msg
["Authentication"]["authDescription"])
195 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
197 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
199 # Check the third message it should be an Authorization
201 self
.assertEquals("Authorization", msg
["type"])
202 self
._assert
_ncacn
_np
_serviceDescription
(
203 binding
, msg
["Authorization"]["serviceDescription"])
204 self
.assertEquals(authTypes
[3], msg
["Authorization"]["authType"])
205 self
.assertEquals("SMB", msg
["Authorization"]["transportProtection"])
206 self
.assertTrue(self
.is_guid(msg
["Authorization"]["sessionId"]))
208 def test_rpc_ncacn_np_ntlm_dns_sign(self
):
209 creds
= self
.insta_creds(template
=self
.get_credentials(),
210 kerberos_state
=DONT_USE_KERBEROS
)
211 self
._test
_rpc
_ncacn
_np
(["NTLMSSP",
215 creds
, "dnsserver", "sign", "SIGN",
216 self
.rpc_ncacn_np_ntlm_check
)
218 def test_rpc_ncacn_np_ntlm_srv_sign(self
):
219 creds
= self
.insta_creds(template
=self
.get_credentials(),
220 kerberos_state
=DONT_USE_KERBEROS
)
221 self
._test
_rpc
_ncacn
_np
(["NTLMSSP",
225 creds
, "srvsvc", "sign", "SIGN",
226 self
.rpc_ncacn_np_ntlm_check
)
228 def test_rpc_ncacn_np_ntlm_dns(self
):
229 creds
= self
.insta_creds(template
=self
.get_credentials(),
230 kerberos_state
=DONT_USE_KERBEROS
)
231 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
234 creds
, "dnsserver", "", "SMB",
235 self
.rpc_ncacn_np_ntlm_check
)
237 def test_rpc_ncacn_np_ntlm_srv(self
):
238 creds
= self
.insta_creds(template
=self
.get_credentials(),
239 kerberos_state
=DONT_USE_KERBEROS
)
240 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
243 creds
, "srvsvc", "", "SMB",
244 self
.rpc_ncacn_np_ntlm_check
)
246 def test_rpc_ncacn_np_krb_dns_sign(self
):
247 creds
= self
.insta_creds(template
=self
.get_credentials(),
248 kerberos_state
=MUST_USE_KERBEROS
)
249 self
._test
_rpc
_ncacn
_np
(["krb5",
250 "ENC-TS Pre-authentication",
251 "ENC-TS Pre-authentication",
253 creds
, "dnsserver", "sign", "SIGN",
254 self
.rpc_ncacn_np_krb5_check
)
256 def test_rpc_ncacn_np_krb_srv_sign(self
):
257 creds
= self
.insta_creds(template
=self
.get_credentials(),
258 kerberos_state
=MUST_USE_KERBEROS
)
259 self
._test
_rpc
_ncacn
_np
(["krb5",
260 "ENC-TS Pre-authentication",
261 "ENC-TS Pre-authentication",
263 creds
, "srvsvc", "sign", "SIGN",
264 self
.rpc_ncacn_np_krb5_check
)
266 def test_rpc_ncacn_np_krb_dns(self
):
267 creds
= self
.insta_creds(template
=self
.get_credentials(),
268 kerberos_state
=MUST_USE_KERBEROS
)
269 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
270 "ENC-TS Pre-authentication",
271 "ENC-TS Pre-authentication",
273 creds
, "dnsserver", "", "SMB",
274 self
.rpc_ncacn_np_krb5_check
)
276 def test_rpc_ncacn_np_krb_dns_smb2(self
):
277 creds
= self
.insta_creds(template
=self
.get_credentials(),
278 kerberos_state
=MUST_USE_KERBEROS
)
279 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
280 "ENC-TS Pre-authentication",
281 "ENC-TS Pre-authentication",
283 creds
, "dnsserver", "smb2", "SMB",
284 self
.rpc_ncacn_np_krb5_check
)
286 def test_rpc_ncacn_np_krb_srv(self
):
287 creds
= self
.insta_creds(template
=self
.get_credentials(),
288 kerberos_state
=MUST_USE_KERBEROS
)
289 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
290 "ENC-TS Pre-authentication",
291 "ENC-TS Pre-authentication",
293 creds
, "srvsvc", "", "SMB",
294 self
.rpc_ncacn_np_krb5_check
)
296 def _test_rpc_ncacn_ip_tcp(self
, authTypes
, creds
, service
,
297 binding
, protection
, checkFunction
):
298 def isLastExpectedMessage(msg
):
299 return (msg
["type"] == "Authorization" and
300 msg
["Authorization"]["serviceDescription"] == "DCE/RPC" and
301 msg
["Authorization"]["authType"] == authTypes
[0] and
302 msg
["Authorization"]["transportProtection"] == protection
)
305 binding
= "[%s]" % binding
307 if service
== "dnsserver":
308 conn
= dnsserver
.dnsserver(
309 "ncacn_ip_tcp:%s%s" % (self
.server
, binding
),
312 elif service
== "srvsvc":
313 conn
= srvsvc
.srvsvc("ncacn_ip_tcp:%s%s" % (self
.server
, binding
),
317 messages
= self
.waitForMessages(isLastExpectedMessage
, conn
)
318 checkFunction(messages
, authTypes
, service
, binding
, protection
)
320 def rpc_ncacn_ip_tcp_ntlm_check(self
, messages
, authTypes
, service
,
321 binding
, protection
):
323 expected_messages
= len(authTypes
)
324 self
.assertEquals(expected_messages
,
326 "Did not receive the expected number of messages")
328 # Check the first message it should be an Authorization
330 self
.assertEquals("Authorization", msg
["type"])
331 self
.assertEquals("DCE/RPC",
332 msg
["Authorization"]["serviceDescription"])
333 self
.assertEquals(authTypes
[1], msg
["Authorization"]["authType"])
334 self
.assertEquals("NONE", msg
["Authorization"]["transportProtection"])
335 self
.assertTrue(self
.is_guid(msg
["Authorization"]["sessionId"]))
337 # Check the second message it should be an Authentication
339 self
.assertEquals("Authentication", msg
["type"])
340 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
341 self
.assertEquals("DCE/RPC",
342 msg
["Authentication"]["serviceDescription"])
343 self
.assertEquals(authTypes
[2],
344 msg
["Authentication"]["authDescription"])
346 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
348 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
350 def rpc_ncacn_ip_tcp_krb5_check(self
, messages
, authTypes
, service
,
351 binding
, protection
):
353 expected_messages
= len(authTypes
)
354 self
.assertEquals(expected_messages
,
356 "Did not receive the expected number of messages")
358 # Check the first message it should be an Authorization
360 self
.assertEquals("Authorization", msg
["type"])
361 self
.assertEquals("DCE/RPC",
362 msg
["Authorization"]["serviceDescription"])
363 self
.assertEquals(authTypes
[1], msg
["Authorization"]["authType"])
364 self
.assertEquals("NONE", msg
["Authorization"]["transportProtection"])
365 self
.assertTrue(self
.is_guid(msg
["Authorization"]["sessionId"]))
367 # Check the second message it should be an Authentication
369 self
.assertEquals("Authentication", msg
["type"])
370 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
371 self
.assertEquals("Kerberos KDC",
372 msg
["Authentication"]["serviceDescription"])
373 self
.assertEquals(authTypes
[2],
374 msg
["Authentication"]["authDescription"])
376 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
378 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
380 # Check the third message it should be an Authentication
382 self
.assertEquals("Authentication", msg
["type"])
383 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
384 self
.assertEquals("Kerberos KDC",
385 msg
["Authentication"]["serviceDescription"])
386 self
.assertEquals(authTypes
[2],
387 msg
["Authentication"]["authDescription"])
389 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
391 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
393 def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self
):
394 creds
= self
.insta_creds(template
=self
.get_credentials(),
395 kerberos_state
=DONT_USE_KERBEROS
)
396 self
._test
_rpc
_ncacn
_ip
_tcp
(["NTLMSSP",
399 creds
, "dnsserver", "sign", "SIGN",
400 self
.rpc_ncacn_ip_tcp_ntlm_check
)
402 def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self
):
403 creds
= self
.insta_creds(template
=self
.get_credentials(),
404 kerberos_state
=MUST_USE_KERBEROS
)
405 self
._test
_rpc
_ncacn
_ip
_tcp
(["krb5",
407 "ENC-TS Pre-authentication",
408 "ENC-TS Pre-authentication"],
409 creds
, "dnsserver", "sign", "SIGN",
410 self
.rpc_ncacn_ip_tcp_krb5_check
)
412 def test_rpc_ncacn_ip_tcp_ntlm_dns(self
):
413 creds
= self
.insta_creds(template
=self
.get_credentials(),
414 kerberos_state
=DONT_USE_KERBEROS
)
415 self
._test
_rpc
_ncacn
_ip
_tcp
(["NTLMSSP",
418 creds
, "dnsserver", "", "SIGN",
419 self
.rpc_ncacn_ip_tcp_ntlm_check
)
421 def test_rpc_ncacn_ip_tcp_krb5_dns(self
):
422 creds
= self
.insta_creds(template
=self
.get_credentials(),
423 kerberos_state
=MUST_USE_KERBEROS
)
424 self
._test
_rpc
_ncacn
_ip
_tcp
(["krb5",
426 "ENC-TS Pre-authentication",
427 "ENC-TS Pre-authentication"],
428 creds
, "dnsserver", "", "SIGN",
429 self
.rpc_ncacn_ip_tcp_krb5_check
)
431 def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self
):
432 creds
= self
.insta_creds(template
=self
.get_credentials(),
433 kerberos_state
=DONT_USE_KERBEROS
)
434 self
._test
_rpc
_ncacn
_ip
_tcp
(["NTLMSSP",
437 creds
, "dnsserver", "connect", "NONE",
438 self
.rpc_ncacn_ip_tcp_ntlm_check
)
440 def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self
):
441 creds
= self
.insta_creds(template
=self
.get_credentials(),
442 kerberos_state
=MUST_USE_KERBEROS
)
443 self
._test
_rpc
_ncacn
_ip
_tcp
(["krb5",
445 "ENC-TS Pre-authentication",
446 "ENC-TS Pre-authentication"],
447 creds
, "dnsserver", "connect", "NONE",
448 self
.rpc_ncacn_ip_tcp_krb5_check
)
450 def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self
):
451 creds
= self
.insta_creds(template
=self
.get_credentials(),
452 kerberos_state
=DONT_USE_KERBEROS
)
453 self
._test
_rpc
_ncacn
_ip
_tcp
(["NTLMSSP",
456 creds
, "dnsserver", "seal", "SEAL",
457 self
.rpc_ncacn_ip_tcp_ntlm_check
)
459 def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self
):
460 creds
= self
.insta_creds(template
=self
.get_credentials(),
461 kerberos_state
=MUST_USE_KERBEROS
)
462 self
._test
_rpc
_ncacn
_ip
_tcp
(["krb5",
464 "ENC-TS Pre-authentication",
465 "ENC-TS Pre-authentication"],
466 creds
, "dnsserver", "seal", "SEAL",
467 self
.rpc_ncacn_ip_tcp_krb5_check
)
471 def isLastExpectedMessage(msg
):
472 return (msg
["type"] == "Authorization" and
473 msg
["Authorization"]["serviceDescription"] == "LDAP" and
474 msg
["Authorization"]["transportProtection"] == "SIGN" and
475 msg
["Authorization"]["authType"] == "krb5")
477 self
.samdb
= SamDB(url
="ldap://%s" % os
.environ
["SERVER"],
478 lp
=self
.get_loadparm(),
479 credentials
=self
.get_credentials())
481 messages
= self
.waitForMessages(isLastExpectedMessage
)
484 "Did not receive the expected number of messages")
486 # Check the first message it should be an Authentication
488 self
.assertEquals("Authentication", msg
["type"])
489 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
490 self
.assertEquals("Kerberos KDC",
491 msg
["Authentication"]["serviceDescription"])
492 self
.assertEquals("ENC-TS Pre-authentication",
493 msg
["Authentication"]["authDescription"])
494 self
.assertTrue(msg
["Authentication"]["duration"] > 0)
496 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
498 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
500 # Check the second message it should be an Authentication
502 self
.assertEquals("Authentication", msg
["type"])
503 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
504 self
.assertEquals("Kerberos KDC",
505 msg
["Authentication"]["serviceDescription"])
506 self
.assertEquals("ENC-TS Pre-authentication",
507 msg
["Authentication"]["authDescription"])
508 self
.assertTrue(msg
["Authentication"]["duration"] > 0)
510 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
512 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
514 def test_ldap_ntlm(self
):
516 def isLastExpectedMessage(msg
):
517 return (msg
["type"] == "Authorization" and
518 msg
["Authorization"]["serviceDescription"] == "LDAP" and
519 msg
["Authorization"]["transportProtection"] == "SEAL" and
520 msg
["Authorization"]["authType"] == "NTLMSSP")
522 self
.samdb
= SamDB(url
="ldap://%s" % os
.environ
["SERVER_IP"],
523 lp
=self
.get_loadparm(),
524 credentials
=self
.get_credentials())
526 messages
= self
.waitForMessages(isLastExpectedMessage
)
529 "Did not receive the expected number of messages")
530 # Check the first message it should be an Authentication
532 self
.assertEquals("Authentication", msg
["type"])
533 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
534 self
.assertEquals("LDAP",
535 msg
["Authentication"]["serviceDescription"])
536 self
.assertEquals("NTLMSSP", msg
["Authentication"]["authDescription"])
537 self
.assertTrue(msg
["Authentication"]["duration"] > 0)
539 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
541 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
543 def test_ldap_simple_bind(self
):
544 def isLastExpectedMessage(msg
):
545 return (msg
["type"] == "Authorization" and
546 msg
["Authorization"]["serviceDescription"] == "LDAP" and
547 msg
["Authorization"]["transportProtection"] == "TLS" and
548 msg
["Authorization"]["authType"] == "simple bind")
550 creds
= self
.insta_creds(template
=self
.get_credentials())
551 creds
.set_bind_dn("%s\\%s" % (creds
.get_domain(),
552 creds
.get_username()))
554 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
555 lp
=self
.get_loadparm(),
558 messages
= self
.waitForMessages(isLastExpectedMessage
)
561 "Did not receive the expected number of messages")
563 # Check the first message it should be an Authentication
565 self
.assertEquals("Authentication", msg
["type"])
566 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
567 self
.assertEquals("LDAP",
568 msg
["Authentication"]["serviceDescription"])
569 self
.assertEquals("simple bind",
570 msg
["Authentication"]["authDescription"])
572 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
574 EVT_LOGON_NETWORK_CLEAR_TEXT
, msg
["Authentication"]["logonType"])
576 def test_ldap_simple_bind_bad_password(self
):
577 def isLastExpectedMessage(msg
):
578 return (msg
["type"] == "Authentication" and
579 msg
["Authentication"]["serviceDescription"] == "LDAP" and
580 (msg
["Authentication"]["status"] ==
581 "NT_STATUS_WRONG_PASSWORD") and
582 (msg
["Authentication"]["authDescription"] ==
584 (msg
["Authentication"]["eventId"] ==
585 EVT_ID_UNSUCCESSFUL_LOGON
) and
586 (msg
["Authentication"]["logonType"] ==
587 EVT_LOGON_NETWORK_CLEAR_TEXT
))
589 creds
= self
.insta_creds(template
=self
.get_credentials())
590 creds
.set_password("badPassword")
591 creds
.set_bind_dn("%s\\%s" % (creds
.get_domain(),
592 creds
.get_username()))
596 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
597 lp
=self
.get_loadparm(),
601 self
.assertEquals(thrown
, True)
603 messages
= self
.waitForMessages(isLastExpectedMessage
)
606 "Did not receive the expected number of messages")
608 def test_ldap_simple_bind_bad_user(self
):
609 def isLastExpectedMessage(msg
):
610 return (msg
["type"] == "Authentication" and
611 msg
["Authentication"]["serviceDescription"] == "LDAP" and
612 (msg
["Authentication"]["status"] ==
613 "NT_STATUS_NO_SUCH_USER") and
614 (msg
["Authentication"]["authDescription"] ==
616 (msg
["Authentication"]["eventId"] ==
617 EVT_ID_UNSUCCESSFUL_LOGON
) and
618 (msg
["Authentication"]["logonType"] ==
619 EVT_LOGON_NETWORK_CLEAR_TEXT
))
621 creds
= self
.insta_creds(template
=self
.get_credentials())
622 creds
.set_bind_dn("%s\\%s" % (creds
.get_domain(), "badUser"))
626 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
627 lp
=self
.get_loadparm(),
631 self
.assertEquals(thrown
, True)
633 messages
= self
.waitForMessages(isLastExpectedMessage
)
636 "Did not receive the expected number of messages")
638 def test_ldap_simple_bind_unparseable_user(self
):
639 def isLastExpectedMessage(msg
):
640 return (msg
["type"] == "Authentication" and
641 msg
["Authentication"]["serviceDescription"] == "LDAP" and
642 (msg
["Authentication"]["status"] ==
643 "NT_STATUS_NO_SUCH_USER") and
644 (msg
["Authentication"]["authDescription"] ==
646 (msg
["Authentication"]["eventId"] ==
647 EVT_ID_UNSUCCESSFUL_LOGON
) and
648 (msg
["Authentication"]["logonType"] ==
649 EVT_LOGON_NETWORK_CLEAR_TEXT
))
651 creds
= self
.insta_creds(template
=self
.get_credentials())
652 creds
.set_bind_dn("%s\\%s" % (creds
.get_domain(), "abdcef"))
656 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
657 lp
=self
.get_loadparm(),
661 self
.assertEquals(thrown
, True)
663 messages
= self
.waitForMessages(isLastExpectedMessage
)
666 "Did not receive the expected number of messages")
669 # Note: as this test does not expect any messages it will
670 # time out in the call to self.waitForMessages.
671 # This is expected, but it will slow this test.
672 def test_ldap_anonymous_access_bind_only(self
):
673 # Should be no logging for anonymous bind
674 # so receiving any message indicates a failure.
675 def isLastExpectedMessage(msg
):
678 creds
= self
.insta_creds(template
=self
.get_credentials())
679 creds
.set_anonymous()
681 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
682 lp
=self
.get_loadparm(),
685 messages
= self
.waitForMessages(isLastExpectedMessage
)
688 "Did not receive the expected number of messages")
690 def test_ldap_anonymous_access(self
):
691 def isLastExpectedMessage(msg
):
692 return (msg
["type"] == "Authorization" and
693 msg
["Authorization"]["serviceDescription"] == "LDAP" and
694 msg
["Authorization"]["transportProtection"] == "TLS" and
695 msg
["Authorization"]["account"] == "ANONYMOUS LOGON" and
696 msg
["Authorization"]["authType"] == "no bind")
698 creds
= self
.insta_creds(template
=self
.get_credentials())
699 creds
.set_anonymous()
701 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
702 lp
=self
.get_loadparm(),
706 self
.samdb
.search(base
=self
.samdb
.domain_dn())
707 self
.fail("Expected an LdbError exception")
711 messages
= self
.waitForMessages(isLastExpectedMessage
)
714 "Did not receive the expected number of messages")
717 def isLastExpectedMessage(msg
):
718 return (msg
["type"] == "Authorization" and
719 "SMB" in msg
["Authorization"]["serviceDescription"] and
720 msg
["Authorization"]["authType"] == "krb5" and
721 msg
["Authorization"]["transportProtection"] == "SMB")
723 creds
= self
.insta_creds(template
=self
.get_credentials())
724 self
.smb_connection(creds
)
726 messages
= self
.waitForMessages(isLastExpectedMessage
)
729 "Did not receive the expected number of messages")
730 # Check the first message it should be an Authentication
732 self
.assertEquals("Authentication", msg
["type"])
733 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
734 self
.assertEquals("Kerberos KDC",
735 msg
["Authentication"]["serviceDescription"])
736 self
.assertEquals("ENC-TS Pre-authentication",
737 msg
["Authentication"]["authDescription"])
738 self
.assertEquals(EVT_ID_SUCCESSFUL_LOGON
,
739 msg
["Authentication"]["eventId"])
740 self
.assertEquals(EVT_LOGON_NETWORK
,
741 msg
["Authentication"]["logonType"])
743 # Check the second message it should be an Authentication
745 self
.assertEquals("Authentication", msg
["type"])
746 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
747 self
.assertEquals("Kerberos KDC",
748 msg
["Authentication"]["serviceDescription"])
749 self
.assertEquals("ENC-TS Pre-authentication",
750 msg
["Authentication"]["authDescription"])
751 self
.assertEquals(EVT_ID_SUCCESSFUL_LOGON
,
752 msg
["Authentication"]["eventId"])
753 self
.assertEquals(EVT_LOGON_NETWORK
,
754 msg
["Authentication"]["logonType"])
756 def test_smb_bad_password(self
):
757 def isLastExpectedMessage(msg
):
758 return (msg
["type"] == "Authentication" and
759 (msg
["Authentication"]["serviceDescription"] ==
761 (msg
["Authentication"]["status"] ==
762 "NT_STATUS_WRONG_PASSWORD") and
763 (msg
["Authentication"]["authDescription"] ==
764 "ENC-TS Pre-authentication"))
766 creds
= self
.insta_creds(template
=self
.get_credentials())
767 creds
.set_kerberos_state(MUST_USE_KERBEROS
)
768 creds
.set_password("badPassword")
772 self
.smb_connection(creds
)
773 except NTSTATUSError
:
775 self
.assertEquals(thrown
, True)
777 messages
= self
.waitForMessages(isLastExpectedMessage
)
780 "Did not receive the expected number of messages")
782 def test_smb_bad_user(self
):
783 def isLastExpectedMessage(msg
):
784 return (msg
["type"] == "Authentication" and
785 (msg
["Authentication"]["serviceDescription"] ==
787 (msg
["Authentication"]["status"] ==
788 "NT_STATUS_NO_SUCH_USER") and
789 (msg
["Authentication"]["authDescription"] ==
790 "ENC-TS Pre-authentication") and
791 (msg
["Authentication"]["eventId"] ==
792 EVT_ID_UNSUCCESSFUL_LOGON
) and
793 (msg
["Authentication"]["logonType"] ==
796 creds
= self
.insta_creds(template
=self
.get_credentials())
797 creds
.set_kerberos_state(MUST_USE_KERBEROS
)
798 creds
.set_username("badUser")
802 self
.smb_connection(creds
)
803 except NTSTATUSError
:
805 self
.assertEquals(thrown
, True)
807 messages
= self
.waitForMessages(isLastExpectedMessage
)
810 "Did not receive the expected number of messages")
812 def test_smb1_anonymous(self
):
813 def isLastExpectedMessage(msg
):
814 return (msg
["type"] == "Authorization" and
815 msg
["Authorization"]["serviceDescription"] == "SMB" and
816 msg
["Authorization"]["authType"] == "NTLMSSP" and
817 msg
["Authorization"]["account"] == "ANONYMOUS LOGON" and
818 msg
["Authorization"]["transportProtection"] == "SMB")
820 server
= os
.environ
["SERVER"]
822 path
= "//%s/IPC$" % server
824 call(["bin/smbclient", path
, auth
, "-mNT1", "-c quit"])
826 messages
= self
.waitForMessages(isLastExpectedMessage
)
829 "Did not receive the expected number of messages")
831 # Check the first message it should be an Authentication
833 self
.assertEquals("Authentication", msg
["type"])
834 self
.assertEquals("NT_STATUS_NO_SUCH_USER",
835 msg
["Authentication"]["status"])
836 self
.assertEquals("SMB",
837 msg
["Authentication"]["serviceDescription"])
838 self
.assertEquals("NTLMSSP",
839 msg
["Authentication"]["authDescription"])
840 self
.assertEquals("No-Password",
841 msg
["Authentication"]["passwordType"])
842 self
.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON
,
843 msg
["Authentication"]["eventId"])
844 self
.assertEquals(EVT_LOGON_NETWORK
,
845 msg
["Authentication"]["logonType"])
847 # Check the second message it should be an Authentication
849 self
.assertEquals("Authentication", msg
["type"])
850 self
.assertEquals("NT_STATUS_OK",
851 msg
["Authentication"]["status"])
852 self
.assertEquals("SMB",
853 msg
["Authentication"]["serviceDescription"])
854 self
.assertEquals("NTLMSSP",
855 msg
["Authentication"]["authDescription"])
856 self
.assertEquals("No-Password",
857 msg
["Authentication"]["passwordType"])
858 self
.assertEquals("ANONYMOUS LOGON",
859 msg
["Authentication"]["becameAccount"])
860 self
.assertEquals(EVT_ID_SUCCESSFUL_LOGON
,
861 msg
["Authentication"]["eventId"])
862 self
.assertEquals(EVT_LOGON_NETWORK
,
863 msg
["Authentication"]["logonType"])
865 def test_smb2_anonymous(self
):
866 def isLastExpectedMessage(msg
):
867 return (msg
["type"] == "Authorization" and
868 msg
["Authorization"]["serviceDescription"] == "SMB2" and
869 msg
["Authorization"]["authType"] == "NTLMSSP" and
870 msg
["Authorization"]["account"] == "ANONYMOUS LOGON" and
871 msg
["Authorization"]["transportProtection"] == "SMB")
873 server
= os
.environ
["SERVER"]
875 path
= "//%s/IPC$" % server
877 call(["bin/smbclient", path
, auth
, "-mSMB3", "-c quit"])
879 messages
= self
.waitForMessages(isLastExpectedMessage
)
882 "Did not receive the expected number of messages")
884 # Check the first message it should be an Authentication
886 self
.assertEquals("Authentication", msg
["type"])
887 self
.assertEquals("NT_STATUS_NO_SUCH_USER",
888 msg
["Authentication"]["status"])
889 self
.assertEquals("SMB2",
890 msg
["Authentication"]["serviceDescription"])
891 self
.assertEquals("NTLMSSP",
892 msg
["Authentication"]["authDescription"])
893 self
.assertEquals("No-Password",
894 msg
["Authentication"]["passwordType"])
895 self
.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON
,
896 msg
["Authentication"]["eventId"])
897 self
.assertEquals(EVT_LOGON_NETWORK
,
898 msg
["Authentication"]["logonType"])
900 # Check the second message it should be an Authentication
902 self
.assertEquals("Authentication", msg
["type"])
903 self
.assertEquals("NT_STATUS_OK",
904 msg
["Authentication"]["status"])
905 self
.assertEquals("SMB2",
906 msg
["Authentication"]["serviceDescription"])
907 self
.assertEquals("NTLMSSP",
908 msg
["Authentication"]["authDescription"])
909 self
.assertEquals("No-Password",
910 msg
["Authentication"]["passwordType"])
911 self
.assertEquals("ANONYMOUS LOGON",
912 msg
["Authentication"]["becameAccount"])
913 self
.assertEquals(EVT_ID_SUCCESSFUL_LOGON
,
914 msg
["Authentication"]["eventId"])
915 self
.assertEquals(EVT_LOGON_NETWORK
,
916 msg
["Authentication"]["logonType"])
918 def test_smb_no_krb_spnego(self
):
919 def isLastExpectedMessage(msg
):
920 return (msg
["type"] == "Authorization" and
921 "SMB" in msg
["Authorization"]["serviceDescription"] and
922 msg
["Authorization"]["authType"] == "NTLMSSP" and
923 msg
["Authorization"]["transportProtection"] == "SMB")
925 creds
= self
.insta_creds(template
=self
.get_credentials(),
926 kerberos_state
=DONT_USE_KERBEROS
)
927 self
.smb_connection(creds
)
929 messages
= self
.waitForMessages(isLastExpectedMessage
)
932 "Did not receive the expected number of messages")
933 # Check the first message it should be an Authentication
935 self
.assertEquals("Authentication", msg
["type"])
936 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
937 self
.assertIn(msg
["Authentication"]["serviceDescription"],
939 self
.assertEquals("NTLMSSP",
940 msg
["Authentication"]["authDescription"])
941 self
.assertEquals("NTLMv2",
942 msg
["Authentication"]["passwordType"])
943 self
.assertEquals(EVT_ID_SUCCESSFUL_LOGON
,
944 msg
["Authentication"]["eventId"])
945 self
.assertEquals(EVT_LOGON_NETWORK
,
946 msg
["Authentication"]["logonType"])
948 def test_smb_no_krb_spnego_bad_password(self
):
949 def isLastExpectedMessage(msg
):
950 return (msg
["type"] == "Authentication" and
951 "SMB" in msg
["Authentication"]["serviceDescription"] and
952 msg
["Authentication"]["authDescription"] == "NTLMSSP" and
953 msg
["Authentication"]["passwordType"] == "NTLMv2" and
954 (msg
["Authentication"]["status"] ==
955 "NT_STATUS_WRONG_PASSWORD") and
956 (msg
["Authentication"]["eventId"] ==
957 EVT_ID_UNSUCCESSFUL_LOGON
) and
958 (msg
["Authentication"]["logonType"] ==
961 creds
= self
.insta_creds(template
=self
.get_credentials(),
962 kerberos_state
=DONT_USE_KERBEROS
)
963 creds
.set_password("badPassword")
967 self
.smb_connection(creds
)
968 except NTSTATUSError
:
970 self
.assertEquals(thrown
, True)
972 messages
= self
.waitForMessages(isLastExpectedMessage
)
975 "Did not receive the expected number of messages")
977 def test_smb_no_krb_spnego_bad_user(self
):
978 def isLastExpectedMessage(msg
):
979 return (msg
["type"] == "Authentication" and
980 "SMB" in msg
["Authentication"]["serviceDescription"] and
981 msg
["Authentication"]["authDescription"] == "NTLMSSP" and
982 msg
["Authentication"]["passwordType"] == "NTLMv2" and
983 (msg
["Authentication"]["status"] ==
984 "NT_STATUS_NO_SUCH_USER") and
985 (msg
["Authentication"]["eventId"] ==
986 EVT_ID_UNSUCCESSFUL_LOGON
) and
987 (msg
["Authentication"]["logonType"] ==
990 creds
= self
.insta_creds(template
=self
.get_credentials(),
991 kerberos_state
=DONT_USE_KERBEROS
)
992 creds
.set_username("badUser")
996 self
.smb_connection(creds
)
997 except NTSTATUSError
:
999 self
.assertEquals(thrown
, True)
1001 messages
= self
.waitForMessages(isLastExpectedMessage
)
1002 self
.assertEquals(1,
1004 "Did not receive the expected number of messages")
1006 def test_smb_no_krb_no_spnego_no_ntlmv2(self
):
1007 def isLastExpectedMessage(msg
):
1008 return (msg
["type"] == "Authorization" and
1009 msg
["Authorization"]["serviceDescription"] == "SMB" and
1010 msg
["Authorization"]["authType"] == "bare-NTLM" and
1011 msg
["Authorization"]["transportProtection"] == "SMB")
1013 creds
= self
.insta_creds(template
=self
.get_credentials(),
1014 kerberos_state
=DONT_USE_KERBEROS
)
1015 self
.smb_connection(creds
,
1020 messages
= self
.waitForMessages(isLastExpectedMessage
)
1021 self
.assertEquals(2,
1023 "Did not receive the expected number of messages")
1024 # Check the first message it should be an Authentication
1026 self
.assertEquals("Authentication", msg
["type"])
1027 self
.assertEquals("NT_STATUS_OK", msg
["Authentication"]["status"])
1028 self
.assertEquals("SMB",
1029 msg
["Authentication"]["serviceDescription"])
1030 self
.assertEquals("bare-NTLM",
1031 msg
["Authentication"]["authDescription"])
1032 self
.assertEquals("NTLMv1",
1033 msg
["Authentication"]["passwordType"])
1034 self
.assertEquals(EVT_ID_SUCCESSFUL_LOGON
,
1035 msg
["Authentication"]["eventId"])
1036 self
.assertEquals(EVT_LOGON_NETWORK
,
1037 msg
["Authentication"]["logonType"])
1039 def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self
):
1040 def isLastExpectedMessage(msg
):
1041 return (msg
["type"] == "Authentication" and
1042 msg
["Authentication"]["serviceDescription"] == "SMB" and
1043 msg
["Authentication"]["authDescription"] == "bare-NTLM" and
1044 msg
["Authentication"]["passwordType"] == "NTLMv1" and
1045 (msg
["Authentication"]["status"] ==
1046 "NT_STATUS_WRONG_PASSWORD") and
1047 (msg
["Authentication"]["eventId"] ==
1048 EVT_ID_UNSUCCESSFUL_LOGON
) and
1049 (msg
["Authentication"]["logonType"] ==
1052 creds
= self
.insta_creds(template
=self
.get_credentials(),
1053 kerberos_state
=DONT_USE_KERBEROS
)
1054 creds
.set_password("badPassword")
1058 self
.smb_connection(creds
,
1062 except NTSTATUSError
:
1064 self
.assertEquals(thrown
, True)
1066 messages
= self
.waitForMessages(isLastExpectedMessage
)
1067 self
.assertEquals(1,
1069 "Did not receive the expected number of messages")
1071 def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self
):
1072 def isLastExpectedMessage(msg
):
1073 return (msg
["type"] == "Authentication" and
1074 msg
["Authentication"]["serviceDescription"] == "SMB" and
1075 msg
["Authentication"]["authDescription"] == "bare-NTLM" and
1076 msg
["Authentication"]["passwordType"] == "NTLMv1" and
1077 (msg
["Authentication"]["status"] ==
1078 "NT_STATUS_NO_SUCH_USER") and
1079 (msg
["Authentication"]["eventId"] ==
1080 EVT_ID_UNSUCCESSFUL_LOGON
) and
1081 (msg
["Authentication"]["logonType"] ==
1084 creds
= self
.insta_creds(template
=self
.get_credentials(),
1085 kerberos_state
=DONT_USE_KERBEROS
)
1086 creds
.set_username("badUser")
1090 self
.smb_connection(creds
,
1094 except NTSTATUSError
:
1096 self
.assertEquals(thrown
, True)
1098 messages
= self
.waitForMessages(isLastExpectedMessage
)
1099 self
.assertEquals(1,
1101 "Did not receive the expected number of messages")
1103 def test_samlogon_interactive(self
):
1105 workstation
= "AuthLogTests"
1107 def isLastExpectedMessage(msg
):
1108 return (msg
["type"] == "Authentication" and
1109 (msg
["Authentication"]["serviceDescription"] ==
1111 (msg
["Authentication"]["authDescription"] ==
1113 msg
["Authentication"]["status"] == "NT_STATUS_OK" and
1114 (msg
["Authentication"]["workstation"] ==
1115 r
"\\%s" % workstation
) and
1116 (msg
["Authentication"]["eventId"] ==
1117 EVT_ID_SUCCESSFUL_LOGON
) and
1118 (msg
["Authentication"]["logonType"] ==
1119 EVT_LOGON_INTERACTIVE
))
1121 server
= os
.environ
["SERVER"]
1122 user
= os
.environ
["USERNAME"]
1123 password
= os
.environ
["PASSWORD"]
1124 samlogon
= "samlogon %s %s %s %d" % (user
, password
, workstation
, 1)
1126 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1128 messages
= self
.waitForMessages(isLastExpectedMessage
)
1129 messages
= self
.remove_netlogon_messages(messages
)
1130 received
= len(messages
)
1132 (received
== 5 or received
== 6),
1133 "Did not receive the expected number of messages")
1135 def test_samlogon_interactive_bad_password(self
):
1137 workstation
= "AuthLogTests"
1139 def isLastExpectedMessage(msg
):
1140 return (msg
["type"] == "Authentication" and
1141 (msg
["Authentication"]["serviceDescription"] ==
1143 (msg
["Authentication"]["authDescription"] ==
1145 (msg
["Authentication"]["status"] ==
1146 "NT_STATUS_WRONG_PASSWORD") and
1147 (msg
["Authentication"]["workstation"] ==
1148 r
"\\%s" % workstation
) and
1149 (msg
["Authentication"]["eventId"] ==
1150 EVT_ID_UNSUCCESSFUL_LOGON
) and
1151 (msg
["Authentication"]["logonType"] ==
1152 EVT_LOGON_INTERACTIVE
))
1154 server
= os
.environ
["SERVER"]
1155 user
= os
.environ
["USERNAME"]
1156 password
= "badPassword"
1157 samlogon
= "samlogon %s %s %s %d" % (user
, password
, workstation
, 1)
1159 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1161 messages
= self
.waitForMessages(isLastExpectedMessage
)
1162 messages
= self
.remove_netlogon_messages(messages
)
1163 received
= len(messages
)
1165 (received
== 5 or received
== 6),
1166 "Did not receive the expected number of messages")
1168 def test_samlogon_interactive_bad_user(self
):
1170 workstation
= "AuthLogTests"
1172 def isLastExpectedMessage(msg
):
1173 return (msg
["type"] == "Authentication" and
1174 (msg
["Authentication"]["serviceDescription"] ==
1176 (msg
["Authentication"]["authDescription"] ==
1178 (msg
["Authentication"]["status"] ==
1179 "NT_STATUS_NO_SUCH_USER") and
1180 (msg
["Authentication"]["workstation"] ==
1181 r
"\\%s" % workstation
) and
1182 (msg
["Authentication"]["eventId"] ==
1183 EVT_ID_UNSUCCESSFUL_LOGON
) and
1184 (msg
["Authentication"]["logonType"] ==
1185 EVT_LOGON_INTERACTIVE
))
1187 server
= os
.environ
["SERVER"]
1189 password
= os
.environ
["PASSWORD"]
1190 samlogon
= "samlogon %s %s %s %d" % (user
, password
, workstation
, 1)
1192 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1194 messages
= self
.waitForMessages(isLastExpectedMessage
)
1195 messages
= self
.remove_netlogon_messages(messages
)
1196 received
= len(messages
)
1198 (received
== 5 or received
== 6),
1199 "Did not receive the expected number of messages")
1201 def test_samlogon_network(self
):
1203 workstation
= "AuthLogTests"
1205 def isLastExpectedMessage(msg
):
1206 return (msg
["type"] == "Authentication" and
1207 (msg
["Authentication"]["serviceDescription"] ==
1209 msg
["Authentication"]["authDescription"] == "network" and
1210 msg
["Authentication"]["status"] == "NT_STATUS_OK" and
1211 (msg
["Authentication"]["workstation"] ==
1212 r
"\\%s" % workstation
) and
1213 (msg
["Authentication"]["eventId"] ==
1214 EVT_ID_SUCCESSFUL_LOGON
) and
1215 (msg
["Authentication"]["logonType"] ==
1218 server
= os
.environ
["SERVER"]
1219 user
= os
.environ
["USERNAME"]
1220 password
= os
.environ
["PASSWORD"]
1221 samlogon
= "samlogon %s %s %s %d" % (user
, password
, workstation
, 2)
1223 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1225 messages
= self
.waitForMessages(isLastExpectedMessage
)
1226 messages
= self
.remove_netlogon_messages(messages
)
1227 received
= len(messages
)
1229 (received
== 5 or received
== 6),
1230 "Did not receive the expected number of messages")
1232 def test_samlogon_network_bad_password(self
):
1234 workstation
= "AuthLogTests"
1236 def isLastExpectedMessage(msg
):
1237 return (msg
["type"] == "Authentication" and
1238 (msg
["Authentication"]["serviceDescription"] ==
1240 msg
["Authentication"]["authDescription"] == "network" and
1241 (msg
["Authentication"]["status"] ==
1242 "NT_STATUS_WRONG_PASSWORD") and
1243 (msg
["Authentication"]["workstation"] ==
1244 r
"\\%s" % workstation
) and
1245 (msg
["Authentication"]["eventId"] ==
1246 EVT_ID_UNSUCCESSFUL_LOGON
) and
1247 (msg
["Authentication"]["logonType"] ==
1250 server
= os
.environ
["SERVER"]
1251 user
= os
.environ
["USERNAME"]
1252 password
= "badPassword"
1253 samlogon
= "samlogon %s %s %s %d" % (user
, password
, workstation
, 2)
1255 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1257 messages
= self
.waitForMessages(isLastExpectedMessage
)
1258 messages
= self
.remove_netlogon_messages(messages
)
1259 received
= len(messages
)
1261 (received
== 5 or received
== 6),
1262 "Did not receive the expected number of messages")
1264 def test_samlogon_network_bad_user(self
):
1266 workstation
= "AuthLogTests"
1268 def isLastExpectedMessage(msg
):
1269 return ((msg
["type"] == "Authentication") and
1270 (msg
["Authentication"]["serviceDescription"] ==
1272 (msg
["Authentication"]["authDescription"] == "network") and
1273 (msg
["Authentication"]["status"] ==
1274 "NT_STATUS_NO_SUCH_USER") and
1275 (msg
["Authentication"]["workstation"] ==
1276 r
"\\%s" % workstation
) and
1277 (msg
["Authentication"]["eventId"] ==
1278 EVT_ID_UNSUCCESSFUL_LOGON
) and
1279 (msg
["Authentication"]["logonType"] ==
1282 server
= os
.environ
["SERVER"]
1284 password
= os
.environ
["PASSWORD"]
1285 samlogon
= "samlogon %s %s %s %d" % (user
, password
, workstation
, 2)
1287 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1289 messages
= self
.waitForMessages(isLastExpectedMessage
)
1290 messages
= self
.remove_netlogon_messages(messages
)
1291 received
= len(messages
)
1293 (received
== 5 or received
== 6),
1294 "Did not receive the expected number of messages")
1296 def test_samlogon_network_mschap(self
):
1298 workstation
= "AuthLogTests"
1300 def isLastExpectedMessage(msg
):
1301 return ((msg
["type"] == "Authentication") and
1302 (msg
["Authentication"]["serviceDescription"] ==
1304 (msg
["Authentication"]["authDescription"] == "network") and
1305 (msg
["Authentication"]["status"] == "NT_STATUS_OK") and
1306 (msg
["Authentication"]["passwordType"] == "MSCHAPv2") and
1307 (msg
["Authentication"]["workstation"] ==
1308 r
"\\%s" % workstation
) and
1309 (msg
["Authentication"]["eventId"] ==
1310 EVT_ID_SUCCESSFUL_LOGON
) and
1311 (msg
["Authentication"]["logonType"] ==
1314 server
= os
.environ
["SERVER"]
1315 user
= os
.environ
["USERNAME"]
1316 password
= os
.environ
["PASSWORD"]
1317 samlogon
= "samlogon %s %s %s %d 0x00010000" % (
1318 user
, password
, workstation
, 2)
1320 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1322 messages
= self
.waitForMessages(isLastExpectedMessage
)
1323 messages
= self
.remove_netlogon_messages(messages
)
1324 received
= len(messages
)
1326 (received
== 5 or received
== 6),
1327 "Did not receive the expected number of messages")
1329 def test_samlogon_network_mschap_bad_password(self
):
1331 workstation
= "AuthLogTests"
1333 def isLastExpectedMessage(msg
):
1334 return ((msg
["type"] == "Authentication") and
1335 (msg
["Authentication"]["serviceDescription"] ==
1337 (msg
["Authentication"]["authDescription"] == "network") and
1338 (msg
["Authentication"]["status"] ==
1339 "NT_STATUS_WRONG_PASSWORD") and
1340 (msg
["Authentication"]["passwordType"] == "MSCHAPv2") and
1341 (msg
["Authentication"]["workstation"] ==
1342 r
"\\%s" % workstation
) and
1343 (msg
["Authentication"]["eventId"] ==
1344 EVT_ID_UNSUCCESSFUL_LOGON
) and
1345 (msg
["Authentication"]["logonType"] ==
1348 server
= os
.environ
["SERVER"]
1349 user
= os
.environ
["USERNAME"]
1350 password
= "badPassword"
1351 samlogon
= "samlogon %s %s %s %d 0x00010000" % (
1352 user
, password
, workstation
, 2)
1354 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1356 messages
= self
.waitForMessages(isLastExpectedMessage
)
1357 messages
= self
.remove_netlogon_messages(messages
)
1358 received
= len(messages
)
1360 (received
== 5 or received
== 6),
1361 "Did not receive the expected number of messages")
1363 def test_samlogon_network_mschap_bad_user(self
):
1365 workstation
= "AuthLogTests"
1367 def isLastExpectedMessage(msg
):
1368 return ((msg
["type"] == "Authentication") and
1369 (msg
["Authentication"]["serviceDescription"] ==
1371 (msg
["Authentication"]["authDescription"] == "network") and
1372 (msg
["Authentication"]["status"] ==
1373 "NT_STATUS_NO_SUCH_USER") and
1374 (msg
["Authentication"]["passwordType"] == "MSCHAPv2") and
1375 (msg
["Authentication"]["workstation"] ==
1376 r
"\\%s" % workstation
) and
1377 (msg
["Authentication"]["eventId"] ==
1378 EVT_ID_UNSUCCESSFUL_LOGON
) and
1379 (msg
["Authentication"]["logonType"] ==
1382 server
= os
.environ
["SERVER"]
1384 password
= os
.environ
["PASSWORD"]
1385 samlogon
= "samlogon %s %s %s %d 0x00010000" % (
1386 user
, password
, workstation
, 2)
1388 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1390 messages
= self
.waitForMessages(isLastExpectedMessage
)
1391 messages
= self
.remove_netlogon_messages(messages
)
1392 received
= len(messages
)
1394 (received
== 5 or received
== 6),
1395 "Did not receive the expected number of messages")
1397 def test_samlogon_schannel_seal(self
):
1399 workstation
= "AuthLogTests"
1401 def isLastExpectedMessage(msg
):
1402 return ((msg
["type"] == "Authentication") and
1403 (msg
["Authentication"]["serviceDescription"] ==
1405 (msg
["Authentication"]["authDescription"] == "network") and
1406 (msg
["Authentication"]["status"] == "NT_STATUS_OK") and
1407 (msg
["Authentication"]["workstation"] ==
1408 r
"\\%s" % workstation
) and
1409 (msg
["Authentication"]["eventId"] ==
1410 EVT_ID_SUCCESSFUL_LOGON
) and
1411 (msg
["Authentication"]["logonType"] ==
1414 server
= os
.environ
["SERVER"]
1415 user
= os
.environ
["USERNAME"]
1416 password
= os
.environ
["PASSWORD"]
1417 samlogon
= "schannel;samlogon %s %s %s" % (user
, password
, workstation
)
1419 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1421 messages
= self
.waitForMessages(isLastExpectedMessage
)
1422 messages
= self
.remove_netlogon_messages(messages
)
1423 received
= len(messages
)
1425 (received
== 5 or received
== 6),
1426 "Did not receive the expected number of messages")
1428 # Check the second to last message it should be an Authorization
1430 self
.assertEquals("Authorization", msg
["type"])
1431 self
.assertEquals("DCE/RPC",
1432 msg
["Authorization"]["serviceDescription"])
1433 self
.assertEquals("schannel", msg
["Authorization"]["authType"])
1434 self
.assertEquals("SEAL", msg
["Authorization"]["transportProtection"])
1435 self
.assertTrue(self
.is_guid(msg
["Authorization"]["sessionId"]))
1437 # Signed logons get promoted to sealed, this test ensures that
1438 # this behaviour is not removed accidentally
1439 def test_samlogon_schannel_sign(self
):
1441 workstation
= "AuthLogTests"
1443 def isLastExpectedMessage(msg
):
1444 return ((msg
["type"] == "Authentication") and
1445 (msg
["Authentication"]["serviceDescription"] ==
1447 (msg
["Authentication"]["authDescription"] == "network") and
1448 (msg
["Authentication"]["status"] == "NT_STATUS_OK") and
1449 (msg
["Authentication"]["workstation"] ==
1450 r
"\\%s" % workstation
) and
1451 (msg
["Authentication"]["eventId"] ==
1452 EVT_ID_SUCCESSFUL_LOGON
) and
1453 (msg
["Authentication"]["logonType"] ==
1456 server
= os
.environ
["SERVER"]
1457 user
= os
.environ
["USERNAME"]
1458 password
= os
.environ
["PASSWORD"]
1459 samlogon
= "schannelsign;samlogon %s %s %s" % (
1460 user
, password
, workstation
)
1462 call(["bin/rpcclient", "-c", samlogon
, "-U%", server
])
1464 messages
= self
.waitForMessages(isLastExpectedMessage
)
1465 messages
= self
.remove_netlogon_messages(messages
)
1466 received
= len(messages
)
1468 (received
== 5 or received
== 6),
1469 "Did not receive the expected number of messages")
1471 # Check the second to last message it should be an Authorization
1473 self
.assertEquals("Authorization", msg
["type"])
1474 self
.assertEquals("DCE/RPC",
1475 msg
["Authorization"]["serviceDescription"])
1476 self
.assertEquals("schannel", msg
["Authorization"]["authType"])
1477 self
.assertEquals("SEAL", msg
["Authorization"]["transportProtection"])
1478 self
.assertTrue(self
.is_guid(msg
["Authorization"]["sessionId"]))