gpo: Move policy application to the gp_ext
[Samba.git] / selftest / subunithelper.py
blobd79bd7f2ba5a2c0a98d53974320cc20ffa652aae
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 __all__ = ['parse_results']
21 import datetime
22 import re
23 import sys
24 import os
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
28 import unittest
# Every keyword that parse_results() accepts as the command of a result line.
VALID_RESULTS = set([
    # outcomes of a single test
    'success', 'successful',
    'failure', 'fail',
    'error',
    'skip',
    'xfail', 'knownfail',
    'uxsuccess',
    # outcomes of a whole testsuite
    'skip-testsuite',
    'testsuite-success',
    'testsuite-failure',
    'testsuite-error',
    'testsuite-xfail',
    'testsuite-uxsuccess',
])
class TestsuiteEnabledTestResult(unittest.TestResult):
    """A unittest.TestResult with an extra hook for the start of a testsuite.

    The hook is not implemented here; subclasses have to override it.
    """

    def start_testsuite(self, name):
        """Announce that the testsuite ``name`` starts.

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError(self.start_testsuite)
# Splits the argument of a result line into the test name (group 1) and the
# optional " [" marker (group 2) announcing a reason on the following lines.
_RESULT_ARG_RE = re.compile(r"(.*?)( \[)?([ \t]*)( multipart)?\n")

# Results that are only valid for a test that was announced with "test:".
_NEEDS_STARTED_TEST = ("success", "successful", "xfail", "knownfail",
                       "uxsuccess", "failure", "fail")

# Testsuite result keyword -> result name handed to msg_ops.end_testsuite().
_TESTSUITE_OUTCOMES = {
    "testsuite-success": "success",
    "testsuite-failure": "failure",
    "testsuite-xfail": "xfail",
    "testsuite-uxsuccess": "uxsuccess",
    "testsuite-error": "error",
}

# Testsuite outcomes that make parse_results() return a non-zero exit code.
_FAILING_TESTSUITE_OUTCOMES = ("failure", "uxsuccess", "error")


def _read_reason(msg_ops, fh):
    """Read the lines of a bracketed reason up to the closing "]" line.

    Every line read is forwarded to msg_ops.control_msg().

    :return: (reason, terminated); terminated is False when the stream
        ended before a line ending in "]" was seen.
    """
    chunks = []
    for l in iter(fh.readline, ""):
        msg_ops.control_msg(l)
        if l[-2:] == "]\n":
            chunks.append(l[:-2])
            return "".join(chunks), True
        chunks.append(l)
    return "".join(chunks), False


def _report_result(msg_ops, statistics, open_tests, result, testname,
                   reason, remote_error):
    """Forward one parsed result to msg_ops and update the statistics.

    :return: True when this result must turn the overall exit code to 1.
    """
    if result in _NEEDS_STARTED_TEST:
        try:
            test = open_tests.pop(testname)
        except KeyError:
            statistics['TESTS_ERROR'] += 1
            msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
            return True
        if result in ("success", "successful"):
            statistics['TESTS_EXPECTED_OK'] += 1
            msg_ops.addSuccess(test)
            return False
        if result in ("xfail", "knownfail"):
            statistics['TESTS_EXPECTED_FAIL'] += 1
            msg_ops.addExpectedFailure(test, remote_error)
            return False
        if result == "uxsuccess":
            statistics['TESTS_UNEXPECTED_OK'] += 1
            msg_ops.addUnexpectedSuccess(test)
            return True
        # "failure" / "fail"
        statistics['TESTS_UNEXPECTED_FAIL'] += 1
        msg_ops.addFailure(test, remote_error)
        return True

    if result in ("skip", "error"):
        # These may be reported without a prior announcement of the test.
        statistics['TESTS_SKIP' if result == "skip" else 'TESTS_ERROR'] += 1
        try:
            test = open_tests.pop(testname)
        except KeyError:
            test = subunit.RemotedTestCase(testname)
        if result == "skip":
            msg_ops.addSkip(test, reason)
            return False
        msg_ops.addError(test, remote_error)
        return True

    if result == "skip-testsuite":
        msg_ops.skip_testsuite(testname)
        return False

    if result in _TESTSUITE_OUTCOMES:
        outcome = _TESTSUITE_OUTCOMES[result]
        msg_ops.end_testsuite(testname, outcome, reason)
        return outcome in _FAILING_TESTSUITE_OUTCOMES

    raise AssertionError("Recognized but unhandled result %r" %
                         result)


def parse_results(msg_ops, statistics, fh):
    """Parse a subunit stream and replay it onto msg_ops.

    :param msg_ops: receiver of the parsed events; the methods called on it
        are control_msg, output_msg, startTest, addSuccess, addFailure,
        addError, addSkip, addExpectedFailure, addUnexpectedSuccess, time,
        progress, start_testsuite, end_testsuite and skip_testsuite.
    :param statistics: mapping whose TESTS_EXPECTED_OK, TESTS_EXPECTED_FAIL,
        TESTS_UNEXPECTED_OK, TESTS_UNEXPECTED_FAIL, TESTS_ERROR and
        TESTS_SKIP counters are incremented in place.
    :param fh: object with a readline() method returning "" at end of input.
    :return: 0 if nothing unexpected was seen, 1 otherwise.
    """
    exitcode = 0
    open_tests = {}

    for l in iter(fh.readline, ""):
        parts = l.split(None, 1)
        if len(parts) != 2 or not l.startswith(parts[0]):
            # Not a "command: argument" line; pass it through as output.
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]

        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(l)
            # NOTE(review): only TypeError is handled; confirm which
            # exceptions iso8601.parse_date raises for malformed input.
            try:
                dt = iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            if not arg.endswith("\n"):
                # The last line of a truncated stream has no newline; the
                # pattern requires one, so add it instead of crashing on a
                # failed match.
                arg += "\n"
            grp = _RESULT_ARG_RE.match(arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                # The reason is given on the following lines.
                reason, terminated = _read_reason(msg_ops, fh)
                if isinstance(reason, bytes):
                    # Byte strings (Python 2 str) are decoded; text strings
                    # have no decode() on Python 3 and are used as they are.
                    remote_error = subunit.RemoteError(reason.decode("utf-8"))
                else:
                    remote_error = subunit.RemoteError(reason)
                if not terminated:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")
            if _report_result(msg_ops, statistics, open_tests, result,
                              testname, reason, remote_error):
                exitcode = 1
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    while open_tests:
        # The stored values already are test cases; report them directly
        # instead of wrapping them in another RemotedTestCase.
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR'] += 1
        exitcode = 1

    return exitcode
class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
    """Test result that writes events as subunit lines to self._stream."""

    def progress(self, count, whence):
        """Write a "progress:" line for the given whence.

        :raises NotImplementedError: for subunit.PROGRESS_CUR.
        """
        if whence == subunit.PROGRESS_POP:
            line = "progress: pop\n"
        elif whence == subunit.PROGRESS_PUSH:
            line = "progress: push\n"
        elif whence == subunit.PROGRESS_SET:
            line = "progress: %d\n" % count
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError
        else:
            # Any other whence value is silently ignored.
            return
        self._stream.write(line)

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Write the "testsuite:" line announcing ``name``."""
        line = "testsuite: %s\n" % name
        self._stream.write(line)

    def skip_testsuite(self, name, reason=None):
        """Write a "skip-testsuite:" line, with a bracketed reason if given."""
        if not reason:
            line = "skip-testsuite: %s\n" % name
        else:
            line = "skip-testsuite: %s [\n%s\n]\n" % (name, reason)
        self._stream.write(line)

    def end_testsuite(self, name, result, reason=None):
        """Write a "testsuite-<result>:" line, with a bracketed reason if given."""
        if not reason:
            line = "testsuite-%s: %s\n" % (result, name)
        else:
            line = "testsuite-%s: %s [\n%s\n]\n" % (result, name, reason)
        self._stream.write(line)

    def output_msg(self, msg):
        """Write ``msg`` to the stream unchanged."""
        self._stream.write(msg)
def read_test_regexes(*names):
    """Read "regex # reason" lines from the given files or directories.

    A directory contributes every entry it contains whose name does not end
    with "~". Blank lines and lines starting with "#" are skipped.

    :return: dict mapping each regex to its reason, or to None when the
        line has no "#" part.
    """
    paths = []
    for name in names:
        if not os.path.isdir(name):
            paths.append(name)
            continue
        for entry in os.listdir(name):
            if not entry.endswith('~'):
                paths.append(os.path.join(name, entry))

    regexes = {}
    for path in paths:
        with open(path, 'r') as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                regex, sep, reason = line.partition("#")
                if sep:
                    regexes[regex.strip()] = reason.strip()
                else:
                    regexes[line] = None
    return regexes
def find_in_list(regexes, fullname):
    """Look up the reason of the first regex matching ``fullname``.

    :param regexes: dict mapping regex to reason (or None).
    :return: the reason of the first regex that matches at the start of
        ``fullname`` ("" if that reason is None), or None if none matches.
    """
    for pattern in regexes:
        if re.match(pattern, fullname) is None:
            continue
        why = regexes[pattern]
        return "" if why is None else why
    return None
class ImmediateFail(Exception):
    """Signals that processing must stop right after a failed test."""

    def __init__(self):
        message = "test failed and fail_immediately set"
        Exception.__init__(self, message)
class FilterOps(unittest.TestResult):
    """Test result that filters events before passing them on to ``out``.

    Test ids get ``prefix``/``suffix`` added, failures matching
    ``expected_failures`` or ``flapping`` are reported as expected failures,
    and successes matching ``expected_failures`` are reported as unexpected
    successes.
    """

    def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False,
                 flapping=None):
        """
        :param out: result object all filtered events are forwarded to.
        :param prefix: string put in front of every test id (None: nothing).
        :param suffix: string appended to every test id (None: nothing).
        :param expected_failures: dict of regex -> reason, as consumed by
            find_in_list().
        :param strip_ok_output: buffer output from startTest() on, so that it
            is only forwarded for unexpected results.
        :param fail_immediately: raise ImmediateFail on the first unexpected
            result.
        :param flapping: dict of regex -> reason for tests whose failures are
            reported as expected failures.
        """
        super(FilterOps, self).__init__()
        self._ops = out
        self.seen_output = False
        # None: output_msg() writes straight to stdout; a string: buffering.
        self.output = None
        # _add_prefix() concatenates these with the test id, so the None
        # defaults have to become empty strings.
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        if expected_failures is not None:
            self.expected_failures = expected_failures
        else:
            self.expected_failures = {}
        if flapping is not None:
            self.flapping = flapping
        else:
            self.flapping = {}
        self.strip_ok_output = strip_ok_output
        # *_added counters are reset by start_testsuite(); total_* are not.
        self.xfail_added = 0
        self.fail_added = 0
        self.uxsuccess_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0
        self.total_uxsuccess = 0
        self.error_added = 0
        self.fail_immediately = fail_immediately

    def control_msg(self, msg):
        pass  # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Buffer ``msg`` while buffering is active, else write it to stdout."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output += msg

    def startTest(self, test):
        """Forward the (prefixed) test start; begin buffering if requested."""
        self.seen_output = True
        test = self._add_prefix(test)
        if self.strip_ok_output:
            self.output = ""

        self._ops.startTest(test)

    def _add_prefix(self, test):
        """Return a RemotedTestCase named prefix + test.id() + suffix."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def addError(self, test, err=None):
        """Count and forward an error; buffered output is discarded."""
        test = self._add_prefix(test)
        self.error_added += 1
        self.total_error += 1
        self._ops.addError(test, err)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, reason=None):
        """Forward a skip; buffered output is discarded."""
        self.seen_output = True
        test = self._add_prefix(test)
        self._ops.addSkip(test, reason)
        self.output = None

    def addExpectedFailure(self, test, err=None):
        """Forward an expected failure; buffered output is discarded."""
        test = self._add_prefix(test)
        self._ops.addExpectedFailure(test, err)
        self.output = None

    def addUnexpectedSuccess(self, test):
        """Count and forward an unexpected success plus any buffered output."""
        test = self._add_prefix(test)
        self.uxsuccess_added += 1
        self.total_uxsuccess += 1
        self._ops.addUnexpectedSuccess(test)
        if self.output:
            self._ops.output_msg(self.output)
            self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addFailure(self, test, err=None):
        """Forward a failure, or an expected failure if the test is listed.

        The test id is looked up in expected_failures first and in flapping
        second; a hit in either turns the failure into an expected failure.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            xfail_reason = find_in_list(self.flapping, test.id())
        if xfail_reason is not None:
            self.xfail_added += 1
            self.total_xfail += 1
            self._ops.addExpectedFailure(test, err)
        else:
            self.fail_added += 1
            self.total_fail += 1
            self._ops.addFailure(test, err)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None

    def addSuccess(self, test):
        """Forward a success, or an unexpected success if a failure was expected."""
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is not None:
            self.uxsuccess_added += 1
            self.total_uxsuccess += 1
            self._ops.addUnexpectedSuccess(test)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self._ops.addSuccess(test)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        """Forward a testsuite skip unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward the testsuite start and reset the per-testsuite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.uxsuccess_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward the end of a testsuite with a result adjusted by the counters.

        A "fail"/"failure" result becomes "xfail" when the testsuite only had
        expected failures. Recorded unexpected successes, failures and errors
        force the result to "uxsuccess", "failure" and "error" respectively
        (checked in that order, so the last one that applies wins) and append
        their count to ``reason``.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.uxsuccess_added > 0 and result != "uxsuccess":
            result = "uxsuccess"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n uxsuccess[%d]" % self.uxsuccess_added

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)
        if result not in ("success", "xfail"):
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        self.output = None
class PerfFilterOps(unittest.TestResult):
    """Test result that reports how long each test took to ``out``.

    Start times are recorded per (prefixed) test id in startTest(); the
    elapsed time is written with out.output_msg() when the test ends.
    """

    def __init__(self, out, prefix='', suffix=''):
        """
        :param out: result object the timing lines are written to.
        :param prefix: string put in front of every test id (None: nothing).
        :param suffix: string appended to every test id (None: nothing).
        """
        super(PerfFilterOps, self).__init__()
        self._ops = out
        self.prefix = prefix or ''
        self.suffix = suffix or ''
        # prefixed test id -> time at which the test started
        self.starts = {}
        self.seen_output = False
        self.suite_has_time = False

    def progress(self, delta, whence):
        pass

    def output_msg(self, msg):
        pass

    def control_msg(self, msg):
        pass

    def skip_testsuite(self, name, reason=None):
        """Forward a testsuite skip unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forget that a timestamp was seen; a new testsuite starts."""
        self.suite_has_time = False

    def end_testsuite(self, name, result, reason=None):
        pass

    def _add_prefix(self, test):
        """Return a RemotedTestCase named prefix + test.id() + suffix."""
        return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)

    def time(self, time):
        """Remember the most recent timestamp of the stream."""
        self.latest_time = time
        self.suite_has_time = True

    def get_time(self):
        """Return the latest stream timestamp, or the current UTC time when
        the current testsuite has not provided one."""
        if self.suite_has_time:
            return self.latest_time
        return datetime.datetime.utcnow()

    def _elapsed_since_start(self, tid):
        """Return the time elapsed since ``tid`` started, or None when no
        start was recorded for it."""
        started = self.starts.get(tid)
        if started is None:
            return None
        return self.get_time() - started

    def _report_problem(self, kind, test, err):
        """Write the "<kind>: ..." timing line for an unsuccessful test."""
        # startTest() stores the prefixed id, so look that one up.
        tid = self._add_prefix(test).id()
        delta = self._elapsed_since_start(tid)
        if delta is None:
            # No start time to compute a duration from; still report it.
            self._ops.output_msg("%s: %s failed without ever starting (%s)\n" %
                                 (kind, tid, err))
            return
        self._ops.output_msg("%s: %s failed after %f seconds (%s)\n" %
                             (kind, tid, delta.total_seconds(), err))

    def startTest(self, test):
        """Record the start time of the (prefixed) test."""
        self.seen_output = True
        test = self._add_prefix(test)
        self.starts[test.id()] = self.get_time()

    def addSuccess(self, test):
        """Write the elapsed time of a successful test.

        A success for a test that never started is reported with
        out.addError() instead, and no timing line is written.
        """
        test = self._add_prefix(test)
        tid = test.id()
        delta = self._elapsed_since_start(tid)
        if delta is None:
            self._ops.addError(test, "%s succeeded without ever starting!" % tid)
            return
        self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))

    def addFailure(self, test, err=''):
        """Write a "failure:" line with the time the test ran."""
        self._report_problem("failure", test, err)

    def addError(self, test, err=''):
        """Write an "error:" line with the time the test ran."""
        self._report_problem("error", test, err)
class PlainFormatter(TestsuiteEnabledTestResult):
    """Test result that prints a plain-text progress report on stdout and can
    write a summary file when the run is over."""

    def __init__(self, verbose, immediate, statistics,
                 totaltests=None):
        """
        :param verbose: write all test output to stdout as it arrives.
        :param immediate: print unexpected results as soon as they happen
            rather than one status character per test.
        :param statistics: mapping with the TESTS_* counters; only read here.
        :param totaltests: number of testsuites expected, if known.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        # testsuite name -> output collected for that testsuite
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        # skip reason -> names of the testsuites skipped for that reason
        self.skips = {}
        self.index = 0
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format a timedelta as e.g. "1h2m3s"; zero hours/minutes are omitted."""
        # timedelta.seconds excludes whole days, so add them explicitly.
        total = delta.days * 86400 + delta.seconds
        minutes, seconds = divmod(total, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track push/pop nesting; a SET at the outermost level gives the
        total number of testsuites.

        :raises NotImplementedError: for subunit.PROGRESS_CUR.
        """
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Remember the first and the most recent timestamp seen."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the "[index(tests)/total at time, N errors] name" header."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
                       self.statistics['TESTS_EXPECTED_FAIL'] +
                       self.statistics['TESTS_ERROR'] +
                       self.statistics['TESTS_UNEXPECTED_FAIL'] +
                       self.statistics['TESTS_UNEXPECTED_OK'])

        out = "[%d(%d)" % (self.index, total_tests)
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " at " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print ``output`` in verbose mode or outside of a testsuite;
        otherwise collect it for the current testsuite."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        pass

    def end_testsuite(self, name, result, reason=None):
        """Record the outcome of a testsuite and print its status.

        Any result other than "success" or "xfail" marks the testsuite as
        failed.
        """
        out = ""
        unexpected = False

        if name not in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok += 1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        pass

    def addSuccess(self, test):
        self.end_test(test.id(), "success", False)

    def addError(self, test, err=None):
        self.end_test(test.id(), "error", True, err)

    def addFailure(self, test, err=None):
        self.end_test(test.id(), "failure", True, err)

    def addSkip(self, test, reason=None):
        self.end_test(test.id(), "skip", False, reason)

    def addExpectedFailure(self, test, err=None):
        self.end_test(test.id(), "xfail", False, err)

    def addUnexpectedSuccess(self, test):
        self.end_test(test.id(), "uxsuccess", True)

    def end_test(self, testname, result, unexpected, err=None):
        """Report the end of one test.

        :param unexpected: whether the result counts as unexpected; expected
            results discard the output collected for the current testsuite.
        :param err: for unexpected results, a sequence whose second item is
            printed as the reason.
        """
        if not unexpected:
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if self.name not in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if err is not None:
            self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()

        if self.immediate and not self.verbose:
            sys.stdout.write(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
                'error': 'E',
                'failure': 'F',
                'uxsuccess': 'U',
                'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write failed and skipped testsuites to ``path`` and print the
        final verdict on stdout."""
        # The context manager closes the file even if a write fails.
        with open(path, 'w+') as f:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite] + "\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason, names in self.skips.items():
                f.write(reason + "\n")
                for name in names:
                    f.write("\t%s\n" % name)
                f.write("\n")

        if (not self.suitesfailed and
                not self.statistics['TESTS_UNEXPECTED_FAIL'] and
                not self.statistics['TESTS_UNEXPECTED_OK'] and
                not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                self.statistics['TESTS_UNEXPECTED_OK'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Record that the testsuite ``name`` was skipped for ``reason``."""
        if reason is None:
            # An explicit None would become a dict key that write_summary()
            # cannot concatenate with a string.
            reason = "UNKNOWN"
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites -= 1