Re-sync with internal repository
[hiphop-php.git] / third-party / watchman / src / build / fbcode_builder / CMake / fb_py_test_main.py
blobe9ae5dd028a6b4d822af1c33b3b05f13fbe9746f
1 #!/usr/bin/env python
3 # Copyright (c) Facebook, Inc. and its affiliates.
5 """
6 This file contains the main module code for Python test programs.
7 """
10 import contextlib
11 import ctypes
12 import fnmatch
13 import json
14 import logging
15 import optparse
16 import os
17 import platform
18 import re
19 import sys
20 import tempfile
21 import time
22 import traceback
23 import unittest
24 import warnings
26 # Hide warning about importing "imp"; remove once python2 is gone.
27 with warnings.catch_warnings():
28 warnings.filterwarnings("ignore", category=DeprecationWarning)
29 import imp
31 try:
32 from StringIO import StringIO
33 except ImportError:
34 from io import StringIO
35 try:
36 import coverage
37 except ImportError:
38 coverage = None # type: ignore
39 try:
40 from importlib.machinery import SourceFileLoader
41 except ImportError:
42 SourceFileLoader = None # type: ignore
class get_cpu_instr_counter(object):
    """Placeholder CPU instruction counter; `read()` always reports zero."""

    def read(self):
        """Return the current instruction count (not implemented yet)."""
        # TODO
        count = 0
        return count
# Process exit codes returned by MainProgram.run().
# Returned when tests are only listed, or when the run was successful.
EXIT_CODE_SUCCESS = 0
# Returned when the test result reports that the run was not successful.
EXIT_CODE_TEST_FAILURE = 70
class TestStatus(object):
    """Strings stored in the "type" field of each per-test result record."""

    # Outcomes reported as a success.
    PASSED = "SUCCESS"
    EXPECTED_FAILURE = "SUCCESS"

    # Outcomes reported as a failure. ABORTED is also the status a test
    # starts with until one of the add*() callbacks overrides it.
    ABORTED = "FAILURE"
    FAILED = "FAILURE"
    UNEXPECTED_SUCCESS = "FAILURE"

    # Skipped tests get their own category.
    SKIPPED = "ASSUMPTION_VIOLATION"
class PathMatcher(object):
    """
    Decide whether a file path is in scope, using fnmatch-style glob
    patterns.

    `include_patterns` and `omit_patterns` are iterables of glob strings;
    paths are resolved with os.path.realpath() before matching.
    """

    def __init__(self, include_patterns, omit_patterns):
        self.include_patterns = include_patterns
        self.omit_patterns = omit_patterns

    def omit(self, path):
        """
        Omit iff matches any of the omit_patterns or the include patterns are
        not empty and none is matched

        Always returns a real bool (never the pattern container itself).
        """
        path = os.path.realpath(path)
        if any(fnmatch.fnmatch(path, p) for p in self.omit_patterns):
            return True
        if not self.include_patterns:
            # No include filter configured: everything not omitted is kept.
            return False
        return not any(fnmatch.fnmatch(path, p) for p in self.include_patterns)

    def include(self, path):
        """Return True when `path` should be kept (the inverse of omit())."""
        return not self.omit(path)
class DebugWipeFinder(object):
    """
    PEP 302 finder that uses a DebugWipeLoader for all files which do not need
    coverage
    """

    def __init__(self, matcher):
        # `matcher` must provide include(path) -> bool (see PathMatcher).
        self.matcher = matcher

    def find_module(self, fullname, path=None):
        """
        Return a loader that strips line-number info for `fullname`, or None.

        None is returned (leaving the import to the other finders) when the
        module cannot be located with `imp`, is not a plain Python source
        file, or is a file the matcher wants included in coverage.
        """
        _, _, basename = fullname.rpartition(".")
        try:
            fd, pypath, (_, _, kind) = imp.find_module(basename, path)
        except Exception:
            # Finding without hooks using the imp module failed. One reason
            # could be that there is a zip file on sys.path. The imp module
            # does not support loading from there. Leave finding this module to
            # the others finders in sys.meta_path.
            return None

        if hasattr(fd, "close"):
            fd.close()
        if kind != imp.PY_SOURCE:
            return None
        if self.matcher.include(pypath):
            return None

        # This is defined to match CPython's PyVarObject struct. CPython
        # declares ob_refcnt and ob_size as Py_ssize_t, so c_ssize_t is used
        # rather than c_long/c_ulong, whose width is not tied to pointer
        # size on every platform (e.g. 64-bit Windows).
        class PyVarObject(ctypes.Structure):
            _fields_ = [
                ("ob_refcnt", ctypes.c_ssize_t),
                ("ob_type", ctypes.c_void_p),
                ("ob_size", ctypes.c_ssize_t),
            ]

        class DebugWipeLoader(SourceFileLoader):
            """
            PEP302 loader that zeros out debug information before execution
            """

            def get_code(self, fullname):
                code = super(DebugWipeLoader, self).get_code(fullname)
                if code:
                    # Ideally we'd do
                    #   code.co_lnotab = b''
                    # But code objects are READONLY.  Not to worry though; we'll
                    # directly modify CPython's object
                    # NOTE(review): this relies on `code.co_lnotab` returning
                    # the bytes object stored on the code object -- confirm on
                    # the interpreter versions in use.
                    code_impl = PyVarObject.from_address(id(code.co_lnotab))
                    code_impl.ob_size = 0
                return code

        return DebugWipeLoader(fullname, pypath)
def optimize_for_coverage(cov, include_patterns, omit_patterns):
    """
    We get better performance if we zero out debug information for files which
    we're not interested in.  Only available in CPython 3.3+

    Installs a DebugWipeFinder at the front of sys.meta_path when
    SourceFileLoader is available and the interpreter is CPython; otherwise
    does nothing. `cov` is accepted but not used here.
    """
    path_filter = PathMatcher(include_patterns, omit_patterns)
    if not SourceFileLoader:
        return
    if platform.python_implementation() != "CPython":
        return
    sys.meta_path.insert(0, DebugWipeFinder(path_filter))
class TeeStream(object):
    """Write-only stream that fans every write/flush out to several streams."""

    def __init__(self, *streams):
        self._streams = streams

    def write(self, data):
        """Write `data` to each underlying stream, in the order given."""
        for target in self._streams:
            target.write(data)

    def flush(self):
        """Flush each underlying stream, in the order given."""
        for target in self._streams:
            target.flush()

    def isatty(self):
        """A tee is never reported as a terminal."""
        return False
class CallbackStream(object):
    """
    File-like object that forwards everything written to it to a callback.

    `callback` receives each value passed to write(). When `bytes_callback`
    is given, a nested CallbackStream wrapping it is exposed as `.buffer`.
    `orig`, when truthy, supplies the fileno, `errors` and `encoding` values
    that are mirrored on this object.
    """

    def __init__(self, callback, bytes_callback=None, orig=None):
        self._callback = callback
        if orig:
            self._fileno = orig.fileno()
            # Python 3 APIs:
            # - `encoding` is a string holding the encoding name
            # - `errors` is a string holding the error-handling mode for encoding
            # - `buffer` should look like an io.BufferedIOBase object
            self.errors = orig.errors
        else:
            self._fileno = None
            self.errors = None

        if bytes_callback:
            # those members are only on the io.TextIOWrapper
            if orig:
                self.encoding = orig.encoding
            else:
                self.encoding = "UTF-8"
            self.buffer = CallbackStream(bytes_callback, orig=orig)

    def write(self, data):
        """Hand `data` to the callback."""
        self._callback(data)

    def flush(self):
        """Nothing is buffered here, so flushing is a no-op."""
        pass

    def isatty(self):
        return False

    def fileno(self):
        """Return the fileno captured from `orig`, or None if there was none."""
        return self._fileno
# `unittest.TextTestResult` is the public name; the `_TextTestResult` alias
# was deprecated and is no longer available on newer Python versions.
class BuckTestResult(unittest.TextTestResult):
    """
    Our own TestResult class that outputs data in a format that can be easily
    parsed by buck's test runner.

    One dict per test is accumulated in `_results` (see stopTest() for the
    keys); stopTestRun() appends a final {"coverage": ...} record when the
    main program reports coverage data.
    """

    _instr_counter = get_cpu_instr_counter()

    def __init__(
        self, stream, descriptions, verbosity, show_output, main_program, suite
    ):
        super(BuckTestResult, self).__init__(stream, descriptions, verbosity)
        self._main_program = main_program
        self._suite = suite
        self._results = []
        self._current_test = None
        # Streams that are restored after every test and that receive the
        # live copy of the test output when `show_output` is set.
        self._saved_stdout = sys.stdout
        self._saved_stderr = sys.stderr
        self._show_output = show_output

    def getResults(self):
        """Return the list of result records collected so far."""
        return self._results

    def startTest(self, test):
        """Redirect stdout/stderr into this result and reset per-test state."""
        super(BuckTestResult, self).startTest(test)

        # Pass in the real stdout and stderr filenos.  We can't really do much
        # here to intercept callers who directly operate on these fileno
        # objects.
        sys.stdout = CallbackStream(
            self.addStdout, self.addStdoutBytes, orig=sys.stdout
        )
        sys.stderr = CallbackStream(
            self.addStderr, self.addStderrBytes, orig=sys.stderr
        )
        self._current_test = test
        self._test_start_time = time.time()
        # Stays ABORTED unless one of the add*() callbacks overrides it.
        self._current_status = TestStatus.ABORTED
        self._messages = []
        self._stacktrace = None
        self._stdout = ""
        self._stderr = ""
        self._start_instr_count = self._instr_counter.read()

    def _find_next_test(self, suite):
        """
        Find the next test that has not been run.

        Returns None when every entry in `suite` has already been cleared.
        """
        for test in suite:
            # We identify test suites by test that are iterable (as is done in
            # the builtin python test harness).  If we see one, recurse on it.
            if hasattr(test, "__iter__"):
                test = self._find_next_test(test)

            # The builtin python test harness sets test references to `None`
            # after they have run, so we know we've found the next test up
            # if it's not `None`.
            if test is not None:
                return test
        return None

    def stopTest(self, test):
        """Restore the real streams and record the result of `test`."""
        sys.stdout = self._saved_stdout
        sys.stderr = self._saved_stderr

        super(BuckTestResult, self).stopTest(test)

        # If a failure occured during module/class setup, then this "test" may
        # actually be a `_ErrorHolder`, which doesn't contain explicit info
        # about the upcoming test.  Since we really only care about the test
        # name field (i.e. `_testMethodName`), we use that to detect an actual
        # test cases, and fall back to looking the test up from the suite
        # otherwise.
        named = test
        if not hasattr(named, "_testMethodName"):
            upcoming = self._find_next_test(self._suite)
            # When no pending test is left, keep describing the object we
            # were given instead of failing on None.
            if upcoming is not None:
                named = upcoming

        result = {
            "testCaseName": "{0}.{1}".format(
                named.__class__.__module__, named.__class__.__name__
            ),
            "testCase": getattr(named, "_testMethodName", str(named)),
            "type": self._current_status,
            "time": int((time.time() - self._test_start_time) * 1000),
            "message": os.linesep.join(self._messages),
            "stacktrace": self._stacktrace,
            "stdOut": self._stdout,
            "stdErr": self._stderr,
        }

        # TestPilot supports an instruction count field.
        if "TEST_PILOT" in os.environ:
            # Stored as a plain int (a stray trailing comma used to turn
            # this value into a one-element tuple).
            result["instrCount"] = int(
                self._instr_counter.read() - self._start_instr_count
            )

        self._results.append(result)
        self._current_test = None

    def stopTestRun(self):
        """Append the coverage record, if the main program produced one."""
        cov = self._main_program.get_coverage()
        if cov is not None:
            self._results.append({"coverage": cov})

    @contextlib.contextmanager
    def _withTest(self, test):
        """Bracket a block with startTest()/stopTest() for `test`."""
        self.startTest(test)
        try:
            yield
        finally:
            # Always undo the stdout/stderr redirection done by startTest().
            self.stopTest(test)

    def _setStatus(self, test, status, message=None, stacktrace=None):
        assert test == self._current_test
        self._current_status = status
        self._stacktrace = stacktrace
        if message is not None:
            if message.endswith(os.linesep):
                # Drop the whole separator, which may be longer than one
                # character.
                message = message[: -len(os.linesep)]
            self._messages.append(message)

    def setStatus(self, test, status, message=None, stacktrace=None):
        """Record `status` for `test`, faking a test run if none is active."""
        # addError() may be called outside of a test if one of the shared
        # fixtures (setUpClass/tearDownClass/setUpModule/tearDownModule)
        # throws an error.
        #
        # In this case, create a fake test result to record the error.
        if self._current_test is None:
            with self._withTest(test):
                self._setStatus(test, status, message, stacktrace)
        else:
            self._setStatus(test, status, message, stacktrace)

    def setException(self, test, status, excinfo):
        """Record `status` with a message/stacktrace built from `excinfo`."""
        exctype, value, tb = excinfo
        self.setStatus(
            test,
            status,
            "{0}: {1}".format(exctype.__name__, value),
            "".join(traceback.format_tb(tb)),
        )

    def addSuccess(self, test):
        super(BuckTestResult, self).addSuccess(test)
        self.setStatus(test, TestStatus.PASSED)

    def addError(self, test, err):
        super(BuckTestResult, self).addError(test, err)
        self.setException(test, TestStatus.ABORTED, err)

    def addFailure(self, test, err):
        super(BuckTestResult, self).addFailure(test, err)
        self.setException(test, TestStatus.FAILED, err)

    def addSkip(self, test, reason):
        super(BuckTestResult, self).addSkip(test, reason)
        self.setStatus(test, TestStatus.SKIPPED, "Skipped: %s" % (reason,))

    def addExpectedFailure(self, test, err):
        super(BuckTestResult, self).addExpectedFailure(test, err)
        self.setException(test, TestStatus.EXPECTED_FAILURE, err)

    def addUnexpectedSuccess(self, test):
        super(BuckTestResult, self).addUnexpectedSuccess(test)
        self.setStatus(test, TestStatus.UNEXPECTED_SUCCESS, "Unexpected success")

    def addStdout(self, val):
        """Capture text written to stdout, echoing it when output is shown."""
        self._stdout += val
        if self._show_output:
            self._saved_stdout.write(val)
            self._saved_stdout.flush()

    def addStdoutBytes(self, val):
        """Capture bytes written to stdout's `.buffer`, decoded as UTF-8."""
        string = val.decode("utf-8", errors="backslashreplace")
        self.addStdout(string)

    def addStderr(self, val):
        """Capture text written to stderr, echoing it when output is shown."""
        self._stderr += val
        if self._show_output:
            self._saved_stderr.write(val)
            self._saved_stderr.flush()

    def addStderrBytes(self, val):
        """Capture bytes written to stderr's `.buffer`, decoded as UTF-8."""
        string = val.decode("utf-8", errors="backslashreplace")
        self.addStderr(string)
class BuckTestRunner(unittest.TextTestRunner):
    """TextTestRunner that collects results in a BuckTestResult."""

    def __init__(self, main_program, suite, show_output=True, **kwargs):
        # Remaining keyword arguments go straight to TextTestRunner.
        super(BuckTestRunner, self).__init__(**kwargs)
        self._suite = suite
        self._main_program = main_program
        self.show_output = show_output

    def _makeResult(self):
        """Build the result object used for this run."""
        result = BuckTestResult(
            self.stream,
            self.descriptions,
            self.verbosity,
            self.show_output,
            self._main_program,
            self._suite,
        )
        return result
def _format_test_name(test_class, attrname):
    """Return "<module>.<class name>.<attrname>" for a test method."""
    module_name = test_class.__module__
    class_name = test_class.__name__
    return "{0}.{1}.{2}".format(module_name, class_name, attrname)
class StderrLogHandler(logging.StreamHandler):
    """
    This class is very similar to logging.StreamHandler, except that it
    always uses the current sys.stderr object.

    StreamHandler caches the current sys.stderr object when it is constructed.
    This makes it behave poorly in unit tests, which may replace sys.stderr
    with a StringIO buffer during tests.  The StreamHandler will continue using
    the old sys.stderr object instead of the desired StringIO buffer.
    """

    def __init__(self):
        # Deliberately initialise only the base Handler: StreamHandler's own
        # __init__ is skipped, and `stream` is provided by the property below.
        logging.Handler.__init__(self)

    @property
    def stream(self):
        """Look up sys.stderr at access time instead of caching it."""
        current_stderr = sys.stderr
        return current_stderr
class RegexTestLoader(unittest.TestLoader):
    """TestLoader that can restrict test methods to those matching a regex."""

    def __init__(self, regex=None):
        self.regex = regex
        super(RegexTestLoader, self).__init__()

    def getTestCaseNames(self, testCaseClass):
        """
        Return a sorted sequence of method names found within testCaseClass

        When a regex was given, only names whose formatted full test name
        (see _format_test_name) is matched by `re.search` are kept.
        """
        candidates = super(RegexTestLoader, self).getTestCaseNames(testCaseClass)
        if self.regex is None:
            return candidates
        pattern = re.compile(self.regex)
        return [
            name
            for name in candidates
            if pattern.search(_format_test_name(testCaseClass, name))
        ]
class Loader(object):
    """Builds test suites from a list of test module names."""

    suiteClass = unittest.TestSuite

    def __init__(self, modules, regex=None):
        self.modules = modules
        self.regex = regex

    def load_all(self):
        """Import every configured module and return one combined suite."""
        name_filter = RegexTestLoader(self.regex)
        combined = self.suiteClass()
        for module_name in self.modules:
            __import__(module_name, level=0)
            imported = sys.modules[module_name]
            combined.addTest(name_filter.loadTestsFromModule(imported))
        return combined

    def load_args(self, args):
        """Return a suite holding the tests named by the given arguments."""
        name_filter = RegexTestLoader(self.regex)

        collected = []
        for arg in args:
            direct = name_filter.loadTestsFromName(arg)
            # loadTestsFromName() can only process names that refer to
            # individual test functions or modules.  It can't process package
            # names.  If there were no module/function matches, check to see if
            # this looks like a package name.
            if direct.countTestCases() != 0:
                collected.append(direct)
                continue

            # Load all modules whose name is <arg>.<something>
            package_prefix = arg + "."
            collected.extend(
                name_filter.loadTestsFromName(module_name)
                for module_name in self.modules
                if module_name.startswith(package_prefix)
            )

        return name_filter.suiteClass(collected)
# Coverage configuration written to a temporary file by
# MainProgram.start_coverage(). The ${...} placeholders are named after the
# environment variables that start_coverage() sets before creating the
# coverage object.
_COVERAGE_INI = """\
[report]
exclude_lines =
    pragma: no cover
    pragma: nocover
    pragma:.*no${PLATFORM}
    pragma:.*no${PY_IMPL}${PY_MAJOR}${PY_MINOR}
    pragma:.*no${PY_IMPL}${PY_MAJOR}
    pragma:.*nopy${PY_MAJOR}
    pragma:.*nopy${PY_MAJOR}${PY_MINOR}
"""
class MainProgram(object):
    """
    This class implements the main program.  It can be subclassed by
    users who wish to customize some parts of the main program.
    (Adding additional command line options, customizing test loading, etc.)
    """

    DEFAULT_VERBOSITY = 2

    def __init__(self, argv):
        self.init_option_parser()
        self.parse_options(argv)
        self.setup_logging()

    def init_option_parser(self):
        """Create `self.option_parser` and register the command line options."""
        parser = optparse.OptionParser(
            usage="%prog [options] [TEST] ...", add_help_option=False
        )
        self.option_parser = parser

        parser.add_option(
            "--hide-output",
            dest="show_output",
            action="store_false",
            default=True,
            help="Suppress data that tests print to stdout/stderr, and only "
            "show it if the test fails.",
        )
        parser.add_option(
            "-o",
            "--output",
            help="Write results to a file in a JSON format to be read by Buck",
        )
        parser.add_option(
            "-f",
            "--failfast",
            action="store_true",
            default=False,
            help="Stop after the first failure",
        )
        parser.add_option(
            "-l",
            "--list-tests",
            action="store_true",
            dest="list",
            default=False,
            help="List tests and exit",
        )
        parser.add_option(
            "-r",
            "--regex",
            default=None,
            help="Regex to apply to tests, to only run those tests",
        )
        parser.add_option(
            "--collect-coverage",
            action="store_true",
            default=False,
            help="Collect test coverage information",
        )
        parser.add_option(
            "--coverage-include",
            default="*",
            help='File globs to include in converage (split by ",")',
        )
        parser.add_option(
            "--coverage-omit",
            default="",
            help='File globs to omit from converage (split by ",")',
        )
        parser.add_option(
            "--logger",
            action="append",
            metavar="<category>=<level>",
            default=[],
            help="Configure log levels for specific logger categories",
        )
        parser.add_option(
            "-q",
            "--quiet",
            action="count",
            default=0,
            help="Decrease the verbosity (may be specified multiple times)",
        )
        parser.add_option(
            "-v",
            "--verbosity",
            action="count",
            default=self.DEFAULT_VERBOSITY,
            help="Increase the verbosity (may be specified multiple times)",
        )
        parser.add_option(
            "-?", "--help", action="help", help="Show this help message and exit"
        )

    def parse_options(self, argv):
        """Parse `argv[1:]` into `self.options` and `self.test_args`."""
        options, positional = self.option_parser.parse_args(argv[1:])
        self.options, self.test_args = options, positional
        # Each -q cancels out one level of verbosity.
        options.verbosity -= options.quiet

        if options.collect_coverage and coverage is None:
            self.option_parser.error("coverage module is not available")
        # Both glob options arrive as comma separated strings; store lists.
        options.coverage_include = options.coverage_include.split(",")
        omit_spec = options.coverage_omit
        options.coverage_omit = omit_spec.split(",") if omit_spec != "" else []

    def setup_logging(self):
        """Install the stderr log handler and apply --logger overrides."""
        # Configure the root logger to log at INFO level.
        # This is similar to logging.basicConfig(), but uses our
        # StderrLogHandler instead of a StreamHandler.
        handler = StderrLogHandler()
        handler.setFormatter(
            logging.Formatter("%(pathname)s:%(lineno)s: %(message)s")
        )
        root = logging.getLogger()
        root.addHandler(handler)
        root.setLevel(logging.INFO)

        known_levels = {
            "debug": logging.DEBUG,
            "info": logging.INFO,
            "warn": logging.WARNING,
            "warning": logging.WARNING,
            "error": logging.ERROR,
            "critical": logging.CRITICAL,
            "fatal": logging.FATAL,
        }

        for spec in self.options.logger:
            # Split on the last "=" so the category part may contain "=".
            parts = spec.rsplit("=", 1)
            if len(parts) != 2:
                self.option_parser.error(
                    "--logger argument must be of the "
                    "form <name>=<level>: %s" % spec
                )
            category = parts[0]
            level = known_levels.get(parts[1].lower())
            if level is None:
                self.option_parser.error(
                    "invalid log level %r for log " "category %s" % (parts[1], category)
                )
            logging.getLogger(category).setLevel(level)

    def create_loader(self):
        """Return the Loader for the modules listed in `__test_modules__`."""
        import __test_modules__

        return Loader(__test_modules__.TEST_MODULES, self.options.regex)

    def load_tests(self):
        """Load the requested tests, starting coverage first if enabled."""
        loader = self.create_loader()
        collecting = self.options.collect_coverage
        if collecting:
            self.start_coverage()
            include_globs = self.options.coverage_include
            omit_globs = self.options.coverage_omit
            if include_globs and "*" not in include_globs:
                optimize_for_coverage(self.cov, include_globs, omit_globs)

        if self.test_args:
            suite = loader.load_args(self.test_args)
        else:
            suite = loader.load_all()
        if collecting:
            self.cov.start()
        return suite

    def get_tests(self, test_suite):
        """Flatten nested TestSuite objects into a list of their tests."""
        flattened = []

        for entry in test_suite:
            if isinstance(entry, unittest.TestSuite):
                flattened.extend(self.get_tests(entry))
            else:
                flattened.append(entry)

        return flattened

    def run(self):
        """List or run the tests and return the process exit code."""
        test_suite = self.load_tests()

        if self.options.list:
            for test in self.get_tests(test_suite):
                method_name = getattr(test, "_testMethodName", "")
                print(_format_test_name(test.__class__, method_name))
            return EXIT_CODE_SUCCESS

        result = self.run_tests(test_suite)
        if self.options.output is not None:
            with open(self.options.output, "w") as f:
                json.dump(result.getResults(), f, indent=4, sort_keys=True)
        if result.wasSuccessful():
            return EXIT_CODE_SUCCESS
        return EXIT_CODE_TEST_FAILURE

    def run_tests(self, test_suite):
        """Run `test_suite` with a BuckTestRunner and return its result."""
        # Install a signal handler to catch Ctrl-C and display the results
        # (but only if running >2.6).
        major, minor = sys.version_info[0], sys.version_info[1]
        if major > 2 or minor > 6:
            unittest.installHandler()

        # Run the tests
        runner = BuckTestRunner(
            self,
            test_suite,
            verbosity=self.options.verbosity,
            show_output=self.options.show_output,
        )
        result = runner.run(test_suite)

        if self.options.collect_coverage and self.options.show_output:
            self.cov.stop()
            try:
                self.cov.report(file=sys.stdout)
            except coverage.misc.CoverageException:
                print("No lines were covered, potentially restricted by file filters")

        return result

    def get_abbr_impl(self):
        """Return abbreviated implementation name."""
        abbreviations = {
            "PyPy": "pp",
            "Jython": "jy",
            "IronPython": "ip",
            "CPython": "cp",
        }
        abbr = abbreviations.get(platform.python_implementation())
        if abbr is None:
            raise RuntimeError("unknown python runtime")
        return abbr

    def start_coverage(self):
        """Write the coverage config, then create and start `self.cov`."""
        if not self.options.collect_coverage:
            return

        with tempfile.NamedTemporaryFile("w", delete=False) as coverage_ini:
            coverage_ini.write(_COVERAGE_INI)
            self._coverage_ini_path = coverage_ini.name

        # Keep the original working dir in case tests use os.chdir
        self._original_working_dir = os.getcwd()

        # for coverage config ignores by platform/python version
        os.environ["PLATFORM"] = sys.platform
        os.environ["PY_IMPL"] = self.get_abbr_impl()
        os.environ["PY_MAJOR"] = str(sys.version_info.major)
        os.environ["PY_MINOR"] = str(sys.version_info.minor)

        self.cov = coverage.Coverage(
            include=self.options.coverage_include,
            omit=self.options.coverage_omit,
            config_file=coverage_ini.name,
        )
        self.cov.erase()
        self.cov.start()

    def get_coverage(self):
        """
        Stop coverage and return a {reported name: coverage string} dict.

        Returns None when coverage collection is disabled, and an empty dict
        when the coverage report raises CoverageException.
        """
        if not self.options.collect_coverage:
            return None

        try:
            os.remove(self._coverage_ini_path)
        except OSError:
            pass  # Better to litter than to fail the test

        # Switch back to the original working directory.
        os.chdir(self._original_working_dir)

        per_file = {}

        self.cov.stop()

        try:
            report_buffer = StringIO()
            self.cov.report(file=report_buffer)
            report_lines = report_buffer.getvalue().split("\n")
        except coverage.misc.CoverageException:
            # Nothing was covered. That's fine by us
            return per_file

        # N.B.: the format of the coverage library's output differs
        # depending on whether one or more files are in the results
        for row in report_lines[2:]:
            # A row made only of dashes (or an empty row) ends the listing.
            if row.strip("-") == "":
                break
            reported_name = row.split()[0]
            analysis = self.cov.analysis2(reported_name)
            cov_string = self.convert_to_diff_cov_str(analysis)
            if cov_string:
                per_file[reported_name] = cov_string

        return per_file

    def convert_to_diff_cov_str(self, analysis):
        """
        Turn a coverage analysis tuple into a one-character-per-line string.

        Lines listed in analysis[1] are marked "C", those in analysis[2] "X"
        and those in analysis[3] "U" (later markers win); every other line is
        "N". Returns None for a falsy `analysis`.
        """
        # Info on the format of analysis:
        # http://nedbatchelder.com/code/coverage/api.html
        if not analysis:
            return None
        marked_c, marked_x, marked_u = analysis[1], analysis[2], analysis[3]
        numLines = max(
            numbers[-1] if len(numbers) else 0
            for numbers in (marked_c, marked_x, marked_u)
        )
        marks = ["N"] * numLines
        for marker, numbers in (("C", marked_c), ("X", marked_x), ("U", marked_u)):
            for lineno in numbers:
                marks[lineno - 1] = marker
        return "".join(marks)
def main(argv):
    """
    Run the test main program and return its exit code.

    `argv` is the full argument vector, program name first; it is passed to
    MainProgram as given rather than being replaced by sys.argv.
    """
    return MainProgram(argv).run()


if __name__ == "__main__":
    sys.exit(main(sys.argv))