smbd: improve reinit_after_fork error handling
[Samba.git] / python / samba / tests / smb2symlink.py
bloba2b34aee63916c10e8e8e208d3ea2095670479b2
1 # Unix SMB/CIFS implementation.
2 # Copyright Volker Lendecke <vl@samba.org> 2022
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.samba3 import libsmb_samba_internal as libsmb
19 from samba import reparse_symlink
20 from samba import (ntstatus,NTSTATUSError)
21 from samba.dcerpc import security as sec
22 import samba.tests.libsmb
class Smb2SymlinkTests(samba.tests.libsmb.LibsmbTests):
    """Check the symlink error details returned when SMB2 opens hit a symlink.

    The tests create symlinks through an SMB1 connection and then open them
    through an SMB2 connection, expecting NT_STATUS_STOPPED_ON_SYMLINK
    together with the symlink details carried in the exception.
    """

    def connections(self, smb1share=None, smb2share=None):
        """Open one SMB1 and one SMB2 connection to the test server.

        Share names that are not passed in are taken from the SMB1_SHARE /
        SMB2_SHARE environment variables and fall back to
        "nosymlinks_smb1allow" and "nosymlinks" respectively.

        Returns a tuple (smb1, smb2); smb1_posix() has already been called
        on the SMB1 connection.

        The test is skipped when the SMB1 connection attempt ends in
        NT_STATUS_CONNECTION_RESET; any other NTSTATUSError propagates.
        """
        if not smb1share:
            smb1share = samba.tests.env_get_var_value(
                "SMB1_SHARE", allow_missing=True)
        if not smb1share:
            smb1share = "nosymlinks_smb1allow"

        try:
            smb1 = libsmb.Conn(
                self.server_ip,
                smb1share,
                self.lp,
                self.creds,
                force_smb1=True)
        except NTSTATUSError as e:
            if e.args[0] != ntstatus.NT_STATUS_CONNECTION_RESET:
                raise
            # Without a connection there is nothing to run the tests
            # against. Falling through here used to crash with an
            # UnboundLocalError on "smb1" below.
            # NOTE(review): assumes a reset means SMB1 is not available
            # on this server -- confirm that skipping is the intent.
            self.skipTest("SMB1 connection was reset by the server")
        smb1.smb1_posix()

        if not smb2share:
            smb2share = samba.tests.env_get_var_value(
                "SMB2_SHARE", allow_missing=True)
        if not smb2share:
            smb2share = "nosymlinks"

        smb2 = libsmb.Conn(
            self.server_ip,
            smb2share,
            self.lp,
            self.creds)
        return (smb1, smb2)

    def create_symlink(self, conn, target, symlink):
        """Create "symlink" pointing at "target" over "conn".

        Any existing file called "symlink" is removed first. A connection
        older than SMB2_02 that has posix support uses smb1_symlink();
        every other connection creates a file and turns it into a symlink
        by setting a reparse point on it.
        """
        self.clean_file(conn, symlink)
        if (conn.protocol() < libsmb.PROTOCOL_SMB2_02 and conn.have_posix()):
            conn.smb1_symlink(target, symlink)
            return

        # Flag value 1 marks a relative target; absolute targets get 0.
        # startswith() also copes with an empty target string.
        flags = 0 if target.startswith('/') else 1
        syml = conn.create(
            symlink,
            DesiredAccess=sec.SEC_FILE_READ_ATTRIBUTE |
            sec.SEC_FILE_WRITE_ATTRIBUTE |
            sec.SEC_STD_DELETE,
            FileAttributes=libsmb.FILE_ATTRIBUTE_NORMAL,
            CreateDisposition=libsmb.FILE_OPEN_IF,
            CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT)
        try:
            # Pass the computed flags; a hard-coded 1 would mark absolute
            # targets as relative.
            b = reparse_symlink.symlink_put(target, target, 0, flags)
            conn.fsctl(syml, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        finally:
            # Do not leak the handle when setting the reparse point fails.
            conn.close(syml)

    def assert_symlink_exception(self, e, expect):
        """Assert that "e" is a STOPPED_ON_SYMLINK error matching "expect".

        e.args[0] must be NT_STATUS_STOPPED_ON_SYMLINK and every key/value
        pair in the "expect" dict must match the details found in
        e.args[2]. For the "flags" key the bits in SYMLINK_TRUST_MASK are
        ignored on both sides.
        """
        self.assertEqual(e.args[0], ntstatus.NT_STATUS_STOPPED_ON_SYMLINK)
        details = e.args[2]
        for k, v in expect.items():
            got = details.get(k)
            if k == "flags":
                # Ignore symlink trust flags for now
                self.assertIsNotNone(got, "flags missing from symlink error")
                v = v & ~libsmb.SYMLINK_TRUST_MASK
                got = got & ~libsmb.SYMLINK_TRUST_MASK
            # Compare as (key, value) pairs so that a failure message
            # names the field that differs.
            self.assertEqual((k, got), (k, v))

    def _assert_open_stops_on_symlink(self, conn, path, expect):
        """Open "path" on "conn" and check the resulting symlink error."""
        with self.assertRaises(NTSTATUSError) as e:
            conn.create_ex(path)
        self.assert_symlink_exception(e.exception, expect)

    def test_symlinkerror_directory(self):
        """Test a symlink in a nonterminal path component"""
        (smb1, smb2) = self.connections()
        symlink = "syml"
        target = "foo"
        suffix = "bar"

        self.create_symlink(smb1, target, symlink)
        # Registered as a cleanup so the symlink is removed even when an
        # assertion below fails.
        self.addCleanup(self.clean_file, smb1, symlink)

        self._assert_open_stops_on_symlink(
            smb2,
            f'{symlink}\\{suffix}',
            {'unparsed_path_length': len(suffix)+1,
             'substitute_name': target,
             'print_name': target,
             'flags': 0x20000001})

    def test_symlinkerror_file(self):
        """Test a simple symlink in a terminal path"""
        (smb1, smb2) = self.connections()
        symlink = "syml"
        target = "foo"

        self.create_symlink(smb1, target, symlink)
        # Registered as a cleanup so the symlink is removed even when an
        # assertion below fails.
        self.addCleanup(self.clean_file, smb1, symlink)

        self._assert_open_stops_on_symlink(
            smb2,
            symlink,
            {'unparsed_path_length': 0,
             'substitute_name': target,
             'print_name': target,
             'flags': 0x20000001})

    def test_symlinkerror_absolute_outside_share(self):
        """
        Test symlinks to outside of the share
        We return the contents 1:1
        """
        (smb1, smb2) = self.connections()
        symlink = "syml"

        # One cleanup covers all iterations: create_symlink() itself
        # removes the previous iteration's symlink before creating a new
        # one, so only the last one is left for the cleanup.
        self.addCleanup(self.clean_file, smb1, symlink)

        for target in ["/etc", "//foo/bar", "/"]:

            self.create_symlink(smb1, target, symlink)

            self._assert_open_stops_on_symlink(
                smb2,
                symlink,
                {'unparsed_path_length': 0,
                 'substitute_name': target,
                 'print_name': target,
                 'flags': 0})

    def test_symlinkerror_absolute_inshare(self):
        """Test an absolute symlink inside the share"""
        (smb1, smb2) = self.connections()
        symlink = "syml"

        localpath = samba.tests.env_get_var_value("LOCAL_PATH")
        shareroot = f'{localpath}/nosymlinks'
        rel_dest = "dst"
        target = f'{shareroot}/{rel_dest}'

        self.create_symlink(smb1, target, symlink)
        # Registered as a cleanup so the symlink is removed even when an
        # assertion below fails.
        self.addCleanup(self.clean_file, smb1, symlink)

        # The expected names are relative to the share root, not the
        # absolute path that was stored in the symlink.
        self._assert_open_stops_on_symlink(
            smb2,
            symlink,
            {'unparsed_path_length': 0,
             'substitute_name': rel_dest,
             'print_name': rel_dest,
             'flags': 0})

    def test_symlink_reparse_data_buffer_parse(self):
        """Test parsing a symlink reparse buffer coming from Windows"""

        buf = (b'\x0c\x00\x00\xa0\x18\x00\x00\x00'
               b'\x06\x00\x06\x00\x00\x00\x06\x00'
               b'\x01\x00\x00\x00\x62\x00\x61\x00'
               b'\x72\x00\x62\x00\x61\x00\x72\x00')

        try:
            (tag, syml) = reparse_symlink.get(buf)
        except Exception:
            # "Exception" rather than a bare except, so that
            # KeyboardInterrupt and SystemExit are not turned into a
            # test failure.
            self.fail("Could not parse symlink buffer")

        self.assertEqual(tag, "IO_REPARSE_TAG_SYMLINK")
        self.assertEqual(syml, ('bar', 'bar', 0, 1))

    def test_bug15505(self):
        """Test an absolute intermediate symlink inside the share"""
        (smb1, smb2) = self.connections(smb1share="tmp", smb2share="tmp")

        localpath = samba.tests.env_get_var_value("LOCAL_PATH")

        # Cleanups run in reverse registration order, so files are
        # removed before the directories that contain them.
        smb1.mkdir("sub")
        self.addCleanup(self.clean_file, smb1, "sub")

        self.create_symlink(smb1, f'{localpath}/sub1', "sub/lnk")
        self.addCleanup(self.clean_file, smb1, "sub/lnk")

        smb1.mkdir("sub1")
        self.addCleanup(self.clean_file, smb1, "sub1")

        fd = smb1.create("sub1/x", CreateDisposition=libsmb.FILE_CREATE)
        smb1.close(fd)
        self.addCleanup(self.clean_file, smb1, "sub1/x")

        # Opening through the symlinked directory over SMB2 must succeed.
        fd = smb2.create("sub\\lnk\\x")
        smb2.close(fd)
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest runner.
    import unittest

    unittest.main()