docs: man ldb.3: Add missing meta data.
[Samba/gebeck_regimport.git] / lib / testtools / testtools / testsuite.py
blob41eb6f7d3a5c9ea4f30a28adad97efdc4cab6300
1 # Copyright (c) 2009-2011 testtools developers. See LICENSE for details.
3 """Test suites and related things."""
5 __metaclass__ = type
6 __all__ = [
7 'ConcurrentTestSuite',
8 'iterate_tests',
11 from testtools.helpers import try_imports
13 Queue = try_imports(['Queue.Queue', 'queue.Queue'])
15 import threading
16 import unittest
18 import testtools
21 def iterate_tests(test_suite_or_case):
22 """Iterate through all of the test cases in 'test_suite_or_case'."""
23 try:
24 suite = iter(test_suite_or_case)
25 except TypeError:
26 yield test_suite_or_case
27 else:
28 for test in suite:
29 for subtest in iterate_tests(test):
30 yield subtest
33 class ConcurrentTestSuite(unittest.TestSuite):
34 """A TestSuite whose run() calls out to a concurrency strategy."""
36 def __init__(self, suite, make_tests, wrap_result=None):
37 """Create a ConcurrentTestSuite to execute suite.
39 :param suite: A suite to run concurrently.
40 :param make_tests: A helper function to split the tests in the
41 ConcurrentTestSuite into some number of concurrently executing
42 sub-suites. make_tests must take a suite, and return an iterable
43 of TestCase-like object, each of which must have a run(result)
44 method.
45 :param wrap_result: An optional function that takes a thread-safe
46 result and a thread number and must return a ``TestResult``
47 object. If not provided, then ``ConcurrentTestSuite`` will just
48 use a ``ThreadsafeForwardingResult`` wrapped around the result
49 passed to ``run()``.
50 """
51 super(ConcurrentTestSuite, self).__init__([suite])
52 self.make_tests = make_tests
53 if wrap_result:
54 self._wrap_result = wrap_result
56 def _wrap_result(self, thread_safe_result, thread_number):
57 """Wrap a thread-safe result before sending it test results.
59 You can either override this in a subclass or pass your own
60 ``wrap_result`` in to the constructor. The latter is preferred.
61 """
62 return thread_safe_result
64 def run(self, result):
65 """Run the tests concurrently.
67 This calls out to the provided make_tests helper, and then serialises
68 the results so that result only sees activity from one TestCase at
69 a time.
71 ConcurrentTestSuite provides no special mechanism to stop the tests
72 returned by make_tests, it is up to the make_tests to honour the
73 shouldStop attribute on the result object they are run with, which will
74 be set if an exception is raised in the thread which
75 ConcurrentTestSuite.run is called in.
76 """
77 tests = self.make_tests(self)
78 try:
79 threads = {}
80 queue = Queue()
81 semaphore = threading.Semaphore(1)
82 for i, test in enumerate(tests):
83 process_result = self._wrap_result(
84 testtools.ThreadsafeForwardingResult(result, semaphore), i)
85 reader_thread = threading.Thread(
86 target=self._run_test, args=(test, process_result, queue))
87 threads[test] = reader_thread, process_result
88 reader_thread.start()
89 while threads:
90 finished_test = queue.get()
91 threads[finished_test][0].join()
92 del threads[finished_test]
93 except:
94 for thread, process_result in threads.values():
95 process_result.stop()
96 raise
98 def _run_test(self, test, process_result, queue):
99 try:
100 test.run(process_result)
101 finally:
102 queue.put(test)
105 class FixtureSuite(unittest.TestSuite):
107 def __init__(self, fixture, tests):
108 super(FixtureSuite, self).__init__(tests)
109 self._fixture = fixture
111 def run(self, result):
112 self._fixture.setUp()
113 try:
114 super(FixtureSuite, self).run(result)
115 finally:
116 self._fixture.cleanUp()