selftest/Samba4: correctly pass KRB5CCNAME to provision
[Samba.git] / selftest / subunithelper.py
blob4fbb5442839552f3a90fdb50bde54b2a1cb75159
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 __all__ = ['parse_results']
21 import datetime
22 import re
23 import sys
24 import os
25 from samba import subunit
26 from samba.subunit.run import TestProtocolClient
27 from samba.subunit import iso8601
28 import unittest
31 VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
32 'knownfail', 'error', 'xfail', 'skip-testsuite',
33 'testsuite-failure', 'testsuite-xfail',
34 'testsuite-success', 'testsuite-error',
35 'uxsuccess', 'testsuite-uxsuccess'])
38 class TestsuiteEnabledTestResult(unittest.TestResult):
40 def start_testsuite(self, name):
41 raise NotImplementedError(self.start_testsuite)
44 def parse_results(msg_ops, statistics, fh):
45 exitcode = 0
46 open_tests = {}
48 for l in fh:
49 parts = l.split(None, 1)
50 if not len(parts) == 2 or not l.startswith(parts[0]):
51 msg_ops.output_msg(l)
52 continue
53 command = parts[0].rstrip(":")
54 arg = parts[1]
55 if command in ("test", "testing"):
56 msg_ops.control_msg(l)
57 name = arg.rstrip()
58 test = subunit.RemotedTestCase(name)
59 if name in open_tests:
60 msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
61 msg_ops.startTest(test)
62 open_tests[name] = test
63 elif command == "time":
64 msg_ops.control_msg(l)
65 try:
66 dt = iso8601.parse_date(arg.rstrip("\n"))
67 except TypeError as e:
68 print("Unable to parse time line: %s" % arg.rstrip("\n"))
69 else:
70 msg_ops.time(dt)
71 elif command in VALID_RESULTS:
72 msg_ops.control_msg(l)
73 result = command
74 grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
75 (testname, hasreason) = (grp.group(1), grp.group(2))
76 if hasreason:
77 reason = ""
78 # reason may be specified in next lines
79 terminated = False
80 for l in fh:
81 msg_ops.control_msg(l)
82 if l == "]\n":
83 terminated = True
84 break
85 else:
86 reason += l
88 if isinstance(reason, bytes):
89 remote_error = subunit.RemoteError(reason.decode("utf-8"))
90 else:
91 remote_error = subunit.RemoteError(reason)
93 if not terminated:
94 statistics['TESTS_ERROR'] += 1
95 msg_ops.addError(subunit.RemotedTestCase(testname),
96 subunit.RemoteError(u"result (%s) reason (%s) interrupted" % (result, reason)))
97 return 1
98 else:
99 reason = None
100 remote_error = subunit.RemoteError(u"No reason specified")
101 if result in ("success", "successful"):
102 try:
103 test = open_tests.pop(testname)
104 except KeyError:
105 statistics['TESTS_ERROR'] += 1
106 exitcode = 1
107 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
108 else:
109 statistics['TESTS_EXPECTED_OK'] += 1
110 msg_ops.addSuccess(test)
111 elif result in ("xfail", "knownfail"):
112 try:
113 test = open_tests.pop(testname)
114 except KeyError:
115 statistics['TESTS_ERROR'] += 1
116 exitcode = 1
117 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
118 else:
119 statistics['TESTS_EXPECTED_FAIL'] += 1
120 msg_ops.addExpectedFailure(test, remote_error)
121 elif result in ("uxsuccess", ):
122 try:
123 test = open_tests.pop(testname)
124 except KeyError:
125 statistics['TESTS_ERROR'] += 1
126 exitcode = 1
127 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
128 else:
129 statistics['TESTS_UNEXPECTED_OK'] += 1
130 msg_ops.addUnexpectedSuccess(test)
131 exitcode = 1
132 elif result in ("failure", "fail"):
133 try:
134 test = open_tests.pop(testname)
135 except KeyError:
136 statistics['TESTS_ERROR'] += 1
137 exitcode = 1
138 msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
139 else:
140 statistics['TESTS_UNEXPECTED_FAIL'] += 1
141 exitcode = 1
142 msg_ops.addFailure(test, remote_error)
143 elif result == "skip":
144 statistics['TESTS_SKIP'] += 1
145 # Allow tests to be skipped without prior announcement of test
146 try:
147 test = open_tests.pop(testname)
148 except KeyError:
149 test = subunit.RemotedTestCase(testname)
150 msg_ops.addSkip(test, reason)
151 elif result == "error":
152 statistics['TESTS_ERROR'] += 1
153 exitcode = 1
154 try:
155 test = open_tests.pop(testname)
156 except KeyError:
157 test = subunit.RemotedTestCase(testname)
158 msg_ops.addError(test, remote_error)
159 elif result == "skip-testsuite":
160 msg_ops.skip_testsuite(testname)
161 elif result == "testsuite-success":
162 msg_ops.end_testsuite(testname, "success", reason)
163 elif result == "testsuite-failure":
164 msg_ops.end_testsuite(testname, "failure", reason)
165 exitcode = 1
166 elif result == "testsuite-xfail":
167 msg_ops.end_testsuite(testname, "xfail", reason)
168 elif result == "testsuite-uxsuccess":
169 msg_ops.end_testsuite(testname, "uxsuccess", reason)
170 exitcode = 1
171 elif result == "testsuite-error":
172 msg_ops.end_testsuite(testname, "error", reason)
173 exitcode = 1
174 else:
175 raise AssertionError("Recognized but unhandled result %r" %
176 result)
177 elif command == "testsuite":
178 msg_ops.start_testsuite(arg.strip())
179 elif command == "progress":
180 arg = arg.strip()
181 if arg == "pop":
182 msg_ops.progress(None, subunit.PROGRESS_POP)
183 elif arg == "push":
184 msg_ops.progress(None, subunit.PROGRESS_PUSH)
185 elif arg[0] in '+-':
186 msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
187 else:
188 msg_ops.progress(int(arg), subunit.PROGRESS_SET)
189 else:
190 msg_ops.output_msg(l)
192 while open_tests:
193 test = subunit.RemotedTestCase(open_tests.popitem()[1])
194 msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
195 statistics['TESTS_ERROR'] += 1
196 exitcode = 1
198 return exitcode
201 class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
203 def progress(self, count, whence):
204 if whence == subunit.PROGRESS_POP:
205 self._stream.write("progress: pop\n")
206 elif whence == subunit.PROGRESS_PUSH:
207 self._stream.write("progress: push\n")
208 elif whence == subunit.PROGRESS_SET:
209 self._stream.write("progress: %d\n" % count)
210 elif whence == subunit.PROGRESS_CUR:
211 raise NotImplementedError
213 # The following are Samba extensions:
214 def start_testsuite(self, name):
215 self._stream.write("testsuite: %s\n" % name)
217 def skip_testsuite(self, name, reason=None):
218 if reason:
219 self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
220 else:
221 self._stream.write("skip-testsuite: %s\n" % name)
223 def end_testsuite(self, name, result, reason=None):
224 if reason:
225 self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
226 else:
227 self._stream.write("testsuite-%s: %s\n" % (result, name))
229 def output_msg(self, msg):
230 self._stream.write(msg)
233 def read_test_regexes(*names):
234 ret = {}
235 files = []
236 for name in names:
237 # if we are given a directory, we read all the files it contains
238 # (except the ones that end with "~").
239 if os.path.isdir(name):
240 files.extend([os.path.join(name, x)
241 for x in os.listdir(name)
242 if x[-1] != '~'])
243 else:
244 files.append(name)
246 for filename in files:
247 with open(filename, 'r') as f:
248 for l in f:
249 l = l.strip()
250 if l == "" or l[0] == "#":
251 continue
252 if "#" in l:
253 (regex, reason) = l.split("#", 1)
254 ret[regex.strip()] = reason.strip()
255 else:
256 ret[l] = None
258 return ret
261 def find_in_list(regexes, fullname):
262 for regex, reason in regexes.items():
263 if re.match(regex, fullname):
264 if reason is None:
265 return ""
266 return reason
267 return None
270 class ImmediateFail(Exception):
271 """Raised to abort immediately."""
273 def __init__(self):
274 super(ImmediateFail, self).__init__("test failed and fail_immediately set")
277 class FilterOps(unittest.TestResult):
279 def control_msg(self, msg):
280 pass # We regenerate control messages, so ignore this
282 def time(self, time):
283 self._ops.time(time)
285 def progress(self, delta, whence):
286 self._ops.progress(delta, whence)
288 def output_msg(self, msg):
289 if self.output is None:
290 sys.stdout.write(msg)
291 else:
292 self.output += msg
294 def startTest(self, test):
295 self.seen_output = True
296 test = self._add_prefix(test)
297 if self.strip_ok_output:
298 self.output = ""
300 self._ops.startTest(test)
302 def _add_prefix(self, test):
303 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
305 def addError(self, test, err=None):
306 test = self._add_prefix(test)
307 self.error_added += 1
308 self.total_error += 1
309 self._ops.addError(test, err)
310 self.output = None
311 if self.fail_immediately:
312 raise ImmediateFail()
314 def addSkip(self, test, reason=None):
315 self.seen_output = True
316 test = self._add_prefix(test)
317 self._ops.addSkip(test, reason)
318 self.output = None
320 def addExpectedFailure(self, test, err=None):
321 test = self._add_prefix(test)
322 self._ops.addExpectedFailure(test, err)
323 self.output = None
325 def addUnexpectedSuccess(self, test):
326 test = self._add_prefix(test)
327 self.uxsuccess_added += 1
328 self.total_uxsuccess += 1
329 self._ops.addUnexpectedSuccess(test)
330 if self.output:
331 self._ops.output_msg(self.output)
332 self.output = None
333 if self.fail_immediately:
334 raise ImmediateFail()
336 def addFailure(self, test, err=None):
337 test = self._add_prefix(test)
338 xfail_reason = find_in_list(self.expected_failures, test.id())
339 if xfail_reason is None:
340 xfail_reason = find_in_list(self.flapping, test.id())
341 if xfail_reason is not None:
342 self.xfail_added += 1
343 self.total_xfail += 1
344 self._ops.addExpectedFailure(test, err)
345 else:
346 self.fail_added += 1
347 self.total_fail += 1
348 self._ops.addFailure(test, err)
349 if self.output:
350 self._ops.output_msg(self.output)
351 if self.fail_immediately:
352 raise ImmediateFail()
353 self.output = None
355 def addSuccess(self, test):
356 test = self._add_prefix(test)
357 xfail_reason = find_in_list(self.expected_failures, test.id())
358 if xfail_reason is not None:
359 self.uxsuccess_added += 1
360 self.total_uxsuccess += 1
361 self._ops.addUnexpectedSuccess(test)
362 if self.output:
363 self._ops.output_msg(self.output)
364 if self.fail_immediately:
365 raise ImmediateFail()
366 else:
367 self._ops.addSuccess(test)
368 self.output = None
370 def skip_testsuite(self, name, reason=None):
371 self._ops.skip_testsuite(name, reason)
373 def start_testsuite(self, name):
374 self._ops.start_testsuite(name)
375 self.error_added = 0
376 self.fail_added = 0
377 self.xfail_added = 0
378 self.uxsuccess_added = 0
380 def end_testsuite(self, name, result, reason=None):
381 xfail = False
383 if self.xfail_added > 0:
384 xfail = True
385 if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
386 xfail = False
388 if xfail and result in ("fail", "failure"):
389 result = "xfail"
391 if self.uxsuccess_added > 0 and result != "uxsuccess":
392 result = "uxsuccess"
393 if reason is None:
394 reason = "Subunit/Filter Reason"
395 reason += "\n uxsuccess[%d]" % self.uxsuccess_added
397 if self.fail_added > 0 and result != "failure":
398 result = "failure"
399 if reason is None:
400 reason = "Subunit/Filter Reason"
401 reason += "\n failures[%d]" % self.fail_added
403 if self.error_added > 0 and result != "error":
404 result = "error"
405 if reason is None:
406 reason = "Subunit/Filter Reason"
407 reason += "\n errors[%d]" % self.error_added
409 self._ops.end_testsuite(name, result, reason)
410 if result not in ("success", "xfail"):
411 if self.output:
412 self._ops.output_msg(self.output)
413 if self.fail_immediately:
414 raise ImmediateFail()
415 self.output = None
417 def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
418 strip_ok_output=False, fail_immediately=False,
419 flapping=None):
420 self._ops = out
421 self.seen_output = False
422 self.output = None
423 self.prefix = prefix
424 self.suffix = suffix
425 if expected_failures is not None:
426 self.expected_failures = expected_failures
427 else:
428 self.expected_failures = {}
429 if flapping is not None:
430 self.flapping = flapping
431 else:
432 self.flapping = {}
433 self.strip_ok_output = strip_ok_output
434 self.xfail_added = 0
435 self.fail_added = 0
436 self.uxsuccess_added = 0
437 self.total_xfail = 0
438 self.total_error = 0
439 self.total_fail = 0
440 self.total_uxsuccess = 0
441 self.error_added = 0
442 self.fail_immediately = fail_immediately
445 class PerfFilterOps(unittest.TestResult):
447 def progress(self, delta, whence):
448 pass
450 def output_msg(self, msg):
451 pass
453 def control_msg(self, msg):
454 pass
456 def skip_testsuite(self, name, reason=None):
457 self._ops.skip_testsuite(name, reason)
459 def start_testsuite(self, name):
460 self.suite_has_time = False
462 def end_testsuite(self, name, result, reason=None):
463 pass
465 def _add_prefix(self, test):
466 return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
468 def time(self, time):
469 self.latest_time = time
470 #self._ops.output_msg("found time %s\n" % time)
471 self.suite_has_time = True
473 def get_time(self):
474 if self.suite_has_time:
475 return self.latest_time
476 return datetime.datetime.utcnow()
478 def startTest(self, test):
479 self.seen_output = True
480 test = self._add_prefix(test)
481 self.starts[test.id()] = self.get_time()
483 def addSuccess(self, test):
484 test = self._add_prefix(test)
485 tid = test.id()
486 if tid not in self.starts:
487 self._ops.addError(test, "%s succeeded without ever starting!" % tid)
488 delta = self.get_time() - self.starts[tid]
489 self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
491 def addFailure(self, test, err=''):
492 tid = test.id()
493 delta = self.get_time() - self.starts[tid]
494 self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
495 (tid, delta.total_seconds(), err))
497 def addError(self, test, err=''):
498 tid = test.id()
499 delta = self.get_time() - self.starts[tid]
500 self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
501 (tid, delta.total_seconds(), err))
503 def __init__(self, out, prefix='', suffix=''):
504 self._ops = out
505 self.prefix = prefix or ''
506 self.suffix = suffix or ''
507 self.starts = {}
508 self.seen_output = False
509 self.suite_has_time = False
512 class PlainFormatter(TestsuiteEnabledTestResult):
514 def __init__(self, verbose, immediate, statistics,
515 totaltests=None):
516 super(PlainFormatter, self).__init__()
517 self.verbose = verbose
518 self.immediate = immediate
519 self.statistics = statistics
520 self.start_time = None
521 self.test_output = {}
522 self.suitesfailed = []
523 self.suites_ok = 0
524 self.skips = {}
525 self.index = 0
526 self.name = None
527 self._progress_level = 0
528 self.totalsuites = totaltests
529 self.last_time = None
531 @staticmethod
532 def _format_time(delta):
533 minutes, seconds = divmod(delta.seconds, 60)
534 hours, minutes = divmod(minutes, 60)
535 ret = ""
536 if hours:
537 ret += "%dh" % hours
538 if minutes:
539 ret += "%dm" % minutes
540 ret += "%ds" % seconds
541 return ret
543 def progress(self, offset, whence):
544 if whence == subunit.PROGRESS_POP:
545 self._progress_level -= 1
546 elif whence == subunit.PROGRESS_PUSH:
547 self._progress_level += 1
548 elif whence == subunit.PROGRESS_SET:
549 if self._progress_level == 0:
550 self.totalsuites = offset
551 elif whence == subunit.PROGRESS_CUR:
552 raise NotImplementedError
554 def time(self, dt):
555 if self.start_time is None:
556 self.start_time = dt
557 self.last_time = dt
559 def start_testsuite(self, name):
560 self.index += 1
561 self.name = name
563 if not self.verbose:
564 self.test_output[name] = ""
566 total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
567 self.statistics['TESTS_EXPECTED_FAIL'] +
568 self.statistics['TESTS_ERROR'] +
569 self.statistics['TESTS_UNEXPECTED_FAIL'] +
570 self.statistics['TESTS_UNEXPECTED_OK'])
572 out = "[%d(%d)" % (self.index, total_tests)
573 if self.totalsuites is not None:
574 out += "/%d" % self.totalsuites
575 if self.start_time is not None:
576 out += " at " + self._format_time(self.last_time - self.start_time)
577 if self.suitesfailed:
578 out += ", %d errors" % (len(self.suitesfailed),)
579 out += "] %s" % name
580 if self.immediate:
581 sys.stdout.write(out + "\n")
582 else:
583 sys.stdout.write(out + ": ")
585 def output_msg(self, output):
586 if self.verbose:
587 sys.stdout.write(output)
588 elif self.name is not None:
589 self.test_output[self.name] += output
590 else:
591 sys.stdout.write(output)
593 def control_msg(self, output):
594 pass
596 def end_testsuite(self, name, result, reason):
597 out = ""
598 unexpected = False
600 if name not in self.test_output:
601 print("no output for name[%s]" % name)
603 if result in ("success", "xfail"):
604 self.suites_ok += 1
605 else:
606 self.output_msg("ERROR: Testsuite[%s]\n" % name)
607 if reason is not None:
608 self.output_msg("REASON: %s\n" % (reason,))
609 self.suitesfailed.append(name)
610 if self.immediate and not self.verbose and name in self.test_output:
611 out += self.test_output[name]
612 unexpected = True
614 if not self.immediate:
615 if not unexpected:
616 out += " ok\n"
617 else:
618 out += " " + result.upper() + "\n"
620 sys.stdout.write(out)
622 def startTest(self, test):
623 pass
625 def addSuccess(self, test):
626 self.end_test(test.id(), "success", False)
628 def addError(self, test, err=None):
629 self.end_test(test.id(), "error", True, err)
631 def addFailure(self, test, err=None):
632 self.end_test(test.id(), "failure", True, err)
634 def addSkip(self, test, reason=None):
635 self.end_test(test.id(), "skip", False, reason)
637 def addExpectedFailure(self, test, err=None):
638 self.end_test(test.id(), "xfail", False, err)
640 def addUnexpectedSuccess(self, test):
641 self.end_test(test.id(), "uxsuccess", True)
643 def end_test(self, testname, result, unexpected, err=None):
644 if not unexpected:
645 self.test_output[self.name] = ""
646 if not self.immediate:
647 sys.stdout.write({
648 'failure': 'f',
649 'xfail': 'X',
650 'skip': 's',
651 'success': '.'}.get(result, "?(%s)" % result))
652 return
654 if self.name not in self.test_output:
655 self.test_output[self.name] = ""
657 self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
658 if err is not None:
659 self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
661 if self.immediate and not self.verbose:
662 sys.stdout.write(self.test_output[self.name])
663 self.test_output[self.name] = ""
665 if not self.immediate:
666 sys.stdout.write({
667 'error': 'E',
668 'failure': 'F',
669 'uxsuccess': 'U',
670 'success': 'S'}.get(result, "?"))
672 def write_summary(self, path):
673 f = open(path, 'w+')
675 if self.suitesfailed:
676 f.write("= Failed tests =\n")
678 for suite in self.suitesfailed:
679 f.write("== %s ==\n" % suite)
680 if suite in self.test_output:
681 f.write(self.test_output[suite] + "\n\n")
683 f.write("\n")
685 if not self.immediate and not self.verbose:
686 for suite in self.suitesfailed:
687 print("=" * 78)
688 print("FAIL: %s" % suite)
689 if suite in self.test_output:
690 print(self.test_output[suite])
691 print("")
693 f.write("= Skipped tests =\n")
694 for reason in self.skips.keys():
695 f.write(reason + "\n")
696 for name in self.skips[reason]:
697 f.write("\t%s\n" % name)
698 f.write("\n")
699 f.close()
701 if (not self.suitesfailed and
702 not self.statistics['TESTS_UNEXPECTED_FAIL'] and
703 not self.statistics['TESTS_UNEXPECTED_OK'] and
704 not self.statistics['TESTS_ERROR']):
705 ok = (self.statistics['TESTS_EXPECTED_OK'] +
706 self.statistics['TESTS_EXPECTED_FAIL'])
707 print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
708 else:
709 print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
710 self.statistics['TESTS_UNEXPECTED_FAIL'],
711 self.statistics['TESTS_ERROR'],
712 self.statistics['TESTS_UNEXPECTED_OK'],
713 len(self.suitesfailed)))
715 def skip_testsuite(self, name, reason="UNKNOWN"):
716 self.skips.setdefault(reason, []).append(name)
717 if self.totalsuites:
718 self.totalsuites -= 1