ctdb-tests: Fix handling of --no-event-scripts option
[samba.git] / selftest / subunithelper.py
blob651d9b00289dc597295b63944a6616e0ed4cb315
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 __all__ = ['parse_results']
21 import datetime
22 import re
23 import sys
24 import os
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
28 import unittest
29 from samba.compat import binary_type
# Result keywords that may start a line of the (Samba-flavoured) subunit
# stream; parse_results() treats any other keyword as plain output unless it
# is one of the other commands it knows about.
VALID_RESULTS = {
    'success', 'successful',
    'failure', 'fail',
    'skip',
    'knownfail', 'xfail',
    'error',
    'uxsuccess',
    'skip-testsuite',
    'testsuite-success',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-error',
    'testsuite-uxsuccess',
}
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A unittest.TestResult that additionally knows about testsuites."""

    def start_testsuite(self, name):
        """Announce the start of testsuite `name`; subclasses must override.

        :raises NotImplementedError: always, in this base class.
        """
        unimplemented = self.start_testsuite
        raise NotImplementedError(unimplemented)
# Matches the argument of a result line: the test name, an optional " ["
# opening a multi-line reason, optional blanks and an optional " multipart"
# marker, up to the end of the line.
_RESULT_ARG_RE = re.compile(r"(.*?)( \[)?([ \t]*)( multipart)?\n")


def _read_reason(fh, msg_ops):
    """Read the lines of a bracketed reason from fh.

    Every line read is echoed to msg_ops.control_msg().

    :return: (reason, terminated); terminated is False when the stream
        ended before a line closing the reason with "]" was seen.
    """
    reason = ""
    while fh:
        line = fh.readline()
        if line == "":
            break
        msg_ops.control_msg(line)
        if line[-2:] == "]\n":
            return reason + line[:-2], True
        reason += line
    return reason, False


def _pop_started_test(open_tests, testname, msg_ops, statistics):
    """Remove and return the running test called testname.

    If no such test was started, count a TESTS_ERROR, report the error to
    msg_ops and return None.
    """
    try:
        return open_tests.pop(testname)
    except KeyError:
        statistics['TESTS_ERROR'] += 1
        msg_ops.addError(subunit.RemotedTestCase(testname),
                         subunit.RemoteError(u"Test was never started"))
        return None


def parse_results(msg_ops, statistics, fh):
    """Parse a (Samba-flavoured) subunit stream and dispatch its events.

    :param msg_ops: Receiver of the parsed events. The methods called on it
        are control_msg, output_msg, startTest, time, progress, the add*
        result methods, start_testsuite, skip_testsuite and end_testsuite.
    :param statistics: Mapping of counters, updated in place. The keys used
        are TESTS_EXPECTED_OK, TESTS_EXPECTED_FAIL, TESTS_UNEXPECTED_OK,
        TESTS_UNEXPECTED_FAIL, TESTS_SKIP and TESTS_ERROR; they are
        incremented, so they must already be present.
    :param fh: File-like object, read one line at a time with readline().
    :return: 0 if nothing unexpected was seen, 1 otherwise.
    """
    exitcode = 0
    # name -> RemotedTestCase for every test started but not yet finished
    open_tests = {}

    while fh:
        l = fh.readline()
        if l == "":
            break
        parts = l.split(None, 1)
        # Anything that is not "<command> <argument>" starting in the first
        # column is ordinary test output.
        if len(parts) != 2 or not l.startswith(parts[0]):
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name),
                                 subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(l)
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                # NOTE(review): only TypeError is handled here; confirm which
                # exception iso8601.parse_date raises for malformed input.
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            # The last line of a stream may lack its newline; add one so the
            # pattern (which requires it) always matches.
            if not arg.endswith("\n"):
                arg += "\n"
            grp = _RESULT_ARG_RE.match(arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                # The reason is given on the following lines.
                reason, terminated = _read_reason(fh, msg_ops)

                if isinstance(reason, binary_type):
                    remote_error = subunit.RemoteError(reason.decode("utf-8"))
                else:
                    remote_error = subunit.RemoteError(reason)

                if not terminated:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.addError(
                        subunit.RemotedTestCase(testname),
                        subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")

            if result in ("success", "successful"):
                test = _pop_started_test(open_tests, testname, msg_ops,
                                         statistics)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_OK'] += 1
                    msg_ops.addSuccess(test)
            elif result in ("xfail", "knownfail"):
                test = _pop_started_test(open_tests, testname, msg_ops,
                                         statistics)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_FAIL'] += 1
                    msg_ops.addExpectedFailure(test, remote_error)
            elif result in ("uxsuccess", ):
                test = _pop_started_test(open_tests, testname, msg_ops,
                                         statistics)
                if test is not None:
                    statistics['TESTS_UNEXPECTED_OK'] += 1
                    msg_ops.addUnexpectedSuccess(test)
                exitcode = 1
            elif result in ("failure", "fail"):
                test = _pop_started_test(open_tests, testname, msg_ops,
                                         statistics)
                exitcode = 1
                if test is not None:
                    statistics['TESTS_UNEXPECTED_FAIL'] += 1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP'] += 1
                # Allow tests to be skipped without prior announcement of test
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR'] += 1
                exitcode = 1
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                # Forward the reason only when one was given, so receivers
                # keep their own default otherwise.
                if reason is None:
                    msg_ops.skip_testsuite(testname)
                else:
                    msg_ops.skip_testsuite(testname, reason)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
                exitcode = 1
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-uxsuccess":
                msg_ops.end_testsuite(testname, "uxsuccess", reason)
                exitcode = 1
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
                exitcode = 1
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                                     result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    # Whatever is still open at the end of the stream never reported a result.
    while open_tests:
        # The values already are the RemotedTestCase objects created when the
        # test started, so they must not be wrapped a second time.
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR'] += 1
        exitcode = 1

    return exitcode
class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
    """Writes results to self._stream as subunit text, including the
    Samba-specific testsuite lines."""

    def progress(self, count, whence):
        """Write a "progress:" line for a pop, push or absolute count.

        :raises NotImplementedError: for subunit.PROGRESS_CUR.
        """
        if whence == subunit.PROGRESS_POP:
            line = "progress: pop\n"
        elif whence == subunit.PROGRESS_PUSH:
            line = "progress: push\n"
        elif whence == subunit.PROGRESS_SET:
            line = "progress: %d\n" % count
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError
        else:
            # Any other value is silently ignored.
            return
        self._stream.write(line)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Write the line announcing the start of testsuite `name`."""
        announcement = "testsuite: %s\n" % name
        self._stream.write(announcement)

    def skip_testsuite(self, name, reason=None):
        """Write a skip-testsuite line, with a bracketed reason if given."""
        if not reason:
            self._stream.write("skip-testsuite: %s\n" % name)
            return
        self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))

    def end_testsuite(self, name, result, reason=None):
        """Write the testsuite-<result> line, with a bracketed reason if given."""
        if not reason:
            self._stream.write("testsuite-%s: %s\n" % (result, name))
            return
        self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))

    def output_msg(self, msg):
        """Copy plain output to the stream unchanged."""
        self._stream.write(msg)
def read_test_regexes(*names):
    """Read test-name regexes, with optional reasons, from files.

    Each non-empty line that does not start with "#" holds a regex,
    optionally followed by "#" and a reason.

    :param names: Paths of files to read; for a directory every file in it
        is read, except those whose name ends with "~".
    :return: Dictionary mapping each regex to its reason, or to None when
        the line gave no reason.
    """
    paths = []
    for name in names:
        if not os.path.isdir(name):
            paths.append(name)
            continue
        # A directory stands for all the files in it, minus "~" backups.
        for entry in os.listdir(name):
            if not entry.endswith('~'):
                paths.append(os.path.join(name, entry))

    regexes = {}
    for path in paths:
        with open(path, 'r') as handle:
            for raw in handle:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                pattern, sep, why = line.partition("#")
                if sep:
                    regexes[pattern.strip()] = why.strip()
                else:
                    regexes[line] = None
    return regexes
def find_in_list(regexes, fullname):
    """Look up the reason attached to the first regex matching fullname.

    :param regexes: Dictionary mapping regexes to a reason or None.
    :param fullname: Test name, matched from its start with re.match().
    :return: The reason of the first matching regex ("" if that reason is
        None), or None if no regex matches.
    """
    for pattern in regexes:
        if not re.match(pattern, fullname):
            continue
        why = regexes[pattern]
        return "" if why is None else why
    return None
class ImmediateFail(Exception):
    """Raised to abort immediately."""

    _MESSAGE = "test failed and fail_immediately set"

    def __init__(self):
        # The message is fixed; callers construct the exception without
        # arguments.
        Exception.__init__(self, self._MESSAGE)
class FilterOps(unittest.TestResult):
    """Filter test results before handing them to another result object.

    Test ids get a prefix and suffix added. Failures whose id matches an
    entry of expected_failures or flapping are passed on as expected
    failures, and successes matching expected_failures as unexpected
    successes. With strip_ok_output, the output seen while a test runs is
    buffered and only passed on when the result is not an expected one.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """
        :param out: Result object all (filtered) events are forwarded to.
        :param prefix: String put in front of every test id (None: nothing).
        :param suffix: String appended to every test id (None: nothing).
        :param expected_failures: Dictionary of regex -> reason, as used by
            find_in_list(), naming tests that are expected to fail.
        :param strip_ok_output: Whether to buffer the output of each test.
        :param fail_immediately: Whether to raise ImmediateFail on the first
            unexpected result.
        :param flapping: Dictionary of regex -> reason naming tests whose
            failures are to be treated as expected.
        """
        super(FilterOps, self).__init__()
        self._ops = out
        self.seen_output = False
        # None: output goes straight to stdout; a string: output is buffered.
        self.output = None
        # Normalised to strings because _add_prefix() concatenates them.
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        if expected_failures is not None:
            self.expected_failures = expected_failures
        else:
            self.expected_failures = {}
        if flapping is not None:
            self.flapping = flapping
        else:
            self.flapping = {}
        self.strip_ok_output = strip_ok_output
        # The *_added counters are per testsuite (see start_testsuite), the
        # total_* counters cover the whole run.
        self.xfail_added = 0
        self.fail_added = 0
        self.uxsuccess_added = 0
        self.error_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0
        self.total_uxsuccess = 0
        self.fail_immediately = fail_immediately

    def control_msg(self, msg):
        pass  # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer msg if output is being buffered, else write it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output += msg

    def _flush_output(self):
        """Pass buffered output, if any, on to the wrapped result object."""
        if self.output:
            self._ops.output_msg(self.output)

    def startTest(self, test):
        self.seen_output = True
        test = self._add_prefix(test)
        if self.strip_ok_output:
            # Start buffering the output of this test.
            self.output = ""

        self._ops.startTest(test)

    def _add_prefix(self, test):
        """Return a new test whose id has the prefix and suffix added."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def addError(self, test, err=None):
        test = self._add_prefix(test)
        self.error_added += 1
        self.total_error += 1
        self._ops.addError(test, err)
        # As for the other unexpected results, keep the buffered output.
        self._flush_output()
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        self.seen_output = True
        test = self._add_prefix(test)
        self._ops.addSkip(test, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        test = self._add_prefix(test)
        self._ops.addExpectedFailure(test, err)
        self.output = None

    def addUnexpectedSuccess(self, test):
        test = self._add_prefix(test)
        self.uxsuccess_added += 1
        self.total_uxsuccess += 1
        self._ops.addUnexpectedSuccess(test)
        self._flush_output()
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, or an expected failure if the test is listed
        in expected_failures or flapping."""
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            xfail_reason = find_in_list(self.flapping, test.id())
        if xfail_reason is not None:
            self.xfail_added += 1
            self.total_xfail += 1
            self._ops.addExpectedFailure(test, err)
        else:
            self.fail_added += 1
            self.total_fail += 1
            self._ops.addFailure(test, err)
            self._flush_output()
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def addSuccess(self, test):
        """Forward a success, or an unexpected success if the test is listed
        in expected_failures."""
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is not None:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(test)
            self._flush_output()
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self._ops.addSuccess(test)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the start of a testsuite and reset the per-suite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the end of a testsuite, correcting its result so that it
        agrees with the test results seen since start_testsuite()."""
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
            xfail = False

        # A failed suite in which every failure was expected is an xfail.
        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.uxsuccess_added > 0 and result != "uxsuccess":
            result = "uxsuccess"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n uxsuccess[%d]" % self.uxsuccess_added

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            self._flush_output()
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
class PerfFilterOps(unittest.TestResult):
    """Result filter that reports how long each test took.

    The start time of every test is remembered and, when the test ends, a
    line with the elapsed time is passed to the wrapped result object's
    output_msg(). Progress, output, control messages and the start and end
    of testsuites are not forwarded.
    """

    def __init__(self, out, prefix='', suffix=''):
        """
        :param out: Result object the timing lines are written to.
        :param prefix: String put in front of every test id.
        :param suffix: String appended to every test id.
        """
        super(PerfFilterOps, self).__init__()
        self._ops = out
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        # (prefixed) test id -> time at which the test started
        self.starts = {}
        self.seen_output = False
        self.suite_has_time = False
        self.latest_time = None

    def progress(self, delta, whence):
        pass

    def output_msg(self, msg):
        pass

    def control_msg(self, msg):
        pass

    def skip_testsuite(self, name, reason=None):
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        # Timestamps of an earlier suite say nothing about this one.
        self.suite_has_time = False

    def end_testsuite(self, name, result, reason=None):
        pass

    def _add_prefix(self, test):
        """Return a new test whose id has the prefix and suffix added."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def time(self, time):
        """Remember the most recent timestamp of the stream."""
        self.latest_time = time
        self.suite_has_time = True

    def get_time(self):
        """Return the latest timestamp seen in this suite, else the current
        time."""
        if self.suite_has_time:
            return self.latest_time
        # NOTE(review): utcnow() is timezone-naive; if time() is given
        # timezone-aware values, confirm the two kinds are never subtracted.
        return datetime.datetime.utcnow()

    def _elapsed_seconds(self, tid):
        """Return the seconds since test tid started (0.0 if it never did)."""
        now = self.get_time()
        return (now - self.starts.get(tid, now)).total_seconds()

    def startTest(self, test):
        self.seen_output = True
        test = self._add_prefix(test)
        self.starts[test.id()] = self.get_time()

    def addSuccess(self, test):
        test = self._add_prefix(test)
        tid = test.id()
        if tid not in self.starts:
            self._ops.addError(test, "%s succeeded without ever starting!" % tid)
            # Without a start time there is no elapsed time to report.
            return
        self._ops.output_msg("elapsed-time: %s: %f\n" %
                             (tid, self._elapsed_seconds(tid)))

    def addFailure(self, test, err=''):
        # startTest() files the start time under the prefixed id, so the
        # same id has to be used for the lookup.
        tid = self._add_prefix(test).id()
        self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
                             (tid, self._elapsed_seconds(tid), err))

    def addError(self, test, err=''):
        # See addFailure() for why the prefixed id is used.
        tid = self._add_prefix(test).id()
        self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
                             (tid, self._elapsed_seconds(tid), err))
class PlainFormatter(TestsuiteEnabledTestResult):
    """Format test results as plain text on stdout and in a summary file."""

    def __init__(self, verbose, immediate, statistics,
                 totaltests=None):
        """
        :param verbose: If true, test output is written to stdout as it
            arrives instead of being collected per testsuite.
        :param immediate: If true, each testsuite is announced on a line of
            its own and unexpected results are printed right away, instead
            of one progress character per test.
        :param statistics: Mapping with the TESTS_* counters, read when
            printing progress and the summary.
        :param totaltests: Expected number of testsuites, if known.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        # testsuite name -> output collected for it
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        # skip reason -> names of the testsuites skipped for that reason
        self.skips = {}
        self.index = 0
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format a timedelta as e.g. "1h2m3s", leaving out zero hours and
        minutes. Whole days are counted as hours."""
        minutes, seconds = divmod(int(delta.total_seconds()), 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track the progress nesting level; an absolute count at the
        outermost level sets the total number of testsuites.

        :raises NotImplementedError: for subunit.PROGRESS_CUR.
        """
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Record a timestamp; the first one seen is the start of the run."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the progress header for testsuite `name`."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            # The per-test progress characters follow on the same line.
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print output (verbose, or outside any testsuite) or collect it
        for the current testsuite."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        pass

    def end_testsuite(self, name, result, reason=None):
        """Record the result of testsuite `name` and print its status."""
        out = ""
        unexpected = False

        if name not in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok += 1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test):
        self.end_test(test.id(), "uxsuccess", True)

    def end_test(self, testname, result, unexpected, err=None):
        """Print or collect the outcome of a single test.

        :param testname: Id of the test.
        :param result: Result keyword, e.g. "success" or "failure".
        :param unexpected: Whether the result counts as unexpected.
        :param err: Error details; only its second item is used, and only
            for unexpected results.
        """
        if not unexpected:
            # Output of a test that went as expected is dropped.
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if self.name not in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
                'error': 'E',
                'failure': 'F',
                'uxsuccess': 'U',
                'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write the failed and skipped testsuites to the file `path` and
        print the final verdict to stdout."""
        # The with-block closes the file even if one of the writes fails.
        with open(path, 'w+') as f:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite] + "\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason, names in self.skips.items():
                f.write(reason + "\n")
                for name in names:
                    f.write("\t%s\n" % name)
                f.write("\n")

        if (not self.suitesfailed and
                not self.statistics['TESTS_UNEXPECTED_FAIL'] and
                not self.statistics['TESTS_UNEXPECTED_OK'] and
                not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Record that testsuite `name` was skipped for `reason`."""
        # Callers may pass None explicitly; write_summary() needs a string.
        if reason is None:
            reason = "UNKNOWN"
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites -= 1