From 8cf4e546960ab1493b0e39f0cef822a66d30bf56 Mon Sep 17 00:00:00 2001 From: Gary Lockyer Date: Mon, 30 Apr 2018 10:35:25 +1200 Subject: [PATCH] auth logging tests: Clean up flake8 warnings Signed-off-by: Gary Lockyer Reviewed-by: Andrew Bartlett --- python/samba/tests/auth_log.py | 471 +++++++++++----------- python/samba/tests/auth_log_base.py | 27 +- python/samba/tests/auth_log_ncalrpc.py | 27 +- python/samba/tests/auth_log_netlogon_bad_creds.py | 1 + python/samba/tests/auth_log_pass_change.py | 148 +++---- python/samba/tests/auth_log_samlogon.py | 23 +- 6 files changed, 326 insertions(+), 371 deletions(-) diff --git a/python/samba/tests/auth_log.py b/python/samba/tests/auth_log.py index 34312cb49c7..6cec63a8171 100644 --- a/python/samba/tests/auth_log.py +++ b/python/samba/tests/auth_log.py @@ -18,22 +18,18 @@ from __future__ import print_function """Tests for the Auth and AuthZ logging. """ -from samba import auth import samba.tests -from samba.messaging import Messaging -from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME from samba.dcerpc import srvsvc, dnsserver -import time -import json import os from samba import smb from samba.samdb import SamDB import samba.tests.auth_log_base -from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS +from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS from samba import NTSTATUSError from subprocess import call from ldb import LdbError + class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): def setUp(self): @@ -43,8 +39,6 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): def tearDown(self): super(AuthLogTests, self).tearDown() - - def _test_rpc_ncacn_np(self, authTypes, creds, service, binding, protection, checkFunction): def isLastExpectedMessage(msg): 
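Review bot: The string `"""Tests for the Auth and AuthZ logging."""` sits after `from __future__ import print_function` (the hunk header for -18,22 shows that import above it), so Python treats it as a bare expression and the module has no `__doc__`. Since this hunk already tidies the top of the file, move the docstring above the `__future__` import. A docstring is the one statement allowed to precede that import.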
@@ -59,8 +53,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): if service == "dnsserver": x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding), - self.get_loadparm(), - creds) + self.get_loadparm(), + creds) elif service == "srvsvc": x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding), self.get_loadparm(), @@ -84,8 +78,9 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("SMB", - msg["Authentication"]["serviceDescription"]) - self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"]) + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[1], + msg["Authentication"]["authDescription"]) # Check the second message it should be an Authorization msg = messages[1] @@ -106,11 +101,19 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertTrue( - checkServiceDescription(msg["Authentication"]["serviceDescription"])) + checkServiceDescription( + msg["Authentication"]["serviceDescription"])) - self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"]) + self.assertEquals(authTypes[3], + msg["Authentication"]["authDescription"]) - def rpc_ncacn_np_krb5_check(self, messages, authTypes, service, binding, protection): + def rpc_ncacn_np_krb5_check( + self, + messages, + authTypes, + service, + binding, + protection): expected_messages = len(authTypes) self.assertEquals(expected_messages, 
@@ -124,8 +127,9 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("Kerberos KDC", - msg["Authentication"]["serviceDescription"]) - self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"]) + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[1], + msg["Authentication"]["authDescription"]) # Check the second message it should be an Authentication # This this the TCP Authentication in response to the message too big @@ -134,8 +138,9 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("Kerberos KDC", - msg["Authentication"]["serviceDescription"]) - self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], + msg["Authentication"]["authDescription"]) # Check the third message it should be an Authorization msg = messages[2] @@ -151,7 +156,6 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("SMB", msg["Authorization"]["transportProtection"]) self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"])) - def test_rpc_ncacn_np_ntlm_dns_sign(self): creds = self.insta_creds(template=self.get_credentials(), kerberos_state=DONT_USE_KERBEROS) @@ -197,8 +201,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): "ENC-TS Pre-authentication", "ENC-TS Pre-authentication", "krb5"], - creds, "dnsserver", "sign", "SIGN", - self.rpc_ncacn_np_krb5_check) + creds, "dnsserver", "sign", "SIGN", + self.rpc_ncacn_np_krb5_check) def test_rpc_ncacn_np_krb_srv_sign(self): creds = self.insta_creds(template=self.get_credentials(), 
@@ -207,8 +211,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): "ENC-TS Pre-authentication", "ENC-TS Pre-authentication", "krb5"], - creds, "srvsvc", "sign", "SIGN", - self.rpc_ncacn_np_krb5_check) + creds, "srvsvc", "sign", "SIGN", + self.rpc_ncacn_np_krb5_check) def test_rpc_ncacn_np_krb_dns(self): creds = self.insta_creds(template=self.get_credentials(), @@ -234,9 +238,9 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): creds = self.insta_creds(template=self.get_credentials(), kerberos_state=MUST_USE_KERBEROS) self._test_rpc_ncacn_np(["ncacn_np", - "ENC-TS Pre-authentication", - "ENC-TS Pre-authentication", - "krb5"], + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication", + "krb5"], creds, "srvsvc", "", "SMB", self.rpc_ncacn_np_krb5_check) @@ -252,15 +256,15 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): binding = "[%s]" % binding if service == "dnsserver": - conn = dnsserver.dnsserver("ncacn_ip_tcp:%s%s" % (self.server, binding), - self.get_loadparm(), - creds) + conn = dnsserver.dnsserver( + "ncacn_ip_tcp:%s%s" % (self.server, binding), + self.get_loadparm(), + creds) elif service == "srvsvc": conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding), self.get_loadparm(), creds) - messages = self.waitForMessages(isLastExpectedMessage, conn) checkFunction(messages, authTypes, service, binding, protection) @@ -286,8 +290,9 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("DCE/RPC", - msg["Authentication"]["serviceDescription"]) - self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], + msg["Authentication"]["authDescription"]) def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service, binding, protection): 
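Review bot: In `_test_rpc_ncacn_ip_tcp` (hunk -252,15), `conn` is bound only in the `dnsserver` and `srvsvc` branches, yet it is passed unconditionally to `self.waitForMessages(isLastExpectedMessage, conn)` a few lines later. A caller passing any other service name gets an UnboundLocalError instead of a failure that names the problem, so add `else: self.fail("unknown service: %s" % service)` after the `elif`. The `x = ...` chain in `_test_rpc_ncacn_np` (hunk -59,8) has the same shape, though its use of `x` is outside that hunk. The function starts outside this hunk.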
@@ -311,16 +316,18 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("Kerberos KDC", - msg["Authentication"]["serviceDescription"]) - self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], + msg["Authentication"]["authDescription"]) # Check the third message it should be an Authentication msg = messages[2] self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("Kerberos KDC", - msg["Authentication"]["serviceDescription"]) - self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], + msg["Authentication"]["authDescription"]) def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self): creds = self.insta_creds(template=self.get_credentials(), @@ -328,8 +335,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self._test_rpc_ncacn_ip_tcp(["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"], - creds, "dnsserver", "sign", "SIGN", - self.rpc_ncacn_ip_tcp_ntlm_check) + creds, "dnsserver", "sign", "SIGN", + self.rpc_ncacn_ip_tcp_ntlm_check) def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self): creds = self.insta_creds(template=self.get_credentials(), @@ -338,8 +345,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): "ncacn_ip_tcp", "ENC-TS Pre-authentication", "ENC-TS Pre-authentication"], - creds, "dnsserver", "sign", "SIGN", - self.rpc_ncacn_ip_tcp_krb5_check) + creds, "dnsserver", "sign", "SIGN", + self.rpc_ncacn_ip_tcp_krb5_check) def test_rpc_ncacn_ip_tcp_ntlm_dns(self): creds = self.insta_creds(template=self.get_credentials(), 
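Review bot: In hunk -311,16 the block under `# Check the third message` reads `messages[2]` but compares its `authDescription` with `authTypes[2]`, the same index used by the block just above it. The `msg =` assignment for that earlier block is outside the hunk, so which message it covers cannot be confirmed here; if it is the second message, this index was probably meant to be `authTypes[3]`. The krb5 callers visible in this patch pass "ENC-TS Pre-authentication" at both positions, so the test passes either way and would not detect a mix-up. At minimum a comment such as `# authTypes[2] and [3] are both the ENC-TS pre-auth entries` would record the intent.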
@@ -347,8 +354,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self._test_rpc_ncacn_ip_tcp(["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"], - creds, "dnsserver", "", "SIGN", - self.rpc_ncacn_ip_tcp_ntlm_check) + creds, "dnsserver", "", "SIGN", + self.rpc_ncacn_ip_tcp_ntlm_check) def test_rpc_ncacn_ip_tcp_krb5_dns(self): creds = self.insta_creds(template=self.get_credentials(), @@ -357,8 +364,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): "ncacn_ip_tcp", "ENC-TS Pre-authentication", "ENC-TS Pre-authentication"], - creds, "dnsserver", "", "SIGN", - self.rpc_ncacn_ip_tcp_krb5_check) + creds, "dnsserver", "", "SIGN", + self.rpc_ncacn_ip_tcp_krb5_check) def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self): creds = self.insta_creds(template=self.get_credentials(), @@ -366,8 +373,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self._test_rpc_ncacn_ip_tcp(["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"], - creds, "dnsserver", "connect", "NONE", - self.rpc_ncacn_ip_tcp_ntlm_check) + creds, "dnsserver", "connect", "NONE", + self.rpc_ncacn_ip_tcp_ntlm_check) def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self): creds = self.insta_creds(template=self.get_credentials(), @@ -376,8 +383,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): "ncacn_ip_tcp", "ENC-TS Pre-authentication", "ENC-TS Pre-authentication"], - creds, "dnsserver", "connect", "NONE", - self.rpc_ncacn_ip_tcp_krb5_check) + creds, "dnsserver", "connect", "NONE", + self.rpc_ncacn_ip_tcp_krb5_check) def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self): creds = self.insta_creds(template=self.get_credentials(), 
@@ -385,8 +392,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self._test_rpc_ncacn_ip_tcp(["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"], - creds, "dnsserver", "seal", "SEAL", - self.rpc_ncacn_ip_tcp_ntlm_check) + creds, "dnsserver", "seal", "SEAL", + self.rpc_ncacn_ip_tcp_ntlm_check) def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self): creds = self.insta_creds(template=self.get_credentials(), @@ -395,8 +402,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): "ncacn_ip_tcp", "ENC-TS Pre-authentication", "ENC-TS Pre-authentication"], - creds, "dnsserver", "seal", "SEAL", - self.rpc_ncacn_ip_tcp_krb5_check) + creds, "dnsserver", "seal", "SEAL", + self.rpc_ncacn_ip_tcp_krb5_check) def test_ldap(self): @@ -407,7 +414,7 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): msg["Authorization"]["authType"] == "krb5") self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"], - lp = self.get_loadparm(), + lp=self.get_loadparm(), credentials=self.get_credentials()) messages = self.waitForMessages(isLastExpectedMessage) 
@@ -420,18 +427,18 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("Kerberos KDC", - msg["Authentication"]["serviceDescription"]) + msg["Authentication"]["serviceDescription"]) self.assertEquals("ENC-TS Pre-authentication", - msg["Authentication"]["authDescription"]) + msg["Authentication"]["authDescription"]) - # Check the first message it should be an Authentication + # Check the second message it should be an Authentication msg = messages[1] self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("Kerberos KDC", - msg["Authentication"]["serviceDescription"]) + msg["Authentication"]["serviceDescription"]) self.assertEquals("ENC-TS Pre-authentication", - msg["Authentication"]["authDescription"]) + msg["Authentication"]["authDescription"]) def test_ldap_ntlm(self): @@ -442,7 +449,7 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): msg["Authorization"]["authType"] == "NTLMSSP") self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"], - lp = self.get_loadparm(), + lp=self.get_loadparm(), credentials=self.get_credentials()) messages = self.waitForMessages(isLastExpectedMessage) @@ -454,7 +461,7 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("LDAP", - msg["Authentication"]["serviceDescription"]) + msg["Authentication"]["serviceDescription"]) self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"]) def test_ldap_simple_bind(self): 
@@ -466,10 +473,10 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): creds = self.insta_creds(template=self.get_credentials()) creds.set_bind_dn("%s\\%s" % (creds.get_domain(), - creds.get_username())) + creds.get_username())) self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"], - lp = self.get_loadparm(), + lp=self.get_loadparm(), credentials=creds) messages = self.waitForMessages(isLastExpectedMessage) @@ -482,27 +489,27 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("LDAP", - msg["Authentication"]["serviceDescription"]) + msg["Authentication"]["serviceDescription"]) self.assertEquals("simple bind", - msg["Authentication"]["authDescription"]) + msg["Authentication"]["authDescription"]) def test_ldap_simple_bind_bad_password(self): def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and msg["Authentication"]["serviceDescription"] == "LDAP" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD" and + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD") and msg["Authentication"]["authDescription"] == "simple bind") creds = self.insta_creds(template=self.get_credentials()) creds.set_password("badPassword") creds.set_bind_dn("%s\\%s" % (creds.get_domain(), - creds.get_username())) + creds.get_username())) thrown = False try: self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"], - lp = self.get_loadparm(), + lp=self.get_loadparm(), credentials=creds) except LdbError: thrown = True 
@@ -513,13 +520,12 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): len(messages), "Did not receive the expected number of messages") - def test_ldap_simple_bind_bad_user(self): def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and msg["Authentication"]["serviceDescription"] == "LDAP" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER" and + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER") and msg["Authentication"]["authDescription"] == "simple bind") creds = self.insta_creds(template=self.get_credentials()) @@ -528,7 +534,7 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): thrown = False try: self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"], - lp = self.get_loadparm(), + lp=self.get_loadparm(), credentials=creds) except LdbError: thrown = True @@ -539,13 +545,12 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): len(messages), "Did not receive the expected number of messages") - def test_ldap_simple_bind_unparseable_user(self): def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and msg["Authentication"]["serviceDescription"] == "LDAP" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER" and + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER") and msg["Authentication"]["authDescription"] == "simple bind") creds = self.insta_creds(template=self.get_credentials()) @@ -554,7 +559,7 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): thrown = False try: self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"], - lp = self.get_loadparm(), + lp=self.get_loadparm(), credentials=creds) except LdbError: thrown = True 
@@ -572,23 +577,23 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): def test_ldap_anonymous_access_bind_only(self): # Should be no logging for anonymous bind # so receiving any message indicates a failure. - def isLastExpectedMessage( msg): + def isLastExpectedMessage(msg): return True creds = self.insta_creds(template=self.get_credentials()) creds.set_anonymous() self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"], - lp = self.get_loadparm(), + lp=self.get_loadparm(), credentials=creds) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) self.assertEquals(0, len(messages), "Did not receive the expected number of messages") def test_ldap_anonymous_access(self): - def isLastExpectedMessage( msg): + def isLastExpectedMessage(msg): return (msg["type"] == "Authorization" and msg["Authorization"]["serviceDescription"] == "LDAP" and msg["Authorization"]["transportProtection"] == "TLS" and @@ -599,19 +604,20 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): creds.set_anonymous() self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"], - lp = self.get_loadparm(), + lp=self.get_loadparm(), credentials=creds) try: - res = self.samdb.search(base=self.samdb.domain_dn()) - self.fail( "Expected an LdbError exception") + self.samdb.search(base=self.samdb.domain_dn()) + self.fail("Expected an LdbError exception") except LdbError: pass - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) self.assertEquals(1, len(messages), "Did not receive the expected number of messages") + def test_smb(self): def isLastExpectedMessage(msg): return (msg["type"] == "Authorization" and 
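Review bot: `test_ldap_anonymous_access_bind_only` proves the absence of an auth log entry with a predicate that always returns True and an expected count of 0, so its strength depends entirely on how long `waitForMessages` keeps listening. That method is defined in the base class and is not visible here; if its window is short, a message logged late would be missed and the test would still pass. State this next to the existing comment, e.g. `# Relies on waitForMessages timing out; a late message would go unnoticed`. The failure text "Did not receive the expected number of messages" also reads oddly when the expected number is zero.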
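Review bot: In `test_ldap_anonymous_access` (hunk -599,19) the `try` / `self.fail(...)` / `except LdbError: pass` sequence accepts any LdbError, including one caused by a dropped connection rather than by the server refusing the anonymous search. `with self.assertRaises(LdbError) as ctx:` around `self.samdb.search(base=self.samdb.domain_dn())` is shorter and leaves `ctx.exception` available for checking the error code. Which code the server returns is not shown in this hunk, so that check would need confirming against the server's behaviour.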
@@ -634,28 +640,28 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("Kerberos KDC", - msg["Authentication"]["serviceDescription"]) + msg["Authentication"]["serviceDescription"]) self.assertEquals("ENC-TS Pre-authentication", - msg["Authentication"]["authDescription"]) + msg["Authentication"]["authDescription"]) # Check the second message it should be an Authentication msg = messages[1] self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("Kerberos KDC", - msg["Authentication"]["serviceDescription"]) + msg["Authentication"]["serviceDescription"]) self.assertEquals("ENC-TS Pre-authentication", - msg["Authentication"]["authDescription"]) + msg["Authentication"]["authDescription"]) def test_smb_bad_password(self): def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "Kerberos KDC" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD" and - msg["Authentication"]["authDescription"] - == "ENC-TS Pre-authentication") + (msg["Authentication"]["serviceDescription"] == + "Kerberos KDC") and + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD") and + (msg["Authentication"]["authDescription"] == + "ENC-TS Pre-authentication")) creds = self.insta_creds(template=self.get_credentials()) creds.set_password("badPassword") 
@@ -675,16 +681,15 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): len(messages), "Did not receive the expected number of messages") - def test_smb_bad_user(self): def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "Kerberos KDC" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER" and - msg["Authentication"]["authDescription"] - == "ENC-TS Pre-authentication") + (msg["Authentication"]["serviceDescription"] == + "Kerberos KDC") and + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER") and + (msg["Authentication"]["authDescription"] == + "ENC-TS Pre-authentication")) creds = self.insta_creds(template=self.get_credentials()) creds.set_username("badUser") @@ -829,8 +834,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): msg["Authentication"]["serviceDescription"] == "SMB" and msg["Authentication"]["authDescription"] == "NTLMSSP" and msg["Authentication"]["passwordType"] == "NTLMv2" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD") + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD")) creds = self.insta_creds(template=self.get_credentials(), kerberos_state=DONT_USE_KERBEROS) @@ -857,8 +862,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): msg["Authentication"]["serviceDescription"] == "SMB" and msg["Authentication"]["authDescription"] == "NTLMSSP" and msg["Authentication"]["passwordType"] == "NTLMv2" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER") + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER")) creds = self.insta_creds(template=self.get_credentials(), kerberos_state=DONT_USE_KERBEROS) 
@@ -916,8 +921,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): msg["Authentication"]["serviceDescription"] == "SMB" and msg["Authentication"]["authDescription"] == "bare-NTLM" and msg["Authentication"]["passwordType"] == "NTLMv1" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD") + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD")) creds = self.insta_creds(template=self.get_credentials(), kerberos_state=DONT_USE_KERBEROS) @@ -935,7 +940,6 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): thrown = True self.assertEquals(thrown, True) - messages = self.waitForMessages(isLastExpectedMessage) self.assertEquals(1, len(messages), @@ -947,8 +951,8 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): msg["Authentication"]["serviceDescription"] == "SMB" and msg["Authentication"]["authDescription"] == "bare-NTLM" and msg["Authentication"]["passwordType"] == "NTLMv1" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER") + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER")) creds = self.insta_creds(template=self.get_credentials(), kerberos_state=DONT_USE_KERBEROS) @@ -966,7 +970,6 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): thrown = True self.assertEquals(thrown, True) - messages = self.waitForMessages(isLastExpectedMessage) self.assertEquals(1, len(messages), 
@@ -976,25 +979,24 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): + def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "interactive" and + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == + "interactive") and msg["Authentication"]["status"] == "NT_STATUS_OK" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = os.environ["USERNAME"] password = os.environ["PASSWORD"] samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1) - call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
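Review bot: The return value of `call(["bin/rpcclient", "-c", samlogon, "-U%", server])` is dropped, so a missing binary or a failed connection shows up only as a wrong message count further down (the `assertIs` is cut off at the end of this hunk). Keep it, `rc = call([...])`, and put `rc` in the assertion message so a failure points at rpcclient rather than at the log. The `samlogon` string also carries `os.environ["PASSWORD"]` in argv, where it is readable from the process list while rpcclient runs. A comment such as `# test-environment credentials only: the password is visible in argv` would make that assumption explicit. The same lines recur in the other samlogon hunks (-1005 through -1246).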
@@ -1005,26 +1007,25 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): + def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "interactive" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == + "interactive") and + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = os.environ["USERNAME"] password = "badPassword" samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1) - call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1035,26 +1036,25 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): + def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "interactive" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == + "interactive") and + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = "badUser" password = os.environ["PASSWORD"] samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1) - call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1065,25 +1065,23 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): + def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "network" and + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + msg["Authentication"]["authDescription"] == "network" and msg["Authentication"]["status"] == "NT_STATUS_OK" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = os.environ["USERNAME"] password = os.environ["PASSWORD"] samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2) - call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1094,26 +1092,24 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): + def isLastExpectedMessage(msg): return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "network" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + msg["Authentication"]["authDescription"] == "network" and + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = os.environ["USERNAME"] password = "badPassword" samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2) - call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1124,26 +1120,24 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "network" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + def isLastExpectedMessage(msg): + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == "network") and + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = "badUser" - password = os.environ["PASSWORD"] + password = os.environ["PASSWORD"] samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2) - call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1154,26 +1148,25 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "network" and - msg["Authentication"]["status"] == "NT_STATUS_OK" and - msg["Authentication"]["passwordType"] == "MSCHAPv2" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + def isLastExpectedMessage(msg): + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == "network") and + (msg["Authentication"]["status"] == "NT_STATUS_OK") and + (msg["Authentication"]["passwordType"] == "MSCHAPv2") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = os.environ["USERNAME"] password = os.environ["PASSWORD"] - samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2) - + samlogon = "samlogon %s %s %s %d 0x00010000" % ( + user, password, workstation, 2) call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1184,27 +1177,26 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "network" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD" and - msg["Authentication"]["passwordType"] == "MSCHAPv2" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + def isLastExpectedMessage(msg): + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == "network") and + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD") and + (msg["Authentication"]["passwordType"] == "MSCHAPv2") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = os.environ["USERNAME"] password = "badPassword" - samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2) - + samlogon = "samlogon %s %s %s %d 0x00010000" % ( + user, password, workstation, 2) call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1215,27 +1207,26 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "network" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER" and - msg["Authentication"]["passwordType"] == "MSCHAPv2" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + def isLastExpectedMessage(msg): + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == "network") and + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER") and + (msg["Authentication"]["passwordType"] == "MSCHAPv2") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = "badUser" password = os.environ["PASSWORD"] - samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2) - + samlogon = "samlogon %s %s %s %d 0x00010000" % ( + user, password, workstation, 2) call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1246,25 +1237,23 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "network" and - msg["Authentication"]["status"] == "NT_STATUS_OK" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + def isLastExpectedMessage(msg): + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == "network") and + (msg["Authentication"]["status"] == "NT_STATUS_OK") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = os.environ["USERNAME"] password = os.environ["PASSWORD"] samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation) - call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, 
@@ -1278,32 +1267,32 @@ class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): msg["Authorization"]["serviceDescription"]) self.assertEquals("schannel", msg["Authorization"]["authType"]) self.assertEquals("SEAL", msg["Authorization"]["transportProtection"]) + self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"])) # Signed logons get promoted to sealed, this test ensures that - # this behaviour is not removed accidently + # this behaviour is not removed accidentally def test_samlogon_schannel_sign(self): workstation = "AuthLogTests" - def isLastExpectedMessage( msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SamLogon" and - msg["Authentication"]["authDescription"] - == "network" and - msg["Authentication"]["status"] == "NT_STATUS_OK" and - msg["Authentication"]["workstation"] - == r"\\%s" % workstation) + def isLastExpectedMessage(msg): + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["serviceDescription"] == + "SamLogon") and + (msg["Authentication"]["authDescription"] == "network") and + (msg["Authentication"]["status"] == "NT_STATUS_OK") and + (msg["Authentication"]["workstation"] == + r"\\%s" % workstation)) server = os.environ["SERVER"] user = os.environ["USERNAME"] password = os.environ["PASSWORD"] - samlogon = "schannelsign;samlogon %s %s %s" % (user, password, workstation) - + samlogon = "schannelsign;samlogon %s %s %s" % ( + user, password, workstation) call(["bin/rpcclient", "-c", samlogon, "-U%", server]) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) messages = self.remove_netlogon_messages(messages) received = len(messages) self.assertIs(True, diff --git a/python/samba/tests/auth_log_base.py b/python/samba/tests/auth_log_base.py index 5bb98219bba..5a70ce3f970 100644 --- a/python/samba/tests/auth_log_base.py +++ b/python/samba/tests/auth_log_base.py 
@@ -19,34 +19,31 @@ from __future__ import print_function """Tests for the Auth and AuthZ logging. """ -from samba import auth import samba.tests from samba.messaging import Messaging from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME -from samba.dcerpc import srvsvc, dnsserver import time import json import os import re -from samba import smb -from samba.samdb import SamDB + class AuthLogTestBase(samba.tests.TestCase): def setUp(self): super(AuthLogTestBase, self).setUp() lp_ctx = self.get_loadparm() - self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx); + self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx) self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME) - def messageHandler( context, msgType, src, message): + def messageHandler(context, msgType, src, message): # This does not look like sub unit output and it # makes these tests much easier to debug. print(message) jsonMsg = json.loads(message) - context["messages"].append( jsonMsg) + context["messages"].append(jsonMsg) - self.context = { "messages": []} + self.context = {"messages": []} self.msg_handler_and_context = (messageHandler, self.context) self.msg_ctx.register(self.msg_handler_and_context, msg_type=MSG_AUTH_LOG) @@ -62,20 +59,19 @@ class AuthLogTestBase(samba.tests.TestCase): self.msg_ctx.deregister(self.msg_handler_and_context, msg_type=MSG_AUTH_LOG) - def waitForMessages(self, isLastExpectedMessage, connection=None): """Wait for all the expected messages to arrive The connection is passed through to keep the connection alive until all the logging messages have been received. """ - def completed( messages): + def completed(messages): for message in messages: - if isRemote( message) and isLastExpectedMessage( message): + if isRemote(message) and isLastExpectedMessage(message): return True return False - def isRemote( message): + def isRemote(message): remote = None if message["type"] == "Authorization": remote = message["Authorization"]["remoteAddress"] 
@@ -93,19 +89,19 @@ class AuthLogTestBase(samba.tests.TestCase): self.connection = connection start_time = time.time() - while not completed( self.context["messages"]): + while not completed(self.context["messages"]): self.msg_ctx.loop_once(0.1) if time.time() - start_time > 1: self.connection = None return [] self.connection = None - return filter( isRemote, self.context["messages"]) + return filter(isRemote, self.context["messages"]) # Discard any previously queued messages. def discardMessages(self): self.msg_ctx.loop_once(0.001) - while len( self.context["messages"]): + while len(self.context["messages"]): self.msg_ctx.loop_once(0.001) self.context["messages"] = [] @@ -123,6 +119,7 @@ class AuthLogTestBase(samba.tests.TestCase): return list(filter(is_not_netlogon, messages)) GUID_RE = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}" + # # Is the supplied GUID string correctly formatted # diff --git a/python/samba/tests/auth_log_ncalrpc.py b/python/samba/tests/auth_log_ncalrpc.py index 2f61cc5c672..849cee7a409 100644 --- a/python/samba/tests/auth_log_ncalrpc.py +++ b/python/samba/tests/auth_log_ncalrpc.py @@ -18,19 +18,12 @@ """Tests for the Auth and AuthZ logging. """ -from samba import auth import samba.tests -from samba.messaging import Messaging -from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME +from samba.credentials import DONT_USE_KERBEROS from samba.dcerpc.dcerpc import AS_SYSTEM_MAGIC_PATH_TOKEN from samba.dcerpc import samr -import time -import json -import os -from samba import smb -from samba.samdb import SamDB import samba.tests.auth_log_base -from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS + class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase): 
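Review bot: `waitForMessages` ends with `return filter(isRemote, self.context["messages"])`, which on Python 3 is a lazy iterator, while the timeout path just above it returns a list (`[]`) and callers in this patch call `len(messages)` on the result. `remove_netlogon_messages` in the same file already returns `list(filter(...))`; doing the same here, `return list(filter(isRemote, self.context["messages"]))`, keeps the return type identical on both paths. The function begins in the previous hunk. Its docstring could also state that an empty list is returned when the last expected message has not arrived within one second, since that is the only signal a test gets that the auth event was never logged.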
@@ -39,25 +32,23 @@ class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase): self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN def tearDown(self): - super(AuthLogTestsNcalrpc , self).tearDown() - + super(AuthLogTestsNcalrpc, self).tearDown() def _test_rpc_ncaclrpc(self, authTypes, binding, creds, protection, checkFunction): - def isLastExpectedMessage( msg): + def isLastExpectedMessage(msg): return ( msg["type"] == "Authorization" and msg["Authorization"]["serviceDescription"] == "DCE/RPC" and msg["Authorization"]["authType"] == authTypes[0] and - msg["Authorization"]["transportProtection"] == protection - ) + msg["Authorization"]["transportProtection"] == protection) if binding: binding = "[%s]" % binding samr.samr("ncalrpc:%s" % binding, self.get_loadparm(), creds) - messages = self.waitForMessages( isLastExpectedMessage) + messages = self.waitForMessages(isLastExpectedMessage) checkFunction(messages, authTypes, protection) def rpc_ncacn_np_ntlm_check(self, messages, authTypes, protection): @@ -81,9 +72,9 @@ class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("Authentication", msg["type"]) self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) self.assertEquals("DCE/RPC", - msg["Authentication"]["serviceDescription"]) - self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) - + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], + msg["Authentication"]["authDescription"]) def test_ncalrpc_ntlm_dns_sign(self): diff --git a/python/samba/tests/auth_log_netlogon_bad_creds.py b/python/samba/tests/auth_log_netlogon_bad_creds.py index c18d270ed82..a0a2e2885f8 100644 --- a/python/samba/tests/auth_log_netlogon_bad_creds.py +++ b/python/samba/tests/auth_log_netlogon_bad_creds.py 
@@ -38,6 +38,7 @@ from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD from samba.dcerpc.misc import SEC_CHAN_WKSTA from samba.dcerpc.netlogon import NETLOGON_NEG_STRONG_KEYS + class AuthLogTestsNetLogonBadCreds(samba.tests.auth_log_base.AuthLogTestBase): def setUp(self): diff --git a/python/samba/tests/auth_log_pass_change.py b/python/samba/tests/auth_log_pass_change.py index 9782389b97e..8890694d37e 100644 --- a/python/samba/tests/auth_log_pass_change.py +++ b/python/samba/tests/auth_log_pass_change.py @@ -19,23 +19,20 @@ from __future__ import print_function """Tests for the Auth and AuthZ logging of password changes. """ -from samba import auth import samba.tests -from samba.messaging import Messaging from samba.samdb import SamDB from samba.auth import system_session -import json import os import samba.tests.auth_log_base from samba.tests import delete_force from samba.net import Net -from samba import ntstatus import samba from subprocess import call from ldb import LdbError USER_NAME = "authlogtestuser" -USER_PASS = samba.generate_random_password(32,32) +USER_PASS = samba.generate_random_password(32, 32) + class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): @@ -56,9 +53,6 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): base_dn = self.ldb.domain_dn() print("base_dn %s" % base_dn) - # Gets back the configuration basedn - configuration_dn = self.ldb.get_config_basedn().get_linearized() - # Get the old "dSHeuristics" if it was set dsheuristics = self.ldb.get_dsheuristics() 
@@ -82,10 +76,10 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): # (Re)adds the test user USER_NAME with password USER_PASS delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn) self.ldb.add({ - "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn, - "objectclass": "user", - "sAMAccountName": USER_NAME, - "userPassword": USER_PASS + "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn, + "objectclass": "user", + "sAMAccountName": USER_NAME, + "userPassword": USER_PASS }) # discard any auth log messages for the password setup @@ -94,18 +88,16 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): def tearDown(self): super(AuthLogPassChangeTests, self).tearDown() - def test_admin_change_password(self): def isLastExpectedMessage(msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["status"] - == "NT_STATUS_OK" and - msg["Authentication"]["serviceDescription"] - == "SAMR Password Change" and - msg["Authentication"]["authDescription"] - == "samr_ChangePasswordUser3") + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["status"] == "NT_STATUS_OK") and + (msg["Authentication"]["serviceDescription"] == + "SAMR Password Change") and + (msg["Authentication"]["authDescription"] == + "samr_ChangePasswordUser3")) - creds = self.insta_creds(template = self.get_credentials()) + creds = self.insta_creds(template=self.get_credentials()) lp = self.get_loadparm() net = Net(creds, lp, server=self.server_ip) @@ -115,7 +107,6 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): username=USER_NAME, oldpassword=USER_PASS) - messages = self.waitForMessages(isLastExpectedMessage) print("Received %d messages" % len(messages)) self.assertEquals(8, 
@@ -124,13 +115,13 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): def test_admin_change_password_new_password_fails_restriction(self): def isLastExpectedMessage(msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["status"] - == "NT_STATUS_PASSWORD_RESTRICTION" and - msg["Authentication"]["serviceDescription"] - == "SAMR Password Change" and - msg["Authentication"]["authDescription"] - == "samr_ChangePasswordUser3") + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["status"] == + "NT_STATUS_PASSWORD_RESTRICTION") and + (msg["Authentication"]["serviceDescription"] == + "SAMR Password Change") and + (msg["Authentication"]["authDescription"] == + "samr_ChangePasswordUser3")) creds = self.insta_creds(template=self.get_credentials()) @@ -143,7 +134,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): net.change_password(newpassword=password.encode('utf-8'), oldpassword=USER_PASS, username=USER_NAME) - except Exception as msg: + except Exception: exception_thrown = True self.assertEquals(True, exception_thrown, "Expected exception not thrown") @@ -155,13 +146,13 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): def test_admin_change_password_unknown_user(self): def isLastExpectedMessage(msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["status"] - == "NT_STATUS_NO_SUCH_USER" and - msg["Authentication"]["serviceDescription"] - == "SAMR Password Change" and - msg["Authentication"]["authDescription"] - == "samr_ChangePasswordUser3") + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["status"] == + "NT_STATUS_NO_SUCH_USER") and + (msg["Authentication"]["serviceDescription"] == + "SAMR Password Change") and + (msg["Authentication"]["authDescription"] == + "samr_ChangePasswordUser3")) creds = self.insta_creds(template=self.get_credentials()) 
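Review bot: In the second hunk on this line, `except Exception:` around `net.change_password(...)` sets `exception_thrown = True` for any failure, so a connection error or a mistake in the call satisfies the "Expected exception not thrown" check just as well as the password-restriction rejection does. The later wait for the `NT_STATUS_PASSWORD_RESTRICTION` log message narrows this, but the client-side assertion by itself says nothing about which status came back. Catching the specific exception type that `change_password` raises and asserting on its status code would make the negative test meaningful on its own. The same pattern appears in the unknown-user and bad-original-password tests in the following hunks, and the test method starts and ends outside this hunk.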
@@ -174,7 +165,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): net.change_password(newpassword=password.encode('utf-8'), oldpassword=USER_PASS, username="badUser") - except Exception as msg: + except Exception: exception_thrown = True self.assertEquals(True, exception_thrown, "Expected exception not thrown") @@ -186,13 +177,13 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): def test_admin_change_password_bad_original_password(self): def isLastExpectedMessage(msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD" and - msg["Authentication"]["serviceDescription"] - == "SAMR Password Change" and - msg["Authentication"]["authDescription"] - == "samr_ChangePasswordUser3") + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD") and + (msg["Authentication"]["serviceDescription"] == + "SAMR Password Change") and + (msg["Authentication"]["authDescription"] == + "samr_ChangePasswordUser3")) creds = self.insta_creds(template=self.get_credentials()) @@ -205,7 +196,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): net.change_password(newpassword=password.encode('utf-8'), oldpassword="badPassword", username=USER_NAME) - except Exception as msg: + except Exception: exception_thrown = True self.assertEquals(True, exception_thrown, "Expected exception not thrown") 
@@ -221,19 +212,19 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): # correctly, so we just check it triggers the wrong password path. def test_rap_change_password(self): def isLastExpectedMessage(msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["serviceDescription"] - == "SAMR Password Change" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD" and - msg["Authentication"]["authDescription"] - == "OemChangePasswordUser2") + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["serviceDescription"] == + "SAMR Password Change") and + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD") and + (msg["Authentication"]["authDescription"] == + "OemChangePasswordUser2")) username = os.environ["USERNAME"] server = os.environ["SERVER"] password = os.environ["PASSWORD"] server_param = "--server=%s" % server - creds = "-U%s%%%s" % (username,password) + creds = "-U%s%%%s" % (username, password) call(["bin/net", "rap", server_param, "password", USER_NAME, "notMyPassword", "notGoingToBeMyPassword", server, creds, "--option=client ipc max protocol=nt1"]) 
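Review bot: `creds = "-U%s%%%s" % (username, password)` places the account password in the argument vector of `bin/net`, where any local user who can list processes can read it while the command runs; the samlogon tests in auth_log.py do the same inside the `rpcclient -c` string. For a throwaway selftest account this may be acceptable, but a comment should say so, e.g. `# test-only credentials: password is visible in argv`. If the test environment can ever carry longer-lived credentials, supplying them through an authentication file or the environment rather than `-U user%pass` avoids the exposure. The test method continues outside the hunk.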
@@ -245,23 +236,21 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): def test_ldap_change_password(self): def isLastExpectedMessage(msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["status"] - == "NT_STATUS_OK" and - msg["Authentication"]["serviceDescription"] - == "LDAP Password Change" and - msg["Authentication"]["authDescription"] - == "LDAP Modify") - - new_password = samba.generate_random_password(32,32) + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["status"] == "NT_STATUS_OK") and + (msg["Authentication"]["serviceDescription"] == + "LDAP Password Change") and + (msg["Authentication"]["authDescription"] == + "LDAP Modify")) + + new_password = samba.generate_random_password(32, 32) self.ldb.modify_ldif( "dn: cn=" + USER_NAME + ",cn=users," + self.base_dn + "\n" + "changetype: modify\n" + "delete: userPassword\n" + "userPassword: " + USER_PASS + "\n" + "add: userPassword\n" + - "userPassword: " + new_password + "\n" - ) + "userPassword: " + new_password + "\n") messages = self.waitForMessages(isLastExpectedMessage) print("Received %d messages" % len(messages)) @@ -276,11 +265,10 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): def test_ldap_change_password_bad_user(self): def isLastExpectedMessage(msg): return (msg["type"] == "Authorization" and - msg["Authorization"]["serviceDescription"] - == "LDAP" and + msg["Authorization"]["serviceDescription"] == "LDAP" and msg["Authorization"]["authType"] == "krb5") - new_password = samba.generate_random_password(32,32) + new_password = samba.generate_random_password(32, 32) try: self.ldb.modify_ldif( "dn: cn=" + "badUser" + ",cn=users," + self.base_dn + "\n" + 
@@ -288,8 +276,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): "delete: userPassword\n" + "userPassword: " + USER_PASS + "\n" + "add: userPassword\n" + - "userPassword: " + new_password + "\n" - ) + "userPassword: " + new_password + "\n") self.fail() except LdbError as e: (num, msg) = e.args @@ -303,15 +290,15 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): def test_ldap_change_password_bad_original_password(self): def isLastExpectedMessage(msg): - return (msg["type"] == "Authentication" and - msg["Authentication"]["status"] - == "NT_STATUS_WRONG_PASSWORD" and - msg["Authentication"]["serviceDescription"] - == "LDAP Password Change" and - msg["Authentication"]["authDescription"] - == "LDAP Modify") - - new_password = samba.generate_random_password(32,32) + return ((msg["type"] == "Authentication") and + (msg["Authentication"]["status"] == + "NT_STATUS_WRONG_PASSWORD") and + (msg["Authentication"]["serviceDescription"] == + "LDAP Password Change") and + (msg["Authentication"]["authDescription"] == + "LDAP Modify")) + + new_password = samba.generate_random_password(32, 32) try: self.ldb.modify_ldif( "dn: cn=" + USER_NAME + ",cn=users," + self.base_dn + "\n" + @@ -319,8 +306,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase): "delete: userPassword\n" + "userPassword: " + "badPassword" + "\n" + "add: userPassword\n" + - "userPassword: " + new_password + "\n" - ) + "userPassword: " + new_password + "\n") self.fail() except LdbError as e1: (num, msg) = e1.args diff --git a/python/samba/tests/auth_log_samlogon.py b/python/samba/tests/auth_log_samlogon.py index 105a16dea91..d3b14f3d69e 100644 --- a/python/samba/tests/auth_log_samlogon.py +++ b/python/samba/tests/auth_log_samlogon.py 
@@ -19,14 +19,8 @@ Tests auth logging tests that exercise SamLogon """ -from samba import auth import samba.tests -from samba.messaging import Messaging -from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME -import time -import json import os -from samba import smb from samba.samdb import SamDB import samba.tests.auth_log_base from samba.credentials import ( @@ -42,6 +36,7 @@ from samba.tests import delete_force from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD from samba.dcerpc.misc import SEC_CHAN_WKSTA + class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase): def setUp(self): @@ -63,9 +58,8 @@ class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase): self.samlogon_dn = ("cn=%s,cn=users,%s" % (self.netbios_name, self.base_dn)) - def tearDown(self): - super(AuthLogTestsSamLogon , self).tearDown() + super(AuthLogTestsSamLogon, self).tearDown() delete_force(self.ldb, self.samlogon_dn) def _test_samlogon(self, binding, creds, checkFunction): @@ -119,7 +113,6 @@ class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase): eol.AvId = ntlmssp.MsvAvEOL target_info.pair = [domainname, computername, eol] - target_info_blob = ndr_pack(target_info) response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH, @@ -144,15 +137,14 @@ class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase): validation_level = samba.dcerpc.netlogon.NetlogonValidationSamInfo4 - - result = netlogon_conn.netr_LogonSamLogonEx(os.environ["SERVER"], - machine_creds.get_workstation(), - logon_level, logon, - validation_level, netr_flags) + result = netlogon_conn.netr_LogonSamLogonEx( + os.environ["SERVER"], + machine_creds.get_workstation(), + logon_level, logon, + validation_level, netr_flags) (validation, authoritative, netr_flags_out) = result - messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn) checkFunction(messages) 
@@ -173,7 +165,6 @@ class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase): self.assertEquals("NONE", msg["Authorization"]["transportProtection"]) self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"])) - def test_ncalrpc_samlogon(self): creds = self.insta_creds(template=self.get_credentials(), -- 2.11.4.GIT