torture: Test tevent_req_profile
[Samba.git] / selftest / subunithelper.py
blobf20f3fe8ba735216171190444527223feea8a359
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 __all__ = ['parse_results']
21 import datetime
22 import re
23 import sys
24 import os
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
28 import unittest
30 VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
31 'knownfail', 'error', 'xfail', 'skip-testsuite',
32 'testsuite-failure', 'testsuite-xfail',
33 'testsuite-success', 'testsuite-error',
34 'uxsuccess', 'testsuite-uxsuccess'])
36 class TestsuiteEnabledTestResult(unittest.TestResult):
38 def start_testsuite(self, name):
39 raise NotImplementedError(self.start_testsuite)
42 def parse_results(msg_ops, statistics, fh):
43 exitcode = 0
44 open_tests = {}
46 while fh:
47 l = fh.readline()
48 if l == "":
49 break
50 parts = l.split(None, 1)
51 if not len(parts) == 2 or not l.startswith(parts[0]):
52 msg_ops.output_msg(l)
53 continue
54 command = parts[0].rstrip(":")
55 arg = parts[1]
56 if command in ("test", "testing"):
57 msg_ops.control_msg(l)
58 name = arg.rstrip()
59 test = subunit.RemotedTestCase(name)
60 if name in open_tests:
61 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
62 msg_ops.startTest(test)
63 open_tests[name] = test
64 elif command == "time":
65 msg_ops.control_msg(l)
66 try:
67 dt = iso8601.parse_date(arg.rstrip("\n"))
68 except TypeError as e:
69 print("Unable to parse time line: %s" % arg.rstrip("\n"))
70 else:
71 msg_ops.time(dt)
72 elif command in VALID_RESULTS:
73 msg_ops.control_msg(l)
74 result = command
75 grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
76 (testname, hasreason) = (grp.group(1), grp.group(2))
77 if hasreason:
78 reason = ""
79 # reason may be specified in next lines
80 terminated = False
81 while fh:
82 l = fh.readline()
83 if l == "":
84 break
85 msg_ops.control_msg(l)
86 if l[-2:] == "]\n":
87 reason += l[:-2]
88 terminated = True
89 break
90 else:
91 reason += l
93 remote_error = subunit.RemoteError(reason.decode("utf-8"))
95 if not terminated:
96 statistics['TESTS_ERROR']+=1
97 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
98 return 1
99 else:
100 reason = None
101 remote_error = subunit.RemoteError(u"No reason specified")
102 if result in ("success", "successful"):
103 try:
104 test = open_tests.pop(testname)
105 except KeyError:
106 statistics['TESTS_ERROR']+=1
107 exitcode = 1
108 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
109 else:
110 statistics['TESTS_EXPECTED_OK']+=1
111 msg_ops.addSuccess(test)
112 elif result in ("xfail", "knownfail"):
113 try:
114 test = open_tests.pop(testname)
115 except KeyError:
116 statistics['TESTS_ERROR']+=1
117 exitcode = 1
118 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
119 else:
120 statistics['TESTS_EXPECTED_FAIL']+=1
121 msg_ops.addExpectedFailure(test, remote_error)
122 elif result in ("uxsuccess", ):
123 try:
124 test = open_tests.pop(testname)
125 except KeyError:
126 statistics['TESTS_ERROR']+=1
127 exitcode = 1
128 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
129 else:
130 statistics['TESTS_UNEXPECTED_OK']+=1
131 msg_ops.addUnexpectedSuccess(test)
132 exitcode = 1
133 elif result in ("failure", "fail"):
134 try:
135 test = open_tests.pop(testname)
136 except KeyError:
137 statistics['TESTS_ERROR']+=1
138 exitcode = 1
139 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
140 else:
141 statistics['TESTS_UNEXPECTED_FAIL']+=1
142 exitcode = 1
143 msg_ops.addFailure(test, remote_error)
144 elif result == "skip":
145 statistics['TESTS_SKIP']+=1
146 # Allow tests to be skipped without prior announcement of test
147 try:
148 test = open_tests.pop(testname)
149 except KeyError:
150 test = subunit.RemotedTestCase(testname)
151 msg_ops.addSkip(test, reason)
152 elif result == "error":
153 statistics['TESTS_ERROR']+=1
154 exitcode = 1
155 try:
156 test = open_tests.pop(testname)
157 except KeyError:
158 test = subunit.RemotedTestCase(testname)
159 msg_ops.addError(test, remote_error)
160 elif result == "skip-testsuite":
161 msg_ops.skip_testsuite(testname)
162 elif result == "testsuite-success":
163 msg_ops.end_testsuite(testname, "success", reason)
164 elif result == "testsuite-failure":
165 msg_ops.end_testsuite(testname, "failure", reason)
166 exitcode = 1
167 elif result == "testsuite-xfail":
168 msg_ops.end_testsuite(testname, "xfail", reason)
169 elif result == "testsuite-uxsuccess":
170 msg_ops.end_testsuite(testname, "uxsuccess", reason)
171 exitcode = 1
172 elif result == "testsuite-error":
173 msg_ops.end_testsuite(testname, "error", reason)
174 exitcode = 1
175 else:
176 raise AssertionError("Recognized but unhandled result %r" %
177 result)
178 elif command == "testsuite":
179 msg_ops.start_testsuite(arg.strip())
180 elif command == "progress":
181 arg = arg.strip()
182 if arg == "pop":
183 msg_ops.progress(None, subunit.PROGRESS_POP)
184 elif arg == "push":
185 msg_ops.progress(None, subunit.PROGRESS_PUSH)
186 elif arg[0] in '+-':
187 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
188 else:
189 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
190 else:
191 msg_ops.output_msg(l)
193 while open_tests:
194 test = subunit.RemotedTestCase(open_tests.popitem()[1])
195 msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
196 statistics['TESTS_ERROR']+=1
197 exitcode = 1
199 return exitcode
202 class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
204 def progress(self, count, whence):
205 if whence == subunit.PROGRESS_POP:
206 self._stream.write("progress: pop\n")
207 elif whence == subunit.PROGRESS_PUSH:
208 self._stream.write("progress: push\n")
209 elif whence == subunit.PROGRESS_SET:
210 self._stream.write("progress: %d\n" % count)
211 elif whence == subunit.PROGRESS_CUR:
212 raise NotImplementedError
214 # The following are Samba extensions:
215 def start_testsuite(self, name):
216 self._stream.write("testsuite: %s\n" % name)
218 def skip_testsuite(self, name, reason=None):
219 if reason:
220 self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
221 else:
222 self._stream.write("skip-testsuite: %s\n" % name)
224 def end_testsuite(self, name, result, reason=None):
225 if reason:
226 self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
227 else:
228 self._stream.write("testsuite-%s: %s\n" % (result, name))
230 def output_msg(self, msg):
231 self._stream.write(msg)
234 def read_test_regexes(*names):
235 ret = {}
236 files = []
237 for name in names:
238 # if we are given a directory, we read all the files it contains
239 # (except the ones that end with "~").
240 if os.path.isdir(name):
241 files.extend([os.path.join(name, x)
242 for x in os.listdir(name)
243 if x[-1] != '~'])
244 else:
245 files.append(name)
247 for filename in files:
248 f = open(filename, 'r')
249 try:
250 for l in f:
251 l = l.strip()
252 if l == "" or l[0] == "#":
253 continue
254 if "#" in l:
255 (regex, reason) = l.split("#", 1)
256 ret[regex.strip()] = reason.strip()
257 else:
258 ret[l] = None
259 finally:
260 f.close()
261 return ret
264 def find_in_list(regexes, fullname):
265 for regex, reason in regexes.items():
266 if re.match(regex, fullname):
267 if reason is None:
268 return ""
269 return reason
270 return None
273 class ImmediateFail(Exception):
274 """Raised to abort immediately."""
276 def __init__(self):
277 super(ImmediateFail, self).__init__("test failed and fail_immediately set")
280 class FilterOps(unittest.TestResult):
282 def control_msg(self, msg):
283 pass # We regenerate control messages, so ignore this
285 def time(self, time):
286 self._ops.time(time)
288 def progress(self, delta, whence):
289 self._ops.progress(delta, whence)
291 def output_msg(self, msg):
292 if self.output is None:
293 sys.stdout.write(msg)
294 else:
295 self.output+=msg
297 def startTest(self, test):
298 self.seen_output = True
299 test = self._add_prefix(test)
300 if self.strip_ok_output:
301 self.output = ""
303 self._ops.startTest(test)
305 def _add_prefix(self, test):
306 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
308 def addError(self, test, err=None):
309 test = self._add_prefix(test)
310 self.error_added+=1
311 self.total_error+=1
312 self._ops.addError(test, err)
313 self.output = None
314 if self.fail_immediately:
315 raise ImmediateFail()
317 def addSkip(self, test, reason=None):
318 self.seen_output = True
319 test = self._add_prefix(test)
320 self._ops.addSkip(test, reason)
321 self.output = None
323 def addExpectedFailure(self, test, err=None):
324 test = self._add_prefix(test)
325 self._ops.addExpectedFailure(test, err)
326 self.output = None
328 def addUnexpectedSuccess(self, test):
329 test = self._add_prefix(test)
330 self.uxsuccess_added+=1
331 self.total_uxsuccess+=1
332 self._ops.addUnexpectedSuccess(test)
333 if self.output:
334 self._ops.output_msg(self.output)
335 self.output = None
336 if self.fail_immediately:
337 raise ImmediateFail()
339 def addFailure(self, test, err=None):
340 test = self._add_prefix(test)
341 xfail_reason = find_in_list(self.expected_failures, test.id())
342 if xfail_reason is None:
343 xfail_reason = find_in_list(self.flapping, test.id())
344 if xfail_reason is not None:
345 self.xfail_added+=1
346 self.total_xfail+=1
347 self._ops.addExpectedFailure(test, err)
348 else:
349 self.fail_added+=1
350 self.total_fail+=1
351 self._ops.addFailure(test, err)
352 if self.output:
353 self._ops.output_msg(self.output)
354 if self.fail_immediately:
355 raise ImmediateFail()
356 self.output = None
358 def addSuccess(self, test):
359 test = self._add_prefix(test)
360 xfail_reason = find_in_list(self.expected_failures, test.id())
361 if xfail_reason is not None:
362 self.uxsuccess_added += 1
363 self.total_uxsuccess += 1
364 self._ops.addUnexpectedSuccess(test)
365 if self.output:
366 self._ops.output_msg(self.output)
367 if self.fail_immediately:
368 raise ImmediateFail()
369 else:
370 self._ops.addSuccess(test)
371 self.output = None
373 def skip_testsuite(self, name, reason=None):
374 self._ops.skip_testsuite(name, reason)
376 def start_testsuite(self, name):
377 self._ops.start_testsuite(name)
378 self.error_added = 0
379 self.fail_added = 0
380 self.xfail_added = 0
381 self.uxsuccess_added = 0
383 def end_testsuite(self, name, result, reason=None):
384 xfail = False
386 if self.xfail_added > 0:
387 xfail = True
388 if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
389 xfail = False
391 if xfail and result in ("fail", "failure"):
392 result = "xfail"
394 if self.uxsuccess_added > 0 and result != "uxsuccess":
395 result = "uxsuccess"
396 if reason is None:
397 reason = "Subunit/Filter Reason"
398 reason += "\n uxsuccess[%d]" % self.uxsuccess_added
400 if self.fail_added > 0 and result != "failure":
401 result = "failure"
402 if reason is None:
403 reason = "Subunit/Filter Reason"
404 reason += "\n failures[%d]" % self.fail_added
406 if self.error_added > 0 and result != "error":
407 result = "error"
408 if reason is None:
409 reason = "Subunit/Filter Reason"
410 reason += "\n errors[%d]" % self.error_added
412 self._ops.end_testsuite(name, result, reason)
413 if result not in ("success", "xfail"):
414 if self.output:
415 self._ops.output_msg(self.output)
416 if self.fail_immediately:
417 raise ImmediateFail()
418 self.output = None
420 def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
421 strip_ok_output=False, fail_immediately=False,
422 flapping=None):
423 self._ops = out
424 self.seen_output = False
425 self.output = None
426 self.prefix = prefix
427 self.suffix = suffix
428 if expected_failures is not None:
429 self.expected_failures = expected_failures
430 else:
431 self.expected_failures = {}
432 if flapping is not None:
433 self.flapping = flapping
434 else:
435 self.flapping = {}
436 self.strip_ok_output = strip_ok_output
437 self.xfail_added = 0
438 self.fail_added = 0
439 self.uxsuccess_added = 0
440 self.total_xfail = 0
441 self.total_error = 0
442 self.total_fail = 0
443 self.total_uxsuccess = 0
444 self.error_added = 0
445 self.fail_immediately = fail_immediately
448 class PerfFilterOps(unittest.TestResult):
450 def progress(self, delta, whence):
451 pass
453 def output_msg(self, msg):
454 pass
456 def control_msg(self, msg):
457 pass
459 def skip_testsuite(self, name, reason=None):
460 self._ops.skip_testsuite(name, reason)
462 def start_testsuite(self, name):
463 self.suite_has_time = False
465 def end_testsuite(self, name, result, reason=None):
466 pass
468 def _add_prefix(self, test):
469 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
471 def time(self, time):
472 self.latest_time = time
473 #self._ops.output_msg("found time %s\n" % time)
474 self.suite_has_time = True
476 def get_time(self):
477 if self.suite_has_time:
478 return self.latest_time
479 return datetime.datetime.utcnow()
481 def startTest(self, test):
482 self.seen_output = True
483 test = self._add_prefix(test)
484 self.starts[test.id()] = self.get_time()
486 def addSuccess(self, test):
487 test = self._add_prefix(test)
488 tid = test.id()
489 if tid not in self.starts:
490 self._ops.addError(test, "%s succeeded without ever starting!" % tid)
491 delta = self.get_time() - self.starts[tid]
492 self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
494 def addFailure(self, test, err=''):
495 tid = test.id()
496 delta = self.get_time() - self.starts[tid]
497 self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
498 (tid, delta.total_seconds(), err))
500 def addError(self, test, err=''):
501 tid = test.id()
502 delta = self.get_time() - self.starts[tid]
503 self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
504 (tid, delta.total_seconds(), err))
506 def __init__(self, out, prefix='', suffix=''):
507 self._ops = out
508 self.prefix = prefix or ''
509 self.suffix = suffix or ''
510 self.starts = {}
511 self.seen_output = False
512 self.suite_has_time = False
515 class PlainFormatter(TestsuiteEnabledTestResult):
517 def __init__(self, verbose, immediate, statistics,
518 totaltests=None):
519 super(PlainFormatter, self).__init__()
520 self.verbose = verbose
521 self.immediate = immediate
522 self.statistics = statistics
523 self.start_time = None
524 self.test_output = {}
525 self.suitesfailed = []
526 self.suites_ok = 0
527 self.skips = {}
528 self.index = 0
529 self.name = None
530 self._progress_level = 0
531 self.totalsuites = totaltests
532 self.last_time = None
534 @staticmethod
535 def _format_time(delta):
536 minutes, seconds = divmod(delta.seconds, 60)
537 hours, minutes = divmod(minutes, 60)
538 ret = ""
539 if hours:
540 ret += "%dh" % hours
541 if minutes:
542 ret += "%dm" % minutes
543 ret += "%ds" % seconds
544 return ret
546 def progress(self, offset, whence):
547 if whence == subunit.PROGRESS_POP:
548 self._progress_level -= 1
549 elif whence == subunit.PROGRESS_PUSH:
550 self._progress_level += 1
551 elif whence == subunit.PROGRESS_SET:
552 if self._progress_level == 0:
553 self.totalsuites = offset
554 elif whence == subunit.PROGRESS_CUR:
555 raise NotImplementedError
557 def time(self, dt):
558 if self.start_time is None:
559 self.start_time = dt
560 self.last_time = dt
562 def start_testsuite(self, name):
563 self.index += 1
564 self.name = name
566 if not self.verbose:
567 self.test_output[name] = ""
569 total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
570 self.statistics['TESTS_EXPECTED_FAIL'] +
571 self.statistics['TESTS_ERROR'] +
572 self.statistics['TESTS_UNEXPECTED_FAIL'] +
573 self.statistics['TESTS_UNEXPECTED_OK'])
575 out = "[%d(%d)" % (self.index, total_tests)
576 if self.totalsuites is not None:
577 out += "/%d" % self.totalsuites
578 if self.start_time is not None:
579 out += " at " + self._format_time(self.last_time - self.start_time)
580 if self.suitesfailed:
581 out += ", %d errors" % (len(self.suitesfailed),)
582 out += "] %s" % name
583 if self.immediate:
584 sys.stdout.write(out + "\n")
585 else:
586 sys.stdout.write(out + ": ")
588 def output_msg(self, output):
589 if self.verbose:
590 sys.stdout.write(output)
591 elif self.name is not None:
592 self.test_output[self.name] += output
593 else:
594 sys.stdout.write(output)
596 def control_msg(self, output):
597 pass
599 def end_testsuite(self, name, result, reason):
600 out = ""
601 unexpected = False
603 if not name in self.test_output:
604 print("no output for name[%s]" % name)
606 if result in ("success", "xfail"):
607 self.suites_ok+=1
608 else:
609 self.output_msg("ERROR: Testsuite[%s]\n" % name)
610 if reason is not None:
611 self.output_msg("REASON: %s\n" % (reason,))
612 self.suitesfailed.append(name)
613 if self.immediate and not self.verbose and name in self.test_output:
614 out += self.test_output[name]
615 unexpected = True
617 if not self.immediate:
618 if not unexpected:
619 out += " ok\n"
620 else:
621 out += " " + result.upper() + "\n"
623 sys.stdout.write(out)
625 def startTest(self, test):
626 pass
628 def addSuccess(self, test):
629 self.end_test(test.id(), "success", False)
631 def addError(self, test, err=None):
632 self.end_test(test.id(), "error", True, err)
634 def addFailure(self, test, err=None):
635 self.end_test(test.id(), "failure", True, err)
637 def addSkip(self, test, reason=None):
638 self.end_test(test.id(), "skip", False, reason)
640 def addExpectedFailure(self, test, err=None):
641 self.end_test(test.id(), "xfail", False, err)
643 def addUnexpectedSuccess(self, test):
644 self.end_test(test.id(), "uxsuccess", True)
646 def end_test(self, testname, result, unexpected, err=None):
647 if not unexpected:
648 self.test_output[self.name] = ""
649 if not self.immediate:
650 sys.stdout.write({
651 'failure': 'f',
652 'xfail': 'X',
653 'skip': 's',
654 'success': '.'}.get(result, "?(%s)" % result))
655 return
657 if not self.name in self.test_output:
658 self.test_output[self.name] = ""
660 self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
661 if err is not None:
662 self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
664 if self.immediate and not self.verbose:
665 sys.stdout.write(self.test_output[self.name])
666 self.test_output[self.name] = ""
668 if not self.immediate:
669 sys.stdout.write({
670 'error': 'E',
671 'failure': 'F',
672 'uxsuccess': 'U',
673 'success': 'S'}.get(result, "?"))
675 def write_summary(self, path):
676 f = open(path, 'w+')
678 if self.suitesfailed:
679 f.write("= Failed tests =\n")
681 for suite in self.suitesfailed:
682 f.write("== %s ==\n" % suite)
683 if suite in self.test_output:
684 f.write(self.test_output[suite]+"\n\n")
686 f.write("\n")
688 if not self.immediate and not self.verbose:
689 for suite in self.suitesfailed:
690 print("=" * 78)
691 print("FAIL: %s" % suite)
692 if suite in self.test_output:
693 print(self.test_output[suite])
694 print("")
696 f.write("= Skipped tests =\n")
697 for reason in self.skips.keys():
698 f.write(reason + "\n")
699 for name in self.skips[reason]:
700 f.write("\t%s\n" % name)
701 f.write("\n")
702 f.close()
704 if (not self.suitesfailed and
705 not self.statistics['TESTS_UNEXPECTED_FAIL'] and
706 not self.statistics['TESTS_UNEXPECTED_OK'] and
707 not self.statistics['TESTS_ERROR']):
708 ok = (self.statistics['TESTS_EXPECTED_OK'] +
709 self.statistics['TESTS_EXPECTED_FAIL'])
710 print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
711 else:
712 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
713 self.statistics['TESTS_UNEXPECTED_FAIL'],
714 self.statistics['TESTS_ERROR'],
715 self.statistics['TESTS_UNEXPECTED_OK'],
716 len(self.suitesfailed)))
718 def skip_testsuite(self, name, reason="UNKNOWN"):
719 self.skips.setdefault(reason, []).append(name)
720 if self.totalsuites:
721 self.totalsuites-=1