s3: Fix some nonempty line endings
[Samba/gebeck_regimport.git] / lib / testtools / testtools / tests / test_deferredruntest.py
blobab0fd87890a9ed6d35c47cb887e4cc03b9c461fa
# Copyright (c) 2010-2011 testtools developers. See LICENSE for details.

"""Tests for the DeferredRunTest single test execution logic."""

import os
import signal

from testtools import (
    skipIf,
    TestCase,
    TestResult,
    )
from testtools.content import (
    text_content,
    )
from testtools.helpers import try_import
from testtools.matchers import (
    Equals,
    KeysEqual,
    MatchesException,
    Raises,
    )
from testtools.runtest import RunTest
from testtools.testresult.doubles import ExtendedTestResult
from testtools.tests.test_spinner import NeedsTwistedTestCase

# Everything below is looked up by dotted name through try_import instead of
# being imported directly.
# NOTE(review): presumably try_import yields None when the target cannot be
# imported (TestAssertFailsWith guards on ``is not None``) -- confirm against
# testtools.helpers.
assert_fails_with = try_import('testtools.deferredruntest.assert_fails_with')
AsynchronousDeferredRunTest = try_import(
    'testtools.deferredruntest.AsynchronousDeferredRunTest')
flush_logged_errors = try_import(
    'testtools.deferredruntest.flush_logged_errors')
SynchronousDeferredRunTest = try_import(
    'testtools.deferredruntest.SynchronousDeferredRunTest')

defer = try_import('twisted.internet.defer')
failure = try_import('twisted.python.failure')
log = try_import('twisted.python.log')
DelayedCall = try_import('twisted.internet.base.DelayedCall')
class X(object):
    """Tests that we run as part of our tests, nested to avoid discovery."""

    class Base(TestCase):
        """Fixture that logs each stage it passes through in ``self.calls``.

        Subclasses declare ``expected_calls`` (the stages that should have
        been logged) and ``expected_results`` (the result events that should
        have been emitted) for ``TestIntegration`` to check.
        """

        def setUp(self):
            super(X.Base, self).setUp()
            self.calls = ['setUp']
            # Registered here so that 'clean-up' is logged whenever the
            # cleanups run.
            self.addCleanup(self.calls.append, 'clean-up')

        def test_something(self):
            self.calls.append('test')

        def tearDown(self):
            self.calls.append('tearDown')
            super(X.Base, self).tearDown()

    class ErrorInSetup(Base):
        """setUp raises after the base setUp has run."""

        expected_calls = ['setUp', 'clean-up']
        expected_results = [('addError', RuntimeError)]

        def setUp(self):
            super(X.ErrorInSetup, self).setUp()
            raise RuntimeError("Error in setUp")

    class ErrorInTest(Base):
        """The test method raises an unexpected exception."""

        expected_calls = ['setUp', 'tearDown', 'clean-up']
        expected_results = [('addError', RuntimeError)]

        def test_something(self):
            raise RuntimeError("Error in test")

    class FailureInTest(Base):
        """The test method fails via ``self.fail``."""

        expected_calls = ['setUp', 'tearDown', 'clean-up']
        expected_results = [('addFailure', AssertionError)]

        def test_something(self):
            self.fail("test failed")

    class ErrorInTearDown(Base):
        """tearDown raises without logging or calling the base tearDown."""

        expected_calls = ['setUp', 'test', 'clean-up']
        expected_results = [('addError', RuntimeError)]

        def tearDown(self):
            raise RuntimeError("Error in tearDown")

    class ErrorInCleanup(Base):
        """A cleanup registered by the test method raises."""

        expected_calls = ['setUp', 'test', 'tearDown', 'clean-up']
        expected_results = [('addError', ZeroDivisionError)]

        def test_something(self):
            self.calls.append('test')
            self.addCleanup(lambda: 1/0)

    class TestIntegration(NeedsTwistedTestCase):
        """Runs one fixture with one runner and checks what was recorded.

        ``test_factory`` and ``runner`` are not defined here; they are
        expected to be set on the instance before ``test_runner`` executes.
        """

        def assertResultsMatch(self, test, result):
            # Work on a copy so the result's own event list is not consumed.
            remaining = list(result._events)
            self.assertEqual(('startTest', test), remaining.pop(0))
            for expectation in test.expected_results:
                event = remaining.pop(0)
                outcome = expectation[0]
                if len(expectation) == 1:
                    # Only the outcome name was specified: match exactly.
                    self.assertEqual((outcome, test), event)
                    continue
                # An exception type was given as well: its name has to show
                # up in the string form of the event's third element.
                self.assertEqual((outcome, test), event[:2])
                error_type = expectation[1]
                self.assertIn(error_type.__name__, str(event[2]))
            self.assertEqual([('stopTest', test)], remaining)

        def test_runner(self):
            recorder = ExtendedTestResult()
            case = self.test_factory('test_something', runTest=self.runner)
            case.run(recorder)
            self.assertEqual(case.calls, self.test_factory.expected_calls)
            self.assertResultsMatch(case, recorder)
def make_integration_tests():
    """Build a suite that runs every ``X`` fixture under every runner.

    ``X.TestIntegration('test_runner')`` is cloned once per (runner, fixture)
    pair.  Each clone gets an id of the form ``<base id>(<runner>, <fixture>)``
    and has ``test_factory`` and ``runner`` set to the fixture class and the
    runner for that pair.

    :return: A ``unittest.TestSuite`` containing all of the clones.
    """
    from unittest import TestSuite
    from testtools import clone_test_with_new_id
    runners = [
        ('RunTest', RunTest),
        ('SynchronousDeferredRunTest', SynchronousDeferredRunTest),
        ('AsynchronousDeferredRunTest', AsynchronousDeferredRunTest),
        ]

    tests = [
        X.ErrorInSetup,
        X.ErrorInTest,
        X.ErrorInTearDown,
        X.FailureInTest,
        X.ErrorInCleanup,
        ]
    base_test = X.TestIntegration('test_runner')
    integration_tests = []
    for runner_name, runner in runners:
        for test in tests:
            new_test = clone_test_with_new_id(
                base_test, '%s(%s, %s)' % (
                    base_test.id(),
                    runner_name,
                    test.__name__))
            # test_runner reads both of these attributes from the instance.
            new_test.test_factory = test
            new_test.runner = runner
            integration_tests.append(new_test)
    return TestSuite(integration_tests)
class TestSynchronousDeferredRunTest(NeedsTwistedTestCase):
    """Run small Deferred-returning cases with SynchronousDeferredRunTest."""

    def make_result(self):
        return ExtendedTestResult()

    def make_runner(self, test):
        return SynchronousDeferredRunTest(test, test.exception_handlers)

    def test_success(self):
        # A test that returns an already-successful Deferred is recorded as
        # a success.
        class SomeCase(TestCase):
            def test_success(self):
                return defer.succeed(None)
        case = SomeCase('test_success')
        runner = self.make_runner(case)
        recorder = self.make_result()
        runner.run(recorder)
        expected = [
            ('startTest', case),
            ('addSuccess', case),
            ('stopTest', case),
            ]
        self.assertThat(recorder._events, Equals(expected))

    def test_failure(self):
        # A test whose Deferred carries a test failure is recorded with
        # addFailure.
        class SomeCase(TestCase):
            def test_failure(self):
                return defer.maybeDeferred(self.fail, "Egads!")
        case = SomeCase('test_failure')
        runner = self.make_runner(case)
        recorder = self.make_result()
        runner.run(recorder)
        # Only the event name and the test are compared; any further
        # elements of each event are ignored.
        summary = [event[:2] for event in recorder._events]
        expected = [
            ('startTest', case),
            ('addFailure', case),
            ('stopTest', case),
            ]
        self.assertThat(summary, Equals(expected))

    def test_setUp_followed_by_test(self):
        # When setUp returns a Deferred, the failing test method is still
        # run and its failure recorded.
        class SomeCase(TestCase):
            def setUp(self):
                super(SomeCase, self).setUp()
                return defer.succeed(None)
            def test_failure(self):
                return defer.maybeDeferred(self.fail, "Egads!")
        case = SomeCase('test_failure')
        runner = self.make_runner(case)
        recorder = self.make_result()
        runner.run(recorder)
        summary = [event[:2] for event in recorder._events]
        expected = [
            ('startTest', case),
            ('addFailure', case),
            ('stopTest', case),
            ]
        self.assertThat(summary, Equals(expected))
class TestAsynchronousDeferredRunTest(NeedsTwistedTestCase):
    """Tests for AsynchronousDeferredRunTest.

    The tests use the global ``twisted.internet.reactor`` (see
    ``make_reactor``) and a very short default timeout (see ``make_timeout``).
    """

    def make_reactor(self):
        """Return the global Twisted reactor, imported at call time."""
        from twisted.internet import reactor
        return reactor

    def make_result(self):
        """Return a fresh result double that records events in ``_events``."""
        return ExtendedTestResult()

    def make_runner(self, test, timeout=None):
        """Return an AsynchronousDeferredRunTest for ``test``.

        :param test: The test case to run.
        :param timeout: Timeout passed to the runner.  When None, the value
            from ``make_timeout`` is used.
        """
        if timeout is None:
            timeout = self.make_timeout()
        return AsynchronousDeferredRunTest(
            test, test.exception_handlers, timeout=timeout)

    def make_timeout(self):
        """Return the default timeout handed to the runners in these tests."""
        return 0.005

    def test_setUp_returns_deferred_that_fires_later(self):
        # setUp can return a Deferred that might fire at any time.
        # AsynchronousDeferredRunTest will not go on to running the test until
        # the Deferred returned by setUp actually fires.
        call_log = []
        marker = object()
        d = defer.Deferred().addCallback(call_log.append)
        class SomeCase(TestCase):
            def setUp(self):
                super(SomeCase, self).setUp()
                call_log.append('setUp')
                return d
            def test_something(self):
                call_log.append('test')
        def fire_deferred():
            self.assertThat(call_log, Equals(['setUp']))
            d.callback(marker)
        test = SomeCase('test_something')
        timeout = self.make_timeout()
        runner = self.make_runner(test, timeout=timeout)
        result = self.make_result()
        reactor = self.make_reactor()
        reactor.callLater(timeout, fire_deferred)
        runner.run(result)
        self.assertThat(call_log, Equals(['setUp', marker, 'test']))

    def test_calls_setUp_test_tearDown_in_sequence(self):
        # setUp, the test method and tearDown can all return
        # Deferreds. AsynchronousDeferredRunTest will make sure that each of
        # these are run in turn, only going on to the next stage once the
        # Deferred from the previous stage has fired.
        call_log = []
        a = defer.Deferred()
        a.addCallback(lambda x: call_log.append('a'))
        b = defer.Deferred()
        b.addCallback(lambda x: call_log.append('b'))
        c = defer.Deferred()
        c.addCallback(lambda x: call_log.append('c'))
        class SomeCase(TestCase):
            def setUp(self):
                super(SomeCase, self).setUp()
                call_log.append('setUp')
                return a
            def test_success(self):
                call_log.append('test')
                return b
            def tearDown(self):
                super(SomeCase, self).tearDown()
                call_log.append('tearDown')
                return c
        test = SomeCase('test_success')
        timeout = self.make_timeout()
        runner = self.make_runner(test, timeout)
        result = self.make_result()
        reactor = self.make_reactor()
        def fire_a():
            self.assertThat(call_log, Equals(['setUp']))
            a.callback(None)
        def fire_b():
            self.assertThat(call_log, Equals(['setUp', 'a', 'test']))
            b.callback(None)
        def fire_c():
            self.assertThat(
                call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown']))
            c.callback(None)
        # Spread the three firings across the timeout window, in order.
        reactor.callLater(timeout * 0.25, fire_a)
        reactor.callLater(timeout * 0.5, fire_b)
        reactor.callLater(timeout * 0.75, fire_c)
        runner.run(result)
        self.assertThat(
            call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown', 'c']))

    def test_async_cleanups(self):
        # Cleanups added with addCleanup can return
        # Deferreds. AsynchronousDeferredRunTest will run each of them in
        # turn.
        class SomeCase(TestCase):
            def test_whatever(self):
                pass
        test = SomeCase('test_whatever')
        call_log = []
        a = defer.Deferred().addCallback(lambda x: call_log.append('a'))
        b = defer.Deferred().addCallback(lambda x: call_log.append('b'))
        c = defer.Deferred().addCallback(lambda x: call_log.append('c'))
        test.addCleanup(lambda: a)
        test.addCleanup(lambda: b)
        test.addCleanup(lambda: c)
        def fire_a():
            self.assertThat(call_log, Equals([]))
            a.callback(None)
        def fire_b():
            self.assertThat(call_log, Equals(['a']))
            b.callback(None)
        def fire_c():
            self.assertThat(call_log, Equals(['a', 'b']))
            c.callback(None)
        timeout = self.make_timeout()
        reactor = self.make_reactor()
        reactor.callLater(timeout * 0.25, fire_a)
        reactor.callLater(timeout * 0.5, fire_b)
        reactor.callLater(timeout * 0.75, fire_c)
        runner = self.make_runner(test, timeout)
        result = self.make_result()
        runner.run(result)
        self.assertThat(call_log, Equals(['a', 'b', 'c']))

    def test_clean_reactor(self):
        # If there's cruft left over in the reactor, the test fails.
        reactor = self.make_reactor()
        timeout = self.make_timeout()
        class SomeCase(TestCase):
            def test_cruft(self):
                # Scheduled for well after the runner's timeout, so it is
                # still pending when the test method returns.
                reactor.callLater(timeout * 10.0, lambda: None)
        test = SomeCase('test_cruft')
        runner = self.make_runner(test, timeout)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals(
                [('startTest', test),
                 ('addError', test),
                 ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(error, KeysEqual('traceback', 'twisted-log'))

    def test_exports_reactor(self):
        # The reactor is set as an attribute on the test case.
        reactor = self.make_reactor()
        timeout = self.make_timeout()
        class SomeCase(TestCase):
            def test_cruft(self):
                self.assertIs(reactor, self.reactor)
        test = SomeCase('test_cruft')
        runner = self.make_runner(test, timeout)
        result = TestResult()
        runner.run(result)
        self.assertEqual([], result.errors)
        self.assertEqual([], result.failures)

    def test_unhandled_error_from_deferred(self):
        # If there's a Deferred with an unhandled error, the test fails.  Each
        # unhandled error is reported with a separate traceback.
        class SomeCase(TestCase):
            def test_cruft(self):
                # Note we aren't returning the Deferred so that the error will
                # be unhandled.
                defer.maybeDeferred(lambda: 1/0)
                defer.maybeDeferred(lambda: 2/0)
        test = SomeCase('test_cruft')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        # Pull the details out and blank them in the event list so the list
        # can be compared exactly; the details are checked separately below.
        error = result._events[1][2]
        result._events[1] = ('addError', test, None)
        self.assertThat(result._events, Equals(
            [('startTest', test),
             ('addError', test, None),
             ('stopTest', test)]))
        self.assertThat(
            error, KeysEqual(
                'twisted-log',
                'unhandled-error-in-deferred',
                'unhandled-error-in-deferred-1',
                ))

    def test_unhandled_error_from_deferred_combined_with_error(self):
        # If there's a Deferred with an unhandled error, the test fails.  Each
        # unhandled error is reported with a separate traceback, and the error
        # is still reported.
        class SomeCase(TestCase):
            def test_cruft(self):
                # Note we aren't returning the Deferred so that the error will
                # be unhandled.
                defer.maybeDeferred(lambda: 1/0)
                2 / 0
        test = SomeCase('test_cruft')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        error = result._events[1][2]
        result._events[1] = ('addError', test, None)
        self.assertThat(result._events, Equals(
            [('startTest', test),
             ('addError', test, None),
             ('stopTest', test)]))
        self.assertThat(
            error, KeysEqual(
                'traceback',
                'twisted-log',
                'unhandled-error-in-deferred',
                ))

    @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
    def test_keyboard_interrupt_stops_test_run(self):
        # If we get a SIGINT during a test run, the test stops and no more
        # tests run.
        SIGINT = getattr(signal, 'SIGINT', None)
        if SIGINT is None:
            # skipTest raises by itself; no explicit ``raise`` is needed.
            self.skipTest("SIGINT unavailable")
        class SomeCase(TestCase):
            def test_pause(self):
                # Never fires, so the test is still waiting when the signal
                # is delivered.
                return defer.Deferred()
        test = SomeCase('test_pause')
        reactor = self.make_reactor()
        timeout = self.make_timeout()
        runner = self.make_runner(test, timeout * 5)
        result = self.make_result()
        reactor.callLater(timeout, os.kill, os.getpid(), SIGINT)
        self.assertThat(lambda:runner.run(result),
            Raises(MatchesException(KeyboardInterrupt)))

    @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
    def test_fast_keyboard_interrupt_stops_test_run(self):
        # If we get a SIGINT as soon as the reactor starts running, the test
        # run is still interrupted with KeyboardInterrupt.
        SIGINT = getattr(signal, 'SIGINT', None)
        if SIGINT is None:
            # skipTest raises by itself; no explicit ``raise`` is needed.
            self.skipTest("SIGINT unavailable")
        class SomeCase(TestCase):
            def test_pause(self):
                return defer.Deferred()
        test = SomeCase('test_pause')
        reactor = self.make_reactor()
        timeout = self.make_timeout()
        runner = self.make_runner(test, timeout * 5)
        result = self.make_result()
        reactor.callWhenRunning(os.kill, os.getpid(), SIGINT)
        self.assertThat(lambda:runner.run(result),
            Raises(MatchesException(KeyboardInterrupt)))

    def test_timeout_causes_test_error(self):
        # If a test times out, it reports itself as having failed with a
        # TimeoutError.
        class SomeCase(TestCase):
            def test_pause(self):
                return defer.Deferred()
        test = SomeCase('test_pause')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events], Equals(
                [('startTest', test),
                 ('addError', test),
                 ('stopTest', test)]))
        # Read the details only once the event list is known to have the
        # right shape, so a wrong outcome shows up as a clear mismatch.
        error = result._events[1][2]
        self.assertIn('TimeoutError', str(error['traceback']))

    def test_convenient_construction(self):
        # As a convenience method, AsynchronousDeferredRunTest has a
        # classmethod that returns an AsynchronousDeferredRunTest
        # factory. This factory has the same API as the RunTest constructor.
        reactor = object()
        timeout = object()
        handler = object()
        factory = AsynchronousDeferredRunTest.make_factory(reactor, timeout)
        runner = factory(self, [handler])
        self.assertIs(reactor, runner._reactor)
        self.assertIs(timeout, runner._timeout)
        self.assertIs(self, runner.case)
        self.assertEqual([handler], runner.handlers)

    def test_use_convenient_factory(self):
        # Make sure that the factory can actually be used.
        factory = AsynchronousDeferredRunTest.make_factory()
        class SomeCase(TestCase):
            run_tests_with = factory
            def test_something(self):
                pass
        case = SomeCase('test_something')
        case.run()

    def test_convenient_construction_default_reactor(self):
        # make_factory accepts the reactor on its own, as a keyword argument;
        # the factory it returns still has the same API as the RunTest
        # constructor.
        reactor = object()
        handler = object()
        factory = AsynchronousDeferredRunTest.make_factory(reactor=reactor)
        runner = factory(self, [handler])
        self.assertIs(reactor, runner._reactor)
        self.assertIs(self, runner.case)
        self.assertEqual([handler], runner.handlers)

    def test_convenient_construction_default_timeout(self):
        # make_factory accepts the timeout on its own, as a keyword argument;
        # the factory it returns still has the same API as the RunTest
        # constructor.
        timeout = object()
        handler = object()
        factory = AsynchronousDeferredRunTest.make_factory(timeout=timeout)
        runner = factory(self, [handler])
        self.assertIs(timeout, runner._timeout)
        self.assertIs(self, runner.case)
        self.assertEqual([handler], runner.handlers)

    def test_convenient_construction_default_debugging(self):
        # make_factory accepts the debug flag on its own, as a keyword
        # argument; the factory it returns still has the same API as the
        # RunTest constructor.
        handler = object()
        factory = AsynchronousDeferredRunTest.make_factory(debug=True)
        runner = factory(self, [handler])
        self.assertIs(self, runner.case)
        self.assertEqual([handler], runner.handlers)
        self.assertEqual(True, runner._debug)

    def test_deferred_error(self):
        # A test that returns a Deferred which fails with an exception is
        # recorded as an error.
        class SomeTest(TestCase):
            def test_something(self):
                return defer.maybeDeferred(lambda: 1/0)
        test = SomeTest('test_something')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals([
                ('startTest', test),
                ('addError', test),
                ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(error, KeysEqual('traceback', 'twisted-log'))

    def test_only_addError_once(self):
        # Even if the reactor is unclean and the test raises an error and the
        # cleanups raise errors, we only called addError once per test.
        reactor = self.make_reactor()
        class WhenItRains(TestCase):
            def it_pours(self):
                # Add a dirty cleanup.
                self.addCleanup(lambda: 3 / 0)
                # Dirty the reactor.
                from twisted.internet.protocol import ServerFactory
                reactor.listenTCP(0, ServerFactory())
                # Unhandled error.
                defer.maybeDeferred(lambda: 2 / 0)
                # Actual error.
                raise RuntimeError("Excess precipitation")
        test = WhenItRains('it_pours')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals([
                ('startTest', test),
                ('addError', test),
                ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(
            error, KeysEqual(
                'traceback',
                'traceback-1',
                'traceback-2',
                'twisted-log',
                'unhandled-error-in-deferred',
                ))

    def test_log_err_is_error(self):
        # An error logged during the test run is recorded as an error in the
        # tests.
        class LogAnError(TestCase):
            def test_something(self):
                try:
                    1/0
                except ZeroDivisionError:
                    f = failure.Failure()
                    log.err(f)
        test = LogAnError('test_something')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals([
                ('startTest', test),
                ('addError', test),
                ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(error, KeysEqual('logged-error', 'twisted-log'))

    def test_log_err_flushed_is_success(self):
        # An error logged during the test run does not fail the test if it is
        # flushed with flush_logged_errors before the test finishes.
        class LogAnError(TestCase):
            def test_something(self):
                try:
                    1/0
                except ZeroDivisionError:
                    f = failure.Failure()
                    log.err(f)
                flush_logged_errors(ZeroDivisionError)
        test = LogAnError('test_something')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            result._events,
            Equals([
                ('startTest', test),
                ('addSuccess', test, {'twisted-log': text_content('')}),
                ('stopTest', test)]))

    def test_log_in_details(self):
        # When a test that logged a message goes on to raise, the reported
        # details carry a 'twisted-log' entry alongside the traceback.
        class LogAnError(TestCase):
            def test_something(self):
                log.msg("foo")
                1/0
        test = LogAnError('test_something')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals([
                ('startTest', test),
                ('addError', test),
                ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(error, KeysEqual('traceback', 'twisted-log'))

    def test_debugging_unchanged_during_test_by_default(self):
        # Without the debug flag, the debug settings seen inside the test are
        # the same as those in force before the run.
        debugging = [(defer.Deferred.debug, DelayedCall.debug)]
        class SomeCase(TestCase):
            def test_debugging_enabled(self):
                debugging.append((defer.Deferred.debug, DelayedCall.debug))
        test = SomeCase('test_debugging_enabled')
        runner = AsynchronousDeferredRunTest(
            test, handlers=test.exception_handlers,
            reactor=self.make_reactor(), timeout=self.make_timeout())
        runner.run(self.make_result())
        self.assertEqual(debugging[0], debugging[1])

    def test_debugging_enabled_during_test_with_debug_flag(self):
        # With debug=True, both debug settings are switched on while the test
        # runs and are back to their previous values afterwards.
        self.patch(defer.Deferred, 'debug', False)
        self.patch(DelayedCall, 'debug', False)
        debugging = []
        class SomeCase(TestCase):
            def test_debugging_enabled(self):
                debugging.append((defer.Deferred.debug, DelayedCall.debug))
        test = SomeCase('test_debugging_enabled')
        runner = AsynchronousDeferredRunTest(
            test, handlers=test.exception_handlers,
            reactor=self.make_reactor(), timeout=self.make_timeout(),
            debug=True)
        runner.run(self.make_result())
        self.assertEqual([(True, True)], debugging)
        # Check each of the two flags once; Deferred.debug used to be
        # asserted twice, leaving DelayedCall.debug unchecked.
        self.assertEqual(False, defer.Deferred.debug)
        self.assertEqual(False, DelayedCall.debug)
class TestAssertFailsWith(NeedsTwistedTestCase):
    """Tests for `assert_fails_with`."""

    # Every test here returns a Deferred, so run them with the Deferred-aware
    # runner whenever it could be looked up at import time.
    if SynchronousDeferredRunTest is not None:
        run_tests_with = SynchronousDeferredRunTest

    def test_assert_fails_with_success(self):
        # assert_fails_with fails the test if it's given a Deferred that
        # succeeds.
        marker = object()
        d = assert_fails_with(defer.succeed(marker), RuntimeError)
        def check_result(failure):
            failure.trap(self.failureException)
            self.assertThat(
                str(failure.value),
                Equals("RuntimeError not raised (%r returned)" % (marker,)))
        d.addCallbacks(
            lambda x: self.fail("Should not have succeeded"), check_result)
        return d

    def test_assert_fails_with_success_multiple_types(self):
        # assert_fails_with fails the test if it's given a Deferred that
        # succeeds; the message names every expected exception type.
        marker = object()
        d = assert_fails_with(
            defer.succeed(marker), RuntimeError, ZeroDivisionError)
        def check_result(failure):
            failure.trap(self.failureException)
            self.assertThat(
                str(failure.value),
                Equals("RuntimeError, ZeroDivisionError not raised "
                       "(%r returned)" % (marker,)))
        d.addCallbacks(
            lambda x: self.fail("Should not have succeeded"), check_result)
        return d

    def test_assert_fails_with_wrong_exception(self):
        # assert_fails_with fails the test if it's given a Deferred that
        # fails with an exception that is not one of the expected types.
        d = assert_fails_with(
            defer.maybeDeferred(lambda: 1/0), RuntimeError, KeyboardInterrupt)
        def check_result(failure):
            failure.trap(self.failureException)
            lines = str(failure.value).splitlines()
            # Only the first two lines are fixed; the rest is the traceback.
            self.assertThat(
                lines[:2],
                Equals([
                    ("ZeroDivisionError raised instead of RuntimeError, "
                     "KeyboardInterrupt:"),
                    " Traceback (most recent call last):",
                    ]))
        d.addCallbacks(
            lambda x: self.fail("Should not have succeeded"), check_result)
        return d

    def test_assert_fails_with_expected_exception(self):
        # assert_fails_with calls back with the value of the failure if it's
        # one of the expected types of failures.
        try:
            1/0
        except ZeroDivisionError:
            # Failure() with no arguments is built inside the handler for
            # the ZeroDivisionError raised just above.
            f = failure.Failure()
        d = assert_fails_with(defer.fail(f), ZeroDivisionError)
        return d.addCallback(self.assertThat, Equals(f.value))

    def test_custom_failure_exception(self):
        # If assert_fails_with is passed a 'failureException' keyword
        # argument, then it will raise that instead of `AssertionError`.
        class CustomException(Exception):
            pass
        marker = object()
        d = assert_fails_with(
            defer.succeed(marker), RuntimeError,
            failureException=CustomException)
        def check_result(failure):
            failure.trap(CustomException)
            self.assertThat(
                str(failure.value),
                Equals("RuntimeError not raised (%r returned)" % (marker,)))
        return d.addCallbacks(
            lambda x: self.fail("Should not have succeeded"), check_result)
def test_suite():
    """Return this module's tests together with the integration tests.

    The module's own tests are loaded by name through a ``TestLoader``; the
    integration tests come from ``make_integration_tests``.
    """
    from unittest import TestLoader, TestSuite
    module_tests = TestLoader().loadTestsFromName(__name__)
    integration_tests = make_integration_tests()
    return TestSuite([module_tests, integration_tests])