1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 __all__
= ['parse_results']
23 import subunit
.iso8601
25 from testtools
import content
, content_type
# Command keywords that parse_results accepts as a result line
# (checked with ``command in VALID_RESULTS``).  Kept as a list, in the
# original order, so existing users of the name see the same object type.
VALID_RESULTS = [
    'success',
    'successful',
    'failure',
    'fail',
    'skip',
    'knownfail',
    'error',
    'xfail',
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
    'uxsuccess',
]
class TestsuiteEnabledTestResult(testtools.testresult.TestResult):
    """A testtools TestResult that also declares a ``start_testsuite`` hook.

    The hook is not implemented here; calling it on this class raises
    NotImplementedError.
    """

    def start_testsuite(self, name):
        """Hook invoked with the testsuite *name*; must be overridden.

        :raises NotImplementedError: always, carrying the bound method as
            its argument.
        """
        raise NotImplementedError(self.start_testsuite)
35 def parse_results(msg_ops
, statistics
, fh
):
43 parts
= l
.split(None, 1)
44 if not len(parts
) == 2 or not l
.startswith(parts
[0]):
47 command
= parts
[0].rstrip(":")
49 if command
in ("test", "testing"):
50 msg_ops
.control_msg(l
)
52 test
= subunit
.RemotedTestCase(name
)
53 if name
in open_tests
:
54 msg_ops
.addError(open_tests
.pop(name
), subunit
.RemoteError(u
"Test already running"))
55 msg_ops
.startTest(test
)
56 open_tests
[name
] = test
57 elif command
== "time":
58 msg_ops
.control_msg(l
)
60 dt
= subunit
.iso8601
.parse_date(arg
.rstrip("\n"))
62 print "Unable to parse time line: %s" % arg
.rstrip("\n")
65 elif command
in VALID_RESULTS
:
66 msg_ops
.control_msg(l
)
68 grp
= re
.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg
)
69 (testname
, hasreason
) = (grp
.group(1), grp
.group(2))
72 # reason may be specified in next lines
78 msg_ops
.control_msg(l
)
85 remote_error
= subunit
.RemoteError(reason
.decode("utf-8"))
88 statistics
['TESTS_ERROR']+=1
89 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"reason (%s) interrupted" % result
))
93 remote_error
= subunit
.RemoteError(u
"No reason specified")
94 if result
in ("success", "successful"):
96 test
= open_tests
.pop(testname
)
98 statistics
['TESTS_ERROR']+=1
100 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
102 statistics
['TESTS_EXPECTED_OK']+=1
103 msg_ops
.addSuccess(test
)
104 elif result
in ("xfail", "knownfail"):
106 test
= open_tests
.pop(testname
)
108 statistics
['TESTS_ERROR']+=1
110 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
112 statistics
['TESTS_EXPECTED_FAIL']+=1
113 msg_ops
.addExpectedFailure(test
, remote_error
)
114 elif result
in ("uxsuccess", ):
116 test
= open_tests
.pop(testname
)
118 statistics
['TESTS_ERROR']+=1
120 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
122 statistics
['TESTS_UNEXPECTED_OK']+=1
123 msg_ops
.addUnexpectedSuccess(test
, remote_error
)
125 elif result
in ("failure", "fail"):
127 test
= open_tests
.pop(testname
)
129 statistics
['TESTS_ERROR']+=1
131 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
133 statistics
['TESTS_UNEXPECTED_FAIL']+=1
135 msg_ops
.addFailure(test
, remote_error
)
136 elif result
== "skip":
137 statistics
['TESTS_SKIP']+=1
138 # Allow tests to be skipped without prior announcement of test
140 test
= open_tests
.pop(testname
)
142 test
= subunit
.RemotedTestCase(testname
)
143 msg_ops
.addSkip(test
, reason
)
144 elif result
== "error":
145 statistics
['TESTS_ERROR']+=1
148 test
= open_tests
.pop(testname
)
150 test
= subunit
.RemotedTestCase(testname
)
151 msg_ops
.addError(test
, remote_error
)
152 elif result
== "skip-testsuite":
153 msg_ops
.skip_testsuite(testname
)
154 elif result
== "testsuite-success":
155 msg_ops
.end_testsuite(testname
, "success", reason
)
156 elif result
== "testsuite-failure":
157 msg_ops
.end_testsuite(testname
, "failure", reason
)
159 elif result
== "testsuite-xfail":
160 msg_ops
.end_testsuite(testname
, "xfail", reason
)
161 elif result
== "testsuite-error":
162 msg_ops
.end_testsuite(testname
, "error", reason
)
165 raise AssertionError("Recognized but unhandled result %r" %
167 elif command
== "testsuite":
168 msg_ops
.start_testsuite(arg
.strip())
169 elif command
== "progress":
172 msg_ops
.progress(None, subunit
.PROGRESS_POP
)
174 msg_ops
.progress(None, subunit
.PROGRESS_PUSH
)
176 msg_ops
.progress(int(arg
), subunit
.PROGRESS_CUR
)
178 msg_ops
.progress(int(arg
), subunit
.PROGRESS_SET
)
180 msg_ops
.output_msg(l
)
183 test
= subunit
.RemotedTestCase(open_tests
.popitem()[1])
184 msg_ops
.addError(test
, subunit
.RemoteError(u
"was started but never finished!"))
185 statistics
['TESTS_ERROR']+=1
191 class SubunitOps(subunit
.TestProtocolClient
,TestsuiteEnabledTestResult
):
193 # The following are Samba extensions:
def start_testsuite(self, name):
    """Write a ``testsuite: <name>`` line to ``self._stream``."""
    line = "testsuite: %s\n" % name
    self._stream.write(line)
197 def skip_testsuite(self
, name
, reason
=None):
199 self
._stream
.write("skip-testsuite: %s [\n%s\n]\n" % (name
, reason
))
201 self
._stream
.write("skip-testsuite: %s\n" % name
)
203 def end_testsuite(self
, name
, result
, reason
=None):
205 self
._stream
.write("testsuite-%s: %s [\n%s\n]\n" % (result
, name
, reason
))
207 self
._stream
.write("testsuite-%s: %s\n" % (result
, name
))
def output_msg(self, msg):
    """Write *msg* to ``self._stream`` exactly as given."""
    stream = self._stream
    stream.write(msg)
213 def read_test_regexes(name
):
219 if l
== "" or l
[0] == "#":
222 (regex
, reason
) = l
.split("#", 1)
223 ret
[regex
.strip()] = reason
.strip()
231 def find_in_list(regexes
, fullname
):
232 for regex
, reason
in regexes
.iteritems():
233 if re
.match(regex
, fullname
):
240 class ImmediateFail(Exception):
241 """Raised to abort immediately."""
244 super(ImmediateFail
, self
).__init
__("test failed and fail_immediately set")
247 class FilterOps(testtools
.testresult
.TestResult
):
def control_msg(self, msg):
    """Ignore *msg*: control messages are regenerated, not passed through."""
    return None
252 def time(self
, time
):
def progress(self, delta, whence):
    """Forward the progress event unchanged to ``self._ops``."""
    downstream = self._ops
    downstream.progress(delta, whence)
258 def output_msg(self
, msg
):
259 if self
.output
is None:
260 sys
.stdout
.write(msg
)
264 def startTest(self
, test
):
265 self
.seen_output
= True
266 test
= self
._add
_prefix
(test
)
267 if self
.strip_ok_output
:
270 self
._ops
.startTest(test
)
272 def _add_prefix(self
, test
):
275 if self
.prefix
is not None:
277 if self
.suffix
is not None:
280 return subunit
.RemotedTestCase(prefix
+ test
.id() + suffix
)
282 def addError(self
, test
, details
=None):
283 test
= self
._add
_prefix
(test
)
286 self
._ops
.addError(test
, details
)
288 if self
.fail_immediately
:
289 raise ImmediateFail()
291 def addSkip(self
, test
, details
=None):
292 self
.seen_output
= True
293 test
= self
._add
_prefix
(test
)
294 self
._ops
.addSkip(test
, details
)
297 def addExpectedFailure(self
, test
, details
=None):
298 test
= self
._add
_prefix
(test
)
299 self
._ops
.addExpectedFailure(test
, details
)
302 def addUnexpectedSuccess(self
, test
, details
=None):
303 test
= self
._add
_prefix
(test
)
304 self
.uxsuccess_added
+=1
305 self
.total_uxsuccess
+=1
306 self
._ops
.addUnexpectedSuccess(test
, details
)
308 self
._ops
.output_msg(self
.output
)
310 if self
.fail_immediately
:
311 raise ImmediateFail()
313 def addFailure(self
, test
, details
=None):
314 test
= self
._add
_prefix
(test
)
315 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
316 if xfail_reason
is None:
317 xfail_reason
= find_in_list(self
.flapping
, test
.id())
318 if xfail_reason
is not None:
321 if details
is not None:
322 details
= subunit
.RemoteError(unicode(details
[1]) + xfail_reason
.decode("utf-8"))
324 details
= subunit
.RemoteError(xfail_reason
.decode("utf-8"))
325 self
._ops
.addExpectedFailure(test
, details
)
329 self
._ops
.addFailure(test
, details
)
331 self
._ops
.output_msg(self
.output
)
332 if self
.fail_immediately
:
333 raise ImmediateFail()
336 def addSuccess(self
, test
, details
=None):
337 test
= self
._add
_prefix
(test
)
338 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
339 if xfail_reason
is not None:
340 self
.uxsuccess_added
+= 1
341 self
.total_uxsuccess
+= 1
344 details
['reason'] = content
.Content(
345 content_type
.ContentType("text", "plain",
346 {"charset": "utf8"}), lambda: xfail_reason
)
347 self
._ops
.addUnexpectedSuccess(test
, details
)
349 self
._ops
.output_msg(self
.output
)
350 if self
.fail_immediately
:
351 raise ImmediateFail()
353 self
._ops
.addSuccess(test
, details
)
def skip_testsuite(self, name, reason=None):
    """Forward the testsuite skip (name and optional reason) to ``self._ops``."""
    downstream = self._ops
    downstream.skip_testsuite(name, reason)
359 def start_testsuite(self
, name
):
360 self
._ops
.start_testsuite(name
)
364 self
.uxsuccess_added
= 0
366 def end_testsuite(self
, name
, result
, reason
=None):
369 if self
.xfail_added
> 0:
371 if self
.fail_added
> 0 or self
.error_added
> 0 or self
.uxsuccess_added
> 0:
374 if xfail
and result
in ("fail", "failure"):
377 if self
.uxsuccess_added
> 0 and result
!= "uxsuccess":
380 reason
= "Subunit/Filter Reason"
381 reason
+= "\n uxsuccess[%d]" % self
.uxsuccess_added
383 if self
.fail_added
> 0 and result
!= "failure":
386 reason
= "Subunit/Filter Reason"
387 reason
+= "\n failures[%d]" % self
.fail_added
389 if self
.error_added
> 0 and result
!= "error":
392 reason
= "Subunit/Filter Reason"
393 reason
+= "\n errors[%d]" % self
.error_added
395 self
._ops
.end_testsuite(name
, result
, reason
)
396 if result
not in ("success", "xfail"):
398 self
._ops
.output_msg(self
.output
)
399 if self
.fail_immediately
:
400 raise ImmediateFail()
403 def __init__(self
, out
, prefix
=None, suffix
=None, expected_failures
=None,
404 strip_ok_output
=False, fail_immediately
=False,
407 self
.seen_output
= False
411 if expected_failures
is not None:
412 self
.expected_failures
= expected_failures
414 self
.expected_failures
= {}
415 if flapping
is not None:
416 self
.flapping
= flapping
419 self
.strip_ok_output
= strip_ok_output
422 self
.uxsuccess_added
= 0
426 self
.total_uxsuccess
= 0
428 self
.fail_immediately
= fail_immediately
431 class PlainFormatter(TestsuiteEnabledTestResult
):
433 def __init__(self
, verbose
, immediate
, statistics
,
435 super(PlainFormatter
, self
).__init
__()
436 self
.verbose
= verbose
437 self
.immediate
= immediate
438 self
.statistics
= statistics
439 self
.start_time
= None
440 self
.test_output
= {}
441 self
.suitesfailed
= []
446 self
._progress
_level
= 0
447 self
.totalsuites
= totaltests
448 self
.last_time
= None
451 def _format_time(delta
):
452 minutes
, seconds
= divmod(delta
.seconds
, 60)
453 hours
, minutes
= divmod(minutes
, 60)
458 ret
+= "%dm" % minutes
459 ret
+= "%ds" % seconds
def progress(self, offset, whence):
    """Track progress nesting and the announced number of testsuites.

    * ``subunit.PROGRESS_POP`` / ``subunit.PROGRESS_PUSH`` decrement /
      increment ``self._progress_level``.
    * ``subunit.PROGRESS_SET`` stores *offset* in ``self.totalsuites``, but
      only while the nesting level is 0.
    * ``subunit.PROGRESS_CUR`` is not supported.
    * Any other *whence* value is ignored.

    :raises NotImplementedError: when *whence* is ``subunit.PROGRESS_CUR``.
    """
    if whence == subunit.PROGRESS_POP:
        self._progress_level -= 1
        return
    if whence == subunit.PROGRESS_PUSH:
        self._progress_level += 1
        return
    if whence == subunit.PROGRESS_SET:
        # Totals announced inside a nested progress scope are not recorded.
        if self._progress_level == 0:
            self.totalsuites = offset
        return
    if whence == subunit.PROGRESS_CUR:
        raise NotImplementedError
474 if self
.start_time
is None:
478 def start_testsuite(self
, name
):
483 self
.test_output
[name
] = ""
485 out
= "[%d" % self
.index
486 if self
.totalsuites
is not None:
487 out
+= "/%d" % self
.totalsuites
488 if self
.start_time
is not None:
489 out
+= " in " + self
._format
_time
(self
.last_time
- self
.start_time
)
490 if self
.suitesfailed
:
491 out
+= ", %d errors" % (len(self
.suitesfailed
),)
494 sys
.stdout
.write(out
+ "\n")
496 sys
.stdout
.write(out
+ ": ")
498 def output_msg(self
, output
):
500 sys
.stdout
.write(output
)
501 elif self
.name
is not None:
502 self
.test_output
[self
.name
] += output
504 sys
.stdout
.write(output
)
506 def control_msg(self
, output
):
509 def end_testsuite(self
, name
, result
, reason
):
513 if not name
in self
.test_output
:
514 print "no output for name[%s]" % name
516 if result
in ("success", "xfail"):
519 self
.output_msg("ERROR: Testsuite[%s]\n" % name
)
520 if reason
is not None:
521 self
.output_msg("REASON: %s\n" % (reason
,))
522 self
.suitesfailed
.append(name
)
523 if self
.immediate
and not self
.verbose
and name
in self
.test_output
:
524 out
+= self
.test_output
[name
]
527 if not self
.immediate
:
531 out
+= " " + result
.upper() + "\n"
533 sys
.stdout
.write(out
)
535 def startTest(self
, test
):
def addSuccess(self, test, details=None):
    """Record *test* as an expected success.

    :param test: test case object; only its ``id()`` is used.
    :param details: optional details, accepted so the signature matches
        the other ``add*`` methods and callers that pass
        ``(test, details)``; forwarded to ``end_test``.
    """
    # Passing details=None is equivalent to the previous three-argument
    # call, because end_test declares ``details=None``.
    self.end_test(test.id(), "success", False, details)
def addError(self, test, details=None):
    """Report *test* through ``end_test`` as an unexpected "error"."""
    test_id = test.id()
    self.end_test(test_id, "error", True, details)
def addFailure(self, test, details=None):
    """Report *test* through ``end_test`` as an unexpected "failure"."""
    test_id = test.id()
    self.end_test(test_id, "failure", True, details)
def addSkip(self, test, details=None):
    """Report *test* through ``end_test`` as a "skip" (not unexpected)."""
    test_id = test.id()
    self.end_test(test_id, "skip", False, details)
def addExpectedFailure(self, test, details=None):
    """Report *test* through ``end_test`` as an "xfail" (not unexpected)."""
    test_id = test.id()
    self.end_test(test_id, "xfail", False, details)
def addUnexpectedSuccess(self, test, details=None):
    """Report *test* through ``end_test`` as an unexpected "uxsuccess"."""
    test_id = test.id()
    self.end_test(test_id, "uxsuccess", True, details)
556 def end_test(self
, testname
, result
, unexpected
, details
=None):
558 self
.test_output
[self
.name
] = ""
559 if not self
.immediate
:
564 'success': '.'}.get(result
, "?(%s)" % result
))
567 if not self
.name
in self
.test_output
:
568 self
.test_output
[self
.name
] = ""
570 self
.test_output
[self
.name
] += "UNEXPECTED(%s): %s\n" % (result
, testname
)
571 if details
is not None:
572 self
.test_output
[self
.name
] += "REASON: %s\n" % (unicode(details
[1]).encode("utf-8").strip(),)
574 if self
.immediate
and not self
.verbose
:
575 sys
.stdout
.write(self
.test_output
[self
.name
])
576 self
.test_output
[self
.name
] = ""
578 if not self
.immediate
:
583 'success': 'S'}.get(result
, "?"))
585 def write_summary(self
, path
):
588 if self
.suitesfailed
:
589 f
.write("= Failed tests =\n")
591 for suite
in self
.suitesfailed
:
592 f
.write("== %s ==\n" % suite
)
593 if suite
in self
.test_output
:
594 f
.write(self
.test_output
[suite
]+"\n\n")
598 if not self
.immediate
and not self
.verbose
:
599 for suite
in self
.suitesfailed
:
601 print "FAIL: %s" % suite
602 if suite
in self
.test_output
:
603 print self
.test_output
[suite
]
606 f
.write("= Skipped tests =\n")
607 for reason
in self
.skips
.keys():
608 f
.write(reason
+ "\n")
609 for name
in self
.skips
[reason
]:
610 f
.write("\t%s\n" % name
)
614 if (not self
.suitesfailed
and
615 not self
.statistics
['TESTS_UNEXPECTED_FAIL'] and
616 not self
.statistics
['TESTS_UNEXPECTED_OK'] and
617 not self
.statistics
['TESTS_ERROR']):
618 ok
= (self
.statistics
['TESTS_EXPECTED_OK'] +
619 self
.statistics
['TESTS_EXPECTED_FAIL'])
620 print "\nALL OK (%d tests in %d testsuites)" % (ok
, self
.suites_ok
)
622 print "\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
623 self
.statistics
['TESTS_UNEXPECTED_FAIL'],
624 self
.statistics
['TESTS_ERROR'],
625 self
.statistics
['TESTS_UNEXPECTED_OK'],
626 len(self
.suitesfailed
))
628 def skip_testsuite(self
, name
, reason
="UNKNOWN"):
629 self
.skips
.setdefault(reason
, []).append(name
)