1 # -*- coding: utf-8 -*-
2 # Unix SMB/CIFS implementation. Tests for smb manipulation
3 # Copyright (C) David Mulder <dmulder@suse.com> 2018
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from samba
import NTSTATUSError
23 from samba
.ntstatus
import (NT_STATUS_OBJECT_NAME_NOT_FOUND
,
24 NT_STATUS_OBJECT_PATH_NOT_FOUND
)
25 from samba
.samba3
import libsmb_samba_internal
as libsmb
26 from samba
.samba3
import param
as s3param
# True when the tests are running under a Python 3 interpreter
PY3 = (sys.version_info[0] == 3)

# Directory on the share that the tests work in: the lower-cased value of
# the REALM environment variable followed by a slash
realm = os.environ.get('REALM')
domain_dir = realm.lower() + '/'

# Payloads written to (and read back from) the test file
test_contents = 'abcd' * 256
utf_contents = u'Süßigkeiten Äpfel ' * 128
test_literal_bytes_embed_nulls = (
    b'\xff\xfe\x14\x61\x00\x00\x62\x63\x64' * 256
)
# two leading non-UTF-8 bytes followed by 128 copies of an encoded sentence
binary_contents = (
    b'\xff\xfe' + "Hello cruel world of python3".encode('utf8') * 128
)

# Scratch directory (randomly numbered on every import of this module) and
# a file inside it; the file path uses backslash separators
test_dir = os.path.join(
    domain_dir,
    'testing_%d' % random.randint(0, 0xFFFF),
)
test_file = os.path.join(test_dir, 'testing').replace('/', '\\')
class SMBTests(samba.tests.TestCase):
    """Tests for the libsmb Python bindings, run against the sysvol share.

    setUp() opens a connection (``self.smb_conn``) and creates the scratch
    directory ``test_dir``; tearDown() removes that directory again.
    """

    def setUp(self):
        """Connect to the server's sysvol share and create the test dir"""
        super(SMBTests, self).setUp()
        self.server = os.environ["SERVER"]
        creds = self.insta_creds(template=self.get_credentials())

        # create an SMB connection to the server
        lp = s3param.get_context()
        lp.load(os.getenv("SMB_CONF_PATH"))
        self.smb_conn = libsmb.Conn(self.server, "sysvol", lp, creds)

        self.smb_conn.mkdir(test_dir)

    def tearDown(self):
        """Remove the test dir, if the test has not already removed it"""
        super(SMBTests, self).tearDown()

        # test_deltree deletes test_dir itself, so only clean up when the
        # directory is still there
        if self.smb_conn.chkpath(test_dir):
            self.smb_conn.deltree(test_dir)

    def test_list(self):
        """The smb.list API should return the expected directory entries"""
        # check a basic listing returns the items we expect
        ls = [f['name'] for f in self.smb_conn.list(domain_dir)]
        self.assertIn('scripts', ls,
                      msg='"scripts" directory not found in sysvol')
        self.assertIn('Policies', ls,
                      msg='"Policies" directory not found in sysvol')
        self.assertNotIn('..', ls,
                         msg='Parent (..) found in directory listing')
        self.assertNotIn('.', ls,
                         msg='Current dir (.) found in directory listing')

        # using a '*' mask should be the same as using no mask
        ls_wildcard = [f['name'] for f in self.smb_conn.list(domain_dir, "*")]
        self.assertEqual(ls, ls_wildcard)

        # applying a mask should only return items that match that mask
        ls_pol = [f['name'] for f in self.smb_conn.list(domain_dir, "Pol*")]
        expected = ["Policies"]
        self.assertEqual(ls_pol, expected)

        # each item in the listing is a hash with expected keys
        expected_keys = ['attrib', 'mtime', 'name', 'short_name', 'size']
        for item in self.smb_conn.list(domain_dir):
            for key in expected_keys:
                self.assertIn(key, item,
                              msg="Key '%s' not in listing '%s'" % (key, item))

    def test_deltree(self):
        """The smb.deltree API should delete files and sub-dirs"""
        # create some test sub-dirs, each one nested inside the previous one
        dirpaths = []
        empty_dirs = []
        cur_dir = test_dir

        for subdir in ["subdir-X", "subdir-Y", "subdir-Z"]:
            path = self.make_sysvol_path(cur_dir, subdir)
            self.smb_conn.mkdir(path)
            dirpaths.append(path)
            cur_dir = path

            # create another empty dir just for kicks
            path = self.make_sysvol_path(cur_dir, "another")
            self.smb_conn.mkdir(path)
            empty_dirs.append(path)

        # create some files in these directories
        filepaths = []
        for subdir in dirpaths:
            for i in range(1, 4):
                path = self.make_sysvol_path(subdir, "file-{0}.txt".format(i))
                self.smb_conn.savefile(path, test_contents.encode('utf8'))
                filepaths.append(path)

        # sanity-check these dirs/files exist
        for subdir in dirpaths + empty_dirs:
            self.assertTrue(self.smb_conn.chkpath(subdir),
                            "Failed to create {0}".format(subdir))
        for path in filepaths:
            self.assertTrue(self.file_exists(path),
                            "Failed to create {0}".format(path))

        # try using deltree to remove a single empty directory
        path = empty_dirs.pop(0)
        self.smb_conn.deltree(path)
        self.assertFalse(self.smb_conn.chkpath(path),
                         "Failed to delete {0}".format(path))

        # try using deltree to remove a single file
        path = filepaths.pop(0)
        self.smb_conn.deltree(path)
        self.assertFalse(self.file_exists(path),
                         "Failed to delete {0}".format(path))

        # delete the top-level dir
        self.smb_conn.deltree(test_dir)

        # now check that all the dirs/files are no longer there
        for subdir in dirpaths + empty_dirs:
            self.assertFalse(self.smb_conn.chkpath(subdir),
                             "Failed to delete {0}".format(subdir))
        for path in filepaths:
            self.assertFalse(self.file_exists(path),
                             "Failed to delete {0}".format(path))

    def file_exists(self, filepath):
        """Returns whether a regular file exists (by trying to open it)"""
        try:
            self.smb_conn.loadfile(filepath)
        except NTSTATUSError as err:
            # only the "not found" statuses mean the file is absent; any
            # other failure is a real error and is passed on to the caller
            if (err.args[0] == NT_STATUS_OBJECT_NAME_NOT_FOUND or
                    err.args[0] == NT_STATUS_OBJECT_PATH_NOT_FOUND):
                return False
            raise
        return True

    def test_unlink(self):
        """
        The smb.unlink API should delete file
        """
        # create the test file
        self.assertFalse(self.file_exists(test_file))
        self.smb_conn.savefile(test_file, binary_contents)
        self.assertTrue(self.file_exists(test_file))

        # delete it and check that it's gone
        self.smb_conn.unlink(test_file)
        self.assertFalse(self.file_exists(test_file))

    def test_chkpath(self):
        """Tests .chkpath determines whether or not a directory exists"""
        # should return True for a directory that exists
        self.assertTrue(self.smb_conn.chkpath(test_dir))

        # should return False for a non-existent directory
        bad_dir = self.make_sysvol_path(test_dir, 'dont_exist')
        self.assertFalse(self.smb_conn.chkpath(bad_dir))

        # should return False for files (because they're not directories)
        self.smb_conn.savefile(test_file, binary_contents)
        self.assertFalse(self.smb_conn.chkpath(test_file))

        # check correct result after creating and then deleting a new dir
        new_dir = self.make_sysvol_path(test_dir, 'test-new')
        self.smb_conn.mkdir(new_dir)
        self.assertTrue(self.smb_conn.chkpath(new_dir))
        self.smb_conn.rmdir(new_dir)
        self.assertFalse(self.smb_conn.chkpath(new_dir))

    def test_save_load_text(self):
        """Text saved with savefile should be returned by loadfile"""
        self.smb_conn.savefile(test_file, test_contents.encode('utf8'))

        contents = self.smb_conn.loadfile(test_file)
        self.assertEqual(contents.decode('utf8'), test_contents,
                         msg='contents of test file did not match what was written')

        # check we can overwrite the file with new contents
        new_contents = 'wxyz' * 128
        self.smb_conn.savefile(test_file, new_contents.encode('utf8'))
        contents = self.smb_conn.loadfile(test_file)
        self.assertEqual(contents.decode('utf8'), new_contents,
                         msg='contents of test file did not match what was written')

    # with python2 this will save/load str type (with embedded nulls)
    # with python3 this will save/load bytes type
    def test_save_load_string_bytes(self):
        """Byte strings with embedded nulls should survive a save/load"""
        self.smb_conn.savefile(test_file, test_literal_bytes_embed_nulls)

        contents = self.smb_conn.loadfile(test_file)
        self.assertEqual(contents, test_literal_bytes_embed_nulls,
                         msg='contents of test file did not match what was written')

    # python3 only this will save/load unicode
    def test_save_load_utfcontents(self):
        """Non-ASCII text should survive a save/load (python3 only)"""
        if not PY3:
            self.skipTest("unicode save/load is only tested with python3")

        self.smb_conn.savefile(test_file, utf_contents.encode('utf8'))

        contents = self.smb_conn.loadfile(test_file)
        self.assertEqual(contents.decode('utf8'), utf_contents,
                         msg='contents of test file did not match what was written')

    # with python2 this will save/load str type
    # with python3 this will save/load bytes type
    def test_save_binary_contents(self):
        """Binary data should survive a save/load unchanged"""
        self.smb_conn.savefile(test_file, binary_contents)

        contents = self.smb_conn.loadfile(test_file)
        self.assertEqual(contents, binary_contents,
                         msg='contents of test file did not match what was written')

    def make_sysvol_path(self, dirpath, filename):
        """Returns dirpath joined with filename, using backslash separators"""
        # return the dir + filename as a sysvol path
        return os.path.join(dirpath, filename).replace('/', '\\')