s3: smbd: Move check_fsp_open() and check_fsp() to smb1_reply.c
[Samba.git] / python / samba / tests / smb-notify.py
blobc06eabe30c70381734377237c25dd917cc78a43a
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation. Tests for smb notify
3 # Copyright (C) Björn Baumbach <bb@samba.org> 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import sys
19 import os
21 sys.path.insert(0, "bin/python")
22 os.environ["PYTHONUNBUFFERED"] = "1"
24 import samba
25 import random
26 from samba.tests import TestCase
27 from samba import NTSTATUSError
28 from samba import credentials
29 from samba.ntstatus import NT_STATUS_NOTIFY_CLEANUP
30 from samba.samba3 import libsmb_samba_internal as libsmb
31 from samba.samba3 import param as s3param
32 from samba.dcerpc import security
34 from samba import ntacls
# Name of the scratch directory the tests create on the share.  The suffix is
# a random integer in [0, 0xFFFF] chosen once at import time.
# (A single-argument os.path.join() returned the string unchanged, so the
# plain formatted string is used directly.)
test_dir = 'notify_test_%d' % random.randint(0, 0xFFFF)
class SMBNotifyTests(TestCase):
    """Tests for SMB change-notify requests issued through libsmb.

    Every test works below the scratch directory ``test_dir``, which
    setUp() (re)creates on the share and tearDown() removes again.
    """

    def setUp(self):
        """Connect to the notify share and create an empty scratch directory.

        Connection parameters are read from the test environment:
        SERVER, SMB_CONF_PATH, NOTIFY_SHARE, USERNAME and PASSWORD.
        STRICT_CHECKING is optional and treated as '1' when missing; it is
        parsed as an integer and stored as the boolean
        ``self.strict_checking``.
        """
        super().setUp()
        self.server = samba.tests.env_get_var_value("SERVER")

        # create an SMB connection to the server
        self.lp = s3param.get_context()
        self.lp.load(samba.tests.env_get_var_value("SMB_CONF_PATH"))

        self.share = samba.tests.env_get_var_value("NOTIFY_SHARE")

        creds = credentials.Credentials()
        creds.guess(self.lp)
        creds.set_username(samba.tests.env_get_var_value("USERNAME"))
        creds.set_password(samba.tests.env_get_var_value("PASSWORD"))

        strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
                                                        allow_missing=True)
        if strict_checking is None:
            strict_checking = '1'
        self.strict_checking = bool(int(strict_checking))

        self.smb_conn = libsmb.Conn(self.server, self.share, self.lp, creds)
        # Only established on demand, see connect_unpriv().
        self.smb_conn_unpriv = None

        # Start from a clean slate in case an earlier run left the tree behind.
        self._delete_test_tree()
        self.smb_conn.mkdir(test_dir)

    def connect_unpriv(self):
        """Open a second connection to the same share as the unprivileged user.

        Credentials come from the USERNAME_UNPRIV and PASSWORD_UNPRIV
        environment variables; the connection is stored in
        ``self.smb_conn_unpriv``.
        """
        creds_unpriv = credentials.Credentials()
        creds_unpriv.guess(self.lp)
        creds_unpriv.set_username(
            samba.tests.env_get_var_value("USERNAME_UNPRIV"))
        creds_unpriv.set_password(
            samba.tests.env_get_var_value("PASSWORD_UNPRIV"))

        self.smb_conn_unpriv = libsmb.Conn(self.server, self.share,
                                           self.lp, creds_unpriv)

    def tearDown(self):
        """Run the base-class teardown, then remove the scratch directory."""
        super().tearDown()
        self._delete_test_tree()

    def make_path(self, dirpath, filename):
        """Join *dirpath* and *filename* and return the result with every
        '/' replaced by a backslash."""
        return os.path.join(dirpath, filename).replace('/', '\\')

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _delete_test_tree(self):
        """Best-effort removal of the scratch directory tree.

        An NTSTATUSError from deltree() is ignored on purpose: the
        directory may legitimately not exist.  Anything else (for example
        KeyboardInterrupt) is allowed to propagate instead of being
        silently swallowed.
        """
        try:
            self.smb_conn.deltree(test_dir)
        except NTSTATUSError:
            pass

    def _rearm_notify(self, conn, fnum):
        """Post a recursive notify request for all change types on *fnum*
        over *conn* and return the pending-request object."""
        return conn.notify(fnum=fnum,
                           buffer_size=0xffff,
                           completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
                           recursive=True)

    def _assert_no_pending_changes(self, conn, *notifies):
        """Send one echo on *conn*, then assert that a non-blocking
        get_changes() on each request in *notifies* returns None."""
        conn.echo()
        for notify in notifies:
            self.assertIsNone(notify.get_changes(wait=False))

    def _assert_one_change(self, notify, name, action):
        """Wait for *notify* to complete and assert it reports exactly one
        change whose 'name' and 'action' match the given values."""
        changes = notify.get_changes(wait=True)
        self.assertIsNotNone(changes)
        self.assertEqual(changes[0]['name'], name)
        self.assertEqual(changes[0]['action'], action)
        self.assertEqual(len(changes), 1)

    def _assert_notify_cleanup(self, notify):
        """Assert that waiting on *notify* raises NTSTATUSError carrying
        NT_STATUS_NOTIFY_CLEANUP as its first argument."""
        with self.assertRaises(NTSTATUSError) as ctx:
            notify.get_changes(wait=True)
        self.assertEqual(ctx.exception.args[0], NT_STATUS_NOTIFY_CLEANUP)

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test_notify(self):
        """Watch the share root and test_dir recursively and check the
        notifications produced by creating and removing directories and by
        closing the watched handles."""
        conn = self.smb_conn

        # setup notification request on the share root
        root_fnum = conn.create(Name="", ShareAccess=1)
        root_notify = self._rearm_notify(conn, root_fnum)
        # setup notification request on the test_dir
        test_dir_fnum = conn.create(Name=test_dir, ShareAccess=1)
        test_dir_notify = self._rearm_notify(conn, test_dir_fnum)

        # make sure we didn't receive any changes yet.
        self._assert_no_pending_changes(conn, root_notify, test_dir_notify)

        # create a test directory
        dir_name = "dir"
        dir_path = self.make_path(test_dir, dir_name)
        conn.mkdir(dir_path)

        # check for 'added' notifications; the root watcher sees the path
        # relative to the share root, the test_dir watcher relative to test_dir
        self._assert_one_change(root_notify, dir_path,
                                libsmb.NOTIFY_ACTION_ADDED)
        self._assert_one_change(test_dir_notify, dir_name,
                                libsmb.NOTIFY_ACTION_ADDED)

        # re-add notification requests
        root_notify = self._rearm_notify(conn, root_fnum)
        test_dir_notify = self._rearm_notify(conn, test_dir_fnum)

        # make sure we didn't receive any changes yet.
        self._assert_no_pending_changes(conn, root_notify, test_dir_notify)

        # create subdir and trigger notifications
        sub_name = "subdir"
        sub_path_rel = self.make_path(dir_name, sub_name)
        sub_path_full = self.make_path(dir_path, sub_name)
        conn.mkdir(sub_path_full)

        # check for 'added' notifications
        self._assert_one_change(root_notify, sub_path_full,
                                libsmb.NOTIFY_ACTION_ADDED)
        self._assert_one_change(test_dir_notify, sub_path_rel,
                                libsmb.NOTIFY_ACTION_ADDED)

        # re-add notification requests
        root_notify = self._rearm_notify(conn, root_fnum)
        test_dir_notify = self._rearm_notify(conn, test_dir_fnum)

        # make sure we didn't receive any changes yet.
        self._assert_no_pending_changes(conn, root_notify, test_dir_notify)

        # remove the subdir and trigger notifications
        conn.rmdir(sub_path_full)

        # check for 'removed' notifications
        self._assert_one_change(root_notify, sub_path_full,
                                libsmb.NOTIFY_ACTION_REMOVED)
        self._assert_one_change(test_dir_notify, sub_path_rel,
                                libsmb.NOTIFY_ACTION_REMOVED)

        # re-add notification requests
        root_notify = self._rearm_notify(conn, root_fnum)
        test_dir_notify = self._rearm_notify(conn, test_dir_fnum)

        # make sure we didn't receive any changes yet.
        self._assert_no_pending_changes(conn, root_notify, test_dir_notify)

        # closing the handle on test_dir will trigger
        # a NOTIFY_CLEANUP on test_dir_notify and
        # it also seems to update something on test_dir itself
        # and post a MODIFIED on root_notify
        #
        # TODO: find out why windows generates ACTION_MODIFIED
        #       and why Samba doesn't
        conn.close(test_dir_fnum)
        self._assert_notify_cleanup(test_dir_notify)

        conn.echo()
        changes = root_notify.get_changes(wait=False)
        if self.strict_checking:
            self.assertIsNotNone(changes)
        if changes is not None:
            self.assertEqual(changes[0]['name'], test_dir)
            self.assertEqual(changes[0]['action'],
                             libsmb.NOTIFY_ACTION_MODIFIED)
            self.assertEqual(len(changes), 1)

        # re-add notification request
        root_notify = self._rearm_notify(conn, root_fnum)

        # make sure we didn't receive any changes yet.
        self._assert_no_pending_changes(conn, root_notify)

        # remove the directory created inside test_dir
        conn.rmdir(dir_path)

        # check for 'removed' notifications
        self._assert_one_change(root_notify, dir_path,
                                libsmb.NOTIFY_ACTION_REMOVED)

        # re-add notification request
        root_notify = self._rearm_notify(conn, root_fnum)
        # closing the handle on the share root will trigger
        # a NOTIFY_CLEANUP on root_notify
        conn.close(root_fnum)
        self._assert_notify_cleanup(root_notify)

    def _test_notify_privileged_path(self,
                                     monitor_path=None,
                                     rel_prefix=None):
        """Check that changes inside an access-restricted directory are only
        reported to the privileged watcher.

        monitor_path: path on the share that both users watch recursively.
        rel_prefix:   prefix under which the private directory is expected
                      to show up in names reported relative to monitor_path.
        """
        self.connect_unpriv()
        conn = self.smb_conn
        conn_unpriv = self.smb_conn_unpriv

        domain_sid = security.dom_sid()  # we just use S-0-0
        smb_helper = ntacls.SMBHelper(conn, domain_sid)

        private_name = "private"
        private_rel = self.make_path(rel_prefix, private_name)
        private_path = self.make_path(test_dir, private_name)
        # create a private test directory
        conn.mkdir(private_path)

        # Get the security descriptor and replace it
        # with one that only grants access to SYSTEM and the
        # owner.
        private_path_sd_old = smb_helper.get_acl(private_path)
        private_path_sd_new = security.descriptor.from_sddl(
            "G:BAD:(A;;0x%x;;;%s)(A;;0x%x;;;%s)" % (
                security.SEC_RIGHTS_DIR_ALL,
                security.SID_NT_SYSTEM,
                security.SEC_RIGHTS_DIR_ALL,
                str(private_path_sd_old.owner_sid)),
            domain_sid)
        private_path_sd_new.type |= security.SEC_DESC_SELF_RELATIVE
        private_path_sd_new.type |= security.SEC_DESC_DACL_PROTECTED
        set_secinfo = (security.SECINFO_GROUP |
                       security.SECINFO_DACL |
                       security.SECINFO_PROTECTED_DACL)
        smb_helper.set_acl(private_path, private_path_sd_new,
                           sinfo=set_secinfo)

        # setup notification request as privileged user
        monitor_priv_fnum = conn.create(Name=monitor_path, ShareAccess=1)
        notify_priv = self._rearm_notify(conn, monitor_priv_fnum)

        # setup notification request as unprivileged user
        monitor_unpriv_fnum = conn_unpriv.create(Name=monitor_path,
                                                 ShareAccess=1)
        notify_unpriv = self._rearm_notify(conn_unpriv, monitor_unpriv_fnum)

        # make sure we didn't receive any changes yet.
        self._assert_no_pending_changes(conn, notify_priv)
        self._assert_no_pending_changes(conn_unpriv, notify_unpriv)

        # trigger notification in the private dir
        new_name = 'test-new'
        new_rel = self.make_path(private_rel, new_name)
        new_path = self.make_path(private_path, new_name)
        conn.mkdir(new_path)

        # check that only the privileged user received the changes
        self._assert_one_change(notify_priv, new_rel,
                                libsmb.NOTIFY_ACTION_ADDED)
        notify_priv = self._rearm_notify(conn, monitor_priv_fnum)

        # check that the unprivileged user does not receive the changes
        self._assert_no_pending_changes(conn_unpriv, notify_unpriv)
        # and there's no additional change for the privileged user
        self._assert_no_pending_changes(conn, notify_priv)

        # trigger notification in the private dir
        conn.rmdir(new_path)

        # check that only the privileged user received the changes
        self._assert_one_change(notify_priv, new_rel,
                                libsmb.NOTIFY_ACTION_REMOVED)
        notify_priv = self._rearm_notify(conn, monitor_priv_fnum)

        # check that the unprivileged user does not receive the changes
        self._assert_no_pending_changes(conn_unpriv, notify_unpriv)
        # and there's no additional change for the privileged user
        self._assert_no_pending_changes(conn, notify_priv)

        # trigger notification for both
        conn.rmdir(private_path)

        # check that both get the notification
        self._assert_one_change(notify_unpriv, private_rel,
                                libsmb.NOTIFY_ACTION_REMOVED)
        notify_unpriv = self._rearm_notify(conn_unpriv, monitor_unpriv_fnum)
        self._assert_one_change(notify_priv, private_rel,
                                libsmb.NOTIFY_ACTION_REMOVED)
        notify_priv = self._rearm_notify(conn, monitor_priv_fnum)

        # check that there's no additional change for the unprivileged user
        self._assert_no_pending_changes(conn_unpriv, notify_unpriv)
        # and there's no additional change for the privileged user
        self._assert_no_pending_changes(conn, notify_priv)

        # closing the unprivileged handle will trigger a NOTIFY_CLEANUP
        conn_unpriv.close(monitor_unpriv_fnum)
        self._assert_notify_cleanup(notify_unpriv)

        # there's no additional change for the privileged user
        self._assert_no_pending_changes(conn, notify_priv)

        # closing the privileged handle will trigger a NOTIFY_CLEANUP
        conn.close(monitor_priv_fnum)
        self._assert_notify_cleanup(notify_priv)

    def test_notify_privileged_test(self):
        """Privileged-path check with both users watching test_dir."""
        return self._test_notify_privileged_path(monitor_path=test_dir,
                                                 rel_prefix="")

    def test_notify_privileged_root(self):
        """Privileged-path check with both users watching the share root."""
        return self._test_notify_privileged_path(monitor_path="",
                                                 rel_prefix=test_dir)
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest command-line runner.
    from unittest import main
    main()