s4:gensec/gssapi: use gensec_gssapi_max_{input,wrapped}_size() for all backends
[Samba.git] / python / samba / tests / __init__.py
blob3e7094fe067f1cef5f138685b3b64e7b9e0e0880
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Samba Python tests."""
20 import os
21 import ldb
22 import samba
23 import samba.auth
24 from samba import param
25 from samba.samdb import SamDB
26 from samba import credentials
27 import subprocess
28 import sys
29 import tempfile
30 import unittest
# Use the standard library's SkipTest when unittest provides one; older
# unittest versions lack it, so fall back to a local stand-in.
SkipTest = getattr(unittest, "SkipTest", None)
if SkipTest is None:
    class SkipTest(Exception):
        """Test skipped."""
class TestCase(unittest.TestCase):
    """A Samba test case.

    Extends :class:`unittest.TestCase` with optional control of the Samba
    debug level through the ``TEST_DEBUG_LEVEL`` environment variable and
    with accessors for the loadparm context and credentials used by tests.
    On Python older than 2.7 it additionally supplies the assertion
    helpers, ``skipTest``/``addCleanup`` and a ``run`` implementation that
    understands ``SkipTest``.
    """

    def setUp(self):
        """Apply ``TEST_DEBUG_LEVEL`` (if set) for the duration of the test.

        The debug level that was active beforehand is stored in
        ``self._old_debug_level`` and restored by a registered cleanup.
        """
        super(TestCase, self).setUp()
        test_debug_level = os.getenv("TEST_DEBUG_LEVEL")
        if test_debug_level is not None:
            test_debug_level = int(test_debug_level)
            self._old_debug_level = samba.get_debug_level()
            samba.set_debug_level(test_debug_level)
            # Put back the level that was active before this test ran.
            self.addCleanup(samba.set_debug_level, self._old_debug_level)

    def get_loadparm(self):
        """Return the loadparm context built by ``env_loadparm()``."""
        return env_loadparm()

    def get_credentials(self):
        """Return the module-level ``cmdline_credentials``."""
        return cmdline_credentials

    # These functions didn't exist before Python2.7:
    if sys.version_info < (2, 7):
        import warnings

        def skipTest(self, reason):
            """Skip the current test by raising ``SkipTest(reason)``."""
            raise SkipTest(reason)

        def assertIn(self, member, container, msg=None):
            self.assertTrue(member in container, msg)

        def assertIs(self, a, b, msg=None):
            self.assertTrue(a is b, msg)

        def assertIsNot(self, a, b, msg=None):
            self.assertTrue(a is not b, msg)

        def assertIsNotNone(self, a, msg=None):
            # Forward msg like the sibling helpers do.
            self.assertTrue(a is not None, msg)

        def assertIsInstance(self, a, b, msg=None):
            self.assertTrue(isinstance(a, b), msg)

        def assertIsNone(self, a, msg=None):
            self.assertTrue(a is None, msg)

        def addCleanup(self, fn, *args, **kwargs):
            """Register ``fn(*args, **kwargs)`` to be called by ``run()``."""
            self._cleanups = getattr(self, "_cleanups", []) + [
                (fn, args, kwargs)]

        def _addSkip(self, result, reason):
            """Report a skip to ``result``.

            Results without an ``addSkip`` method get a warning and the
            test is recorded as a success instead.
            """
            addSkip = getattr(result, 'addSkip', None)
            if addSkip is not None:
                addSkip(self, reason)
            else:
                warnings.warn("TestResult has no addSkip method, skips not reported",
                              RuntimeWarning, 2)
                result.addSuccess(self)

        def run(self, result=None):
            """Run the test, reporting skips, failures and errors to ``result``.

            Runs ``setUp``, the test method and ``tearDown`` in that
            order, then the registered cleanups in reverse order of
            registration.  ``result.stopTest`` is always called.
            """
            if result is None:
                result = self.defaultTestResult()
            result.startTest(self)
            testMethod = getattr(self, self._testMethodName)
            try:
                try:
                    self.setUp()
                except SkipTest as e:
                    self._addSkip(result, str(e))
                    return
                except KeyboardInterrupt:
                    raise
                except:
                    result.addError(self, self._exc_info())
                    return

                ok = False
                try:
                    testMethod()
                    ok = True
                except SkipTest as e:
                    self._addSkip(result, str(e))
                    return
                except self.failureException:
                    result.addFailure(self, self._exc_info())
                except KeyboardInterrupt:
                    raise
                except:
                    result.addError(self, self._exc_info())

                try:
                    self.tearDown()
                except SkipTest as e:
                    self._addSkip(result, str(e))
                except KeyboardInterrupt:
                    raise
                except:
                    result.addError(self, self._exc_info())
                    ok = False

                for (fn, args, kwargs) in reversed(getattr(self, "_cleanups", [])):
                    fn(*args, **kwargs)
                if ok:
                    result.addSuccess(self)
            finally:
                result.stopTest(self)
class LdbTestCase(TestCase):
    """Trivial test case for running tests against a LDB."""

    def setUp(self):
        """Open ``self.ldb`` on a newly created temporary file.

        The file name is kept in ``self.filename``; the file is deleted
        again by a cleanup once the test has finished.
        """
        super(LdbTestCase, self).setUp()
        # mkstemp() creates the file itself, so the name cannot be taken
        # over by another process between choosing and opening it.
        fd, self.filename = tempfile.mkstemp()
        os.close(fd)
        self.addCleanup(self._remove_ldb_file)
        self.ldb = samba.Ldb(self.filename)

    def _remove_ldb_file(self):
        """Delete the temporary database file if it still exists."""
        if os.path.exists(self.filename):
            os.unlink(self.filename)

    def set_modules(self, modules=None):
        """Change the modules for this Ldb.

        :param modules: Sequence of module names, stored comma-joined in
            the ``@LIST`` attribute of the ``@MODULES`` record.  ``None``
            (the default) means no modules.
        """
        if modules is None:
            modules = []
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, "@MODULES")
        m["@LIST"] = ",".join(modules)
        self.ldb.add(m)
        # Re-open the database; presumably needed so the new module list
        # takes effect -- TODO confirm.
        self.ldb = samba.Ldb(self.filename)
class TestCaseInTempDir(TestCase):
    """Test case that provides a fresh temporary directory per test.

    The directory path is available as ``self.tempdir``.  Tests must
    leave the directory empty: the cleanup asserts this before removing
    it.
    """

    def setUp(self):
        """Create the temporary directory and schedule its removal."""
        super(TestCaseInTempDir, self).setUp()
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(self._remove_tempdir)

    def _remove_tempdir(self):
        """Check that the directory is empty, remove it, clear the path."""
        self.assertEqual([], os.listdir(self.tempdir))
        os.rmdir(self.tempdir)
        self.tempdir = None
def env_loadparm():
    """Return a LoadParm loaded from the file named by ``SMB_CONF_PATH``.

    :return: The loaded ``param.LoadParm`` instance.
    :raise KeyError: if ``SMB_CONF_PATH`` is not set in the environment.
    """
    # Look the variable up on its own so that a KeyError raised while
    # loading the configuration is not misreported as a missing variable.
    try:
        smb_conf_path = os.environ["SMB_CONF_PATH"]
    except KeyError:
        raise KeyError("SMB_CONF_PATH not set")
    lp = param.LoadParm()
    lp.load(smb_conf_path)
    return lp
def env_get_var_value(var_name):
    """Return the value of ``var_name`` from ``os.environ``.

    Unit-test based python tests require certain input params to be set
    in the environment, otherwise they can't be run.

    :param var_name: Name of the environment variable to read.
    :return: The variable's value.
    :raise AssertionError: if the variable is not defined.
    """
    # Raised explicitly rather than with ``assert`` so the check is still
    # performed when Python runs with -O.
    if var_name not in os.environ:
        raise AssertionError("Please supply %s in environment" % var_name)
    return os.environ[var_name]
# Credentials handed out by TestCase.get_credentials() and used by
# connect_samdb() when the caller supplies none.  Nothing in this file
# assigns a real value; presumably the test runner sets it from its
# command line -- TODO confirm.
cmdline_credentials = None
class RpcInterfaceTestCase(TestCase):
    """DCE/RPC Test case."""
    # Defines no members of its own; everything is inherited from TestCase.
    # Presumably exists as a common base class for RPC interface tests.
class ValidNetbiosNameTests(TestCase):
    """Checks for ``samba.valid_netbios_name()``."""

    def test_valid(self):
        # A short, purely alphabetic name is accepted.
        accepted = samba.valid_netbios_name("FOO")
        self.assertTrue(accepted)

    def test_too_long(self):
        # Thirty characters is rejected.
        overlong_name = "FOO" * 10
        accepted = samba.valid_netbios_name(overlong_name)
        self.assertFalse(accepted)

    def test_invalid_characters(self):
        # A name containing '*' is rejected.
        accepted = samba.valid_netbios_name("*BLA")
        self.assertFalse(accepted)
class BlackboxProcessError(Exception):
    """This is raised when check_output() process returns a non-zero exit status

    Exception instance should contain the exact exit code (S.returncode),
    command line (S.cmd), process output (S.stdout) and process error stream
    (S.stderr)
    """

    def __init__(self, returncode, cmd, stdout, stderr):
        """Record the details of the failed command.

        :param returncode: Exit status of the process.
        :param cmd: Command line that was run.
        :param stdout: Captured standard output.
        :param stderr: Captured standard error.
        """
        # Initialise the base class so that ``args`` is populated.
        super(BlackboxProcessError, self).__init__(returncode, cmd,
                                                   stdout, stderr)
        self.returncode = returncode
        self.cmd = cmd
        self.stdout = stdout
        self.stderr = stderr

    def __str__(self):
        return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (
            self.cmd, self.returncode, self.stdout, self.stderr)
class BlackboxTestCase(TestCase):
    """Base test case for blackbox tests."""

    def _make_cmdline(self, line):
        """Return ``line`` with its first word resolved against the bin dir.

        If the first space-separated word names an existing entry in
        ``../../../../bin`` (relative to this file's directory), it is
        replaced by the absolute path of that entry; otherwise ``line``
        is returned unchanged.
        """
        bindir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../bin"))
        parts = line.split(" ")
        if os.path.exists(os.path.join(bindir, parts[0])):
            parts[0] = os.path.join(bindir, parts[0])
        line = " ".join(parts)
        return line

    def _run_cmdline(self, line, **popen_kwargs):
        """Run ``line`` through the shell and return its standard output.

        :param line: Command line; passed through ``_make_cmdline`` first.
        :param popen_kwargs: Extra keyword arguments for ``subprocess.Popen``.
        :raise BlackboxProcessError: if the exit status is non-zero.
        """
        line = self._make_cmdline(line)
        p = subprocess.Popen(line, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, shell=True,
                             **popen_kwargs)
        # communicate() drains both pipes while waiting; calling wait()
        # first can block forever once the child fills a pipe buffer.
        stdoutdata, stderrdata = p.communicate()
        retcode = p.returncode
        if retcode:
            raise BlackboxProcessError(retcode, line, stdoutdata, stderrdata)
        return stdoutdata

    def check_run(self, line):
        """Run ``line`` and require a zero exit status.

        :raise BlackboxProcessError: if the exit status is non-zero.
        """
        self._run_cmdline(line)

    def check_output(self, line):
        """Run ``line``, require a zero exit status, return its stdout.

        :raise BlackboxProcessError: if the exit status is non-zero.
        """
        return self._run_cmdline(line, close_fds=True)
def connect_samdb(samdb_url, lp=None, session_info=None, credentials=None,
                  flags=0, ldb_options=None, ldap_only=False, global_schema=True):
    """Build a SamDB connected to ``samdb_url`` using test-environment defaults.

    A ``samdb_url`` without a ``scheme://`` prefix is completed here: it
    becomes a ``tdb://`` URL when it names an existing file and
    ``ldap_only`` is false, and an ``ldap://`` URL otherwise.

    :param samdb_url: Url for database to connect to.
    :param lp: Optional loadparm object; defaults to ``env_loadparm()``.
    :param session_info: Optional session information; defaults to the
        system session for ``lp``.
    :param credentials: Optional credentials; defaults to the module-level
        ``cmdline_credentials``.
    :param flags: Optional LDB flags
    :param ldb_options: Optional LDB options.  For ``ldap://`` URLs the
        value passed in is replaced by ``["modules:paged_searches"]``.
    :param ldap_only: If set, only remote LDAP connection will be created.
    :param global_schema: Whether to use global schema.
    :raise AssertionError: if ``ldap_only`` is set but the resulting URL
        is not an ``ldap://`` URL.
    :return: The SamDB instance.
    """
    if "://" not in samdb_url:
        is_local_file = not ldap_only and os.path.isfile(samdb_url)
        url_template = "tdb://%s" if is_local_file else "ldap://%s"
        samdb_url = url_template % samdb_url

    is_remote = samdb_url.startswith("ldap://")
    if ldap_only and not is_remote:
        raise AssertionError("Trying to connect to %s while remote "
                             "connection is required" % samdb_url)
    if is_remote:
        # remote connections use the paged_searches module
        ldb_options = ["modules:paged_searches"]

    # fill in whatever the caller left out from the test environment
    if lp is None:
        lp = env_loadparm()
    if session_info is None:
        session_info = samba.auth.system_session(lp)
    if credentials is None:
        credentials = cmdline_credentials

    connect_args = {
        "url": samdb_url,
        "lp": lp,
        "session_info": session_info,
        "credentials": credentials,
        "flags": flags,
        "options": ldb_options,
        "global_schema": global_schema,
    }
    return SamDB(**connect_args)
def connect_samdb_ex(samdb_url, lp=None, session_info=None, credentials=None,
                     flags=0, ldb_options=None, ldap_only=False):
    """Connect to ``samdb_url`` and also fetch its RootDSE record.

    All connection arguments are handed to ``connect_samdb()`` unchanged.

    :param samdb_url: Url for database to connect to.
    :param lp: Optional loadparm object
    :param session_info: Optional session information
    :param credentials: Optional credentials
    :param flags: Optional LDB flags
    :param ldb_options: Optional LDB options
    :param ldap_only: If set, only remote LDAP connection will be created.
    :return: (sam_db_connection, rootDse_record) tuple
    """
    sam_db = connect_samdb(samdb_url, lp=lp, session_info=session_info,
                           credentials=credentials, flags=flags,
                           ldb_options=ldb_options, ldap_only=ldap_only)
    # The RootDSE is taken as the first record of a base-scope search on
    # the empty DN.
    search_result = sam_db.search(base="", expression="",
                                  scope=ldb.SCOPE_BASE, attrs=["*"])
    root_dse = search_result[0]
    return (sam_db, root_dse)
def connect_samdb_env(env_url, env_username, env_password, lp=None):
    """Connect to SamDB using a URL and credentials named by environment variables.

    :param env_url: Name of the environment variable holding the ldb url
    :param env_username: Name of the environment variable holding the username
    :param env_password: Name of the environment variable holding the password
    :param lp: Optional loadparm object; a new ``param.LoadParm()`` is
        created when omitted.
    :return: sam_db_connection
    """
    samdb_url = env_get_var_value(env_url)
    creds = credentials.Credentials()
    if lp is None:
        # Guess the Credentials parameters here. Otherwise the workstation
        # and domain fields are NULL and the gencache code segfaults.
        # NOTE(review): guess() is only called when no lp was supplied --
        # confirm that callers passing their own lp do not need it too.
        lp = param.LoadParm()
        creds.guess(lp)
    username = env_get_var_value(env_username)
    creds.set_username(username)
    password = env_get_var_value(env_password)
    creds.set_password(password)
    return connect_samdb(samdb_url, credentials=creds, lp=lp)
def delete_force(samdb, dn):
    """Delete ``dn`` from ``samdb``, tolerating that it does not exist.

    :param samdb: Database object providing ``delete(dn)``.
    :param dn: DN of the object to delete.
    :raise AssertionError: if the delete fails with an LDB error other
        than ``ldb.ERR_NO_SUCH_OBJECT``.
    """
    try:
        samdb.delete(dn)
    except ldb.LdbError as e:
        (num, errstr) = e.args
        # Raised explicitly rather than with ``assert`` so that other
        # errors are not silently swallowed when Python runs with -O.
        if num != ldb.ERR_NO_SUCH_OBJECT:
            raise AssertionError("ldb.delete() failed: %s" % errstr)