1 # Python module for parsing and generating the Subunit protocol
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__
import print_function
19 __all__
= ['parse_results']
25 from samba
import subunit
26 from samba
.subunit
.run
import TestProtocolClient
27 from samba
.subunit
import iso8601
31 VALID_RESULTS
= set(['success', 'successful', 'failure', 'fail', 'skip',
32 'knownfail', 'error', 'xfail', 'skip-testsuite',
33 'testsuite-failure', 'testsuite-xfail',
34 'testsuite-success', 'testsuite-error',
35 'uxsuccess', 'testsuite-uxsuccess'])
38 class TestsuiteEnabledTestResult(unittest
.TestResult
):
40 def start_testsuite(self
, name
):
41 raise NotImplementedError(self
.start_testsuite
)
44 def parse_results(msg_ops
, statistics
, fh
):
49 parts
= l
.split(None, 1)
50 if not len(parts
) == 2 or not l
.startswith(parts
[0]):
53 command
= parts
[0].rstrip(":")
55 if command
in ("test", "testing"):
56 msg_ops
.control_msg(l
)
58 test
= subunit
.RemotedTestCase(name
)
59 if name
in open_tests
:
60 msg_ops
.addError(open_tests
.pop(name
), subunit
.RemoteError(u
"Test already running"))
61 msg_ops
.startTest(test
)
62 open_tests
[name
] = test
63 elif command
== "time":
64 msg_ops
.control_msg(l
)
66 dt
= iso8601
.parse_date(arg
.rstrip("\n"))
67 except TypeError as e
:
68 print("Unable to parse time line: %s" % arg
.rstrip("\n"))
71 elif command
in VALID_RESULTS
:
72 msg_ops
.control_msg(l
)
74 grp
= re
.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg
)
75 (testname
, hasreason
) = (grp
.group(1), grp
.group(2))
78 # reason may be specified in next lines
81 msg_ops
.control_msg(l
)
88 if isinstance(reason
, bytes
):
89 remote_error
= subunit
.RemoteError(reason
.decode("utf-8"))
91 remote_error
= subunit
.RemoteError(reason
)
94 statistics
['TESTS_ERROR'] += 1
95 msg_ops
.addError(subunit
.RemotedTestCase(testname
),
96 subunit
.RemoteError(u
"result (%s) reason (%s) interrupted" % (result
, reason
)))
100 remote_error
= subunit
.RemoteError(u
"No reason specified")
101 if result
in ("success", "successful"):
103 test
= open_tests
.pop(testname
)
105 statistics
['TESTS_ERROR'] += 1
107 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
109 statistics
['TESTS_EXPECTED_OK'] += 1
110 msg_ops
.addSuccess(test
)
111 elif result
in ("xfail", "knownfail"):
113 test
= open_tests
.pop(testname
)
115 statistics
['TESTS_ERROR'] += 1
117 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
119 statistics
['TESTS_EXPECTED_FAIL'] += 1
120 msg_ops
.addExpectedFailure(test
, remote_error
)
121 elif result
in ("uxsuccess", ):
123 test
= open_tests
.pop(testname
)
125 statistics
['TESTS_ERROR'] += 1
127 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
129 statistics
['TESTS_UNEXPECTED_OK'] += 1
130 msg_ops
.addUnexpectedSuccess(test
)
132 elif result
in ("failure", "fail"):
134 test
= open_tests
.pop(testname
)
136 statistics
['TESTS_ERROR'] += 1
138 msg_ops
.addError(subunit
.RemotedTestCase(testname
), subunit
.RemoteError(u
"Test was never started"))
140 statistics
['TESTS_UNEXPECTED_FAIL'] += 1
142 msg_ops
.addFailure(test
, remote_error
)
143 elif result
== "skip":
144 statistics
['TESTS_SKIP'] += 1
145 # Allow tests to be skipped without prior announcement of test
147 test
= open_tests
.pop(testname
)
149 test
= subunit
.RemotedTestCase(testname
)
150 msg_ops
.addSkip(test
, reason
)
151 elif result
== "error":
152 statistics
['TESTS_ERROR'] += 1
155 test
= open_tests
.pop(testname
)
157 test
= subunit
.RemotedTestCase(testname
)
158 msg_ops
.addError(test
, remote_error
)
159 elif result
== "skip-testsuite":
160 msg_ops
.skip_testsuite(testname
)
161 elif result
== "testsuite-success":
162 msg_ops
.end_testsuite(testname
, "success", reason
)
163 elif result
== "testsuite-failure":
164 msg_ops
.end_testsuite(testname
, "failure", reason
)
166 elif result
== "testsuite-xfail":
167 msg_ops
.end_testsuite(testname
, "xfail", reason
)
168 elif result
== "testsuite-uxsuccess":
169 msg_ops
.end_testsuite(testname
, "uxsuccess", reason
)
171 elif result
== "testsuite-error":
172 msg_ops
.end_testsuite(testname
, "error", reason
)
175 raise AssertionError("Recognized but unhandled result %r" %
177 elif command
== "testsuite":
178 msg_ops
.start_testsuite(arg
.strip())
179 elif command
== "progress":
182 msg_ops
.progress(None, subunit
.PROGRESS_POP
)
184 msg_ops
.progress(None, subunit
.PROGRESS_PUSH
)
186 msg_ops
.progress(int(arg
), subunit
.PROGRESS_CUR
)
188 msg_ops
.progress(int(arg
), subunit
.PROGRESS_SET
)
190 msg_ops
.output_msg(l
)
193 test
= subunit
.RemotedTestCase(open_tests
.popitem()[1])
194 msg_ops
.addError(test
, subunit
.RemoteError(u
"was started but never finished!"))
195 statistics
['TESTS_ERROR'] += 1
201 class SubunitOps(TestProtocolClient
, TestsuiteEnabledTestResult
):
203 def progress(self
, count
, whence
):
204 if whence
== subunit
.PROGRESS_POP
:
205 self
._stream
.write("progress: pop\n")
206 elif whence
== subunit
.PROGRESS_PUSH
:
207 self
._stream
.write("progress: push\n")
208 elif whence
== subunit
.PROGRESS_SET
:
209 self
._stream
.write("progress: %d\n" % count
)
210 elif whence
== subunit
.PROGRESS_CUR
:
211 raise NotImplementedError
213 # The following are Samba extensions:
214 def start_testsuite(self
, name
):
215 self
._stream
.write("testsuite: %s\n" % name
)
217 def skip_testsuite(self
, name
, reason
=None):
219 self
._stream
.write("skip-testsuite: %s [\n%s\n]\n" % (name
, reason
))
221 self
._stream
.write("skip-testsuite: %s\n" % name
)
223 def end_testsuite(self
, name
, result
, reason
=None):
225 self
._stream
.write("testsuite-%s: %s [\n%s\n]\n" % (result
, name
, reason
))
227 self
._stream
.write("testsuite-%s: %s\n" % (result
, name
))
229 def output_msg(self
, msg
):
230 self
._stream
.write(msg
)
233 def read_test_regexes(*names
):
237 # if we are given a directory, we read all the files it contains
238 # (except the ones that end with "~").
239 if os
.path
.isdir(name
):
240 files
.extend([os
.path
.join(name
, x
)
241 for x
in os
.listdir(name
)
246 for filename
in files
:
247 with
open(filename
, 'r') as f
:
250 if l
== "" or l
[0] == "#":
253 (regex
, reason
) = l
.split("#", 1)
254 ret
[regex
.strip()] = reason
.strip()
261 def find_in_list(regexes
, fullname
):
262 for regex
, reason
in regexes
.items():
263 if re
.match(regex
, fullname
):
270 class ImmediateFail(Exception):
271 """Raised to abort immediately."""
274 super(ImmediateFail
, self
).__init
__("test failed and fail_immediately set")
277 class FilterOps(unittest
.TestResult
):
279 def control_msg(self
, msg
):
280 pass # We regenerate control messages, so ignore this
282 def time(self
, time
):
285 def progress(self
, delta
, whence
):
286 self
._ops
.progress(delta
, whence
)
288 def output_msg(self
, msg
):
289 if self
.output
is None:
290 sys
.stdout
.write(msg
)
294 def startTest(self
, test
):
295 self
.seen_output
= True
296 test
= self
._add
_prefix
(test
)
297 if self
.strip_ok_output
:
300 self
._ops
.startTest(test
)
302 def _add_prefix(self
, test
):
303 return subunit
.RemotedTestCase(self
.prefix
+ test
.id() + self
.suffix
)
305 def addError(self
, test
, err
=None):
306 test
= self
._add
_prefix
(test
)
307 self
.error_added
+= 1
308 self
.total_error
+= 1
309 self
._ops
.addError(test
, err
)
311 if self
.fail_immediately
:
312 raise ImmediateFail()
314 def addSkip(self
, test
, reason
=None):
315 self
.seen_output
= True
316 test
= self
._add
_prefix
(test
)
317 self
._ops
.addSkip(test
, reason
)
320 def addExpectedFailure(self
, test
, err
=None):
321 test
= self
._add
_prefix
(test
)
322 self
._ops
.addExpectedFailure(test
, err
)
325 def addUnexpectedSuccess(self
, test
):
326 test
= self
._add
_prefix
(test
)
327 self
.uxsuccess_added
+= 1
328 self
.total_uxsuccess
+= 1
329 self
._ops
.addUnexpectedSuccess(test
)
331 self
._ops
.output_msg(self
.output
)
333 if self
.fail_immediately
:
334 raise ImmediateFail()
336 def addFailure(self
, test
, err
=None):
337 test
= self
._add
_prefix
(test
)
338 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
339 if xfail_reason
is None:
340 xfail_reason
= find_in_list(self
.flapping
, test
.id())
341 if xfail_reason
is not None:
342 self
.xfail_added
+= 1
343 self
.total_xfail
+= 1
344 self
._ops
.addExpectedFailure(test
, err
)
348 self
._ops
.addFailure(test
, err
)
350 self
._ops
.output_msg(self
.output
)
351 if self
.fail_immediately
:
352 raise ImmediateFail()
355 def addSuccess(self
, test
):
356 test
= self
._add
_prefix
(test
)
357 xfail_reason
= find_in_list(self
.expected_failures
, test
.id())
358 if xfail_reason
is not None:
359 self
.uxsuccess_added
+= 1
360 self
.total_uxsuccess
+= 1
361 self
._ops
.addUnexpectedSuccess(test
)
363 self
._ops
.output_msg(self
.output
)
364 if self
.fail_immediately
:
365 raise ImmediateFail()
367 self
._ops
.addSuccess(test
)
370 def skip_testsuite(self
, name
, reason
=None):
371 self
._ops
.skip_testsuite(name
, reason
)
373 def start_testsuite(self
, name
):
374 self
._ops
.start_testsuite(name
)
378 self
.uxsuccess_added
= 0
380 def end_testsuite(self
, name
, result
, reason
=None):
383 if self
.xfail_added
> 0:
385 if self
.fail_added
> 0 or self
.error_added
> 0 or self
.uxsuccess_added
> 0:
388 if xfail
and result
in ("fail", "failure"):
391 if self
.uxsuccess_added
> 0 and result
!= "uxsuccess":
394 reason
= "Subunit/Filter Reason"
395 reason
+= "\n uxsuccess[%d]" % self
.uxsuccess_added
397 if self
.fail_added
> 0 and result
!= "failure":
400 reason
= "Subunit/Filter Reason"
401 reason
+= "\n failures[%d]" % self
.fail_added
403 if self
.error_added
> 0 and result
!= "error":
406 reason
= "Subunit/Filter Reason"
407 reason
+= "\n errors[%d]" % self
.error_added
409 self
._ops
.end_testsuite(name
, result
, reason
)
410 if result
not in ("success", "xfail"):
412 self
._ops
.output_msg(self
.output
)
413 if self
.fail_immediately
:
414 raise ImmediateFail()
417 def __init__(self
, out
, prefix
=None, suffix
=None, expected_failures
=None,
418 strip_ok_output
=False, fail_immediately
=False,
421 self
.seen_output
= False
425 if expected_failures
is not None:
426 self
.expected_failures
= expected_failures
428 self
.expected_failures
= {}
429 if flapping
is not None:
430 self
.flapping
= flapping
433 self
.strip_ok_output
= strip_ok_output
436 self
.uxsuccess_added
= 0
440 self
.total_uxsuccess
= 0
442 self
.fail_immediately
= fail_immediately
445 class PerfFilterOps(unittest
.TestResult
):
447 def progress(self
, delta
, whence
):
450 def output_msg(self
, msg
):
453 def control_msg(self
, msg
):
456 def skip_testsuite(self
, name
, reason
=None):
457 self
._ops
.skip_testsuite(name
, reason
)
459 def start_testsuite(self
, name
):
460 self
.suite_has_time
= False
462 def end_testsuite(self
, name
, result
, reason
=None):
465 def _add_prefix(self
, test
):
466 return subunit
.RemotedTestCase(self
.prefix
+ test
.id() + self
.suffix
)
468 def time(self
, time
):
469 self
.latest_time
= time
470 #self._ops.output_msg("found time %s\n" % time)
471 self
.suite_has_time
= True
474 if self
.suite_has_time
:
475 return self
.latest_time
476 return datetime
.datetime
.utcnow()
478 def startTest(self
, test
):
479 self
.seen_output
= True
480 test
= self
._add
_prefix
(test
)
481 self
.starts
[test
.id()] = self
.get_time()
483 def addSuccess(self
, test
):
484 test
= self
._add
_prefix
(test
)
486 if tid
not in self
.starts
:
487 self
._ops
.addError(test
, "%s succeeded without ever starting!" % tid
)
488 delta
= self
.get_time() - self
.starts
[tid
]
489 self
._ops
.output_msg("elapsed-time: %s: %f\n" % (tid
, delta
.total_seconds()))
491 def addFailure(self
, test
, err
=''):
493 delta
= self
.get_time() - self
.starts
[tid
]
494 self
._ops
.output_msg("failure: %s failed after %f seconds (%s)\n" %
495 (tid
, delta
.total_seconds(), err
))
497 def addError(self
, test
, err
=''):
499 delta
= self
.get_time() - self
.starts
[tid
]
500 self
._ops
.output_msg("error: %s failed after %f seconds (%s)\n" %
501 (tid
, delta
.total_seconds(), err
))
503 def __init__(self
, out
, prefix
='', suffix
=''):
505 self
.prefix
= prefix
or ''
506 self
.suffix
= suffix
or ''
508 self
.seen_output
= False
509 self
.suite_has_time
= False
512 class PlainFormatter(TestsuiteEnabledTestResult
):
514 def __init__(self
, verbose
, immediate
, statistics
,
516 super(PlainFormatter
, self
).__init
__()
517 self
.verbose
= verbose
518 self
.immediate
= immediate
519 self
.statistics
= statistics
520 self
.start_time
= None
521 self
.test_output
= {}
522 self
.suitesfailed
= []
527 self
._progress
_level
= 0
528 self
.totalsuites
= totaltests
529 self
.last_time
= None
532 def _format_time(delta
):
533 minutes
, seconds
= divmod(delta
.seconds
, 60)
534 hours
, minutes
= divmod(minutes
, 60)
539 ret
+= "%dm" % minutes
540 ret
+= "%ds" % seconds
543 def progress(self
, offset
, whence
):
544 if whence
== subunit
.PROGRESS_POP
:
545 self
._progress
_level
-= 1
546 elif whence
== subunit
.PROGRESS_PUSH
:
547 self
._progress
_level
+= 1
548 elif whence
== subunit
.PROGRESS_SET
:
549 if self
._progress
_level
== 0:
550 self
.totalsuites
= offset
551 elif whence
== subunit
.PROGRESS_CUR
:
552 raise NotImplementedError
555 if self
.start_time
is None:
559 def start_testsuite(self
, name
):
564 self
.test_output
[name
] = ""
566 total_tests
= (self
.statistics
['TESTS_EXPECTED_OK'] +
567 self
.statistics
['TESTS_EXPECTED_FAIL'] +
568 self
.statistics
['TESTS_ERROR'] +
569 self
.statistics
['TESTS_UNEXPECTED_FAIL'] +
570 self
.statistics
['TESTS_UNEXPECTED_OK'])
572 out
= "[%d(%d)" % (self
.index
, total_tests
)
573 if self
.totalsuites
is not None:
574 out
+= "/%d" % self
.totalsuites
575 if self
.start_time
is not None:
576 out
+= " at " + self
._format
_time
(self
.last_time
- self
.start_time
)
577 if self
.suitesfailed
:
578 out
+= ", %d errors" % (len(self
.suitesfailed
),)
581 sys
.stdout
.write(out
+ "\n")
583 sys
.stdout
.write(out
+ ": ")
585 def output_msg(self
, output
):
587 sys
.stdout
.write(output
)
588 elif self
.name
is not None:
589 self
.test_output
[self
.name
] += output
591 sys
.stdout
.write(output
)
593 def control_msg(self
, output
):
596 def end_testsuite(self
, name
, result
, reason
):
600 if name
not in self
.test_output
:
601 print("no output for name[%s]" % name
)
603 if result
in ("success", "xfail"):
606 self
.output_msg("ERROR: Testsuite[%s]\n" % name
)
607 if reason
is not None:
608 self
.output_msg("REASON: %s\n" % (reason
,))
609 self
.suitesfailed
.append(name
)
610 if self
.immediate
and not self
.verbose
and name
in self
.test_output
:
611 out
+= self
.test_output
[name
]
614 if not self
.immediate
:
618 out
+= " " + result
.upper() + "\n"
620 sys
.stdout
.write(out
)
622 def startTest(self
, test
):
625 def addSuccess(self
, test
):
626 self
.end_test(test
.id(), "success", False)
628 def addError(self
, test
, err
=None):
629 self
.end_test(test
.id(), "error", True, err
)
631 def addFailure(self
, test
, err
=None):
632 self
.end_test(test
.id(), "failure", True, err
)
634 def addSkip(self
, test
, reason
=None):
635 self
.end_test(test
.id(), "skip", False, reason
)
637 def addExpectedFailure(self
, test
, err
=None):
638 self
.end_test(test
.id(), "xfail", False, err
)
640 def addUnexpectedSuccess(self
, test
):
641 self
.end_test(test
.id(), "uxsuccess", True)
643 def end_test(self
, testname
, result
, unexpected
, err
=None):
645 self
.test_output
[self
.name
] = ""
646 if not self
.immediate
:
651 'success': '.'}.get(result
, "?(%s)" % result
))
654 if self
.name
not in self
.test_output
:
655 self
.test_output
[self
.name
] = ""
657 self
.test_output
[self
.name
] += "UNEXPECTED(%s): %s\n" % (result
, testname
)
659 self
.test_output
[self
.name
] += "REASON: %s\n" % str(err
[1]).strip()
661 if self
.immediate
and not self
.verbose
:
662 sys
.stdout
.write(self
.test_output
[self
.name
])
663 self
.test_output
[self
.name
] = ""
665 if not self
.immediate
:
670 'success': 'S'}.get(result
, "?"))
672 def write_summary(self
, path
):
675 if self
.suitesfailed
:
676 f
.write("= Failed tests =\n")
678 for suite
in self
.suitesfailed
:
679 f
.write("== %s ==\n" % suite
)
680 if suite
in self
.test_output
:
681 f
.write(self
.test_output
[suite
] + "\n\n")
685 if not self
.immediate
and not self
.verbose
:
686 for suite
in self
.suitesfailed
:
688 print("FAIL: %s" % suite
)
689 if suite
in self
.test_output
:
690 print(self
.test_output
[suite
])
693 f
.write("= Skipped tests =\n")
694 for reason
in self
.skips
.keys():
695 f
.write(reason
+ "\n")
696 for name
in self
.skips
[reason
]:
697 f
.write("\t%s\n" % name
)
701 if (not self
.suitesfailed
and
702 not self
.statistics
['TESTS_UNEXPECTED_FAIL'] and
703 not self
.statistics
['TESTS_UNEXPECTED_OK'] and
704 not self
.statistics
['TESTS_ERROR']):
705 ok
= (self
.statistics
['TESTS_EXPECTED_OK'] +
706 self
.statistics
['TESTS_EXPECTED_FAIL'])
707 print("\nALL OK (%d tests in %d testsuites)" % (ok
, self
.suites_ok
))
709 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
710 self
.statistics
['TESTS_UNEXPECTED_FAIL'],
711 self
.statistics
['TESTS_ERROR'],
712 self
.statistics
['TESTS_UNEXPECTED_OK'],
713 len(self
.suitesfailed
)))
715 def skip_testsuite(self
, name
, reason
="UNKNOWN"):
716 self
.skips
.setdefault(reason
, []).append(name
)
718 self
.totalsuites
-= 1