target/arm: Convert VMOV (register) to decodetree
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blob6bcddd887048e85f5b04077055b2c8dfd8e12c0c
1 from __future__ import print_function
2 # Common utilities and Python wrappers for qemu-iotests
4 # Copyright (C) 2012 IBM Corp.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import errno
21 import os
22 import re
23 import subprocess
24 import string
25 import unittest
26 import sys
27 import struct
28 import json
29 import signal
30 import logging
31 import atexit
32 import io
33 from collections import OrderedDict
35 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
36 from qemu import qtest
39 # This will not work if arguments contain spaces but is necessary if we
40 # want to support the override options that ./check supports.
41 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
42 if os.environ.get('QEMU_IMG_OPTIONS'):
43 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
45 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
46 if os.environ.get('QEMU_IO_OPTIONS'):
47 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
49 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
50 if os.environ.get('QEMU_NBD_OPTIONS'):
51 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
53 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
54 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
56 imgfmt = os.environ.get('IMGFMT', 'raw')
57 imgproto = os.environ.get('IMGPROTO', 'file')
58 test_dir = os.environ.get('TEST_DIR')
59 output_dir = os.environ.get('OUTPUT_DIR', '.')
60 cachemode = os.environ.get('CACHEMODE')
61 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
63 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
64 debug = False
66 luks_default_secret_object = 'secret,id=keysec0,data=' + \
67 os.environ.get('IMGKEYSECRET', '')
68 luks_default_key_secret_opt = 'key-secret=keysec0'
71 def qemu_img(*args):
72 '''Run qemu-img and return the exit code'''
73 devnull = open('/dev/null', 'r+')
74 exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
75 if exitcode < 0:
76 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
77 return exitcode
79 def ordered_qmp(qmsg, conv_keys=True):
80 # Dictionaries are not ordered prior to 3.6, therefore:
81 if isinstance(qmsg, list):
82 return [ordered_qmp(atom) for atom in qmsg]
83 if isinstance(qmsg, dict):
84 od = OrderedDict()
85 for k, v in sorted(qmsg.items()):
86 if conv_keys:
87 k = k.replace('_', '-')
88 od[k] = ordered_qmp(v, conv_keys=False)
89 return od
90 return qmsg
92 def qemu_img_create(*args):
93 args = list(args)
95 # default luks support
96 if '-f' in args and args[args.index('-f') + 1] == 'luks':
97 if '-o' in args:
98 i = args.index('-o')
99 if 'key-secret' not in args[i + 1]:
100 args[i + 1].append(luks_default_key_secret_opt)
101 args.insert(i + 2, '--object')
102 args.insert(i + 3, luks_default_secret_object)
103 else:
104 args = ['-o', luks_default_key_secret_opt,
105 '--object', luks_default_secret_object] + args
107 args.insert(0, 'create')
109 return qemu_img(*args)
111 def qemu_img_verbose(*args):
112 '''Run qemu-img without suppressing its output and return the exit code'''
113 exitcode = subprocess.call(qemu_img_args + list(args))
114 if exitcode < 0:
115 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
116 return exitcode
118 def qemu_img_pipe(*args):
119 '''Run qemu-img and return its output'''
120 subp = subprocess.Popen(qemu_img_args + list(args),
121 stdout=subprocess.PIPE,
122 stderr=subprocess.STDOUT,
123 universal_newlines=True)
124 exitcode = subp.wait()
125 if exitcode < 0:
126 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
127 return subp.communicate()[0]
129 def qemu_img_log(*args):
130 result = qemu_img_pipe(*args)
131 log(result, filters=[filter_testfiles])
132 return result
134 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
135 args = [ 'info' ]
136 if imgopts:
137 args.append('--image-opts')
138 else:
139 args += [ '-f', imgfmt ]
140 args += extra_args
141 args.append(filename)
143 output = qemu_img_pipe(*args)
144 if not filter_path:
145 filter_path = filename
146 log(filter_img_info(output, filter_path))
148 def qemu_io(*args):
149 '''Run qemu-io and return the stdout data'''
150 args = qemu_io_args + list(args)
151 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
152 stderr=subprocess.STDOUT,
153 universal_newlines=True)
154 exitcode = subp.wait()
155 if exitcode < 0:
156 sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
157 return subp.communicate()[0]
159 def qemu_io_silent(*args):
160 '''Run qemu-io and return the exit code, suppressing stdout'''
161 args = qemu_io_args + list(args)
162 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
163 if exitcode < 0:
164 sys.stderr.write('qemu-io received signal %i: %s\n' %
165 (-exitcode, ' '.join(args)))
166 return exitcode
169 class QemuIoInteractive:
170 def __init__(self, *args):
171 self.args = qemu_io_args + list(args)
172 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
173 stdout=subprocess.PIPE,
174 stderr=subprocess.STDOUT,
175 universal_newlines=True)
176 assert self._p.stdout.read(9) == 'qemu-io> '
178 def close(self):
179 self._p.communicate('q\n')
181 def _read_output(self):
182 pattern = 'qemu-io> '
183 n = len(pattern)
184 pos = 0
185 s = []
186 while pos != n:
187 c = self._p.stdout.read(1)
188 # check unexpected EOF
189 assert c != ''
190 s.append(c)
191 if c == pattern[pos]:
192 pos += 1
193 else:
194 pos = 0
196 return ''.join(s[:-n])
198 def cmd(self, cmd):
199 # quit command is in close(), '\n' is added automatically
200 assert '\n' not in cmd
201 cmd = cmd.strip()
202 assert cmd != 'q' and cmd != 'quit'
203 self._p.stdin.write(cmd + '\n')
204 self._p.stdin.flush()
205 return self._read_output()
208 def qemu_nbd(*args):
209 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
210 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
212 def qemu_nbd_pipe(*args):
213 '''Run qemu-nbd in daemon mode and return both the parent's exit code
214 and its output'''
215 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
216 stdout=subprocess.PIPE,
217 stderr=subprocess.STDOUT,
218 universal_newlines=True)
219 exitcode = subp.wait()
220 if exitcode < 0:
221 sys.stderr.write('qemu-nbd received signal %i: %s\n' %
222 (-exitcode,
223 ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
224 return exitcode, subp.communicate()[0]
226 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
227 '''Return True if two image files are identical'''
228 return qemu_img('compare', '-f', fmt1,
229 '-F', fmt2, img1, img2) == 0
231 def create_image(name, size):
232 '''Create a fully-allocated raw image with sector markers'''
233 file = open(name, 'wb')
234 i = 0
235 while i < size:
236 sector = struct.pack('>l504xl', i // 512, i // 512)
237 file.write(sector)
238 i = i + 512
239 file.close()
241 def image_size(img):
242 '''Return image's virtual size'''
243 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
244 return json.loads(r)['virtual-size']
246 def is_str(val):
247 if sys.version_info.major >= 3:
248 return isinstance(val, str)
249 else:
250 return isinstance(val, str) or isinstance(val, unicode)
252 test_dir_re = re.compile(r"%s" % test_dir)
253 def filter_test_dir(msg):
254 return test_dir_re.sub("TEST_DIR", msg)
256 win32_re = re.compile(r"\r")
257 def filter_win32(msg):
258 return win32_re.sub("", msg)
260 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
261 def filter_qemu_io(msg):
262 msg = filter_win32(msg)
263 return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
265 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
266 def filter_chown(msg):
267 return chown_re.sub("chown UID:GID", msg)
269 def filter_qmp_event(event):
270 '''Filter a QMP event dict'''
271 event = dict(event)
272 if 'timestamp' in event:
273 event['timestamp']['seconds'] = 'SECS'
274 event['timestamp']['microseconds'] = 'USECS'
275 return event
277 def filter_qmp(qmsg, filter_fn):
278 '''Given a string filter, filter a QMP object's values.
279 filter_fn takes a (key, value) pair.'''
280 # Iterate through either lists or dicts;
281 if isinstance(qmsg, list):
282 items = enumerate(qmsg)
283 else:
284 items = qmsg.items()
286 for k, v in items:
287 if isinstance(v, list) or isinstance(v, dict):
288 qmsg[k] = filter_qmp(v, filter_fn)
289 else:
290 qmsg[k] = filter_fn(k, v)
291 return qmsg
293 def filter_testfiles(msg):
294 prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
295 return msg.replace(prefix, 'TEST_DIR/PID-')
297 def filter_qmp_testfiles(qmsg):
298 def _filter(key, value):
299 if is_str(value):
300 return filter_testfiles(value)
301 return value
302 return filter_qmp(qmsg, _filter)
304 def filter_generated_node_ids(msg):
305 return re.sub("#block[0-9]+", "NODE_NAME", msg)
307 def filter_img_info(output, filename):
308 lines = []
309 for line in output.split('\n'):
310 if 'disk size' in line or 'actual-size' in line:
311 continue
312 line = line.replace(filename, 'TEST_IMG') \
313 .replace(imgfmt, 'IMGFMT')
314 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
315 line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
316 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
317 lines.append(line)
318 return '\n'.join(lines)
320 def filter_imgfmt(msg):
321 return msg.replace(imgfmt, 'IMGFMT')
323 def filter_qmp_imgfmt(qmsg):
324 def _filter(key, value):
325 if is_str(value):
326 return filter_imgfmt(value)
327 return value
328 return filter_qmp(qmsg, _filter)
330 def log(msg, filters=[], indent=None):
331 '''Logs either a string message or a JSON serializable message (like QMP).
332 If indent is provided, JSON serializable messages are pretty-printed.'''
333 for flt in filters:
334 msg = flt(msg)
335 if isinstance(msg, dict) or isinstance(msg, list):
336 # Python < 3.4 needs to know not to add whitespace when pretty-printing:
337 separators = (', ', ': ') if indent is None else (',', ': ')
338 # Don't sort if it's already sorted
339 do_sort = not isinstance(msg, OrderedDict)
340 print(json.dumps(msg, sort_keys=do_sort,
341 indent=indent, separators=separators))
342 else:
343 print(msg)
345 class Timeout:
346 def __init__(self, seconds, errmsg = "Timeout"):
347 self.seconds = seconds
348 self.errmsg = errmsg
349 def __enter__(self):
350 signal.signal(signal.SIGALRM, self.timeout)
351 signal.setitimer(signal.ITIMER_REAL, self.seconds)
352 return self
353 def __exit__(self, type, value, traceback):
354 signal.setitimer(signal.ITIMER_REAL, 0)
355 return False
356 def timeout(self, signum, frame):
357 raise Exception(self.errmsg)
360 class FilePath(object):
361 '''An auto-generated filename that cleans itself up.
363 Use this context manager to generate filenames and ensure that the file
364 gets deleted::
366 with TestFilePath('test.img') as img_path:
367 qemu_img('create', img_path, '1G')
368 # migration_sock_path is automatically deleted
370 def __init__(self, name):
371 filename = '{0}-{1}'.format(os.getpid(), name)
372 self.path = os.path.join(test_dir, filename)
374 def __enter__(self):
375 return self.path
377 def __exit__(self, exc_type, exc_val, exc_tb):
378 try:
379 os.remove(self.path)
380 except OSError:
381 pass
382 return False
385 def file_path_remover():
386 for path in reversed(file_path_remover.paths):
387 try:
388 os.remove(path)
389 except OSError:
390 pass
393 def file_path(*names):
394 ''' Another way to get auto-generated filename that cleans itself up.
396 Use is as simple as:
398 img_a, img_b = file_path('a.img', 'b.img')
399 sock = file_path('socket')
402 if not hasattr(file_path_remover, 'paths'):
403 file_path_remover.paths = []
404 atexit.register(file_path_remover)
406 paths = []
407 for name in names:
408 filename = '{0}-{1}'.format(os.getpid(), name)
409 path = os.path.join(test_dir, filename)
410 file_path_remover.paths.append(path)
411 paths.append(path)
413 return paths[0] if len(paths) == 1 else paths
415 def remote_filename(path):
416 if imgproto == 'file':
417 return path
418 elif imgproto == 'ssh':
419 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
420 else:
421 raise Exception("Protocol %s not supported" % (imgproto))
423 class VM(qtest.QEMUQtestMachine):
424 '''A QEMU VM'''
426 def __init__(self, path_suffix=''):
427 name = "qemu%s-%d" % (path_suffix, os.getpid())
428 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
429 test_dir=test_dir,
430 socket_scm_helper=socket_scm_helper)
431 self._num_drives = 0
433 def add_object(self, opts):
434 self._args.append('-object')
435 self._args.append(opts)
436 return self
438 def add_device(self, opts):
439 self._args.append('-device')
440 self._args.append(opts)
441 return self
443 def add_drive_raw(self, opts):
444 self._args.append('-drive')
445 self._args.append(opts)
446 return self
448 def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
449 '''Add a virtio-blk drive to the VM'''
450 options = ['if=%s' % interface,
451 'id=drive%d' % self._num_drives]
453 if path is not None:
454 options.append('file=%s' % path)
455 options.append('format=%s' % format)
456 options.append('cache=%s' % cachemode)
458 if opts:
459 options.append(opts)
461 if format == 'luks' and 'key-secret' not in opts:
462 # default luks support
463 if luks_default_secret_object not in self._args:
464 self.add_object(luks_default_secret_object)
466 options.append(luks_default_key_secret_opt)
468 self._args.append('-drive')
469 self._args.append(','.join(options))
470 self._num_drives += 1
471 return self
473 def add_blockdev(self, opts):
474 self._args.append('-blockdev')
475 if isinstance(opts, str):
476 self._args.append(opts)
477 else:
478 self._args.append(','.join(opts))
479 return self
481 def add_incoming(self, addr):
482 self._args.append('-incoming')
483 self._args.append(addr)
484 return self
486 def pause_drive(self, drive, event=None):
487 '''Pause drive r/w operations'''
488 if not event:
489 self.pause_drive(drive, "read_aio")
490 self.pause_drive(drive, "write_aio")
491 return
492 self.qmp('human-monitor-command',
493 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
495 def resume_drive(self, drive):
496 self.qmp('human-monitor-command',
497 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
499 def hmp_qemu_io(self, drive, cmd):
500 '''Write to a given drive using an HMP command'''
501 return self.qmp('human-monitor-command',
502 command_line='qemu-io %s "%s"' % (drive, cmd))
504 def flatten_qmp_object(self, obj, output=None, basestr=''):
505 if output is None:
506 output = dict()
507 if isinstance(obj, list):
508 for i in range(len(obj)):
509 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
510 elif isinstance(obj, dict):
511 for key in obj:
512 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
513 else:
514 output[basestr[:-1]] = obj # Strip trailing '.'
515 return output
517 def qmp_to_opts(self, obj):
518 obj = self.flatten_qmp_object(obj)
519 output_list = list()
520 for key in obj:
521 output_list += [key + '=' + obj[key]]
522 return ','.join(output_list)
524 def get_qmp_events_filtered(self, wait=True):
525 result = []
526 for ev in self.get_qmp_events(wait=wait):
527 result.append(filter_qmp_event(ev))
528 return result
530 def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
531 full_cmd = OrderedDict((
532 ("execute", cmd),
533 ("arguments", ordered_qmp(kwargs))
535 log(full_cmd, filters, indent=indent)
536 result = self.qmp(cmd, **kwargs)
537 log(result, filters, indent=indent)
538 return result
540 # Returns None on success, and an error string on failure
541 def run_job(self, job, auto_finalize=True, auto_dismiss=False,
542 pre_finalize=None):
543 error = None
544 while True:
545 for ev in self.get_qmp_events_filtered(wait=True):
546 if ev['event'] == 'JOB_STATUS_CHANGE':
547 status = ev['data']['status']
548 if status == 'aborting':
549 result = self.qmp('query-jobs')
550 for j in result['return']:
551 if j['id'] == job:
552 error = j['error']
553 log('Job failed: %s' % (j['error']))
554 elif status == 'pending' and not auto_finalize:
555 if pre_finalize:
556 pre_finalize()
557 self.qmp_log('job-finalize', id=job)
558 elif status == 'concluded' and not auto_dismiss:
559 self.qmp_log('job-dismiss', id=job)
560 elif status == 'null':
561 return error
562 else:
563 log(ev)
565 def node_info(self, node_name):
566 nodes = self.qmp('query-named-block-nodes')
567 for x in nodes['return']:
568 if x['node-name'] == node_name:
569 return x
570 return None
573 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
575 class QMPTestCase(unittest.TestCase):
576 '''Abstract base class for QMP test cases'''
578 def dictpath(self, d, path):
579 '''Traverse a path in a nested dict'''
580 for component in path.split('/'):
581 m = index_re.match(component)
582 if m:
583 component, idx = m.groups()
584 idx = int(idx)
586 if not isinstance(d, dict) or component not in d:
587 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
588 d = d[component]
590 if m:
591 if not isinstance(d, list):
592 self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
593 try:
594 d = d[idx]
595 except IndexError:
596 self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
597 return d
599 def assert_qmp_absent(self, d, path):
600 try:
601 result = self.dictpath(d, path)
602 except AssertionError:
603 return
604 self.fail('path "%s" has value "%s"' % (path, str(result)))
606 def assert_qmp(self, d, path, value):
607 '''Assert that the value for a specific path in a QMP dict
608 matches. When given a list of values, assert that any of
609 them matches.'''
611 result = self.dictpath(d, path)
613 # [] makes no sense as a list of valid values, so treat it as
614 # an actual single value.
615 if isinstance(value, list) and value != []:
616 for v in value:
617 if result == v:
618 return
619 self.fail('no match for "%s" in %s' % (str(result), str(value)))
620 else:
621 self.assertEqual(result, value,
622 'values not equal "%s" and "%s"'
623 % (str(result), str(value)))
625 def assert_no_active_block_jobs(self):
626 result = self.vm.qmp('query-block-jobs')
627 self.assert_qmp(result, 'return', [])
629 def assert_has_block_node(self, node_name=None, file_name=None):
630 """Issue a query-named-block-nodes and assert node_name and/or
631 file_name is present in the result"""
632 def check_equal_or_none(a, b):
633 return a == None or b == None or a == b
634 assert node_name or file_name
635 result = self.vm.qmp('query-named-block-nodes')
636 for x in result["return"]:
637 if check_equal_or_none(x.get("node-name"), node_name) and \
638 check_equal_or_none(x.get("file"), file_name):
639 return
640 self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
641 (node_name, file_name, result))
643 def assert_json_filename_equal(self, json_filename, reference):
644 '''Asserts that the given filename is a json: filename and that its
645 content is equal to the given reference object'''
646 self.assertEqual(json_filename[:5], 'json:')
647 self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
648 self.vm.flatten_qmp_object(reference))
650 def cancel_and_wait(self, drive='drive0', force=False, resume=False):
651 '''Cancel a block job and wait for it to finish, returning the event'''
652 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
653 self.assert_qmp(result, 'return', {})
655 if resume:
656 self.vm.resume_drive(drive)
658 cancelled = False
659 result = None
660 while not cancelled:
661 for event in self.vm.get_qmp_events(wait=True):
662 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
663 event['event'] == 'BLOCK_JOB_CANCELLED':
664 self.assert_qmp(event, 'data/device', drive)
665 result = event
666 cancelled = True
667 elif event['event'] == 'JOB_STATUS_CHANGE':
668 self.assert_qmp(event, 'data/id', drive)
671 self.assert_no_active_block_jobs()
672 return result
674 def wait_until_completed(self, drive='drive0', check_offset=True):
675 '''Wait for a block job to finish, returning the event'''
676 while True:
677 for event in self.vm.get_qmp_events(wait=True):
678 if event['event'] == 'BLOCK_JOB_COMPLETED':
679 self.assert_qmp(event, 'data/device', drive)
680 self.assert_qmp_absent(event, 'data/error')
681 if check_offset:
682 self.assert_qmp(event, 'data/offset', event['data']['len'])
683 self.assert_no_active_block_jobs()
684 return event
685 elif event['event'] == 'JOB_STATUS_CHANGE':
686 self.assert_qmp(event, 'data/id', drive)
688 def wait_ready(self, drive='drive0'):
689 '''Wait until a block job BLOCK_JOB_READY event'''
690 f = {'data': {'type': 'mirror', 'device': drive } }
691 event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
693 def wait_ready_and_cancel(self, drive='drive0'):
694 self.wait_ready(drive=drive)
695 event = self.cancel_and_wait(drive=drive)
696 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
697 self.assert_qmp(event, 'data/type', 'mirror')
698 self.assert_qmp(event, 'data/offset', event['data']['len'])
700 def complete_and_wait(self, drive='drive0', wait_ready=True):
701 '''Complete a block job and wait for it to finish'''
702 if wait_ready:
703 self.wait_ready(drive=drive)
705 result = self.vm.qmp('block-job-complete', device=drive)
706 self.assert_qmp(result, 'return', {})
708 event = self.wait_until_completed(drive=drive)
709 self.assert_qmp(event, 'data/type', 'mirror')
711 def pause_wait(self, job_id='job0'):
712 with Timeout(1, "Timeout waiting for job to pause"):
713 while True:
714 result = self.vm.qmp('query-block-jobs')
715 found = False
716 for job in result['return']:
717 if job['device'] == job_id:
718 found = True
719 if job['paused'] == True and job['busy'] == False:
720 return job
721 break
722 assert found
724 def pause_job(self, job_id='job0', wait=True):
725 result = self.vm.qmp('block-job-pause', device=job_id)
726 self.assert_qmp(result, 'return', {})
727 if wait:
728 return self.pause_wait(job_id)
729 return result
732 def notrun(reason):
733 '''Skip this test suite'''
734 # Each test in qemu-iotests has a number ("seq")
735 seq = os.path.basename(sys.argv[0])
737 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
738 print('%s not run: %s' % (seq, reason))
739 sys.exit(0)
741 def case_notrun(reason):
742 '''Skip this test case'''
743 # Each test in qemu-iotests has a number ("seq")
744 seq = os.path.basename(sys.argv[0])
746 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
747 ' [case not run] ' + reason + '\n')
749 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
750 assert not (supported_fmts and unsupported_fmts)
752 if 'generic' in supported_fmts and \
753 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
754 # similar to
755 # _supported_fmt generic
756 # for bash tests
757 return
759 not_sup = supported_fmts and (imgfmt not in supported_fmts)
760 if not_sup or (imgfmt in unsupported_fmts):
761 notrun('not suitable for this image format: %s' % imgfmt)
763 def verify_protocol(supported=[], unsupported=[]):
764 assert not (supported and unsupported)
766 if 'generic' in supported:
767 return
769 not_sup = supported and (imgproto not in supported)
770 if not_sup or (imgproto in unsupported):
771 notrun('not suitable for this protocol: %s' % imgproto)
773 def verify_platform(supported_oses=['linux']):
774 if True not in [sys.platform.startswith(x) for x in supported_oses]:
775 notrun('not suitable for this OS: %s' % sys.platform)
777 def verify_cache_mode(supported_cache_modes=[]):
778 if supported_cache_modes and (cachemode not in supported_cache_modes):
779 notrun('not suitable for this cache mode: %s' % cachemode)
781 def supports_quorum():
782 return 'quorum' in qemu_img_pipe('--help')
784 def verify_quorum():
785 '''Skip test suite if quorum support is not available'''
786 if not supports_quorum():
787 notrun('quorum support missing')
789 def qemu_pipe(*args):
790 '''Run qemu with an option to print something and exit (e.g. a help option),
791 and return its output'''
792 args = [qemu_prog] + qemu_opts + list(args)
793 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
794 stderr=subprocess.STDOUT,
795 universal_newlines=True)
796 exitcode = subp.wait()
797 if exitcode < 0:
798 sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
799 ' '.join(args)))
800 return subp.communicate()[0]
802 def supported_formats(read_only=False):
803 '''Set 'read_only' to True to check ro-whitelist
804 Otherwise, rw-whitelist is checked'''
805 format_message = qemu_pipe("-drive", "format=help")
806 line = 1 if read_only else 0
807 return format_message.splitlines()[line].split(":")[1].split()
809 def skip_if_unsupported(required_formats=[], read_only=False):
810 '''Skip Test Decorator
811 Runs the test if all the required formats are whitelisted'''
812 def skip_test_decorator(func):
813 def func_wrapper(*args, **kwargs):
814 usf_list = list(set(required_formats) -
815 set(supported_formats(read_only)))
816 if usf_list:
817 case_notrun('{}: formats {} are not whitelisted'.format(
818 args[0], usf_list))
819 else:
820 return func(*args, **kwargs)
821 return func_wrapper
822 return skip_test_decorator
824 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
825 unsupported_fmts=[]):
826 '''Run tests'''
828 global debug
830 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
831 # indicate that we're not being run via "check". There may be
832 # other things set up by "check" that individual test cases rely
833 # on.
834 if test_dir is None or qemu_default_machine is None:
835 sys.stderr.write('Please run this test via the "check" script\n')
836 sys.exit(os.EX_USAGE)
838 debug = '-d' in sys.argv
839 verbosity = 1
840 verify_image_format(supported_fmts, unsupported_fmts)
841 verify_platform(supported_oses)
842 verify_cache_mode(supported_cache_modes)
844 if debug:
845 output = sys.stdout
846 verbosity = 2
847 sys.argv.remove('-d')
848 else:
849 # We need to filter out the time taken from the output so that
850 # qemu-iotest can reliably diff the results against master output.
851 if sys.version_info.major >= 3:
852 output = io.StringIO()
853 else:
854 # io.StringIO is for unicode strings, which is not what
855 # 2.x's test runner emits.
856 output = io.BytesIO()
858 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
860 class MyTestRunner(unittest.TextTestRunner):
861 def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
862 unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
864 # unittest.main() will use sys.exit() so expect a SystemExit exception
865 try:
866 unittest.main(testRunner=MyTestRunner)
867 finally:
868 if not debug:
869 sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))