provision: add get_dns_{forest,domain}_microsoft_dns_descriptor()
[Samba/gebeck_regimport.git] / lib / subunit / python / subunit / test_results.py
blob91c9bbdc1e4219e21957bd39e16a6c0e06043925
2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """TestResult helper classes used to by subunit."""
19 import csv
20 import datetime
22 import testtools
23 from testtools.compat import all
24 from testtools.content import (
25 text_content,
26 TracebackContent,
29 from subunit import iso8601
32 # NOT a TestResult, because we are implementing the interface, not inheriting
33 # it.
34 class TestResultDecorator(object):
35 """General pass-through decorator.
37 This provides a base that other TestResults can inherit from to
38 gain basic forwarding functionality. It also takes care of
39 handling the case where the target doesn't support newer methods
40 or features by degrading them.
41 """
43 # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
44 # we should gut this and just use that.
46 def __init__(self, decorated):
47 """Create a TestResultDecorator forwarding to decorated."""
48 # Make every decorator degrade gracefully.
49 self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
51 def startTest(self, test):
52 return self.decorated.startTest(test)
54 def startTestRun(self):
55 return self.decorated.startTestRun()
57 def stopTest(self, test):
58 return self.decorated.stopTest(test)
60 def stopTestRun(self):
61 return self.decorated.stopTestRun()
63 def addError(self, test, err=None, details=None):
64 return self.decorated.addError(test, err, details=details)
66 def addFailure(self, test, err=None, details=None):
67 return self.decorated.addFailure(test, err, details=details)
69 def addSuccess(self, test, details=None):
70 return self.decorated.addSuccess(test, details=details)
72 def addSkip(self, test, reason=None, details=None):
73 return self.decorated.addSkip(test, reason, details=details)
75 def addExpectedFailure(self, test, err=None, details=None):
76 return self.decorated.addExpectedFailure(test, err, details=details)
78 def addUnexpectedSuccess(self, test, details=None):
79 return self.decorated.addUnexpectedSuccess(test, details=details)
81 def _get_failfast(self):
82 return getattr(self.decorated, 'failfast', False)
84 def _set_failfast(self, value):
85 self.decorated.failfast = value
86 failfast = property(_get_failfast, _set_failfast)
88 def progress(self, offset, whence):
89 return self.decorated.progress(offset, whence)
91 def wasSuccessful(self):
92 return self.decorated.wasSuccessful()
94 @property
95 def shouldStop(self):
96 return self.decorated.shouldStop
98 def stop(self):
99 return self.decorated.stop()
101 @property
102 def testsRun(self):
103 return self.decorated.testsRun
105 def tags(self, new_tags, gone_tags):
106 return self.decorated.tags(new_tags, gone_tags)
108 def time(self, a_datetime):
109 return self.decorated.time(a_datetime)
112 class HookedTestResultDecorator(TestResultDecorator):
113 """A TestResult which calls a hook on every event."""
115 def __init__(self, decorated):
116 self.super = super(HookedTestResultDecorator, self)
117 self.super.__init__(decorated)
119 def startTest(self, test):
120 self._before_event()
121 return self.super.startTest(test)
123 def startTestRun(self):
124 self._before_event()
125 return self.super.startTestRun()
127 def stopTest(self, test):
128 self._before_event()
129 return self.super.stopTest(test)
131 def stopTestRun(self):
132 self._before_event()
133 return self.super.stopTestRun()
135 def addError(self, test, err=None, details=None):
136 self._before_event()
137 return self.super.addError(test, err, details=details)
139 def addFailure(self, test, err=None, details=None):
140 self._before_event()
141 return self.super.addFailure(test, err, details=details)
143 def addSuccess(self, test, details=None):
144 self._before_event()
145 return self.super.addSuccess(test, details=details)
147 def addSkip(self, test, reason=None, details=None):
148 self._before_event()
149 return self.super.addSkip(test, reason, details=details)
151 def addExpectedFailure(self, test, err=None, details=None):
152 self._before_event()
153 return self.super.addExpectedFailure(test, err, details=details)
155 def addUnexpectedSuccess(self, test, details=None):
156 self._before_event()
157 return self.super.addUnexpectedSuccess(test, details=details)
159 def progress(self, offset, whence):
160 self._before_event()
161 return self.super.progress(offset, whence)
163 def wasSuccessful(self):
164 self._before_event()
165 return self.super.wasSuccessful()
167 @property
168 def shouldStop(self):
169 self._before_event()
170 return self.super.shouldStop
172 def stop(self):
173 self._before_event()
174 return self.super.stop()
176 def time(self, a_datetime):
177 self._before_event()
178 return self.super.time(a_datetime)
181 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
182 """Decorate a TestResult to add time events to a test run.
184 By default this will cause a time event before every test event,
185 but if explicit time data is being provided by the test run, then
186 this decorator will turn itself off to prevent causing confusion.
189 def __init__(self, decorated):
190 self._time = None
191 super(AutoTimingTestResultDecorator, self).__init__(decorated)
193 def _before_event(self):
194 time = self._time
195 if time is not None:
196 return
197 time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
198 self.decorated.time(time)
200 def progress(self, offset, whence):
201 return self.decorated.progress(offset, whence)
203 @property
204 def shouldStop(self):
205 return self.decorated.shouldStop
207 def time(self, a_datetime):
208 """Provide a timestamp for the current test activity.
210 :param a_datetime: If None, automatically add timestamps before every
211 event (this is the default behaviour if time() is not called at
212 all). If not None, pass the provided time onto the decorated
213 result object and disable automatic timestamps.
215 self._time = a_datetime
216 return self.decorated.time(a_datetime)
219 class TagsMixin(object):
221 def __init__(self):
222 self._clear_tags()
224 def _clear_tags(self):
225 self._global_tags = set(), set()
226 self._test_tags = None
228 def _get_active_tags(self):
229 global_new, global_gone = self._global_tags
230 if self._test_tags is None:
231 return set(global_new)
232 test_new, test_gone = self._test_tags
233 return global_new.difference(test_gone).union(test_new)
235 def _get_current_scope(self):
236 if self._test_tags:
237 return self._test_tags
238 return self._global_tags
240 def _flush_current_scope(self, tag_receiver):
241 new_tags, gone_tags = self._get_current_scope()
242 if new_tags or gone_tags:
243 tag_receiver.tags(new_tags, gone_tags)
244 if self._test_tags:
245 self._test_tags = set(), set()
246 else:
247 self._global_tags = set(), set()
249 def startTestRun(self):
250 self._clear_tags()
252 def startTest(self, test):
253 self._test_tags = set(), set()
255 def stopTest(self, test):
256 self._test_tags = None
258 def tags(self, new_tags, gone_tags):
259 """Handle tag instructions.
261 Adds and removes tags as appropriate. If a test is currently running,
262 tags are not affected for subsequent tests.
264 :param new_tags: Tags to add,
265 :param gone_tags: Tags to remove.
267 current_new_tags, current_gone_tags = self._get_current_scope()
268 current_new_tags.update(new_tags)
269 current_new_tags.difference_update(gone_tags)
270 current_gone_tags.update(gone_tags)
271 current_gone_tags.difference_update(new_tags)
274 class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
275 """Collapses many 'tags' calls into one where possible."""
277 def __init__(self, result):
278 super(TagCollapsingDecorator, self).__init__(result)
279 self._clear_tags()
281 def _before_event(self):
282 self._flush_current_scope(self.decorated)
284 def tags(self, new_tags, gone_tags):
285 TagsMixin.tags(self, new_tags, gone_tags)
288 class TimeCollapsingDecorator(HookedTestResultDecorator):
289 """Only pass on the first and last of a consecutive sequence of times."""
291 def __init__(self, decorated):
292 super(TimeCollapsingDecorator, self).__init__(decorated)
293 self._last_received_time = None
294 self._last_sent_time = None
296 def _before_event(self):
297 if self._last_received_time is None:
298 return
299 if self._last_received_time != self._last_sent_time:
300 self.decorated.time(self._last_received_time)
301 self._last_sent_time = self._last_received_time
302 self._last_received_time = None
304 def time(self, a_time):
305 # Don't upcall, because we don't want to call _before_event, it's only
306 # for non-time events.
307 if self._last_received_time is None:
308 self.decorated.time(a_time)
309 self._last_sent_time = a_time
310 self._last_received_time = a_time
313 def and_predicates(predicates):
314 """Return a predicate that is true iff all predicates are true."""
315 # XXX: Should probably be in testtools to be better used by matchers. jml
316 return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates)
319 def make_tag_filter(with_tags, without_tags):
320 """Make a callback that checks tests against tags."""
322 with_tags = with_tags and set(with_tags) or None
323 without_tags = without_tags and set(without_tags) or None
325 def check_tags(test, outcome, err, details, tags):
326 if with_tags and not with_tags <= tags:
327 return False
328 if without_tags and bool(without_tags & tags):
329 return False
330 return True
332 return check_tags
335 class _PredicateFilter(TestResultDecorator, TagsMixin):
337 def __init__(self, result, predicate):
338 super(_PredicateFilter, self).__init__(result)
339 self._clear_tags()
340 self.decorated = TimeCollapsingDecorator(
341 TagCollapsingDecorator(self.decorated))
342 self._predicate = predicate
343 # The current test (for filtering tags)
344 self._current_test = None
345 # Has the current test been filtered (for outputting test tags)
346 self._current_test_filtered = None
347 # Calls to this result that we don't know whether to forward on yet.
348 self._buffered_calls = []
350 def filter_predicate(self, test, outcome, error, details):
351 return self._predicate(
352 test, outcome, error, details, self._get_active_tags())
354 def addError(self, test, err=None, details=None):
355 if (self.filter_predicate(test, 'error', err, details)):
356 self._buffered_calls.append(
357 ('addError', [test, err], {'details': details}))
358 else:
359 self._filtered()
361 def addFailure(self, test, err=None, details=None):
362 if (self.filter_predicate(test, 'failure', err, details)):
363 self._buffered_calls.append(
364 ('addFailure', [test, err], {'details': details}))
365 else:
366 self._filtered()
368 def addSkip(self, test, reason=None, details=None):
369 if (self.filter_predicate(test, 'skip', reason, details)):
370 self._buffered_calls.append(
371 ('addSkip', [test, reason], {'details': details}))
372 else:
373 self._filtered()
375 def addExpectedFailure(self, test, err=None, details=None):
376 if self.filter_predicate(test, 'expectedfailure', err, details):
377 self._buffered_calls.append(
378 ('addExpectedFailure', [test, err], {'details': details}))
379 else:
380 self._filtered()
382 def addUnexpectedSuccess(self, test, details=None):
383 self._buffered_calls.append(
384 ('addUnexpectedSuccess', [test], {'details': details}))
386 def addSuccess(self, test, details=None):
387 if (self.filter_predicate(test, 'success', None, details)):
388 self._buffered_calls.append(
389 ('addSuccess', [test], {'details': details}))
390 else:
391 self._filtered()
393 def _filtered(self):
394 self._current_test_filtered = True
396 def startTest(self, test):
397 """Start a test.
399 Not directly passed to the client, but used for handling of tags
400 correctly.
402 TagsMixin.startTest(self, test)
403 self._current_test = test
404 self._current_test_filtered = False
405 self._buffered_calls.append(('startTest', [test], {}))
407 def stopTest(self, test):
408 """Stop a test.
410 Not directly passed to the client, but used for handling of tags
411 correctly.
413 if not self._current_test_filtered:
414 for method, args, kwargs in self._buffered_calls:
415 getattr(self.decorated, method)(*args, **kwargs)
416 self.decorated.stopTest(test)
417 self._current_test = None
418 self._current_test_filtered = None
419 self._buffered_calls = []
420 TagsMixin.stopTest(self, test)
422 def tags(self, new_tags, gone_tags):
423 TagsMixin.tags(self, new_tags, gone_tags)
424 if self._current_test is not None:
425 self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
426 else:
427 return super(_PredicateFilter, self).tags(new_tags, gone_tags)
429 def time(self, a_time):
430 return self.decorated.time(a_time)
432 def id_to_orig_id(self, id):
433 if id.startswith("subunit.RemotedTestCase."):
434 return id[len("subunit.RemotedTestCase."):]
435 return id
438 class TestResultFilter(TestResultDecorator):
439 """A pyunit TestResult interface implementation which filters tests.
441 Tests that pass the filter are handed on to another TestResult instance
442 for further processing/reporting. To obtain the filtered results,
443 the other instance must be interrogated.
445 :ivar result: The result that tests are passed to after filtering.
446 :ivar filter_predicate: The callback run to decide whether to pass
447 a result.
450 def __init__(self, result, filter_error=False, filter_failure=False,
451 filter_success=True, filter_skip=False, filter_xfail=False,
452 filter_predicate=None, fixup_expected_failures=None):
453 """Create a FilterResult object filtering to result.
455 :param filter_error: Filter out errors.
456 :param filter_failure: Filter out failures.
457 :param filter_success: Filter out successful tests.
458 :param filter_skip: Filter out skipped tests.
459 :param filter_xfail: Filter out expected failure tests.
460 :param filter_predicate: A callable taking (test, outcome, err,
461 details, tags) and returning True if the result should be passed
462 through. err and details may be none if no error or extra
463 metadata is available. outcome is the name of the outcome such
464 as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
465 are still supported but should be updated to accept the tags
466 parameter for efficiency.
467 :param fixup_expected_failures: Set of test ids to consider known
468 failing.
470 predicates = []
471 if filter_error:
472 predicates.append(
473 lambda t, outcome, e, d, tags: outcome != 'error')
474 if filter_failure:
475 predicates.append(
476 lambda t, outcome, e, d, tags: outcome != 'failure')
477 if filter_success:
478 predicates.append(
479 lambda t, outcome, e, d, tags: outcome != 'success')
480 if filter_skip:
481 predicates.append(
482 lambda t, outcome, e, d, tags: outcome != 'skip')
483 if filter_xfail:
484 predicates.append(
485 lambda t, outcome, e, d, tags: outcome != 'expectedfailure')
486 if filter_predicate is not None:
487 def compat(test, outcome, error, details, tags):
488 # 0.0.7 and earlier did not support the 'tags' parameter.
489 try:
490 return filter_predicate(
491 test, outcome, error, details, tags)
492 except TypeError:
493 return filter_predicate(test, outcome, error, details)
494 predicates.append(compat)
495 predicate = and_predicates(predicates)
496 super(TestResultFilter, self).__init__(
497 _PredicateFilter(result, predicate))
498 if fixup_expected_failures is None:
499 self._fixup_expected_failures = frozenset()
500 else:
501 self._fixup_expected_failures = fixup_expected_failures
503 def addError(self, test, err=None, details=None):
504 if self._failure_expected(test):
505 self.addExpectedFailure(test, err=err, details=details)
506 else:
507 super(TestResultFilter, self).addError(
508 test, err=err, details=details)
510 def addFailure(self, test, err=None, details=None):
511 if self._failure_expected(test):
512 self.addExpectedFailure(test, err=err, details=details)
513 else:
514 super(TestResultFilter, self).addFailure(
515 test, err=err, details=details)
517 def addSuccess(self, test, details=None):
518 if self._failure_expected(test):
519 self.addUnexpectedSuccess(test, details=details)
520 else:
521 super(TestResultFilter, self).addSuccess(test, details=details)
523 def _failure_expected(self, test):
524 return (test.id() in self._fixup_expected_failures)
527 class TestIdPrintingResult(testtools.TestResult):
529 def __init__(self, stream, show_times=False):
530 """Create a FilterResult object outputting to stream."""
531 super(TestIdPrintingResult, self).__init__()
532 self._stream = stream
533 self.failed_tests = 0
534 self.__time = None
535 self.show_times = show_times
536 self._test = None
537 self._test_duration = 0
539 def addError(self, test, err):
540 self.failed_tests += 1
541 self._test = test
543 def addFailure(self, test, err):
544 self.failed_tests += 1
545 self._test = test
547 def addSuccess(self, test):
548 self._test = test
550 def addSkip(self, test, reason=None, details=None):
551 self._test = test
553 def addUnexpectedSuccess(self, test, details=None):
554 self.failed_tests += 1
555 self._test = test
557 def addExpectedFailure(self, test, err=None, details=None):
558 self._test = test
560 def reportTest(self, test, duration):
561 if self.show_times:
562 seconds = duration.seconds
563 seconds += duration.days * 3600 * 24
564 seconds += duration.microseconds / 1000000.0
565 self._stream.write(test.id() + ' %0.3f\n' % seconds)
566 else:
567 self._stream.write(test.id() + '\n')
569 def startTest(self, test):
570 self._start_time = self._time()
572 def stopTest(self, test):
573 test_duration = self._time() - self._start_time
574 self.reportTest(self._test, test_duration)
576 def time(self, time):
577 self.__time = time
579 def _time(self):
580 return self.__time
582 def wasSuccessful(self):
583 "Tells whether or not this result was a success"
584 return self.failed_tests == 0
587 class TestByTestResult(testtools.TestResult):
588 """Call something every time a test completes."""
590 # XXX: In testtools since lp:testtools r249. Once that's released, just
591 # import that.
593 def __init__(self, on_test):
594 """Construct a ``TestByTestResult``.
596 :param on_test: A callable that take a test case, a status (one of
597 "success", "failure", "error", "skip", or "xfail"), a start time
598 (a ``datetime`` with timezone), a stop time, an iterable of tags,
599 and a details dict. Is called at the end of each test (i.e. on
600 ``stopTest``) with the accumulated values for that test.
602 super(TestByTestResult, self).__init__()
603 self._on_test = on_test
605 def startTest(self, test):
606 super(TestByTestResult, self).startTest(test)
607 self._start_time = self._now()
608 # There's no supported (i.e. tested) behaviour that relies on these
609 # being set, but it makes me more comfortable all the same. -- jml
610 self._status = None
611 self._details = None
612 self._stop_time = None
614 def stopTest(self, test):
615 self._stop_time = self._now()
616 super(TestByTestResult, self).stopTest(test)
617 self._on_test(
618 test=test,
619 status=self._status,
620 start_time=self._start_time,
621 stop_time=self._stop_time,
622 # current_tags is new in testtools 0.9.13.
623 tags=getattr(self, 'current_tags', None),
624 details=self._details)
626 def _err_to_details(self, test, err, details):
627 if details:
628 return details
629 return {'traceback': TracebackContent(err, test)}
631 def addSuccess(self, test, details=None):
632 super(TestByTestResult, self).addSuccess(test)
633 self._status = 'success'
634 self._details = details
636 def addFailure(self, test, err=None, details=None):
637 super(TestByTestResult, self).addFailure(test, err, details)
638 self._status = 'failure'
639 self._details = self._err_to_details(test, err, details)
641 def addError(self, test, err=None, details=None):
642 super(TestByTestResult, self).addError(test, err, details)
643 self._status = 'error'
644 self._details = self._err_to_details(test, err, details)
646 def addSkip(self, test, reason=None, details=None):
647 super(TestByTestResult, self).addSkip(test, reason, details)
648 self._status = 'skip'
649 if details is None:
650 details = {'reason': text_content(reason)}
651 elif reason:
652 # XXX: What if details already has 'reason' key?
653 details['reason'] = text_content(reason)
654 self._details = details
656 def addExpectedFailure(self, test, err=None, details=None):
657 super(TestByTestResult, self).addExpectedFailure(test, err, details)
658 self._status = 'xfail'
659 self._details = self._err_to_details(test, err, details)
661 def addUnexpectedSuccess(self, test, details=None):
662 super(TestByTestResult, self).addUnexpectedSuccess(test, details)
663 self._status = 'success'
664 self._details = details
667 class CsvResult(TestByTestResult):
669 def __init__(self, stream):
670 super(CsvResult, self).__init__(self._on_test)
671 self._write_row = csv.writer(stream).writerow
673 def _on_test(self, test, status, start_time, stop_time, tags, details):
674 self._write_row([test.id(), status, start_time, stop_time])
676 def startTestRun(self):
677 super(CsvResult, self).startTestRun()
678 self._write_row(['test', 'status', 'start_time', 'stop_time'])