s3:ntlm_auth: make logs more consistent with length check
[Samba.git] / python / samba / tests / auth_log_base.py
blob5c5c878c8239151717ba68d7e67a5cd953472d38
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for the Auth and AuthZ logging.
19 """
21 import samba.tests
22 from samba.messaging import Messaging
23 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
24 from samba.param import LoadParm
25 from samba import string_is_guid
26 import time
27 import json
28 import os
31 def default_msg_filter(msg):
32 # When our authentication logging tests were written, these were the only
33 # supported message types. The tests were built on the assumption that no
34 # new types would be added, and violating this assumption will result in
35 # many tests failing as they receive messages that they weren’t
36 # expecting. To allow these tests to continue to pass, this default filter
37 # makes sure that only messages for which the tests are prepared pass
38 # though.
39 default_supported_types = {
40 "Authentication",
41 "Authorization",
44 return msg['type'] in default_supported_types
47 class NoMessageException(Exception):
48 pass
51 class AuthLogTestBase(samba.tests.TestCase):
53 @classmethod
54 def setUpClass(cls):
55 super().setUpClass()
57 # connect to the server's messaging bus (we need to explicitly load a
58 # different smb.conf here, because in all other respects this test
59 # wants to act as a separate remote client)
60 server_conf = os.getenv('SERVERCONFFILE')
61 if server_conf:
62 lp_ctx = LoadParm(filename_for_non_global_lp=server_conf)
63 else:
64 lp_ctx = samba.tests.env_loadparm()
65 cls.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
66 cls.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
68 # Now switch back to using the client-side smb.conf. The tests will
69 # use the first interface in the client.conf (we need to strip off
70 # the subnet mask portion)
71 lp_ctx = samba.tests.env_loadparm()
72 client_ip_and_mask = lp_ctx.get('interfaces')[0]
73 client_ip = client_ip_and_mask.split('/')[0]
75 # the messaging ctx is the server's view of the world, so our own
76 # client IP will be the remoteAddress when connections are logged
77 cls.remoteAddress = client_ip
79 def messageHandler(context, msgType, src, message):
80 # This does not look like sub unit output and it
81 # makes these tests much easier to debug.
82 print(message)
83 jsonMsg = json.loads(message)
84 context["messages"].append(jsonMsg)
86 cls.context = {"messages": []}
87 cls.msg_handler_and_context = (messageHandler, cls.context)
88 cls.msg_ctx.register(cls.msg_handler_and_context,
89 msg_type=MSG_AUTH_LOG)
91 cls.server = os.environ["SERVER"]
92 cls.connection = None
94 @classmethod
95 def tearDownClass(cls):
96 cls.msg_ctx.deregister(cls.msg_handler_and_context,
97 msg_type=MSG_AUTH_LOG)
98 cls.msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
100 super().tearDownClass()
102 def setUp(self):
103 super().setUp()
104 type(self).discardMessages()
106 def isRemote(self, message):
107 if self.remoteAddress is None:
108 return True
110 supported_types = {
111 "Authentication",
112 "Authorization",
113 "KDC Authorization",
115 message_type = message["type"]
116 if message_type in supported_types:
117 remote = message[message_type]["remoteAddress"]
118 else:
119 return False
121 try:
122 addr = remote.split(":")
123 return addr[1] == self.remoteAddress
124 except IndexError:
125 return False
127 def waitForMessages(self, isLastExpectedMessage, connection=None, *,
128 msgFilter=default_msg_filter):
129 """Wait for all the expected messages to arrive
130 The connection is passed through to keep the connection alive
131 until all the logging messages have been received.
133 By default, only Authentication and Authorization messages will be
134 returned, so that old tests continue to pass. To receive all messages,
135 pass msgFilter=None.
139 messages = []
140 while True:
141 try:
142 msg = self.nextMessage(msgFilter=msgFilter)
143 except NoMessageException:
144 return []
146 messages.append(msg)
147 if isLastExpectedMessage(msg):
148 return messages
150 def nextMessage(self, msgFilter=None):
151 """Return the next relevant message, or throw a NoMessageException."""
152 def is_relevant(msg):
153 if not self.isRemote(msg):
154 return False
156 if msgFilter is None:
157 return True
159 return msgFilter(msg)
161 messages = self.context['messages']
163 while True:
164 timeout = 2
165 until = time.time() + timeout
167 while not messages:
168 # Fetch a new message from the messaging bus.
170 current = time.time()
171 if until < current:
172 break
174 self.msg_ctx.loop_once(until - current)
176 if not messages:
177 raise NoMessageException('timed out looking for a message')
179 # Grab the next message from the queue.
180 msg = messages.pop(0)
181 if is_relevant(msg):
182 return msg
184 # Discard any previously queued messages.
185 @classmethod
186 def discardMessages(cls):
187 messages = cls.context["messages"]
189 while True:
190 messages.clear()
192 # tevent presumably has other tasks to run, so we might need two or
193 # three loops before a message comes through.
194 for _ in range(5):
195 cls.msg_ctx.loop_once(0.001)
197 if not messages:
198 # No new messages. We’ve probably got them all.
199 break
201 # Remove any NETLOGON authentication messages
202 # NETLOGON is only performed once per session, so to avoid ordering
203 # dependencies within the tests it's best to strip out NETLOGON messages.
205 def remove_netlogon_messages(self, messages):
206 def is_not_netlogon(msg):
207 if "Authentication" not in msg:
208 return True
209 sd = msg["Authentication"]["serviceDescription"]
210 return sd != "NETLOGON"
212 return list(filter(is_not_netlogon, messages))
214 def is_guid(self, guid):
215 """Is the supplied GUID string correctly formatted"""
216 return string_is_guid(guid)