1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__
import print_function
19 __all__
= ['parse_results']
25 from samba
import subunit
26 from samba
.subunit
.run
import TestProtocolClient
27 from samba
.subunit
import iso8601
29 from samba
.compat
import binary_type
32 VALID_RESULTS
= set(['success', 'successful', 'failure', 'fail', 'skip',
33 'knownfail', 'error', 'xfail', 'skip-testsuite',
34 'testsuite-failure', 'testsuite-xfail',
35 'testsuite-success', 'testsuite-error',
36 'uxsuccess', 'testsuite-uxsuccess'])
39 class TestsuiteEnabledTestResult(unittest
.TestResult
):
41 def start_testsuite(self
, name
):
42 raise NotImplementedError(self
.start_testsuite
)
45 def parse_results(msg_ops
, statistics
, fh
):
53 parts
= l
.split(None, 1)
54 if not len(parts
) == 2 or not l
.startswith(parts
[0]):
57 command
= parts
[0].rstrip(":")
59 if command
in ("test", "testing"):
60 msg_ops
.control_msg(l
)
62 test
= subunit
.RemotedTestCase(name
)
63 if name
in open_tests
:
64 msg_ops
.addError(open_tests
.pop(name
), subunit
.RemoteError(u
"Test already running"))
65 msg_ops
.startTest(test
)
66 open_tests
[name
] = test
67 elif command
== "time":
68 msg_ops
.control_msg(l
)
70 dt
= iso8601
.parse_date(arg
.rstrip("\n"))
71 except TypeError as e
:
72 print("Unable to parse time line: %s" % arg
.rstrip("\n"))
75 elif command
in VALID_RESULTS
:
76 msg_ops
.control_msg(l
)
78 grp
= re
.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg
)
79 (testname
, hasreason
) = (grp
.group(1), grp
.group(2))
82 # reason may be specified in next lines
88 msg_ops
.control_msg(l
)
96 if isinstance(reason
, binary_type
):
97 remote_error
= subunit
.RemoteError(reason
.decode("utf-8"))
99 remote_error
= subunit
.RemoteError(reason
)
102 statistics
['TESTS_ERROR'] += 1
103 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"reason (%s) interrupted" % result
))
107 remote_error
= subunit
.RemoteError(u
"No reason specified")
108 if result
in ("success", "successful"):
110 test
= open_tests
.pop(testname
)
112 statistics
['TESTS_ERROR'] += 1
114 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
116 statistics
['TESTS_EXPECTED_OK'] += 1
117 msg_ops
.addSuccess(test
)
118 elif result
in ("xfail", "knownfail"):
120 test
= open_tests
.pop(testname
)
122 statistics
['TESTS_ERROR'] += 1
124 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
126 statistics
['TESTS_EXPECTED_FAIL'] += 1
127 msg_ops
.addExpectedFailure(test
, remote_error
)
128 elif result
in ("uxsuccess", ):
130 test
= open_tests
.pop(testname
)
132 statistics
['TESTS_ERROR'] += 1
134 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
136 statistics
['TESTS_UNEXPECTED_OK'] += 1
137 msg_ops
.addUnexpectedSuccess(test
)
139 elif result
in ("failure", "fail"):
141 test
= open_tests
.pop(testname
)
143 statistics
['TESTS_ERROR'] += 1
145 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
147 statistics
['TESTS_UNEXPECTED_FAIL'] += 1
149 msg_ops
.addFailure(test
, remote_error
)
150 elif result
== "skip":
151 statistics
['TESTS_SKIP'] += 1
152 # Allow tests to be skipped without prior announcement of test
154 test
= open_tests
.pop(testname
)
156 test
= subunit
.RemotedTestCase(testname
)
157 msg_ops
.addSkip(test
, reason
)
158 elif result
== "error":
159 statistics
['TESTS_ERROR'] += 1
162 test
= open_tests
.pop(testname
)
164 test
= subunit
.RemotedTestCase(testname
)
165 msg_ops
.addError(test
, remote_error
)
166 elif result
== "skip-testsuite":
167 msg_ops
.skip_testsuite(testname
)
168 elif result
== "testsuite-success":
169 msg_ops
.end_testsuite(testname
, "success", reason
)
170 elif result
== "testsuite-failure":
171 msg_ops
.end_testsuite(testname
, "failure", reason
)
173 elif result
== "testsuite-xfail":
174 msg_ops
.end_testsuite(testname
, "xfail", reason
)
175 elif result
== "testsuite-uxsuccess":
176 msg_ops
.end_testsuite(testname
, "uxsuccess", reason
)
178 elif result
== "testsuite-error":
179 msg_ops
.end_testsuite(testname
, "error", reason
)
182 raise AssertionError("Recognized but unhandled result %r" %
184 elif command
== "testsuite":
185 msg_ops
.start_testsuite(arg
.strip())
186 elif command
== "progress":
189 msg_ops
.progress(None, subunit
.PROGRESS_POP
)
191 msg_ops
.progress(None, subunit
.PROGRESS_PUSH
)
193 msg_ops
.progress(int(arg
), subunit
.PROGRESS_CUR
)
195 msg_ops
.progress(int(arg
), subunit
.PROGRESS_SET
)
197 msg_ops
.output_msg(l
)
200 test
= subunit
.RemotedTestCase(open_tests
.popitem()[1])
201 msg_ops
.addError(test
, subunit
.RemoteError(u
"was started but never finished!"))
202 statistics
['TESTS_ERROR'] += 1
208 class SubunitOps(TestProtocolClient
, TestsuiteEnabledTestResult
):
210 def progress(self
, count
, whence
):
211 if whence
== subunit
.PROGRESS_POP
:
212 self
._stream
.write("progress: pop\n")
213 elif whence
== subunit
.PROGRESS_PUSH
:
214 self
._stream
.write("progress: push\n")
215 elif whence
== subunit
.PROGRESS_SET
:
216 self
._stream
.write("progress: %d\n" % count
)
217 elif whence
== subunit
.PROGRESS_CUR
:
218 raise NotImplementedError
220 # The following are Samba extensions:
221 def start_testsuite(self
, name
):
222 self
._stream
.write("testsuite: %s\n" % name
)
224 def skip_testsuite(self
, name
, reason
=None):
226 self
._stream
.write("skip-testsuite: %s [\n%s\n]\n" % (name
, reason
))
228 self
._stream
.write("skip-testsuite: %s\n" % name
)
230 def end_testsuite(self
, name
, result
, reason
=None):
232 self
._stream
.write("testsuite-%s: %s [\n%s\n]\n" % (result
, name
, reason
))
234 self
._stream
.write("testsuite-%s: %s\n" % (result
, name
))
236 def output_msg(self
, msg
):
237 self
._stream
.write(msg
)
240 def read_test_regexes(*names
):
244 # if we are given a directory, we read all the files it contains
245 # (except the ones that end with "~").
246 if os
.path
.isdir(name
):
247 files
.extend([os
.path
.join(name
, x
)
248 for x
in os
.listdir(name
)
253 for filename
in files
:
254 f
= open(filename
, 'r')
258 if l
== "" or l
[0] == "#":
261 (regex
, reason
) = l
.split("#", 1)
262 ret
[regex
.strip()] = reason
.strip()
270 def find_in_list(regexes
, fullname
):
271 for regex
, reason
in regexes
.items():
272 if re
.match(regex
, fullname
):
279 class ImmediateFail(Exception):
280 """Raised to abort immediately."""
283 super(ImmediateFail
, self
).__init
__("test failed and fail_immediately set")
286 class FilterOps(unittest
.TestResult
):
288 def control_msg(self
, msg
):
289 pass # We regenerate control messages, so ignore this
291 def time(self
, time
):
294 def progress(self
, delta
, whence
):
295 self
._ops
.progress(delta
, whence
)
297 def output_msg(self
, msg
):
298 if self
.output
is None:
299 sys
.stdout
.write(msg
)
303 def startTest(self
, test
):
304 self
.seen_output
= True
305 test
= self
._add
_prefix
(test
)
306 if self
.strip_ok_output
:
309 self
._ops
.startTest(test
)
311 def _add_prefix(self
, test
):
312 return subunit
.RemotedTestCase(self
.prefix
+ test
.id() + self
.suffix
)
314 def addError(self
, test
, err
=None):
315 test
= self
._add
_prefix
(test
)
316 self
.error_added
+= 1
317 self
.total_error
+= 1
318 self
._ops
.addError(test
, err
)
320 if self
.fail_immediately
:
321 raise ImmediateFail()
323 def addSkip(self
, test
, reason
=None):
324 self
.seen_output
= True
325 test
= self
._add
_prefix
(test
)
326 self
._ops
.addSkip(test
, reason
)
329 def addExpectedFailure(self
, test
, err
=None):
330 test
= self
._add
_prefix
(test
)
331 self
._ops
.addExpectedFailure(test
, err
)
334 def addUnexpectedSuccess(self
, test
):
335 test
= self
._add
_prefix
(test
)
336 self
.uxsuccess_added
+= 1
337 self
.total_uxsuccess
+= 1
338 self
._ops
.addUnexpectedSuccess(test
)
340 self
._ops
.output_msg(self
.output
)
342 if self
.fail_immediately
:
343 raise ImmediateFail()
345 def addFailure(self
, test
, err
=None):
346 test
= self
._add
_prefix
(test
)
347 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
348 if xfail_reason
is None:
349 xfail_reason
= find_in_list(self
.flapping
, test
.id())
350 if xfail_reason
is not None:
351 self
.xfail_added
+= 1
352 self
.total_xfail
+= 1
353 self
._ops
.addExpectedFailure(test
, err
)
357 self
._ops
.addFailure(test
, err
)
359 self
._ops
.output_msg(self
.output
)
360 if self
.fail_immediately
:
361 raise ImmediateFail()
364 def addSuccess(self
, test
):
365 test
= self
._add
_prefix
(test
)
366 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
367 if xfail_reason
is not None:
368 self
.uxsuccess_added
+= 1
369 self
.total_uxsuccess
+= 1
370 self
._ops
.addUnexpectedSuccess(test
)
372 self
._ops
.output_msg(self
.output
)
373 if self
.fail_immediately
:
374 raise ImmediateFail()
376 self
._ops
.addSuccess(test
)
379 def skip_testsuite(self
, name
, reason
=None):
380 self
._ops
.skip_testsuite(name
, reason
)
382 def start_testsuite(self
, name
):
383 self
._ops
.start_testsuite(name
)
387 self
.uxsuccess_added
= 0
389 def end_testsuite(self
, name
, result
, reason
=None):
392 if self
.xfail_added
> 0:
394 if self
.fail_added
> 0 or self
.error_added
> 0 or self
.uxsuccess_added
> 0:
397 if xfail
and result
in ("fail", "failure"):
400 if self
.uxsuccess_added
> 0 and result
!= "uxsuccess":
403 reason
= "Subunit/Filter Reason"
404 reason
+= "\n uxsuccess[%d]" % self
.uxsuccess_added
406 if self
.fail_added
> 0 and result
!= "failure":
409 reason
= "Subunit/Filter Reason"
410 reason
+= "\n failures[%d]" % self
.fail_added
412 if self
.error_added
> 0 and result
!= "error":
415 reason
= "Subunit/Filter Reason"
416 reason
+= "\n errors[%d]" % self
.error_added
418 self
._ops
.end_testsuite(name
, result
, reason
)
419 if result
not in ("success", "xfail"):
421 self
._ops
.output_msg(self
.output
)
422 if self
.fail_immediately
:
423 raise ImmediateFail()
426 def __init__(self
, out
, prefix
=None, suffix
=None, expected_failures
=None,
427 strip_ok_output
=False, fail_immediately
=False,
430 self
.seen_output
= False
434 if expected_failures
is not None:
435 self
.expected_failures
= expected_failures
437 self
.expected_failures
= {}
438 if flapping
is not None:
439 self
.flapping
= flapping
442 self
.strip_ok_output
= strip_ok_output
445 self
.uxsuccess_added
= 0
449 self
.total_uxsuccess
= 0
451 self
.fail_immediately
= fail_immediately
454 class PerfFilterOps(unittest
.TestResult
):
456 def progress(self
, delta
, whence
):
459 def output_msg(self
, msg
):
462 def control_msg(self
, msg
):
465 def skip_testsuite(self
, name
, reason
=None):
466 self
._ops
.skip_testsuite(name
, reason
)
468 def start_testsuite(self
, name
):
469 self
.suite_has_time
= False
471 def end_testsuite(self
, name
, result
, reason
=None):
474 def _add_prefix(self
, test
):
475 return subunit
.RemotedTestCase(self
.prefix
+ test
.id() + self
.suffix
)
477 def time(self
, time
):
478 self
.latest_time
= time
479 #self._ops.output_msg("found time %s\n" % time)
480 self
.suite_has_time
= True
483 if self
.suite_has_time
:
484 return self
.latest_time
485 return datetime
.datetime
.utcnow()
487 def startTest(self
, test
):
488 self
.seen_output
= True
489 test
= self
._add
_prefix
(test
)
490 self
.starts
[test
.id()] = self
.get_time()
492 def addSuccess(self
, test
):
493 test
= self
._add
_prefix
(test
)
495 if tid
not in self
.starts
:
496 self
._ops
.addError(test
, "%s succeeded without ever starting!" % tid
)
497 delta
= self
.get_time() - self
.starts
[tid
]
498 self
._ops
.output_msg("elapsed-time: %s: %f\n" % (tid
, delta
.total_seconds()))
500 def addFailure(self
, test
, err
=''):
502 delta
= self
.get_time() - self
.starts
[tid
]
503 self
._ops
.output_msg("failure: %s failed after %f seconds (%s)\n" %
504 (tid
, delta
.total_seconds(), err
))
506 def addError(self
, test
, err
=''):
508 delta
= self
.get_time() - self
.starts
[tid
]
509 self
._ops
.output_msg("error: %s failed after %f seconds (%s)\n" %
510 (tid
, delta
.total_seconds(), err
))
512 def __init__(self
, out
, prefix
='', suffix
=''):
514 self
.prefix
= prefix
or ''
515 self
.suffix
= suffix
or ''
517 self
.seen_output
= False
518 self
.suite_has_time
= False
521 class PlainFormatter(TestsuiteEnabledTestResult
):
523 def __init__(self
, verbose
, immediate
, statistics
,
525 super(PlainFormatter
, self
).__init
__()
526 self
.verbose
= verbose
527 self
.immediate
= immediate
528 self
.statistics
= statistics
529 self
.start_time
= None
530 self
.test_output
= {}
531 self
.suitesfailed
= []
536 self
._progress
_level
= 0
537 self
.totalsuites
= totaltests
538 self
.last_time
= None
541 def _format_time(delta
):
542 minutes
, seconds
= divmod(delta
.seconds
, 60)
543 hours
, minutes
= divmod(minutes
, 60)
548 ret
+= "%dm" % minutes
549 ret
+= "%ds" % seconds
552 def progress(self
, offset
, whence
):
553 if whence
== subunit
.PROGRESS_POP
:
554 self
._progress
_level
-= 1
555 elif whence
== subunit
.PROGRESS_PUSH
:
556 self
._progress
_level
+= 1
557 elif whence
== subunit
.PROGRESS_SET
:
558 if self
._progress
_level
== 0:
559 self
.totalsuites
= offset
560 elif whence
== subunit
.PROGRESS_CUR
:
561 raise NotImplementedError
564 if self
.start_time
is None:
568 def start_testsuite(self
, name
):
573 self
.test_output
[name
] = ""
575 total_tests
= (self
.statistics
['TESTS_EXPECTED_OK'] +
576 self
.statistics
['TESTS_EXPECTED_FAIL'] +
577 self
.statistics
['TESTS_ERROR'] +
578 self
.statistics
['TESTS_UNEXPECTED_FAIL'] +
579 self
.statistics
['TESTS_UNEXPECTED_OK'])
581 out
= "[%d(%d)" % (self
.index
, total_tests
)
582 if self
.totalsuites
is not None:
583 out
+= "/%d" % self
.totalsuites
584 if self
.start_time
is not None:
585 out
+= " at " + self
._format
_time
(self
.last_time
- self
.start_time
)
586 if self
.suitesfailed
:
587 out
+= ", %d errors" % (len(self
.suitesfailed
),)
590 sys
.stdout
.write(out
+ "\n")
592 sys
.stdout
.write(out
+ ": ")
594 def output_msg(self
, output
):
596 sys
.stdout
.write(output
)
597 elif self
.name
is not None:
598 self
.test_output
[self
.name
] += output
600 sys
.stdout
.write(output
)
602 def control_msg(self
, output
):
605 def end_testsuite(self
, name
, result
, reason
):
609 if name
not in self
.test_output
:
610 print("no output for name[%s]" % name
)
612 if result
in ("success", "xfail"):
615 self
.output_msg("ERROR: Testsuite[%s]\n" % name
)
616 if reason
is not None:
617 self
.output_msg("REASON: %s\n" % (reason
,))
618 self
.suitesfailed
.append(name
)
619 if self
.immediate
and not self
.verbose
and name
in self
.test_output
:
620 out
+= self
.test_output
[name
]
623 if not self
.immediate
:
627 out
+= " " + result
.upper() + "\n"
629 sys
.stdout
.write(out
)
631 def startTest(self
, test
):
634 def addSuccess(self
, test
):
635 self
.end_test(test
.id(), "success", False)
637 def addError(self
, test
, err
=None):
638 self
.end_test(test
.id(), "error", True, err
)
640 def addFailure(self
, test
, err
=None):
641 self
.end_test(test
.id(), "failure", True, err
)
643 def addSkip(self
, test
, reason
=None):
644 self
.end_test(test
.id(), "skip", False, reason
)
646 def addExpectedFailure(self
, test
, err
=None):
647 self
.end_test(test
.id(), "xfail", False, err
)
649 def addUnexpectedSuccess(self
, test
):
650 self
.end_test(test
.id(), "uxsuccess", True)
652 def end_test(self
, testname
, result
, unexpected
, err
=None):
654 self
.test_output
[self
.name
] = ""
655 if not self
.immediate
:
660 'success': '.'}.get(result
, "?(%s)" % result
))
663 if self
.name
not in self
.test_output
:
664 self
.test_output
[self
.name
] = ""
666 self
.test_output
[self
.name
] += "UNEXPECTED(%s): %s\n" % (result
, testname
)
668 self
.test_output
[self
.name
] += "REASON: %s\n" % str(err
[1]).strip()
670 if self
.immediate
and not self
.verbose
:
671 sys
.stdout
.write(self
.test_output
[self
.name
])
672 self
.test_output
[self
.name
] = ""
674 if not self
.immediate
:
679 'success': 'S'}.get(result
, "?"))
681 def write_summary(self
, path
):
684 if self
.suitesfailed
:
685 f
.write("= Failed tests =\n")
687 for suite
in self
.suitesfailed
:
688 f
.write("== %s ==\n" % suite
)
689 if suite
in self
.test_output
:
690 f
.write(self
.test_output
[suite
] + "\n\n")
694 if not self
.immediate
and not self
.verbose
:
695 for suite
in self
.suitesfailed
:
697 print("FAIL: %s" % suite
)
698 if suite
in self
.test_output
:
699 print(self
.test_output
[suite
])
702 f
.write("= Skipped tests =\n")
703 for reason
in self
.skips
.keys():
704 f
.write(reason
+ "\n")
705 for name
in self
.skips
[reason
]:
706 f
.write("\t%s\n" % name
)
710 if (not self
.suitesfailed
and
711 not self
.statistics
['TESTS_UNEXPECTED_FAIL'] and
712 not self
.statistics
['TESTS_UNEXPECTED_OK'] and
713 not self
.statistics
['TESTS_ERROR']):
714 ok
= (self
.statistics
['TESTS_EXPECTED_OK'] +
715 self
.statistics
['TESTS_EXPECTED_FAIL'])
716 print("\nALL OK (%d tests in %d testsuites)" % (ok
, self
.suites_ok
))
718 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
719 self
.statistics
['TESTS_UNEXPECTED_FAIL'],
720 self
.statistics
['TESTS_ERROR'],
721 self
.statistics
['TESTS_UNEXPECTED_OK'],
722 len(self
.suitesfailed
)))
724 def skip_testsuite(self
, name
, reason
="UNKNOWN"):
725 self
.skips
.setdefault(reason
, []).append(name
)
727 self
.totalsuites
-= 1