nbt_server: Fix a typo ("domian->domain")
[Samba.git] / selftest / subunithelper.py
blobbcd25ffc8b528967ff8d3c1b0f729d0a0697efe0
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 __all__ = ['parse_results']
20 import datetime
21 import re
22 import sys
23 import os
24 from samba import subunit
25 from samba.subunit.run import TestProtocolClient
26 from samba.subunit import iso8601
27 import unittest
29 VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
30 'knownfail', 'error', 'xfail', 'skip-testsuite',
31 'testsuite-failure', 'testsuite-xfail',
32 'testsuite-success', 'testsuite-error',
33 'uxsuccess', 'testsuite-uxsuccess'])
35 class TestsuiteEnabledTestResult(unittest.TestResult):
37 def start_testsuite(self, name):
38 raise NotImplementedError(self.start_testsuite)
41 def parse_results(msg_ops, statistics, fh):
42 exitcode = 0
43 open_tests = {}
45 while fh:
46 l = fh.readline()
47 if l == "":
48 break
49 parts = l.split(None, 1)
50 if not len(parts) == 2 or not l.startswith(parts[0]):
51 msg_ops.output_msg(l)
52 continue
53 command = parts[0].rstrip(":")
54 arg = parts[1]
55 if command in ("test", "testing"):
56 msg_ops.control_msg(l)
57 name = arg.rstrip()
58 test = subunit.RemotedTestCase(name)
59 if name in open_tests:
60 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
61 msg_ops.startTest(test)
62 open_tests[name] = test
63 elif command == "time":
64 msg_ops.control_msg(l)
65 try:
66 dt = iso8601.parse_date(arg.rstrip("\n"))
67 except TypeError as e:
68 print "Unable to parse time line: %s" % arg.rstrip("\n")
69 else:
70 msg_ops.time(dt)
71 elif command in VALID_RESULTS:
72 msg_ops.control_msg(l)
73 result = command
74 grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
75 (testname, hasreason) = (grp.group(1), grp.group(2))
76 if hasreason:
77 reason = ""
78 # reason may be specified in next lines
79 terminated = False
80 while fh:
81 l = fh.readline()
82 if l == "":
83 break
84 msg_ops.control_msg(l)
85 if l[-2:] == "]\n":
86 reason += l[:-2]
87 terminated = True
88 break
89 else:
90 reason += l
92 remote_error = subunit.RemoteError(reason.decode("utf-8"))
94 if not terminated:
95 statistics['TESTS_ERROR']+=1
96 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
97 return 1
98 else:
99 reason = None
100 remote_error = subunit.RemoteError(u"No reason specified")
101 if result in ("success", "successful"):
102 try:
103 test = open_tests.pop(testname)
104 except KeyError:
105 statistics['TESTS_ERROR']+=1
106 exitcode = 1
107 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
108 else:
109 statistics['TESTS_EXPECTED_OK']+=1
110 msg_ops.addSuccess(test)
111 elif result in ("xfail", "knownfail"):
112 try:
113 test = open_tests.pop(testname)
114 except KeyError:
115 statistics['TESTS_ERROR']+=1
116 exitcode = 1
117 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
118 else:
119 statistics['TESTS_EXPECTED_FAIL']+=1
120 msg_ops.addExpectedFailure(test, remote_error)
121 elif result in ("uxsuccess", ):
122 try:
123 test = open_tests.pop(testname)
124 except KeyError:
125 statistics['TESTS_ERROR']+=1
126 exitcode = 1
127 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
128 else:
129 statistics['TESTS_UNEXPECTED_OK']+=1
130 msg_ops.addUnexpectedSuccess(test)
131 exitcode = 1
132 elif result in ("failure", "fail"):
133 try:
134 test = open_tests.pop(testname)
135 except KeyError:
136 statistics['TESTS_ERROR']+=1
137 exitcode = 1
138 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
139 else:
140 statistics['TESTS_UNEXPECTED_FAIL']+=1
141 exitcode = 1
142 msg_ops.addFailure(test, remote_error)
143 elif result == "skip":
144 statistics['TESTS_SKIP']+=1
145 # Allow tests to be skipped without prior announcement of test
146 try:
147 test = open_tests.pop(testname)
148 except KeyError:
149 test = subunit.RemotedTestCase(testname)
150 msg_ops.addSkip(test, reason)
151 elif result == "error":
152 statistics['TESTS_ERROR']+=1
153 exitcode = 1
154 try:
155 test = open_tests.pop(testname)
156 except KeyError:
157 test = subunit.RemotedTestCase(testname)
158 msg_ops.addError(test, remote_error)
159 elif result == "skip-testsuite":
160 msg_ops.skip_testsuite(testname)
161 elif result == "testsuite-success":
162 msg_ops.end_testsuite(testname, "success", reason)
163 elif result == "testsuite-failure":
164 msg_ops.end_testsuite(testname, "failure", reason)
165 exitcode = 1
166 elif result == "testsuite-xfail":
167 msg_ops.end_testsuite(testname, "xfail", reason)
168 elif result == "testsuite-uxsuccess":
169 msg_ops.end_testsuite(testname, "uxsuccess", reason)
170 exitcode = 1
171 elif result == "testsuite-error":
172 msg_ops.end_testsuite(testname, "error", reason)
173 exitcode = 1
174 else:
175 raise AssertionError("Recognized but unhandled result %r" %
176 result)
177 elif command == "testsuite":
178 msg_ops.start_testsuite(arg.strip())
179 elif command == "progress":
180 arg = arg.strip()
181 if arg == "pop":
182 msg_ops.progress(None, subunit.PROGRESS_POP)
183 elif arg == "push":
184 msg_ops.progress(None, subunit.PROGRESS_PUSH)
185 elif arg[0] in '+-':
186 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
187 else:
188 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
189 else:
190 msg_ops.output_msg(l)
192 while open_tests:
193 test = subunit.RemotedTestCase(open_tests.popitem()[1])
194 msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
195 statistics['TESTS_ERROR']+=1
196 exitcode = 1
198 return exitcode
201 class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
203 def progress(self, count, whence):
204 if whence == subunit.PROGRESS_POP:
205 self._stream.write("progress: pop\n")
206 elif whence == subunit.PROGRESS_PUSH:
207 self._stream.write("progress: push\n")
208 elif whence == subunit.PROGRESS_SET:
209 self._stream.write("progress: %d\n" % count)
210 elif whence == subunit.PROGRESS_CUR:
211 raise NotImplementedError
213 # The following are Samba extensions:
214 def start_testsuite(self, name):
215 self._stream.write("testsuite: %s\n" % name)
217 def skip_testsuite(self, name, reason=None):
218 if reason:
219 self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
220 else:
221 self._stream.write("skip-testsuite: %s\n" % name)
223 def end_testsuite(self, name, result, reason=None):
224 if reason:
225 self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
226 else:
227 self._stream.write("testsuite-%s: %s\n" % (result, name))
229 def output_msg(self, msg):
230 self._stream.write(msg)
233 def read_test_regexes(*names):
234 ret = {}
235 files = []
236 for name in names:
237 # if we are given a directory, we read all the files it contains
238 # (except the ones that end with "~").
239 if os.path.isdir(name):
240 files.extend([os.path.join(name, x)
241 for x in os.listdir(name)
242 if x[-1] != '~'])
243 else:
244 files.append(name)
246 for filename in files:
247 f = open(filename, 'r')
248 try:
249 for l in f:
250 l = l.strip()
251 if l == "" or l[0] == "#":
252 continue
253 if "#" in l:
254 (regex, reason) = l.split("#", 1)
255 ret[regex.strip()] = reason.strip()
256 else:
257 ret[l] = None
258 finally:
259 f.close()
260 return ret
263 def find_in_list(regexes, fullname):
264 for regex, reason in regexes.iteritems():
265 if re.match(regex, fullname):
266 if reason is None:
267 return ""
268 return reason
269 return None
272 class ImmediateFail(Exception):
273 """Raised to abort immediately."""
275 def __init__(self):
276 super(ImmediateFail, self).__init__("test failed and fail_immediately set")
279 class FilterOps(unittest.TestResult):
281 def control_msg(self, msg):
282 pass # We regenerate control messages, so ignore this
284 def time(self, time):
285 self._ops.time(time)
287 def progress(self, delta, whence):
288 self._ops.progress(delta, whence)
290 def output_msg(self, msg):
291 if self.output is None:
292 sys.stdout.write(msg)
293 else:
294 self.output+=msg
296 def startTest(self, test):
297 self.seen_output = True
298 test = self._add_prefix(test)
299 if self.strip_ok_output:
300 self.output = ""
302 self._ops.startTest(test)
304 def _add_prefix(self, test):
305 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
307 def addError(self, test, err=None):
308 test = self._add_prefix(test)
309 self.error_added+=1
310 self.total_error+=1
311 self._ops.addError(test, err)
312 self.output = None
313 if self.fail_immediately:
314 raise ImmediateFail()
316 def addSkip(self, test, reason=None):
317 self.seen_output = True
318 test = self._add_prefix(test)
319 self._ops.addSkip(test, reason)
320 self.output = None
322 def addExpectedFailure(self, test, err=None):
323 test = self._add_prefix(test)
324 self._ops.addExpectedFailure(test, err)
325 self.output = None
327 def addUnexpectedSuccess(self, test):
328 test = self._add_prefix(test)
329 self.uxsuccess_added+=1
330 self.total_uxsuccess+=1
331 self._ops.addUnexpectedSuccess(test)
332 if self.output:
333 self._ops.output_msg(self.output)
334 self.output = None
335 if self.fail_immediately:
336 raise ImmediateFail()
338 def addFailure(self, test, err=None):
339 test = self._add_prefix(test)
340 xfail_reason = find_in_list(self.expected_failures, test.id())
341 if xfail_reason is None:
342 xfail_reason = find_in_list(self.flapping, test.id())
343 if xfail_reason is not None:
344 self.xfail_added+=1
345 self.total_xfail+=1
346 self._ops.addExpectedFailure(test, err)
347 else:
348 self.fail_added+=1
349 self.total_fail+=1
350 self._ops.addFailure(test, err)
351 if self.output:
352 self._ops.output_msg(self.output)
353 if self.fail_immediately:
354 raise ImmediateFail()
355 self.output = None
357 def addSuccess(self, test):
358 test = self._add_prefix(test)
359 xfail_reason = find_in_list(self.expected_failures, test.id())
360 if xfail_reason is not None:
361 self.uxsuccess_added += 1
362 self.total_uxsuccess += 1
363 self._ops.addUnexpectedSuccess(test)
364 if self.output:
365 self._ops.output_msg(self.output)
366 if self.fail_immediately:
367 raise ImmediateFail()
368 else:
369 self._ops.addSuccess(test)
370 self.output = None
372 def skip_testsuite(self, name, reason=None):
373 self._ops.skip_testsuite(name, reason)
375 def start_testsuite(self, name):
376 self._ops.start_testsuite(name)
377 self.error_added = 0
378 self.fail_added = 0
379 self.xfail_added = 0
380 self.uxsuccess_added = 0
382 def end_testsuite(self, name, result, reason=None):
383 xfail = False
385 if self.xfail_added > 0:
386 xfail = True
387 if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
388 xfail = False
390 if xfail and result in ("fail", "failure"):
391 result = "xfail"
393 if self.uxsuccess_added > 0 and result != "uxsuccess":
394 result = "uxsuccess"
395 if reason is None:
396 reason = "Subunit/Filter Reason"
397 reason += "\n uxsuccess[%d]" % self.uxsuccess_added
399 if self.fail_added > 0 and result != "failure":
400 result = "failure"
401 if reason is None:
402 reason = "Subunit/Filter Reason"
403 reason += "\n failures[%d]" % self.fail_added
405 if self.error_added > 0 and result != "error":
406 result = "error"
407 if reason is None:
408 reason = "Subunit/Filter Reason"
409 reason += "\n errors[%d]" % self.error_added
411 self._ops.end_testsuite(name, result, reason)
412 if result not in ("success", "xfail"):
413 if self.output:
414 self._ops.output_msg(self.output)
415 if self.fail_immediately:
416 raise ImmediateFail()
417 self.output = None
419 def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
420 strip_ok_output=False, fail_immediately=False,
421 flapping=None):
422 self._ops = out
423 self.seen_output = False
424 self.output = None
425 self.prefix = prefix
426 self.suffix = suffix
427 if expected_failures is not None:
428 self.expected_failures = expected_failures
429 else:
430 self.expected_failures = {}
431 if flapping is not None:
432 self.flapping = flapping
433 else:
434 self.flapping = {}
435 self.strip_ok_output = strip_ok_output
436 self.xfail_added = 0
437 self.fail_added = 0
438 self.uxsuccess_added = 0
439 self.total_xfail = 0
440 self.total_error = 0
441 self.total_fail = 0
442 self.total_uxsuccess = 0
443 self.error_added = 0
444 self.fail_immediately = fail_immediately
447 class PerfFilterOps(unittest.TestResult):
449 def progress(self, delta, whence):
450 pass
452 def output_msg(self, msg):
453 pass
455 def control_msg(self, msg):
456 pass
458 def skip_testsuite(self, name, reason=None):
459 self._ops.skip_testsuite(name, reason)
461 def start_testsuite(self, name):
462 self.suite_has_time = False
464 def end_testsuite(self, name, result, reason=None):
465 pass
467 def _add_prefix(self, test):
468 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
470 def time(self, time):
471 self.latest_time = time
472 #self._ops.output_msg("found time %s\n" % time)
473 self.suite_has_time = True
475 def get_time(self):
476 if self.suite_has_time:
477 return self.latest_time
478 return datetime.datetime.utcnow()
480 def startTest(self, test):
481 self.seen_output = True
482 test = self._add_prefix(test)
483 self.starts[test.id()] = self.get_time()
485 def addSuccess(self, test):
486 test = self._add_prefix(test)
487 tid = test.id()
488 if tid not in self.starts:
489 self._ops.addError(test, "%s succeeded without ever starting!" % tid)
490 delta = self.get_time() - self.starts[tid]
491 self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
493 def addFailure(self, test, err=''):
494 tid = test.id()
495 delta = self.get_time() - self.starts[tid]
496 self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
497 (tid, delta.total_seconds(), err))
499 def addError(self, test, err=''):
500 tid = test.id()
501 delta = self.get_time() - self.starts[tid]
502 self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
503 (tid, delta.total_seconds(), err))
505 def __init__(self, out, prefix='', suffix=''):
506 self._ops = out
507 self.prefix = prefix or ''
508 self.suffix = suffix or ''
509 self.starts = {}
510 self.seen_output = False
511 self.suite_has_time = False
514 class PlainFormatter(TestsuiteEnabledTestResult):
516 def __init__(self, verbose, immediate, statistics,
517 totaltests=None):
518 super(PlainFormatter, self).__init__()
519 self.verbose = verbose
520 self.immediate = immediate
521 self.statistics = statistics
522 self.start_time = None
523 self.test_output = {}
524 self.suitesfailed = []
525 self.suites_ok = 0
526 self.skips = {}
527 self.index = 0
528 self.name = None
529 self._progress_level = 0
530 self.totalsuites = totaltests
531 self.last_time = None
533 @staticmethod
534 def _format_time(delta):
535 minutes, seconds = divmod(delta.seconds, 60)
536 hours, minutes = divmod(minutes, 60)
537 ret = ""
538 if hours:
539 ret += "%dh" % hours
540 if minutes:
541 ret += "%dm" % minutes
542 ret += "%ds" % seconds
543 return ret
545 def progress(self, offset, whence):
546 if whence == subunit.PROGRESS_POP:
547 self._progress_level -= 1
548 elif whence == subunit.PROGRESS_PUSH:
549 self._progress_level += 1
550 elif whence == subunit.PROGRESS_SET:
551 if self._progress_level == 0:
552 self.totalsuites = offset
553 elif whence == subunit.PROGRESS_CUR:
554 raise NotImplementedError
556 def time(self, dt):
557 if self.start_time is None:
558 self.start_time = dt
559 self.last_time = dt
561 def start_testsuite(self, name):
562 self.index += 1
563 self.name = name
565 if not self.verbose:
566 self.test_output[name] = ""
568 total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
569 self.statistics['TESTS_EXPECTED_FAIL'] +
570 self.statistics['TESTS_ERROR'] +
571 self.statistics['TESTS_UNEXPECTED_FAIL'] +
572 self.statistics['TESTS_UNEXPECTED_OK'])
574 out = "[%d(%d)" % (self.index, total_tests)
575 if self.totalsuites is not None:
576 out += "/%d" % self.totalsuites
577 if self.start_time is not None:
578 out += " at " + self._format_time(self.last_time - self.start_time)
579 if self.suitesfailed:
580 out += ", %d errors" % (len(self.suitesfailed),)
581 out += "] %s" % name
582 if self.immediate:
583 sys.stdout.write(out + "\n")
584 else:
585 sys.stdout.write(out + ": ")
587 def output_msg(self, output):
588 if self.verbose:
589 sys.stdout.write(output)
590 elif self.name is not None:
591 self.test_output[self.name] += output
592 else:
593 sys.stdout.write(output)
595 def control_msg(self, output):
596 pass
598 def end_testsuite(self, name, result, reason):
599 out = ""
600 unexpected = False
602 if not name in self.test_output:
603 print "no output for name[%s]" % name
605 if result in ("success", "xfail"):
606 self.suites_ok+=1
607 else:
608 self.output_msg("ERROR: Testsuite[%s]\n" % name)
609 if reason is not None:
610 self.output_msg("REASON: %s\n" % (reason,))
611 self.suitesfailed.append(name)
612 if self.immediate and not self.verbose and name in self.test_output:
613 out += self.test_output[name]
614 unexpected = True
616 if not self.immediate:
617 if not unexpected:
618 out += " ok\n"
619 else:
620 out += " " + result.upper() + "\n"
622 sys.stdout.write(out)
624 def startTest(self, test):
625 pass
627 def addSuccess(self, test):
628 self.end_test(test.id(), "success", False)
630 def addError(self, test, err=None):
631 self.end_test(test.id(), "error", True, err)
633 def addFailure(self, test, err=None):
634 self.end_test(test.id(), "failure", True, err)
636 def addSkip(self, test, reason=None):
637 self.end_test(test.id(), "skip", False, reason)
639 def addExpectedFailure(self, test, err=None):
640 self.end_test(test.id(), "xfail", False, err)
642 def addUnexpectedSuccess(self, test):
643 self.end_test(test.id(), "uxsuccess", True)
645 def end_test(self, testname, result, unexpected, err=None):
646 if not unexpected:
647 self.test_output[self.name] = ""
648 if not self.immediate:
649 sys.stdout.write({
650 'failure': 'f',
651 'xfail': 'X',
652 'skip': 's',
653 'success': '.'}.get(result, "?(%s)" % result))
654 return
656 if not self.name in self.test_output:
657 self.test_output[self.name] = ""
659 self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
660 if err is not None:
661 self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
663 if self.immediate and not self.verbose:
664 sys.stdout.write(self.test_output[self.name])
665 self.test_output[self.name] = ""
667 if not self.immediate:
668 sys.stdout.write({
669 'error': 'E',
670 'failure': 'F',
671 'uxsuccess': 'U',
672 'success': 'S'}.get(result, "?"))
674 def write_summary(self, path):
675 f = open(path, 'w+')
677 if self.suitesfailed:
678 f.write("= Failed tests =\n")
680 for suite in self.suitesfailed:
681 f.write("== %s ==\n" % suite)
682 if suite in self.test_output:
683 f.write(self.test_output[suite]+"\n\n")
685 f.write("\n")
687 if not self.immediate and not self.verbose:
688 for suite in self.suitesfailed:
689 print "=" * 78
690 print "FAIL: %s" % suite
691 if suite in self.test_output:
692 print self.test_output[suite]
693 print ""
695 f.write("= Skipped tests =\n")
696 for reason in self.skips.keys():
697 f.write(reason + "\n")
698 for name in self.skips[reason]:
699 f.write("\t%s\n" % name)
700 f.write("\n")
701 f.close()
703 if (not self.suitesfailed and
704 not self.statistics['TESTS_UNEXPECTED_FAIL'] and
705 not self.statistics['TESTS_UNEXPECTED_OK'] and
706 not self.statistics['TESTS_ERROR']):
707 ok = (self.statistics['TESTS_EXPECTED_OK'] +
708 self.statistics['TESTS_EXPECTED_FAIL'])
709 print "\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok)
710 else:
711 print "\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
712 self.statistics['TESTS_UNEXPECTED_FAIL'],
713 self.statistics['TESTS_ERROR'],
714 self.statistics['TESTS_UNEXPECTED_OK'],
715 len(self.suitesfailed))
717 def skip_testsuite(self, name, reason="UNKNOWN"):
718 self.skips.setdefault(reason, []).append(name)
719 if self.totalsuites:
720 self.totalsuites-=1