1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Samba Python tests."""
24 from samba
import param
25 from samba
.samdb
import SamDB
26 from samba
import credentials
32 from unittest
import SkipTest
34 class SkipTest(Exception):
38 class TestCase(unittest
.TestCase
):
39 """A Samba test case."""
def setUp(self):
    """Set up the test and apply the optional TEST_DEBUG_LEVEL override.

    This is the ``setUp`` method of ``TestCase``.  When the
    ``TEST_DEBUG_LEVEL`` environment variable is set, its integer value is
    installed as the Samba debug level and the level that was active
    beforehand is restored when the test's cleanups run.

    :raises ValueError: if ``TEST_DEBUG_LEVEL`` is set but is not an integer.
    """
    super(TestCase, self).setUp()
    test_debug_level = os.getenv("TEST_DEBUG_LEVEL")
    if test_debug_level is not None:
        test_debug_level = int(test_debug_level)
        self._old_debug_level = samba.get_debug_level()
        samba.set_debug_level(test_debug_level)
        # Restore the level saved above.  Registering test_debug_level here
        # would simply re-apply the override and never undo it.
        self.addCleanup(samba.set_debug_level, self._old_debug_level)
50 def get_loadparm(self
):
def get_credentials(self):
    """Return the ``cmdline_credentials`` global."""
    return cmdline_credentials
# Only define skipTest() when unittest.TestCase does not already provide one.
if not getattr(unittest.TestCase, "skipTest", None):
    def skipTest(self, reason):
        """Skip the running test by raising SkipTest.

        :param reason: Value passed to the SkipTest constructor.
        :raises SkipTest: always.
        """
        raise SkipTest(reason)
61 class LdbTestCase(unittest
.TestCase
):
62 """Trivial test case for running tests against a LDB."""
65 super(LdbTestCase
, self
).setUp()
66 self
.filename
= os
.tempnam()
67 self
.ldb
= samba
.Ldb(self
.filename
)
69 def set_modules(self
, modules
=[]):
70 """Change the modules for this Ldb."""
72 m
.dn
= ldb
.Dn(self
.ldb
, "@MODULES")
73 m
["@LIST"] = ",".join(modules
)
75 self
.ldb
= samba
.Ldb(self
.filename
)
class TestCaseInTempDir(TestCase):
    """Test case that gets a fresh temporary directory in ``self.tempdir``.

    The directory is created in ``setUp`` and removed by a cleanup, which
    also checks that the test left nothing behind in it.
    """

    def setUp(self):
        """Create the temporary directory and register its removal."""
        super(TestCaseInTempDir, self).setUp()
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(self._remove_tempdir)

    def _remove_tempdir(self):
        """Assert that the temporary directory is empty, then remove it.

        :raises AssertionError: if the directory still contains entries; the
            directory is left in place in that case.
        """
        # assertEqual is the supported name; the assertEquals alias is
        # deprecated and no longer exists in recent unittest versions.
        self.assertEqual([], os.listdir(self.tempdir))
        os.rmdir(self.tempdir)
94 lp
.load(os
.environ
["SMB_CONF_PATH"])
96 raise KeyError("SMB_CONF_PATH not set")
def env_get_var_value(var_name):
    """Return the value of a required variable from os.environ.

    Unit-test based python tests require certain input params
    to be set in environment, otherwise they can't be run.

    :param var_name: Name of the environment variable to read.
    :return: The value stored in ``os.environ`` under ``var_name``.
    :raises AssertionError: if the variable is not defined.
    """
    try:
        return os.environ[var_name]
    except KeyError:
        # Raised explicitly instead of through an ``assert`` statement so the
        # check still happens when Python runs with -O.
        raise AssertionError("Please supply %s in environment" % var_name)
111 cmdline_credentials
= None
class RpcInterfaceTestCase(TestCase):
    """DCE/RPC test case.

    A ``TestCase`` subclass; no additional members are visible here.
    """
class ValidNetbiosNameTests(TestCase):
    # Checks samba.valid_netbios_name() against one accepted name and two
    # rejected ones.  Comments are used instead of docstrings so that the
    # test descriptions reported by unittest stay the method names.

    def test_valid(self):
        # A short alphabetic name is accepted.
        accepted = samba.valid_netbios_name("FOO")
        self.assertTrue(accepted)

    def test_too_long(self):
        # "FOO" repeated ten times (30 characters) is rejected.
        overlong_name = "FOO" * 10
        accepted = samba.valid_netbios_name(overlong_name)
        self.assertFalse(accepted)

    def test_invalid_characters(self):
        # A name containing "*" is rejected.
        accepted = samba.valid_netbios_name("*BLA")
        self.assertFalse(accepted)
class BlackboxProcessError(Exception):
    """This is raised when check_output() process returns a non-zero exit status

    Exception instance should contain the exact exit code (S.returncode),
    command line (S.cmd), process output (S.stdout) and process error stream
    (S.stderr)
    """

    def __init__(self, returncode, cmd, stdout, stderr):
        """Record the outcome of the failed command.

        :param returncode: Exit status of the process.
        :param cmd: Command line that was run.
        :param stdout: Captured standard output of the process.
        :param stderr: Captured standard error of the process.
        """
        # Hand the values to Exception as well so that ``args`` is populated
        # and the instance can be copied or pickled.
        super(BlackboxProcessError, self).__init__(returncode, cmd, stdout, stderr)
        self.returncode = returncode
        self.cmd = cmd
        self.stdout = stdout
        self.stderr = stderr

    def __str__(self):
        """Return a one-line summary of command, exit status and output."""
        return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (
            self.cmd, self.returncode, self.stdout, self.stderr)
class BlackboxTestCase(TestCase):
    """Base test case for blackbox tests."""

    def _make_cmdline(self, line):
        """Return ``line`` with its first word resolved against the bin directory.

        The bin directory is ``../../../../bin`` relative to the directory of
        this file.  If the first space-separated word of ``line`` names an
        existing path inside it, that word is replaced by the full path;
        otherwise ``line`` is returned with the same content.

        :param line: Command line as a single string.
        :return: The (possibly rewritten) command line.
        """
        bindir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "../../../../bin"))
        parts = line.split(" ")
        candidate = os.path.join(bindir, parts[0])
        if os.path.exists(candidate):
            parts[0] = candidate
        return " ".join(parts)

    def _run_cmdline(self, line, **popen_kwargs):
        """Run ``line`` through the shell and return its standard output.

        :param line: Command line as a single string.
        :param popen_kwargs: Extra keyword arguments for ``subprocess.Popen``.
        :return: Everything the process wrote to standard output.
        :raises BlackboxProcessError: if the exit status is non-zero.
        """
        line = self._make_cmdline(line)
        p = subprocess.Popen(line, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, shell=True,
                             **popen_kwargs)
        # communicate() drains both pipes while waiting.  Calling wait()
        # first can deadlock once the child fills a pipe buffer.
        stdout, stderr = p.communicate()
        if p.returncode:
            raise BlackboxProcessError(p.returncode, line, stdout, stderr)
        return stdout

    def check_run(self, line):
        """Run ``line`` and require a zero exit status.

        :param line: Command line as a single string.
        :raises BlackboxProcessError: if the exit status is non-zero.
        """
        self._run_cmdline(line)

    def check_output(self, line):
        """Run ``line``, require a zero exit status and return its output.

        :param line: Command line as a single string.
        :return: Everything the process wrote to standard output.
        :raises BlackboxProcessError: if the exit status is non-zero.
        """
        return self._run_cmdline(line, close_fds=True)
174 def connect_samdb(samdb_url
, lp
=None, session_info
=None, credentials
=None,
175 flags
=0, ldb_options
=None, ldap_only
=False, global_schema
=True):
176 """Create SamDB instance and connects to samdb_url database.
178 :param samdb_url: Url for database to connect to.
179 :param lp: Optional loadparm object
180 :param session_info: Optional session information
181 :param credentials: Optional credentials, defaults to anonymous.
182 :param flags: Optional LDB flags
183 :param ldap_only: If set, only remote LDAP connection will be created.
184 :param global_schema: Whether to use global schema.
186 Added value for tests is that we have a shorthand function
187 to make proper URL for ldb.connect() while using default
188 parameters for connection based on test environment
190 if not "://" in samdb_url
:
191 if not ldap_only
and os
.path
.isfile(samdb_url
):
192 samdb_url
= "tdb://%s" % samdb_url
194 samdb_url
= "ldap://%s" % samdb_url
195 # use 'paged_searches' module when connecting remotely
196 if samdb_url
.startswith("ldap://"):
197 ldb_options
= ["modules:paged_searches"]
199 raise AssertionError("Trying to connect to %s while remote "
200 "connection is required" % samdb_url
)
202 # set defaults for test environment
205 if session_info
is None:
206 session_info
= samba
.auth
.system_session(lp
)
207 if credentials
is None:
208 credentials
= cmdline_credentials
210 return SamDB(url
=samdb_url
,
212 session_info
=session_info
,
213 credentials
=credentials
,
216 global_schema
=global_schema
)
219 def connect_samdb_ex(samdb_url
, lp
=None, session_info
=None, credentials
=None,
220 flags
=0, ldb_options
=None, ldap_only
=False):
221 """Connects to samdb_url database
223 :param samdb_url: Url for database to connect to.
224 :param lp: Optional loadparm object
225 :param session_info: Optional session information
226 :param credentials: Optional credentials, defaults to anonymous.
227 :param flags: Optional LDB flags
228 :param ldap_only: If set, only remote LDAP connection will be created.
229 :return: (sam_db_connection, rootDse_record) tuple
231 sam_db
= connect_samdb(samdb_url
, lp
, session_info
, credentials
,
232 flags
, ldb_options
, ldap_only
)
234 res
= sam_db
.search(base
="", expression
="", scope
=ldb
.SCOPE_BASE
,
236 return (sam_db
, res
[0])
239 def connect_samdb_env(env_url
, env_username
, env_password
, lp
=None):
240 """Connect to SamDB by getting URL and Credentials from environment
242 :param env_url: Environment variable name to get ldb url from
243 :param env_username: Username environment variable
244 :param env_password: Password environment variable
245 :return: sam_db_connection
247 samdb_url
= env_get_var_value(env_url
)
248 creds
= credentials
.Credentials()
250 # guess Credentials parameters here. Otherwise workstation
251 # and domain fields are NULL and gencache code segfaults
252 lp
= param
.LoadParm()
254 creds
.set_username(env_get_var_value(env_username
))
255 creds
.set_password(env_get_var_value(env_password
))
256 return connect_samdb(samdb_url
, credentials
=creds
, lp
=lp
)
def delete_force(samdb, dn):
    """Delete ``dn`` from ``samdb``, tolerating an object that does not exist.

    :param samdb: Database object whose ``delete()`` method is called.
    :param dn: DN of the object to delete.
    :raises AssertionError: if the delete fails with an error code other
        than ``ldb.ERR_NO_SUCH_OBJECT``.
    """
    try:
        samdb.delete(dn)
    except ldb.LdbError as e:
        # ``except Class, (a, b):`` only parses on Python 2; unpacking the
        # exception arguments works on both Python 2.6+ and Python 3.
        (num, errstr) = e.args
        # Raised explicitly instead of through an ``assert`` statement so a
        # real failure is not silently swallowed under -O.
        if num != ldb.ERR_NO_SUCH_OBJECT:
            raise AssertionError("ldb.delete() failed: %s" % errstr)