s3:lib/events: make use of tevent_common_loop_timer_delay()
[Samba/gebeck_regimport.git] / lib / testtools / testtools / testsuite.py
blob67ace56110428a1bf8a6ff93e95881f9d190dd74
1 # Copyright (c) 2009-2011 testtools developers. See LICENSE for details.
3 """Test suites and related things."""
5 __metaclass__ = type
6 __all__ = [
7 'ConcurrentTestSuite',
8 'iterate_tests',
9 'sorted_tests',
12 from testtools.helpers import safe_hasattr, try_imports
14 Queue = try_imports(['Queue.Queue', 'queue.Queue'])
16 import threading
17 import unittest
19 import testtools
22 def iterate_tests(test_suite_or_case):
23 """Iterate through all of the test cases in 'test_suite_or_case'."""
24 try:
25 suite = iter(test_suite_or_case)
26 except TypeError:
27 yield test_suite_or_case
28 else:
29 for test in suite:
30 for subtest in iterate_tests(test):
31 yield subtest
34 class ConcurrentTestSuite(unittest.TestSuite):
35 """A TestSuite whose run() calls out to a concurrency strategy."""
37 def __init__(self, suite, make_tests, wrap_result=None):
38 """Create a ConcurrentTestSuite to execute suite.
40 :param suite: A suite to run concurrently.
41 :param make_tests: A helper function to split the tests in the
42 ConcurrentTestSuite into some number of concurrently executing
43 sub-suites. make_tests must take a suite, and return an iterable
44 of TestCase-like object, each of which must have a run(result)
45 method.
46 :param wrap_result: An optional function that takes a thread-safe
47 result and a thread number and must return a ``TestResult``
48 object. If not provided, then ``ConcurrentTestSuite`` will just
49 use a ``ThreadsafeForwardingResult`` wrapped around the result
50 passed to ``run()``.
51 """
52 super(ConcurrentTestSuite, self).__init__([suite])
53 self.make_tests = make_tests
54 if wrap_result:
55 self._wrap_result = wrap_result
57 def _wrap_result(self, thread_safe_result, thread_number):
58 """Wrap a thread-safe result before sending it test results.
60 You can either override this in a subclass or pass your own
61 ``wrap_result`` in to the constructor. The latter is preferred.
62 """
63 return thread_safe_result
65 def run(self, result):
66 """Run the tests concurrently.
68 This calls out to the provided make_tests helper, and then serialises
69 the results so that result only sees activity from one TestCase at
70 a time.
72 ConcurrentTestSuite provides no special mechanism to stop the tests
73 returned by make_tests, it is up to the make_tests to honour the
74 shouldStop attribute on the result object they are run with, which will
75 be set if an exception is raised in the thread which
76 ConcurrentTestSuite.run is called in.
77 """
78 tests = self.make_tests(self)
79 try:
80 threads = {}
81 queue = Queue()
82 semaphore = threading.Semaphore(1)
83 for i, test in enumerate(tests):
84 process_result = self._wrap_result(
85 testtools.ThreadsafeForwardingResult(result, semaphore), i)
86 reader_thread = threading.Thread(
87 target=self._run_test, args=(test, process_result, queue))
88 threads[test] = reader_thread, process_result
89 reader_thread.start()
90 while threads:
91 finished_test = queue.get()
92 threads[finished_test][0].join()
93 del threads[finished_test]
94 except:
95 for thread, process_result in threads.values():
96 process_result.stop()
97 raise
99 def _run_test(self, test, process_result, queue):
100 try:
101 test.run(process_result)
102 finally:
103 queue.put(test)
106 class FixtureSuite(unittest.TestSuite):
108 def __init__(self, fixture, tests):
109 super(FixtureSuite, self).__init__(tests)
110 self._fixture = fixture
112 def run(self, result):
113 self._fixture.setUp()
114 try:
115 super(FixtureSuite, self).run(result)
116 finally:
117 self._fixture.cleanUp()
119 def sort_tests(self):
120 self._tests = sorted_tests(self, True)
123 def _flatten_tests(suite_or_case, unpack_outer=False):
124 try:
125 tests = iter(suite_or_case)
126 except TypeError:
127 # Not iterable, assume it's a test case.
128 return [(suite_or_case.id(), suite_or_case)]
129 if (type(suite_or_case) in (unittest.TestSuite,) or
130 unpack_outer):
131 # Plain old test suite (or any others we may add).
132 result = []
133 for test in tests:
134 # Recurse to flatten.
135 result.extend(_flatten_tests(test))
136 return result
137 else:
138 # Find any old actual test and grab its id.
139 suite_id = None
140 tests = iterate_tests(suite_or_case)
141 for test in tests:
142 suite_id = test.id()
143 break
144 # If it has a sort_tests method, call that.
145 if safe_hasattr(suite_or_case, 'sort_tests'):
146 suite_or_case.sort_tests()
147 return [(suite_id, suite_or_case)]
150 def sorted_tests(suite_or_case, unpack_outer=False):
151 """Sort suite_or_case while preserving non-vanilla TestSuites."""
152 tests = _flatten_tests(suite_or_case, unpack_outer=unpack_outer)
153 tests.sort()
154 return unittest.TestSuite([test for (sort_key, test) in tests])