ctdb-server: Use find_public_ip_vnn() in a couple of extra places
[Samba.git] / python / samba / tests / reparsepoints.py
bloba8d506d48d61d3079e4458d876071242187a7523
1 # Unix SMB/CIFS implementation.
2 # Copyright Volker Lendecke <vl@samba.org> 2022
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.samba3 import libsmb_samba_internal as libsmb
19 from samba import (ntstatus,NTSTATUSError)
20 from samba.dcerpc import security as sec
21 from samba import reparse_symlink
22 import samba.tests.libsmb
23 import stat
class ReparsePoints(samba.tests.libsmb.LibsmbTests):
    """Exercise reparse point fsctls and queries against an SMB server."""

    def connection(self):
        """Return a libsmb connection to the test share.

        The share name is looked up via the SHARENAME test variable and
        falls back to "tmp". The SMB1 test variable is handed on as
        force_smb1.
        """
        share = samba.tests.env_get_var_value("SHARENAME", allow_missing=True)
        if not share:
            share = "tmp"
        smb1 = samba.tests.env_get_var_value("SMB1", allow_missing=True)
        conn = libsmb.Conn(
            self.server_ip,
            share,
            self.lp,
            self.creds,
            force_smb1=smb1)
        return conn

    def connection_posix(self):
        """Return an SMB1 connection with smb1_posix() enabled.

        The share name is looked up via the SHARENAME test variable and
        falls back to "posix_share".
        """
        share = samba.tests.env_get_var_value("SHARENAME", allow_missing=True)
        if not share:
            share = "posix_share"
        conn = libsmb.Conn(
            self.server_ip,
            share,
            self.lp,
            self.creds,
            force_smb1=True)
        conn.smb1_posix()
        return conn

    def clean_file(self, conn, filename):
        """Unlink filename on conn, tolerating a fixed set of errors.

        OBJECT_NAME_NOT_FOUND, OBJECT_PATH_NOT_FOUND and
        IO_REPARSE_TAG_NOT_HANDLED are swallowed; any other
        NTSTATUSError is re-raised.
        """
        try:
            conn.unlink(filename)
        except NTSTATUSError as e:
            err = e.args[0]
            ok = (err == ntstatus.NT_STATUS_OBJECT_NAME_NOT_FOUND)
            ok |= (err == ntstatus.NT_STATUS_OBJECT_PATH_NOT_FOUND)
            ok |= (err == ntstatus.NT_STATUS_IO_REPARSE_TAG_NOT_HANDLED)
            if not ok:
                raise

    def test_error_not_a_reparse_point(self):
        """GET_REPARSE_POINT on a plain file gives NOT_A_REPARSE_POINT."""
        conn = self.connection()
        filename = 'reparse'
        self.clean_file(conn, filename)

        fd = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
            CreateDisposition=libsmb.FILE_CREATE)

        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_GET_REPARSE_POINT, b'', 1024)

        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_NOT_A_REPARSE_POINT)

        conn.close(fd)

        self.clean_file(conn, filename)

    def test_create_reparse(self):
        """Check the buffer validation done by SET_REPARSE_POINT."""
        conn = self.connection()
        filename = 'reparse'
        self.clean_file(conn, filename)

        fd = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE | sec.SEC_STD_DELETE,
            CreateDisposition=libsmb.FILE_CREATE)

        conn.delete_on_close(fd, 1)

        # An empty buffer is rejected on size alone
        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b'', 0)

        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_INVALID_BUFFER_SIZE)

        # Short buffers of 1..14 bytes are rejected as invalid data
        for i in range(1, 15):
            with self.assertRaises(NTSTATUSError) as e:
                conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, i * b'0', 0)

            self.assertEqual(e.exception.args[0],
                             ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

        # Create a syntactically valid [MS-FSCC] 2.1.2.2 REPARSE_DATA_BUFFER
        b = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')

        # Show that SET_REPARSE_POINT does exact length checks

        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b + b'0', 0)
        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b[:-1], 0)
        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

        # Exact length works
        conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)

        b = reparse_symlink.put(0x80000026, 0, b'asdf')

        # We can't overwrite an existing reparse point with a different tag
        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_IO_REPARSE_TAG_MISMATCH)

    def test_query_reparse_tag(self):
        """The attribute-tag info level reports the tag that was set."""
        conn = self.connection()
        filename = 'reparse'
        self.clean_file(conn, filename)

        fd = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_READ_ATTRIBUTE |
            sec.SEC_FILE_WRITE_ATTRIBUTE |
            sec.SEC_STD_DELETE,
            CreateDisposition=libsmb.FILE_CREATE)

        conn.delete_on_close(fd, 1)

        # A freshly created file carries no tag
        info = conn.qfileinfo(fd, libsmb.FSCC_FILE_ATTRIBUTE_TAG_INFORMATION)
        self.assertEqual(info['tag'], 0)

        b = reparse_symlink.put(0x80000026, 0, b'asdf')
        conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)

        info = conn.qfileinfo(fd, libsmb.FSCC_FILE_ATTRIBUTE_TAG_INFORMATION)
        self.assertEqual(info['tag'], 0x80000026)

    # Show that we can write to a reparse point when opened properly
    def test_write_reparse(self):
        """Write to a reparse point opened with FILE_OPEN_REPARSE_POINT."""
        conn = self.connection()
        filename = 'reparse'
        self.clean_file(conn, filename)

        fd = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
            CreateDisposition=libsmb.FILE_CREATE)
        b = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')
        conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        conn.close(fd)

        fd, cr, _ = conn.create_ex(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_DATA | sec.SEC_STD_DELETE,
            CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT,
            CreateDisposition=libsmb.FILE_OPEN)
        self.assertEqual(
            cr['file_attributes'] & libsmb.FILE_ATTRIBUTE_REPARSE_POINT,
            libsmb.FILE_ATTRIBUTE_REPARSE_POINT)

        conn.write(fd, b'x', 1)

        conn.delete_on_close(fd, 1)
        conn.close(fd)

    def test_query_dir_reparse(self):
        """A directory listing reports the symlink reparse tag."""
        conn = self.connection()
        filename = 'reparse'
        self.clean_file(conn, filename)

        fd = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
            CreateDisposition=libsmb.FILE_CREATE)
        b = reparse_symlink.symlink_put("y", "y", 0, 0)
        conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        conn.close(fd)

        dirents = conn.list("", filename)
        self.assertEqual(
            dirents[0]["reparse_tag"],
            libsmb.IO_REPARSE_TAG_SYMLINK)

        self.clean_file(conn, filename)

    # Show that directories can carry reparse points

    def test_create_reparse_directory(self):
        """Set a reparse point on an empty directory.

        Creating a file below the tagged directory is then expected to
        fail with IO_REPARSE_TAG_NOT_HANDLED.
        """
        conn = self.connection()
        dirname = "reparse_dir"
        filename = f'{dirname}\\file.txt'

        self.clean_file(conn, filename)
        self.clean_file(conn, dirname)

        dir_fd = conn.create(
            dirname,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE |
            sec.SEC_STD_DELETE,
            CreateDisposition=libsmb.FILE_CREATE,
            CreateOptions=libsmb.FILE_DIRECTORY_FILE)

        b = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')

        # Track the outcome in a flag that is always bound, so the
        # success path does not trip over an undefined variable.
        access_denied = False
        try:
            conn.fsctl(dir_fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        except NTSTATUSError as e:
            if e.args[0] != ntstatus.NT_STATUS_ACCESS_DENIED:
                raise
            access_denied = True

        if access_denied:
            # Remove the directory before failing: self.fail() raises,
            # so nothing after it would run.
            conn.delete_on_close(dir_fd, 1)
            conn.close(dir_fd)
            self.fail("Could not set reparse point on directory")

        with self.assertRaises(NTSTATUSError) as e:
            conn.create(
                filename,
                DesiredAccess=sec.SEC_STD_DELETE,
                CreateDisposition=libsmb.FILE_CREATE)

        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_IO_REPARSE_TAG_NOT_HANDLED)

        conn.delete_on_close(dir_fd, 1)
        conn.close(dir_fd)

    # Only empty directories can carry reparse points

    def test_create_reparse_nonempty_directory(self):
        """SET_REPARSE_POINT on a non-empty directory must be refused."""
        conn = self.connection()
        dirname = "reparse_dir"
        filename = f'{dirname}\\file.txt'

        self.clean_file(conn, filename)
        self.clean_file(conn, dirname)

        dir_fd = conn.create(
            dirname,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE |
            sec.SEC_STD_DELETE,
            CreateDisposition=libsmb.FILE_CREATE,
            CreateOptions=libsmb.FILE_DIRECTORY_FILE)
        fd = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE |
            sec.SEC_STD_DELETE,
            CreateDisposition=libsmb.FILE_CREATE)

        b = reparse_symlink.put(0x80000025, 0, b'asdf')

        # Stays None when the fsctl unexpectedly succeeds, so the check
        # below reports a test failure instead of raising
        # UnboundLocalError.
        err = None
        try:
            conn.fsctl(dir_fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        except NTSTATUSError as e:
            err = e.args[0]

        # Clean up before judging the result
        conn.delete_on_close(fd, 1)
        conn.close(fd)
        conn.delete_on_close(dir_fd, 1)
        conn.close(dir_fd)

        ok = (err == ntstatus.NT_STATUS_DIRECTORY_NOT_EMPTY)
        if not ok:
            self.fail(f'set_reparse on nonempty directory returned {err}')

    # Show that reparse point opens respect share modes

    def test_reparse_share_modes(self):
        """A second open of an already open reparse point is refused.

        The expected status is NT_STATUS_SHARING_VIOLATION.
        """
        conn = self.connection()
        filename = 'reparse'
        self.clean_file(conn, filename)

        fd = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
            CreateDisposition=libsmb.FILE_CREATE)
        b = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')
        conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        conn.close(fd)

        fd1 = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_READ_DATA | sec.SEC_STD_DELETE,
            CreateDisposition=libsmb.FILE_OPEN,
            CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT)

        with self.assertRaises(NTSTATUSError) as e:
            conn.create(
                filename,
                DesiredAccess=sec.SEC_FILE_READ_DATA,
                CreateDisposition=libsmb.FILE_OPEN,
                CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT)

        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_SHARING_VIOLATION)

        conn.delete_on_close(fd1, 1)
        conn.close(fd1)

    def test_delete_reparse_point(self):
        """Check tag and length validation in DELETE_REPARSE_POINT."""
        conn = self.connection()
        filename = 'reparse'
        self.clean_file(conn, filename)

        fd = conn.create(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
            CreateDisposition=libsmb.FILE_CREATE)
        b = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')
        conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        conn.close(fd)

        (fd, cr, _) = conn.create_ex(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE | sec.SEC_STD_DELETE,
            CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT,
            CreateDisposition=libsmb.FILE_OPEN)

        self.assertEqual(cr['file_attributes'] &
                         libsmb.FILE_ATTRIBUTE_REPARSE_POINT,
                         libsmb.FILE_ATTRIBUTE_REPARSE_POINT)

        # Different tag, empty payload
        b = reparse_symlink.put(0x80000026, 0, b'')
        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)
        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_IO_REPARSE_TAG_MISMATCH)

        # Different tag, non-empty payload
        b = reparse_symlink.put(0x80000026, 0, b' ')
        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)
        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

        # Matching tag, non-empty payload
        b = reparse_symlink.put(0x80000025, 0, b' ')
        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)
        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

        # Matching tag, empty payload: the delete goes through
        b = reparse_symlink.put(0x80000025, 0, b'')
        conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)

        # A second delete finds nothing left to remove
        with self.assertRaises(NTSTATUSError) as e:
            conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)
        self.assertEqual(e.exception.args[0],
                         ntstatus.NT_STATUS_NOT_A_REPARSE_POINT)

        conn.close(fd)

        (fd, cr, _) = conn.create_ex(
            filename,
            DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE | sec.SEC_STD_DELETE,
            CreateDisposition=libsmb.FILE_OPEN)

        # With the reparse point gone the attribute bit must be clear
        self.assertEqual(cr['file_attributes'] &
                         libsmb.FILE_ATTRIBUTE_REPARSE_POINT,
                         0)

        conn.delete_on_close(fd, 1)
        conn.close(fd)

    def do_test_nfs_reparse(self, filename, filetype, nfstype):
        """Test special file reparse tag

        Creates filename with mknod(filetype | 0o755) over the posix
        SMB1 connection, then checks through a second connection that
        its tag is IO_REPARSE_TAG_NFS and that the parsed reparse
        buffer equals nfstype.
        """
        smb2 = self.connection()
        smb1 = self.connection_posix()

        self.clean_file(smb2, filename)
        smb1.mknod(filename, filetype | 0o755)

        fd = smb2.create(
            filename,
            DesiredAccess=sec.SEC_FILE_READ_ATTRIBUTE | sec.SEC_STD_DELETE,
            CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT,
            CreateDisposition=libsmb.FILE_OPEN)
        smb2.delete_on_close(fd, 1)

        info = smb2.qfileinfo(fd, libsmb.FSCC_FILE_ATTRIBUTE_TAG_INFORMATION)
        self.assertEqual(info['tag'], libsmb.IO_REPARSE_TAG_NFS)

        reparse = smb2.fsctl(fd, libsmb.FSCTL_GET_REPARSE_POINT, b'', 1024)
        (tag, ) = reparse_symlink.get(reparse)
        self.assertEqual(tag, nfstype)

    def test_fifo_reparse(self):
        """Test FIFO reparse tag"""
        self.do_test_nfs_reparse('fifo', stat.S_IFIFO, 'NFS_SPECFILE_FIFO')

    def test_sock_reparse(self):
        """Test SOCK reparse tag"""
        self.do_test_nfs_reparse('sock', stat.S_IFSOCK, 'NFS_SPECFILE_SOCK')
if __name__ == '__main__':
    # Run the tests in this module when it is executed as a script.
    from unittest import main as run_tests
    run_tests()