s4:dsdb/drepl: update the source_dsa_obj/invocation_id in repsFrom
[Samba/gebeck_regimport.git] / lib / testtools / testtools / deferredruntest.py
blobcf33c06e277f9938c51d938191d0d999e156da60
1 # Copyright (c) 2010 testtools developers. See LICENSE for details.
3 """Individual test case execution for tests that return Deferreds.
5 This module is highly experimental and is liable to change in ways that cause
6 subtle failures in tests. Use at your own peril.
7 """
9 __all__ = [
10 'assert_fails_with',
11 'AsynchronousDeferredRunTest',
12 'AsynchronousDeferredRunTestForBrokenTwisted',
13 'SynchronousDeferredRunTest',
16 import sys
18 from testtools.compat import StringIO
19 from testtools.content import (
20 Content,
21 text_content,
23 from testtools.content_type import UTF8_TEXT
24 from testtools.runtest import RunTest
25 from testtools._spinner import (
26 extract_result,
27 NoResultError,
28 Spinner,
29 TimeoutError,
30 trap_unhandled_errors,
33 from twisted.internet import defer
34 from twisted.python import log
35 from twisted.trial.unittest import _LogObserver
38 class _DeferredRunTest(RunTest):
39 """Base for tests that return Deferreds."""
41 def _got_user_failure(self, failure, tb_label='traceback'):
42 """We got a failure from user code."""
43 return self._got_user_exception(
44 (failure.type, failure.value, failure.getTracebackObject()),
45 tb_label=tb_label)
48 class SynchronousDeferredRunTest(_DeferredRunTest):
49 """Runner for tests that return synchronous Deferreds."""
51 def _run_user(self, function, *args):
52 d = defer.maybeDeferred(function, *args)
53 d.addErrback(self._got_user_failure)
54 result = extract_result(d)
55 return result
58 def run_with_log_observers(observers, function, *args, **kwargs):
59 """Run 'function' with the given Twisted log observers."""
60 real_observers = list(log.theLogPublisher.observers)
61 for observer in real_observers:
62 log.theLogPublisher.removeObserver(observer)
63 for observer in observers:
64 log.theLogPublisher.addObserver(observer)
65 try:
66 return function(*args, **kwargs)
67 finally:
68 for observer in observers:
69 log.theLogPublisher.removeObserver(observer)
70 for observer in real_observers:
71 log.theLogPublisher.addObserver(observer)
74 # Observer of the Twisted log that we install during tests.
75 _log_observer = _LogObserver()
79 class AsynchronousDeferredRunTest(_DeferredRunTest):
80 """Runner for tests that return Deferreds that fire asynchronously.
82 That is, this test runner assumes that the Deferreds will only fire if the
83 reactor is left to spin for a while.
85 Do not rely too heavily on the nuances of the behaviour of this class.
86 What it does to the reactor is black magic, and if we can find nicer ways
87 of doing it we will gladly break backwards compatibility.
89 This is highly experimental code. Use at your own risk.
90 """
92 def __init__(self, case, handlers=None, reactor=None, timeout=0.005,
93 debug=False):
94 """Construct an `AsynchronousDeferredRunTest`.
96 :param case: The `TestCase` to run.
97 :param handlers: A list of exception handlers (ExceptionType, handler)
98 where 'handler' is a callable that takes a `TestCase`, a
99 ``testtools.TestResult`` and the exception raised.
100 :param reactor: The Twisted reactor to use. If not given, we use the
101 default reactor.
102 :param timeout: The maximum time allowed for running a test. The
103 default is 0.005s.
104 :param debug: Whether or not to enable Twisted's debugging. Use this
105 to get information about unhandled Deferreds and left-over
106 DelayedCalls. Defaults to False.
108 super(AsynchronousDeferredRunTest, self).__init__(case, handlers)
109 if reactor is None:
110 from twisted.internet import reactor
111 self._reactor = reactor
112 self._timeout = timeout
113 self._debug = debug
115 @classmethod
116 def make_factory(cls, reactor=None, timeout=0.005, debug=False):
117 """Make a factory that conforms to the RunTest factory interface."""
118 # This is horrible, but it means that the return value of the method
119 # will be able to be assigned to a class variable *and* also be
120 # invoked directly.
121 class AsynchronousDeferredRunTestFactory:
122 def __call__(self, case, handlers=None):
123 return cls(case, handlers, reactor, timeout, debug)
124 return AsynchronousDeferredRunTestFactory()
126 @defer.deferredGenerator
127 def _run_cleanups(self):
128 """Run the cleanups on the test case.
130 We expect that the cleanups on the test case can also return
131 asynchronous Deferreds. As such, we take the responsibility for
132 running the cleanups, rather than letting TestCase do it.
134 while self.case._cleanups:
135 f, args, kwargs = self.case._cleanups.pop()
136 d = defer.maybeDeferred(f, *args, **kwargs)
137 thing = defer.waitForDeferred(d)
138 yield thing
139 try:
140 thing.getResult()
141 except Exception:
142 exc_info = sys.exc_info()
143 self.case._report_traceback(exc_info)
144 last_exception = exc_info[1]
145 yield last_exception
147 def _make_spinner(self):
148 """Make the `Spinner` to be used to run the tests."""
149 return Spinner(self._reactor, debug=self._debug)
151 def _run_deferred(self):
152 """Run the test, assuming everything in it is Deferred-returning.
154 This should return a Deferred that fires with True if the test was
155 successful and False if the test was not successful. It should *not*
156 call addSuccess on the result, because there's reactor clean up that
157 we needs to be done afterwards.
159 fails = []
161 def fail_if_exception_caught(exception_caught):
162 if self.exception_caught == exception_caught:
163 fails.append(None)
165 def clean_up(ignored=None):
166 """Run the cleanups."""
167 d = self._run_cleanups()
168 def clean_up_done(result):
169 if result is not None:
170 self._exceptions.append(result)
171 fails.append(None)
172 return d.addCallback(clean_up_done)
174 def set_up_done(exception_caught):
175 """Set up is done, either clean up or run the test."""
176 if self.exception_caught == exception_caught:
177 fails.append(None)
178 return clean_up()
179 else:
180 d = self._run_user(self.case._run_test_method, self.result)
181 d.addCallback(fail_if_exception_caught)
182 d.addBoth(tear_down)
183 return d
185 def tear_down(ignored):
186 d = self._run_user(self.case._run_teardown, self.result)
187 d.addCallback(fail_if_exception_caught)
188 d.addBoth(clean_up)
189 return d
191 d = self._run_user(self.case._run_setup, self.result)
192 d.addCallback(set_up_done)
193 d.addBoth(lambda ignored: len(fails) == 0)
194 return d
196 def _log_user_exception(self, e):
197 """Raise 'e' and report it as a user exception."""
198 try:
199 raise e
200 except e.__class__:
201 self._got_user_exception(sys.exc_info())
203 def _blocking_run_deferred(self, spinner):
204 try:
205 return trap_unhandled_errors(
206 spinner.run, self._timeout, self._run_deferred)
207 except NoResultError:
208 # We didn't get a result at all! This could be for any number of
209 # reasons, but most likely someone hit Ctrl-C during the test.
210 raise KeyboardInterrupt
211 except TimeoutError:
212 # The function took too long to run.
213 self._log_user_exception(TimeoutError(self.case, self._timeout))
214 return False, []
216 def _run_core(self):
217 # Add an observer to trap all logged errors.
218 self.case.reactor = self._reactor
219 error_observer = _log_observer
220 full_log = StringIO()
221 full_observer = log.FileLogObserver(full_log)
222 spinner = self._make_spinner()
223 successful, unhandled = run_with_log_observers(
224 [error_observer.gotEvent, full_observer.emit],
225 self._blocking_run_deferred, spinner)
227 self.case.addDetail(
228 'twisted-log', Content(UTF8_TEXT, full_log.readlines))
230 logged_errors = error_observer.flushErrors()
231 for logged_error in logged_errors:
232 successful = False
233 self._got_user_failure(logged_error, tb_label='logged-error')
235 if unhandled:
236 successful = False
237 for debug_info in unhandled:
238 f = debug_info.failResult
239 info = debug_info._getDebugTracebacks()
240 if info:
241 self.case.addDetail(
242 'unhandled-error-in-deferred-debug',
243 text_content(info))
244 self._got_user_failure(f, 'unhandled-error-in-deferred')
246 junk = spinner.clear_junk()
247 if junk:
248 successful = False
249 self._log_user_exception(UncleanReactorError(junk))
251 if successful:
252 self.result.addSuccess(self.case, details=self.case.getDetails())
254 def _run_user(self, function, *args):
255 """Run a user-supplied function.
257 This just makes sure that it returns a Deferred, regardless of how the
258 user wrote it.
260 d = defer.maybeDeferred(function, *args)
261 return d.addErrback(self._got_user_failure)
264 class AsynchronousDeferredRunTestForBrokenTwisted(AsynchronousDeferredRunTest):
265 """Test runner that works around Twisted brokenness re reactor junk.
267 There are many APIs within Twisted itself where a Deferred fires but
268 leaves cleanup work scheduled for the reactor to do. Arguably, many of
269 these are bugs. This runner iterates the reactor event loop a number of
270 times after every test, in order to shake out these buggy-but-commonplace
271 events.
274 def _make_spinner(self):
275 spinner = super(
276 AsynchronousDeferredRunTestForBrokenTwisted, self)._make_spinner()
277 spinner._OBLIGATORY_REACTOR_ITERATIONS = 2
278 return spinner
281 def assert_fails_with(d, *exc_types, **kwargs):
282 """Assert that 'd' will fail with one of 'exc_types'.
284 The normal way to use this is to return the result of 'assert_fails_with'
285 from your unit test.
287 Note that this function is experimental and unstable. Use at your own
288 peril; expect the API to change.
290 :param d: A Deferred that is expected to fail.
291 :param exc_types: The exception types that the Deferred is expected to
292 fail with.
293 :param failureException: An optional keyword argument. If provided, will
294 raise that exception instead of
295 ``testtools.TestCase.failureException``.
296 :return: A Deferred that will fail with an ``AssertionError`` if 'd' does
297 not fail with one of the exception types.
299 failureException = kwargs.pop('failureException', None)
300 if failureException is None:
301 # Avoid circular imports.
302 from testtools import TestCase
303 failureException = TestCase.failureException
304 expected_names = ", ".join(exc_type.__name__ for exc_type in exc_types)
305 def got_success(result):
306 raise failureException(
307 "%s not raised (%r returned)" % (expected_names, result))
308 def got_failure(failure):
309 if failure.check(*exc_types):
310 return failure.value
311 raise failureException("%s raised instead of %s:\n %s" % (
312 failure.type.__name__, expected_names, failure.getTraceback()))
313 return d.addCallbacks(got_success, got_failure)
316 def flush_logged_errors(*error_types):
317 return _log_observer.flushErrors(*error_types)
320 class UncleanReactorError(Exception):
321 """Raised when the reactor has junk in it."""
323 def __init__(self, junk):
324 Exception.__init__(self,
325 "The reactor still thinks it needs to do things. Close all "
326 "connections, kill all processes and make sure all delayed "
327 "calls have either fired or been cancelled:\n%s"
328 % ''.join(map(self._get_junk_info, junk)))
330 def _get_junk_info(self, junk):
331 from twisted.internet.base import DelayedCall
332 if isinstance(junk, DelayedCall):
333 ret = str(junk)
334 else:
335 ret = repr(junk)
336 return ' %s\n' % (ret,)