doc: rename 'contact create' to 'contact add'
[Samba.git] / selftest / subunithelper.py
blobbaab146ca60eb1b7bba2ba02e48bba89cd0b57c9
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 __all__ = ['parse_results']
21 import datetime
22 import re
23 import sys
24 import os
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
28 import unittest
# Every keyword that parse_results() treats as a result line: the per-test
# outcomes plus the testsuite-level ones.
VALID_RESULTS = {
    'success',
    'successful',
    'failure',
    'fail',
    'skip',
    'knownfail',
    'error',
    'xfail',
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
    'uxsuccess',
    'testsuite-uxsuccess',
}
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A unittest.TestResult with an extra hook for the start of a testsuite.

    The hook is not implemented here; calling it on this class raises.
    """

    def start_testsuite(self, name):
        """Announce that the testsuite called *name* is starting.

        :raises NotImplementedError: always; the bound method itself is
            passed as the exception argument.
        """
        unimplemented = self.start_testsuite
        raise NotImplementedError(unimplemented)
def _read_reason(msg_ops, fh):
    """Collect the lines of a bracketed reason that follow a result line.

    Every line read is passed to msg_ops.control_msg().  Reading stops at
    the first line that ends in "]" + newline (those two characters are
    dropped) or at end of input.

    :return: a (reason, terminated) pair; terminated is False when the
        input ended before the closing bracket was seen.
    """
    reason = ""
    while fh:
        l = fh.readline()
        if l == "":
            break
        msg_ops.control_msg(l)
        if l[-2:] == "]\n":
            return reason + l[:-2], True
        reason += l
    return reason, False


def _pop_started_test(open_tests, testname, statistics, msg_ops):
    """Remove and return the running test registered under *testname*.

    When no such test was announced, one error is counted in *statistics*,
    a "Test was never started" error is reported to *msg_ops*, and None is
    returned.
    """
    try:
        return open_tests.pop(testname)
    except KeyError:
        statistics['TESTS_ERROR'] += 1
        msg_ops.addError(subunit.RemotedTestCase(testname),
                         subunit.RemoteError(u"Test was never started"))
        return None


def parse_results(msg_ops, statistics, fh):
    """Parse a Samba-flavoured subunit stream and replay it onto *msg_ops*.

    :param msg_ops: result object receiving the events.  The methods called
        on it are control_msg, output_msg, startTest, time, progress,
        addSuccess, addFailure, addError, addSkip, addExpectedFailure,
        addUnexpectedSuccess, start_testsuite, skip_testsuite and
        end_testsuite.
    :param statistics: mapping whose counters are incremented in place.  The
        keys TESTS_ERROR, TESTS_EXPECTED_OK, TESTS_EXPECTED_FAIL,
        TESTS_UNEXPECTED_OK, TESTS_UNEXPECTED_FAIL and TESTS_SKIP are updated
        with "+=", so they must already exist when the matching event occurs.
    :param fh: file-like object that is read with readline().
    :return: 0 if nothing unexpected was seen, otherwise 1.
    """
    exitcode = 0
    open_tests = {}

    while fh:
        l = fh.readline()
        if l == "":
            break
        parts = l.split(None, 1)
        # Anything that is not "<keyword> <argument>" starting in column 0
        # is ordinary test output.
        if not len(parts) == 2 or not l.startswith(parts[0]):
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name),
                                 subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(l)
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            # Raw string so that "\[" is a regex escape rather than an
            # invalid string escape.  "\Z" additionally accepts a final line
            # that lacks its newline, which previously made the match fail
            # and crashed on grp.group().
            grp = re.match(r"(.*?)( \[)?([ \t]*)( multipart)?(?:\n|\Z)", arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                # The reason is given on the following lines.
                reason, terminated = _read_reason(msg_ops, fh)

                if isinstance(reason, bytes):
                    remote_error = subunit.RemoteError(reason.decode("utf-8"))
                else:
                    remote_error = subunit.RemoteError(reason)

                if not terminated:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.addError(
                        subunit.RemotedTestCase(testname),
                        subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")
            if result in ("success", "successful"):
                test = _pop_started_test(open_tests, testname, statistics, msg_ops)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_OK'] += 1
                    msg_ops.addSuccess(test)
            elif result in ("xfail", "knownfail"):
                test = _pop_started_test(open_tests, testname, statistics, msg_ops)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_FAIL'] += 1
                    msg_ops.addExpectedFailure(test, remote_error)
            elif result in ("uxsuccess", ):
                test = _pop_started_test(open_tests, testname, statistics, msg_ops)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_UNEXPECTED_OK'] += 1
                    msg_ops.addUnexpectedSuccess(test)
                    exitcode = 1
            elif result in ("failure", "fail"):
                test = _pop_started_test(open_tests, testname, statistics, msg_ops)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_UNEXPECTED_FAIL'] += 1
                    exitcode = 1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP'] += 1
                # Allow tests to be skipped without prior announcement of test
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR'] += 1
                exitcode = 1
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                msg_ops.skip_testsuite(testname)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
                exitcode = 1
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-uxsuccess":
                msg_ops.end_testsuite(testname, "uxsuccess", reason)
                exitcode = 1
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
                exitcode = 1
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                                     result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    # Whatever is still open was announced but never got a result.  The
    # stored values already are the test objects handed to startTest(), so
    # report them directly instead of wrapping them a second time.
    while open_tests:
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR'] += 1
        exitcode = 1

    return exitcode
class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
    """Result object that writes events as text lines to self._stream."""

    def progress(self, count, whence):
        """Write a "progress:" line for a pop, push or absolute count.

        :raises NotImplementedError: when whence is subunit.PROGRESS_CUR.
        """
        if whence == subunit.PROGRESS_POP:
            line = "progress: pop\n"
        elif whence == subunit.PROGRESS_PUSH:
            line = "progress: push\n"
        elif whence == subunit.PROGRESS_SET:
            line = "progress: %d\n" % count
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError
        else:
            # Any other whence value is ignored.
            return
        self._stream.write(line)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Write a "testsuite:" line announcing *name*."""
        announcement = "testsuite: %s\n" % name
        self._stream.write(announcement)

    def skip_testsuite(self, name, reason=None):
        """Write a "skip-testsuite:" line, with a bracketed reason if given."""
        if not reason:
            self._stream.write("skip-testsuite: %s\n" % name)
            return
        self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))

    def end_testsuite(self, name, result, reason=None):
        """Write a "testsuite-<result>:" line, with a bracketed reason if given."""
        if not reason:
            self._stream.write("testsuite-%s: %s\n" % (result, name))
            return
        self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))

    def output_msg(self, msg):
        """Copy *msg* to the stream unchanged."""
        self._stream.write(msg)
def read_test_regexes(*names):
    """Load "regex # reason" lists from the given files and directories.

    Each name is either a file or a directory; for a directory every entry
    in it is read except those whose name ends in "~".  Blank lines and
    lines starting with "#" are skipped.  On the remaining lines, text after
    the first "#" is the reason.

    :return: dict mapping each regex (whitespace-stripped) to its reason, or
        to None when the line has no "#".
    """
    files = []
    for name in names:
        # if we are given a directory, we read all the files it contains
        # (except the ones that end with "~").
        if os.path.isdir(name):
            files.extend(os.path.join(name, x)
                         for x in os.listdir(name)
                         if not x.endswith('~'))
        else:
            files.append(name)

    ret = {}
    for filename in files:
        # The context manager closes the file even if reading fails.
        with open(filename, 'r') as f:
            for l in f:
                l = l.strip()
                if l == "" or l[0] == "#":
                    continue
                if "#" in l:
                    (regex, reason) = l.split("#", 1)
                    ret[regex.strip()] = reason.strip()
                else:
                    ret[l] = None
    return ret
def find_in_list(regexes, fullname):
    """Look *fullname* up in a mapping of regex -> reason.

    The patterns are tried with re.match() in the mapping's iteration order
    and the first one that matches wins.

    :return: the reason stored for the matching pattern ("" when that reason
        is None), or None when no pattern matches.
    """
    for pattern in regexes:
        if not re.match(pattern, fullname):
            continue
        why = regexes[pattern]
        return "" if why is None else why
    return None
class ImmediateFail(Exception):
    """Raised to abort immediately."""

    def __init__(self):
        # The message is fixed; callers raise this without arguments.
        message = "test failed and fail_immediately set"
        super(ImmediateFail, self).__init__(message)
class FilterOps(unittest.TestResult):
    """Result object that rewrites events before forwarding them to another.

    Test ids get an optional prefix and suffix.  Failures whose id matches
    the expected-failure or flapping lists are forwarded as expected
    failures, successes that match the expected-failure list are forwarded
    as unexpected successes, and a testsuite's final result is adjusted to
    reflect what happened inside it.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """
        :param out: result object that receives the filtered events.
        :param prefix: text put in front of every test id (None for none).
        :param suffix: text appended to every test id (None for none).
        :param expected_failures: mapping of regex -> reason, as consumed by
            find_in_list(); defaults to an empty dict.
        :param strip_ok_output: when true, output is buffered from the start
            of each test and only forwarded on failure-type events.
        :param fail_immediately: when true, raise ImmediateFail right after
            forwarding an error, failure or unexpected success.
        :param flapping: second regex -> reason mapping, consulted for
            failures only; defaults to an empty dict.
        """
        # Initialise the unittest.TestResult state as well, so inherited
        # methods find the attributes they expect.
        super(FilterOps, self).__init__()
        self._ops = out
        self.seen_output = False
        # None means "pass output straight to stdout"; a string means
        # "buffer it" (see output_msg()).
        self.output = None
        self.prefix = prefix
        self.suffix = suffix
        if expected_failures is not None:
            self.expected_failures = expected_failures
        else:
            self.expected_failures = {}
        if flapping is not None:
            self.flapping = flapping
        else:
            self.flapping = {}
        self.strip_ok_output = strip_ok_output
        # The *_added counters are reset per testsuite by start_testsuite();
        # the total_* counters only ever grow.
        self.xfail_added = 0
        self.fail_added = 0
        self.uxsuccess_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0
        self.total_uxsuccess = 0
        self.error_added = 0
        self.fail_immediately = fail_immediately

    def control_msg(self, msg):
        """Ignore raw control lines."""
        pass  # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer *msg* if buffering is active, else write it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output += msg

    def startTest(self, test):
        """Forward the start of *test*, opening a fresh buffer if requested."""
        self.seen_output = True
        test = self._add_prefix(test)
        if self.strip_ok_output:
            self.output = ""

        self._ops.startTest(test)

    def _add_prefix(self, test):
        """Return a new RemotedTestCase whose id is prefix + id + suffix.

        A prefix or suffix of None (the constructor default) counts as the
        empty string; concatenating None used to raise TypeError.
        """
        prefix = self.prefix if self.prefix is not None else ""
        suffix = self.suffix if self.suffix is not None else ""
        return subunit.RemotedTestCase(prefix + test.id() + suffix)

    def addError(self, test, err=None):
        """Count and forward an error, discarding any buffered output.

        :raises ImmediateFail: if fail_immediately is set.
        """
        test = self._add_prefix(test)
        self.error_added += 1
        self.total_error += 1
        self._ops.addError(test, err)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        """Forward a skip, discarding any buffered output."""
        self.seen_output = True
        test = self._add_prefix(test)
        self._ops.addSkip(test, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        """Forward an expected failure, discarding any buffered output."""
        test = self._add_prefix(test)
        self._ops.addExpectedFailure(test, err)
        self.output = None

    def addUnexpectedSuccess(self, test):
        """Count and forward an unexpected success plus buffered output.

        :raises ImmediateFail: if fail_immediately is set.
        """
        test = self._add_prefix(test)
        self.uxsuccess_added += 1
        self.total_uxsuccess += 1
        self._ops.addUnexpectedSuccess(test)
        if self.output:
            self._ops.output_msg(self.output)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, as expected if it matches one of the lists.

        A failure whose id matches expected_failures or flapping is counted
        and forwarded as an expected failure.  Any other failure is counted
        and forwarded as a failure together with the buffered output.

        :raises ImmediateFail: for a real failure when fail_immediately is
            set.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            xfail_reason = find_in_list(self.flapping, test.id())
        if xfail_reason is not None:
            self.xfail_added += 1
            self.total_xfail += 1
            self._ops.addExpectedFailure(test, err)
        else:
            self.fail_added += 1
            self.total_fail += 1
            self._ops.addFailure(test, err)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def addSuccess(self, test):
        """Forward a success, or an unexpected success if it was listed.

        A success whose id matches expected_failures is counted and
        forwarded as an unexpected success together with the buffered
        output.  The flapping list is not consulted here.

        :raises ImmediateFail: for an unexpected success when
            fail_immediately is set.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is not None:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(test)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self._ops.addSuccess(test)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the start of a testsuite and reset the per-suite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the end of a testsuite with an adjusted result.

        The reported *result* is overridden from the per-suite counters, and
        each override appends a "<kind>[<count>]" line to the reason.  The
        checks run in the order unexpected successes, failures, errors, so a
        later one replaces the result chosen by an earlier one.

        :raises ImmediateFail: if the final result is neither "success" nor
            "xfail" and fail_immediately is set.
        """
        # A suite failure is downgraded to xfail only when expected failures
        # were the sole problem seen in it.
        only_xfails = (self.xfail_added > 0 and
                       not (self.fail_added > 0 or
                            self.error_added > 0 or
                            self.uxsuccess_added > 0))

        if only_xfails and result in ("fail", "failure"):
            result = "xfail"

        if self.uxsuccess_added > 0 and result != "uxsuccess":
            result = "uxsuccess"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n uxsuccess[%d]" % self.uxsuccess_added

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
class PerfFilterOps(unittest.TestResult):
    """Result object that reports how long each test took.

    The start time of every test is remembered in self.starts, keyed by the
    prefixed test id.  When a test finishes, a line with the elapsed seconds
    is sent to the wrapped result object's output_msg().
    """

    def __init__(self, out, prefix='', suffix=''):
        """
        :param out: result object that receives the generated messages.
        :param prefix: text put in front of every test id; None is treated
            as the empty string.
        :param suffix: text appended to every test id; None is treated as
            the empty string.
        """
        # Initialise the unittest.TestResult state as well, so inherited
        # methods find the attributes they expect.
        super(PerfFilterOps, self).__init__()
        self._ops = out
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        self.starts = {}
        self.seen_output = False
        self.suite_has_time = False

    def progress(self, delta, whence):
        """Ignore progress events."""
        pass

    def output_msg(self, msg):
        """Ignore ordinary test output."""
        pass

    def control_msg(self, msg):
        """Ignore raw control lines."""
        pass

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forget any timestamp seen in the previous testsuite."""
        self.suite_has_time = False

    def end_testsuite(self, name, result, reason=None):
        """Ignore the end of a testsuite."""
        pass

    def _add_prefix(self, test):
        """Return a new RemotedTestCase whose id is prefix + id + suffix."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def time(self, time):
        """Remember *time* as the current time of the stream."""
        self.latest_time = time
        #self._ops.output_msg("found time %s\n" % time)
        self.suite_has_time = True

    def get_time(self):
        """Return the last stream timestamp of this suite, else utcnow()."""
        if self.suite_has_time:
            return self.latest_time
        return datetime.datetime.utcnow()

    def startTest(self, test):
        """Record the start time of *test* under its prefixed id."""
        self.seen_output = True
        test = self._add_prefix(test)
        self.starts[test.id()] = self.get_time()

    def addSuccess(self, test):
        """Emit an "elapsed-time:" line for a test that passed.

        A test that was never started is reported through addError() on the
        wrapped result object instead.
        """
        test = self._add_prefix(test)
        tid = test.id()
        if tid not in self.starts:
            self._ops.addError(test, "%s succeeded without ever starting!" % tid)
            # No start time to subtract; falling through raised KeyError.
            return
        delta = self.get_time() - self.starts[tid]
        self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))

    def _report_unsuccessful(self, kind, test, err):
        """Emit a "<kind>: <id> failed after ..." line for *test*.

        The id is looked up with the prefix and suffix applied, because that
        is the key startTest() stores.  If no start time was recorded, the
        line says so instead of raising KeyError.
        """
        tid = self._add_prefix(test).id()
        started = self.starts.get(tid)
        if started is None:
            self._ops.output_msg("%s: %s failed without ever starting (%s)\n" %
                                 (kind, tid, err))
            return
        delta = self.get_time() - started
        self._ops.output_msg("%s: %s failed after %f seconds (%s)\n" %
                             (kind, tid, delta.total_seconds(), err))

    def addFailure(self, test, err=''):
        """Emit a "failure:" line with the time the test ran before failing."""
        self._report_unsuccessful("failure", test, err)

    def addError(self, test, err=''):
        """Emit an "error:" line with the time the test ran before erroring."""
        self._report_unsuccessful("error", test, err)
class PlainFormatter(TestsuiteEnabledTestResult):
    """Result object that prints a human-readable progress report to stdout.

    Output of the running testsuite is collected in self.test_output, keyed
    by suite name, and write_summary() produces the final report.
    """

    # Single-character markers written by end_test() when not in immediate
    # mode, for expected and unexpected outcomes respectively.
    _EXPECTED_MARKS = {
        'failure': 'f',
        'xfail': 'X',
        'skip': 's',
        'success': '.'}
    _UNEXPECTED_MARKS = {
        'error': 'E',
        'failure': 'F',
        'uxsuccess': 'U',
        'success': 'S'}

    def __init__(self, verbose, immediate, statistics,
                 totaltests=None):
        """
        :param verbose: when true, test output goes straight to stdout
            instead of being collected per testsuite.
        :param immediate: when true, each suite header is printed on its own
            line and unexpected results are printed as they happen, rather
            than as one marker character per test.
        :param statistics: mapping of counters; the TESTS_* keys are read
            here and never modified.
        :param totaltests: number of testsuites expected to run, if known.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        self.skips = {}
        self.index = 0
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Render a timedelta as "<h>h<m>m<s>s", omitting zero hours/minutes.

        Whole days are folded into the hour count; using delta.seconds alone
        dropped them.  Microseconds are ignored.
        """
        total_seconds = delta.days * 86400 + delta.seconds
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track push/pop nesting and take the suite total from level 0.

        :raises NotImplementedError: when whence is subunit.PROGRESS_CUR.
        """
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Remember the first timestamp seen and the most recent one."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the "[index(tests)/total at time, N errors] name" header."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print *output* (verbose or no current suite) or collect it."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        """Ignore raw control lines."""
        pass

    def end_testsuite(self, name, result, reason=None):
        """Record the outcome of testsuite *name* and print its status.

        Any result other than "success" or "xfail" marks the suite as
        failed.  *reason* defaults to None, matching the other result
        classes in this module.
        """
        out = ""
        unexpected = False

        if name not in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok += 1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        """Nothing is printed when a test starts."""
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test):
        self.end_test(test.id(), "uxsuccess", True)

    def end_test(self, testname, result, unexpected, err=None):
        """Handle the end of one test.

        An expected result clears the collected output of the current suite.
        An unexpected one appends an "UNEXPECTED(...)" line to it, plus a
        "REASON:" line built from err[1] when *err* is given.
        """
        if not unexpected:
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write(
                    self._EXPECTED_MARKS.get(result, "?(%s)" % result))
            return

        if self.name not in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write(self._UNEXPECTED_MARKS.get(result, "?"))

    def write_summary(self, path):
        """Write the failed/skipped summary to *path* and print the verdict.

        The file lists the failed testsuites with their collected output,
        then the skipped testsuites grouped by reason.  Unless immediate or
        verbose, the failed suites are also printed to stdout.  The final
        line printed is either "ALL OK ..." or "FAILED ...".
        """
        # The context manager closes the file even if a write fails.
        with open(path, 'w+') as f:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite] + "\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason, names in self.skips.items():
                f.write(reason + "\n")
                for name in names:
                    f.write("\t%s\n" % name)
                f.write("\n")

        if (not self.suitesfailed and
                not self.statistics['TESTS_UNEXPECTED_FAIL'] and
                not self.statistics['TESTS_UNEXPECTED_OK'] and
                not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Record testsuite *name* as skipped for *reason*.

        A reason of None is filed under "UNKNOWN" too: callers such as
        FilterOps pass None explicitly, and a None key would later break
        write_summary().
        """
        if reason is None:
            reason = "UNKNOWN"
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites -= 1