python: use python3 style super statements
[samba.git] / python / samba / subunit / run.py
blobdc3f9316fcbeff23671692c7392579afac19893c
1 #!/usr/bin/env python3
3 # Simple subunit testrunner for python
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
6 # Cobbled together from testtools and subunit:
7 # Copyright (C) 2005-2011 Robert Collins <robertc@robertcollins.net>
8 # Copyright (c) 2008-2011 testtools developers.
10 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
11 # license at the users choice. A copy of both licenses are available in the
12 # project source as Apache-2.0 and BSD. You may not use this file except in
13 # compliance with one of these two licences.
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # license you chose for the specific language governing permissions and
19 # limitations under that license.
22 """Run a unittest testcase reporting results as Subunit.
24 $ python -m samba.subunit.run mylib.tests.test_suite
25 """
27 import datetime
28 import os
29 import sys
30 import unittest
33 class TestProtocolClient(unittest.TestResult):
34 """A TestResult which generates a subunit stream for a test run.
36 # Get a TestSuite or TestCase to run
37 suite = make_suite()
38 # Create a stream (any object with a 'write' method). This should accept
39 # bytes not strings: subunit is a byte orientated protocol.
40 stream = open('tests.log', 'wb')
41 # Create a subunit result object which will output to the stream
42 result = subunit.TestProtocolClient(stream)
43 # Optionally, to get timing data for performance analysis, wrap the
44 # serialiser with a timing decorator
45 result = subunit.test_results.AutoTimingTestResultDecorator(result)
46 # Run the test suite reporting to the subunit result object
47 suite.run(result)
48 # Close the stream.
49 stream.close()
50 """
52 def __init__(self, stream):
53 unittest.TestResult.__init__(self)
54 self._stream = stream
55 self.successes = []
57 def _addOutcome(self, outcome, test, errors=None):
58 """Report an outcome of test test.
60 :param outcome: A string describing the outcome - used as the
61 event name in the subunit stream.
62 :param errors: A list of strings describing the errors.
63 """
64 self._stream.write(("%s: " % outcome) + test.id())
65 if errors:
66 self._stream.write(" [\n")
67 for error in errors:
68 self._stream.write(error)
69 if not error.endswith('\n'):
70 self._stream.write('\n')
71 self._stream.write("]")
72 self._stream.write("\n")
74 def addSuccess(self, test):
75 """Report a success in a test."""
76 self.successes.append(test)
78 def startTest(self, test):
79 """Mark a test as starting its test run."""
80 super().startTest(test)
81 self._stream.write("test: " + test.id() + "\n")
82 self._stream.flush()
84 def stopTest(self, test):
85 """Mark a test as having finished its test run."""
86 super().stopTest(test)
87 self.writeOutcome(test)
89 def writeOutcome(self, test):
90 """Output the overall outcome for test test."""
91 err, self.errors = self._filterErrors(test, self.errors)
92 fail, self.failures = self._filterErrors(test, self.failures)
93 xfail, self.expectedFailures = self._filterErrors(test, self.expectedFailures)
94 skip, self.skipped = self._filterErrors(test, self.skipped)
95 success, self.successes = self._filterSuccesses(test, self.successes)
96 uxsuccess, self.unexpectedSuccesses = self._filterSuccesses(test, self.unexpectedSuccesses)
98 if err:
99 outcome = "error"
100 elif fail:
101 outcome = "failure"
102 elif skip:
103 outcome = "skip"
104 elif uxsuccess:
105 outcome = "uxsuccess"
106 elif xfail:
107 outcome = "xfail"
108 elif success:
109 outcome = "successful"
110 else:
111 outcome = None
113 if outcome:
114 self._addOutcome(outcome, test, errors=err+fail+skip+xfail)
116 self._stream.flush()
118 def _filterErrors(self, test, errors):
119 """Filter a list of errors by test test.
121 :param test: The test to filter by.
122 :param errors: A list of <test, error> pairs to filter.
124 :return: A pair whose first element is a list of strings containing
125 errors that apply to test test, and whose second element is a list
126 of the remaining elements.
128 filtered = []
129 unfiltered = []
131 for error in errors:
132 if error[0] is test:
133 filtered.append(error[1])
134 else:
135 unfiltered.append(error)
137 return (filtered, unfiltered)
139 def _filterSuccesses(self, test, successes):
140 """Filter a list of successes by test test.
142 :param test: The test to filter by.
143 :param successes: A list of tests to filter.
145 :return: A tuple whose first element is a boolean stating whether test
146 test was found in the list of successes, and whose second element is
147 a list of the remaining elements.
149 filtered = False
150 unfiltered = []
152 for success in successes:
153 if success is test:
154 filtered = True
155 else:
156 unfiltered.append(success)
158 return (filtered, unfiltered)
160 def time(self, a_datetime):
161 """Inform the client of the time.
163 ":param a_datetime: A datetime.datetime object.
165 time = a_datetime.astimezone(datetime.timezone.utc)
166 self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
167 time.year, time.month, time.day, time.hour, time.minute,
168 time.second, time.microsecond))
171 def _flatten_tests(suite_or_case, unpack_outer=False):
172 try:
173 tests = iter(suite_or_case)
174 except TypeError:
175 # Not iterable, assume it's a test case.
176 return [(suite_or_case.id(), suite_or_case)]
177 if (type(suite_or_case) in (unittest.TestSuite,) or
178 unpack_outer):
179 # Plain old test suite (or any others we may add).
180 result = []
181 for test in tests:
182 # Recurse to flatten.
183 result.extend(_flatten_tests(test))
184 return result
185 else:
186 # Find any old actual test and grab its id.
187 suite_id = None
188 tests = iterate_tests(suite_or_case)
189 for test in tests:
190 suite_id = test.id()
191 break
192 # If it has a sort_tests method, call that.
193 if getattr(suite_or_case, 'sort_tests', None) is not None:
194 suite_or_case.sort_tests()
195 return [(suite_id, suite_or_case)]
198 def sorted_tests(suite_or_case, unpack_outer=False):
199 """Sort suite_or_case while preserving non-vanilla TestSuites."""
200 tests = _flatten_tests(suite_or_case, unpack_outer=unpack_outer)
201 tests.sort()
202 return unittest.TestSuite([test for (sort_key, test) in tests])
205 def iterate_tests(test_suite_or_case):
206 """Iterate through all of the test cases in 'test_suite_or_case'."""
207 try:
208 suite = iter(test_suite_or_case)
209 except TypeError:
210 yield test_suite_or_case
211 else:
212 for test in suite:
213 for subtest in iterate_tests(test):
214 yield subtest
217 defaultTestLoader = unittest.defaultTestLoader
218 defaultTestLoaderCls = unittest.TestLoader
220 if getattr(defaultTestLoader, 'discover', None) is None:
221 try:
222 import discover
223 defaultTestLoader = discover.DiscoveringTestLoader()
224 defaultTestLoaderCls = discover.DiscoveringTestLoader
225 have_discover = True
226 except ImportError:
227 have_discover = False
228 else:
229 have_discover = True
232 ####################
233 # Taken from python 2.7 and slightly modified for compatibility with
234 # older versions. Delete when 2.7 is the oldest supported version.
235 # Modifications:
236 # - Use have_discover to raise an error if the user tries to use
237 # discovery on an old version and doesn't have discover installed.
238 # - If --catch is given check that installHandler is available, as
239 # it won't be on old python versions.
240 # - print calls have been been made single-source python3 compatible.
241 # - exception handling likewise.
242 # - The default help has been changed to USAGE_AS_MAIN and USAGE_FROM_MODULE
243 # removed.
244 # - A tweak has been added to detect 'python -m *.run' and use a
245 # better progName in that case.
246 # - self.module is more comprehensively set to None when being invoked from
247 # the commandline - __name__ is used as a sentinel value.
248 # - --list has been added which can list tests (should be upstreamed).
249 # - --load-list has been added which can reduce the tests used (should be
250 # upstreamed).
251 # - The limitation of using getopt is declared to the user.
252 # - http://bugs.python.org/issue16709 is worked around, by sorting tests when
253 # discover is used.
255 CATCHBREAK = " -c, --catch Catch control-C and display results\n"
256 BUFFEROUTPUT = " -b, --buffer Buffer stdout and stderr during test runs\n"
258 USAGE_AS_MAIN = """\
259 Usage: %(progName)s [options] [tests]
261 Options:
262 -h, --help Show this message
263 -v, --verbose Verbose output
264 -q, --quiet Minimal output
265 -l, --list List tests rather than executing them.
266 --load-list Specifies a file containing test ids, only tests matching
267 those ids are executed.
268 %(catchbreak)s%(buffer)s
269 Examples:
270 %(progName)s test_module - run tests from test_module
271 %(progName)s module.TestClass - run tests from module.TestClass
272 %(progName)s module.Class.test_method - run specified test method
274 All options must come before [tests]. [tests] can be a list of any number of
275 test modules, classes and test methods.
277 Alternative Usage: %(progName)s discover [options]
279 Options:
280 -v, --verbose Verbose output
281 s%(catchbreak)s%(buffer)s -s directory Directory to start discovery ('.' default)
282 -p pattern Pattern to match test files ('test*.py' default)
283 -t directory Top level directory of project (default to
284 start directory)
285 -l, --list List tests rather than executing them.
286 --load-list Specifies a file containing test ids, only tests matching
287 those ids are executed.
289 For test discovery all test modules must be importable from the top
290 level directory of the project.
294 # NOT a TestResult, because we are implementing the interface, not inheriting
295 # it.
296 class TestResultDecorator(object):
297 """General pass-through decorator.
299 This provides a base that other TestResults can inherit from to
300 gain basic forwarding functionality. It also takes care of
301 handling the case where the target doesn't support newer methods
302 or features by degrading them.
305 def __init__(self, decorated):
306 """Create a TestResultDecorator forwarding to decorated."""
307 # Make every decorator degrade gracefully.
308 self.decorated = decorated
310 def startTest(self, test):
311 return self.decorated.startTest(test)
313 def startTestRun(self):
314 return self.decorated.startTestRun()
316 def stopTest(self, test):
317 return self.decorated.stopTest(test)
319 def stopTestRun(self):
320 return self.decorated.stopTestRun()
322 def addError(self, test, err=None):
323 return self.decorated.addError(test, err)
325 def addFailure(self, test, err=None):
326 return self.decorated.addFailure(test, err)
328 def addSuccess(self, test):
329 return self.decorated.addSuccess(test)
331 def addSkip(self, test, reason=None):
332 return self.decorated.addSkip(test, reason)
334 def addExpectedFailure(self, test, err=None):
335 return self.decorated.addExpectedFailure(test, err)
337 def addUnexpectedSuccess(self, test):
338 return self.decorated.addUnexpectedSuccess(test)
340 def wasSuccessful(self):
341 return self.decorated.wasSuccessful()
343 @property
344 def shouldStop(self):
345 return self.decorated.shouldStop
347 def stop(self):
348 return self.decorated.stop()
350 @property
351 def testsRun(self):
352 return self.decorated.testsRun
354 def time(self, a_datetime):
355 return self.decorated.time(a_datetime)
358 class HookedTestResultDecorator(TestResultDecorator):
359 """A TestResult which calls a hook on every event."""
361 def __init__(self, decorated):
362 self.super = super()
363 self.super.__init__(decorated)
365 def startTest(self, test):
366 self._before_event()
367 return self.super.startTest(test)
369 def startTestRun(self):
370 self._before_event()
371 return self.super.startTestRun()
373 def stopTest(self, test):
374 self._before_event()
375 return self.super.stopTest(test)
377 def stopTestRun(self):
378 self._before_event()
379 return self.super.stopTestRun()
381 def addError(self, test, err=None):
382 self._before_event()
383 return self.super.addError(test, err)
385 def addFailure(self, test, err=None):
386 self._before_event()
387 return self.super.addFailure(test, err)
389 def addSuccess(self, test):
390 self._before_event()
391 return self.super.addSuccess(test)
393 def addSkip(self, test, reason=None):
394 self._before_event()
395 return self.super.addSkip(test, reason)
397 def addExpectedFailure(self, test, err=None):
398 self._before_event()
399 return self.super.addExpectedFailure(test, err)
401 def addUnexpectedSuccess(self, test):
402 self._before_event()
403 return self.super.addUnexpectedSuccess(test)
405 def wasSuccessful(self):
406 self._before_event()
407 return self.super.wasSuccessful()
409 @property
410 def shouldStop(self):
411 self._before_event()
412 return self.super.shouldStop
414 def stop(self):
415 self._before_event()
416 return self.super.stop()
418 def time(self, a_datetime):
419 self._before_event()
420 return self.super.time(a_datetime)
423 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
424 """Decorate a TestResult to add time events to a test run.
426 By default this will cause a time event before every test event,
427 but if explicit time data is being provided by the test run, then
428 this decorator will turn itself off to prevent causing confusion.
431 def __init__(self, decorated):
432 self._time = None
433 super().__init__(decorated)
435 def _before_event(self):
436 time = self._time
437 if time is not None:
438 return
439 time = datetime.datetime.now(tz=datetime.timezone.utc)
440 self.decorated.time(time)
442 @property
443 def shouldStop(self):
444 return self.decorated.shouldStop
446 def time(self, a_datetime):
447 """Provide a timestamp for the current test activity.
449 :param a_datetime: If None, automatically add timestamps before every
450 event (this is the default behaviour if time() is not called at
451 all). If not None, pass the provided time onto the decorated
452 result object and disable automatic timestamps.
454 self._time = a_datetime
455 return self.decorated.time(a_datetime)
458 class SubunitTestRunner(object):
460 def __init__(self, verbosity=None, buffer=None, stream=None):
461 """Create a SubunitTestRunner.
463 :param verbosity: Ignored.
464 :param buffer: Ignored.
466 self.stream = stream or sys.stdout
468 def run(self, test):
469 "Run the given test case or test suite."
470 result = TestProtocolClient(self.stream)
471 result = AutoTimingTestResultDecorator(result)
472 test(result)
473 return result
476 class TestProgram(object):
477 """A command-line program that runs a set of tests; this is primarily
478 for making test modules conveniently executable.
480 USAGE = USAGE_AS_MAIN
482 # defaults for testing
483 catchbreak = buffer = progName = None
485 def __init__(self, module=__name__, defaultTest=None, argv=None,
486 testRunner=None, testLoader=defaultTestLoader,
487 exit=True, verbosity=1, catchbreak=None,
488 buffer=None, stdout=None):
489 if module == __name__:
490 self.module = None
491 elif isinstance(module, str):
492 self.module = __import__(module)
493 for part in module.split('.')[1:]:
494 self.module = getattr(self.module, part)
495 else:
496 self.module = module
497 if argv is None:
498 argv = sys.argv
499 if stdout is None:
500 stdout = sys.stdout
501 if testRunner is None:
502 testRunner = SubunitTestRunner()
504 self.exit = exit
505 self.catchbreak = catchbreak
506 self.verbosity = verbosity
507 self.buffer = buffer
508 self.defaultTest = defaultTest
509 self.listtests = False
510 self.load_list = None
511 self.testRunner = testRunner
512 self.testLoader = testLoader
513 progName = argv[0]
514 if progName.endswith('%srun.py' % os.path.sep):
515 elements = progName.split(os.path.sep)
516 progName = '%s.run' % elements[-2]
517 else:
518 progName = os.path.basename(argv[0])
519 self.progName = progName
520 self.parseArgs(argv)
521 if self.load_list:
522 # TODO: preserve existing suites (like testresources does in
523 # OptimisingTestSuite.add, but with a standard protocol).
524 # This is needed because the load_tests hook allows arbitrary
525 # suites, even if that is rarely used.
526 source = open(self.load_list, 'rb')
527 try:
528 lines = source.readlines()
529 finally:
530 source.close()
531 test_ids = set(line.strip().decode('utf-8') for line in lines)
532 filtered = unittest.TestSuite()
533 for test in iterate_tests(self.test):
534 if test.id() in test_ids:
535 filtered.addTest(test)
536 self.test = filtered
537 if not self.listtests:
538 self.runTests()
539 else:
540 for test in iterate_tests(self.test):
541 stdout.write('%s\n' % test.id())
543 def parseArgs(self, argv):
544 if len(argv) > 1 and argv[1].lower() == 'discover':
545 self._do_discovery(argv[2:])
546 return
548 import getopt
549 long_opts = ['help', 'verbose', 'quiet', 'catch', 'buffer',
550 'list', 'load-list=']
551 try:
552 options, args = getopt.getopt(argv[1:], 'hHvqfcbl', long_opts)
553 for opt, value in options:
554 if opt in ('-h','-H','--help'):
555 self.usageExit()
556 if opt in ('-q','--quiet'):
557 self.verbosity = 0
558 if opt in ('-v','--verbose'):
559 self.verbosity = 2
560 if opt in ('-c','--catch'):
561 if self.catchbreak is None:
562 self.catchbreak = True
563 # Should this raise an exception if -c is not valid?
564 if opt in ('-b','--buffer'):
565 if self.buffer is None:
566 self.buffer = True
567 # Should this raise an exception if -b is not valid?
568 if opt in ('-l', '--list'):
569 self.listtests = True
570 if opt == '--load-list':
571 self.load_list = value
572 if len(args) == 0 and self.defaultTest is None:
573 # createTests will load tests from self.module
574 self.testNames = None
575 elif len(args) > 0:
576 self.testNames = args
577 else:
578 self.testNames = (self.defaultTest,)
579 self.createTests()
580 except getopt.error:
581 self.usageExit(sys.exc_info()[1])
583 def createTests(self):
584 if self.testNames is None:
585 self.test = self.testLoader.loadTestsFromModule(self.module)
586 else:
587 self.test = self.testLoader.loadTestsFromNames(self.testNames,
588 self.module)
590 def _do_discovery(self, argv, Loader=defaultTestLoaderCls):
591 # handle command line args for test discovery
592 if not have_discover:
593 raise AssertionError("Unable to use discovery, must use python 2.7 "
594 "or greater, or install the discover package.")
595 self.progName = '%s discover' % self.progName
596 import optparse
597 parser = optparse.OptionParser()
598 parser.prog = self.progName
599 parser.add_option('-v', '--verbose', dest='verbose', default=False,
600 help='Verbose output', action='store_true')
601 if self.catchbreak is not False:
602 parser.add_option('-c', '--catch', dest='catchbreak', default=False,
603 help='Catch ctrl-C and display results so far',
604 action='store_true')
605 if self.buffer is not False:
606 parser.add_option('-b', '--buffer', dest='buffer', default=False,
607 help='Buffer stdout and stderr during tests',
608 action='store_true')
609 parser.add_option('-s', '--start-directory', dest='start', default='.',
610 help="Directory to start discovery ('.' default)")
611 parser.add_option('-p', '--pattern', dest='pattern', default='test*.py',
612 help="Pattern to match tests ('test*.py' default)")
613 parser.add_option('-t', '--top-level-directory', dest='top', default=None,
614 help='Top level directory of project (defaults to start directory)')
615 parser.add_option('-l', '--list', dest='listtests', default=False, action="store_true",
616 help='List tests rather than running them.')
617 parser.add_option('--load-list', dest='load_list', default=None,
618 help='Specify a filename containing the test ids to use.')
620 options, args = parser.parse_args(argv)
621 if len(args) > 3:
622 self.usageExit()
624 for name, value in zip(('start', 'pattern', 'top'), args):
625 setattr(options, name, value)
627 # only set options from the parsing here
628 # if they weren't set explicitly in the constructor
629 if self.catchbreak is None:
630 self.catchbreak = options.catchbreak
631 if self.buffer is None:
632 self.buffer = options.buffer
633 self.listtests = options.listtests
634 self.load_list = options.load_list
636 if options.verbose:
637 self.verbosity = 2
639 start_dir = options.start
640 pattern = options.pattern
641 top_level_dir = options.top
643 loader = Loader()
644 # See http://bugs.python.org/issue16709
645 # While sorting here is intrusive, its better than being random.
646 # Rules for the sort:
647 # - standard suites are flattened, and the resulting tests sorted by
648 # id.
649 # - non-standard suites are preserved as-is, and sorted into position
650 # by the first test found by iterating the suite.
651 # We do this by a DSU process: flatten and grab a key, sort, strip the
652 # keys.
653 loaded = loader.discover(start_dir, pattern, top_level_dir)
654 self.test = sorted_tests(loaded)
656 def runTests(self):
657 if (self.catchbreak
658 and getattr(unittest, 'installHandler', None) is not None):
659 unittest.installHandler()
660 self.result = self.testRunner.run(self.test)
661 if self.exit:
662 sys.exit(not self.result.wasSuccessful())
664 def usageExit(self, msg=None):
665 if msg:
666 print (msg)
667 usage = {'progName': self.progName, 'catchbreak': '',
668 'buffer': ''}
669 if self.catchbreak is not False:
670 usage['catchbreak'] = CATCHBREAK
671 if self.buffer is not False:
672 usage['buffer'] = BUFFEROUTPUT
673 usage_text = self.USAGE % usage
674 usage_lines = usage_text.split('\n')
675 usage_lines.insert(2, "Run a test suite with a subunit reporter.")
676 usage_lines.insert(3, "")
677 print('\n'.join(usage_lines))
678 sys.exit(2)
681 if __name__ == '__main__':
682 TestProgram(module=None, argv=sys.argv, stdout=sys.stdout)