1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 __all__
= ['parse_results']
23 import subunit
.iso8601
26 VALID_RESULTS
= ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error']
28 class TestsuiteEnabledTestResult(testtools
.testresult
.TestResult
):
30 def start_testsuite(self
, name
):
31 raise NotImplementedError(self
.start_testsuite
)
34 def parse_results(msg_ops
, statistics
, fh
):
42 parts
= l
.split(None, 1)
43 if not len(parts
) == 2 or not l
.startswith(parts
[0]):
46 command
= parts
[0].rstrip(":")
48 if command
in ("test", "testing"):
49 msg_ops
.control_msg(l
)
51 test
= subunit
.RemotedTestCase(name
)
52 if name
in open_tests
:
53 msg_ops
.addError(open_tests
.pop(name
), subunit
.RemoteError(u
"Test already running"))
54 msg_ops
.startTest(test
)
55 open_tests
[name
] = test
56 elif command
== "time":
57 msg_ops
.control_msg(l
)
59 dt
= subunit
.iso8601
.parse_date(arg
.rstrip("\n"))
61 print "Unable to parse time line: %s" % arg
.rstrip("\n")
64 elif command
in VALID_RESULTS
:
65 msg_ops
.control_msg(l
)
67 grp
= re
.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg
)
68 (testname
, hasreason
) = (grp
.group(1), grp
.group(2))
71 # reason may be specified in next lines
77 msg_ops
.control_msg(l
)
84 remote_error
= subunit
.RemoteError(reason
.decode("utf-8"))
87 statistics
['TESTS_ERROR']+=1
88 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"reason (%s) interrupted" % result
))
92 remote_error
= subunit
.RemoteError(u
"No reason specified")
93 if result
in ("success", "successful"):
95 test
= open_tests
.pop(testname
)
97 statistics
['TESTS_ERROR']+=1
98 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
100 statistics
['TESTS_EXPECTED_OK']+=1
101 msg_ops
.addSuccess(test
)
102 elif result
in ("xfail", "knownfail"):
104 test
= open_tests
.pop(testname
)
106 statistics
['TESTS_ERROR']+=1
107 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
109 statistics
['TESTS_EXPECTED_FAIL']+=1
110 msg_ops
.addExpectedFailure(test
, remote_error
)
112 elif result
in ("failure", "fail"):
114 test
= open_tests
.pop(testname
)
116 statistics
['TESTS_ERROR']+=1
117 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
119 statistics
['TESTS_UNEXPECTED_FAIL']+=1
120 msg_ops
.addFailure(test
, remote_error
)
121 elif result
== "skip":
122 statistics
['TESTS_SKIP']+=1
123 # Allow tests to be skipped without prior announcement of test
125 test
= open_tests
.pop(testname
)
127 test
= subunit
.RemotedTestCase(testname
)
128 msg_ops
.addSkip(test
, reason
)
129 elif result
== "error":
130 statistics
['TESTS_ERROR']+=1
132 test
= open_tests
.pop(testname
)
134 test
= subunit
.RemotedTestCase(testname
)
135 msg_ops
.addError(test
, remote_error
)
136 elif result
== "skip-testsuite":
137 msg_ops
.skip_testsuite(testname
)
138 elif result
== "testsuite-success":
139 msg_ops
.end_testsuite(testname
, "success", reason
)
140 elif result
== "testsuite-failure":
141 msg_ops
.end_testsuite(testname
, "failure", reason
)
142 elif result
== "testsuite-xfail":
143 msg_ops
.end_testsuite(testname
, "xfail", reason
)
144 elif result
== "testsuite-error":
145 msg_ops
.end_testsuite(testname
, "error", reason
)
147 raise AssertionError("Recognized but unhandled result %r" %
149 elif command
== "testsuite":
150 msg_ops
.start_testsuite(arg
.strip())
151 elif command
== "progress":
154 msg_ops
.progress(None, subunit
.PROGRESS_POP
)
156 msg_ops
.progress(None, subunit
.PROGRESS_PUSH
)
158 msg_ops
.progress(int(arg
), subunit
.PROGRESS_CUR
)
160 msg_ops
.progress(int(arg
), subunit
.PROGRESS_SET
)
162 msg_ops
.output_msg(l
)
165 test
= subunit
.RemotedTestCase(open_tests
.popitem()[1])
166 msg_ops
.addError(test
, subunit
.RemoteError(u
"was started but never finished!"))
167 statistics
['TESTS_ERROR']+=1
169 if statistics
['TESTS_ERROR'] > 0:
171 if statistics
['TESTS_UNEXPECTED_FAIL'] > 0:
176 class SubunitOps(subunit
.TestProtocolClient
,TestsuiteEnabledTestResult
):
178 # The following are Samba extensions:
179 def start_testsuite(self
, name
):
180 self
._stream
.write("testsuite: %s\n" % name
)
182 def skip_testsuite(self
, name
, reason
=None):
184 self
._stream
.write("skip-testsuite: %s [\n%s\n]\n" % (name
, reason
))
186 self
._stream
.write("skip-testsuite: %s\n" % name
)
188 def end_testsuite(self
, name
, result
, reason
=None):
190 self
._stream
.write("testsuite-%s: %s [\n%s\n]\n" % (result
, name
, reason
))
192 self
._stream
.write("testsuite-%s: %s\n" % (result
, name
))
194 def output_msg(self
, msg
):
195 self
._stream
.write(msg
)
198 def read_test_regexes(name
):
204 if l
== "" or l
[0] == "#":
207 (regex
, reason
) = l
.split("#", 1)
208 ret
[regex
.strip()] = reason
.strip()
216 def find_in_list(regexes
, fullname
):
217 for regex
, reason
in regexes
.iteritems():
218 if re
.match(regex
, fullname
):
225 class ImmediateFail(Exception):
226 """Raised to abort immediately."""
229 super(ImmediateFail
, self
).__init
__("test failed and fail_immediately set")
232 class FilterOps(testtools
.testresult
.TestResult
):
234 def control_msg(self
, msg
):
235 pass # We regenerate control messages, so ignore this
237 def time(self
, time
):
240 def progress(self
, delta
, whence
):
241 self
._ops
.progress(delta
, whence
)
243 def output_msg(self
, msg
):
244 if self
.output
is None:
245 sys
.stdout
.write(msg
)
249 def startTest(self
, test
):
250 test
= self
._add
_prefix
(test
)
251 if self
.strip_ok_output
:
254 self
._ops
.startTest(test
)
256 def _add_prefix(self
, test
):
257 if self
.prefix
is not None:
258 return subunit
.RemotedTestCase(self
.prefix
+ test
.id())
262 def addError(self
, test
, details
=None):
263 test
= self
._add
_prefix
(test
)
266 self
._ops
.addError(test
, details
)
268 if self
.fail_immediately
:
269 raise ImmediateFail()
271 def addSkip(self
, test
, details
=None):
272 test
= self
._add
_prefix
(test
)
273 self
._ops
.addSkip(test
, details
)
276 def addExpectedFailure(self
, test
, details
=None):
277 test
= self
._add
_prefix
(test
)
278 self
._ops
.addExpectedFailure(test
, details
)
281 def addFailure(self
, test
, details
=None):
282 test
= self
._add
_prefix
(test
)
283 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
284 if xfail_reason
is not None:
287 if details
is not None:
288 details
= subunit
.RemoteError(unicode(details
[1]) + xfail_reason
.decode("utf-8"))
290 details
= subunit
.RemoteError(xfail_reason
.decode("utf-8"))
291 self
._ops
.addExpectedFailure(test
, details
)
295 self
._ops
.addFailure(test
, details
)
297 self
._ops
.output_msg(self
.output
)
298 if self
.fail_immediately
:
299 raise ImmediateFail()
302 def addSuccess(self
, test
, details
=None):
303 test
= self
._add
_prefix
(test
)
304 self
._ops
.addSuccess(test
, details
)
307 def skip_testsuite(self
, name
, reason
=None):
308 self
._ops
.skip_testsuite(name
, reason
)
310 def start_testsuite(self
, name
):
311 self
._ops
.start_testsuite(name
)
316 def end_testsuite(self
, name
, result
, reason
=None):
319 if self
.xfail_added
> 0:
321 if self
.fail_added
> 0 or self
.error_added
> 0:
324 if xfail
and result
in ("fail", "failure"):
327 if self
.fail_added
> 0 and result
!= "failure":
330 reason
= "Subunit/Filter Reason"
331 reason
+= "\n failures[%d]" % self
.fail_added
333 if self
.error_added
> 0 and result
!= "error":
336 reason
= "Subunit/Filter Reason"
337 reason
+= "\n errors[%d]" % self
.error_added
339 self
._ops
.end_testsuite(name
, result
, reason
)
341 def __init__(self
, out
, prefix
, expected_failures
, strip_ok_output
, fail_immediately
=False):
345 self
.expected_failures
= expected_failures
346 self
.strip_ok_output
= strip_ok_output
353 self
.fail_immediately
= fail_immediately
356 class PlainFormatter(TestsuiteEnabledTestResult
):
358 def __init__(self
, summaryfile
, verbose
, immediate
, statistics
,
360 super(PlainFormatter
, self
).__init
__()
361 self
.verbose
= verbose
362 self
.immediate
= immediate
363 self
.statistics
= statistics
364 self
.start_time
= None
365 self
.test_output
= {}
366 self
.suitesfailed
= []
369 self
.summaryfile
= summaryfile
372 self
._progress
_level
= 0
373 self
.totalsuites
= totaltests
374 self
.last_time
= None
377 def _format_time(delta
):
378 minutes
, seconds
= divmod(delta
.seconds
, 60)
379 hours
, minutes
= divmod(minutes
, 60)
384 ret
+= "%dm" % minutes
385 ret
+= "%ds" % seconds
388 def progress(self
, offset
, whence
):
389 if whence
== subunit
.PROGRESS_POP
:
390 self
._progress
_level
-= 1
391 elif whence
== subunit
.PROGRESS_PUSH
:
392 self
._progress
_level
+= 1
393 elif whence
== subunit
.PROGRESS_SET
:
394 if self
._progress
_level
== 0:
395 self
.totalsuites
= offset
396 elif whence
== subunit
.PROGRESS_CUR
:
397 raise NotImplementedError
400 if self
.start_time
is None:
404 def start_testsuite(self
, name
):
409 self
.test_output
[name
] = ""
411 out
= "[%d" % self
.index
412 if self
.totalsuites
is not None:
413 out
+= "/%d" % self
.totalsuites
414 if self
.start_time
is not None:
415 out
+= " in " + self
._format
_time
(self
.last_time
- self
.start_time
)
416 if self
.suitesfailed
:
417 out
+= ", %d errors" % (len(self
.suitesfailed
),)
420 sys
.stdout
.write(out
+ "\n")
422 sys
.stdout
.write(out
+ ": ")
424 def output_msg(self
, output
):
426 sys
.stdout
.write(output
)
427 elif self
.name
is not None:
428 self
.test_output
[self
.name
] += output
430 sys
.stdout
.write(output
)
432 def control_msg(self
, output
):
435 def end_testsuite(self
, name
, result
, reason
):
439 if not name
in self
.test_output
:
440 print "no output for name[%s]" % name
442 if result
in ("success", "xfail"):
445 self
.output_msg("ERROR: Testsuite[%s]\n" % name
)
446 if reason
is not None:
447 self
.output_msg("REASON: %s\n" % (reason
,))
448 self
.suitesfailed
.append(name
)
449 if self
.immediate
and not self
.verbose
and name
in self
.test_output
:
450 out
+= self
.test_output
[name
]
453 if not self
.immediate
:
457 out
+= " " + result
.upper() + "\n"
459 sys
.stdout
.write(out
)
461 def startTest(self
, test
):
464 def addSuccess(self
, test
):
465 self
.end_test(test
.id(), "success", False)
467 def addError(self
, test
, details
=None):
468 self
.end_test(test
.id(), "error", True, details
)
470 def addFailure(self
, test
, details
=None):
471 self
.end_test(test
.id(), "failure", True, details
)
473 def addSkip(self
, test
, details
=None):
474 self
.end_test(test
.id(), "skip", False, details
)
476 def addExpectedFail(self
, test
, details
=None):
477 self
.end_test(test
.id(), "xfail", False, details
)
479 def end_test(self
, testname
, result
, unexpected
, reason
=None):
481 self
.test_output
[self
.name
] = ""
482 if not self
.immediate
:
487 'success': '.'}.get(result
, "?(%s)" % result
))
490 if not self
.name
in self
.test_output
:
491 self
.test_output
[self
.name
] = ""
493 self
.test_output
[self
.name
] += "UNEXPECTED(%s): %s\n" % (result
, testname
)
494 if reason
is not None:
495 self
.test_output
[self
.name
] += "REASON: %s\n" % (reason
[1].message
.encode("utf-8").strip(),)
497 if self
.immediate
and not self
.verbose
:
498 print self
.test_output
[self
.name
]
499 self
.test_output
[self
.name
] = ""
501 if not self
.immediate
:
505 'success': 'S'}.get(result
, "?"))
508 f
= open(self
.summaryfile
, 'w+')
510 if self
.suitesfailed
:
511 f
.write("= Failed tests =\n")
513 for suite
in self
.suitesfailed
:
514 f
.write("== %s ==\n" % suite
)
515 if suite
in self
.test_output
:
516 f
.write(self
.test_output
[suite
]+"\n\n")
520 if not self
.immediate
and not self
.verbose
:
521 for suite
in self
.suitesfailed
:
523 print "FAIL: %s" % suite
524 if suite
in self
.test_output
:
525 print self
.test_output
[suite
]
528 f
.write("= Skipped tests =\n")
529 for reason
in self
.skips
.keys():
530 f
.write(reason
+ "\n")
531 for name
in self
.skips
[reason
]:
532 f
.write("\t%s\n" % name
)
536 print "\nA summary with detailed information can be found in:"
537 print " %s" % self
.summaryfile
539 if (not self
.suitesfailed
and
540 not self
.statistics
['TESTS_UNEXPECTED_FAIL'] and
541 not self
.statistics
['TESTS_ERROR']):
542 ok
= (self
.statistics
['TESTS_EXPECTED_OK'] +
543 self
.statistics
['TESTS_EXPECTED_FAIL'])
544 print "\nALL OK (%d tests in %d testsuites)" % (ok
, self
.suites_ok
)
546 print "\nFAILED (%d failures and %d errors in %d testsuites)" % (
547 self
.statistics
['TESTS_UNEXPECTED_FAIL'],
548 self
.statistics
['TESTS_ERROR'],
549 len(self
.suitesfailed
))
551 def skip_testsuite(self
, name
, reason
="UNKNOWN"):
552 self
.skips
.setdefault(reason
, []).append(name
)