1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__
import print_function
19 __all__
= ['parse_results']
25 from samba
import subunit
26 from samba
.subunit
.run
import TestProtocolClient
27 from samba
.subunit
import iso8601
# Keywords that parse_results() accepts as the command of a result line.
VALID_RESULTS = {
    'success',
    'successful',
    'failure',
    'fail',
    'skip',
    'knownfail',
    'error',
    'xfail',
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
    'uxsuccess',
    'testsuite-uxsuccess',
}
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A unittest.TestResult with an additional testsuite-start hook.

    The hook is not implemented here; calling it on this base class
    raises NotImplementedError.
    """

    def start_testsuite(self, name):
        """Signal the start of testsuite *name* (not implemented here).

        :param name: Name of the testsuite that is starting.
        :raises NotImplementedError: always; the bound method is passed
            as the exception argument.
        """
        unimplemented = self.start_testsuite
        raise NotImplementedError(unimplemented)
43 def parse_results(msg_ops
, statistics
, fh
):
51 parts
= l
.split(None, 1)
52 if not len(parts
) == 2 or not l
.startswith(parts
[0]):
55 command
= parts
[0].rstrip(":")
57 if command
in ("test", "testing"):
58 msg_ops
.control_msg(l
)
60 test
= subunit
.RemotedTestCase(name
)
61 if name
in open_tests
:
62 msg_ops
.addError(open_tests
.pop(name
), subunit
.RemoteError(u
"Test already running"))
63 msg_ops
.startTest(test
)
64 open_tests
[name
] = test
65 elif command
== "time":
66 msg_ops
.control_msg(l
)
68 dt
= iso8601
.parse_date(arg
.rstrip("\n"))
69 except TypeError as e
:
70 print("Unable to parse time line: %s" % arg
.rstrip("\n"))
73 elif command
in VALID_RESULTS
:
74 msg_ops
.control_msg(l
)
76 grp
= re
.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg
)
77 (testname
, hasreason
) = (grp
.group(1), grp
.group(2))
80 # reason may be specified in next lines
86 msg_ops
.control_msg(l
)
94 remote_error
= subunit
.RemoteError(reason
.decode("utf-8"))
97 statistics
['TESTS_ERROR'] += 1
98 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"reason (%s) interrupted" % result
))
102 remote_error
= subunit
.RemoteError(u
"No reason specified")
103 if result
in ("success", "successful"):
105 test
= open_tests
.pop(testname
)
107 statistics
['TESTS_ERROR'] += 1
109 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
111 statistics
['TESTS_EXPECTED_OK'] += 1
112 msg_ops
.addSuccess(test
)
113 elif result
in ("xfail", "knownfail"):
115 test
= open_tests
.pop(testname
)
117 statistics
['TESTS_ERROR'] += 1
119 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
121 statistics
['TESTS_EXPECTED_FAIL'] += 1
122 msg_ops
.addExpectedFailure(test
, remote_error
)
123 elif result
in ("uxsuccess", ):
125 test
= open_tests
.pop(testname
)
127 statistics
['TESTS_ERROR'] += 1
129 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
131 statistics
['TESTS_UNEXPECTED_OK'] += 1
132 msg_ops
.addUnexpectedSuccess(test
)
134 elif result
in ("failure", "fail"):
136 test
= open_tests
.pop(testname
)
138 statistics
['TESTS_ERROR'] += 1
140 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
142 statistics
['TESTS_UNEXPECTED_FAIL'] += 1
144 msg_ops
.addFailure(test
, remote_error
)
145 elif result
== "skip":
146 statistics
['TESTS_SKIP'] += 1
147 # Allow tests to be skipped without prior announcement of test
149 test
= open_tests
.pop(testname
)
151 test
= subunit
.RemotedTestCase(testname
)
152 msg_ops
.addSkip(test
, reason
)
153 elif result
== "error":
154 statistics
['TESTS_ERROR'] += 1
157 test
= open_tests
.pop(testname
)
159 test
= subunit
.RemotedTestCase(testname
)
160 msg_ops
.addError(test
, remote_error
)
161 elif result
== "skip-testsuite":
162 msg_ops
.skip_testsuite(testname
)
163 elif result
== "testsuite-success":
164 msg_ops
.end_testsuite(testname
, "success", reason
)
165 elif result
== "testsuite-failure":
166 msg_ops
.end_testsuite(testname
, "failure", reason
)
168 elif result
== "testsuite-xfail":
169 msg_ops
.end_testsuite(testname
, "xfail", reason
)
170 elif result
== "testsuite-uxsuccess":
171 msg_ops
.end_testsuite(testname
, "uxsuccess", reason
)
173 elif result
== "testsuite-error":
174 msg_ops
.end_testsuite(testname
, "error", reason
)
177 raise AssertionError("Recognized but unhandled result %r" %
179 elif command
== "testsuite":
180 msg_ops
.start_testsuite(arg
.strip())
181 elif command
== "progress":
184 msg_ops
.progress(None, subunit
.PROGRESS_POP
)
186 msg_ops
.progress(None, subunit
.PROGRESS_PUSH
)
188 msg_ops
.progress(int(arg
), subunit
.PROGRESS_CUR
)
190 msg_ops
.progress(int(arg
), subunit
.PROGRESS_SET
)
192 msg_ops
.output_msg(l
)
195 test
= subunit
.RemotedTestCase(open_tests
.popitem()[1])
196 msg_ops
.addError(test
, subunit
.RemoteError(u
"was started but never finished!"))
197 statistics
['TESTS_ERROR'] += 1
203 class SubunitOps(TestProtocolClient
, TestsuiteEnabledTestResult
):
def progress(self, count, whence):
    """Write a subunit ``progress:`` line to the output stream.

    :param count: Number written for a PROGRESS_SET; unused otherwise.
    :param whence: One of the ``subunit.PROGRESS_*`` constants.  Any
        other value results in nothing being written.
    :raises NotImplementedError: for ``subunit.PROGRESS_CUR``.
    """
    if whence == subunit.PROGRESS_POP:
        line = "progress: pop\n"
    elif whence == subunit.PROGRESS_PUSH:
        line = "progress: push\n"
    elif whence == subunit.PROGRESS_SET:
        line = "progress: %d\n" % count
    elif whence == subunit.PROGRESS_CUR:
        raise NotImplementedError
    else:
        # Unknown whence values are silently ignored.
        return
    self._stream.write(line)
215 # The following are Samba extensions:
def start_testsuite(self, name):
    """Write a ``testsuite:`` line announcing *name* to the stream.

    :param name: Name of the testsuite that is starting.
    """
    announcement = "testsuite: %s\n" % name
    self._stream.write(announcement)
def skip_testsuite(self, name, reason=None):
    """Write a ``skip-testsuite:`` line for *name* to the stream.

    Exactly one line form is written: the bracketed form carrying the
    reason when a reason is given, the bare form otherwise.  Writing
    both forms would report the skip twice and, without a reason, would
    emit a literal ``None`` as the reason text.

    :param name: Name of the skipped testsuite.
    :param reason: Optional text explaining the skip.
    """
    if reason:
        self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
    else:
        self._stream.write("skip-testsuite: %s\n" % name)
def end_testsuite(self, name, result, reason=None):
    """Write a ``testsuite-<result>:`` line for *name* to the stream.

    Exactly one line form is written: the bracketed form carrying the
    reason when a reason is given, the bare form otherwise.  Writing
    both forms would report the result twice and, without a reason,
    would emit a literal ``None`` as the reason text.

    :param name: Name of the testsuite that finished.
    :param result: Result keyword appended to ``testsuite-``.
    :param reason: Optional text explaining the result.
    """
    if reason:
        self._stream.write(
            "testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
    else:
        self._stream.write("testsuite-%s: %s\n" % (result, name))
def output_msg(self, msg):
    """Write *msg* to the output stream exactly as given.

    :param msg: Text to pass through.
    """
    stream = self._stream
    stream.write(msg)
235 def read_test_regexes(*names
):
239 # if we are given a directory, we read all the files it contains
240 # (except the ones that end with "~").
241 if os
.path
.isdir(name
):
242 files
.extend([os
.path
.join(name
, x
)
243 for x
in os
.listdir(name
)
248 for filename
in files
:
249 f
= open(filename
, 'r')
253 if l
== "" or l
[0] == "#":
256 (regex
, reason
) = l
.split("#", 1)
257 ret
[regex
.strip()] = reason
.strip()
265 def find_in_list(regexes
, fullname
):
266 for regex
, reason
in regexes
.items():
267 if re
.match(regex
, fullname
):
274 class ImmediateFail(Exception):
275 """Raised to abort immediately."""
278 super(ImmediateFail
, self
).__init
__("test failed and fail_immediately set")
281 class FilterOps(unittest
.TestResult
):
def control_msg(self, msg):
    """Ignore *msg*: control messages are regenerated, not forwarded."""
286 def time(self
, time
):
def progress(self, delta, whence):
    """Forward a progress notification to the wrapped result unchanged.

    :param delta: Progress value, passed through as is.
    :param whence: Progress kind, passed through as is.
    """
    forward = self._ops.progress
    forward(delta, whence)
292 def output_msg(self
, msg
):
293 if self
.output
is None:
294 sys
.stdout
.write(msg
)
298 def startTest(self
, test
):
299 self
.seen_output
= True
300 test
= self
._add
_prefix
(test
)
301 if self
.strip_ok_output
:
304 self
._ops
.startTest(test
)
def _add_prefix(self, test):
    """Return a new RemotedTestCase whose id is wrapped in prefix/suffix.

    :param test: Test object providing ``id()``.
    :return: ``subunit.RemotedTestCase`` built from
        ``self.prefix + test.id() + self.suffix``.
    """
    make_case = subunit.RemotedTestCase
    decorated_id = self.prefix + test.id() + self.suffix
    return make_case(decorated_id)
309 def addError(self
, test
, err
=None):
310 test
= self
._add
_prefix
(test
)
311 self
.error_added
+= 1
312 self
.total_error
+= 1
313 self
._ops
.addError(test
, err
)
315 if self
.fail_immediately
:
316 raise ImmediateFail()
318 def addSkip(self
, test
, reason
=None):
319 self
.seen_output
= True
320 test
= self
._add
_prefix
(test
)
321 self
._ops
.addSkip(test
, reason
)
324 def addExpectedFailure(self
, test
, err
=None):
325 test
= self
._add
_prefix
(test
)
326 self
._ops
.addExpectedFailure(test
, err
)
329 def addUnexpectedSuccess(self
, test
):
330 test
= self
._add
_prefix
(test
)
331 self
.uxsuccess_added
+= 1
332 self
.total_uxsuccess
+= 1
333 self
._ops
.addUnexpectedSuccess(test
)
335 self
._ops
.output_msg(self
.output
)
337 if self
.fail_immediately
:
338 raise ImmediateFail()
340 def addFailure(self
, test
, err
=None):
341 test
= self
._add
_prefix
(test
)
342 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
343 if xfail_reason
is None:
344 xfail_reason
= find_in_list(self
.flapping
, test
.id())
345 if xfail_reason
is not None:
346 self
.xfail_added
+= 1
347 self
.total_xfail
+= 1
348 self
._ops
.addExpectedFailure(test
, err
)
352 self
._ops
.addFailure(test
, err
)
354 self
._ops
.output_msg(self
.output
)
355 if self
.fail_immediately
:
356 raise ImmediateFail()
359 def addSuccess(self
, test
):
360 test
= self
._add
_prefix
(test
)
361 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
362 if xfail_reason
is not None:
363 self
.uxsuccess_added
+= 1
364 self
.total_uxsuccess
+= 1
365 self
._ops
.addUnexpectedSuccess(test
)
367 self
._ops
.output_msg(self
.output
)
368 if self
.fail_immediately
:
369 raise ImmediateFail()
371 self
._ops
.addSuccess(test
)
def skip_testsuite(self, name, reason=None):
    """Forward a testsuite skip to the wrapped result unchanged.

    :param name: Name of the skipped testsuite.
    :param reason: Optional reason, passed through as is.
    """
    downstream = self._ops
    downstream.skip_testsuite(name, reason)
377 def start_testsuite(self
, name
):
378 self
._ops
.start_testsuite(name
)
382 self
.uxsuccess_added
= 0
384 def end_testsuite(self
, name
, result
, reason
=None):
387 if self
.xfail_added
> 0:
389 if self
.fail_added
> 0 or self
.error_added
> 0 or self
.uxsuccess_added
> 0:
392 if xfail
and result
in ("fail", "failure"):
395 if self
.uxsuccess_added
> 0 and result
!= "uxsuccess":
398 reason
= "Subunit/Filter Reason"
399 reason
+= "\n uxsuccess[%d]" % self
.uxsuccess_added
401 if self
.fail_added
> 0 and result
!= "failure":
404 reason
= "Subunit/Filter Reason"
405 reason
+= "\n failures[%d]" % self
.fail_added
407 if self
.error_added
> 0 and result
!= "error":
410 reason
= "Subunit/Filter Reason"
411 reason
+= "\n errors[%d]" % self
.error_added
413 self
._ops
.end_testsuite(name
, result
, reason
)
414 if result
not in ("success", "xfail"):
416 self
._ops
.output_msg(self
.output
)
417 if self
.fail_immediately
:
418 raise ImmediateFail()
421 def __init__(self
, out
, prefix
=None, suffix
=None, expected_failures
=None,
422 strip_ok_output
=False, fail_immediately
=False,
425 self
.seen_output
= False
429 if expected_failures
is not None:
430 self
.expected_failures
= expected_failures
432 self
.expected_failures
= {}
433 if flapping
is not None:
434 self
.flapping
= flapping
437 self
.strip_ok_output
= strip_ok_output
440 self
.uxsuccess_added
= 0
444 self
.total_uxsuccess
= 0
446 self
.fail_immediately
= fail_immediately
449 class PerfFilterOps(unittest
.TestResult
):
451 def progress(self
, delta
, whence
):
454 def output_msg(self
, msg
):
457 def control_msg(self
, msg
):
def skip_testsuite(self, name, reason=None):
    """Pass a testsuite skip straight through to the wrapped result.

    :param name: Name of the skipped testsuite.
    :param reason: Optional reason, passed through as is.
    """
    wrapped = self._ops
    wrapped.skip_testsuite(name, reason)
463 def start_testsuite(self
, name
):
464 self
.suite_has_time
= False
466 def end_testsuite(self
, name
, result
, reason
=None):
def _add_prefix(self, test):
    """Return a new RemotedTestCase whose id is wrapped in prefix/suffix.

    :param test: Test object providing ``id()``.
    :return: ``subunit.RemotedTestCase`` built from
        ``self.prefix + test.id() + self.suffix``.
    """
    make_case = subunit.RemotedTestCase
    full_id = self.prefix + test.id() + self.suffix
    return make_case(full_id)
def time(self, time):
    """Remember the most recent timestamp seen in the stream.

    Also sets ``suite_has_time`` to record that a timestamp was seen.

    :param time: Timestamp value to store as ``latest_time``.
    """
    self.latest_time = time
    self.suite_has_time = True
478 if self
.suite_has_time
:
479 return self
.latest_time
480 return datetime
.datetime
.utcnow()
def startTest(self, test):
    """Record the start time of *test* under its prefixed id.

    :param test: Test object providing ``id()``.
    """
    self.seen_output = True
    prefixed = self._add_prefix(test)
    # Take the timestamp before looking up the id, matching the order in
    # which a single assignment statement would evaluate both sides.
    started_at = self.get_time()
    self.starts[prefixed.id()] = started_at
def addSuccess(self, test):
    """Emit an ``elapsed-time:`` line for a test that succeeded.

    The test is looked up by its prefixed id, which is the key under
    which startTest() stores start times.  If no start was recorded, an
    error is reported to the wrapped result and no elapsed time is
    written; continuing would fail on the missing start time.

    :param test: Test object providing ``id()``.
    """
    test = self._add_prefix(test)
    tid = test.id()
    if tid not in self.starts:
        self._ops.addError(test, "%s succeeded without ever starting!" % tid)
        # Without a start time there is nothing to measure.
        return
    delta = self.get_time() - self.starts[tid]
    self._ops.output_msg(
        "elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
def addFailure(self, test, err=''):
    """Emit a ``failure:`` line with the time the test ran before failing.

    The test is looked up by its prefixed id, because startTest() stores
    start times under the prefixed id.

    :param test: Test object providing ``id()``.
    :param err: Error detail interpolated into the message.
    :raises KeyError: if no start time was recorded for the test.
    """
    test = self._add_prefix(test)
    tid = test.id()
    delta = self.get_time() - self.starts[tid]
    self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
                         (tid, delta.total_seconds(), err))
def addError(self, test, err=''):
    """Emit an ``error:`` line with the time the test ran before erroring.

    The test is looked up by its prefixed id, because startTest() stores
    start times under the prefixed id.

    :param test: Test object providing ``id()``.
    :param err: Error detail interpolated into the message.
    :raises KeyError: if no start time was recorded for the test.
    """
    test = self._add_prefix(test)
    tid = test.id()
    delta = self.get_time() - self.starts[tid]
    self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
                         (tid, delta.total_seconds(), err))
507 def __init__(self
, out
, prefix
='', suffix
=''):
509 self
.prefix
= prefix
or ''
510 self
.suffix
= suffix
or ''
512 self
.seen_output
= False
513 self
.suite_has_time
= False
516 class PlainFormatter(TestsuiteEnabledTestResult
):
518 def __init__(self
, verbose
, immediate
, statistics
,
520 super(PlainFormatter
, self
).__init
__()
521 self
.verbose
= verbose
522 self
.immediate
= immediate
523 self
.statistics
= statistics
524 self
.start_time
= None
525 self
.test_output
= {}
526 self
.suitesfailed
= []
531 self
._progress
_level
= 0
532 self
.totalsuites
= totaltests
533 self
.last_time
= None
536 def _format_time(delta
):
537 minutes
, seconds
= divmod(delta
.seconds
, 60)
538 hours
, minutes
= divmod(minutes
, 60)
543 ret
+= "%dm" % minutes
544 ret
+= "%ds" % seconds
def progress(self, offset, whence):
    """Track progress nesting and the total number of testsuites.

    A push or pop adjusts the nesting level.  A set at nesting level 0
    stores *offset* as ``totalsuites``; a set at any other level is
    ignored, as is any unrecognised *whence*.

    :param offset: Value stored as ``totalsuites`` for a top-level set.
    :param whence: One of the ``subunit.PROGRESS_*`` constants.
    :raises NotImplementedError: for ``subunit.PROGRESS_CUR``.
    """
    if whence == subunit.PROGRESS_POP:
        self._progress_level -= 1
        return
    if whence == subunit.PROGRESS_PUSH:
        self._progress_level += 1
        return
    if whence == subunit.PROGRESS_SET:
        if self._progress_level == 0:
            self.totalsuites = offset
        return
    if whence == subunit.PROGRESS_CUR:
        raise NotImplementedError
559 if self
.start_time
is None:
563 def start_testsuite(self
, name
):
568 self
.test_output
[name
] = ""
570 total_tests
= (self
.statistics
['TESTS_EXPECTED_OK'] +
571 self
.statistics
['TESTS_EXPECTED_FAIL'] +
572 self
.statistics
['TESTS_ERROR'] +
573 self
.statistics
['TESTS_UNEXPECTED_FAIL'] +
574 self
.statistics
['TESTS_UNEXPECTED_OK'])
576 out
= "[%d(%d)" % (self
.index
, total_tests
)
577 if self
.totalsuites
is not None:
578 out
+= "/%d" % self
.totalsuites
579 if self
.start_time
is not None:
580 out
+= " at " + self
._format
_time
(self
.last_time
- self
.start_time
)
581 if self
.suitesfailed
:
582 out
+= ", %d errors" % (len(self
.suitesfailed
),)
585 sys
.stdout
.write(out
+ "\n")
587 sys
.stdout
.write(out
+ ": ")
589 def output_msg(self
, output
):
591 sys
.stdout
.write(output
)
592 elif self
.name
is not None:
593 self
.test_output
[self
.name
] += output
595 sys
.stdout
.write(output
)
597 def control_msg(self
, output
):
600 def end_testsuite(self
, name
, result
, reason
):
604 if name
not in self
.test_output
:
605 print("no output for name[%s]" % name
)
607 if result
in ("success", "xfail"):
610 self
.output_msg("ERROR: Testsuite[%s]\n" % name
)
611 if reason
is not None:
612 self
.output_msg("REASON: %s\n" % (reason
,))
613 self
.suitesfailed
.append(name
)
614 if self
.immediate
and not self
.verbose
and name
in self
.test_output
:
615 out
+= self
.test_output
[name
]
618 if not self
.immediate
:
622 out
+= " " + result
.upper() + "\n"
624 sys
.stdout
.write(out
)
626 def startTest(self
, test
):
def addSuccess(self, test):
    """Report *test* to end_test() as an expected ``success``.

    :param test: Test object providing ``id()``.
    """
    report = self.end_test
    report(test.id(), "success", False)
def addError(self, test, err=None):
    """Report *test* to end_test() as an unexpected ``error``.

    :param test: Test object providing ``id()``.
    :param err: Error detail, passed through to end_test().
    """
    report = self.end_test
    report(test.id(), "error", True, err)
def addFailure(self, test, err=None):
    """Report *test* to end_test() as an unexpected ``failure``.

    :param test: Test object providing ``id()``.
    :param err: Failure detail, passed through to end_test().
    """
    report = self.end_test
    report(test.id(), "failure", True, err)
def addSkip(self, test, reason=None):
    """Report *test* to end_test() as an expected ``skip``.

    :param test: Test object providing ``id()``.
    :param reason: Skip reason, passed through to end_test().
    """
    report = self.end_test
    report(test.id(), "skip", False, reason)
def addExpectedFailure(self, test, err=None):
    """Report *test* to end_test() as an expected ``xfail``.

    :param test: Test object providing ``id()``.
    :param err: Failure detail, passed through to end_test().
    """
    report = self.end_test
    report(test.id(), "xfail", False, err)
def addUnexpectedSuccess(self, test):
    """Report *test* to end_test() as an unexpected ``uxsuccess``.

    :param test: Test object providing ``id()``.
    """
    report = self.end_test
    report(test.id(), "uxsuccess", True)
647 def end_test(self
, testname
, result
, unexpected
, err
=None):
649 self
.test_output
[self
.name
] = ""
650 if not self
.immediate
:
655 'success': '.'}.get(result
, "?(%s)" % result
))
658 if self
.name
not in self
.test_output
:
659 self
.test_output
[self
.name
] = ""
661 self
.test_output
[self
.name
] += "UNEXPECTED(%s): %s\n" % (result
, testname
)
663 self
.test_output
[self
.name
] += "REASON: %s\n" % str(err
[1]).strip()
665 if self
.immediate
and not self
.verbose
:
666 sys
.stdout
.write(self
.test_output
[self
.name
])
667 self
.test_output
[self
.name
] = ""
669 if not self
.immediate
:
674 'success': 'S'}.get(result
, "?"))
676 def write_summary(self
, path
):
679 if self
.suitesfailed
:
680 f
.write("= Failed tests =\n")
682 for suite
in self
.suitesfailed
:
683 f
.write("== %s ==\n" % suite
)
684 if suite
in self
.test_output
:
685 f
.write(self
.test_output
[suite
] + "\n\n")
689 if not self
.immediate
and not self
.verbose
:
690 for suite
in self
.suitesfailed
:
692 print("FAIL: %s" % suite
)
693 if suite
in self
.test_output
:
694 print(self
.test_output
[suite
])
697 f
.write("= Skipped tests =\n")
698 for reason
in self
.skips
.keys():
699 f
.write(reason
+ "\n")
700 for name
in self
.skips
[reason
]:
701 f
.write("\t%s\n" % name
)
705 if (not self
.suitesfailed
and
706 not self
.statistics
['TESTS_UNEXPECTED_FAIL'] and
707 not self
.statistics
['TESTS_UNEXPECTED_OK'] and
708 not self
.statistics
['TESTS_ERROR']):
709 ok
= (self
.statistics
['TESTS_EXPECTED_OK'] +
710 self
.statistics
['TESTS_EXPECTED_FAIL'])
711 print("\nALL OK (%d tests in %d testsuites)" % (ok
, self
.suites_ok
))
713 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
714 self
.statistics
['TESTS_UNEXPECTED_FAIL'],
715 self
.statistics
['TESTS_ERROR'],
716 self
.statistics
['TESTS_UNEXPECTED_OK'],
717 len(self
.suitesfailed
)))
def skip_testsuite(self, name, reason="UNKNOWN"):
    """Record a skipped testsuite and drop it from the expected total.

    The suite name is filed under its reason in ``self.skips``.  The
    total is decremented only when one is known: ``totalsuites`` may be
    None, and decrementing None would raise TypeError.

    :param name: Name of the skipped testsuite.
    :param reason: Reason used as the grouping key in ``self.skips``.
    """
    self.skips.setdefault(reason, []).append(name)
    if self.totalsuites is not None:
        self.totalsuites -= 1