3 # Copyright (c) Facebook, Inc. and its affiliates.
6 This file contains the main module code for Python test programs.
26 # Hide warning about importing "imp"; remove once python2 is gone.
27 with warnings
.catch_warnings():
28 warnings
.filterwarnings("ignore", category
=DeprecationWarning)
32 from StringIO
import StringIO
34 from io
import StringIO
38 coverage
= None # type: ignore
40 from importlib
.machinery
import SourceFileLoader
42 SourceFileLoader
= None # type: ignore
45 class get_cpu_instr_counter(object):
52 EXIT_CODE_TEST_FAILURE
= 70
55 class TestStatus(object):
60 EXPECTED_FAILURE
= "SUCCESS"
61 UNEXPECTED_SUCCESS
= "FAILURE"
62 SKIPPED
= "ASSUMPTION_VIOLATION"
class PathMatcher(object):
    """
    Decide whether a file path takes part in coverage, based on glob patterns.

    Patterns are matched with ``fnmatch.fnmatch`` against the real
    (symlink-resolved) path.
    """

    def __init__(self, include_patterns, omit_patterns):
        """
        :param include_patterns: globs of paths to include; an empty
            collection places no include restriction.
        :param omit_patterns: globs of paths that are always omitted.
        """
        self.include_patterns = include_patterns
        self.omit_patterns = omit_patterns

    def omit(self, path):
        """
        Omit iff matches any of the omit_patterns or the include patterns are
        not empty and none is matched

        Always returns a real ``bool`` (never the patterns container itself).
        """
        path = os.path.realpath(path)
        # An omit pattern wins over any include pattern.
        if any(fnmatch.fnmatch(path, p) for p in self.omit_patterns):
            return True
        # No include patterns means nothing is excluded on that account.
        if not self.include_patterns:
            return False
        return not any(fnmatch.fnmatch(path, p) for p in self.include_patterns)

    def include(self, path):
        """Return True when ``path`` is not omitted (the negation of omit())."""
        return not self.omit(path)
85 class DebugWipeFinder(object):
87 PEP 302 finder that uses a DebugWipeLoader for all files which do not need
91 def __init__(self
, matcher
):
92 self
.matcher
= matcher
94 def find_module(self
, fullname
, path
=None):
95 _
, _
, basename
= fullname
.rpartition(".")
97 fd
, pypath
, (_
, _
, kind
) = imp
.find_module(basename
, path
)
99 # Finding without hooks using the imp module failed. One reason
100 # could be that there is a zip file on sys.path. The imp module
101 # does not support loading from there. Leave finding this module to
102 # the other finders in sys.meta_path.
105 if hasattr(fd
, "close"):
107 if kind
!= imp
.PY_SOURCE
:
109 if self
.matcher
.include(pypath
):
113 This is defined to match CPython's PyVarObject struct
116 class PyVarObject(ctypes
.Structure
):
118 ("ob_refcnt", ctypes
.c_long
),
119 ("ob_type", ctypes
.c_void_p
),
120 ("ob_size", ctypes
.c_ulong
),
123 class DebugWipeLoader(SourceFileLoader
):
125 PEP302 loader that zeros out debug information before execution
128 def get_code(self
, fullname
):
129 code
= super(DebugWipeLoader
, self
).get_code(fullname
)
132 # code.co_lnotab = b''
133 # But code objects are READONLY. Not to worry though; we'll
134 # directly modify CPython's object
135 code_impl
= PyVarObject
.from_address(id(code
.co_lnotab
))
136 code_impl
.ob_size
= 0
139 return DebugWipeLoader(fullname
, pypath
)
142 def optimize_for_coverage(cov
, include_patterns
, omit_patterns
):
144 We get better performance if we zero out debug information for files which
145 we're not interested in. Only available in CPython 3.3+
147 matcher
= PathMatcher(include_patterns
, omit_patterns
)
148 if SourceFileLoader
and platform
.python_implementation() == "CPython":
149 sys
.meta_path
.insert(0, DebugWipeFinder(matcher
))
152 class TeeStream(object):
153 def __init__(self
, *streams
):
154 self
._streams
= streams
156 def write(self
, data
):
157 for stream
in self
._streams
:
161 for stream
in self
._streams
:
168 class CallbackStream(object):
169 def __init__(self
, callback
, bytes_callback
=None, orig
=None):
170 self
._callback
= callback
171 self
._fileno
= orig
.fileno() if orig
else None
174 # - `encoding` is a string holding the encoding name
175 # - `errors` is a string holding the error-handling mode for encoding
176 # - `buffer` should look like an io.BufferedIOBase object
178 self
.errors
= orig
.errors
if orig
else None
180 # those members are only on the io.TextIOWrapper
181 self
.encoding
= orig
.encoding
if orig
else "UTF-8"
182 self
.buffer = CallbackStream(bytes_callback
, orig
=orig
)
184 def write(self
, data
):
197 class BuckTestResult(unittest
._TextTestResult
):
199 Our own TestResult class that outputs data in a format that can be easily
200 parsed by buck's test runner.
203 _instr_counter
= get_cpu_instr_counter()
206 self
, stream
, descriptions
, verbosity
, show_output
, main_program
, suite
208 super(BuckTestResult
, self
).__init
__(stream
, descriptions
, verbosity
)
209 self
._main
_program
= main_program
212 self
._current
_test
= None
213 self
._saved
_stdout
= sys
.stdout
214 self
._saved
_stderr
= sys
.stderr
215 self
._show
_output
= show_output
217 def getResults(self
):
220 def startTest(self
, test
):
221 super(BuckTestResult
, self
).startTest(test
)
223 # Pass in the real stdout and stderr filenos. We can't really do much
224 # here to intercept callers who directly operate on these fileno
226 sys
.stdout
= CallbackStream(
227 self
.addStdout
, self
.addStdoutBytes
, orig
=sys
.stdout
229 sys
.stderr
= CallbackStream(
230 self
.addStderr
, self
.addStderrBytes
, orig
=sys
.stderr
232 self
._current
_test
= test
233 self
._test
_start
_time
= time
.time()
234 self
._current
_status
= TestStatus
.ABORTED
236 self
._stacktrace
= None
239 self
._start
_instr
_count
= self
._instr
_counter
.read()
241 def _find_next_test(self
, suite
):
243 Find the next test that has not been run.
248 # We identify test suites by tests that are iterable (as is done in
249 # the builtin python test harness). If we see one, recurse on it.
250 if hasattr(test
, "__iter__"):
251 test
= self
._find
_next
_test
(test
)
253 # The builtin python test harness sets test references to `None`
254 # after they have run, so we know we've found the next test up
255 # if it's not `None`.
259 def stopTest(self
, test
):
260 sys
.stdout
= self
._saved
_stdout
261 sys
.stderr
= self
._saved
_stderr
263 super(BuckTestResult
, self
).stopTest(test
)
265 # If a failure occurred during module/class setup, then this "test" may
266 # actually be a `_ErrorHolder`, which doesn't contain explicit info
267 # about the upcoming test. Since we really only care about the test
268 # name field (i.e. `_testMethodName`), we use that to detect an actual
269 # test case, and fall back to looking the test up from the suite
271 if not hasattr(test
, "_testMethodName"):
272 test
= self
._find
_next
_test
(self
._suite
)
275 "testCaseName": "{0}.{1}".format(
276 test
.__class
__.__module
__, test
.__class
__.__name
__
278 "testCase": test
._testMethodName
,
279 "type": self
._current
_status
,
280 "time": int((time
.time() - self
._test
_start
_time
) * 1000),
281 "message": os
.linesep
.join(self
._messages
),
282 "stacktrace": self
._stacktrace
,
283 "stdOut": self
._stdout
,
284 "stdErr": self
._stderr
,
287 # TestPilot supports an instruction count field.
288 if "TEST_PILOT" in os
.environ
:
289 result
["instrCount"] = (
290 int(self
._instr
_counter
.read() - self
._start
_instr
_count
),
293 self
._results
.append(result
)
294 self
._current
_test
= None
296 def stopTestRun(self
):
297 cov
= self
._main
_program
.get_coverage()
299 self
._results
.append({"coverage": cov
})
301 @contextlib.contextmanager
302 def _withTest(self
, test
):
def _setStatus(self, test, status, message=None, stacktrace=None):
    """
    Record the outcome of the test that is currently running.

    :param test: the test being reported; must equal ``self._current_test``.
    :param status: status value stored in ``self._current_status``.
    :param message: optional text appended to ``self._messages``; one
        trailing ``os.linesep`` is removed first.
    :param stacktrace: optional traceback text stored in
        ``self._stacktrace`` (overwrites any previous value, even with None).
    """
    assert test == self._current_test
    self._current_status = status
    self._stacktrace = stacktrace
    if message is not None:
        if message.endswith(os.linesep):
            # Strip the whole separator: os.linesep is two characters
            # ("\r\n") on some platforms, so slicing off a single
            # character would leave a stray "\r" behind.
            message = message[: -len(os.linesep)]
        self._messages.append(message)
def setStatus(self, test, status, message=None, stacktrace=None):
    """
    Record ``status`` for ``test``, whether or not a test is running.

    Arguments are forwarded unchanged to ``_setStatus``.
    """
    if self._current_test is not None:
        # Normal case: we are between startTest() and stopTest().
        self._setStatus(test, status, message, stacktrace)
        return

    # addError() may be called outside of a test when it is reported for
    # one of the shared fixtures
    # (setUpClass/tearDownClass/setUpModule/tearDownModule).
    # In this case, create a fake test result to record the error.
    with self._withTest(test):
        self._setStatus(test, status, message, stacktrace)
def setException(self, test, status, excinfo):
    """
    Record ``status`` for ``test`` using details taken from an exception.

    :param excinfo: an ``(exctype, value, tb)`` triple. The message passed
        on is ``"<exception class name>: <value>"`` and the stacktrace is
        the concatenated output of ``traceback.format_tb(tb)``.
    """
    exc_class, exc_value, exc_tb = excinfo
    summary = "{0}: {1}".format(exc_class.__name__, exc_value)
    trace_text = "".join(traceback.format_tb(exc_tb))
    self.setStatus(test, status, summary, trace_text)
def addSuccess(self, test):
    """Notify the base result class of the success, then record ``TestStatus.PASSED``."""
    base = super(BuckTestResult, self)
    base.addSuccess(test)
    self.setStatus(test, TestStatus.PASSED)
def addError(self, test, err):
    """
    Notify the base result class of the error, then record it.

    ``err`` is forwarded to ``setException`` with ``TestStatus.ABORTED``.
    """
    base = super(BuckTestResult, self)
    base.addError(test, err)
    self.setException(test, TestStatus.ABORTED, err)
def addFailure(self, test, err):
    """
    Notify the base result class of the failure, then record it.

    ``err`` is forwarded to ``setException`` with ``TestStatus.FAILED``.
    """
    base = super(BuckTestResult, self)
    base.addFailure(test, err)
    self.setException(test, TestStatus.FAILED, err)
def addSkip(self, test, reason):
    """
    Notify the base result class of the skip, then record
    ``TestStatus.SKIPPED`` with a "Skipped: <reason>" message.
    """
    base = super(BuckTestResult, self)
    base.addSkip(test, reason)
    skip_message = "Skipped: %s" % (reason,)
    self.setStatus(test, TestStatus.SKIPPED, skip_message)
def addExpectedFailure(self, test, err):
    """
    Notify the base result class of the expected failure, then record it.

    ``err`` is forwarded to ``setException`` with
    ``TestStatus.EXPECTED_FAILURE``.
    """
    base = super(BuckTestResult, self)
    base.addExpectedFailure(test, err)
    self.setException(test, TestStatus.EXPECTED_FAILURE, err)
def addUnexpectedSuccess(self, test):
    """
    Notify the base result class of the unexpected success, then record
    ``TestStatus.UNEXPECTED_SUCCESS`` with an explanatory message.
    """
    base = super(BuckTestResult, self)
    base.addUnexpectedSuccess(test)
    self.setStatus(test, TestStatus.UNEXPECTED_SUCCESS, "Unexpected success")
361 def addStdout(self
, val
):
363 if self
._show
_output
:
364 self
._saved
_stdout
.write(val
)
365 self
._saved
_stdout
.flush()
def addStdoutBytes(self, val):
    """
    Decode ``val`` (bytes) as UTF-8 and pass the text to ``addStdout``.

    Undecodable bytes are kept as backslash escapes instead of raising.
    """
    self.addStdout(val.decode("utf-8", errors="backslashreplace"))
371 def addStderr(self
, val
):
373 if self
._show
_output
:
374 self
._saved
_stderr
.write(val
)
375 self
._saved
_stderr
.flush()
def addStderrBytes(self, val):
    """
    Decode ``val`` (bytes) as UTF-8 and pass the text to ``addStderr``.

    Undecodable bytes are kept as backslash escapes instead of raising.
    """
    self.addStderr(val.decode("utf-8", errors="backslashreplace"))
382 class BuckTestRunner(unittest
.TextTestRunner
):
383 def __init__(self
, main_program
, suite
, show_output
=True, **kwargs
):
384 super(BuckTestRunner
, self
).__init
__(**kwargs
)
385 self
.show_output
= show_output
386 self
._main
_program
= main_program
389 def _makeResult(self
):
390 return BuckTestResult(
def _format_test_name(test_class, attrname):
    """Return ``"<module>.<class name>.<attrname>"`` for a test class and method name."""
    module_name = test_class.__module__
    class_name = test_class.__name__
    return "{0}.{1}.{2}".format(module_name, class_name, attrname)
404 class StderrLogHandler(logging
.StreamHandler
):
406 This class is very similar to logging.StreamHandler, except that it
407 always uses the current sys.stderr object.
409 StreamHandler caches the current sys.stderr object when it is constructed.
410 This makes it behave poorly in unit tests, which may replace sys.stderr
411 with a StringIO buffer during tests. The StreamHandler will continue using
412 the old sys.stderr object instead of the desired StringIO buffer.
416 logging
.Handler
.__init
__(self
)
423 class RegexTestLoader(unittest
.TestLoader
):
424 def __init__(self
, regex
=None):
426 super(RegexTestLoader
, self
).__init
__()
428 def getTestCaseNames(self
, testCaseClass
):
430 Return a sorted sequence of method names found within testCaseClass
433 testFnNames
= super(RegexTestLoader
, self
).getTestCaseNames(testCaseClass
)
434 if self
.regex
is None:
436 robj
= re
.compile(self
.regex
)
438 for attrname
in testFnNames
:
439 fullname
= _format_test_name(testCaseClass
, attrname
)
440 if robj
.search(fullname
):
441 matched
.append(attrname
)
445 class Loader(object):
447 suiteClass
= unittest
.TestSuite
449 def __init__(self
, modules
, regex
=None):
450 self
.modules
= modules
454 loader
= RegexTestLoader(self
.regex
)
455 test_suite
= self
.suiteClass()
456 for module_name
in self
.modules
:
457 __import__(module_name
, level
=0)
458 module
= sys
.modules
[module_name
]
459 module_suite
= loader
.loadTestsFromModule(module
)
460 test_suite
.addTest(module_suite
)
463 def load_args(self
, args
):
464 loader
= RegexTestLoader(self
.regex
)
468 suite
= loader
.loadTestsFromName(arg
)
469 # loadTestsFromName() can only process names that refer to
470 # individual test functions or modules. It can't process package
471 # names. If there were no module/function matches, check to see if
472 # this looks like a package name.
473 if suite
.countTestCases() != 0:
477 # Load all modules whose name is <arg>.<something>
479 for module
in self
.modules
:
480 if module
.startswith(prefix
):
481 suite
= loader
.loadTestsFromName(module
)
484 return loader
.suiteClass(suites
)
492 pragma:.*no${PLATFORM}
493 pragma:.*no${PY_IMPL}${PY_MAJOR}${PY_MINOR}
494 pragma:.*no${PY_IMPL}${PY_MAJOR}
495 pragma:.*nopy${PY_MAJOR}
496 pragma:.*nopy${PY_MAJOR}${PY_MINOR}
500 class MainProgram(object):
502 This class implements the main program. It can be subclassed by
503 users who wish to customize some parts of the main program.
504 (Adding additional command line options, customizing test loading, etc.)
507 DEFAULT_VERBOSITY
= 2
509 def __init__(self
, argv
):
510 self
.init_option_parser()
511 self
.parse_options(argv
)
514 def init_option_parser(self
):
515 usage
= "%prog [options] [TEST] ..."
516 op
= optparse
.OptionParser(usage
=usage
, add_help_option
=False)
517 self
.option_parser
= op
522 action
="store_false",
524 help="Suppress data that tests print to stdout/stderr, and only "
525 "show it if the test fails.",
530 help="Write results to a file in a JSON format to be read by Buck",
537 help="Stop after the first failure",
545 help="List tests and exit",
551 help="Regex to apply to tests, to only run those tests",
554 "--collect-coverage",
557 help="Collect test coverage information",
560 "--coverage-include",
562 help='File globs to include in converage (split by ",")',
567 help='File globs to omit from converage (split by ",")',
572 metavar
="<category>=<level>",
574 help="Configure log levels for specific logger categories",
581 help="Decrease the verbosity (may be specified multiple times)",
587 default
=self
.DEFAULT_VERBOSITY
,
588 help="Increase the verbosity (may be specified multiple times)",
591 "-?", "--help", action
="help", help="Show this help message and exit"
def parse_options(self, argv):
    """
    Parse ``argv`` (skipping ``argv[0]``) into ``self.options`` and
    ``self.test_args``, then normalize the parsed values:

    - ``verbosity`` is reduced by ``quiet``;
    - ``coverage_include`` is split on "," into a list;
    - ``coverage_omit`` becomes ``[]`` when empty, otherwise it is split
      on "," into a list.

    Reports a parser error when coverage collection was requested but the
    ``coverage`` module is not available.
    """
    self.options, self.test_args = self.option_parser.parse_args(argv[1:])
    opts = self.options
    opts.verbosity -= opts.quiet

    if opts.collect_coverage and coverage is None:
        self.option_parser.error("coverage module is not available")

    opts.coverage_include = opts.coverage_include.split(",")
    raw_omit = opts.coverage_omit
    # "".split(",") would yield [""], so an empty value maps to no globs.
    opts.coverage_omit = [] if raw_omit == "" else raw_omit.split(",")
606 def setup_logging(self
):
607 # Configure the root logger to log at INFO level.
608 # This is similar to logging.basicConfig(), but uses our
609 # StderrLogHandler instead of a StreamHandler.
610 fmt
= logging
.Formatter("%(pathname)s:%(lineno)s: %(message)s")
611 log_handler
= StderrLogHandler()
612 log_handler
.setFormatter(fmt
)
613 root_logger
= logging
.getLogger()
614 root_logger
.addHandler(log_handler
)
615 root_logger
.setLevel(logging
.INFO
)
618 "debug": logging
.DEBUG
,
619 "info": logging
.INFO
,
620 "warn": logging
.WARNING
,
621 "warning": logging
.WARNING
,
622 "error": logging
.ERROR
,
623 "critical": logging
.CRITICAL
,
624 "fatal": logging
.FATAL
,
627 for value
in self
.options
.logger
:
628 parts
= value
.rsplit("=", 1)
630 self
.option_parser
.error(
631 "--logger argument must be of the "
632 "form <name>=<level>: %s" % value
635 level_name
= parts
[1].lower()
636 level
= level_names
.get(level_name
)
638 self
.option_parser
.error(
639 "invalid log level %r for log " "category %s" % (parts
[1], name
)
641 logging
.getLogger(name
).setLevel(level
)
def create_loader(self):
    """
    Build the ``Loader`` used to discover tests.

    The loader receives the module list ``__test_modules__.TEST_MODULES``
    and the ``regex`` option parsed from the command line.
    """
    # Imported here, at call time, rather than at module import time.
    # NOTE(review): __test_modules__ is presumably generated by the build
    # for each test target -- confirm against the build rules.
    import __test_modules__

    return Loader(__test_modules__.TEST_MODULES, self.options.regex)
648 def load_tests(self
):
649 loader
= self
.create_loader()
650 if self
.options
.collect_coverage
:
651 self
.start_coverage()
652 include
= self
.options
.coverage_include
653 omit
= self
.options
.coverage_omit
654 if include
and "*" not in include
:
655 optimize_for_coverage(self
.cov
, include
, omit
)
658 suite
= loader
.load_args(self
.test_args
)
660 suite
= loader
.load_all()
661 if self
.options
.collect_coverage
:
665 def get_tests(self
, test_suite
):
668 for test
in test_suite
:
669 if isinstance(test
, unittest
.TestSuite
):
670 tests
.extend(self
.get_tests(test
))
677 test_suite
= self
.load_tests()
679 if self
.options
.list:
680 for test
in self
.get_tests(test_suite
):
681 method_name
= getattr(test
, "_testMethodName", "")
682 name
= _format_test_name(test
.__class
__, method_name
)
684 return EXIT_CODE_SUCCESS
686 result
= self
.run_tests(test_suite
)
687 if self
.options
.output
is not None:
688 with
open(self
.options
.output
, "w") as f
:
689 json
.dump(result
.getResults(), f
, indent
=4, sort_keys
=True)
690 if not result
.wasSuccessful():
691 return EXIT_CODE_TEST_FAILURE
692 return EXIT_CODE_SUCCESS
694 def run_tests(self
, test_suite
):
695 # Install a signal handler to catch Ctrl-C and display the results
696 # (but only if running >2.6).
697 if sys
.version_info
[0] > 2 or sys
.version_info
[1] > 6:
698 unittest
.installHandler()
701 runner
= BuckTestRunner(
704 verbosity
=self
.options
.verbosity
,
705 show_output
=self
.options
.show_output
,
707 result
= runner
.run(test_suite
)
709 if self
.options
.collect_coverage
and self
.options
.show_output
:
712 self
.cov
.report(file=sys
.stdout
)
713 except coverage
.misc
.CoverageException
:
714 print("No lines were covered, potentially restricted by file filters")
718 def get_abbr_impl(self
):
719 """Return abbreviated implementation name."""
720 impl
= platform
.python_implementation()
723 elif impl
== "Jython":
725 elif impl
== "IronPython":
727 elif impl
== "CPython":
730 raise RuntimeError("unknown python runtime")
732 def start_coverage(self
):
733 if not self
.options
.collect_coverage
:
736 with tempfile
.NamedTemporaryFile("w", delete
=False) as coverage_ini
:
737 coverage_ini
.write(_COVERAGE_INI
)
738 self
._coverage
_ini
_path
= coverage_ini
.name
740 # Keep the original working dir in case tests use os.chdir
741 self
._original
_working
_dir
= os
.getcwd()
743 # for coverage config ignores by platform/python version
744 os
.environ
["PLATFORM"] = sys
.platform
745 os
.environ
["PY_IMPL"] = self
.get_abbr_impl()
746 os
.environ
["PY_MAJOR"] = str(sys
.version_info
.major
)
747 os
.environ
["PY_MINOR"] = str(sys
.version_info
.minor
)
749 self
.cov
= coverage
.Coverage(
750 include
=self
.options
.coverage_include
,
751 omit
=self
.options
.coverage_omit
,
752 config_file
=coverage_ini
.name
,
757 def get_coverage(self
):
758 if not self
.options
.collect_coverage
:
762 os
.remove(self
._coverage
_ini
_path
)
764 pass # Better to litter than to fail the test
766 # Switch back to the original working directory.
767 os
.chdir(self
._original
_working
_dir
)
775 self
.cov
.report(file=f
)
776 lines
= f
.getvalue().split("\n")
777 except coverage
.misc
.CoverageException
:
778 # Nothing was covered. That's fine by us
781 # N.B.: the format of the coverage library's output differs
782 # depending on whether one or more files are in the results
783 for line
in lines
[2:]:
784 if line
.strip("-") == "":
787 analysis
= self
.cov
.analysis2(r
)
788 covString
= self
.convert_to_diff_cov_str(analysis
)
790 result
[r
] = covString
794 def convert_to_diff_cov_str(self
, analysis
):
795 # Info on the format of analysis:
796 # http://nedbatchelder.com/code/coverage/api.html
800 analysis
[1][-1] if len(analysis
[1]) else 0,
801 analysis
[2][-1] if len(analysis
[2]) else 0,
802 analysis
[3][-1] if len(analysis
[3]) else 0,
804 lines
= ["N"] * numLines
805 for l
in analysis
[1]:
807 for l
in analysis
[2]:
809 for l
in analysis
[3]:
811 return "".join(lines
)
815 return MainProgram(sys
.argv
).run()
818 if __name__
== "__main__":
819 sys
.exit(main(sys
.argv
))