1 # Copyright (c) 2010-2011 testtools developers. See LICENSE for details.
3 """Tests for the DeferredRunTest single test execution logic."""
8 from testtools
import (
13 from testtools
.content
import (
16 from testtools
.helpers
import try_import
17 from testtools
.matchers
import (
23 from testtools
.runtest
import RunTest
24 from testtools
.testresult
.doubles
import ExtendedTestResult
25 from testtools
.tests
.test_spinner
import NeedsTwistedTestCase
27 assert_fails_with
= try_import('testtools.deferredruntest.assert_fails_with')
28 AsynchronousDeferredRunTest
= try_import(
29 'testtools.deferredruntest.AsynchronousDeferredRunTest')
30 flush_logged_errors
= try_import(
31 'testtools.deferredruntest.flush_logged_errors')
32 SynchronousDeferredRunTest
= try_import(
33 'testtools.deferredruntest.SynchronousDeferredRunTest')
35 defer
= try_import('twisted.internet.defer')
36 failure
= try_import('twisted.python.failure')
37 log
= try_import('twisted.python.log')
38 DelayedCall
= try_import('twisted.internet.base.DelayedCall')
42 """Tests that we run as part of our tests, nested to avoid discovery."""
46 super(X
.Base
, self
).setUp()
47 self
.calls
= ['setUp']
48 self
.addCleanup(self
.calls
.append
, 'clean-up')
49 def test_something(self
):
50 self
.calls
.append('test')
52 self
.calls
.append('tearDown')
53 super(X
.Base
, self
).tearDown()
55 class ErrorInSetup(Base
):
56 expected_calls
= ['setUp', 'clean-up']
57 expected_results
= [('addError', RuntimeError)]
59 super(X
.ErrorInSetup
, self
).setUp()
60 raise RuntimeError("Error in setUp")
62 class ErrorInTest(Base
):
63 expected_calls
= ['setUp', 'tearDown', 'clean-up']
64 expected_results
= [('addError', RuntimeError)]
65 def test_something(self
):
66 raise RuntimeError("Error in test")
68 class FailureInTest(Base
):
69 expected_calls
= ['setUp', 'tearDown', 'clean-up']
70 expected_results
= [('addFailure', AssertionError)]
71 def test_something(self
):
72 self
.fail("test failed")
74 class ErrorInTearDown(Base
):
75 expected_calls
= ['setUp', 'test', 'clean-up']
76 expected_results
= [('addError', RuntimeError)]
78 raise RuntimeError("Error in tearDown")
80 class ErrorInCleanup(Base
):
81 expected_calls
= ['setUp', 'test', 'tearDown', 'clean-up']
82 expected_results
= [('addError', ZeroDivisionError)]
83 def test_something(self
):
84 self
.calls
.append('test')
85 self
.addCleanup(lambda: 1/0)
87 class TestIntegration(NeedsTwistedTestCase
):
89 def assertResultsMatch(self
, test
, result
):
90 events
= list(result
._events
)
91 self
.assertEqual(('startTest', test
), events
.pop(0))
92 for expected_result
in test
.expected_results
:
93 result
= events
.pop(0)
94 if len(expected_result
) == 1:
95 self
.assertEqual((expected_result
[0], test
), result
)
97 self
.assertEqual((expected_result
[0], test
), result
[:2])
98 error_type
= expected_result
[1]
99 self
.assertIn(error_type
.__name
__, str(result
[2]))
100 self
.assertEqual([('stopTest', test
)], events
)
102 def test_runner(self
):
103 result
= ExtendedTestResult()
104 test
= self
.test_factory('test_something', runTest
=self
.runner
)
106 self
.assertEqual(test
.calls
, self
.test_factory
.expected_calls
)
107 self
.assertResultsMatch(test
, result
)
110 def make_integration_tests():
111 from unittest
import TestSuite
112 from testtools
import clone_test_with_new_id
114 ('RunTest', RunTest
),
115 ('SynchronousDeferredRunTest', SynchronousDeferredRunTest
),
116 ('AsynchronousDeferredRunTest', AsynchronousDeferredRunTest
),
126 base_test
= X
.TestIntegration('test_runner')
127 integration_tests
= []
128 for runner_name
, runner
in runners
:
130 new_test
= clone_test_with_new_id(
131 base_test
, '%s(%s, %s)' % (
135 new_test
.test_factory
= test
136 new_test
.runner
= runner
137 integration_tests
.append(new_test
)
138 return TestSuite(integration_tests
)
141 class TestSynchronousDeferredRunTest(NeedsTwistedTestCase
):
143 def make_result(self
):
144 return ExtendedTestResult()
146 def make_runner(self
, test
):
147 return SynchronousDeferredRunTest(test
, test
.exception_handlers
)
149 def test_success(self
):
150 class SomeCase(TestCase
):
151 def test_success(self
):
152 return defer
.succeed(None)
153 test
= SomeCase('test_success')
154 runner
= self
.make_runner(test
)
155 result
= self
.make_result()
158 result
._events
, Equals([
160 ('addSuccess', test
),
161 ('stopTest', test
)]))
163 def test_failure(self
):
164 class SomeCase(TestCase
):
165 def test_failure(self
):
166 return defer
.maybeDeferred(self
.fail
, "Egads!")
167 test
= SomeCase('test_failure')
168 runner
= self
.make_runner(test
)
169 result
= self
.make_result()
172 [event
[:2] for event
in result
._events
], Equals([
174 ('addFailure', test
),
175 ('stopTest', test
)]))
177 def test_setUp_followed_by_test(self
):
178 class SomeCase(TestCase
):
180 super(SomeCase
, self
).setUp()
181 return defer
.succeed(None)
182 def test_failure(self
):
183 return defer
.maybeDeferred(self
.fail
, "Egads!")
184 test
= SomeCase('test_failure')
185 runner
= self
.make_runner(test
)
186 result
= self
.make_result()
189 [event
[:2] for event
in result
._events
], Equals([
191 ('addFailure', test
),
192 ('stopTest', test
)]))
195 class TestAsynchronousDeferredRunTest(NeedsTwistedTestCase
):
197 def make_reactor(self
):
198 from twisted
.internet
import reactor
201 def make_result(self
):
202 return ExtendedTestResult()
204 def make_runner(self
, test
, timeout
=None):
206 timeout
= self
.make_timeout()
207 return AsynchronousDeferredRunTest(
208 test
, test
.exception_handlers
, timeout
=timeout
)
210 def make_timeout(self
):
213 def test_setUp_returns_deferred_that_fires_later(self
):
214 # setUp can return a Deferred that might fire at any time.
215 # AsynchronousDeferredRunTest will not go on to running the test until
216 # the Deferred returned by setUp actually fires.
219 d
= defer
.Deferred().addCallback(call_log
.append
)
220 class SomeCase(TestCase
):
222 super(SomeCase
, self
).setUp()
223 call_log
.append('setUp')
225 def test_something(self
):
226 call_log
.append('test')
228 self
.assertThat(call_log
, Equals(['setUp']))
230 test
= SomeCase('test_something')
231 timeout
= self
.make_timeout()
232 runner
= self
.make_runner(test
, timeout
=timeout
)
233 result
= self
.make_result()
234 reactor
= self
.make_reactor()
235 reactor
.callLater(timeout
, fire_deferred
)
237 self
.assertThat(call_log
, Equals(['setUp', marker
, 'test']))
239 def test_calls_setUp_test_tearDown_in_sequence(self
):
240 # setUp, the test method and tearDown can all return
241 # Deferreds. AsynchronousDeferredRunTest will make sure that each of
242 # these are run in turn, only going on to the next stage once the
243 # Deferred from the previous stage has fired.
246 a
.addCallback(lambda x
: call_log
.append('a'))
248 b
.addCallback(lambda x
: call_log
.append('b'))
250 c
.addCallback(lambda x
: call_log
.append('c'))
251 class SomeCase(TestCase
):
253 super(SomeCase
, self
).setUp()
254 call_log
.append('setUp')
256 def test_success(self
):
257 call_log
.append('test')
260 super(SomeCase
, self
).tearDown()
261 call_log
.append('tearDown')
263 test
= SomeCase('test_success')
264 timeout
= self
.make_timeout()
265 runner
= self
.make_runner(test
, timeout
)
266 result
= self
.make_result()
267 reactor
= self
.make_reactor()
269 self
.assertThat(call_log
, Equals(['setUp']))
272 self
.assertThat(call_log
, Equals(['setUp', 'a', 'test']))
276 call_log
, Equals(['setUp', 'a', 'test', 'b', 'tearDown']))
278 reactor
.callLater(timeout
* 0.25, fire_a
)
279 reactor
.callLater(timeout
* 0.5, fire_b
)
280 reactor
.callLater(timeout
* 0.75, fire_c
)
283 call_log
, Equals(['setUp', 'a', 'test', 'b', 'tearDown', 'c']))
285 def test_async_cleanups(self
):
286 # Cleanups added with addCleanup can return
287 # Deferreds. AsynchronousDeferredRunTest will run each of them in
289 class SomeCase(TestCase
):
290 def test_whatever(self
):
292 test
= SomeCase('test_whatever')
294 a
= defer
.Deferred().addCallback(lambda x
: call_log
.append('a'))
295 b
= defer
.Deferred().addCallback(lambda x
: call_log
.append('b'))
296 c
= defer
.Deferred().addCallback(lambda x
: call_log
.append('c'))
297 test
.addCleanup(lambda: a
)
298 test
.addCleanup(lambda: b
)
299 test
.addCleanup(lambda: c
)
301 self
.assertThat(call_log
, Equals([]))
304 self
.assertThat(call_log
, Equals(['a']))
307 self
.assertThat(call_log
, Equals(['a', 'b']))
309 timeout
= self
.make_timeout()
310 reactor
= self
.make_reactor()
311 reactor
.callLater(timeout
* 0.25, fire_a
)
312 reactor
.callLater(timeout
* 0.5, fire_b
)
313 reactor
.callLater(timeout
* 0.75, fire_c
)
314 runner
= self
.make_runner(test
, timeout
)
315 result
= self
.make_result()
317 self
.assertThat(call_log
, Equals(['a', 'b', 'c']))
319 def test_clean_reactor(self
):
320 # If there's cruft left over in the reactor, the test fails.
321 reactor
= self
.make_reactor()
322 timeout
= self
.make_timeout()
323 class SomeCase(TestCase
):
324 def test_cruft(self
):
325 reactor
.callLater(timeout
* 10.0, lambda: None)
326 test
= SomeCase('test_cruft')
327 runner
= self
.make_runner(test
, timeout
)
328 result
= self
.make_result()
331 [event
[:2] for event
in result
._events
],
333 [('startTest', test
),
335 ('stopTest', test
)]))
336 error
= result
._events
[1][2]
337 self
.assertThat(error
, KeysEqual('traceback', 'twisted-log'))
339 def test_exports_reactor(self
):
340 # The reactor is set as an attribute on the test case.
341 reactor
= self
.make_reactor()
342 timeout
= self
.make_timeout()
343 class SomeCase(TestCase
):
344 def test_cruft(self
):
345 self
.assertIs(reactor
, self
.reactor
)
346 test
= SomeCase('test_cruft')
347 runner
= self
.make_runner(test
, timeout
)
348 result
= TestResult()
350 self
.assertEqual([], result
.errors
)
351 self
.assertEqual([], result
.failures
)
353 def test_unhandled_error_from_deferred(self
):
354 # If there's a Deferred with an unhandled error, the test fails. Each
355 # unhandled error is reported with a separate traceback.
356 class SomeCase(TestCase
):
357 def test_cruft(self
):
358 # Note we aren't returning the Deferred so that the error will
360 defer
.maybeDeferred(lambda: 1/0)
361 defer
.maybeDeferred(lambda: 2/0)
362 test
= SomeCase('test_cruft')
363 runner
= self
.make_runner(test
)
364 result
= self
.make_result()
366 error
= result
._events
[1][2]
367 result
._events
[1] = ('addError', test
, None)
368 self
.assertThat(result
._events
, Equals(
369 [('startTest', test
),
370 ('addError', test
, None),
371 ('stopTest', test
)]))
375 'unhandled-error-in-deferred',
376 'unhandled-error-in-deferred-1',
379 def test_unhandled_error_from_deferred_combined_with_error(self
):
380 # If there's a Deferred with an unhandled error, the test fails. Each
381 # unhandled error is reported with a separate traceback, and the error
383 class SomeCase(TestCase
):
384 def test_cruft(self
):
385 # Note we aren't returning the Deferred so that the error will
387 defer
.maybeDeferred(lambda: 1/0)
389 test
= SomeCase('test_cruft')
390 runner
= self
.make_runner(test
)
391 result
= self
.make_result()
393 error
= result
._events
[1][2]
394 result
._events
[1] = ('addError', test
, None)
395 self
.assertThat(result
._events
, Equals(
396 [('startTest', test
),
397 ('addError', test
, None),
398 ('stopTest', test
)]))
403 'unhandled-error-in-deferred',
406 @skipIf(os
.name
!= "posix", "Sending SIGINT with os.kill is posix only")
407 def test_keyboard_interrupt_stops_test_run(self
):
408 # If we get a SIGINT during a test run, the test stops and no more
410 SIGINT
= getattr(signal
, 'SIGINT', None)
412 raise self
.skipTest("SIGINT unavailable")
413 class SomeCase(TestCase
):
414 def test_pause(self
):
415 return defer
.Deferred()
416 test
= SomeCase('test_pause')
417 reactor
= self
.make_reactor()
418 timeout
= self
.make_timeout()
419 runner
= self
.make_runner(test
, timeout
* 5)
420 result
= self
.make_result()
421 reactor
.callLater(timeout
, os
.kill
, os
.getpid(), SIGINT
)
422 self
.assertThat(lambda:runner
.run(result
),
423 Raises(MatchesException(KeyboardInterrupt)))
425 @skipIf(os
.name
!= "posix", "Sending SIGINT with os.kill is posix only")
426 def test_fast_keyboard_interrupt_stops_test_run(self
):
427 # If we get a SIGINT during a test run, the test stops and no more
429 SIGINT
= getattr(signal
, 'SIGINT', None)
431 raise self
.skipTest("SIGINT unavailable")
432 class SomeCase(TestCase
):
433 def test_pause(self
):
434 return defer
.Deferred()
435 test
= SomeCase('test_pause')
436 reactor
= self
.make_reactor()
437 timeout
= self
.make_timeout()
438 runner
= self
.make_runner(test
, timeout
* 5)
439 result
= self
.make_result()
440 reactor
.callWhenRunning(os
.kill
, os
.getpid(), SIGINT
)
441 self
.assertThat(lambda:runner
.run(result
),
442 Raises(MatchesException(KeyboardInterrupt)))
444 def test_timeout_causes_test_error(self
):
445 # If a test times out, it reports itself as having failed with a
447 class SomeCase(TestCase
):
448 def test_pause(self
):
449 return defer
.Deferred()
450 test
= SomeCase('test_pause')
451 runner
= self
.make_runner(test
)
452 result
= self
.make_result()
454 error
= result
._events
[1][2]
456 [event
[:2] for event
in result
._events
], Equals(
457 [('startTest', test
),
459 ('stopTest', test
)]))
460 self
.assertIn('TimeoutError', str(error
['traceback']))
462 def test_convenient_construction(self
):
463 # As a convenience method, AsynchronousDeferredRunTest has a
464 # classmethod that returns an AsynchronousDeferredRunTest
465 # factory. This factory has the same API as the RunTest constructor.
469 factory
= AsynchronousDeferredRunTest
.make_factory(reactor
, timeout
)
470 runner
= factory(self
, [handler
])
471 self
.assertIs(reactor
, runner
._reactor
)
472 self
.assertIs(timeout
, runner
._timeout
)
473 self
.assertIs(self
, runner
.case
)
474 self
.assertEqual([handler
], runner
.handlers
)
476 def test_use_convenient_factory(self
):
477 # Make sure that the factory can actually be used.
478 factory
= AsynchronousDeferredRunTest
.make_factory()
479 class SomeCase(TestCase
):
480 run_tests_with
= factory
481 def test_something(self
):
483 case
= SomeCase('test_something')
486 def test_convenient_construction_default_reactor(self
):
487 # As a convenience method, AsynchronousDeferredRunTest has a
488 # classmethod that returns an AsynchronousDeferredRunTest
489 # factory. This factory has the same API as the RunTest constructor.
492 factory
= AsynchronousDeferredRunTest
.make_factory(reactor
=reactor
)
493 runner
= factory(self
, [handler
])
494 self
.assertIs(reactor
, runner
._reactor
)
495 self
.assertIs(self
, runner
.case
)
496 self
.assertEqual([handler
], runner
.handlers
)
498 def test_convenient_construction_default_timeout(self
):
499 # As a convenience method, AsynchronousDeferredRunTest has a
500 # classmethod that returns an AsynchronousDeferredRunTest
501 # factory. This factory has the same API as the RunTest constructor.
504 factory
= AsynchronousDeferredRunTest
.make_factory(timeout
=timeout
)
505 runner
= factory(self
, [handler
])
506 self
.assertIs(timeout
, runner
._timeout
)
507 self
.assertIs(self
, runner
.case
)
508 self
.assertEqual([handler
], runner
.handlers
)
510 def test_convenient_construction_default_debugging(self
):
511 # As a convenience method, AsynchronousDeferredRunTest has a
512 # classmethod that returns an AsynchronousDeferredRunTest
513 # factory. This factory has the same API as the RunTest constructor.
515 factory
= AsynchronousDeferredRunTest
.make_factory(debug
=True)
516 runner
= factory(self
, [handler
])
517 self
.assertIs(self
, runner
.case
)
518 self
.assertEqual([handler
], runner
.handlers
)
519 self
.assertEqual(True, runner
._debug
)
521 def test_deferred_error(self
):
522 class SomeTest(TestCase
):
523 def test_something(self
):
524 return defer
.maybeDeferred(lambda: 1/0)
525 test
= SomeTest('test_something')
526 runner
= self
.make_runner(test
)
527 result
= self
.make_result()
530 [event
[:2] for event
in result
._events
],
534 ('stopTest', test
)]))
535 error
= result
._events
[1][2]
536 self
.assertThat(error
, KeysEqual('traceback', 'twisted-log'))
538 def test_only_addError_once(self
):
539 # Even if the reactor is unclean and the test raises an error and the
540 # cleanups raise errors, we only called addError once per test.
541 reactor
= self
.make_reactor()
542 class WhenItRains(TestCase
):
544 # Add a dirty cleanup.
545 self
.addCleanup(lambda: 3 / 0)
547 from twisted
.internet
.protocol
import ServerFactory
548 reactor
.listenTCP(0, ServerFactory())
550 defer
.maybeDeferred(lambda: 2 / 0)
552 raise RuntimeError("Excess precipitation")
553 test
= WhenItRains('it_pours')
554 runner
= self
.make_runner(test
)
555 result
= self
.make_result()
558 [event
[:2] for event
in result
._events
],
562 ('stopTest', test
)]))
563 error
= result
._events
[1][2]
570 'unhandled-error-in-deferred',
573 def test_log_err_is_error(self
):
574 # An error logged during the test run is recorded as an error in the
576 class LogAnError(TestCase
):
577 def test_something(self
):
580 except ZeroDivisionError:
581 f
= failure
.Failure()
583 test
= LogAnError('test_something')
584 runner
= self
.make_runner(test
)
585 result
= self
.make_result()
588 [event
[:2] for event
in result
._events
],
592 ('stopTest', test
)]))
593 error
= result
._events
[1][2]
594 self
.assertThat(error
, KeysEqual('logged-error', 'twisted-log'))
596 def test_log_err_flushed_is_success(self
):
597 # An error logged during the test run is recorded as an error in the
599 class LogAnError(TestCase
):
600 def test_something(self
):
603 except ZeroDivisionError:
604 f
= failure
.Failure()
606 flush_logged_errors(ZeroDivisionError)
607 test
= LogAnError('test_something')
608 runner
= self
.make_runner(test
)
609 result
= self
.make_result()
615 ('addSuccess', test
, {'twisted-log': text_content('')}),
616 ('stopTest', test
)]))
618 def test_log_in_details(self
):
619 class LogAnError(TestCase
):
620 def test_something(self
):
623 test
= LogAnError('test_something')
624 runner
= self
.make_runner(test
)
625 result
= self
.make_result()
628 [event
[:2] for event
in result
._events
],
632 ('stopTest', test
)]))
633 error
= result
._events
[1][2]
634 self
.assertThat(error
, KeysEqual('traceback', 'twisted-log'))
636 def test_debugging_unchanged_during_test_by_default(self
):
637 debugging
= [(defer
.Deferred
.debug
, DelayedCall
.debug
)]
638 class SomeCase(TestCase
):
639 def test_debugging_enabled(self
):
640 debugging
.append((defer
.Deferred
.debug
, DelayedCall
.debug
))
641 test
= SomeCase('test_debugging_enabled')
642 runner
= AsynchronousDeferredRunTest(
643 test
, handlers
=test
.exception_handlers
,
644 reactor
=self
.make_reactor(), timeout
=self
.make_timeout())
645 runner
.run(self
.make_result())
646 self
.assertEqual(debugging
[0], debugging
[1])
648 def test_debugging_enabled_during_test_with_debug_flag(self
):
649 self
.patch(defer
.Deferred
, 'debug', False)
650 self
.patch(DelayedCall
, 'debug', False)
652 class SomeCase(TestCase
):
653 def test_debugging_enabled(self
):
654 debugging
.append((defer
.Deferred
.debug
, DelayedCall
.debug
))
655 test
= SomeCase('test_debugging_enabled')
656 runner
= AsynchronousDeferredRunTest(
657 test
, handlers
=test
.exception_handlers
,
658 reactor
=self
.make_reactor(), timeout
=self
.make_timeout(),
660 runner
.run(self
.make_result())
661 self
.assertEqual([(True, True)], debugging
)
662 self
.assertEqual(False, defer
.Deferred
.debug
)
663 self
.assertEqual(False, defer
.Deferred
.debug
)
666 class TestAssertFailsWith(NeedsTwistedTestCase
):
667 """Tests for `assert_fails_with`."""
669 if SynchronousDeferredRunTest
is not None:
670 run_tests_with
= SynchronousDeferredRunTest
672 def test_assert_fails_with_success(self
):
673 # assert_fails_with fails the test if it's given a Deferred that
676 d
= assert_fails_with(defer
.succeed(marker
), RuntimeError)
677 def check_result(failure
):
678 failure
.trap(self
.failureException
)
681 Equals("RuntimeError not raised (%r returned)" % (marker
,)))
683 lambda x
: self
.fail("Should not have succeeded"), check_result
)
686 def test_assert_fails_with_success_multiple_types(self
):
687 # assert_fails_with fails the test if it's given a Deferred that
690 d
= assert_fails_with(
691 defer
.succeed(marker
), RuntimeError, ZeroDivisionError)
692 def check_result(failure
):
693 failure
.trap(self
.failureException
)
696 Equals("RuntimeError, ZeroDivisionError not raised "
697 "(%r returned)" % (marker
,)))
699 lambda x
: self
.fail("Should not have succeeded"), check_result
)
702 def test_assert_fails_with_wrong_exception(self
):
703 # assert_fails_with fails the test if it's given a Deferred that
705 d
= assert_fails_with(
706 defer
.maybeDeferred(lambda: 1/0), RuntimeError, KeyboardInterrupt)
707 def check_result(failure
):
708 failure
.trap(self
.failureException
)
709 lines
= str(failure
.value
).splitlines()
713 ("ZeroDivisionError raised instead of RuntimeError, "
714 "KeyboardInterrupt:"),
715 " Traceback (most recent call last):",
718 lambda x
: self
.fail("Should not have succeeded"), check_result
)
721 def test_assert_fails_with_expected_exception(self
):
722 # assert_fails_with calls back with the value of the failure if it's
723 # one of the expected types of failures.
726 except ZeroDivisionError:
727 f
= failure
.Failure()
728 d
= assert_fails_with(defer
.fail(f
), ZeroDivisionError)
729 return d
.addCallback(self
.assertThat
, Equals(f
.value
))
731 def test_custom_failure_exception(self
):
732 # If assert_fails_with is passed a 'failureException' keyword
733 # argument, then it will raise that instead of `AssertionError`.
734 class CustomException(Exception):
737 d
= assert_fails_with(
738 defer
.succeed(marker
), RuntimeError,
739 failureException
=CustomException
)
740 def check_result(failure
):
741 failure
.trap(CustomException
)
744 Equals("RuntimeError not raised (%r returned)" % (marker
,)))
745 return d
.addCallbacks(
746 lambda x
: self
.fail("Should not have succeeded"), check_result
)
750 from unittest
import TestLoader
, TestSuite
752 [TestLoader().loadTestsFromName(__name__
),
753 make_integration_tests()])