smbd: improve reinit_after_fork error handling
[Samba.git] / python / samba / tests / smb3unix.py
blob66aa98b5aa91d933cf2b6df490f5bac21ba25258
1 # Unix SMB/CIFS implementation.
2 # Copyright Volker Lendecke <vl@samba.org> 2022
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.samba3 import libsmb_samba_internal as libsmb
19 from samba import NTSTATUSError,ntstatus
20 import samba.tests.libsmb
21 from samba.dcerpc import security
22 from samba.common import get_string
23 from samba.dcerpc import smb3posix
24 from samba.ndr import ndr_unpack
25 from samba.dcerpc.security import dom_sid
26 from samba import reparse_symlink
27 import os
28 import subprocess
def posix_context(mode):
    """Build a POSIX create-context entry for ``mode``.

    Returns a ``(tag, data)`` tuple: the tag is
    ``libsmb.SMB2_CREATE_TAG_POSIX`` and the data is ``mode`` encoded as
    four little-endian bytes.
    """
    encoded_mode = mode.to_bytes(4, 'little')
    return (libsmb.SMB2_CREATE_TAG_POSIX, encoded_mode)
33 class Smb3UnixTests(samba.tests.libsmb.LibsmbTests):
def setUp(self):
    """Record the SAM SID and the filesystem type found under PREFIX_ABS.

    Both values come from the environment: ``SAMSID`` is stored verbatim
    in ``self.samsid``; ``PREFIX_ABS`` is handed to ``stat -f -c %T`` and
    the stripped, lower-cased output is stored in ``self.fstype``.
    """
    super().setUp()

    environ = os.environ
    self.samsid = environ["SAMSID"]

    # The tests compare self.fstype against names such as "btrfs", so the
    # command output is normalised before it is stored.
    stat_cmd = ['stat', '-f', '-c', '%T', environ["PREFIX_ABS"]]
    stat_result = subprocess.run(stat_cmd, capture_output=True, text=True)
    self.fstype = stat_result.stdout.strip().lower()
def connections(self, share1=None, posix1=False, share2=None, posix2=True):
    """Open two connections to the test server and return them as a pair.

    A share name that is not supplied is looked up through
    ``samba.tests.env_get_var_value`` ("SHARE1" / "SHARE2"); when that
    yields nothing either, "tmp" is used.  The first connection is opened
    with ``posix=posix1`` and the second with ``posix=posix2``.

    Returns ``(conn1, conn2)``.
    """
    share1 = (share1
              or samba.tests.env_get_var_value("SHARE1", allow_missing=True)
              or "tmp")
    share2 = (share2
              or samba.tests.env_get_var_value("SHARE2", allow_missing=True)
              or "tmp")

    first = libsmb.Conn(self.server_ip, share1, self.lp, self.creds,
                        posix=posix1)
    second = libsmb.Conn(self.server_ip, share2, self.lp, self.creds,
                         posix=posix2)
    return (first, second)
def test_negotiate_context_posix(self):
    """A connection to "tmp" opened with posix=True reports POSIX support."""
    conn = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds,
                       posix=True)
    self.assertTrue(conn.have_posix())
def test_negotiate_context_posix_invalid_length(self):
    """A 5-byte payload for negotiate context 0x100 must be rejected.

    The connection attempt is expected to raise NTSTATUSError whose first
    argument is NT_STATUS_INVALID_PARAMETER.
    """
    bad_contexts = [(0x100, b'01234')]

    with self.assertRaises(NTSTATUSError) as raised:
        libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds,
                    negotiate_contexts=bad_contexts)

    status = raised.exception.args[0]
    self.assertEqual(status, ntstatus.NT_STATUS_INVALID_PARAMETER)
def test_negotiate_context_posix_invalid_blob(self):
    """A 16-byte payload for context 0x100 connects without POSIX support."""
    blob_contexts = [(0x100, b'0123456789012345')]
    conn = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds,
                       negotiate_contexts=blob_contexts)
    self.assertFalse(conn.have_posix())
def test_posix_create_context(self):
    """The create response echoes the POSIX create-context tag.

    Opens the share root ("") on "tmp" with a 4-byte POSIX create context
    and checks that the first returned context carries the same tag.
    """
    conn = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds,
                       posix=True)
    self.assertTrue(conn.have_posix())

    requested = [(libsmb.SMB2_CREATE_TAG_POSIX, b'0000')]
    fnum, _, returned = conn.create_ex("", CreateContexts=requested)
    sent_tag = requested[0][0]
    self.assertEqual(sent_tag, returned[0][0])

    conn.close(fnum)
def test_posix_create_invalid_context_length(self):
    """A 5-byte POSIX create context must be rejected.

    The create call is expected to raise NTSTATUSError whose first
    argument is NT_STATUS_INVALID_PARAMETER.
    """
    conn = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds,
                       posix=True)
    self.assertTrue(conn.have_posix())

    oversized = [(libsmb.SMB2_CREATE_TAG_POSIX, b'00000')]

    with self.assertRaises(NTSTATUSError) as raised:
        conn.create_ex("", CreateContexts=oversized)

    status = raised.exception.args[0]
    self.assertEqual(status, ntstatus.NT_STATUS_INVALID_PARAMETER)
def delete_test_file(self, c, fname, mode=0):
    """Remove ``fname`` through connection ``c``.

    The existing file is opened (FILE_OPEN) with a POSIX create context
    built from ``mode``, flagged delete-on-close and then closed.
    """
    handle, _, _ = c.create_ex(
        fname,
        DesiredAccess=security.SEC_STD_ALL,
        CreateDisposition=libsmb.FILE_OPEN,
        CreateContexts=[posix_context(mode)])
    c.delete_on_close(handle, True)
    c.close(handle)
def test_posix_query_dir(self):
    """Compare a default listing with a SMB2_FIND_POSIX_INFORMATION one.

    Ten files are created on "smb3_posix_share".  The POSIX listing is
    expected to hold exactly two more entries than the default listing.
    All files created here are removed again afterwards.
    """
    created = []
    try:
        conn = libsmb.Conn(self.server_ip, "smb3_posix_share", self.lp,
                           self.creds, posix=True)
        self.assertTrue(conn.have_posix())

        for index in range(10):
            name = '\\test%d' % index
            handle, _, _ = conn.create_ex(
                name,
                CreateDisposition=libsmb.FILE_OPEN_IF,
                CreateContexts=[posix_context(0o744)])
            conn.close(handle)
            created.append(name)

        default_entries = conn.list('')
        expected_count = len(default_entries)
        self.assertNotEqual(expected_count, 0, 'No files were found')

        posix_entries = conn.list(
            '', info_level=libsmb.SMB2_FIND_POSIX_INFORMATION)
        self.assertEqual(len(posix_entries) - 2, expected_count,
                         'SMB2_FIND_POSIX_INFORMATION failed to list contents')

    finally:
        # Nothing is in the list unless the connection succeeded, so the
        # loop never touches an unbound connection.
        for name in created:
            self.delete_test_file(conn, name)
def test_posix_reserved_char(self):
    """Create files whose names contain unusual characters.

    Each name (trailing spaces or dots, a backslash, '>', '<', '?') is
    created on "smb3_posix_share" with a POSIX create context and removed
    again through delete-on-close.  An NTSTATUSError raised while
    creating any of them fails the test.
    """
    c = libsmb.Conn(
        self.server_ip,
        "smb3_posix_share",
        self.lp,
        self.creds,
        posix=True)
    self.assertTrue(c.have_posix())

    # Every entry is a separate file name.  '<' and '?' were previously
    # written without a comma between them, which Python concatenates
    # into the single literal '<?', so neither name was tested alone.
    test_files = ['a ', 'a ', '. ', '. ', 'a.',
                  '.a', ' \\ ', '>', '<', '?']

    for fname in test_files:
        try:
            f, _, cc_out = c.create_ex(
                '\\%s' % fname,
                CreateDisposition=libsmb.FILE_CREATE,
                DesiredAccess=security.SEC_STD_DELETE,
                CreateContexts=[posix_context(0o744)])
        except NTSTATUSError as e:
            self.fail(e)
        c.delete_on_close(f, True)
        c.close(f)
def test_posix_delete_on_close(self):
    """Create a file over a POSIX connection and close it delete-on-close."""
    conn = libsmb.Conn(self.server_ip, "smb3_posix_share", self.lp,
                       self.creds, posix=True)
    self.assertTrue(conn.have_posix())

    handle, _, _ = conn.create_ex(
        '\\TESTING999',
        DesiredAccess=security.SEC_STD_ALL,
        CreateDisposition=libsmb.FILE_CREATE,
        CreateContexts=[posix_context(0o744)])
    conn.delete_on_close(handle, True)
    conn.close(handle)
def test_posix_case_sensitive(self):
    """Opening "XX" must fail when only "xx" exists.

    The lower-case file is created first; an open of the upper-case name
    with FILE_OPEN is then expected to raise NTSTATUSError.  The
    lower-case file is removed afterwards in every case.
    """
    try:
        conn = libsmb.Conn(self.server_ip, "smb3_posix_share", self.lp,
                           self.creds, posix=True)
        self.assertTrue(conn.have_posix())

        handle, _, _ = conn.create_ex(
            '\\xx',
            DesiredAccess=security.SEC_STD_ALL,
            CreateDisposition=libsmb.FILE_CREATE,
            CreateContexts=[posix_context(0o644)])
        conn.close(handle)

        try:
            handle, _, _ = conn.create_ex(
                '\\XX',
                DesiredAccess=security.SEC_STD_ALL,
                CreateDisposition=libsmb.FILE_OPEN,
                CreateContexts=[posix_context(0)])
        except NTSTATUSError:
            opened_uppercase = False
        else:
            # The open unexpectedly worked: release the handle before
            # reporting the failure below.
            opened_uppercase = True
            conn.close(handle)

        self.assertFalse(opened_uppercase,
                         "Opening uppercase file didn't fail")

    finally:
        self.delete_test_file(conn, '\\xx')
def test_posix_perm_files(self):
    """Create files and directories with many modes and check the listing.

    For every mode in 0o600..0o7777 that includes owner read+write and
    has neither setuid nor setgid, one file (with 4 bytes written) and
    one directory are created.  A SMB2_FIND_POSIX_INFORMATION listing is
    then checked entry by entry: permissions, reparse tag, owner and
    group SID, link count, and for files also size, allocation size and
    attributes.  Everything created is removed again afterwards.
    """
    expected_perms = {}
    try:
        conn = libsmb.Conn(self.server_ip, "smb3_posix_share", self.lp,
                           self.creds, posix=True)
        self.assertTrue(conn.have_posix())

        for perm in range(0o600, 0o7777+1):
            # Owner write permission is required or cleanup will fail, and
            # owner read is required to list the file if O_PATH is disabled
            lacks_owner_rw = (perm & 0o600) != 0o600
            # Don't create with setuid or setgid.
            has_setid = (perm & 0o6000) != 0
            if lacks_owner_rw or has_setid:
                continue

            file_name = 'testfile%04o' % perm
            expected_perms[file_name] = perm
            handle, _, _ = conn.create_ex(
                '\\%s' % file_name,
                DesiredAccess=security.SEC_FILE_ALL,
                CreateDisposition=libsmb.FILE_CREATE,
                CreateContexts=[posix_context(perm)])
            if perm & 0o200 == 0o200:
                conn.write(handle, buffer=b"data", offset=0)
            conn.close(handle)

            dir_name = 'testdir%04o' % perm
            expected_perms[dir_name] = perm
            handle, _, _ = conn.create_ex(
                '\\%s' % dir_name,
                DesiredAccess=security.SEC_STD_ALL,
                CreateDisposition=libsmb.FILE_CREATE,
                CreateOptions=libsmb.FILE_DIRECTORY_FILE,
                CreateContexts=[posix_context(perm)])
            conn.close(handle)

        listing = conn.list(
            "", info_level=libsmb.SMB2_FIND_POSIX_INFORMATION)
        found = {get_string(entry['name']): entry for entry in listing}

        for name, perm in expected_perms.items():
            self.assertIn(get_string(name), found.keys(),
                          'Test file not found')
            entry = found[name]
            self.assertEqual(expected_perms[name], entry['perms'],
                             'Requested %04o, Received %04o' %
                             (expected_perms[name], entry['perms']))

            self.assertEqual(entry['reparse_tag'],
                             libsmb.IO_REPARSE_TAG_RESERVED_ZERO)
            self.assertEqual(entry['perms'], perm)
            self.assertEqual(entry['owner_sid'], self.samsid + "-1000")
            self.assertTrue(entry['group_sid'].startswith("S-1-22-2-"))

            if name.startswith("testfile"):
                self.assertEqual(entry['nlink'], 1)
                self.assertEqual(entry['size'], 4)
                self.assertEqual(entry['allocaction_size'], 4096)
                self.assertEqual(entry['attrib'],
                                 libsmb.FILE_ATTRIBUTE_ARCHIVE)
            else:
                # Note: btrfs always reports the link count of
                # directories as one.
                expected_nlink = 1 if self.fstype == "btrfs" else 2
                self.assertEqual(entry['nlink'], expected_nlink)
                self.assertEqual(entry['attrib'],
                                 libsmb.FILE_ATTRIBUTE_DIRECTORY)

    finally:
        # The dict stays empty unless the connection succeeded, so the
        # loop never touches an unbound connection.
        for name in expected_perms:
            self.delete_test_file(conn, '\\%s' % name)
def test_share_root_null_sids_fid(self):
    """The ".." entry of the share root carries null SIDs and zero ids.

    In a SMB2_FIND_POSIX_INFORMATION listing of the share root, ".." is
    expected to report owner and group SID "S-1-0-0" and ino/dev of 0.
    """
    conn = libsmb.Conn(self.server_ip, "smb3_posix_share", self.lp,
                       self.creds, posix=True)
    self.assertTrue(conn.have_posix())

    listing = conn.list("", info_level=libsmb.SMB2_FIND_POSIX_INFORMATION)
    by_name = {get_string(entry['name']): entry for entry in listing}
    parent = by_name['..']

    null_sid = 'S-1-0-0'
    self.assertEqual(null_sid, parent['owner_sid'],
                     'The owner sid for .. was not NULL')
    self.assertEqual(null_sid, parent['group_sid'],
                     'The group sid for .. was not NULL')
    self.assertEqual(0, parent['ino'], 'The ino for .. was not 0')
    self.assertEqual(0, parent['dev'], 'The dev for .. was not 0')
def test_create_context_basic1(self):
    """
    Check basic CreateContexts response

    A file (mode 0o600) and a directory (mode 0o700) are created; the
    first create context returned for each is unpacked as
    smb3_posix_cc_info and its link count, reparse tag, permissions,
    owner and group are checked.
    """
    file_path = '\\test_create_context_basic1_file'
    dir_path = '\\test_create_context_basic1_dir'
    try:
        conn = libsmb.Conn(self.server_ip, "smb3_posix_share", self.lp,
                           self.creds, posix=True)
        self.assertTrue(conn.have_posix())

        handle, _, contexts = conn.create_ex(
            file_path,
            DesiredAccess=security.SEC_STD_ALL,
            CreateDisposition=libsmb.FILE_CREATE,
            CreateContexts=[posix_context(0o600)])
        conn.close(handle)

        info = ndr_unpack(smb3posix.smb3_posix_cc_info, contexts[0][1])

        self.assertEqual(info.nlinks, 1)
        self.assertEqual(info.reparse_tag,
                         libsmb.IO_REPARSE_TAG_RESERVED_ZERO)
        self.assertEqual(info.posix_perms, 0o600)
        self.assertEqual(info.owner, dom_sid(self.samsid + "-1000"))
        self.assertTrue(str(info.group).startswith("S-1-22-2-"))

        handle, _, contexts = conn.create_ex(
            dir_path,
            DesiredAccess=security.SEC_STD_ALL,
            CreateDisposition=libsmb.FILE_CREATE,
            CreateOptions=libsmb.FILE_DIRECTORY_FILE,
            CreateContexts=[posix_context(0o700)])
        conn.close(handle)

        info = ndr_unpack(smb3posix.smb3_posix_cc_info, contexts[0][1])

        # Note: btrfs always reports the link count of directories as one.
        expected_nlinks = 1 if self.fstype == "btrfs" else 2
        self.assertEqual(info.nlinks, expected_nlinks)

        self.assertEqual(info.reparse_tag,
                         libsmb.IO_REPARSE_TAG_RESERVED_ZERO)
        self.assertEqual(info.posix_perms, 0o700)
        self.assertEqual(info.owner, dom_sid(self.samsid + "-1000"))
        self.assertTrue(str(info.group).startswith("S-1-22-2-"))

    finally:
        self.delete_test_file(conn, file_path)
        self.delete_test_file(conn, dir_path)
def test_create_context_reparse(self):
    """
    Check reparse tag in posix create context response

    A fresh file must report IO_REPARSE_TAG_RESERVED_ZERO.  After a
    reparse point with tag 0x80000025 is set on it, reopening the file
    must report that tag in the returned create context.
    """
    path = '\\reparse'
    try:
        conn = libsmb.Conn(self.server_ip, "smb3_posix_share", self.lp,
                           self.creds, posix=True)
        self.assertTrue(conn.have_posix())

        tag = 0x80000025

        handle, _, contexts = conn.create_ex(
            path,
            DesiredAccess=security.SEC_STD_ALL,
            CreateDisposition=libsmb.FILE_CREATE,
            CreateContexts=[posix_context(0o600)])

        info = ndr_unpack(smb3posix.smb3_posix_cc_info, contexts[0][1])
        self.assertEqual(info.reparse_tag,
                         libsmb.IO_REPARSE_TAG_RESERVED_ZERO)

        reparse_blob = reparse_symlink.put(tag, 0, b'asdf')
        conn.fsctl(handle, libsmb.FSCTL_SET_REPARSE_POINT, reparse_blob, 0)

        conn.close(handle)

        handle, _, contexts = conn.create_ex(
            path,
            DesiredAccess=security.SEC_STD_ALL,
            CreateDisposition=libsmb.FILE_OPEN,
            CreateContexts=[posix_context(0o600)])
        conn.close(handle)

        info = ndr_unpack(smb3posix.smb3_posix_cc_info, contexts[0][1])
        self.assertEqual(info.reparse_tag, tag)

    finally:
        self.delete_test_file(conn, path)
def test_delete_on_close(self):
    """
    Test two opens with delete-on-close:
    1. Windows open
    2. POSIX open
    Closing handle 1 should unlink the file, a subsequent directory
    listing shouldn't list the deleted file.
    """
    name = 'test_delete_on_close'
    win_conn, posix_conn = self.connections()

    self.clean_file(win_conn, name)

    access = security.SEC_FILE_WRITE_ATTRIBUTE | security.SEC_STD_DELETE

    win_handle = win_conn.create(
        name,
        DesiredAccess=access,
        ShareAccess=0x07,
        CreateDisposition=libsmb.FILE_CREATE)
    self.addCleanup(self.clean_file, win_conn, name)

    posix_handle, _, _ = posix_conn.create_ex(
        name,
        DesiredAccess=access,
        ShareAccess=0x07,
        CreateDisposition=libsmb.FILE_OPEN,
        CreateContexts=[posix_context(0o600)])

    win_conn.delete_on_close(win_handle, 1)
    posix_conn.delete_on_close(posix_handle, 1)

    # Only the Windows handle is closed; the POSIX handle stays open.
    win_conn.close(win_handle)

    # The file should now already be deleted
    entries = win_conn.list('', mask=name)
    listed_names = {get_string(entry['name']) for entry in entries}
    self.assertFalse(name in listed_names)