1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Samba Python tests."""
24 from samba
import param
25 from samba
.samdb
import SamDB
26 from samba
import credentials
33 from unittest
import SkipTest
35 class SkipTest(Exception):
39 class TestCase(unittest
.TestCase
):
40 """A Samba test case."""
43 super(TestCase
, self
).setUp()
44 test_debug_level
= os
.getenv("TEST_DEBUG_LEVEL")
45 if test_debug_level
is not None:
46 test_debug_level
= int(test_debug_level
)
47 self
._old
_debug
_level
= samba
.get_debug_level()
48 samba
.set_debug_level(test_debug_level
)
49 self
.addCleanup(samba
.set_debug_level
, test_debug_level
)
51 def get_loadparm(self
):
54 def get_credentials(self
):
55 return cmdline_credentials
57 # These functions didn't exist before Python2.7:
58 if sys
.version_info
< (2, 7):
61 def skipTest(self
, reason
):
62 raise SkipTest(reason
)
64 def assertIn(self
, member
, container
, msg
=None):
65 self
.assertTrue(member
in container
, msg
)
67 def assertIs(self
, a
, b
, msg
=None):
68 self
.assertTrue(a
is b
, msg
)
70 def assertIsNot(self
, a
, b
, msg
=None):
71 self
.assertTrue(a
is not b
, msg
)
73 def assertIsNotNone(self
, a
, msg
=None):
74 self
.assertTrue(a
is not None)
76 def assertIsInstance(self
, a
, b
, msg
=None):
77 self
.assertTrue(isinstance(a
, b
), msg
)
79 def assertIsNone(self
, a
, msg
=None):
80 self
.assertTrue(a
is None, msg
)
82 def addCleanup(self
, fn
, *args
, **kwargs
):
83 self
._cleanups
= getattr(self
, "_cleanups", []) + [
86 def _addSkip(self
, result
, reason
):
87 addSkip
= getattr(result
, 'addSkip', None)
88 if addSkip
is not None:
91 warnings
.warn("TestResult has no addSkip method, skips not reported",
93 result
.addSuccess(self
)
95 def run(self
, result
=None):
96 if result
is None: result
= self
.defaultTestResult()
97 result
.startTest(self
)
98 testMethod
= getattr(self
, self
._testMethodName
)
103 self
._addSkip
(result
, str(e
))
105 except KeyboardInterrupt:
108 result
.addError(self
, self
._exc
_info
())
116 self
._addSkip
(result
, str(e
))
118 except self
.failureException
:
119 result
.addFailure(self
, self
._exc
_info
())
120 except KeyboardInterrupt:
123 result
.addError(self
, self
._exc
_info
())
128 self
._addSkip
(result
, str(e
))
129 except KeyboardInterrupt:
132 result
.addError(self
, self
._exc
_info
())
135 for (fn
, args
, kwargs
) in reversed(getattr(self
, "_cleanups", [])):
137 if ok
: result
.addSuccess(self
)
139 result
.stopTest(self
)
142 class LdbTestCase(TestCase
):
143 """Trivial test case for running tests against a LDB."""
146 super(LdbTestCase
, self
).setUp()
147 self
.filename
= os
.tempnam()
148 self
.ldb
= samba
.Ldb(self
.filename
)
150 def set_modules(self
, modules
=[]):
151 """Change the modules for this Ldb."""
153 m
.dn
= ldb
.Dn(self
.ldb
, "@MODULES")
154 m
["@LIST"] = ",".join(modules
)
156 self
.ldb
= samba
.Ldb(self
.filename
)
159 class TestCaseInTempDir(TestCase
):
162 super(TestCaseInTempDir
, self
).setUp()
163 self
.tempdir
= tempfile
.mkdtemp()
164 self
.addCleanup(self
._remove
_tempdir
)
166 def _remove_tempdir(self
):
167 self
.assertEquals([], os
.listdir(self
.tempdir
))
168 os
.rmdir(self
.tempdir
)
173 lp
= param
.LoadParm()
175 lp
.load(os
.environ
["SMB_CONF_PATH"])
177 raise KeyError("SMB_CONF_PATH not set")
181 def env_get_var_value(var_name
):
182 """Returns value for variable in os.environ
184 Function throws AssertionError if variable is defined.
185 Unit-test based python tests require certain input params
186 to be set in environment, otherwise they can't be run
188 assert var_name
in os
.environ
.keys(), "Please supply %s in environment" % var_name
189 return os
.environ
[var_name
]
192 cmdline_credentials
= None
194 class RpcInterfaceTestCase(TestCase
):
195 """DCE/RPC Test case."""
198 class ValidNetbiosNameTests(TestCase
):
200 def test_valid(self
):
201 self
.assertTrue(samba
.valid_netbios_name("FOO"))
203 def test_too_long(self
):
204 self
.assertFalse(samba
.valid_netbios_name("FOO"*10))
206 def test_invalid_characters(self
):
207 self
.assertFalse(samba
.valid_netbios_name("*BLA"))
210 class BlackboxProcessError(Exception):
211 """This is raised when check_output() process returns a non-zero exit status
213 Exception instance should contain the exact exit code (S.returncode),
214 command line (S.cmd), process output (S.stdout) and process error stream
218 def __init__(self
, returncode
, cmd
, stdout
, stderr
):
219 self
.returncode
= returncode
225 return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self
.cmd
, self
.returncode
,
226 self
.stdout
, self
.stderr
)
228 class BlackboxTestCase(TestCase
):
229 """Base test case for blackbox tests."""
231 def _make_cmdline(self
, line
):
232 bindir
= os
.path
.abspath(os
.path
.join(os
.path
.dirname(__file__
), "../../../../bin"))
233 parts
= line
.split(" ")
234 if os
.path
.exists(os
.path
.join(bindir
, parts
[0])):
235 parts
[0] = os
.path
.join(bindir
, parts
[0])
236 line
= " ".join(parts
)
239 def check_run(self
, line
):
240 line
= self
._make
_cmdline
(line
)
241 p
= subprocess
.Popen(line
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
, shell
=True)
244 raise BlackboxProcessError(retcode
, line
, p
.stdout
.read(), p
.stderr
.read())
246 def check_output(self
, line
):
247 line
= self
._make
_cmdline
(line
)
248 p
= subprocess
.Popen(line
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
, shell
=True, close_fds
=True)
251 raise BlackboxProcessError(retcode
, line
, p
.stdout
.read(), p
.stderr
.read())
252 return p
.stdout
.read()
255 def connect_samdb(samdb_url
, lp
=None, session_info
=None, credentials
=None,
256 flags
=0, ldb_options
=None, ldap_only
=False, global_schema
=True):
257 """Create SamDB instance and connects to samdb_url database.
259 :param samdb_url: Url for database to connect to.
260 :param lp: Optional loadparm object
261 :param session_info: Optional session information
262 :param credentials: Optional credentials, defaults to anonymous.
263 :param flags: Optional LDB flags
264 :param ldap_only: If set, only remote LDAP connection will be created.
265 :param global_schema: Whether to use global schema.
267 Added value for tests is that we have a shorthand function
268 to make proper URL for ldb.connect() while using default
269 parameters for connection based on test environment
271 if not "://" in samdb_url
:
272 if not ldap_only
and os
.path
.isfile(samdb_url
):
273 samdb_url
= "tdb://%s" % samdb_url
275 samdb_url
= "ldap://%s" % samdb_url
276 # use 'paged_search' module when connecting remotely
277 if samdb_url
.startswith("ldap://"):
278 ldb_options
= ["modules:paged_searches"]
280 raise AssertionError("Trying to connect to %s while remote "
281 "connection is required" % samdb_url
)
283 # set defaults for test environment
286 if session_info
is None:
287 session_info
= samba
.auth
.system_session(lp
)
288 if credentials
is None:
289 credentials
= cmdline_credentials
291 return SamDB(url
=samdb_url
,
293 session_info
=session_info
,
294 credentials
=credentials
,
297 global_schema
=global_schema
)
300 def connect_samdb_ex(samdb_url
, lp
=None, session_info
=None, credentials
=None,
301 flags
=0, ldb_options
=None, ldap_only
=False):
302 """Connects to samdb_url database
304 :param samdb_url: Url for database to connect to.
305 :param lp: Optional loadparm object
306 :param session_info: Optional session information
307 :param credentials: Optional credentials, defaults to anonymous.
308 :param flags: Optional LDB flags
309 :param ldap_only: If set, only remote LDAP connection will be created.
310 :return: (sam_db_connection, rootDse_record) tuple
312 sam_db
= connect_samdb(samdb_url
, lp
, session_info
, credentials
,
313 flags
, ldb_options
, ldap_only
)
315 res
= sam_db
.search(base
="", expression
="", scope
=ldb
.SCOPE_BASE
,
317 return (sam_db
, res
[0])
320 def connect_samdb_env(env_url
, env_username
, env_password
, lp
=None):
321 """Connect to SamDB by getting URL and Credentials from environment
323 :param env_url: Environment variable name to get lsb url from
324 :param env_username: Username environment variable
325 :param env_password: Password environment variable
326 :return: sam_db_connection
328 samdb_url
= env_get_var_value(env_url
)
329 creds
= credentials
.Credentials()
331 # guess Credentials parameters here. Otherwise workstation
332 # and domain fields are NULL and gencache code segfalts
333 lp
= param
.LoadParm()
335 creds
.set_username(env_get_var_value(env_username
))
336 creds
.set_password(env_get_var_value(env_password
))
337 return connect_samdb(samdb_url
, credentials
=creds
, lp
=lp
)
340 def delete_force(samdb
, dn
):
343 except ldb
.LdbError
, (num
, errstr
):
344 assert num
== ldb
.ERR_NO_SUCH_OBJECT
, "ldb.delete() failed: %s" % errstr