2 # Unix SMB/CIFS implementation. Tests for smb notify
3 # Copyright (C) Björn Baumbach <bb@samba.org> 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 sys
.path
.insert(0, "bin/python")
22 os
.environ
["PYTHONUNBUFFERED"] = "1"
26 from samba
.tests
import TestCase
27 from samba
import NTSTATUSError
28 from samba
import credentials
29 from samba
.ntstatus
import NT_STATUS_NOTIFY_CLEANUP
30 from samba
.samba3
import libsmb_samba_internal
as libsmb
31 from samba
.samba3
import param
as s3param
32 from samba
.dcerpc
import security
34 from samba
import ntacls
# Name of the scratch directory the tests create on the share. The decimal
# suffix is drawn from 0..0xFFFF (inclusive) each time the module is loaded.
# os.path.join() with a single argument returns that argument unchanged, so
# the name is built directly instead of going through a no-op join.
test_dir = 'notify_test_%d' % random.randint(0, 0xFFFF)
38 class SMBNotifyTests(TestCase
):
40 super(SMBNotifyTests
, self
).setUp()
41 self
.server
= samba
.tests
.env_get_var_value("SERVER")
43 # create an SMB connection to the server
44 self
.lp
= s3param
.get_context()
45 self
.lp
.load(samba
.tests
.env_get_var_value("SMB_CONF_PATH"))
47 self
.share
= samba
.tests
.env_get_var_value("NOTIFY_SHARE")
49 creds
= credentials
.Credentials()
51 creds
.set_username(samba
.tests
.env_get_var_value("USERNAME"))
52 creds
.set_password(samba
.tests
.env_get_var_value("PASSWORD"))
54 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING', allow_missing
=True)
55 if strict_checking
is None:
57 self
.strict_checking
= bool(int(strict_checking
))
59 self
.smb_conn
= libsmb
.Conn(self
.server
, self
.share
, self
.lp
, creds
)
60 self
.smb_conn_unpriv
= None
63 self
.smb_conn
.deltree(test_dir
)
66 self
.smb_conn
.mkdir(test_dir
)
def connect_unpriv(self):
    """Open a second connection to the share with the unprivileged account.

    The user name and password are looked up under the keys
    "USERNAME_UNPRIV" and "PASSWORD_UNPRIV" via
    samba.tests.env_get_var_value(). The new connection targets the same
    server and share as self.smb_conn and is stored in
    self.smb_conn_unpriv.
    """
    lookup = samba.tests.env_get_var_value

    unpriv = credentials.Credentials()
    unpriv.guess(self.lp)
    unpriv.set_username(lookup("USERNAME_UNPRIV"))
    unpriv.set_password(lookup("PASSWORD_UNPRIV"))

    self.smb_conn_unpriv = libsmb.Conn(
        self.server, self.share, self.lp, unpriv)
77 super(SMBNotifyTests
, self
).tearDown()
79 self
.smb_conn
.deltree(test_dir
)
def make_path(self, dirpath, filename):
    """Return *dirpath* joined with *filename*, using backslash separators.

    The two components are combined with os.path.join() and every forward
    slash in the result is then replaced by a backslash.
    """
    joined = os.path.join(dirpath, filename)
    return joined.replace('/', '\\')
86 def test_notify(self
):
87 # setup notification request on the share root
88 root_fnum
= self
.smb_conn
.create(Name
="", ShareAccess
=1)
89 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
91 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
93 # setup notification request on the test_dir
94 test_dir_fnum
= self
.smb_conn
.create(Name
=test_dir
, ShareAccess
=1)
95 test_dir_notify
= self
.smb_conn
.notify(fnum
=test_dir_fnum
,
97 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
100 # make sure we didn't receive any changes yet.
102 changes
= root_notify
.get_changes(wait
=False)
103 self
.assertIsNone(changes
)
104 changes
= test_dir_notify
.get_changes(wait
=False)
105 self
.assertIsNone(changes
)
107 # create a test directory
109 dir_path
= self
.make_path(test_dir
, dir_name
)
110 self
.smb_conn
.mkdir(dir_path
)
112 # check for 'added' notifications
113 changes
= root_notify
.get_changes(wait
=True)
114 self
.assertIsNotNone(changes
)
115 self
.assertEqual(changes
[0]['name'], dir_path
)
116 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
117 self
.assertEqual(len(changes
), 1)
118 changes
= test_dir_notify
.get_changes(wait
=True)
119 self
.assertIsNotNone(changes
)
120 self
.assertEqual(changes
[0]['name'], dir_name
)
121 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
122 self
.assertEqual(len(changes
), 1)
124 # readd notification requests
125 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
127 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
129 test_dir_notify
= self
.smb_conn
.notify(fnum
=test_dir_fnum
,
131 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
134 # make sure we didn't receive any changes yet.
136 changes
= root_notify
.get_changes(wait
=False)
137 self
.assertIsNone(changes
)
138 changes
= test_dir_notify
.get_changes(wait
=False)
139 self
.assertIsNone(changes
)
141 # create subdir and trigger notifications
143 sub_path_rel
= self
.make_path(dir_name
, sub_name
)
144 sub_path_full
= self
.make_path(dir_path
, sub_name
)
145 self
.smb_conn
.mkdir(sub_path_full
)
147 # check for 'added' notifications
148 changes
= root_notify
.get_changes(wait
=True)
149 self
.assertIsNotNone(changes
)
150 self
.assertEqual(changes
[0]['name'], sub_path_full
)
151 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
152 self
.assertEqual(len(changes
), 1)
153 changes
= test_dir_notify
.get_changes(wait
=True)
154 self
.assertIsNotNone(changes
)
155 self
.assertEqual(changes
[0]['name'], sub_path_rel
)
156 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
157 self
.assertEqual(len(changes
), 1)
159 # readd notification requests
160 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
162 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
164 test_dir_notify
= self
.smb_conn
.notify(fnum
=test_dir_fnum
,
166 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
169 # make sure we didn't receive any changes yet.
171 changes
= root_notify
.get_changes(wait
=False)
172 self
.assertIsNone(changes
)
173 changes
= test_dir_notify
.get_changes(wait
=False)
174 self
.assertIsNone(changes
)
176 # remove subdir and trigger notifications
177 self
.smb_conn
.rmdir(sub_path_full
)
179 # check for 'removed' notifications
180 changes
= root_notify
.get_changes(wait
=True)
181 self
.assertIsNotNone(changes
)
182 self
.assertEqual(changes
[0]['name'], sub_path_full
)
183 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
184 self
.assertEqual(len(changes
), 1)
185 changes
= test_dir_notify
.get_changes(wait
=True)
186 self
.assertIsNotNone(changes
)
187 self
.assertEqual(changes
[0]['name'], sub_path_rel
)
188 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
189 self
.assertEqual(len(changes
), 1)
191 # readd notification requests
192 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
194 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
196 test_dir_notify
= self
.smb_conn
.notify(fnum
=test_dir_fnum
,
198 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
201 # make sure we didn't receive any changes yet.
203 changes
= root_notify
.get_changes(wait
=False)
204 self
.assertIsNone(changes
)
205 changes
= test_dir_notify
.get_changes(wait
=False)
206 self
.assertIsNone(changes
)
208 # closing the handle on test_dir will trigger
209 # a NOTIFY_CLEANUP on test_dir_notify and
210 # it also seems to update something on test_dir itself
211 # and post a MODIFIED on root_notify
213 # TODO: find out why windows generates ACTION_MODIFIED
214 # and why Samba doesn't
215 self
.smb_conn
.close(test_dir_fnum
)
217 changes
= test_dir_notify
.get_changes(wait
=True)
219 except samba
.NTSTATUSError
as err
:
220 self
.assertEqual(err
.args
[0], NT_STATUS_NOTIFY_CLEANUP
)
222 changes
= root_notify
.get_changes(wait
=False)
223 if self
.strict_checking
:
224 self
.assertIsNotNone(changes
)
225 if changes
is not None:
226 self
.assertEqual(changes
[0]['name'], test_dir
)
227 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_MODIFIED
)
228 self
.assertEqual(len(changes
), 1)
230 # readd notification request
231 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
233 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
236 # make sure we didn't receive any changes yet.
238 changes
= root_notify
.get_changes(wait
=False)
239 self
.assertIsNone(changes
)
242 self
.smb_conn
.rmdir(dir_path
)
244 # check for 'removed' notifications
245 changes
= root_notify
.get_changes(wait
=True)
246 self
.assertIsNotNone(changes
)
247 self
.assertEqual(changes
[0]['name'], dir_path
)
248 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
249 self
.assertEqual(len(changes
), 1)
251 # readd notification request
252 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
254 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
256 # closing the handle on the share root will trigger
257 # a NOTIFY_CLEANUP on root_notify
258 self
.smb_conn
.close(root_fnum
)
260 changes
= root_notify
.get_changes(wait
=True)
262 except samba
.NTSTATUSError
as err
:
263 self
.assertEqual(err
.args
[0], NT_STATUS_NOTIFY_CLEANUP
)
266 def _test_notify_privileged_path(self
,
269 self
.connect_unpriv()
271 domain_sid
= security
.dom_sid() # we just use S-0-0
272 smb_helper
= ntacls
.SMBHelper(self
.smb_conn
, domain_sid
)
274 private_name
= "private"
275 private_rel
= self
.make_path(rel_prefix
, private_name
)
276 private_path
= self
.make_path(test_dir
, private_name
)
277 # create a private test directory
278 self
.smb_conn
.mkdir(private_path
)
280 # Get the security descriptor and replace it
281 # with one that only grants access to SYSTEM and the
283 private_path_sd_old
= smb_helper
.get_acl(private_path
)
284 private_path_sd_new
= security
.descriptor()
285 private_path_sd_new
.type = private_path_sd_old
.type
286 private_path_sd_new
.revision
= private_path_sd_old
.revision
287 private_path_sd_new
= security
.descriptor
.from_sddl("G:BAD:(A;;0x%x;;;%s)(A;;0x%x;;;%s)" % (
288 security
.SEC_RIGHTS_DIR_ALL
,
289 security
.SID_NT_SYSTEM
,
290 security
.SEC_RIGHTS_DIR_ALL
,
291 str(private_path_sd_old
.owner_sid
)),
293 private_path_sd_new
.type |
= security
.SEC_DESC_SELF_RELATIVE
294 private_path_sd_new
.type |
= security
.SEC_DESC_DACL_PROTECTED
295 set_secinfo
= security
.SECINFO_GROUP | security
.SECINFO_DACL | security
.SECINFO_PROTECTED_DACL
296 smb_helper
.set_acl(private_path
, private_path_sd_new
, sinfo
=set_secinfo
)
298 # setup notification request as privileged user
299 monitor_priv_fnum
= self
.smb_conn
.create(Name
=monitor_path
, ShareAccess
=1)
300 notify_priv
= self
.smb_conn
.notify(fnum
=monitor_priv_fnum
,
302 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
305 # setup notification request as unprivileged user
306 monitor_unpriv_fnum
= self
.smb_conn_unpriv
.create(Name
=monitor_path
, ShareAccess
=1)
307 notify_unpriv
= self
.smb_conn_unpriv
.notify(fnum
=monitor_unpriv_fnum
,
309 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
312 # make sure we didn't receive any changes yet.
314 changes
= notify_priv
.get_changes(wait
=False)
315 self
.assertIsNone(changes
)
316 self
.smb_conn_unpriv
.echo()
317 changes
= notify_unpriv
.get_changes(wait
=False)
318 self
.assertIsNone(changes
)
320 # trigger notification in the private dir
321 new_name
= 'test-new'
322 new_rel
= self
.make_path(private_rel
, new_name
)
323 new_path
= self
.make_path(private_path
, new_name
)
324 self
.smb_conn
.mkdir(new_path
)
326 # check that only the privileged user received the changes
327 changes
= notify_priv
.get_changes(wait
=True)
328 self
.assertIsNotNone(changes
)
329 self
.assertEqual(changes
[0]['name'], new_rel
)
330 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
331 self
.assertEqual(len(changes
), 1)
332 notify_priv
= self
.smb_conn
.notify(fnum
=monitor_priv_fnum
,
334 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
337 # check that the unprivileged user does not receive the changes
338 self
.smb_conn_unpriv
.echo()
339 changes
= notify_unpriv
.get_changes(wait
=False)
340 self
.assertIsNone(changes
)
341 # and there's no additional change for the privileged user
343 changes
= notify_priv
.get_changes(wait
=False)
344 self
.assertIsNone(changes
)
346 # trigger notification in the private dir
347 self
.smb_conn
.rmdir(new_path
)
349 # check that only the privileged user received the changes
350 changes
= notify_priv
.get_changes(wait
=True)
351 self
.assertIsNotNone(changes
)
352 self
.assertEqual(changes
[0]['name'], new_rel
)
353 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
354 self
.assertEqual(len(changes
), 1)
355 notify_priv
= self
.smb_conn
.notify(fnum
=monitor_priv_fnum
,
357 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
360 # check that the unprivileged user does not receive the changes
361 self
.smb_conn_unpriv
.echo()
362 changes
= notify_unpriv
.get_changes(wait
=False)
363 self
.assertIsNone(changes
)
364 # and there's no additional change for the privileged user
366 changes
= notify_priv
.get_changes(wait
=False)
367 self
.assertIsNone(changes
)
369 # trigger notification for both
370 self
.smb_conn
.rmdir(private_path
)
372 # check that both get the notification
373 changes
= notify_unpriv
.get_changes(wait
=True)
374 self
.assertIsNotNone(changes
)
375 self
.assertEqual(changes
[0]['name'], private_rel
)
376 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
377 self
.assertEqual(len(changes
), 1)
378 notify_unpriv
= self
.smb_conn_unpriv
.notify(fnum
=monitor_unpriv_fnum
,
380 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
382 changes
= notify_priv
.get_changes(wait
=True)
383 self
.assertIsNotNone(changes
)
384 self
.assertEqual(changes
[0]['name'], private_rel
)
385 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
386 self
.assertEqual(len(changes
), 1)
387 notify_priv
= self
.smb_conn
.notify(fnum
=monitor_priv_fnum
,
389 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
392 # check that the unprivileged user does not receive the changes
393 self
.smb_conn_unpriv
.echo()
394 changes
= notify_unpriv
.get_changes(wait
=False)
395 self
.assertIsNone(changes
)
396 # and there's no additional change for the privileged user
398 changes
= notify_priv
.get_changes(wait
=False)
399 self
.assertIsNone(changes
)
401 # closing the unprivileged monitor handle will trigger a NOTIFY_CLEANUP
402 self
.smb_conn_unpriv
.close(monitor_unpriv_fnum
)
404 changes
= notify_unpriv
.get_changes(wait
=True)
406 except samba
.NTSTATUSError
as err
:
407 self
.assertEqual(err
.args
[0], NT_STATUS_NOTIFY_CLEANUP
)
409 # there's no additional change for the privileged user
411 changes
= notify_priv
.get_changes(wait
=False)
412 self
.assertIsNone(changes
)
414 # closing the privileged monitor handle will trigger a NOTIFY_CLEANUP
415 self
.smb_conn
.close(monitor_priv_fnum
)
417 changes
= notify_priv
.get_changes(wait
=True)
419 except samba
.NTSTATUSError
as err
:
420 self
.assertEqual(err
.args
[0], NT_STATUS_NOTIFY_CLEANUP
)
def test_notify_privileged_test(self):
    # Run the privileged-path scenario with the notify handles opened on
    # test_dir itself, so the expected change names get an empty prefix.
    scenario = self._test_notify_privileged_path
    return scenario(monitor_path=test_dir, rel_prefix="")
def test_notify_privileged_root(self):
    # Run the privileged-path scenario with the notify handles opened on
    # the empty path, so the expected change names are prefixed with
    # test_dir.
    scenario = self._test_notify_privileged_path
    return scenario(monitor_path="", rel_prefix=test_dir)
428 if __name__
== "__main__":