provision: add get_dns_{forest,domain}_microsoft_dns_descriptor()
[Samba/gebeck_regimport.git] / lib / subunit / python / subunit / __init__.py
blob42dcf297e4baeaa124761bbcdd8f820d35aace71
2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Subunit - a streaming test protocol
19 Overview
20 ++++++++
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
28 Key Classes
29 -----------
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
57 and newer).
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``,
62 ``stopTest`` pair), then the tags apply to all subsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occurred at, to allow reconstructing
68 test timing from a stream.
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
76 Python test support
77 -------------------
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
82 $ python -m subunit.run mylib.tests.test_suite
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
90 ``ExecTestCase`` is a convenience wrapper for running an external
91 program to get a Subunit stream and then report that back to an arbitrary
92 result object::
94 class AggregateTests(subunit.ExecTestCase):
96 def test_script_one(self):
97 './bin/script_one'
99 def test_script_two(self):
100 './bin/script_two'
102 # Normally your normal test loading would take care of this automatically,
103 # It is only spelt out in detail here for clarity.
104 suite = unittest.TestSuite([AggregateTests("test_script_one"),
105 AggregateTests("test_script_two")])
106 # Create any TestResult class you like.
107 result = unittest._TextTestResult(sys.stdout)
108 # And run your suite as normal, Subunit will exec each external script as
109 # needed and report to your result object.
110 suite.run(result)
112 Utility modules
113 ---------------
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
119 import os
120 import re
121 import subprocess
122 import sys
123 import unittest
124 if sys.version_info > (3, 0):
125 from io import UnsupportedOperation as _UnsupportedOperation
126 else:
127 _UnsupportedOperation = AttributeError
130 from testtools import content, content_type, ExtendedToOriginalDecorator
131 from testtools.content import TracebackContent
132 from testtools.compat import _b, _u, BytesIO, StringIO
133 try:
134 from testtools.testresult.real import _StringException
135 RemoteException = _StringException
136 # For testing: different pythons have different str() implementations.
137 if sys.version_info > (3, 0):
138 _remote_exception_str = "testtools.testresult.real._StringException"
139 _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
140 else:
141 _remote_exception_str = "_StringException"
142 _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
143 except ImportError:
144 raise ImportError ("testtools.testresult.real does not contain "
145 "_StringException, check your version.")
146 from testtools import testresult
148 from subunit import chunked, details, iso8601, test_results
150 # same format as sys.version_info: "A tuple containing the five components of
151 # the version number: major, minor, micro, releaselevel, and serial. All
152 # values except releaselevel are integers; the release level is 'alpha',
153 # 'beta', 'candidate', or 'final'. The version_info value corresponding to the
154 # Python version 2.0 is (2, 0, 0, 'final', 0)." Additionally we use a
155 # releaselevel of 'dev' for unreleased under-development code.
157 # If the releaselevel is 'alpha' then the major/minor/micro components are not
158 # established at this point, and setup.py will use a version of next-$(revno).
159 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
160 # Otherwise it is major.minor.micro~$(revno).
162 __version__ = (0, 0, 9, 'final', 0)
164 PROGRESS_SET = 0
165 PROGRESS_CUR = 1
166 PROGRESS_PUSH = 2
167 PROGRESS_POP = 3
def test_suite():
    """Return the test suite built by ``subunit.tests.test_suite()``."""
    # The import happens inside the function, so the tests package is only
    # loaded when this function is called.
    from subunit import tests as _subunit_tests
    return _subunit_tests.test_suite()
def join_dir(base_path, path):
    """Resolve C{path} against the directory that contains C{base_path}.

    @param base_path: A path to a file or directory.
    @param path: An absolute path, or a path relative to the containing
        directory of C{base_path}.

    @return: An absolute path to C{path}.
    """
    containing_dir = os.path.dirname(os.path.abspath(base_path))
    # os.path.join returns C{path} unchanged when it is already absolute.
    return os.path.join(containing_dir, path)
def tags_to_new_gone(tags):
    """Split a list of tags into a new_set and a gone_set.

    A tag starting with '-' is a removal: it is added to the gone set with
    the leading '-' stripped. Every other tag is added to the new set.
    Empty tags are ignored.

    :param tags: An iterable of tag strings.
    :return: A tuple (new_tags, gone_tags) of two sets.
    """
    new_tags = set()
    gone_tags = set()
    for tag in tags:
        if not tag:
            # An empty tag has no first character to inspect; indexing it
            # would raise IndexError, so skip it.
            continue
        if tag[0] == '-':
            gone_tags.add(tag[1:])
        else:
            new_tags.add(tag)
    return new_tags, gone_tags
class DiscardStream(object):
    """A file-like sink: writes are dropped and reads yield nothing."""

    def fileno(self):
        """Always raise: this object does not expose a file descriptor."""
        raise _UnsupportedOperation()

    def write(self, bytes):
        """Accept C{bytes} and do nothing with them."""
        return None

    def read(self, len=0):
        """Return empty bytes, whatever C{len} is."""
        return _b('')
214 class _ParserState(object):
215 """State for the subunit parser."""
217 def __init__(self, parser):
218 self.parser = parser
219 self._test_sym = (_b('test'), _b('testing'))
220 self._colon_sym = _b(':')
221 self._error_sym = (_b('error'),)
222 self._failure_sym = (_b('failure'),)
223 self._progress_sym = (_b('progress'),)
224 self._skip_sym = _b('skip')
225 self._success_sym = (_b('success'), _b('successful'))
226 self._tags_sym = (_b('tags'),)
227 self._time_sym = (_b('time'),)
228 self._xfail_sym = (_b('xfail'),)
229 self._uxsuccess_sym = (_b('uxsuccess'),)
230 self._start_simple = _u(" [")
231 self._start_multipart = _u(" [ multipart")
233 def addError(self, offset, line):
234 """An 'error:' directive has been read."""
235 self.parser.stdOutLineReceived(line)
237 def addExpectedFail(self, offset, line):
238 """An 'xfail:' directive has been read."""
239 self.parser.stdOutLineReceived(line)
241 def addFailure(self, offset, line):
242 """A 'failure:' directive has been read."""
243 self.parser.stdOutLineReceived(line)
245 def addSkip(self, offset, line):
246 """A 'skip:' directive has been read."""
247 self.parser.stdOutLineReceived(line)
249 def addSuccess(self, offset, line):
250 """A 'success:' directive has been read."""
251 self.parser.stdOutLineReceived(line)
253 def lineReceived(self, line):
254 """a line has been received."""
255 parts = line.split(None, 1)
256 if len(parts) == 2 and line.startswith(parts[0]):
257 cmd, rest = parts
258 offset = len(cmd) + 1
259 cmd = cmd.rstrip(self._colon_sym)
260 if cmd in self._test_sym:
261 self.startTest(offset, line)
262 elif cmd in self._error_sym:
263 self.addError(offset, line)
264 elif cmd in self._failure_sym:
265 self.addFailure(offset, line)
266 elif cmd in self._progress_sym:
267 self.parser._handleProgress(offset, line)
268 elif cmd in self._skip_sym:
269 self.addSkip(offset, line)
270 elif cmd in self._success_sym:
271 self.addSuccess(offset, line)
272 elif cmd in self._tags_sym:
273 self.parser._handleTags(offset, line)
274 self.parser.subunitLineReceived(line)
275 elif cmd in self._time_sym:
276 self.parser._handleTime(offset, line)
277 self.parser.subunitLineReceived(line)
278 elif cmd in self._xfail_sym:
279 self.addExpectedFail(offset, line)
280 elif cmd in self._uxsuccess_sym:
281 self.addUnexpectedSuccess(offset, line)
282 else:
283 self.parser.stdOutLineReceived(line)
284 else:
285 self.parser.stdOutLineReceived(line)
287 def lostConnection(self):
288 """Connection lost."""
289 self.parser._lostConnectionInTest(_u('unknown state of '))
291 def startTest(self, offset, line):
292 """A test start command received."""
293 self.parser.stdOutLineReceived(line)
class _InTest(_ParserState):
    """Parser state in effect once a test: directive has been read."""

    def _outcome(self, offset, line, no_details, details_state):
        """Handle an outcome directive for the test in progress.

        :param offset: Index in ``line`` at which the test name starts.
        :param line: The complete directive line, as bytes.
        :param no_details: Callable to call when no details are presented.
        :param details_state: The state to switch to for details
            processing of this outcome.
        """
        parser = self.parser
        reported = line[offset:-1].decode('utf8')
        current = parser.current_test_description
        if reported == current:
            # Bare outcome: report it and leave the test.
            parser._state = parser._outside_test
            parser.current_test_description = None
            no_details()
            parser.client.stopTest(parser._current_test)
            parser._current_test = None
            parser.subunitLineReceived(line)
            return
        # Outcome followed by the opening of a details section.
        openers = (
            (self._start_simple, details_state.set_simple),
            (self._start_multipart, details_state.set_multipart),
        )
        for suffix, begin_details in openers:
            if current + suffix == reported:
                parser._state = details_state
                begin_details()
                parser.subunitLineReceived(line)
                return
        # The directive names some other test: treat it as plain output.
        parser.stdOutLineReceived(line)

    def _error(self):
        """Report a detail-less error for the current test."""
        parser = self.parser
        parser.client.addError(parser._current_test, details={})

    def addError(self, offset, line):
        """An 'error:' directive has been read."""
        next_state = self.parser._reading_error_details
        self._outcome(offset, line, self._error, next_state)

    def _xfail(self):
        """Report a detail-less expected failure for the current test."""
        parser = self.parser
        parser.client.addExpectedFailure(parser._current_test, details={})

    def addExpectedFail(self, offset, line):
        """An 'xfail:' directive has been read."""
        next_state = self.parser._reading_xfail_details
        self._outcome(offset, line, self._xfail, next_state)

    def _uxsuccess(self):
        """Report an unexpected success for the current test."""
        parser = self.parser
        parser.client.addUnexpectedSuccess(parser._current_test)

    def addUnexpectedSuccess(self, offset, line):
        """A 'uxsuccess:' directive has been read."""
        next_state = self.parser._reading_uxsuccess_details
        self._outcome(offset, line, self._uxsuccess, next_state)

    def _failure(self):
        """Report a detail-less failure for the current test."""
        parser = self.parser
        parser.client.addFailure(parser._current_test, details={})

    def addFailure(self, offset, line):
        """A 'failure:' directive has been read."""
        next_state = self.parser._reading_failure_details
        self._outcome(offset, line, self._failure, next_state)

    def _skip(self):
        """Report a detail-less skip for the current test."""
        parser = self.parser
        parser.client.addSkip(parser._current_test, details={})

    def addSkip(self, offset, line):
        """A 'skip:' directive has been read."""
        next_state = self.parser._reading_skip_details
        self._outcome(offset, line, self._skip, next_state)

    def _succeed(self):
        """Report a detail-less success for the current test."""
        parser = self.parser
        parser.client.addSuccess(parser._current_test, details={})

    def addSuccess(self, offset, line):
        """A 'success:' directive has been read."""
        next_state = self.parser._reading_success_details
        self._outcome(offset, line, self._succeed, next_state)

    def lostConnection(self):
        """Connection lost."""
        no_prefix = _u('')
        self.parser._lostConnectionInTest(no_prefix)
class _OutSideTest(_ParserState):
    """Parser state in effect while no test is in progress."""

    def lostConnection(self):
        """Connection lost."""
        # Nothing is reported in this state.

    def startTest(self, offset, line):
        """A test start command received.

        :param offset: Index in ``line`` at which the test name starts.
        :param line: The complete directive line, as bytes.
        """
        parser = self.parser
        parser._state = parser._in_test
        name = line[offset:-1].decode('utf8')
        remote_test = RemotedTestCase(name)
        parser._current_test = remote_test
        parser.current_test_description = name
        parser.client.startTest(remote_test)
        parser.subunitLineReceived(line)
class _ReadingDetails(_ParserState):
    """Shared behaviour of the states that read an outcome's details.

    Subclasses provide ``_report_outcome`` and ``_outcome_label``.
    """

    def endDetails(self):
        """The end of a details section has been reached."""
        parser = self.parser
        parser._state = parser._outside_test
        parser.current_test_description = None
        self._report_outcome()
        parser.client.stopTest(parser._current_test)

    def lineReceived(self, line):
        """a line has been received."""
        # Every line goes to the details parser and is also forwarded.
        self.details_parser.lineReceived(line)
        self.parser.subunitLineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        template = _u('%s report of ')
        self.parser._lostConnectionInTest(template % self._outcome_label())

    def _outcome_label(self):
        """The label to describe this outcome."""
        raise NotImplementedError(self._outcome_label)

    def set_simple(self):
        """Start a simple details parser."""
        simple_parser = details.SimpleDetailsParser(self)
        self.details_parser = simple_parser

    def set_multipart(self):
        """Start a multipart details parser."""
        multipart_parser = details.MultipartDetailsParser(self)
        self.details_parser = multipart_parser
class _ReadingFailureDetails(_ReadingDetails):
    """Parser state used while the details of a failure are being read."""

    def _report_outcome(self):
        """Report the failure, with the parsed details, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details()
        parser.client.addFailure(parser._current_test, details=collected)

    def _outcome_label(self):
        """Return the label used for this outcome in messages."""
        return "failure"
class _ReadingErrorDetails(_ReadingDetails):
    """Parser state used while the details of an error are being read."""

    def _report_outcome(self):
        """Report the error, with the parsed details, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details()
        parser.client.addError(parser._current_test, details=collected)

    def _outcome_label(self):
        """Return the label used for this outcome in messages."""
        return "error"
class _ReadingExpectedFailureDetails(_ReadingDetails):
    """Parser state used while the details of an xfail are being read."""

    def _report_outcome(self):
        """Report the expected failure, with parsed details, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details()
        parser.client.addExpectedFailure(parser._current_test,
            details=collected)

    def _outcome_label(self):
        """Return the label used for this outcome in messages."""
        return "xfail"
class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
    """Parser state used while the details of a uxsuccess are being read."""

    def _report_outcome(self):
        """Report the unexpected success, with parsed details, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details()
        parser.client.addUnexpectedSuccess(parser._current_test,
            details=collected)

    def _outcome_label(self):
        """Return the label used for this outcome in messages."""
        return "uxsuccess"
class _ReadingSkipDetails(_ReadingDetails):
    """Parser state used while the details of a skip are being read."""

    def _report_outcome(self):
        """Report the skip, with the parsed details, to the client."""
        parser = self.parser
        # The details are requested in their "skip" form.
        collected = self.details_parser.get_details("skip")
        parser.client.addSkip(parser._current_test, details=collected)

    def _outcome_label(self):
        """Return the label used for this outcome in messages."""
        return "skip"
class _ReadingSuccessDetails(_ReadingDetails):
    """Parser state used while the details of a success are being read."""

    def _report_outcome(self):
        """Report the success, with the parsed details, to the client."""
        parser = self.parser
        # The details are requested in their "success" form.
        collected = self.details_parser.get_details("success")
        parser.client.addSuccess(parser._current_test, details=collected)

    def _outcome_label(self):
        """Return the label used for this outcome in messages."""
        return "success"
class TestProtocolServer(object):
    """A parser for subunit.

    Lines are fed in through ``lineReceived`` and dispatched to the current
    state object, which translates them into calls on ``client``.

    :ivar tags: The current tags associated with the protocol stream.
    """

    def __init__(self, client, stream=None, forward_stream=None):
        """Create a TestProtocolServer instance.

        :param client: An object meeting the unittest.TestResult protocol.
        :param stream: The stream that lines received which are not part of the
            subunit protocol should be written to. This allows custom handling
            of mixed protocols. By default, sys.stdout will be used for
            convenience. It should accept bytes to its write() method.
        :param forward_stream: A stream to forward subunit lines to. This
            allows a filter to forward the entire stream while still parsing
            and acting on it. By default forward_stream is set to
            DiscardStream() and no forwarding happens.
        """
        self.client = ExtendedToOriginalDecorator(client)
        if stream is None:
            stream = sys.stdout
            if sys.version_info > (3, 0):
                # On Python 3 use the underlying binary buffer of stdout,
                # because the lines written here are bytes.
                stream = stream.buffer
        self._stream = stream
        self._forward_stream = forward_stream or DiscardStream()
        # state objects we can switch too
        self._in_test = _InTest(self)
        self._outside_test = _OutSideTest(self)
        self._reading_error_details = _ReadingErrorDetails(self)
        self._reading_failure_details = _ReadingFailureDetails(self)
        self._reading_skip_details = _ReadingSkipDetails(self)
        self._reading_success_details = _ReadingSuccessDetails(self)
        self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
        self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
        # start with outside test.
        self._state = self._outside_test
        # Avoid casts on every call
        self._plusminus = _b('+-')
        self._push_sym = _b('push')
        self._pop_sym = _b('pop')

    def _handleProgress(self, offset, line):
        """Process a progress directive.

        A signed number is a relative adjustment (PROGRESS_CUR), 'push' and
        'pop' map to PROGRESS_PUSH and PROGRESS_POP with no offset, and
        anything else is parsed as an absolute count (PROGRESS_SET).

        :param offset: Index in ``line`` at which the argument starts.
        :param line: The complete directive line, as bytes.
        """
        line = line[offset:].strip()
        if line[0] in self._plusminus:
            whence = PROGRESS_CUR
            delta = int(line)
        elif line == self._push_sym:
            whence = PROGRESS_PUSH
            delta = None
        elif line == self._pop_sym:
            whence = PROGRESS_POP
            delta = None
        else:
            whence = PROGRESS_SET
            delta = int(line)
        self.client.progress(delta, whence)

    def _handleTags(self, offset, line):
        """Process a tags command."""
        tags = line[offset:].decode('utf8').split()
        new_tags, gone_tags = tags_to_new_gone(tags)
        self.client.tags(new_tags, gone_tags)

    def _handleTime(self, offset, line):
        """Process a time directive and pass the parsed time to the client.

        :raises TypeError: If parsing the timestamp raised TypeError; the
            new error names the offending line and the original error.
        """
        try:
            event_time = iso8601.parse_date(line[offset:-1])
        except TypeError:
            # sys.exc_info() is a function; its second item is the exception
            # instance that is currently being handled.
            raise TypeError(_u("Failed to parse %r, got %r")
                % (line, sys.exc_info()[1]))
        self.client.time(event_time)

    def lineReceived(self, line):
        """Call the appropriate local method for the received line."""
        self._state.lineReceived(line)

    def _lostConnectionInTest(self, state_string):
        """Report the current test as an error because the stream ended.

        :param state_string: Text inserted before the word 'test' in the
            error message, describing what was being read.
        """
        error_string = _u("lost connection during %stest '%s'") % (
            state_string, self.current_test_description)
        self.client.addError(self._current_test, RemoteError(error_string))
        self.client.stopTest(self._current_test)

    def lostConnection(self):
        """The input connection has finished."""
        self._state.lostConnection()

    def readFrom(self, pipe):
        """Blocking convenience API to parse an entire stream.

        :param pipe: A file-like object supporting readlines().
        :return: None.
        """
        for line in pipe.readlines():
            self.lineReceived(line)
        self.lostConnection()

    def _startTest(self, offset, line):
        """Internal call to change state machine. Override startTest()."""
        self._state.startTest(offset, line)

    def subunitLineReceived(self, line):
        """Write a line that was part of the subunit protocol to the forward stream."""
        self._forward_stream.write(line)

    def stdOutLineReceived(self, line):
        """Write a line that was not part of the subunit protocol to the output stream."""
        self._stream.write(line)
class TestProtocolClient(testresult.TestResult):
    """A TestResult which generates a subunit stream for a test run.

    # Get a TestSuite or TestCase to run
    suite = make_suite()
    # Create a stream (any object with a 'write' method). This should accept
    # bytes not strings: subunit is a byte orientated protocol.
    stream = file('tests.log', 'wb')
    # Create a subunit result object which will output to the stream
    result = subunit.TestProtocolClient(stream)
    # Optionally, to get timing data for performance analysis, wrap the
    # serialiser with a timing decorator
    result = subunit.test_results.AutoTimingTestResultDecorator(result)
    # Run the test suite reporting to the subunit result object
    suite.run(result)
    # Close the stream.
    stream.close()
    """

    def __init__(self, stream):
        """Create a client that serialises results to ``stream``.

        :param stream: The stream to write the subunit output to.
        """
        testresult.TestResult.__init__(self)
        stream = _make_stream_binary(stream)
        self._stream = stream
        # Byte constants are built once here rather than on every call.
        self._progress_fmt = _b("progress: ")
        self._bytes_eol = _b("\n")
        self._progress_plus = _b("+")
        self._progress_push = _b("push")
        self._progress_pop = _b("pop")
        self._empty_bytes = _b("")
        self._start_simple = _b(" [\n")
        self._end_simple = _b("]\n")

    def addError(self, test, error=None, details=None):
        """Report an error in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addError(self, test, error)
            addError(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("error", test, error=error, details=details)
        if self.failfast:
            self.stop()

    def addExpectedFailure(self, test, error=None, details=None):
        """Report an expected failure in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addExpectedFailure(self, test, error)
            addExpectedFailure(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("xfail", test, error=error, details=details)

    def addFailure(self, test, error=None, details=None):
        """Report a failure in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addFailure(self, test, error)
            addFailure(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("failure", test, error=error, details=details)
        if self.failfast:
            self.stop()

    def _addOutcome(self, outcome, test, error=None, details=None,
        error_permitted=True):
        """Write an outcome for test to the stream.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addOutcome(self, test, error)
            addOutcome(self, test, details)

        :param outcome: A string describing the outcome - used as the
            event name in the subunit stream.
        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        :param error_permitted: If True then at least one of error or
            details must be supplied. If False then error must not be supplied
            and details is still optional.
        :raises ValueError: If the arguments break the error_permitted rule.
        """
        # Validate before writing anything, so that a rejected call does not
        # leave a truncated outcome line in the stream.
        if error_permitted:
            if error is None and details is None:
                raise ValueError
        else:
            if error is not None:
                raise ValueError
        self._stream.write(_b("%s: " % outcome) + self._test_id(test))
        if error is not None:
            self._stream.write(self._start_simple)
            tb_content = TracebackContent(error, test)
            for chunk in tb_content.iter_bytes():
                self._stream.write(chunk)
        elif details is not None:
            self._write_details(details)
        else:
            self._stream.write(_b("\n"))
        if details is not None or error is not None:
            # Both the traceback form and the multipart form are closed by
            # the same terminator line.
            self._stream.write(self._end_simple)

    def addSkip(self, test, reason=None, details=None):
        """Report a skipped test.

        :param reason: Optional text explaining the skip; when given it is
            written as the body of a simple details section.
        :param details: Used only when reason is None; a dict from string
            to subunit.Content objects.
        """
        if reason is None:
            self._addOutcome("skip", test, error=None, details=details)
        else:
            # Encode the id through _test_id, as every other outcome does.
            self._stream.write(
                _b("skip: ") + self._test_id(test) + self._start_simple)
            self._stream.write(_b("%s\n" % reason))
            self._stream.write(self._end_simple)

    def addSuccess(self, test, details=None):
        """Report a success in a test."""
        self._addOutcome("successful", test, details=details, error_permitted=False)

    def addUnexpectedSuccess(self, test, details=None):
        """Report an unexpected success in test test.

        Details can optionally be provided: conceptually there
        are two separate methods:
            addUnexpectedSuccess(self, test)
            addUnexpectedSuccess(self, test, details)

        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("uxsuccess", test, details=details,
            error_permitted=False)
        if self.failfast:
            self.stop()

    def _test_id(self, test):
        """Return test.id() as bytes, encoding it as UTF-8 when needed."""
        result = test.id()
        if type(result) is not bytes:
            result = result.encode('utf8')
        return result

    def startTest(self, test):
        """Mark a test as starting its test run."""
        super(TestProtocolClient, self).startTest(test)
        self._stream.write(_b("test: ") + self._test_id(test) + _b("\n"))
        self._stream.flush()

    def stopTest(self, test):
        """Mark a test as finished and flush the stream."""
        super(TestProtocolClient, self).stopTest(test)
        self._stream.flush()

    def progress(self, offset, whence):
        """Provide indication about the progress/length of the test run.

        :param offset: Information about the number of tests remaining. If
            whence is PROGRESS_CUR, then offset increases/decreases the
            remaining test count. If whence is PROGRESS_SET, then offset
            specifies exactly the remaining test count.
        :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
            PROGRESS_POP.
        """
        if whence == PROGRESS_CUR and offset > -1:
            # Non-negative relative offsets need an explicit '+' sign.
            prefix = self._progress_plus
            offset = _b(str(offset))
        elif whence == PROGRESS_PUSH:
            prefix = self._empty_bytes
            offset = self._progress_push
        elif whence == PROGRESS_POP:
            prefix = self._empty_bytes
            offset = self._progress_pop
        else:
            # PROGRESS_SET, or a negative PROGRESS_CUR offset whose string
            # form already starts with '-'.
            prefix = self._empty_bytes
            offset = _b(str(offset))
        self._stream.write(self._progress_fmt + prefix + offset +
            self._bytes_eol)

    def tags(self, new_tags, gone_tags):
        """Inform the client about tags added/removed from the stream."""
        if not new_tags and not gone_tags:
            return
        tags = set(tag.encode('utf8') for tag in new_tags)
        tags.update(_b("-") + tag.encode('utf8') for tag in gone_tags)
        tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
        self._stream.write(tag_line)

    def time(self, a_datetime):
        """Inform the client of the time.

        :param a_datetime: A datetime.datetime object. It is converted to
            UTC with astimezone() before being written.
        """
        utc_time = a_datetime.astimezone(iso8601.Utc())
        self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
            utc_time.year, utc_time.month, utc_time.day, utc_time.hour,
            utc_time.minute, utc_time.second, utc_time.microsecond)))

    def _write_details(self, details):
        """Output details to the stream.

        Each entry is written as a Content-Type line, the entry name, and
        the content bytes passed through a chunked encoder.

        :param details: An extended details dict for a test outcome.
        """
        self._stream.write(_b(" [ multipart\n"))
        for name, content in sorted(details.items()):
            self._stream.write(_b("Content-Type: %s/%s" %
                (content.content_type.type, content.content_type.subtype)))
            parameters = content.content_type.parameters
            if parameters:
                self._stream.write(_b(";"))
                param_strs = ["%s=%s" % (param, value)
                    for param, value in parameters.items()]
                self._stream.write(_b(",".join(param_strs)))
            self._stream.write(_b("\n%s\n" % name))
            encoder = chunked.Encoder(self._stream)
            for chunk in content.iter_bytes():
                encoder.write(chunk)
            encoder.close()

    def done(self):
        """Obey the testtools result.done() interface."""
def RemoteError(description=_u("")):
    """Build an exc_info-style triple carrying ``description``.

    :param description: Text for the exception instance.
    :return: A tuple (exception class, exception instance, None).
    """
    exc_class = _StringException
    exc_value = exc_class(description)
    return (exc_class, exc_value, None)
class RemotedTestCase(unittest.TestCase):
    """A class to represent test cases run in child processes.

    Instances of this class are used to provide the Python test API a TestCase
    that can be printed to the screen, introspected for metadata and so on.
    However, as they are a simply a memoisation of a test that was actually
    run in the past by a separate process, they cannot perform any interactive
    actions.
    """

    def __eq__ (self, other):
        """Two remoted tests are equal when their descriptions are equal."""
        try:
            return self.__description == other.__description
        except AttributeError:
            # other has no description attribute of this class.
            return False

    def __hash__(self):
        """Hash on the description, consistent with __eq__."""
        # Defining __eq__ alone makes instances unhashable on Python 3, so
        # the hash has to be provided explicitly.
        return hash(self.__description)

    def __init__(self, description):
        """Create a psuedo test case with description description."""
        # unittest.TestCase.__init__ is not called here; only the
        # description is stored.
        self.__description = description

    def error(self, label):
        """Raise NotImplementedError naming the operation ``label``."""
        raise NotImplementedError("%s on RemotedTestCases is not permitted." %
            label)

    def setUp(self):
        """Not permitted on a remoted test."""
        self.error("setUp")

    def tearDown(self):
        """Not permitted on a remoted test."""
        self.error("tearDown")

    def shortDescription(self):
        """Return the description this test was created with."""
        return self.__description

    def id(self):
        """Return the description formatted as a string."""
        return "%s" % (self.__description,)

    def __str__(self):
        return "%s (%s)" % (self.__description, self._strclass())

    def __repr__(self):
        return "<%s description='%s'>" % \
               (self._strclass(), self.__description)

    def run(self, result=None):
        """Report to ``result`` that this test cannot be run.

        The test is started, an error is added and the test is stopped.
        """
        if result is None:
            result = self.defaultTestResult()
        result.startTest(self)
        result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
        result.stopTest(self)

    def _strclass(self):
        """Return the dotted module and class name of this instance."""
        cls = self.__class__
        return "%s.%s" % (cls.__module__, cls.__name__)
class ExecTestCase(unittest.TestCase):
    """A test case whose test methods name external scripts to run.

    The docstring of each test method is the path of the script, taken
    relative to the directory of the module that defines the class.
    """

    def __init__(self, methodName='runTest'):
        """Create an instance of the class that will use the named test
           method when executed. Raises a ValueError if the instance does
           not have a method with the specified name.
        """
        unittest.TestCase.__init__(self, methodName)
        script_path = getattr(self, methodName).__doc__
        defining_module = sys.modules[self.__class__.__module__]
        self.script = join_dir(defining_module.__file__, script_path)

    def countTestCases(self):
        """Return the number of tests this case represents: always 1."""
        return 1

    def run(self, result=None):
        """Run the script, reporting into ``result`` (a default if None)."""
        if result is None:
            result = self.defaultTestResult()
        self._run(result)

    def debug(self):
        """Run the test without collecting errors in a TestResult"""
        throwaway = testresult.TestResult()
        self._run(throwaway)

    def _run(self, result):
        """Execute the script and parse its stdout as a subunit stream."""
        server = TestProtocolServer(result)
        child = subprocess.Popen(self.script, shell=True,
            stdout=subprocess.PIPE)
        _make_stream_binary(child.stdout)
        captured = child.communicate()[0]
        server.readFrom(BytesIO(captured))
class IsolatedTestCase(unittest.TestCase):
    """A TestCase which executes in a forked process.

    Each test gets its own process, which has a performance overhead but will
    provide excellent isolation from global state (such as django configs,
    zope utilities and so on).
    """

    def run(self, result=None):
        """Run this test through run_isolated, reporting into result."""
        if result is None:
            result = self.defaultTestResult()
        run_isolated(unittest.TestCase, self, result)
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process.

    This decorator will fork() before running the tests and report the
    results from the child process using a Subunit stream. This is useful for
    handling tests that mutate global state, or are testing C extensions that
    could crash the VM.
    """

    def run(self, result=None):
        """Run the suite through run_isolated, reporting into result."""
        if result is None:
            result = testresult.TestResult()
        run_isolated(unittest.TestSuite, self, result)
def run_isolated(klass, self, result):
    """Run a test suite or case in a subprocess, using the run method on klass.

    The process forks. The child runs ``klass.run(self, ...)`` against a
    TestProtocolClient writing to a pipe, and the parent parses that pipe
    with a TestProtocolServer that reports into result.

    :param klass: The class whose ``run`` is invoked in the child.
    :param self: The test case or suite instance to run.
    :param result: The result object the parent reports into.
    :return: result.
    """
    c2pread, c2pwrite = os.pipe()
    # fixme - error -> result
    pid = os.fork()
    if pid == 0:
        # Child. Whatever happens it must never return into the caller's
        # stack, otherwise the forked copy would carry on running the
        # parent's remaining tests; hence the unconditional os._exit below.
        exit_status = 1
        try:
            # Close parent's pipe end.
            os.close(c2pread)
            # Make the pipe the child's stdout, then drop the spare fd.
            os.dup2(c2pwrite, 1)
            os.close(c2pwrite)
            # at this point, sys.stdout is redirected, now we want
            # to filter it to escape ]'s.
            ### XXX: test and write that bit.
            stream = os.fdopen(1, 'wb')
            result = TestProtocolClient(stream)
            klass.run(self, result)
            stream.flush()
            exit_status = 0
        except BaseException:
            # stderr is still shared with the parent, so the failure is
            # visible there.
            import traceback
            traceback.print_exc()
        finally:
            sys.stderr.flush()
            # exit HARD, exit NOW.
            os._exit(exit_status)
    # Parent
    # Close child pipe end so EOF is seen once the child exits.
    os.close(c2pwrite)
    # hookup a protocol engine
    protocol = TestProtocolServer(result)
    fileobj = os.fdopen(c2pread, 'rb')
    try:
        protocol.readFrom(fileobj)
    finally:
        # Release the read end and reap the child even if parsing fails.
        fileobj.close()
        os.waitpid(pid, 0)
    # TODO return code evaluation.
    return result
def TAP2SubUnit(tap, subunit):
    """Filter a TAP pipe into a subunit pipe.

    Each TAP test line becomes a subunit ``test``/outcome pair; ``#`` comment
    lines and directive comments are attached to the pending test as its
    bracketed details. Lines that are not recognised are passed through.

    :param tap: A tap pipe/stream/file object.
    :param subunit: A pipe/stream/file object to write subunit results to.
    :return: The exit code to exit with.
    """
    BEFORE_PLAN = 0
    AFTER_PLAN = 1
    SKIP_STREAM = 2
    # Raw-string patterns, compiled once instead of once per input line.
    plan_re = re.compile(r"(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n")
    test_re = re.compile(
        r"(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?"
        r"(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n")
    bail_re = re.compile(r"Bail out\!(?:\s*(.*))?\n")
    comment_re = re.compile(r"\#.*\n")
    state = BEFORE_PLAN
    plan_start = 1
    plan_stop = 0

    def _skipped_test(subunit, plan_start):
        # Some tests were skipped.
        subunit.write('test test %d\n' % plan_start)
        subunit.write('error test %d [\n' % plan_start)
        subunit.write('test missing from TAP output\n')
        subunit.write(']\n')
        return plan_start + 1

    # Test data for the next test to emit
    test_name = None
    log = []
    result = None

    def _emit_test():
        "write out a test"
        if test_name is None:
            return
        subunit.write("test %s\n" % test_name)
        if not log:
            subunit.write("%s %s\n" % (result, test_name))
        else:
            subunit.write("%s %s [\n" % (result, test_name))
            for entry in log:
                subunit.write("%s\n" % entry)
            subunit.write("]\n")
        del log[:]

    for line in tap:
        if not line.endswith("\n"):
            # Every pattern is anchored on the newline; without this a final
            # unterminated line would never be recognised.
            line += "\n"
        if state == BEFORE_PLAN:
            match = plan_re.match(line)
            if match:
                state = AFTER_PLAN
                _, plan_stop, comment = match.groups()
                plan_stop = int(plan_stop)
                if plan_start > plan_stop and plan_stop == 0:
                    # skipped file
                    state = SKIP_STREAM
                    subunit.write("test file skip\n")
                    subunit.write("skip file skip [\n")
                    subunit.write("%s\n" % comment)
                    subunit.write("]\n")
                continue
        # not a plan line, or have seen one before
        match = test_re.match(line)
        if match:
            # new test, emit current one.
            _emit_test()
            status, number, description, directive, directive_comment = \
                match.groups()
            if status == 'ok':
                result = 'success'
            else:
                result = "failure"
            if description is None:
                description = ''
            else:
                description = ' ' + description
            if directive is not None:
                if directive.upper() == 'TODO':
                    result = 'xfail'
                elif directive.upper() == 'SKIP':
                    result = 'skip'
                if directive_comment is not None:
                    log.append(directive_comment)
            if number is not None:
                number = int(number)
                # Report any test numbers the TAP stream jumped over.
                while plan_start < number:
                    plan_start = _skipped_test(subunit, plan_start)
            test_name = "test %d%s" % (plan_start, description)
            plan_start += 1
            continue
        match = bail_re.match(line)
        if match:
            reason, = match.groups()
            if reason is None:
                extra = ''
            else:
                extra = ' %s' % reason
            _emit_test()
            test_name = "Bail out!%s" % extra
            result = "error"
            state = SKIP_STREAM
            continue
        match = comment_re.match(line)
        if match:
            log.append(line[:-1])
            continue
        subunit.write(line)
    _emit_test()
    while plan_start <= plan_stop:
        # record missed tests
        plan_start = _skipped_test(subunit, plan_start)
    return 0
def tag_stream(original, filtered, tags):
    """Alter tags on a stream.

    :param original: The input stream.
    :param filtered: The output stream.
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
        '-TAG' commands.

        A 'TAG' command will add the tag to the output stream,
        and override any existing '-TAG' command in that stream.
        Specifically:
         * A global 'tags: TAG' will be added to the start of the stream.
         * Any tags commands with -TAG will have the -TAG removed.

        A '-TAG' command will remove the TAG command from the stream.
        Specifically:
         * A 'tags: -TAG' command will be added to the start of the stream.
         * Any 'tags: TAG' command will have 'TAG' removed from it.
        Additionally, any redundant tagging commands (adding a tag globally
        present, or removing a tag globally removed) are stripped as a
        by-product of the filtering.
    :return: 0
    """
    new_tags, gone_tags = tags_to_new_gone(tags)

    def write_tags(new_tags, gone_tags):
        """Write one 'tags:' line, or nothing when both collections are empty."""
        if new_tags or gone_tags:
            # Every entry, added or removed, is space separated; writing the
            # '-TAG' entries back to back would fuse them into one bogus tag.
            entries = list(new_tags) + ["-" + tag for tag in gone_tags]
            filtered.write("tags: " + ' '.join(entries) + "\n")

    write_tags(new_tags, gone_tags)
    # TODO: use the protocol parser and thus don't mangle test comments.
    for line in original:
        if line.startswith("tags:"):
            line_tags = line[5:].split()
            line_new, line_gone = tags_to_new_gone(line_tags)
            # Drop entries made redundant by the global tags written above.
            line_new = line_new - gone_tags
            line_gone = line_gone - new_tags
            write_tags(line_new, line_gone)
        else:
            filtered.write(line)
    return 0
class ProtocolTestCase(object):
    """Subunit wire protocol to unittest.TestCase adapter.

    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
    calling a ProtocolTestCase or invoking the run() method will make a 'test
    run' happen. The 'test run' will simply be a replay of the test activity
    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
    and ``countTestCases`` methods are not supported because there isn't a
    sensible mapping for those methods.

    # Get a stream (any object with a readline() method), in this case the
    # stream output by the example from ``subunit.TestProtocolClient``.
    stream = open('tests.log', 'rb')
    # Create a parser which will read from the stream and emit
    # activity to a unittest.TestResult when run() is called.
    suite = subunit.ProtocolTestCase(stream)
    # Create a result object to accept the contents of that stream.
    result = unittest._TextTestResult(sys.stdout)
    # 'run' the tests - process the stream and feed its contents to result.
    suite.run(result)
    stream.close()

    :seealso: TestProtocolServer (the subunit wire protocol parser).
    """

    def __init__(self, stream, passthrough=None, forward=None):
        """Create a ProtocolTestCase reading from stream.

        :param stream: A filelike object which a subunit stream can be read
            from.
        :param passthrough: A stream pass non subunit input on to. If not
            supplied, the TestProtocolServer default is used.
        :param forward: A stream to pass subunit input on to. If not supplied
            subunit input is not forwarded.
        """
        stream = _make_stream_binary(stream)
        self._stream = stream
        self._passthrough = passthrough
        if forward is not None:
            forward = _make_stream_binary(forward)
        self._forward = forward

    def __call__(self, result=None):
        """Calling the instance is the same as calling run(result)."""
        return self.run(result)

    def run(self, result=None):
        """Replay the stream into result, one line at a time.

        :param result: The result object to report to. When None a fresh
            testresult.TestResult is used; this class derives from object
            and so has no defaultTestResult() to fall back on.
        """
        if result is None:
            result = testresult.TestResult()
        protocol = TestProtocolServer(result, self._passthrough, self._forward)
        line = self._stream.readline()
        while line:
            protocol.lineReceived(line)
            line = self._stream.readline()
        # An empty read means end of stream; tell the parser so.
        protocol.lostConnection()
class TestResultStats(testresult.TestResult):
    """A pyunit TestResult interface implementation for making statistics.

    :ivar total_tests: The total tests seen.
    :ivar passed_tests: The tests that passed.
    :ivar failed_tests: The tests that failed.
    :ivar seen_tags: The tags seen across all tests.
    """

    def __init__(self, stream):
        """Create a TestResultStats which outputs to stream."""
        testresult.TestResult.__init__(self)
        self._stream = stream
        self.failed_tests = 0
        self.skipped_tests = 0
        self.seen_tags = set()

    @property
    def total_tests(self):
        """The number of tests run, as counted by the base class."""
        return self.testsRun

    @property
    def passed_tests(self):
        """Tests that were neither failed nor skipped."""
        return self.total_tests - self.failed_tests - self.skipped_tests

    def addError(self, test, err, details=None):
        """Count an error as a failed test."""
        self.failed_tests += 1

    def addFailure(self, test, err, details=None):
        """Count a failure as a failed test."""
        self.failed_tests += 1

    def addSkip(self, test, reason, details=None):
        """Count a skipped test."""
        self.skipped_tests += 1

    def formatStats(self):
        """Write the counters and the sorted seen tags to the stream."""
        counters = (
            ("Total tests: %5d\n", "total_tests"),
            ("Passed tests: %5d\n", "passed_tests"),
            ("Failed tests: %5d\n", "failed_tests"),
            ("Skipped tests: %5d\n", "skipped_tests"),
        )
        for template, attribute in counters:
            self._stream.write(template % getattr(self, attribute))
        self._stream.write(
            "Seen tags: %s\n" % (", ".join(sorted(self.seen_tags))))

    def tags(self, new_tags, gone_tags):
        """Accumulate the seen tags."""
        self.seen_tags.update(new_tags)

    def wasSuccessful(self):
        """Tells whether or not this result was a success"""
        return self.failed_tests == 0
def get_default_formatter():
    """Obtain the default formatter to write to.

    When the SUBUNIT_FORMATTER environment variable is set, its value is run
    as a command and a pipe to that command is returned. Otherwise standard
    output is returned, as its underlying byte buffer on Python 3 when it
    has one.

    :return: A file-like object.
    """
    formatter = os.getenv("SUBUNIT_FORMATTER")
    if formatter:
        return os.popen(formatter, "w")
    stream = sys.stdout
    if sys.version_info > (3, 0):
        # sys.stdout may have been replaced by an object without a
        # ``buffer`` attribute (for example a StringIO); use it as is then.
        stream = getattr(stream, "buffer", stream)
    return stream
def read_test_list(path):
    """Read a list of test ids from a file on disk.

    The file holds one test id per line; the trailing newline of each line
    is stripped.

    :param path: Path to the file
    :return: Sequence of test ids
    """
    # Text mode: stripping "\n" from lines read in binary mode raises
    # TypeError on Python 3, where those lines are bytes.
    f = open(path, 'r')
    try:
        return [line.rstrip("\n") for line in f]
    finally:
        f.close()
def _make_stream_binary(stream):
    """Ensure that a stream will be binary safe. See _make_binary_on_windows.

    :param stream: The stream to make binary safe.
    :return: A binary version of the same stream (some streams cannot be
        'fixed' but can be unwrapped).
    """
    try:
        descriptor = stream.fileno()
    except _UnsupportedOperation:
        # No underlying file descriptor: unwrapping is all that can be done.
        return _unwrap_text(stream)
    _make_binary_on_windows(descriptor)
    return _unwrap_text(stream)
def _make_binary_on_windows(fileno):
    """Switch the descriptor fileno to binary mode when running on win32.

    Text mode there rewrites CR LF line endings, which breaks streams; see
    bug lp:505078. On every other platform this does nothing.
    """
    if sys.platform != "win32":
        return
    import msvcrt
    msvcrt.setmode(fileno, os.O_BINARY)
def _unwrap_text(stream):
    """Unwrap stream if it is a text stream to get the original buffer.

    On Python 2 the stream is returned unchanged. On Python 3 a zero-length
    read is used to tell text from binary; for streams that cannot be read,
    an empty bytes write is attempted instead.
    """
    if not sys.version_info > (3, 0):
        return stream
    try:
        # Read streams
        if type(stream.read(0)) is str:
            return stream.buffer
    except (_UnsupportedOperation, IOError):
        # Cannot read from the stream: try via writes
        try:
            stream.write(_b(''))
        except TypeError:
            # Rejecting bytes marks a text stream; hand back its buffer.
            return stream.buffer
    return stream