1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 __all__
= ['parse_results']
24 from samba
import subunit
25 from samba
.subunit
.run
import TestProtocolClient
28 from dateutil
.parser
import isoparse
as iso_parse_date
31 from iso8601
import parse_date
as iso_parse_date
33 print('Install either python-dateutil >= 2.7.1 or python-iso8601')
36 VALID_RESULTS
= set(['success', 'successful', 'failure', 'fail', 'skip',
37 'knownfail', 'error', 'xfail', 'skip-testsuite',
38 'testsuite-failure', 'testsuite-xfail',
39 'testsuite-success', 'testsuite-error',
40 'uxsuccess', 'testsuite-uxsuccess'])
43 class TestsuiteEnabledTestResult(unittest
.TestResult
):
45 def start_testsuite(self
, name
):
46 raise NotImplementedError(self
.start_testsuite
)
49 def parse_results(msg_ops
, statistics
, fh
):
54 parts
= l
.split(None, 1)
55 if not len(parts
) == 2 or not l
.startswith(parts
[0]):
58 command
= parts
[0].rstrip(":")
60 if command
in ("test", "testing"):
61 msg_ops
.control_msg(l
)
63 test
= subunit
.RemotedTestCase(name
)
64 if name
in open_tests
:
65 msg_ops
.addError(open_tests
.pop(name
), subunit
.RemoteError(u
"Test already running"))
66 msg_ops
.startTest(test
)
67 open_tests
[name
] = test
68 elif command
== "time":
69 msg_ops
.control_msg(l
)
71 dt
= iso_parse_date(arg
.rstrip("\n"))
73 print("Unable to parse time line: %s" % arg
.rstrip("\n"))
76 elif command
in VALID_RESULTS
:
77 msg_ops
.control_msg(l
)
79 grp
= re
.match(r
"(.*?)( \[)?([ \t]*)( multipart)?\n", arg
)
80 (testname
, hasreason
) = (grp
.group(1), grp
.group(2))
83 # reason may be specified in next lines
86 msg_ops
.control_msg(l
)
93 if isinstance(reason
, bytes
):
94 remote_error
= subunit
.RemoteError(reason
.decode("utf-8"))
96 remote_error
= subunit
.RemoteError(reason
)
99 statistics
['TESTS_ERROR'] += 1
100 msg_ops
.addError(subunit
.RemotedTestCase(testname
),
101 subunit
.RemoteError(u
"result (%s) reason (%s) interrupted" % (result
, reason
)))
105 remote_error
= subunit
.RemoteError(u
"No reason specified")
106 if result
in ("success", "successful"):
108 test
= open_tests
.pop(testname
)
110 statistics
['TESTS_ERROR'] += 1
112 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
114 statistics
['TESTS_EXPECTED_OK'] += 1
115 msg_ops
.addSuccess(test
)
116 elif result
in ("xfail", "knownfail"):
118 test
= open_tests
.pop(testname
)
120 statistics
['TESTS_ERROR'] += 1
122 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
124 statistics
['TESTS_EXPECTED_FAIL'] += 1
125 msg_ops
.addExpectedFailure(test
, remote_error
)
126 elif result
in ("uxsuccess", ):
128 test
= open_tests
.pop(testname
)
130 statistics
['TESTS_ERROR'] += 1
132 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
134 statistics
['TESTS_UNEXPECTED_OK'] += 1
135 msg_ops
.addUnexpectedSuccess(test
)
137 elif result
in ("failure", "fail"):
139 test
= open_tests
.pop(testname
)
141 statistics
['TESTS_ERROR'] += 1
143 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
145 statistics
['TESTS_UNEXPECTED_FAIL'] += 1
147 msg_ops
.addFailure(test
, remote_error
)
148 elif result
== "skip":
149 statistics
['TESTS_SKIP'] += 1
150 # Allow tests to be skipped without prior announcement of test
152 test
= open_tests
.pop(testname
)
154 test
= subunit
.RemotedTestCase(testname
)
155 msg_ops
.addSkip(test
, reason
)
156 elif result
== "error":
157 statistics
['TESTS_ERROR'] += 1
160 test
= open_tests
.pop(testname
)
162 test
= subunit
.RemotedTestCase(testname
)
163 msg_ops
.addError(test
, remote_error
)
164 elif result
== "skip-testsuite":
165 msg_ops
.skip_testsuite(testname
)
166 elif result
== "testsuite-success":
167 msg_ops
.end_testsuite(testname
, "success", reason
)
168 elif result
== "testsuite-failure":
169 msg_ops
.end_testsuite(testname
, "failure", reason
)
171 elif result
== "testsuite-xfail":
172 msg_ops
.end_testsuite(testname
, "xfail", reason
)
173 elif result
== "testsuite-uxsuccess":
174 msg_ops
.end_testsuite(testname
, "uxsuccess", reason
)
176 elif result
== "testsuite-error":
177 msg_ops
.end_testsuite(testname
, "error", reason
)
180 raise AssertionError("Recognized but unhandled result %r" %
182 elif command
== "testsuite":
183 msg_ops
.start_testsuite(arg
.strip())
184 elif command
== "progress":
187 msg_ops
.progress(None, subunit
.PROGRESS_POP
)
189 msg_ops
.progress(None, subunit
.PROGRESS_PUSH
)
191 msg_ops
.progress(int(arg
), subunit
.PROGRESS_CUR
)
193 msg_ops
.progress(int(arg
), subunit
.PROGRESS_SET
)
195 msg_ops
.output_msg(l
)
198 test
= subunit
.RemotedTestCase(open_tests
.popitem()[1])
199 msg_ops
.addError(test
, subunit
.RemoteError(u
"was started but never finished!"))
200 statistics
['TESTS_ERROR'] += 1
206 class SubunitOps(TestProtocolClient
, TestsuiteEnabledTestResult
):
208 def progress(self
, count
, whence
):
209 if whence
== subunit
.PROGRESS_POP
:
210 self
._stream
.write("progress: pop\n")
211 elif whence
== subunit
.PROGRESS_PUSH
:
212 self
._stream
.write("progress: push\n")
213 elif whence
== subunit
.PROGRESS_SET
:
214 self
._stream
.write("progress: %d\n" % count
)
215 elif whence
== subunit
.PROGRESS_CUR
:
216 raise NotImplementedError
218 # The following are Samba extensions:
219 def start_testsuite(self
, name
):
220 self
._stream
.write("testsuite: %s\n" % name
)
222 def skip_testsuite(self
, name
, reason
=None):
224 self
._stream
.write("skip-testsuite: %s [\n%s\n]\n" % (name
, reason
))
226 self
._stream
.write("skip-testsuite: %s\n" % name
)
228 def end_testsuite(self
, name
, result
, reason
=None):
230 self
._stream
.write("testsuite-%s: %s [\n%s\n]\n" % (result
, name
, reason
))
232 self
._stream
.write("testsuite-%s: %s\n" % (result
, name
))
234 def output_msg(self
, msg
):
235 self
._stream
.write(msg
)
238 def read_test_regexes(*names
):
242 # if we are given a directory, we read all the files it contains
243 # (except the ones that end with "~").
244 if os
.path
.isdir(name
):
245 files
.extend([os
.path
.join(name
, x
)
246 for x
in os
.listdir(name
)
251 for filename
in files
:
252 with
open(filename
, 'r') as f
:
255 if l
== "" or l
[0] == "#":
258 (regex
, reason
) = l
.split("#", 1)
259 ret
.append(re
.compile(regex
.strip()))
261 ret
.append(re
.compile(l
))
266 def find_in_list(regexes
, fullname
):
267 for regex
in regexes
:
268 if regex
.match(fullname
):
273 class ImmediateFail(Exception):
274 """Raised to abort immediately."""
277 super(ImmediateFail
, self
).__init
__("test failed and fail_immediately set")
280 class FilterOps(unittest
.TestResult
):
282 def control_msg(self
, msg
):
283 pass # We regenerate control messages, so ignore this
285 def time(self
, time
):
288 def progress(self
, delta
, whence
):
289 self
._ops
.progress(delta
, whence
)
291 def output_msg(self
, msg
):
292 if self
.output
is None:
293 sys
.stdout
.write(msg
)
297 def startTest(self
, test
):
298 self
.seen_output
= True
299 test
= self
._add
_prefix
(test
)
300 if self
.strip_ok_output
:
303 self
._ops
.startTest(test
)
305 def _add_prefix(self
, test
):
306 return subunit
.RemotedTestCase(self
.prefix
+ test
.id() + self
.suffix
)
308 def addError(self
, test
, err
=None):
309 test
= self
._add
_prefix
(test
)
310 self
.error_added
+= 1
311 self
.total_error
+= 1
312 self
._ops
.addError(test
, err
)
313 self
._ops
.writeOutcome(test
)
315 if self
.fail_immediately
:
316 raise ImmediateFail()
318 def addSkip(self
, test
, reason
=None):
319 self
.seen_output
= True
320 test
= self
._add
_prefix
(test
)
321 self
._ops
.addSkip(test
, reason
)
322 self
._ops
.writeOutcome(test
)
325 def addExpectedFailure(self
, test
, err
=None):
326 test
= self
._add
_prefix
(test
)
327 self
._ops
.addExpectedFailure(test
, err
)
328 self
._ops
.writeOutcome(test
)
331 def addUnexpectedSuccess(self
, test
):
332 test
= self
._add
_prefix
(test
)
333 self
.uxsuccess_added
+= 1
334 self
.total_uxsuccess
+= 1
335 self
._ops
.addUnexpectedSuccess(test
)
336 self
._ops
.writeOutcome(test
)
338 self
._ops
.output_msg(self
.output
)
340 if self
.fail_immediately
:
341 raise ImmediateFail()
343 def addFailure(self
, test
, err
=None):
344 test
= self
._add
_prefix
(test
)
345 xfail
= find_in_list(self
.expected_failures
, test
.id())
347 xfail
= find_in_list(self
.flapping
, test
.id())
349 self
.xfail_added
+= 1
350 self
.total_xfail
+= 1
351 self
._ops
.addExpectedFailure(test
, err
)
352 self
._ops
.writeOutcome(test
)
356 self
._ops
.addFailure(test
, err
)
357 self
._ops
.writeOutcome(test
)
359 self
._ops
.output_msg(self
.output
)
360 if self
.fail_immediately
:
361 raise ImmediateFail()
364 def addSuccess(self
, test
):
365 test
= self
._add
_prefix
(test
)
366 xfail
= find_in_list(self
.expected_failures
, test
.id())
368 self
.uxsuccess_added
+= 1
369 self
.total_uxsuccess
+= 1
370 self
._ops
.addUnexpectedSuccess(test
)
371 self
._ops
.writeOutcome(test
)
373 self
._ops
.output_msg(self
.output
)
374 if self
.fail_immediately
:
375 raise ImmediateFail()
377 self
._ops
.addSuccess(test
)
378 self
._ops
.writeOutcome(test
)
381 def skip_testsuite(self
, name
, reason
=None):
382 self
._ops
.skip_testsuite(name
, reason
)
384 def start_testsuite(self
, name
):
385 self
._ops
.start_testsuite(name
)
389 self
.uxsuccess_added
= 0
391 def end_testsuite(self
, name
, result
, reason
=None):
394 if self
.xfail_added
> 0:
396 if self
.fail_added
> 0 or self
.error_added
> 0 or self
.uxsuccess_added
> 0:
399 if xfail
and result
in ("fail", "failure"):
402 if self
.uxsuccess_added
> 0 and result
!= "uxsuccess":
405 reason
= "Subunit/Filter Reason"
406 reason
+= "\n uxsuccess[%d]" % self
.uxsuccess_added
408 if self
.fail_added
> 0 and result
!= "failure":
411 reason
= "Subunit/Filter Reason"
412 reason
+= "\n failures[%d]" % self
.fail_added
414 if self
.error_added
> 0 and result
!= "error":
417 reason
= "Subunit/Filter Reason"
418 reason
+= "\n errors[%d]" % self
.error_added
420 self
._ops
.end_testsuite(name
, result
, reason
)
421 if result
not in ("success", "xfail"):
423 self
._ops
.output_msg(self
.output
)
424 if self
.fail_immediately
:
425 raise ImmediateFail()
428 def __init__(self
, out
, prefix
=None, suffix
=None, expected_failures
=None,
429 strip_ok_output
=False, fail_immediately
=False,
432 self
.seen_output
= False
436 if expected_failures
is not None:
437 self
.expected_failures
= expected_failures
439 self
.expected_failures
= []
440 if flapping
is not None:
441 self
.flapping
= flapping
444 self
.strip_ok_output
= strip_ok_output
447 self
.uxsuccess_added
= 0
451 self
.total_uxsuccess
= 0
453 self
.fail_immediately
= fail_immediately
456 class PerfFilterOps(unittest
.TestResult
):
458 def progress(self
, delta
, whence
):
461 def output_msg(self
, msg
):
464 def control_msg(self
, msg
):
467 def skip_testsuite(self
, name
, reason
=None):
468 self
._ops
.skip_testsuite(name
, reason
)
470 def start_testsuite(self
, name
):
471 self
.suite_has_time
= False
473 def end_testsuite(self
, name
, result
, reason
=None):
476 def _add_prefix(self
, test
):
477 return subunit
.RemotedTestCase(self
.prefix
+ test
.id() + self
.suffix
)
479 def time(self
, time
):
480 self
.latest_time
= time
481 self
.suite_has_time
= True
484 if self
.suite_has_time
:
485 return self
.latest_time
486 return datetime
.datetime
.now(tz
=datetime
.timezone
.utc
)
488 def startTest(self
, test
):
489 self
.seen_output
= True
490 test
= self
._add
_prefix
(test
)
491 self
.starts
[test
.id()] = self
.get_time()
493 def addSuccess(self
, test
):
494 test
= self
._add
_prefix
(test
)
496 if tid
not in self
.starts
:
497 self
._ops
.addError(test
, "%s succeeded without ever starting!" % tid
)
498 delta
= self
.get_time() - self
.starts
[tid
]
499 self
._ops
.output_msg("elapsed-time: %s: %f\n" % (tid
, delta
.total_seconds()))
501 def addFailure(self
, test
, err
=''):
503 delta
= self
.get_time() - self
.starts
[tid
]
504 self
._ops
.output_msg("failure: %s failed after %f seconds (%s)\n" %
505 (tid
, delta
.total_seconds(), err
))
507 def addError(self
, test
, err
=''):
509 delta
= self
.get_time() - self
.starts
[tid
]
510 self
._ops
.output_msg("error: %s failed after %f seconds (%s)\n" %
511 (tid
, delta
.total_seconds(), err
))
513 def __init__(self
, out
, prefix
='', suffix
=''):
515 self
.prefix
= prefix
or ''
516 self
.suffix
= suffix
or ''
518 self
.seen_output
= False
519 self
.suite_has_time
= False
522 class PlainFormatter(TestsuiteEnabledTestResult
):
524 def __init__(self
, verbose
, immediate
, statistics
,
526 super(PlainFormatter
, self
).__init
__()
527 self
.verbose
= verbose
528 self
.immediate
= immediate
529 self
.statistics
= statistics
530 self
.start_time
= None
531 self
.test_output
= {}
532 self
.suitesfailed
= []
537 self
._progress
_level
= 0
538 self
.totalsuites
= totaltests
539 self
.last_time
= None
542 def _format_time(delta
):
543 minutes
, seconds
= divmod(delta
.seconds
, 60)
544 hours
, minutes
= divmod(minutes
, 60)
549 ret
+= "%dm" % minutes
550 ret
+= "%ds" % seconds
553 def progress(self
, offset
, whence
):
554 if whence
== subunit
.PROGRESS_POP
:
555 self
._progress
_level
-= 1
556 elif whence
== subunit
.PROGRESS_PUSH
:
557 self
._progress
_level
+= 1
558 elif whence
== subunit
.PROGRESS_SET
:
559 if self
._progress
_level
== 0:
560 self
.totalsuites
= offset
561 elif whence
== subunit
.PROGRESS_CUR
:
562 raise NotImplementedError
565 if self
.start_time
is None:
569 def start_testsuite(self
, name
):
574 self
.test_output
[name
] = ""
576 total_tests
= (self
.statistics
['TESTS_EXPECTED_OK'] +
577 self
.statistics
['TESTS_EXPECTED_FAIL'] +
578 self
.statistics
['TESTS_ERROR'] +
579 self
.statistics
['TESTS_UNEXPECTED_FAIL'] +
580 self
.statistics
['TESTS_UNEXPECTED_OK'])
582 out
= "[%d(%d)" % (self
.index
, total_tests
)
583 if self
.totalsuites
is not None:
584 out
+= "/%d" % self
.totalsuites
585 if self
.start_time
is not None:
586 out
+= " at " + self
._format
_time
(self
.last_time
- self
.start_time
)
587 if self
.suitesfailed
:
588 out
+= ", %d errors" % (len(self
.suitesfailed
),)
591 sys
.stdout
.write(out
+ "\n")
593 sys
.stdout
.write(out
+ ": ")
595 def output_msg(self
, output
):
597 sys
.stdout
.write(output
)
598 elif self
.name
is not None:
599 self
.test_output
[self
.name
] += output
601 sys
.stdout
.write(output
)
603 def control_msg(self
, output
):
606 def end_testsuite(self
, name
, result
, reason
):
610 if name
not in self
.test_output
:
611 print("no output for name[%s]" % name
)
613 if result
in ("success", "xfail"):
616 self
.output_msg("ERROR: Testsuite[%s]\n" % name
)
617 if reason
is not None:
618 self
.output_msg("REASON: %s\n" % (reason
,))
619 self
.suitesfailed
.append(name
)
620 if self
.immediate
and not self
.verbose
and name
in self
.test_output
:
621 out
+= self
.test_output
[name
]
624 if not self
.immediate
:
628 out
+= " " + result
.upper() + "\n"
630 sys
.stdout
.write(out
)
632 def startTest(self
, test
):
635 def addSuccess(self
, test
):
636 self
.end_test(test
.id(), "success", False)
638 def addError(self
, test
, err
=None):
639 self
.end_test(test
.id(), "error", True, err
)
641 def addFailure(self
, test
, err
=None):
642 self
.end_test(test
.id(), "failure", True, err
)
644 def addSkip(self
, test
, reason
=None):
645 self
.end_test(test
.id(), "skip", False, reason
)
647 def addExpectedFailure(self
, test
, err
=None):
648 self
.end_test(test
.id(), "xfail", False, err
)
650 def addUnexpectedSuccess(self
, test
):
651 self
.end_test(test
.id(), "uxsuccess", True)
653 def end_test(self
, testname
, result
, unexpected
, err
=None):
655 self
.test_output
[self
.name
] = ""
656 if not self
.immediate
:
661 'success': '.'}.get(result
, "?(%s)" % result
))
664 if self
.name
not in self
.test_output
:
665 self
.test_output
[self
.name
] = ""
667 self
.test_output
[self
.name
] += "UNEXPECTED(%s): %s\n" % (result
, testname
)
669 self
.test_output
[self
.name
] += "REASON: %s\n" % str(err
[1]).strip()
671 if self
.immediate
and not self
.verbose
:
672 sys
.stdout
.write(self
.test_output
[self
.name
])
673 self
.test_output
[self
.name
] = ""
675 if not self
.immediate
:
680 'success': 'S'}.get(result
, "?"))
682 def write_summary(self
, path
):
685 if self
.suitesfailed
:
686 f
.write("= Failed tests =\n")
688 for suite
in self
.suitesfailed
:
689 f
.write("== %s ==\n" % suite
)
690 if suite
in self
.test_output
:
691 f
.write(self
.test_output
[suite
] + "\n\n")
695 if not self
.immediate
and not self
.verbose
:
696 for suite
in self
.suitesfailed
:
698 print("FAIL: %s" % suite
)
699 if suite
in self
.test_output
:
700 print(self
.test_output
[suite
])
703 f
.write("= Skipped tests =\n")
704 for reason
in self
.skips
.keys():
705 f
.write(reason
+ "\n")
706 for name
in self
.skips
[reason
]:
707 f
.write("\t%s\n" % name
)
711 if (not self
.suitesfailed
and
712 not self
.statistics
['TESTS_UNEXPECTED_FAIL'] and
713 not self
.statistics
['TESTS_UNEXPECTED_OK'] and
714 not self
.statistics
['TESTS_ERROR']):
715 ok
= (self
.statistics
['TESTS_EXPECTED_OK'] +
716 self
.statistics
['TESTS_EXPECTED_FAIL'])
717 print("\nALL OK (%d tests in %d testsuites)" % (ok
, self
.suites_ok
))
719 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
720 self
.statistics
['TESTS_UNEXPECTED_FAIL'],
721 self
.statistics
['TESTS_ERROR'],
722 self
.statistics
['TESTS_UNEXPECTED_OK'],
723 len(self
.suitesfailed
)))
725 def skip_testsuite(self
, name
, reason
="UNKNOWN"):
726 self
.skips
.setdefault(reason
, []).append(name
)
728 self
.totalsuites
-= 1