tests/auth_log: Rename ‘self’ parameter to ‘cls’
[Samba.git] / python / samba / tests / auth_log_base.py
blob bb78e2fe7d019edb5e3b0d3b5721cb60aa9e7eda
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for the Auth and AuthZ logging.
19 """
21 import samba.tests
22 from samba.messaging import Messaging
23 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
24 from samba.param import LoadParm
25 import time
26 import json
27 import os
28 import re
class AuthLogTestBase(samba.tests.TestCase):
    """Base class for tests that inspect Auth and AuthZ log messages.

    At class set-up a messaging context is created, registered under
    AUTH_EVENT_NAME and subscribed to MSG_AUTH_LOG messages.  Every message
    received is decoded from JSON and appended to ``cls.context["messages"]``,
    where the helper methods below can wait for, filter and discard them.
    """

    @classmethod
    def setUpClass(cls):
        """Connect to the messaging bus and start collecting auth log messages.

        Reads the SERVERCONFFILE (optional) and SERVER (required) environment
        variables; a missing SERVER raises KeyError.
        """
        super().setUpClass()

        # connect to the server's messaging bus (we need to explicitly load a
        # different smb.conf here, because in all other respects this test
        # wants to act as a separate remote client)
        server_conf = os.getenv('SERVERCONFFILE')
        if server_conf:
            lp_ctx = LoadParm(filename_for_non_global_lp=server_conf)
        else:
            # The result must be bound here: lp_ctx is used immediately
            # below, and leaving it unassigned raises UnboundLocalError
            # whenever SERVERCONFFILE is not set.
            lp_ctx = samba.tests.env_loadparm()
        cls.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
        cls.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)

        # Now switch back to using the client-side smb.conf. The tests will
        # use the first interface in the client.conf (we need to strip off
        # the subnet mask portion)
        lp_ctx = samba.tests.env_loadparm()
        client_ip_and_mask = lp_ctx.get('interfaces')[0]
        client_ip = client_ip_and_mask.split('/')[0]

        # the messaging ctx is the server's view of the world, so our own
        # client IP will be the remoteAddress when connections are logged
        cls.remoteAddress = client_ip

        def messageHandler(context, msgType, src, message):
            # This does not look like sub unit output and it
            # makes these tests much easier to debug.
            print(message)
            jsonMsg = json.loads(message)
            context["messages"].append(jsonMsg)

        cls.context = {"messages": []}
        cls.msg_handler_and_context = (messageHandler, cls.context)
        cls.msg_ctx.register(cls.msg_handler_and_context,
                             msg_type=MSG_AUTH_LOG)

        cls.server = os.environ["SERVER"]
        cls.connection = None

    @classmethod
    def tearDownClass(cls):
        """Stop collecting auth log messages and release the event name."""
        if cls.msg_handler_and_context:
            cls.msg_ctx.deregister(cls.msg_handler_and_context,
                                   msg_type=MSG_AUTH_LOG)
            cls.msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)

        # Mirror setUpClass(), which chains to the parent class.
        super().tearDownClass()

    def setUp(self):
        """Start every test with an empty message queue."""
        super().setUp()
        self.discardMessages()

    def waitForMessages(self, isLastExpectedMessage, connection=None):
        """Wait for all the expected messages to arrive.

        The connection is passed through to keep the connection alive
        until all the logging messages have been received.

        isLastExpectedMessage is called with each decoded message and must
        return a true value for the message that ends the wait.

        Returns the collected messages accepted by the remote-address
        filter, or an empty list if the last expected message has not been
        seen after roughly one second of polling.
        """
        # Only these message types are looked up for a "remoteAddress".
        supported_types = {
            "Authentication",
            "Authorization",
        }

        def isRemote(message):
            if self.remoteAddress is None:
                return True

            message_type = message["type"]
            if message_type not in supported_types:
                return False
            remote = message[message_type]["remoteAddress"]

            # The second ':'-separated field is compared with our address
            # (presumably the value looks like "<proto>:<address>:<port>";
            # confirm against the server's log format).
            try:
                addr = remote.split(":")
                return addr[1] == self.remoteAddress
            except IndexError:
                return False

        def completed(messages):
            return any(isRemote(message) and isLastExpectedMessage(message)
                       for message in messages)

        self.connection = connection
        try:
            start_time = time.time()
            while not completed(self.context["messages"]):
                self.msg_ctx.loop_once(0.1)
                if time.time() - start_time > 1:
                    return []

            return list(filter(isRemote, self.context["messages"]))
        finally:
            # Drop our reference on every exit path, including errors.
            self.connection = None

    # Discard any previously queued messages.
    @classmethod
    def discardMessages(cls):
        """Drain the messaging loop until no further messages turn up."""
        cls.msg_ctx.loop_once(0.001)
        while cls.context["messages"]:
            cls.context["messages"] = []
            # Poll again in case more messages were still in flight.
            cls.msg_ctx.loop_once(0.001)

    # Remove any NETLOGON authentication messages
    # NETLOGON is only performed once per session, so to avoid ordering
    # dependencies within the tests it's best to strip out NETLOGON messages.
    def remove_netlogon_messages(self, messages):
        """Return a new list of messages without NETLOGON authentications."""
        def is_not_netlogon(msg):
            if "Authentication" not in msg:
                return True
            sd = msg["Authentication"]["serviceDescription"]
            return sd != "NETLOGON"

        return [msg for msg in messages if is_not_netlogon(msg)]

    GUID_RE = re.compile(
        "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")

    # Is the supplied GUID string correctly formatted
    def is_guid(self, guid):
        """Return a match object if guid is entirely a lower-case hex GUID,
        otherwise None.
        """
        return self.GUID_RE.fullmatch(guid)