1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 __all__
= ['parse_results']
24 from samba
import subunit
25 from samba
.subunit
.run
import TestProtocolClient
26 from samba
.subunit
import iso8601
29 VALID_RESULTS
= set(['success', 'successful', 'failure', 'fail', 'skip',
30 'knownfail', 'error', 'xfail', 'skip-testsuite',
31 'testsuite-failure', 'testsuite-xfail',
32 'testsuite-success', 'testsuite-error',
33 'uxsuccess', 'testsuite-uxsuccess'])
35 class TestsuiteEnabledTestResult(unittest
.TestResult
):
37 def start_testsuite(self
, name
):
38 raise NotImplementedError(self
.start_testsuite
)
41 def parse_results(msg_ops
, statistics
, fh
):
49 parts
= l
.split(None, 1)
50 if not len(parts
) == 2 or not l
.startswith(parts
[0]):
53 command
= parts
[0].rstrip(":")
55 if command
in ("test", "testing"):
56 msg_ops
.control_msg(l
)
58 test
= subunit
.RemotedTestCase(name
)
59 if name
in open_tests
:
60 msg_ops
.addError(open_tests
.pop(name
), subunit
.RemoteError(u
"Test already running"))
61 msg_ops
.startTest(test
)
62 open_tests
[name
] = test
63 elif command
== "time":
64 msg_ops
.control_msg(l
)
66 dt
= iso8601
.parse_date(arg
.rstrip("\n"))
67 except TypeError as e
:
68 print "Unable to parse time line: %s" % arg
.rstrip("\n")
71 elif command
in VALID_RESULTS
:
72 msg_ops
.control_msg(l
)
74 grp
= re
.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg
)
75 (testname
, hasreason
) = (grp
.group(1), grp
.group(2))
78 # reason may be specified in next lines
84 msg_ops
.control_msg(l
)
92 remote_error
= subunit
.RemoteError(reason
.decode("utf-8"))
95 statistics
['TESTS_ERROR']+=1
96 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"reason (%s) interrupted" % result
))
100 remote_error
= subunit
.RemoteError(u
"No reason specified")
101 if result
in ("success", "successful"):
103 test
= open_tests
.pop(testname
)
105 statistics
['TESTS_ERROR']+=1
107 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
109 statistics
['TESTS_EXPECTED_OK']+=1
110 msg_ops
.addSuccess(test
)
111 elif result
in ("xfail", "knownfail"):
113 test
= open_tests
.pop(testname
)
115 statistics
['TESTS_ERROR']+=1
117 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
119 statistics
['TESTS_EXPECTED_FAIL']+=1
120 msg_ops
.addExpectedFailure(test
, remote_error
)
121 elif result
in ("uxsuccess", ):
123 test
= open_tests
.pop(testname
)
125 statistics
['TESTS_ERROR']+=1
127 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
129 statistics
['TESTS_UNEXPECTED_OK']+=1
130 msg_ops
.addUnexpectedSuccess(test
)
132 elif result
in ("failure", "fail"):
134 test
= open_tests
.pop(testname
)
136 statistics
['TESTS_ERROR']+=1
138 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
140 statistics
['TESTS_UNEXPECTED_FAIL']+=1
142 msg_ops
.addFailure(test
, remote_error
)
143 elif result
== "skip":
144 statistics
['TESTS_SKIP']+=1
145 # Allow tests to be skipped without prior announcement of test
147 test
= open_tests
.pop(testname
)
149 test
= subunit
.RemotedTestCase(testname
)
150 msg_ops
.addSkip(test
, reason
)
151 elif result
== "error":
152 statistics
['TESTS_ERROR']+=1
155 test
= open_tests
.pop(testname
)
157 test
= subunit
.RemotedTestCase(testname
)
158 msg_ops
.addError(test
, remote_error
)
159 elif result
== "skip-testsuite":
160 msg_ops
.skip_testsuite(testname
)
161 elif result
== "testsuite-success":
162 msg_ops
.end_testsuite(testname
, "success", reason
)
163 elif result
== "testsuite-failure":
164 msg_ops
.end_testsuite(testname
, "failure", reason
)
166 elif result
== "testsuite-xfail":
167 msg_ops
.end_testsuite(testname
, "xfail", reason
)
168 elif result
== "testsuite-uxsuccess":
169 msg_ops
.end_testsuite(testname
, "uxsuccess", reason
)
171 elif result
== "testsuite-error":
172 msg_ops
.end_testsuite(testname
, "error", reason
)
175 raise AssertionError("Recognized but unhandled result %r" %
177 elif command
== "testsuite":
178 msg_ops
.start_testsuite(arg
.strip())
179 elif command
== "progress":
182 msg_ops
.progress(None, subunit
.PROGRESS_POP
)
184 msg_ops
.progress(None, subunit
.PROGRESS_PUSH
)
186 msg_ops
.progress(int(arg
), subunit
.PROGRESS_CUR
)
188 msg_ops
.progress(int(arg
), subunit
.PROGRESS_SET
)
190 msg_ops
.output_msg(l
)
193 test
= subunit
.RemotedTestCase(open_tests
.popitem()[1])
194 msg_ops
.addError(test
, subunit
.RemoteError(u
"was started but never finished!"))
195 statistics
['TESTS_ERROR']+=1
201 class SubunitOps(TestProtocolClient
,TestsuiteEnabledTestResult
):
203 def progress(self
, count
, whence
):
204 if whence
== subunit
.PROGRESS_POP
:
205 self
._stream
.write("progress: pop\n")
206 elif whence
== subunit
.PROGRESS_PUSH
:
207 self
._stream
.write("progress: push\n")
208 elif whence
== subunit
.PROGRESS_SET
:
209 self
._stream
.write("progress: %d\n" % count
)
210 elif whence
== subunit
.PROGRESS_CUR
:
211 raise NotImplementedError
213 # The following are Samba extensions:
214 def start_testsuite(self
, name
):
215 self
._stream
.write("testsuite: %s\n" % name
)
217 def skip_testsuite(self
, name
, reason
=None):
219 self
._stream
.write("skip-testsuite: %s [\n%s\n]\n" % (name
, reason
))
221 self
._stream
.write("skip-testsuite: %s\n" % name
)
223 def end_testsuite(self
, name
, result
, reason
=None):
225 self
._stream
.write("testsuite-%s: %s [\n%s\n]\n" % (result
, name
, reason
))
227 self
._stream
.write("testsuite-%s: %s\n" % (result
, name
))
229 def output_msg(self
, msg
):
230 self
._stream
.write(msg
)
233 def read_test_regexes(*names
):
237 # if we are given a directory, we read all the files it contains
238 # (except the ones that end with "~").
239 if os
.path
.isdir(name
):
240 files
.extend([os
.path
.join(name
, x
)
241 for x
in os
.listdir(name
)
246 for filename
in files
:
247 f
= open(filename
, 'r')
251 if l
== "" or l
[0] == "#":
254 (regex
, reason
) = l
.split("#", 1)
255 ret
[regex
.strip()] = reason
.strip()
263 def find_in_list(regexes
, fullname
):
264 for regex
, reason
in regexes
.iteritems():
265 if re
.match(regex
, fullname
):
272 class ImmediateFail(Exception):
273 """Raised to abort immediately."""
276 super(ImmediateFail
, self
).__init
__("test failed and fail_immediately set")
279 class FilterOps(unittest
.TestResult
):
281 def control_msg(self
, msg
):
282 pass # We regenerate control messages, so ignore this
284 def time(self
, time
):
287 def progress(self
, delta
, whence
):
288 self
._ops
.progress(delta
, whence
)
290 def output_msg(self
, msg
):
291 if self
.output
is None:
292 sys
.stdout
.write(msg
)
296 def startTest(self
, test
):
297 self
.seen_output
= True
298 test
= self
._add
_prefix
(test
)
299 if self
.strip_ok_output
:
302 self
._ops
.startTest(test
)
304 def _add_prefix(self
, test
):
305 return subunit
.RemotedTestCase(self
.prefix
+ test
.id() + self
.suffix
)
307 def addError(self
, test
, err
=None):
308 test
= self
._add
_prefix
(test
)
311 self
._ops
.addError(test
, err
)
313 if self
.fail_immediately
:
314 raise ImmediateFail()
316 def addSkip(self
, test
, reason
=None):
317 self
.seen_output
= True
318 test
= self
._add
_prefix
(test
)
319 self
._ops
.addSkip(test
, reason
)
322 def addExpectedFailure(self
, test
, err
=None):
323 test
= self
._add
_prefix
(test
)
324 self
._ops
.addExpectedFailure(test
, err
)
327 def addUnexpectedSuccess(self
, test
):
328 test
= self
._add
_prefix
(test
)
329 self
.uxsuccess_added
+=1
330 self
.total_uxsuccess
+=1
331 self
._ops
.addUnexpectedSuccess(test
)
333 self
._ops
.output_msg(self
.output
)
335 if self
.fail_immediately
:
336 raise ImmediateFail()
338 def addFailure(self
, test
, err
=None):
339 test
= self
._add
_prefix
(test
)
340 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
341 if xfail_reason
is None:
342 xfail_reason
= find_in_list(self
.flapping
, test
.id())
343 if xfail_reason
is not None:
346 self
._ops
.addExpectedFailure(test
, err
)
350 self
._ops
.addFailure(test
, err
)
352 self
._ops
.output_msg(self
.output
)
353 if self
.fail_immediately
:
354 raise ImmediateFail()
357 def addSuccess(self
, test
):
358 test
= self
._add
_prefix
(test
)
359 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
360 if xfail_reason
is not None:
361 self
.uxsuccess_added
+= 1
362 self
.total_uxsuccess
+= 1
363 self
._ops
.addUnexpectedSuccess(test
)
365 self
._ops
.output_msg(self
.output
)
366 if self
.fail_immediately
:
367 raise ImmediateFail()
369 self
._ops
.addSuccess(test
)
372 def skip_testsuite(self
, name
, reason
=None):
373 self
._ops
.skip_testsuite(name
, reason
)
375 def start_testsuite(self
, name
):
376 self
._ops
.start_testsuite(name
)
380 self
.uxsuccess_added
= 0
382 def end_testsuite(self
, name
, result
, reason
=None):
385 if self
.xfail_added
> 0:
387 if self
.fail_added
> 0 or self
.error_added
> 0 or self
.uxsuccess_added
> 0:
390 if xfail
and result
in ("fail", "failure"):
393 if self
.uxsuccess_added
> 0 and result
!= "uxsuccess":
396 reason
= "Subunit/Filter Reason"
397 reason
+= "\n uxsuccess[%d]" % self
.uxsuccess_added
399 if self
.fail_added
> 0 and result
!= "failure":
402 reason
= "Subunit/Filter Reason"
403 reason
+= "\n failures[%d]" % self
.fail_added
405 if self
.error_added
> 0 and result
!= "error":
408 reason
= "Subunit/Filter Reason"
409 reason
+= "\n errors[%d]" % self
.error_added
411 self
._ops
.end_testsuite(name
, result
, reason
)
412 if result
not in ("success", "xfail"):
414 self
._ops
.output_msg(self
.output
)
415 if self
.fail_immediately
:
416 raise ImmediateFail()
419 def __init__(self
, out
, prefix
=None, suffix
=None, expected_failures
=None,
420 strip_ok_output
=False, fail_immediately
=False,
423 self
.seen_output
= False
427 if expected_failures
is not None:
428 self
.expected_failures
= expected_failures
430 self
.expected_failures
= {}
431 if flapping
is not None:
432 self
.flapping
= flapping
435 self
.strip_ok_output
= strip_ok_output
438 self
.uxsuccess_added
= 0
442 self
.total_uxsuccess
= 0
444 self
.fail_immediately
= fail_immediately
447 class PerfFilterOps(unittest
.TestResult
):
449 def progress(self
, delta
, whence
):
452 def output_msg(self
, msg
):
455 def control_msg(self
, msg
):
458 def skip_testsuite(self
, name
, reason
=None):
459 self
._ops
.skip_testsuite(name
, reason
)
461 def start_testsuite(self
, name
):
462 self
.suite_has_time
= False
464 def end_testsuite(self
, name
, result
, reason
=None):
467 def _add_prefix(self
, test
):
468 return subunit
.RemotedTestCase(self
.prefix
+ test
.id() + self
.suffix
)
470 def time(self
, time
):
471 self
.latest_time
= time
472 #self._ops.output_msg("found time %s\n" % time)
473 self
.suite_has_time
= True
476 if self
.suite_has_time
:
477 return self
.latest_time
478 return datetime
.datetime
.utcnow()
480 def startTest(self
, test
):
481 self
.seen_output
= True
482 test
= self
._add
_prefix
(test
)
483 self
.starts
[test
.id()] = self
.get_time()
485 def addSuccess(self
, test
):
486 test
= self
._add
_prefix
(test
)
488 if tid
not in self
.starts
:
489 self
._ops
.addError(test
, "%s succeeded without ever starting!" % tid
)
490 delta
= self
.get_time() - self
.starts
[tid
]
491 self
._ops
.output_msg("elapsed-time: %s: %f\n" % (tid
, delta
.total_seconds()))
493 def addFailure(self
, test
, err
=''):
495 delta
= self
.get_time() - self
.starts
[tid
]
496 self
._ops
.output_msg("failure: %s failed after %f seconds (%s)\n" %
497 (tid
, delta
.total_seconds(), err
))
499 def addError(self
, test
, err
=''):
501 delta
= self
.get_time() - self
.starts
[tid
]
502 self
._ops
.output_msg("error: %s failed after %f seconds (%s)\n" %
503 (tid
, delta
.total_seconds(), err
))
505 def __init__(self
, out
, prefix
='', suffix
=''):
507 self
.prefix
= prefix
or ''
508 self
.suffix
= suffix
or ''
510 self
.seen_output
= False
511 self
.suite_has_time
= False
514 class PlainFormatter(TestsuiteEnabledTestResult
):
516 def __init__(self
, verbose
, immediate
, statistics
,
518 super(PlainFormatter
, self
).__init
__()
519 self
.verbose
= verbose
520 self
.immediate
= immediate
521 self
.statistics
= statistics
522 self
.start_time
= None
523 self
.test_output
= {}
524 self
.suitesfailed
= []
529 self
._progress
_level
= 0
530 self
.totalsuites
= totaltests
531 self
.last_time
= None
534 def _format_time(delta
):
535 minutes
, seconds
= divmod(delta
.seconds
, 60)
536 hours
, minutes
= divmod(minutes
, 60)
541 ret
+= "%dm" % minutes
542 ret
+= "%ds" % seconds
545 def progress(self
, offset
, whence
):
546 if whence
== subunit
.PROGRESS_POP
:
547 self
._progress
_level
-= 1
548 elif whence
== subunit
.PROGRESS_PUSH
:
549 self
._progress
_level
+= 1
550 elif whence
== subunit
.PROGRESS_SET
:
551 if self
._progress
_level
== 0:
552 self
.totalsuites
= offset
553 elif whence
== subunit
.PROGRESS_CUR
:
554 raise NotImplementedError
557 if self
.start_time
is None:
561 def start_testsuite(self
, name
):
566 self
.test_output
[name
] = ""
568 total_tests
= (self
.statistics
['TESTS_EXPECTED_OK'] +
569 self
.statistics
['TESTS_EXPECTED_FAIL'] +
570 self
.statistics
['TESTS_ERROR'] +
571 self
.statistics
['TESTS_UNEXPECTED_FAIL'] +
572 self
.statistics
['TESTS_UNEXPECTED_OK'])
574 out
= "[%d(%d)" % (self
.index
, total_tests
)
575 if self
.totalsuites
is not None:
576 out
+= "/%d" % self
.totalsuites
577 if self
.start_time
is not None:
578 out
+= " at " + self
._format
_time
(self
.last_time
- self
.start_time
)
579 if self
.suitesfailed
:
580 out
+= ", %d errors" % (len(self
.suitesfailed
),)
583 sys
.stdout
.write(out
+ "\n")
585 sys
.stdout
.write(out
+ ": ")
587 def output_msg(self
, output
):
589 sys
.stdout
.write(output
)
590 elif self
.name
is not None:
591 self
.test_output
[self
.name
] += output
593 sys
.stdout
.write(output
)
595 def control_msg(self
, output
):
598 def end_testsuite(self
, name
, result
, reason
):
602 if not name
in self
.test_output
:
603 print "no output for name[%s]" % name
605 if result
in ("success", "xfail"):
608 self
.output_msg("ERROR: Testsuite[%s]\n" % name
)
609 if reason
is not None:
610 self
.output_msg("REASON: %s\n" % (reason
,))
611 self
.suitesfailed
.append(name
)
612 if self
.immediate
and not self
.verbose
and name
in self
.test_output
:
613 out
+= self
.test_output
[name
]
616 if not self
.immediate
:
620 out
+= " " + result
.upper() + "\n"
622 sys
.stdout
.write(out
)
624 def startTest(self
, test
):
627 def addSuccess(self
, test
):
628 self
.end_test(test
.id(), "success", False)
630 def addError(self
, test
, err
=None):
631 self
.end_test(test
.id(), "error", True, err
)
633 def addFailure(self
, test
, err
=None):
634 self
.end_test(test
.id(), "failure", True, err
)
636 def addSkip(self
, test
, reason
=None):
637 self
.end_test(test
.id(), "skip", False, reason
)
639 def addExpectedFailure(self
, test
, err
=None):
640 self
.end_test(test
.id(), "xfail", False, err
)
642 def addUnexpectedSuccess(self
, test
):
643 self
.end_test(test
.id(), "uxsuccess", True)
645 def end_test(self
, testname
, result
, unexpected
, err
=None):
647 self
.test_output
[self
.name
] = ""
648 if not self
.immediate
:
653 'success': '.'}.get(result
, "?(%s)" % result
))
656 if not self
.name
in self
.test_output
:
657 self
.test_output
[self
.name
] = ""
659 self
.test_output
[self
.name
] += "UNEXPECTED(%s): %s\n" % (result
, testname
)
661 self
.test_output
[self
.name
] += "REASON: %s\n" % str(err
[1]).strip()
663 if self
.immediate
and not self
.verbose
:
664 sys
.stdout
.write(self
.test_output
[self
.name
])
665 self
.test_output
[self
.name
] = ""
667 if not self
.immediate
:
672 'success': 'S'}.get(result
, "?"))
674 def write_summary(self
, path
):
677 if self
.suitesfailed
:
678 f
.write("= Failed tests =\n")
680 for suite
in self
.suitesfailed
:
681 f
.write("== %s ==\n" % suite
)
682 if suite
in self
.test_output
:
683 f
.write(self
.test_output
[suite
]+"\n\n")
687 if not self
.immediate
and not self
.verbose
:
688 for suite
in self
.suitesfailed
:
690 print "FAIL: %s" % suite
691 if suite
in self
.test_output
:
692 print self
.test_output
[suite
]
695 f
.write("= Skipped tests =\n")
696 for reason
in self
.skips
.keys():
697 f
.write(reason
+ "\n")
698 for name
in self
.skips
[reason
]:
699 f
.write("\t%s\n" % name
)
703 if (not self
.suitesfailed
and
704 not self
.statistics
['TESTS_UNEXPECTED_FAIL'] and
705 not self
.statistics
['TESTS_UNEXPECTED_OK'] and
706 not self
.statistics
['TESTS_ERROR']):
707 ok
= (self
.statistics
['TESTS_EXPECTED_OK'] +
708 self
.statistics
['TESTS_EXPECTED_FAIL'])
709 print "\nALL OK (%d tests in %d testsuites)" % (ok
, self
.suites_ok
)
711 print "\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
712 self
.statistics
['TESTS_UNEXPECTED_FAIL'],
713 self
.statistics
['TESTS_ERROR'],
714 self
.statistics
['TESTS_UNEXPECTED_OK'],
715 len(self
.suitesfailed
))
717 def skip_testsuite(self
, name
, reason
="UNKNOWN"):
718 self
.skips
.setdefault(reason
, []).append(name
)