1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__
import print_function
19 __all__
= ['parse_results']
25 from samba
import subunit
26 from samba
.subunit
.run
import TestProtocolClient
27 from samba
.subunit
import iso8601
31 VALID_RESULTS
= set(['success', 'successful', 'failure', 'fail', 'skip',
32 'knownfail', 'error', 'xfail', 'skip-testsuite',
33 'testsuite-failure', 'testsuite-xfail',
34 'testsuite-success', 'testsuite-error',
35 'uxsuccess', 'testsuite-uxsuccess'])
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A ``unittest.TestResult`` that additionally declares a testsuite hook.

    Subclasses are expected to override :meth:`start_testsuite`; the version
    defined here only raises.
    """

    def start_testsuite(self, name):
        """Hook for the start of the testsuite ``name``; not implemented here.

        :raises NotImplementedError: always, carrying the bound method as
            its argument.
        """
        unimplemented_hook = self.start_testsuite
        raise NotImplementedError(unimplemented_hook)
44 def parse_results(msg_ops
, statistics
, fh
):
52 parts
= l
.split(None, 1)
53 if not len(parts
) == 2 or not l
.startswith(parts
[0]):
56 command
= parts
[0].rstrip(":")
58 if command
in ("test", "testing"):
59 msg_ops
.control_msg(l
)
61 test
= subunit
.RemotedTestCase(name
)
62 if name
in open_tests
:
63 msg_ops
.addError(open_tests
.pop(name
), subunit
.RemoteError(u
"Test already running"))
64 msg_ops
.startTest(test
)
65 open_tests
[name
] = test
66 elif command
== "time":
67 msg_ops
.control_msg(l
)
69 dt
= iso8601
.parse_date(arg
.rstrip("\n"))
70 except TypeError as e
:
71 print("Unable to parse time line: %s" % arg
.rstrip("\n"))
74 elif command
in VALID_RESULTS
:
75 msg_ops
.control_msg(l
)
77 grp
= re
.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg
)
78 (testname
, hasreason
) = (grp
.group(1), grp
.group(2))
81 # reason may be specified in next lines
87 msg_ops
.control_msg(l
)
95 if isinstance(reason
, bytes
):
96 remote_error
= subunit
.RemoteError(reason
.decode("utf-8"))
98 remote_error
= subunit
.RemoteError(reason
)
101 statistics
['TESTS_ERROR'] += 1
102 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"reason (%s) interrupted" % result
))
106 remote_error
= subunit
.RemoteError(u
"No reason specified")
107 if result
in ("success", "successful"):
109 test
= open_tests
.pop(testname
)
111 statistics
['TESTS_ERROR'] += 1
113 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
115 statistics
['TESTS_EXPECTED_OK'] += 1
116 msg_ops
.addSuccess(test
)
117 elif result
in ("xfail", "knownfail"):
119 test
= open_tests
.pop(testname
)
121 statistics
['TESTS_ERROR'] += 1
123 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
125 statistics
['TESTS_EXPECTED_FAIL'] += 1
126 msg_ops
.addExpectedFailure(test
, remote_error
)
127 elif result
in ("uxsuccess", ):
129 test
= open_tests
.pop(testname
)
131 statistics
['TESTS_ERROR'] += 1
133 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
135 statistics
['TESTS_UNEXPECTED_OK'] += 1
136 msg_ops
.addUnexpectedSuccess(test
)
138 elif result
in ("failure", "fail"):
140 test
= open_tests
.pop(testname
)
142 statistics
['TESTS_ERROR'] += 1
144 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
146 statistics
['TESTS_UNEXPECTED_FAIL'] += 1
148 msg_ops
.addFailure(test
, remote_error
)
149 elif result
== "skip":
150 statistics
['TESTS_SKIP'] += 1
151 # Allow tests to be skipped without prior announcement of test
153 test
= open_tests
.pop(testname
)
155 test
= subunit
.RemotedTestCase(testname
)
156 msg_ops
.addSkip(test
, reason
)
157 elif result
== "error":
158 statistics
['TESTS_ERROR'] += 1
161 test
= open_tests
.pop(testname
)
163 test
= subunit
.RemotedTestCase(testname
)
164 msg_ops
.addError(test
, remote_error
)
165 elif result
== "skip-testsuite":
166 msg_ops
.skip_testsuite(testname
)
167 elif result
== "testsuite-success":
168 msg_ops
.end_testsuite(testname
, "success", reason
)
169 elif result
== "testsuite-failure":
170 msg_ops
.end_testsuite(testname
, "failure", reason
)
172 elif result
== "testsuite-xfail":
173 msg_ops
.end_testsuite(testname
, "xfail", reason
)
174 elif result
== "testsuite-uxsuccess":
175 msg_ops
.end_testsuite(testname
, "uxsuccess", reason
)
177 elif result
== "testsuite-error":
178 msg_ops
.end_testsuite(testname
, "error", reason
)
181 raise AssertionError("Recognized but unhandled result %r" %
183 elif command
== "testsuite":
184 msg_ops
.start_testsuite(arg
.strip())
185 elif command
== "progress":
188 msg_ops
.progress(None, subunit
.PROGRESS_POP
)
190 msg_ops
.progress(None, subunit
.PROGRESS_PUSH
)
192 msg_ops
.progress(int(arg
), subunit
.PROGRESS_CUR
)
194 msg_ops
.progress(int(arg
), subunit
.PROGRESS_SET
)
196 msg_ops
.output_msg(l
)
199 test
= subunit
.RemotedTestCase(open_tests
.popitem()[1])
200 msg_ops
.addError(test
, subunit
.RemoteError(u
"was started but never finished!"))
201 statistics
['TESTS_ERROR'] += 1
207 class SubunitOps(TestProtocolClient
, TestsuiteEnabledTestResult
):
def progress(self, count, whence):
    """Emit a subunit ``progress:`` line on ``self._stream``.

    :param count: number written for ``PROGRESS_SET``; unused otherwise.
    :param whence: one of the ``subunit.PROGRESS_*`` constants.
    :raises NotImplementedError: for ``subunit.PROGRESS_CUR``.

    Any other ``whence`` value writes nothing.
    """
    line = None
    if whence == subunit.PROGRESS_POP:
        line = "progress: pop\n"
    elif whence == subunit.PROGRESS_PUSH:
        line = "progress: push\n"
    elif whence == subunit.PROGRESS_SET:
        line = "progress: %d\n" % count
    elif whence == subunit.PROGRESS_CUR:
        raise NotImplementedError
    if line is not None:
        self._stream.write(line)
219 # The following are Samba extensions:
def start_testsuite(self, name):
    """Write a ``testsuite: <name>`` line to ``self._stream``."""
    announcement = "testsuite: %s\n" % name
    self._stream.write(announcement)
223 def skip_testsuite(self
, name
, reason
=None):
225 self
._stream
.write("skip-testsuite: %s [\n%s\n]\n" % (name
, reason
))
227 self
._stream
.write("skip-testsuite: %s\n" % name
)
229 def end_testsuite(self
, name
, result
, reason
=None):
231 self
._stream
.write("testsuite-%s: %s [\n%s\n]\n" % (result
, name
, reason
))
233 self
._stream
.write("testsuite-%s: %s\n" % (result
, name
))
def output_msg(self, msg):
    """Pass ``msg`` through to ``self._stream`` unchanged."""
    sink = self._stream
    sink.write(msg)
239 def read_test_regexes(*names
):
243 # if we are given a directory, we read all the files it contains
244 # (except the ones that end with "~").
245 if os
.path
.isdir(name
):
246 files
.extend([os
.path
.join(name
, x
)
247 for x
in os
.listdir(name
)
252 for filename
in files
:
253 f
= open(filename
, 'r')
257 if l
== "" or l
[0] == "#":
260 (regex
, reason
) = l
.split("#", 1)
261 ret
[regex
.strip()] = reason
.strip()
269 def find_in_list(regexes
, fullname
):
270 for regex
, reason
in regexes
.items():
271 if re
.match(regex
, fullname
):
class ImmediateFail(Exception):
    """Raised to abort immediately."""

    def __init__(self):
        # The message is fixed; callers construct this exception without
        # arguments.
        message = "test failed and fail_immediately set"
        super(ImmediateFail, self).__init__(message)
285 class FilterOps(unittest
.TestResult
):
def control_msg(self, msg):
    """Drop an incoming control line.

    Control messages are regenerated by this filter, so the one read from
    the input is deliberately ignored.
    """
    return None
290 def time(self
, time
):
def progress(self, delta, whence):
    """Forward a progress event unchanged to the wrapped ``self._ops``."""
    forward = self._ops.progress
    forward(delta, whence)
296 def output_msg(self
, msg
):
297 if self
.output
is None:
298 sys
.stdout
.write(msg
)
302 def startTest(self
, test
):
303 self
.seen_output
= True
304 test
= self
._add
_prefix
(test
)
305 if self
.strip_ok_output
:
308 self
._ops
.startTest(test
)
def _add_prefix(self, test):
    """Build a ``subunit.RemotedTestCase`` named ``prefix + test.id() + suffix``.

    :param test: object exposing ``id()`` returning the test name.
    :return: a new ``subunit.RemotedTestCase`` constructed from the
        decorated name.
    """
    # The constructor declares prefix=None / suffix=None as defaults;
    # concatenating None with a str raises TypeError, so a missing
    # prefix/suffix is treated as the empty string.
    prefix = self.prefix or ""
    suffix = self.suffix or ""
    return subunit.RemotedTestCase(prefix + test.id() + suffix)
313 def addError(self
, test
, err
=None):
314 test
= self
._add
_prefix
(test
)
315 self
.error_added
+= 1
316 self
.total_error
+= 1
317 self
._ops
.addError(test
, err
)
319 if self
.fail_immediately
:
320 raise ImmediateFail()
322 def addSkip(self
, test
, reason
=None):
323 self
.seen_output
= True
324 test
= self
._add
_prefix
(test
)
325 self
._ops
.addSkip(test
, reason
)
328 def addExpectedFailure(self
, test
, err
=None):
329 test
= self
._add
_prefix
(test
)
330 self
._ops
.addExpectedFailure(test
, err
)
333 def addUnexpectedSuccess(self
, test
):
334 test
= self
._add
_prefix
(test
)
335 self
.uxsuccess_added
+= 1
336 self
.total_uxsuccess
+= 1
337 self
._ops
.addUnexpectedSuccess(test
)
339 self
._ops
.output_msg(self
.output
)
341 if self
.fail_immediately
:
342 raise ImmediateFail()
344 def addFailure(self
, test
, err
=None):
345 test
= self
._add
_prefix
(test
)
346 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
347 if xfail_reason
is None:
348 xfail_reason
= find_in_list(self
.flapping
, test
.id())
349 if xfail_reason
is not None:
350 self
.xfail_added
+= 1
351 self
.total_xfail
+= 1
352 self
._ops
.addExpectedFailure(test
, err
)
356 self
._ops
.addFailure(test
, err
)
358 self
._ops
.output_msg(self
.output
)
359 if self
.fail_immediately
:
360 raise ImmediateFail()
363 def addSuccess(self
, test
):
364 test
= self
._add
_prefix
(test
)
365 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
366 if xfail_reason
is not None:
367 self
.uxsuccess_added
+= 1
368 self
.total_uxsuccess
+= 1
369 self
._ops
.addUnexpectedSuccess(test
)
371 self
._ops
.output_msg(self
.output
)
372 if self
.fail_immediately
:
373 raise ImmediateFail()
375 self
._ops
.addSuccess(test
)
def skip_testsuite(self, name, reason=None):
    """Relay a skipped testsuite, with its optional reason, to ``self._ops``."""
    relay = self._ops.skip_testsuite
    relay(name, reason)
381 def start_testsuite(self
, name
):
382 self
._ops
.start_testsuite(name
)
386 self
.uxsuccess_added
= 0
388 def end_testsuite(self
, name
, result
, reason
=None):
391 if self
.xfail_added
> 0:
393 if self
.fail_added
> 0 or self
.error_added
> 0 or self
.uxsuccess_added
> 0:
396 if xfail
and result
in ("fail", "failure"):
399 if self
.uxsuccess_added
> 0 and result
!= "uxsuccess":
402 reason
= "Subunit/Filter Reason"
403 reason
+= "\n uxsuccess[%d]" % self
.uxsuccess_added
405 if self
.fail_added
> 0 and result
!= "failure":
408 reason
= "Subunit/Filter Reason"
409 reason
+= "\n failures[%d]" % self
.fail_added
411 if self
.error_added
> 0 and result
!= "error":
414 reason
= "Subunit/Filter Reason"
415 reason
+= "\n errors[%d]" % self
.error_added
417 self
._ops
.end_testsuite(name
, result
, reason
)
418 if result
not in ("success", "xfail"):
420 self
._ops
.output_msg(self
.output
)
421 if self
.fail_immediately
:
422 raise ImmediateFail()
425 def __init__(self
, out
, prefix
=None, suffix
=None, expected_failures
=None,
426 strip_ok_output
=False, fail_immediately
=False,
429 self
.seen_output
= False
433 if expected_failures
is not None:
434 self
.expected_failures
= expected_failures
436 self
.expected_failures
= {}
437 if flapping
is not None:
438 self
.flapping
= flapping
441 self
.strip_ok_output
= strip_ok_output
444 self
.uxsuccess_added
= 0
448 self
.total_uxsuccess
= 0
450 self
.fail_immediately
= fail_immediately
453 class PerfFilterOps(unittest
.TestResult
):
455 def progress(self
, delta
, whence
):
458 def output_msg(self
, msg
):
461 def control_msg(self
, msg
):
def skip_testsuite(self, name, reason=None):
    """Hand a skipped testsuite and its optional reason on to ``self._ops``."""
    downstream = self._ops
    downstream.skip_testsuite(name, reason)
def start_testsuite(self, name):
    """Reset the per-suite "time seen" flag when a testsuite begins.

    ``name`` is accepted for interface compatibility and is not used.
    """
    setattr(self, "suite_has_time", False)
470 def end_testsuite(self
, name
, result
, reason
=None):
def _add_prefix(self, test):
    """Build a ``subunit.RemotedTestCase`` named ``prefix + test.id() + suffix``."""
    make_case = subunit.RemotedTestCase
    return make_case(self.prefix + test.id() + self.suffix)
def time(self, time):
    """Remember the most recent timestamp seen in the stream.

    Stores ``time`` in ``self.latest_time`` and flags the current suite as
    having supplied a time.
    """
    self.latest_time = time
    # Mark that a stream-provided time is now available for this suite.
    self.suite_has_time = True
482 if self
.suite_has_time
:
483 return self
.latest_time
484 return datetime
.datetime
.utcnow()
def startTest(self, test):
    """Record the start time of ``test`` under its prefixed id.

    Sets ``self.seen_output`` and stores ``self.get_time()`` in
    ``self.starts`` keyed by the id of the prefixed test.
    """
    self.seen_output = True
    prefixed = self._add_prefix(test)
    started_at = self.get_time()
    self.starts[prefixed.id()] = started_at
491 def addSuccess(self
, test
):
492 test
= self
._add
_prefix
(test
)
494 if tid
not in self
.starts
:
495 self
._ops
.addError(test
, "%s succeeded without ever starting!" % tid
)
496 delta
= self
.get_time() - self
.starts
[tid
]
497 self
._ops
.output_msg("elapsed-time: %s: %f\n" % (tid
, delta
.total_seconds()))
499 def addFailure(self
, test
, err
=''):
501 delta
= self
.get_time() - self
.starts
[tid
]
502 self
._ops
.output_msg("failure: %s failed after %f seconds (%s)\n" %
503 (tid
, delta
.total_seconds(), err
))
505 def addError(self
, test
, err
=''):
507 delta
= self
.get_time() - self
.starts
[tid
]
508 self
._ops
.output_msg("error: %s failed after %f seconds (%s)\n" %
509 (tid
, delta
.total_seconds(), err
))
511 def __init__(self
, out
, prefix
='', suffix
=''):
513 self
.prefix
= prefix
or ''
514 self
.suffix
= suffix
or ''
516 self
.seen_output
= False
517 self
.suite_has_time
= False
520 class PlainFormatter(TestsuiteEnabledTestResult
):
522 def __init__(self
, verbose
, immediate
, statistics
,
524 super(PlainFormatter
, self
).__init
__()
525 self
.verbose
= verbose
526 self
.immediate
= immediate
527 self
.statistics
= statistics
528 self
.start_time
= None
529 self
.test_output
= {}
530 self
.suitesfailed
= []
535 self
._progress
_level
= 0
536 self
.totalsuites
= totaltests
537 self
.last_time
= None
540 def _format_time(delta
):
541 minutes
, seconds
= divmod(delta
.seconds
, 60)
542 hours
, minutes
= divmod(minutes
, 60)
547 ret
+= "%dm" % minutes
548 ret
+= "%ds" % seconds
def progress(self, offset, whence):
    """Track progress nesting and the announced number of testsuites.

    POP and PUSH decrement and increment ``self._progress_level``. SET
    stores ``offset`` in ``self.totalsuites``, but only while the nesting
    level is 0.

    :raises NotImplementedError: for ``subunit.PROGRESS_CUR``.
    """
    if whence == subunit.PROGRESS_POP:
        self._progress_level -= 1
        return
    if whence == subunit.PROGRESS_PUSH:
        self._progress_level += 1
        return
    if whence == subunit.PROGRESS_SET:
        # Totals announced inside a nested (pushed) level are ignored.
        if self._progress_level == 0:
            self.totalsuites = offset
        return
    if whence == subunit.PROGRESS_CUR:
        raise NotImplementedError
563 if self
.start_time
is None:
567 def start_testsuite(self
, name
):
572 self
.test_output
[name
] = ""
574 total_tests
= (self
.statistics
['TESTS_EXPECTED_OK'] +
575 self
.statistics
['TESTS_EXPECTED_FAIL'] +
576 self
.statistics
['TESTS_ERROR'] +
577 self
.statistics
['TESTS_UNEXPECTED_FAIL'] +
578 self
.statistics
['TESTS_UNEXPECTED_OK'])
580 out
= "[%d(%d)" % (self
.index
, total_tests
)
581 if self
.totalsuites
is not None:
582 out
+= "/%d" % self
.totalsuites
583 if self
.start_time
is not None:
584 out
+= " at " + self
._format
_time
(self
.last_time
- self
.start_time
)
585 if self
.suitesfailed
:
586 out
+= ", %d errors" % (len(self
.suitesfailed
),)
589 sys
.stdout
.write(out
+ "\n")
591 sys
.stdout
.write(out
+ ": ")
593 def output_msg(self
, output
):
595 sys
.stdout
.write(output
)
596 elif self
.name
is not None:
597 self
.test_output
[self
.name
] += output
599 sys
.stdout
.write(output
)
601 def control_msg(self
, output
):
604 def end_testsuite(self
, name
, result
, reason
):
608 if name
not in self
.test_output
:
609 print("no output for name[%s]" % name
)
611 if result
in ("success", "xfail"):
614 self
.output_msg("ERROR: Testsuite[%s]\n" % name
)
615 if reason
is not None:
616 self
.output_msg("REASON: %s\n" % (reason
,))
617 self
.suitesfailed
.append(name
)
618 if self
.immediate
and not self
.verbose
and name
in self
.test_output
:
619 out
+= self
.test_output
[name
]
622 if not self
.immediate
:
626 out
+= " " + result
.upper() + "\n"
628 sys
.stdout
.write(out
)
630 def startTest(self
, test
):
def addSuccess(self, test):
    """Report ``test`` to ``end_test`` as an expected "success"."""
    report = self.end_test
    report(test.id(), "success", False)
def addError(self, test, err=None):
    """Report ``test`` to ``end_test`` as an unexpected "error" with ``err``."""
    report = self.end_test
    report(test.id(), "error", True, err)
def addFailure(self, test, err=None):
    """Report ``test`` to ``end_test`` as an unexpected "failure" with ``err``."""
    report = self.end_test
    report(test.id(), "failure", True, err)
def addSkip(self, test, reason=None):
    """Report ``test`` to ``end_test`` as an expected "skip" with ``reason``."""
    report = self.end_test
    report(test.id(), "skip", False, reason)
def addExpectedFailure(self, test, err=None):
    """Report ``test`` to ``end_test`` as an expected "xfail" with ``err``."""
    report = self.end_test
    report(test.id(), "xfail", False, err)
def addUnexpectedSuccess(self, test):
    """Report ``test`` to ``end_test`` as an unexpected "uxsuccess"."""
    report = self.end_test
    report(test.id(), "uxsuccess", True)
651 def end_test(self
, testname
, result
, unexpected
, err
=None):
653 self
.test_output
[self
.name
] = ""
654 if not self
.immediate
:
659 'success': '.'}.get(result
, "?(%s)" % result
))
662 if self
.name
not in self
.test_output
:
663 self
.test_output
[self
.name
] = ""
665 self
.test_output
[self
.name
] += "UNEXPECTED(%s): %s\n" % (result
, testname
)
667 self
.test_output
[self
.name
] += "REASON: %s\n" % str(err
[1]).strip()
669 if self
.immediate
and not self
.verbose
:
670 sys
.stdout
.write(self
.test_output
[self
.name
])
671 self
.test_output
[self
.name
] = ""
673 if not self
.immediate
:
678 'success': 'S'}.get(result
, "?"))
680 def write_summary(self
, path
):
683 if self
.suitesfailed
:
684 f
.write("= Failed tests =\n")
686 for suite
in self
.suitesfailed
:
687 f
.write("== %s ==\n" % suite
)
688 if suite
in self
.test_output
:
689 f
.write(self
.test_output
[suite
] + "\n\n")
693 if not self
.immediate
and not self
.verbose
:
694 for suite
in self
.suitesfailed
:
696 print("FAIL: %s" % suite
)
697 if suite
in self
.test_output
:
698 print(self
.test_output
[suite
])
701 f
.write("= Skipped tests =\n")
702 for reason
in self
.skips
.keys():
703 f
.write(reason
+ "\n")
704 for name
in self
.skips
[reason
]:
705 f
.write("\t%s\n" % name
)
709 if (not self
.suitesfailed
and
710 not self
.statistics
['TESTS_UNEXPECTED_FAIL'] and
711 not self
.statistics
['TESTS_UNEXPECTED_OK'] and
712 not self
.statistics
['TESTS_ERROR']):
713 ok
= (self
.statistics
['TESTS_EXPECTED_OK'] +
714 self
.statistics
['TESTS_EXPECTED_FAIL'])
715 print("\nALL OK (%d tests in %d testsuites)" % (ok
, self
.suites_ok
))
717 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
718 self
.statistics
['TESTS_UNEXPECTED_FAIL'],
719 self
.statistics
['TESTS_ERROR'],
720 self
.statistics
['TESTS_UNEXPECTED_OK'],
721 len(self
.suitesfailed
)))
723 def skip_testsuite(self
, name
, reason
="UNKNOWN"):
724 self
.skips
.setdefault(reason
, []).append(name
)
726 self
.totalsuites
-= 1