target/ppc: convert vspltis[bhw] to use vector operations
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blobb461f53abf8727085772bdd066bd042c62fabdc2
1 from __future__ import print_function
2 # Common utilities and Python wrappers for qemu-iotests
4 # Copyright (C) 2012 IBM Corp.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import errno
21 import os
22 import re
23 import subprocess
24 import string
25 import unittest
26 import sys
27 import struct
28 import json
29 import signal
30 import logging
31 import atexit
32 import io
33 from collections import OrderedDict
35 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
36 import qtest
39 # This will not work if arguments contain spaces but is necessary if we
40 # want to support the override options that ./check supports.
41 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
42 if os.environ.get('QEMU_IMG_OPTIONS'):
43 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
45 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
46 if os.environ.get('QEMU_IO_OPTIONS'):
47 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
49 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
50 if os.environ.get('QEMU_NBD_OPTIONS'):
51 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
53 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
54 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
56 imgfmt = os.environ.get('IMGFMT', 'raw')
57 imgproto = os.environ.get('IMGPROTO', 'file')
58 test_dir = os.environ.get('TEST_DIR')
59 output_dir = os.environ.get('OUTPUT_DIR', '.')
60 cachemode = os.environ.get('CACHEMODE')
61 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
63 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
64 debug = False
66 luks_default_secret_object = 'secret,id=keysec0,data=' + \
67 os.environ.get('IMGKEYSECRET', '')
68 luks_default_key_secret_opt = 'key-secret=keysec0'
71 def qemu_img(*args):
72 '''Run qemu-img and return the exit code'''
73 devnull = open('/dev/null', 'r+')
74 exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
75 if exitcode < 0:
76 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
77 return exitcode
79 def ordered_qmp(qmsg):
80 # Dictionaries are not ordered prior to 3.6, therefore:
81 if isinstance(qmsg, list):
82 return [ordered_qmp(atom) for atom in qmsg]
83 if isinstance(qmsg, dict):
84 od = OrderedDict()
85 for k, v in sorted(qmsg.items()):
86 od[k] = ordered_qmp(v)
87 return od
88 return qmsg
90 def qemu_img_create(*args):
91 args = list(args)
93 # default luks support
94 if '-f' in args and args[args.index('-f') + 1] == 'luks':
95 if '-o' in args:
96 i = args.index('-o')
97 if 'key-secret' not in args[i + 1]:
98 args[i + 1].append(luks_default_key_secret_opt)
99 args.insert(i + 2, '--object')
100 args.insert(i + 3, luks_default_secret_object)
101 else:
102 args = ['-o', luks_default_key_secret_opt,
103 '--object', luks_default_secret_object] + args
105 args.insert(0, 'create')
107 return qemu_img(*args)
109 def qemu_img_verbose(*args):
110 '''Run qemu-img without suppressing its output and return the exit code'''
111 exitcode = subprocess.call(qemu_img_args + list(args))
112 if exitcode < 0:
113 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
114 return exitcode
116 def qemu_img_pipe(*args):
117 '''Run qemu-img and return its output'''
118 subp = subprocess.Popen(qemu_img_args + list(args),
119 stdout=subprocess.PIPE,
120 stderr=subprocess.STDOUT,
121 universal_newlines=True)
122 exitcode = subp.wait()
123 if exitcode < 0:
124 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
125 return subp.communicate()[0]
127 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
128 args = [ 'info' ]
129 if imgopts:
130 args.append('--image-opts')
131 else:
132 args += [ '-f', imgfmt ]
133 args += extra_args
134 args.append(filename)
136 output = qemu_img_pipe(*args)
137 if not filter_path:
138 filter_path = filename
139 log(filter_img_info(output, filter_path))
141 def qemu_io(*args):
142 '''Run qemu-io and return the stdout data'''
143 args = qemu_io_args + list(args)
144 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
145 stderr=subprocess.STDOUT,
146 universal_newlines=True)
147 exitcode = subp.wait()
148 if exitcode < 0:
149 sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
150 return subp.communicate()[0]
152 def qemu_io_silent(*args):
153 '''Run qemu-io and return the exit code, suppressing stdout'''
154 args = qemu_io_args + list(args)
155 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
156 if exitcode < 0:
157 sys.stderr.write('qemu-io received signal %i: %s\n' %
158 (-exitcode, ' '.join(args)))
159 return exitcode
162 class QemuIoInteractive:
163 def __init__(self, *args):
164 self.args = qemu_io_args + list(args)
165 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
166 stdout=subprocess.PIPE,
167 stderr=subprocess.STDOUT,
168 universal_newlines=True)
169 assert self._p.stdout.read(9) == 'qemu-io> '
171 def close(self):
172 self._p.communicate('q\n')
174 def _read_output(self):
175 pattern = 'qemu-io> '
176 n = len(pattern)
177 pos = 0
178 s = []
179 while pos != n:
180 c = self._p.stdout.read(1)
181 # check unexpected EOF
182 assert c != ''
183 s.append(c)
184 if c == pattern[pos]:
185 pos += 1
186 else:
187 pos = 0
189 return ''.join(s[:-n])
191 def cmd(self, cmd):
192 # quit command is in close(), '\n' is added automatically
193 assert '\n' not in cmd
194 cmd = cmd.strip()
195 assert cmd != 'q' and cmd != 'quit'
196 self._p.stdin.write(cmd + '\n')
197 self._p.stdin.flush()
198 return self._read_output()
201 def qemu_nbd(*args):
202 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
203 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
205 def qemu_nbd_pipe(*args):
206 '''Run qemu-nbd in daemon mode and return both the parent's exit code
207 and its output'''
208 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
209 stdout=subprocess.PIPE,
210 stderr=subprocess.STDOUT,
211 universal_newlines=True)
212 exitcode = subp.wait()
213 if exitcode < 0:
214 sys.stderr.write('qemu-nbd received signal %i: %s\n' %
215 (-exitcode,
216 ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
217 return exitcode, subp.communicate()[0]
219 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
220 '''Return True if two image files are identical'''
221 return qemu_img('compare', '-f', fmt1,
222 '-F', fmt2, img1, img2) == 0
224 def create_image(name, size):
225 '''Create a fully-allocated raw image with sector markers'''
226 file = open(name, 'wb')
227 i = 0
228 while i < size:
229 sector = struct.pack('>l504xl', i // 512, i // 512)
230 file.write(sector)
231 i = i + 512
232 file.close()
234 def image_size(img):
235 '''Return image's virtual size'''
236 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
237 return json.loads(r)['virtual-size']
239 test_dir_re = re.compile(r"%s" % test_dir)
240 def filter_test_dir(msg):
241 return test_dir_re.sub("TEST_DIR", msg)
243 win32_re = re.compile(r"\r")
244 def filter_win32(msg):
245 return win32_re.sub("", msg)
247 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
248 def filter_qemu_io(msg):
249 msg = filter_win32(msg)
250 return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
252 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
253 def filter_chown(msg):
254 return chown_re.sub("chown UID:GID", msg)
256 def filter_qmp_event(event):
257 '''Filter a QMP event dict'''
258 event = dict(event)
259 if 'timestamp' in event:
260 event['timestamp']['seconds'] = 'SECS'
261 event['timestamp']['microseconds'] = 'USECS'
262 return event
264 def filter_qmp(qmsg, filter_fn):
265 '''Given a string filter, filter a QMP object's values.
266 filter_fn takes a (key, value) pair.'''
267 # Iterate through either lists or dicts;
268 if isinstance(qmsg, list):
269 items = enumerate(qmsg)
270 else:
271 items = qmsg.items()
273 for k, v in items:
274 if isinstance(v, list) or isinstance(v, dict):
275 qmsg[k] = filter_qmp(v, filter_fn)
276 else:
277 qmsg[k] = filter_fn(k, v)
278 return qmsg
280 def filter_testfiles(msg):
281 prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
282 return msg.replace(prefix, 'TEST_DIR/PID-')
284 def filter_qmp_testfiles(qmsg):
285 def _filter(key, value):
286 if key == 'filename' or key == 'backing-file':
287 return filter_testfiles(value)
288 return value
289 return filter_qmp(qmsg, _filter)
291 def filter_generated_node_ids(msg):
292 return re.sub("#block[0-9]+", "NODE_NAME", msg)
294 def filter_img_info(output, filename):
295 lines = []
296 for line in output.split('\n'):
297 if 'disk size' in line or 'actual-size' in line:
298 continue
299 line = line.replace(filename, 'TEST_IMG') \
300 .replace(imgfmt, 'IMGFMT')
301 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
302 line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
303 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
304 lines.append(line)
305 return '\n'.join(lines)
307 def log(msg, filters=[], indent=None):
308 '''Logs either a string message or a JSON serializable message (like QMP).
309 If indent is provided, JSON serializable messages are pretty-printed.'''
310 for flt in filters:
311 msg = flt(msg)
312 if isinstance(msg, dict) or isinstance(msg, list):
313 # Python < 3.4 needs to know not to add whitespace when pretty-printing:
314 separators = (', ', ': ') if indent is None else (',', ': ')
315 # Don't sort if it's already sorted
316 do_sort = not isinstance(msg, OrderedDict)
317 print(json.dumps(msg, sort_keys=do_sort,
318 indent=indent, separators=separators))
319 else:
320 print(msg)
322 class Timeout:
323 def __init__(self, seconds, errmsg = "Timeout"):
324 self.seconds = seconds
325 self.errmsg = errmsg
326 def __enter__(self):
327 signal.signal(signal.SIGALRM, self.timeout)
328 signal.setitimer(signal.ITIMER_REAL, self.seconds)
329 return self
330 def __exit__(self, type, value, traceback):
331 signal.setitimer(signal.ITIMER_REAL, 0)
332 return False
333 def timeout(self, signum, frame):
334 raise Exception(self.errmsg)
337 class FilePath(object):
338 '''An auto-generated filename that cleans itself up.
340 Use this context manager to generate filenames and ensure that the file
341 gets deleted::
343 with TestFilePath('test.img') as img_path:
344 qemu_img('create', img_path, '1G')
345 # migration_sock_path is automatically deleted
347 def __init__(self, name):
348 filename = '{0}-{1}'.format(os.getpid(), name)
349 self.path = os.path.join(test_dir, filename)
351 def __enter__(self):
352 return self.path
354 def __exit__(self, exc_type, exc_val, exc_tb):
355 try:
356 os.remove(self.path)
357 except OSError:
358 pass
359 return False
362 def file_path_remover():
363 for path in reversed(file_path_remover.paths):
364 try:
365 os.remove(path)
366 except OSError:
367 pass
370 def file_path(*names):
371 ''' Another way to get auto-generated filename that cleans itself up.
373 Use is as simple as:
375 img_a, img_b = file_path('a.img', 'b.img')
376 sock = file_path('socket')
379 if not hasattr(file_path_remover, 'paths'):
380 file_path_remover.paths = []
381 atexit.register(file_path_remover)
383 paths = []
384 for name in names:
385 filename = '{0}-{1}'.format(os.getpid(), name)
386 path = os.path.join(test_dir, filename)
387 file_path_remover.paths.append(path)
388 paths.append(path)
390 return paths[0] if len(paths) == 1 else paths
392 def remote_filename(path):
393 if imgproto == 'file':
394 return path
395 elif imgproto == 'ssh':
396 return "ssh://127.0.0.1%s" % (path)
397 else:
398 raise Exception("Protocol %s not supported" % (imgproto))
400 class VM(qtest.QEMUQtestMachine):
401 '''A QEMU VM'''
403 def __init__(self, path_suffix=''):
404 name = "qemu%s-%d" % (path_suffix, os.getpid())
405 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
406 test_dir=test_dir,
407 socket_scm_helper=socket_scm_helper)
408 self._num_drives = 0
410 def add_object(self, opts):
411 self._args.append('-object')
412 self._args.append(opts)
413 return self
415 def add_device(self, opts):
416 self._args.append('-device')
417 self._args.append(opts)
418 return self
420 def add_drive_raw(self, opts):
421 self._args.append('-drive')
422 self._args.append(opts)
423 return self
425 def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
426 '''Add a virtio-blk drive to the VM'''
427 options = ['if=%s' % interface,
428 'id=drive%d' % self._num_drives]
430 if path is not None:
431 options.append('file=%s' % path)
432 options.append('format=%s' % format)
433 options.append('cache=%s' % cachemode)
435 if opts:
436 options.append(opts)
438 if format == 'luks' and 'key-secret' not in opts:
439 # default luks support
440 if luks_default_secret_object not in self._args:
441 self.add_object(luks_default_secret_object)
443 options.append(luks_default_key_secret_opt)
445 self._args.append('-drive')
446 self._args.append(','.join(options))
447 self._num_drives += 1
448 return self
450 def add_blockdev(self, opts):
451 self._args.append('-blockdev')
452 if isinstance(opts, str):
453 self._args.append(opts)
454 else:
455 self._args.append(','.join(opts))
456 return self
458 def add_incoming(self, addr):
459 self._args.append('-incoming')
460 self._args.append(addr)
461 return self
463 def pause_drive(self, drive, event=None):
464 '''Pause drive r/w operations'''
465 if not event:
466 self.pause_drive(drive, "read_aio")
467 self.pause_drive(drive, "write_aio")
468 return
469 self.qmp('human-monitor-command',
470 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
472 def resume_drive(self, drive):
473 self.qmp('human-monitor-command',
474 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
476 def hmp_qemu_io(self, drive, cmd):
477 '''Write to a given drive using an HMP command'''
478 return self.qmp('human-monitor-command',
479 command_line='qemu-io %s "%s"' % (drive, cmd))
481 def flatten_qmp_object(self, obj, output=None, basestr=''):
482 if output is None:
483 output = dict()
484 if isinstance(obj, list):
485 for i in range(len(obj)):
486 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
487 elif isinstance(obj, dict):
488 for key in obj:
489 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
490 else:
491 output[basestr[:-1]] = obj # Strip trailing '.'
492 return output
494 def qmp_to_opts(self, obj):
495 obj = self.flatten_qmp_object(obj)
496 output_list = list()
497 for key in obj:
498 output_list += [key + '=' + obj[key]]
499 return ','.join(output_list)
501 def get_qmp_events_filtered(self, wait=True):
502 result = []
503 for ev in self.get_qmp_events(wait=wait):
504 result.append(filter_qmp_event(ev))
505 return result
507 def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
508 full_cmd = OrderedDict((
509 ("execute", cmd),
510 ("arguments", ordered_qmp(kwargs))
512 log(full_cmd, filters, indent=indent)
513 result = self.qmp(cmd, **kwargs)
514 log(result, filters, indent=indent)
515 return result
517 def run_job(self, job, auto_finalize=True, auto_dismiss=False):
518 while True:
519 for ev in self.get_qmp_events_filtered(wait=True):
520 if ev['event'] == 'JOB_STATUS_CHANGE':
521 status = ev['data']['status']
522 if status == 'aborting':
523 result = self.qmp('query-jobs')
524 for j in result['return']:
525 if j['id'] == job:
526 log('Job failed: %s' % (j['error']))
527 elif status == 'pending' and not auto_finalize:
528 self.qmp_log('job-finalize', id=job)
529 elif status == 'concluded' and not auto_dismiss:
530 self.qmp_log('job-dismiss', id=job)
531 elif status == 'null':
532 return
533 else:
534 iotests.log(ev)
537 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
539 class QMPTestCase(unittest.TestCase):
540 '''Abstract base class for QMP test cases'''
542 def dictpath(self, d, path):
543 '''Traverse a path in a nested dict'''
544 for component in path.split('/'):
545 m = index_re.match(component)
546 if m:
547 component, idx = m.groups()
548 idx = int(idx)
550 if not isinstance(d, dict) or component not in d:
551 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
552 d = d[component]
554 if m:
555 if not isinstance(d, list):
556 self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
557 try:
558 d = d[idx]
559 except IndexError:
560 self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
561 return d
563 def assert_qmp_absent(self, d, path):
564 try:
565 result = self.dictpath(d, path)
566 except AssertionError:
567 return
568 self.fail('path "%s" has value "%s"' % (path, str(result)))
570 def assert_qmp(self, d, path, value):
571 '''Assert that the value for a specific path in a QMP dict matches'''
572 result = self.dictpath(d, path)
573 self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
575 def assert_no_active_block_jobs(self):
576 result = self.vm.qmp('query-block-jobs')
577 self.assert_qmp(result, 'return', [])
579 def assert_has_block_node(self, node_name=None, file_name=None):
580 """Issue a query-named-block-nodes and assert node_name and/or
581 file_name is present in the result"""
582 def check_equal_or_none(a, b):
583 return a == None or b == None or a == b
584 assert node_name or file_name
585 result = self.vm.qmp('query-named-block-nodes')
586 for x in result["return"]:
587 if check_equal_or_none(x.get("node-name"), node_name) and \
588 check_equal_or_none(x.get("file"), file_name):
589 return
590 self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
591 (node_name, file_name, result))
593 def assert_json_filename_equal(self, json_filename, reference):
594 '''Asserts that the given filename is a json: filename and that its
595 content is equal to the given reference object'''
596 self.assertEqual(json_filename[:5], 'json:')
597 self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
598 self.vm.flatten_qmp_object(reference))
600 def cancel_and_wait(self, drive='drive0', force=False, resume=False):
601 '''Cancel a block job and wait for it to finish, returning the event'''
602 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
603 self.assert_qmp(result, 'return', {})
605 if resume:
606 self.vm.resume_drive(drive)
608 cancelled = False
609 result = None
610 while not cancelled:
611 for event in self.vm.get_qmp_events(wait=True):
612 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
613 event['event'] == 'BLOCK_JOB_CANCELLED':
614 self.assert_qmp(event, 'data/device', drive)
615 result = event
616 cancelled = True
617 elif event['event'] == 'JOB_STATUS_CHANGE':
618 self.assert_qmp(event, 'data/id', drive)
621 self.assert_no_active_block_jobs()
622 return result
624 def wait_until_completed(self, drive='drive0', check_offset=True):
625 '''Wait for a block job to finish, returning the event'''
626 while True:
627 for event in self.vm.get_qmp_events(wait=True):
628 if event['event'] == 'BLOCK_JOB_COMPLETED':
629 self.assert_qmp(event, 'data/device', drive)
630 self.assert_qmp_absent(event, 'data/error')
631 if check_offset:
632 self.assert_qmp(event, 'data/offset', event['data']['len'])
633 self.assert_no_active_block_jobs()
634 return event
635 elif event['event'] == 'JOB_STATUS_CHANGE':
636 self.assert_qmp(event, 'data/id', drive)
638 def wait_ready(self, drive='drive0'):
639 '''Wait until a block job BLOCK_JOB_READY event'''
640 f = {'data': {'type': 'mirror', 'device': drive } }
641 event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
643 def wait_ready_and_cancel(self, drive='drive0'):
644 self.wait_ready(drive=drive)
645 event = self.cancel_and_wait(drive=drive)
646 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
647 self.assert_qmp(event, 'data/type', 'mirror')
648 self.assert_qmp(event, 'data/offset', event['data']['len'])
650 def complete_and_wait(self, drive='drive0', wait_ready=True):
651 '''Complete a block job and wait for it to finish'''
652 if wait_ready:
653 self.wait_ready(drive=drive)
655 result = self.vm.qmp('block-job-complete', device=drive)
656 self.assert_qmp(result, 'return', {})
658 event = self.wait_until_completed(drive=drive)
659 self.assert_qmp(event, 'data/type', 'mirror')
661 def pause_wait(self, job_id='job0'):
662 with Timeout(1, "Timeout waiting for job to pause"):
663 while True:
664 result = self.vm.qmp('query-block-jobs')
665 found = False
666 for job in result['return']:
667 if job['device'] == job_id:
668 found = True
669 if job['paused'] == True and job['busy'] == False:
670 return job
671 break
672 assert found
674 def pause_job(self, job_id='job0', wait=True):
675 result = self.vm.qmp('block-job-pause', device=job_id)
676 self.assert_qmp(result, 'return', {})
677 if wait:
678 return self.pause_wait(job_id)
679 return result
682 def notrun(reason):
683 '''Skip this test suite'''
684 # Each test in qemu-iotests has a number ("seq")
685 seq = os.path.basename(sys.argv[0])
687 open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
688 print('%s not run: %s' % (seq, reason))
689 sys.exit(0)
691 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
692 assert not (supported_fmts and unsupported_fmts)
694 if 'generic' in supported_fmts and \
695 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
696 # similar to
697 # _supported_fmt generic
698 # for bash tests
699 return
701 not_sup = supported_fmts and (imgfmt not in supported_fmts)
702 if not_sup or (imgfmt in unsupported_fmts):
703 notrun('not suitable for this image format: %s' % imgfmt)
705 def verify_protocol(supported=[], unsupported=[]):
706 assert not (supported and unsupported)
708 if 'generic' in supported:
709 return
711 not_sup = supported and (imgproto not in supported)
712 if not_sup or (imgproto in unsupported):
713 notrun('not suitable for this protocol: %s' % imgproto)
715 def verify_platform(supported_oses=['linux']):
716 if True not in [sys.platform.startswith(x) for x in supported_oses]:
717 notrun('not suitable for this OS: %s' % sys.platform)
719 def verify_cache_mode(supported_cache_modes=[]):
720 if supported_cache_modes and (cachemode not in supported_cache_modes):
721 notrun('not suitable for this cache mode: %s' % cachemode)
723 def supports_quorum():
724 return 'quorum' in qemu_img_pipe('--help')
726 def verify_quorum():
727 '''Skip test suite if quorum support is not available'''
728 if not supports_quorum():
729 notrun('quorum support missing')
731 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
732 unsupported_fmts=[]):
733 '''Run tests'''
735 global debug
737 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
738 # indicate that we're not being run via "check". There may be
739 # other things set up by "check" that individual test cases rely
740 # on.
741 if test_dir is None or qemu_default_machine is None:
742 sys.stderr.write('Please run this test via the "check" script\n')
743 sys.exit(os.EX_USAGE)
745 debug = '-d' in sys.argv
746 verbosity = 1
747 verify_image_format(supported_fmts, unsupported_fmts)
748 verify_platform(supported_oses)
749 verify_cache_mode(supported_cache_modes)
751 if debug:
752 output = sys.stdout
753 verbosity = 2
754 sys.argv.remove('-d')
755 else:
756 # We need to filter out the time taken from the output so that
757 # qemu-iotest can reliably diff the results against master output.
758 if sys.version_info.major >= 3:
759 output = io.StringIO()
760 else:
761 # io.StringIO is for unicode strings, which is not what
762 # 2.x's test runner emits.
763 output = io.BytesIO()
765 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
767 class MyTestRunner(unittest.TextTestRunner):
768 def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
769 unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
771 # unittest.main() will use sys.exit() so expect a SystemExit exception
772 try:
773 unittest.main(testRunner=MyTestRunner)
774 finally:
775 if not debug:
776 sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))