tdb: Add new function tdb_transaction_active()
[Samba.git] / selftest / subunithelper.py
blobfab7d6f0b41706322fbab1311cec2bcb657df6ba
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
# Public API exported by ``from <this module> import *``.
__all__ = ["parse_results"]
20 import datetime
21 import re
22 import sys
23 import os
24 from samba import subunit
25 from samba.subunit.run import TestProtocolClient
26 from samba.subunit import iso8601
27 import unittest
# Line commands that parse_results() treats as a test or testsuite outcome.
VALID_RESULTS = {
    'success',
    'successful',
    'failure',
    'fail',
    'skip',
    'knownfail',
    'error',
    'xfail',
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
    'uxsuccess',
    'testsuite-uxsuccess',
}
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A ``unittest.TestResult`` that also has a testsuite-start hook.

    The hook is abstract here; subclasses must override it.
    """

    def start_testsuite(self, name):
        """Signal that testsuite *name* begins; always raises in this base class.

        :raises NotImplementedError: unconditionally.
        """
        unimplemented = self.start_testsuite
        raise NotImplementedError(unimplemented)
def _pop_started_test(msg_ops, statistics, open_tests, testname):
    """Remove and return the open test *testname*, reporting it if unknown.

    When no test of that name is open, a "Test was never started" error is
    sent to *msg_ops*, ``statistics['TESTS_ERROR']`` is incremented and
    ``None`` is returned so the caller can mark the run as failed.
    """
    try:
        return open_tests.pop(testname)
    except KeyError:
        statistics['TESTS_ERROR'] += 1
        msg_ops.addError(subunit.RemotedTestCase(testname),
                         subunit.RemoteError(u"Test was never started"))
        return None


def parse_results(msg_ops, statistics, fh):
    """Parse a (Samba-flavoured) subunit stream and replay it on *msg_ops*.

    :param msg_ops: result sink; receives ``control_msg``/``output_msg``,
        ``startTest``/``add*``, ``time``, ``progress`` and the
        ``*_testsuite`` calls for every recognised line.
    :param statistics: mapping whose ``TESTS_ERROR``, ``TESTS_EXPECTED_OK``,
        ``TESTS_EXPECTED_FAIL``, ``TESTS_UNEXPECTED_OK``,
        ``TESTS_UNEXPECTED_FAIL`` and ``TESTS_SKIP`` counters are
        incremented in place (the keys must already exist).
    :param fh: object with a ``readline()`` method that returns ``""`` at
        end of input.
    :return: 0 if nothing unexpected happened, 1 otherwise.
    """
    exitcode = 0
    open_tests = {}

    while fh:
        l = fh.readline()
        if l == "":
            break
        parts = l.split(None, 1)
        # Lines that are not "<command> <argument>" (or that start with
        # whitespace) are plain test output.
        if not len(parts) == 2 or not l.startswith(parts[0]):
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(l)
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            # The pattern requires a terminating newline; the very last line
            # of a stream may lack one, which used to make re.match() return
            # None and crash on .group().
            if not arg.endswith("\n"):
                arg += "\n"
            grp = re.match(r"(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                reason = ""
                # reason may be specified in next lines
                terminated = False
                while fh:
                    l = fh.readline()
                    if l == "":
                        break
                    msg_ops.control_msg(l)
                    if l == "]\n":
                        terminated = True
                        break
                    else:
                        reason += l

                # Only byte strings need decoding; text read from an
                # already-decoded stream is used as is. ``reason`` itself is
                # left untouched because it is forwarded verbatim below.
                if isinstance(reason, bytes):
                    reason_text = reason.decode("utf-8")
                else:
                    reason_text = reason
                remote_error = subunit.RemoteError(reason_text)

                if not terminated:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")
            if result in ("success", "successful"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_OK'] += 1
                    msg_ops.addSuccess(test)
            elif result in ("xfail", "knownfail"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_EXPECTED_FAIL'] += 1
                    msg_ops.addExpectedFailure(test, remote_error)
            elif result in ("uxsuccess", ):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_UNEXPECTED_OK'] += 1
                    msg_ops.addUnexpectedSuccess(test)
                    exitcode = 1
            elif result in ("failure", "fail"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is None:
                    exitcode = 1
                else:
                    statistics['TESTS_UNEXPECTED_FAIL'] += 1
                    exitcode = 1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP'] += 1
                # Allow tests to be skipped without prior announcement of test
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR'] += 1
                exitcode = 1
                # Errors may also be reported for tests that never started.
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                # NOTE(review): the parsed reason is not forwarded here,
                # unlike for the testsuite-* results below - confirm whether
                # that is intentional.
                msg_ops.skip_testsuite(testname)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
                exitcode = 1
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-uxsuccess":
                msg_ops.end_testsuite(testname, "uxsuccess", reason)
                exitcode = 1
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
                exitcode = 1
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                    result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    # Anything still open at end of input never reported a result.
    while open_tests:
        # The stored values are the test objects created when the test was
        # announced, so report them directly instead of wrapping them in yet
        # another RemotedTestCase (which used the object as a description).
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR'] += 1
        exitcode = 1

    return exitcode
class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
    """Result sink that serialises events as subunit text on ``self._stream``."""

    def progress(self, count, whence):
        """Write a ``progress:`` line for a pop, push or absolute count.

        :raises NotImplementedError: for ``subunit.PROGRESS_CUR``.
        """
        if whence == subunit.PROGRESS_POP:
            self._stream.write("progress: pop\n")
            return
        if whence == subunit.PROGRESS_PUSH:
            self._stream.write("progress: push\n")
            return
        if whence == subunit.PROGRESS_SET:
            self._stream.write("progress: %d\n" % count)
            return
        if whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Write a ``testsuite:`` line announcing *name*."""
        announcement = "testsuite: %s\n" % name
        self._stream.write(announcement)

    def skip_testsuite(self, name, reason=None):
        """Write a ``skip-testsuite:`` line, bracketing *reason* if given."""
        if not reason:
            self._stream.write("skip-testsuite: %s\n" % name)
            return
        self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))

    def end_testsuite(self, name, result, reason=None):
        """Write a ``testsuite-<result>:`` line, bracketing *reason* if given."""
        if not reason:
            self._stream.write("testsuite-%s: %s\n" % (result, name))
            return
        self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))

    def output_msg(self, msg):
        """Copy *msg* to the stream unchanged."""
        stream = self._stream
        stream.write(msg)
def read_test_regexes(*names):
    """Load ``regex [# reason]`` lines from the given files or directories.

    A directory contributes every entry it contains except those whose
    name ends in ``~``. Blank lines and lines starting with ``#`` are
    ignored; text after the first ``#`` on a line is the reason.

    :return: dict mapping each regex to its reason (``None`` if absent).
    """
    paths = []
    for name in names:
        if not os.path.isdir(name):
            paths.append(name)
            continue
        # if we are given a directory, we read all the files it contains
        # (except the ones that end with "~").
        paths.extend(os.path.join(name, entry)
                     for entry in os.listdir(name)
                     if not entry.endswith('~'))

    regexes = {}
    for path in paths:
        with open(path, 'r') as handle:
            for raw in handle:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                pattern, marker, reason = line.partition("#")
                if marker:
                    regexes[pattern.strip()] = reason.strip()
                else:
                    regexes[line] = None
    return regexes
def find_in_list(regexes, fullname):
    """Return the reason of the first regex in *regexes* matching *fullname*.

    :param regexes: dict mapping regular expressions to a reason or ``None``.
    :param fullname: test name, matched from its start with ``re.match``.
    :return: the reason (``""`` when the stored reason is ``None``), or
        ``None`` when no regex matches.
    """
    # items() is available on both Python 2 and 3; iteritems() is not.
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            if reason is None:
                return ""
            return reason
    return None
class ImmediateFail(Exception):
    """Signal that processing must stop at once because a test failed."""

    def __init__(self):
        message = "test failed and fail_immediately set"
        super(ImmediateFail, self).__init__(message)
class FilterOps(unittest.TestResult):
    """Result sink that filters events before forwarding them to another sink.

    Test ids get ``prefix``/``suffix`` added, failures matching
    ``expected_failures`` or ``flapping`` are forwarded as expected
    failures, successes matching ``expected_failures`` are forwarded as
    unexpected successes, and the final testsuite result is adjusted to
    the per-suite counters.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """
        :param out: downstream sink that receives the filtered events.
        :param prefix: text prepended to every test id (``None`` for none).
        :param suffix: text appended to every test id (``None`` for none).
        :param expected_failures: dict of regex -> reason for known failures.
        :param strip_ok_output: buffer test output and drop it unless the
            test turns out to be unexpected.
        :param fail_immediately: raise ``ImmediateFail`` on the first
            unexpected result.
        :param flapping: dict of regex -> reason for unreliable tests.
        """
        # Initialise the inherited TestResult state so that inherited
        # attributes and methods are usable.
        super(FilterOps, self).__init__()
        self._ops = out
        self.seen_output = False
        # None means "pass output straight through"; a string is a buffer.
        self.output = None
        self.prefix = prefix
        self.suffix = suffix
        if expected_failures is not None:
            self.expected_failures = expected_failures
        else:
            self.expected_failures = {}
        if flapping is not None:
            self.flapping = flapping
        else:
            self.flapping = {}
        self.strip_ok_output = strip_ok_output
        # *_added counters are per testsuite, total_* are for the whole run.
        self.xfail_added = 0
        self.fail_added = 0
        self.uxsuccess_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0
        self.total_uxsuccess = 0
        self.error_added = 0
        self.fail_immediately = fail_immediately

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer *msg* while a buffer is active, else write it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output+=msg

    def startTest(self, test):
        """Forward the start of *test*, opening an output buffer if stripping."""
        self.seen_output = True
        test = self._add_prefix(test)
        if self.strip_ok_output:
            self.output = ""

        self._ops.startTest(test)

    def _add_prefix(self, test):
        """Return a new remote test whose id has prefix and suffix applied."""
        # prefix/suffix default to None, which cannot be concatenated.
        prefix = ""
        suffix = ""
        if self.prefix is not None:
            prefix = self.prefix
        if self.suffix is not None:
            suffix = self.suffix
        return subunit.RemotedTestCase(prefix + test.id() + suffix)

    def addError(self, test, err=None):
        """Count and forward an error; buffered output is discarded."""
        test = self._add_prefix(test)
        self.error_added+=1
        self.total_error+=1
        self._ops.addError(test, err)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        """Forward a skip; buffered output is discarded."""
        self.seen_output = True
        test = self._add_prefix(test)
        self._ops.addSkip(test, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        """Forward an expected failure; buffered output is discarded."""
        test = self._add_prefix(test)
        self._ops.addExpectedFailure(test, err)
        self.output = None

    def addUnexpectedSuccess(self, test):
        """Count and forward an unexpected success, flushing buffered output."""
        test = self._add_prefix(test)
        self.uxsuccess_added+=1
        self.total_uxsuccess+=1
        self._ops.addUnexpectedSuccess(test)
        if self.output:
            self._ops.output_msg(self.output)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, downgrading it to xfail if it is a known one."""
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            xfail_reason = find_in_list(self.flapping, test.id())
        if xfail_reason is not None:
            self.xfail_added+=1
            self.total_xfail+=1
            self._ops.addExpectedFailure(test, err)
        else:
            self.fail_added+=1
            self.total_fail+=1
            self._ops.addFailure(test, err)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def addSuccess(self, test):
        """Forward a success, or an unexpected success if failure was expected."""
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is not None:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(test)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self._ops.addSuccess(test)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the testsuite start and reset the per-suite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the testsuite end with a result adjusted to the counters.

        A reported failure becomes ``xfail`` when only expected failures
        were seen. Unexpected successes, failures and errors each override
        the result in that order (so errors win) and append their count to
        the reason.

        :raises ImmediateFail: if the final result is neither ``success``
            nor ``xfail`` and ``fail_immediately`` is set.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.uxsuccess_added > 0 and result != "uxsuccess":
            result = "uxsuccess"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n uxsuccess[%d]" % self.uxsuccess_added

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
class PerfFilterOps(unittest.TestResult):
    """Result sink that reports how long each test took.

    Start times are recorded per (prefixed) test id; when a test ends an
    ``elapsed-time``/``failure``/``error`` line is sent to the downstream
    sink through its ``output_msg``.
    """

    def __init__(self, out, prefix='', suffix=''):
        """
        :param out: downstream sink used for the generated output.
        :param prefix: text prepended to every test id (falsy means none).
        :param suffix: text appended to every test id (falsy means none).
        """
        # Initialise the inherited TestResult state; inherited callbacks
        # that are not overridden here (e.g. addSkip) depend on it.
        super(PerfFilterOps, self).__init__()
        self._ops = out
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        # Maps prefixed test id -> time at which the test started.
        self.starts = {}
        self.seen_output = False
        self.suite_has_time = False

    def progress(self, delta, whence):
        pass

    def output_msg(self, msg):
        pass

    def control_msg(self, msg):
        pass

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forget any timestamp seen in a previous testsuite."""
        self.suite_has_time = False

    def end_testsuite(self, name, result, reason=None):
        pass

    def _add_prefix(self, test):
        """Return a new remote test whose id has prefix and suffix applied."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def _elapsed_seconds(self, tid):
        """Return the seconds since *tid* started, or None if it never did."""
        if tid not in self.starts:
            return None
        return (self.get_time() - self.starts[tid]).total_seconds()

    def time(self, time):
        """Remember the latest timestamp reported by the stream."""
        self.latest_time = time
        #self._ops.output_msg("found time %s\n" % time)
        self.suite_has_time = True

    def get_time(self):
        """Return the stream's latest timestamp, else the current UTC time."""
        if self.suite_has_time:
            return self.latest_time
        return datetime.datetime.utcnow()

    def startTest(self, test):
        """Record the start time of *test* under its prefixed id."""
        self.seen_output = True
        test = self._add_prefix(test)
        self.starts[test.id()] = self.get_time()

    def addSuccess(self, test):
        """Emit an ``elapsed-time`` line for a test that passed."""
        test = self._add_prefix(test)
        tid = test.id()
        elapsed = self._elapsed_seconds(tid)
        if elapsed is None:
            self._ops.addError(test, "%s succeeded without ever starting!" % tid)
            # There is no start time to measure from, so stop here instead
            # of failing with a KeyError.
            return
        self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, elapsed))

    def addFailure(self, test, err=''):
        """Emit a ``failure`` line with the elapsed time of a failed test."""
        # Start times are stored under the prefixed id, so look up with it.
        test = self._add_prefix(test)
        tid = test.id()
        elapsed = self._elapsed_seconds(tid)
        if elapsed is None:
            self._ops.output_msg("failure: %s failed without ever starting (%s)\n" %
                                 (tid, err))
            return
        self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
                             (tid, elapsed, err))

    def addError(self, test, err=''):
        """Emit an ``error`` line with the elapsed time of an errored test."""
        # Start times are stored under the prefixed id, so look up with it.
        test = self._add_prefix(test)
        tid = test.id()
        elapsed = self._elapsed_seconds(tid)
        if elapsed is None:
            self._ops.output_msg("error: %s failed without ever starting (%s)\n" %
                                 (tid, err))
            return
        self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
                             (tid, elapsed, err))
class PlainFormatter(TestsuiteEnabledTestResult):
    """Result sink that prints human-readable progress to stdout.

    Per-testsuite output is collected in ``test_output`` (unless verbose)
    and ``write_summary`` produces a summary file plus a final verdict.
    """

    def __init__(self, verbose, immediate, statistics,
                 totaltests=None):
        """
        :param verbose: write all test output straight to stdout.
        :param immediate: print unexpected results as soon as they happen
            instead of one progress character per test.
        :param statistics: mapping with the ``TESTS_*`` counters, read when
            printing progress and the summary.
        :param totaltests: expected number of testsuites, if known.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        # Maps testsuite name -> collected output.
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        # Maps skip reason -> list of skipped testsuite names.
        self.skips = {}
        self.index = 0
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format a ``timedelta`` as e.g. ``1h2m3s`` (zero parts omitted)."""
        # timedelta.seconds excludes whole days, so add them back; negative
        # deltas keep their previous formatting.
        total = delta.seconds + max(delta.days, 0) * 86400
        minutes, seconds = divmod(total, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track progress nesting and the top-level testsuite total.

        :raises NotImplementedError: for ``subunit.PROGRESS_CUR``.
        """
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Remember the first and the most recent timestamp."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the ``[index(tests)/total at time, N errors] name`` header."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print *output* or collect it for the current testsuite."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        pass

    def end_testsuite(self, name, result, reason):
        """Record the testsuite outcome and print its closing status."""
        out = ""
        unexpected = False

        if not name in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok+=1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test):
        self.end_test(test.id(), "uxsuccess", True)

    def end_test(self, testname, result, unexpected, err=None):
        """Print or collect the outcome of a single test.

        Expected outcomes reset the collected output of the current
        testsuite; unexpected ones append an ``UNEXPECTED(...)`` entry
        (and ``err[1]`` as the reason when *err* is given).
        """
        if not unexpected:
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if not self.name in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
                'error': 'E',
                'failure': 'F',
                'uxsuccess': 'U',
                'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write failed and skipped testsuites to *path* and print the verdict."""
        # The context manager closes the file even if a write fails.
        with open(path, 'w+') as f:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite]+"\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason in self.skips.keys():
                f.write(reason + "\n")
                for name in self.skips[reason]:
                    f.write("\t%s\n" % name)
                f.write("\n")

        if (not self.suitesfailed and
            not self.statistics['TESTS_UNEXPECTED_FAIL'] and
            not self.statistics['TESTS_UNEXPECTED_OK'] and
            not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Record *name* as skipped for *reason* and shrink the expected total."""
        # Callers may pass reason=None explicitly; a None key would break
        # write_summary(), which concatenates the reason with a string.
        if reason is None:
            reason = "UNKNOWN"
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites-=1