Merge remote-tracking branch 'remotes/ehabkost/tags/python-next-pull-request' into...
[qemu.git] / tests / qemu-iotests / iotests.py
blob4e67fbbe96d082f638420bd2a41e43e45a80a2df
1 from __future__ import print_function
2 # Common utilities and Python wrappers for qemu-iotests
4 # Copyright (C) 2012 IBM Corp.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import errno
21 import os
22 import re
23 import subprocess
24 import string
25 import unittest
26 import sys
27 import struct
28 import json
29 import signal
30 import logging
31 import atexit
33 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
34 import qtest
# Tool command lines and test parameters, all taken from the environment.
#
# Every *_OPTIONS override is split on single spaces, so an individual
# argument cannot itself contain a space.  This limitation is accepted
# because it is what lets us honour the override options that ./check
# supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
debug = False

# Fall back to an empty secret when IMGKEYSECRET is unset so that merely
# importing this module cannot raise KeyError; main() is then still able to
# print its "run this test via check" hint instead of a traceback.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_img(*args):
    '''Run qemu-img and return the exit code

    The child's stdin and stdout are connected to /dev/null; stderr is
    inherited.  A negative exit code (child killed by a signal) is reported
    on stderr together with the command line.
    '''
    cmdline = qemu_img_args + list(args)
    # Close the /dev/null handle as soon as the child has exited rather than
    # leaving it open until garbage collection.
    with open('/dev/null', 'r+') as devnull:
        exitcode = subprocess.call(cmdline, stdin=devnull, stdout=devnull)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(cmdline)))
    return exitcode
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments and return the exit code

    When the arguments select the luks format ("-f luks") and no key-secret
    option is present, the default key-secret option and the matching
    secret object are added automatically.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following -o is one string holding a
                # comma-separated option list, so extend it by
                # concatenation (a str has no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left visible and return the exit code'''
    cmdline = qemu_img_args + list(args)
    status = subprocess.call(cmdline)
    if status >= 0:
        return status
    # A negative status means the child was terminated by a signal.
    sys.stderr.write('qemu-img received signal %i: %s\n' % (-status, ' '.join(cmdline)))
    return status
def qemu_img_pipe(*args):
    '''Run qemu-img and return its output

    stderr is merged into stdout, so the returned data contains both.  A
    negative exit code (child killed by a signal) is reported on stderr.
    '''
    cmdline = qemu_img_args + list(args)
    subp = subprocess.Popen(cmdline,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # Drain the pipe before looking at the exit status: calling wait() first
    # can deadlock once the child fills the pipe buffer and blocks on write.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-subp.returncode, ' '.join(cmdline)))
    return output
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
    '''Log the filtered output of "qemu-img info" for filename

    With imgopts set, filename is passed using --image-opts; otherwise the
    image format is given explicitly with -f.  extra_args are inserted just
    before the filename.  filter_path is the string that gets replaced in
    the output; it defaults to filename.
    '''
    if imgopts:
        format_args = ['--image-opts']
    else:
        format_args = ['-f', imgfmt]
    cmd = ['info'] + format_args + list(extra_args) + [filename]

    info = qemu_img_pipe(*cmd)
    log(filter_img_info(info, filter_path if filter_path else filename))
def qemu_io(*args):
    '''Run qemu-io and return the stdout data

    stderr is merged into stdout.  A negative exit code (child killed by a
    signal) is reported on stderr.
    '''
    args = qemu_io_args + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # Drain the pipe before looking at the exit status: calling wait() first
    # can deadlock once the child fills the pipe buffer and blocks on write.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' % (-subp.returncode, ' '.join(args)))
    return output
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    args = qemu_io_args + list(args)
    # Close the /dev/null handle once the child has exited instead of
    # leaking it.
    with open('/dev/null', 'w') as devnull:
        exitcode = subprocess.call(args, stdout=devnull)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
class QemuIoInteractive:
    '''An interactive qemu-io process driven through its stdin/stdout pipes

    The constructor starts qemu-io with the given extra arguments and waits
    for the first prompt.  cmd() sends one command and returns everything
    printed before the next prompt; close() sends the quit command.
    '''

    def __init__(self, *args):
        self.args = qemu_io_args + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        assert self._p.stdout.read(9) == 'qemu-io> '

    def close(self):
        '''Send the quit command and wait for the process to finish'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to the next prompt and return the text before it'''
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            elif c == pattern[0]:
                # The mismatching character may itself begin the prompt
                # (e.g. output ending in 'q' directly before 'qemu-io> ').
                # No character repeats inside the prompt, so restarting at
                # position 1 is the only fallback that is ever needed.
                pos = 1
            else:
                pos = 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run a single command and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd != 'q' and cmd != 'quit'
        self._p.stdin.write(cmd + '\n')
        # Push the command out even if the pipe happens to be buffered;
        # otherwise _read_output() could wait forever for a reply.
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork and return the exit code of the parent'''
    cmdline = list(qemu_nbd_args)
    cmdline.append('--fork')
    cmdline.extend(args)
    return subprocess.call(cmdline)
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images

    fmt1 and fmt2 are the formats of img1 and img2 respectively.
    '''
    exitcode = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return exitcode == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors.  Each sector stores its own
    index as a big-endian 32-bit integer in its first and last four bytes,
    with 504 zero bytes in between.
    '''
    # The packed sectors are binary data, so open the file in binary mode;
    # the context manager closes it even when a write fails.
    with open(name, 'wb') as image:
        i = 0
        while i < size:
            # Floor division keeps the sector index an integer regardless
            # of how '/' behaves for ints in the running interpreter.
            sector_num = i // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            i = i + 512
def image_size(img):
    '''Return the "virtual-size" field of qemu-img's JSON info for img'''
    raw_info = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(raw_info)
    return info['virtual-size']
# The directory is a literal path, not a pattern: escape it so that
# characters such as '.' or '+' in the path only match themselves.
# ("%s" % test_dir keeps the previous behaviour when TEST_DIR is unset.)
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory with "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return msg with every carriage return removed'''
    cleaned = win32_re.sub("", msg)
    return cleaned
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Replace qemu-io's timing/throughput statistics with placeholders

    Carriage returns are removed first.
    '''
    without_cr = filter_win32(msg)
    return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", without_cr)
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace the numeric owner in "chown <uid>:<gid>" with "UID:GID"'''
    anonymised = chown_re.sub("chown UID:GID", msg)
    return anonymised
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event in which the 'seconds' and 'microseconds'
    entries of the timestamp (if there is one) are replaced by the
    placeholders 'SECS' and 'USECS'.  The event passed in is not modified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy; copy the nested timestamp as
        # well so that the caller's event keeps its real values.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" file name prefix with "TEST_DIR/PID-"'''
    pid_prefix = "%s-" % (os.getpid())
    return msg.replace(os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-')
def filter_img_info(output, filename):
    '''Make "qemu-img info" output independent of the test environment

    Lines mentioning 'disk size' or 'actual-size' are dropped.  In the
    remaining lines filename becomes TEST_IMG, the image format becomes
    IMGFMT, and iteration counts and UUIDs are replaced by placeholders.
    '''
    def scrub(line):
        line = line.replace(filename, 'TEST_IMG')
        line = line.replace(imgfmt, 'IMGFMT')
        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
        return re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)

    kept = [scrub(line) for line in output.split('\n')
            if 'disk size' not in line and 'actual-size' not in line]
    return '\n'.join(kept)
def log(msg, filters=[]):
    '''Print msg after passing it through each of filters, in order'''
    filtered = msg
    for apply_filter in filters:
        filtered = apply_filter(filtered)
    print(filtered)
class Timeout:
    '''Context manager that raises an exception when its body takes too long

    Entering installs a SIGALRM handler and arms the real-time interval
    timer for the given number of seconds; leaving disarms the timer.  If
    the timer fires first, Exception(errmsg) is raised from the handler.
    '''

    def __init__(self, seconds, errmsg = "Timeout"):
        self.errmsg = errmsg
        self.seconds = seconds

    def __enter__(self):
        # The handler has to be in place before the timer is armed.
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, type, value, traceback):
        # A value of 0 disarms the timer.  Returning False lets any
        # exception raised inside the with-body propagate.
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''SIGALRM handler: report the timeout as an exception'''
        raise Exception(self.errmsg)
class FilePath(object):
    '''A generated file name whose file is removed when the context ends.

    The path is "<test_dir>/<pid>-<name>".  Entering the context yields
    that path; leaving it deletes the file if it exists::

        with FilePath('test.img') as img_path:
            qemu_img('create', img_path, '1G')
        # img_path is automatically deleted
    '''

    def __init__(self, name):
        self.path = os.path.join(test_dir,
                                 '{0}-{1}'.format(os.getpid(), name))

    def __enter__(self):
        return self.path

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            os.remove(self.path)
        except OSError:
            # Best effort: the file may never have been created.
            pass
        # Never suppress an exception raised inside the with-body.
        return False
def file_path_remover():
    '''Delete the files listed in file_path_remover.paths, newest first'''
    for victim in file_path_remover.paths[::-1]:
        try:
            os.remove(victim)
        except OSError:
            # Best effort: skip files that cannot be removed.
            continue
def file_path(*names):
    ''' Another way to get auto-generated filenames that clean themselves up.

    Each name becomes "<test_dir>/<pid>-<name>" and is scheduled for
    removal at interpreter exit.  A single name returns one path, anything
    else returns a list of paths:

        img_a, img_b = file_path('a.img', 'b.img')
        sock = file_path('socket')
    '''
    # The first call creates the shared list and registers the exit hook.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(test_dir, '{0}-{1}'.format(os.getpid(), name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
def remote_filename(path):
    '''Return path in the form required by the image protocol under test

    The "file" protocol uses the path unchanged and "ssh" wraps it in an
    ssh:// URL on 127.0.0.1; any other protocol raises an Exception.
    '''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://127.0.0.1%s" % (path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        '''Set up a VM named "qemu<path_suffix>-<pid>" with no drives yet'''
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
                                 test_dir=test_dir,
                                 socket_scm_helper=socket_scm_helper)
        # Used to generate the "drive<N>" ids in add_drive()
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" without any processing; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
        '''Add a drive (virtio-blk by default) to the VM; returns self

        The drive gets the id "drive<N>", counting from 0.  When path is
        not None, the file, format and cache options are added as well.
        opts is an extra comma-separated option string.  For the luks
        format without a key-secret in opts, the default secret object and
        key-secret option are added.
        '''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % format)
            options.append('cache=%s' % cachemode)

        if opts:
            options.append(opts)

        if format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev <opts>"; returns self

        opts is either a ready-made option string or an iterable of option
        strings that gets joined with commas.
        '''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def pause_drive(self, drive, event=None):
        '''Pause drive r/w operations

        Sets a qemu-io breakpoint named bp_<drive> on the given event.
        Without an event, both "read_aio" and "write_aio" are used.
        '''
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.qmp('human-monitor-command',
                 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))

    def resume_drive(self, drive):
        '''Remove the bp_<drive> breakpoint set by pause_drive()'''
        self.qmp('human-monitor-command',
                 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))

    def hmp_qemu_io(self, drive, cmd):
        '''Run a qemu-io command on a given drive using an HMP command'''
        return self.qmp('human-monitor-command',
                        command_line='qemu-io %s "%s"' % (drive, cmd))

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested lists/dicts into one dict with dotted keys

        List elements contribute their index and dict entries their key,
        so {'a': [{'b': 1}]} becomes {'a.0.b': 1}.  The result is stored in
        output (a new dict when None) and returned.
        '''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a "key=value,..." option string

        The object is flattened first; values are concatenated as they
        are, so every leaf value has to be a string.
        '''
        obj = self.flatten_qmp_object(obj)
        output_list = list()
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=True):
        '''Return the pending QMP events, each run through filter_qmp_event'''
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=[filter_testfiles], **kwargs):
        '''Run a QMP command, logging the request and the response'''
        logmsg = "{'execute': '%s', 'arguments': %s}" % (cmd, kwargs)
        log(logmsg, filters)
        result = self.qmp(cmd, **kwargs)
        log(str(result), filters)
        return result

    def run_job(self, job, auto_finalize=True, auto_dismiss=False):
        '''Handle QMP events until a job status of 'null' is reported

        On 'aborting' the error of the job with id job is logged.  The job
        is finalized on 'pending' unless auto_finalize is set, and
        dismissed on 'concluded' unless auto_dismiss is set.  Every event
        other than JOB_STATUS_CHANGE is logged.
        '''
        while True:
            for ev in self.get_qmp_events_filtered(wait=True):
                if ev['event'] == 'JOB_STATUS_CHANGE':
                    status = ev['data']['status']
                    if status == 'aborting':
                        result = self.qmp('query-jobs')
                        for j in result['return']:
                            if j['id'] == job:
                                log('Job failed: %s' % (j['error']))
                    elif status == 'pending' and not auto_finalize:
                        self.qmp_log('job-finalize', id=job)
                    elif status == 'concluded' and not auto_dismiss:
                        self.qmp_log('job-dismiss', id=job)
                    elif status == 'null':
                        return
                else:
                    # log() is defined in this very module; no name
                    # "iotests" exists here, so calling iotests.log()
                    # would raise NameError.
                    log(ev)
# Matches a path component of the form "name[index]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases

    The helpers that talk to QEMU use self.vm, which this class does not
    set itself (presumably test cases create it in setUp() -- confirm
    against the individual tests).
    '''

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        Components are separated by '/'; a component may carry a list
        index, as in "return[0]/device".  The test fails if the path cannot
        be followed.
        '''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
                try:
                    d = d[idx]
                except IndexError:
                    self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that path cannot be resolved in the QMP dict d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a failed traversal through self.fail()
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict matches'''
        result = self.dictpath(d, path)
        self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
                (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
        content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
                         self.vm.flatten_qmp_object(reference))

    def cancel_and_wait(self, drive='drive0', force=False, resume=False):
        '''Cancel a block job and wait for it to finish, returning the event'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset', event['data']['len'])
                    self.assert_no_active_block_jobs()
                    return event
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        '''Wait until a block job BLOCK_JOB_READY event'''
        f = {'data': {'type': 'mirror', 'device': drive } }
        self.vm.event_wait(name='BLOCK_JOB_READY', match=f)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the mirror job to be ready, cancel it and check that it
        completed with offset equal to len'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        # assertEquals is only a deprecated alias of assertEqual
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive)
        self.assert_qmp(event, 'data/type', 'mirror')

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy

        Returns the job's entry.  Gives up with an exception after one
        second; asserts that the job is listed at all.
        '''
        with Timeout(1, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] == True and job['busy'] == False:
                            return job
                        break
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job

        With wait set, returns the job entry once the job is actually
        paused; otherwise returns the response to block-job-pause.
        '''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result
def notrun(reason):
    '''Skip this test suite

    Records the reason in "<output_dir>/<seq>.notrun", prints it and exits
    the process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # The reason is text, so write it in text mode, and make sure the file
    # is flushed and closed before the process exits.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as notrun_file:
        notrun_file.write(reason + '\n')
    print('%s not run: %s' % (seq, reason))
    sys.exit(0)
def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
    '''Skip the test suite unless the image format under test is suitable

    Only one of the two lists may be non-empty.  'generic' in
    supported_fmts accepts any format as long as the IMGFMT_GENERIC
    environment variable is 'true' (its default).
    '''
    assert not (supported_fmts and unsupported_fmts)

    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_enabled and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        return

    rejected = imgfmt in unsupported_fmts
    if supported_fmts and imgfmt not in supported_fmts:
        rejected = True
    if rejected:
        notrun('not suitable for this image format: %s' % imgfmt)
def verify_protocol(supported=[], unsupported=[]):
    '''Skip the test suite unless the image protocol under test is suitable

    Only one of the two lists may be non-empty.  'generic' in supported
    accepts every protocol.
    '''
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    rejected = imgproto in unsupported
    if supported and imgproto not in supported:
        rejected = True
    if rejected:
        notrun('not suitable for this protocol: %s' % imgproto)
def verify_platform(supported_oses=['linux']):
    '''Skip the test suite unless sys.platform starts with a supported OS name'''
    matches = [sys.platform.startswith(prefix) for prefix in supported_oses]
    if not any(matches):
        notrun('not suitable for this OS: %s' % sys.platform)
def verify_cache_mode(supported_cache_modes=[]):
    '''Skip the test suite if the cache mode under test is not supported

    An empty list accepts every cache mode.
    '''
    if not supported_cache_modes:
        return
    if cachemode in supported_cache_modes:
        return
    notrun('not suitable for this cache mode: %s' % cachemode)
def supports_quorum():
    '''Return True if "quorum" appears in the output of "qemu-img --help"'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
         unsupported_fmts=[]):
    '''Run tests

    Checks that the image format, OS and cache mode are suitable (skipping
    the suite otherwise) and then hands over to unittest.main().  With
    "-d" on the command line the unittest output goes straight to stdout
    at verbosity 2; otherwise it is captured and written to stderr at the
    end with the elapsed time removed.
    '''

    global debug

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    if test_dir is None or qemu_default_machine is None:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    debug = '-d' in sys.argv
    verify_image_format(supported_fmts, unsupported_fmts)
    verify_platform(supported_oses)
    verify_cache_mode(supported_cache_modes)

    # We need to filter out the time taken from the output so that qemu-iotest
    # can reliably diff the results against master output.
    import StringIO
    if debug:
        # unittest.main() must not see our private option
        sys.argv.remove('-d')
        output, verbosity = sys.stdout, 2
    else:
        output, verbosity = StringIO.StringIO(), 1

    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))

    class MyTestRunner(unittest.TextTestRunner):
        # The defaults bind the stream and verbosity chosen above.
        def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
            unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)

    # unittest.main() will use sys.exit() so expect a SystemExit exception
    try:
        unittest.main(testRunner=MyTestRunner)
    finally:
        if not debug:
            sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))