Clean up a comment noticed by Jonathan Shao@Panasas.com and remove an
[Samba/gebeck_regimport.git] / source3 / stf / comfychair.py
blob522f9bedeba44058e57a6ca66b7d95ae04e35147
1 #! /usr/bin/env python
3 # Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
4 # Copyright (C) 2003 by Tim Potter <tpot@samba.org>
5 #
6 # This program is free software; you can redistribute it and/or
7 # modify it under the terms of the GNU General Public License as
8 # published by the Free Software Foundation; either version 2 of the
9 # License, or (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 # USA
21 """comfychair: a Python-based instrument of software torture.
23 Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
24 Copyright (C) 2003 by Tim Potter <tpot@samba.org>
26 This is a test framework designed for testing programs written in
27 Python, or (through a fork/exec interface) any other language.
29 For more information, see the file README.comfychair.
31 To run a test suite based on ComfyChair, just run it as a program.
32 """
34 import sys, re
class TestCase:
    """A base class for tests.

    This class defines the required functions (setup, runtest and
    teardown), which can optionally be overridden by subclasses.  It also
    provides some utility functions for skipping tests, making
    assertions, running external commands and keeping a per-test log.

    Creating an instance has side effects: the process changes into a
    freshly created run directory (testtmp/ClassName under the current
    directory) and os.environ is snapshotted.  Actions undoing both, plus
    teardown(), are queued with add_cleanup().
    """

    def __init__(self):
        self.test_log = ""              # text shown by explain_failure()
        self.background_pids = []
        self._cleanups = []             # callables queued by add_cleanup()
        self._enter_rundir()
        self._save_environment()
        self.add_cleanup(self.teardown)


    # --------------------------------------------------
    # Save and restore directory
    def _enter_rundir(self):
        """Create an empty run directory for this test and chdir into it.

        Sets self.basedir (the directory we started in), self.rundir and
        self.tmpdir, and queues a cleanup that returns to basedir.
        """
        import os
        import shutil
        self.basedir = os.getcwd()
        self.add_cleanup(self._restore_directory)
        self.rundir = os.path.join(self.basedir,
                                   'testtmp',
                                   self.__class__.__name__)
        self.tmpdir = os.path.join(self.rundir, 'tmp')
        # Remove whatever a previous run left behind.  Library calls are
        # used instead of "rm -fr"/"mkdir -p" through the shell so that
        # paths containing spaces or shell metacharacters are safe.
        if os.path.isdir(self.rundir) and not os.path.islink(self.rundir):
            shutil.rmtree(self.rundir, True)    # True: ignore errors
        elif os.path.islink(self.rundir) or os.path.exists(self.rundir):
            os.remove(self.rundir)
        # makedirs also creates rundir, which is the parent of tmpdir.
        os.makedirs(self.tmpdir)
        os.chdir(self.rundir)

    def _restore_directory(self):
        """Return to the directory that was current before the test."""
        import os
        os.chdir(self.basedir)

    # --------------------------------------------------
    # Save and restore environment
    def _save_environment(self):
        """Snapshot os.environ and queue a cleanup that restores it."""
        import os
        self._saved_environ = os.environ.copy()
        self.add_cleanup(self._restore_environment)

    def _restore_environment(self):
        """Reset os.environ to the snapshot taken by _save_environment."""
        import os
        os.environ.clear()
        os.environ.update(self._saved_environ)


    def setup(self):
        """Set up test fixture."""
        pass

    def teardown(self):
        """Tear down test fixture."""
        pass

    def runtest(self):
        """Run the test."""
        pass


    def add_cleanup(self, c):
        """Queue a cleanup to be run when the test is complete.

        Inputs:
          c           a callable taking no arguments
        """
        self._cleanups.append(c)


    def fail(self, reason = ""):
        """Say the test failed."""
        raise AssertionError(reason)


    #############################################################
    # Requisition methods

    def require(self, predicate, message):
        """Check a predicate for running this test.

        If the predicate value is not true, the test is skipped with a
        message explaining why.

        Raises:
          NotRunError if the predicate is false
        """
        if not predicate:
            raise NotRunError(message)

    def require_root(self):
        """Skip this test unless run by root."""
        import os
        self.require(os.getuid() == 0,
                     "must be root to run this test")

    #############################################################
    # Assertion methods

    def assert_(self, expr, reason = ""):
        """Fail with the given reason unless expr is true."""
        if not expr:
            raise AssertionError(reason)

    def assert_equal(self, a, b):
        """Fail unless a == b."""
        if not a == b:
            raise AssertionError("assertEquals failed: %s" % repr((a, b)))

    def assert_notequal(self, a, b):
        """Fail if a == b."""
        if a == b:
            raise AssertionError("assertNotEqual failed: %s" % repr((a, b)))

    def assert_re_match(self, pattern, s):
        """Assert that a string matches a particular pattern

        Inputs:
          pattern      string: regular expression
          s            string: to be matched

        Raises:
          AssertionError if not matched
        """
        if not re.match(pattern, s):
            raise AssertionError("string does not match regexp\n"
                                 " string: %s\n"
                                 " re: %s" % (repr(s), repr(pattern)))

    def assert_re_search(self, pattern, s):
        """Assert that a string *contains* a particular pattern

        Inputs:
          pattern      string: regular expression
          s            string: to be searched

        Raises:
          AssertionError if not matched
        """
        if not re.search(pattern, s):
            raise AssertionError("string does not contain regexp\n"
                                 " string: %s\n"
                                 " re: %s" % (repr(s), repr(pattern)))


    def assert_no_file(self, filename):
        """Fail if the named file exists."""
        import os.path
        # Raised explicitly rather than with an assert statement, so the
        # check is still made when Python runs with -O.
        if os.path.exists(filename):
            raise AssertionError("file exists but should not: %s" % filename)


    #############################################################
    # Methods for running programs

    def runcmd_background(self, cmd):
        """Start a shell command without waiting for it.

        The command is run by /bin/sh -c in a forked child.  Returns the
        child's process id.
        """
        import os
        self.test_log = self.test_log + "Run in background:\n" + repr(cmd) + "\n"
        pid = os.fork()
        if pid == 0:
            # child
            try:
                os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
            finally:
                # Only reached if the exec failed; never return into the
                # test runner from the child.
                os._exit(127)
        self.test_log = self.test_log + "pid: %d\n" % pid
        return pid


    def runcmd(self, cmd, expectedResult = 0):
        """Run a command, fail if the command returns an unexpected exit
        code.  Return the output produced, as (stdout, stderr)."""
        rc, output, stderr = self.runcmd_unchecked(cmd)
        if rc != expectedResult:
            raise AssertionError("""command returned %d; expected %s: \"%s\"
stdout:
%s
stderr:
%s""" % (rc, expectedResult, cmd, output, stderr))

        return output, stderr


    def run_captured(self, cmd):
        """Run a command, capturing stdout and stderr.

        Based in part on popen2.py

        Inputs:
          cmd         a shell command string (run by /bin/sh -c), or an
                      argument sequence whose first item is the program

        The child redirects its output to PID.out and PID.err in the
        current directory; the parent reads them once the child exits.

        Returns (waitstatus, stdout, stderr)."""
        import os

        def read_all(path):
            # Read a whole file and close it promptly.
            f = open(path)
            try:
                return f.read()
            finally:
                f.close()

        # Treat every kind of string as a shell command; basestring only
        # exists on Python 2, where it also covers unicode.
        try:
            string_types = basestring
        except NameError:
            string_types = str

        pid = os.fork()
        if pid == 0:
            # child
            try:
                child_pid = os.getpid()
                openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
                perms = int('666', 8)       # rw-rw-rw-, subject to umask

                outfd = os.open('%d.out' % child_pid, openmode, perms)
                os.dup2(outfd, 1)
                os.close(outfd)

                errfd = os.open('%d.err' % child_pid, openmode, perms)
                os.dup2(errfd, 2)
                os.close(errfd)

                if isinstance(cmd, string_types):
                    cmd = ['/bin/sh', '-c', cmd]

                os.execvp(cmd[0], cmd)
            finally:
                # Only reached if something above failed.
                os._exit(127)
        else:
            # parent
            exited_pid, waitstatus = os.waitpid(pid, 0)
            stdout = read_all('%d.out' % pid)
            stderr = read_all('%d.err' % pid)
            return waitstatus, stdout, stderr


    def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
        """Invoke a command; return (exitcode, stdout, stderr)

        The command and its results are appended to the test log.

        Raises:
          AssertionError if the command was terminated by a signal
          NotRunError if skip_on_noexec is true and the exit code is 127
        """
        import os
        waitstatus, stdout, stderr = self.run_captured(cmd)
        # Raised explicitly rather than with an assert statement, so the
        # check is still made when Python runs with -O.
        if os.WIFSIGNALED(waitstatus):
            raise AssertionError("%s terminated with signal %d"
                                 % (repr(cmd), os.WTERMSIG(waitstatus)))
        rc = os.WEXITSTATUS(waitstatus)
        self.test_log = self.test_log + ("""Run command: %s
Wait status: %#x (exit code %d, signal %d)
stdout:
%s
stderr:
%s""" % (cmd, waitstatus, rc, os.WTERMSIG(waitstatus),
         stdout, stderr))
        if skip_on_noexec and rc == 127:
            # Either we could not execute the command or the command
            # returned exit code 127.  According to system(3) we can't
            # tell the difference.
            raise NotRunError("could not execute %s" % repr(cmd))
        return rc, stdout, stderr


    def explain_failure(self, exc_info = None):
        """Print the accumulated test log.  exc_info is not used here."""
        print("test_log:")
        print(self.test_log)


    def log(self, msg):
        """Log a message to the test log.  This message is displayed if
        the test fails, or when the runtests function is invoked with
        the verbose option."""
        self.test_log = self.test_log + msg + "\n"
class NotRunError(Exception):
    """Raised if a test must be skipped because of missing resources.

    The explanation given to the constructor is kept in the `value`
    attribute.
    """
    def __init__(self, value = None):
        # Initialise the base class too, so that str() and the args
        # attribute carry the explanation as for any other exception.
        if value is None:
            Exception.__init__(self)
        else:
            Exception.__init__(self, value)
        self.value = value
281 def _report_error(case, debugger):
282 """Ask the test case to explain failure, and optionally run a debugger
284 Input:
285 case TestCase instance
286 debugger if true, a debugger function to be applied to the traceback
288 import sys
289 ex = sys.exc_info()
290 print "-----------------------------------------------------------------"
291 if ex:
292 import traceback
293 traceback.print_exc(file=sys.stdout)
294 case.explain_failure()
295 print "-----------------------------------------------------------------"
297 if debugger:
298 tb = ex[2]
299 debugger(tb)
def runtests(test_list, verbose = 0, debugger = None):
    """Run a series of tests.

    Each class is instantiated, then setup() and runtest() are called.
    Whatever happens, the cleanups queued on the instance are then run,
    most recently queued first.

    Inputs:
      test_list    sequence of TestCase classes
      verbose      print more information as testing proceeds
      debugger     debugger object to be applied to errors

    Returns:
      unix return code: 0 for success, 1 for failures, 2 if interrupted
    """
    import sys
    ret = 0
    for test_class in test_list:
        # No newline yet: the result is printed on the same line.
        sys.stdout.write("%-30s " % _test_name(test_class))
        # flush now so that long running tests are easier to follow
        sys.stdout.flush()

        obj = None
        interrupted = 0
        try:
            try: # run test and show result
                obj = test_class()
                obj.setup()
                obj.runtest()
                print("OK")
            except KeyboardInterrupt:
                print("INTERRUPT")
                _report_error(obj, debugger)
                ret = 2
                interrupted = 1
            except NotRunError:
                print("NOTRUN, %s" % sys.exc_info()[1].value)
            except:
                # Deliberately broad: any failure of the test itself is
                # reported and the run continues with the next test.
                print("FAIL")
                _report_error(obj, debugger)
                ret = max(ret, 1)
        finally:
            # obj is still None if the constructor raised.
            while obj is not None and obj._cleanups:
                try:
                    cleanup = obj._cleanups.pop()
                    cleanup()
                except KeyboardInterrupt:
                    print("interrupted during teardown")
                    _report_error(obj, debugger)
                    ret = 2
                    interrupted = 1
                    break
                except:
                    print("error during teardown")
                    _report_error(obj, debugger)
                    # Never downgrade an "interrupted" status.
                    ret = max(ret, 1)
        if interrupted:
            # Stop the whole run, wherever the interrupt arrived.
            break
        # Display log file if we're verbose
        if ret == 0 and verbose and obj is not None:
            obj.explain_failure()

    return ret
358 def _test_name(test_class):
359 """Return a human-readable name for a test class.
361 try:
362 return test_class.__name__
363 except:
364 return `test_class`
def print_help():
    """Help for people running tests

    Prints a usage message, headed by the program name from sys.argv[0],
    to standard output.
    """
    import sys
    # The blank lines and the aligned option table keep the message
    # readable on a terminal.
    print("""%s: software test suite based on ComfyChair

usage:
    To run all tests, just run this program.  To run particular tests,
    list them on the command line.

options:
    --help              show usage message
    --list              list available tests
    --verbose, -v       show more information while running tests
    --post-mortem, -p   enter Python debugger on error
""" % sys.argv[0])
def print_list(test_list):
    """Print the name of every test in test_list, one per line."""
    for candidate in test_list:
        label = _test_name(candidate)
        print(" %s" % label)
def main(tests, extra_tests=[]):
    """Main entry point for test suites based on ComfyChair.

    inputs:
      tests       Sequence of TestCase subclasses to be run by default.
      extra_tests Sequence of TestCase subclasses that are available but
                  not run by default.

    Test suites should contain this boilerplate:

        if __name__ == '__main__':
            comfychair.main(tests)

    This function handles standard options such as --help and --list, and
    by default runs all tests in the suggested order.  Tests named on the
    command line are looked up in both tests and extra_tests.

    After running tests, calls sys.exit() with the status returned by
    runtests().  A bad option or an unknown test name exits with status
    2.  --help and --list print their output and return without exiting.
    """
    import getopt
    import sys
    argv = sys.argv

    opt_verbose = 0
    debugger = None

    try:
        opts, args = getopt.getopt(argv[1:], 'pv',
                                   ['help', 'list', 'verbose', 'post-mortem'])
    except getopt.GetoptError:
        # Report a bad option as a usage error, not as a traceback.
        sys.stderr.write("%s: %s\n" % (argv[0], sys.exc_info()[1]))
        sys.stderr.write("Try --help for usage.\n")
        sys.exit(2)

    # Copy into lists so that any kind of sequence can be passed in; the
    # extra_tests default is never modified.
    all_tests = list(tests) + list(extra_tests)

    for opt, opt_arg in opts:
        if opt == '--help':
            print_help()
            return
        elif opt == '--list':
            print_list(all_tests)
            return
        elif opt == '--verbose' or opt == '-v':
            opt_verbose = 1
        elif opt == '--post-mortem' or opt == '-p':
            import pdb
            debugger = pdb.post_mortem

    if args:
        by_name = {}
        for t in all_tests:
            by_name[_test_name(t)] = t
        which_tests = []
        unknown = []
        for name in args:
            if name in by_name:
                which_tests.append(by_name[name])
            else:
                unknown.append(name)
        if unknown:
            sys.stderr.write("%s: unknown test(s): %s\n"
                             % (argv[0], ", ".join(unknown)))
            sys.stderr.write("Try --list to see the available tests.\n")
            sys.exit(2)
    else:
        which_tests = tests

    sys.exit(runtests(which_tests, verbose=opt_verbose,
                      debugger=debugger))
if __name__ == '__main__':
    # Run directly, the framework module only shows its own documentation.
    print(__doc__)