iotests.py: Add is_str()
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blob52fc77563c73eb4e2cf15685321cb3d38d6b36e1
1 from __future__ import print_function
2 # Common utilities and Python wrappers for qemu-iotests
4 # Copyright (C) 2012 IBM Corp.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import errno
21 import os
22 import re
23 import subprocess
24 import string
25 import unittest
26 import sys
27 import struct
28 import json
29 import signal
30 import logging
31 import atexit
32 import io
33 from collections import OrderedDict
35 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
36 import qtest
39 # This will not work if arguments contain spaces but is necessary if we
40 # want to support the override options that ./check supports.
41 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
42 if os.environ.get('QEMU_IMG_OPTIONS'):
43 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
45 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
46 if os.environ.get('QEMU_IO_OPTIONS'):
47 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
49 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
50 if os.environ.get('QEMU_NBD_OPTIONS'):
51 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
53 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
54 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
56 imgfmt = os.environ.get('IMGFMT', 'raw')
57 imgproto = os.environ.get('IMGPROTO', 'file')
58 test_dir = os.environ.get('TEST_DIR')
59 output_dir = os.environ.get('OUTPUT_DIR', '.')
60 cachemode = os.environ.get('CACHEMODE')
61 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
63 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
64 debug = False
66 luks_default_secret_object = 'secret,id=keysec0,data=' + \
67 os.environ.get('IMGKEYSECRET', '')
68 luks_default_key_secret_opt = 'key-secret=keysec0'
71 def qemu_img(*args):
72 '''Run qemu-img and return the exit code'''
73 devnull = open('/dev/null', 'r+')
74 exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
75 if exitcode < 0:
76 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
77 return exitcode
79 def ordered_qmp(qmsg):
80 # Dictionaries are not ordered prior to 3.6, therefore:
81 if isinstance(qmsg, list):
82 return [ordered_qmp(atom) for atom in qmsg]
83 if isinstance(qmsg, dict):
84 od = OrderedDict()
85 for k, v in sorted(qmsg.items()):
86 od[k] = ordered_qmp(v)
87 return od
88 return qmsg
90 def qemu_img_create(*args):
91 args = list(args)
93 # default luks support
94 if '-f' in args and args[args.index('-f') + 1] == 'luks':
95 if '-o' in args:
96 i = args.index('-o')
97 if 'key-secret' not in args[i + 1]:
98 args[i + 1].append(luks_default_key_secret_opt)
99 args.insert(i + 2, '--object')
100 args.insert(i + 3, luks_default_secret_object)
101 else:
102 args = ['-o', luks_default_key_secret_opt,
103 '--object', luks_default_secret_object] + args
105 args.insert(0, 'create')
107 return qemu_img(*args)
109 def qemu_img_verbose(*args):
110 '''Run qemu-img without suppressing its output and return the exit code'''
111 exitcode = subprocess.call(qemu_img_args + list(args))
112 if exitcode < 0:
113 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
114 return exitcode
116 def qemu_img_pipe(*args):
117 '''Run qemu-img and return its output'''
118 subp = subprocess.Popen(qemu_img_args + list(args),
119 stdout=subprocess.PIPE,
120 stderr=subprocess.STDOUT,
121 universal_newlines=True)
122 exitcode = subp.wait()
123 if exitcode < 0:
124 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
125 return subp.communicate()[0]
127 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
128 args = [ 'info' ]
129 if imgopts:
130 args.append('--image-opts')
131 else:
132 args += [ '-f', imgfmt ]
133 args += extra_args
134 args.append(filename)
136 output = qemu_img_pipe(*args)
137 if not filter_path:
138 filter_path = filename
139 log(filter_img_info(output, filter_path))
141 def qemu_io(*args):
142 '''Run qemu-io and return the stdout data'''
143 args = qemu_io_args + list(args)
144 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
145 stderr=subprocess.STDOUT,
146 universal_newlines=True)
147 exitcode = subp.wait()
148 if exitcode < 0:
149 sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
150 return subp.communicate()[0]
152 def qemu_io_silent(*args):
153 '''Run qemu-io and return the exit code, suppressing stdout'''
154 args = qemu_io_args + list(args)
155 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
156 if exitcode < 0:
157 sys.stderr.write('qemu-io received signal %i: %s\n' %
158 (-exitcode, ' '.join(args)))
159 return exitcode
162 class QemuIoInteractive:
163 def __init__(self, *args):
164 self.args = qemu_io_args + list(args)
165 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
166 stdout=subprocess.PIPE,
167 stderr=subprocess.STDOUT,
168 universal_newlines=True)
169 assert self._p.stdout.read(9) == 'qemu-io> '
171 def close(self):
172 self._p.communicate('q\n')
174 def _read_output(self):
175 pattern = 'qemu-io> '
176 n = len(pattern)
177 pos = 0
178 s = []
179 while pos != n:
180 c = self._p.stdout.read(1)
181 # check unexpected EOF
182 assert c != ''
183 s.append(c)
184 if c == pattern[pos]:
185 pos += 1
186 else:
187 pos = 0
189 return ''.join(s[:-n])
191 def cmd(self, cmd):
192 # quit command is in close(), '\n' is added automatically
193 assert '\n' not in cmd
194 cmd = cmd.strip()
195 assert cmd != 'q' and cmd != 'quit'
196 self._p.stdin.write(cmd + '\n')
197 self._p.stdin.flush()
198 return self._read_output()
201 def qemu_nbd(*args):
202 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
203 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
205 def qemu_nbd_pipe(*args):
206 '''Run qemu-nbd in daemon mode and return both the parent's exit code
207 and its output'''
208 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
209 stdout=subprocess.PIPE,
210 stderr=subprocess.STDOUT,
211 universal_newlines=True)
212 exitcode = subp.wait()
213 if exitcode < 0:
214 sys.stderr.write('qemu-nbd received signal %i: %s\n' %
215 (-exitcode,
216 ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
217 return exitcode, subp.communicate()[0]
219 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
220 '''Return True if two image files are identical'''
221 return qemu_img('compare', '-f', fmt1,
222 '-F', fmt2, img1, img2) == 0
224 def create_image(name, size):
225 '''Create a fully-allocated raw image with sector markers'''
226 file = open(name, 'wb')
227 i = 0
228 while i < size:
229 sector = struct.pack('>l504xl', i // 512, i // 512)
230 file.write(sector)
231 i = i + 512
232 file.close()
234 def image_size(img):
235 '''Return image's virtual size'''
236 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
237 return json.loads(r)['virtual-size']
239 def is_str(val):
240 if sys.version_info.major >= 3:
241 return isinstance(val, str)
242 else:
243 return isinstance(val, str) or isinstance(val, unicode)
245 test_dir_re = re.compile(r"%s" % test_dir)
246 def filter_test_dir(msg):
247 return test_dir_re.sub("TEST_DIR", msg)
249 win32_re = re.compile(r"\r")
250 def filter_win32(msg):
251 return win32_re.sub("", msg)
253 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
254 def filter_qemu_io(msg):
255 msg = filter_win32(msg)
256 return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
258 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
259 def filter_chown(msg):
260 return chown_re.sub("chown UID:GID", msg)
262 def filter_qmp_event(event):
263 '''Filter a QMP event dict'''
264 event = dict(event)
265 if 'timestamp' in event:
266 event['timestamp']['seconds'] = 'SECS'
267 event['timestamp']['microseconds'] = 'USECS'
268 return event
270 def filter_qmp(qmsg, filter_fn):
271 '''Given a string filter, filter a QMP object's values.
272 filter_fn takes a (key, value) pair.'''
273 # Iterate through either lists or dicts;
274 if isinstance(qmsg, list):
275 items = enumerate(qmsg)
276 else:
277 items = qmsg.items()
279 for k, v in items:
280 if isinstance(v, list) or isinstance(v, dict):
281 qmsg[k] = filter_qmp(v, filter_fn)
282 else:
283 qmsg[k] = filter_fn(k, v)
284 return qmsg
286 def filter_testfiles(msg):
287 prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
288 return msg.replace(prefix, 'TEST_DIR/PID-')
290 def filter_qmp_testfiles(qmsg):
291 def _filter(key, value):
292 if key == 'filename' or key == 'backing-file':
293 return filter_testfiles(value)
294 return value
295 return filter_qmp(qmsg, _filter)
297 def filter_generated_node_ids(msg):
298 return re.sub("#block[0-9]+", "NODE_NAME", msg)
300 def filter_img_info(output, filename):
301 lines = []
302 for line in output.split('\n'):
303 if 'disk size' in line or 'actual-size' in line:
304 continue
305 line = line.replace(filename, 'TEST_IMG') \
306 .replace(imgfmt, 'IMGFMT')
307 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
308 line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
309 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
310 lines.append(line)
311 return '\n'.join(lines)
313 def filter_imgfmt(msg):
314 return msg.replace(imgfmt, 'IMGFMT')
316 def filter_qmp_imgfmt(qmsg):
317 def _filter(key, value):
318 if is_str(value):
319 return filter_imgfmt(value)
320 return value
321 return filter_qmp(qmsg, _filter)
323 def log(msg, filters=[], indent=None):
324 '''Logs either a string message or a JSON serializable message (like QMP).
325 If indent is provided, JSON serializable messages are pretty-printed.'''
326 for flt in filters:
327 msg = flt(msg)
328 if isinstance(msg, dict) or isinstance(msg, list):
329 # Python < 3.4 needs to know not to add whitespace when pretty-printing:
330 separators = (', ', ': ') if indent is None else (',', ': ')
331 # Don't sort if it's already sorted
332 do_sort = not isinstance(msg, OrderedDict)
333 print(json.dumps(msg, sort_keys=do_sort,
334 indent=indent, separators=separators))
335 else:
336 print(msg)
338 class Timeout:
339 def __init__(self, seconds, errmsg = "Timeout"):
340 self.seconds = seconds
341 self.errmsg = errmsg
342 def __enter__(self):
343 signal.signal(signal.SIGALRM, self.timeout)
344 signal.setitimer(signal.ITIMER_REAL, self.seconds)
345 return self
346 def __exit__(self, type, value, traceback):
347 signal.setitimer(signal.ITIMER_REAL, 0)
348 return False
349 def timeout(self, signum, frame):
350 raise Exception(self.errmsg)
353 class FilePath(object):
354 '''An auto-generated filename that cleans itself up.
356 Use this context manager to generate filenames and ensure that the file
357 gets deleted::
359 with TestFilePath('test.img') as img_path:
360 qemu_img('create', img_path, '1G')
361 # migration_sock_path is automatically deleted
363 def __init__(self, name):
364 filename = '{0}-{1}'.format(os.getpid(), name)
365 self.path = os.path.join(test_dir, filename)
367 def __enter__(self):
368 return self.path
370 def __exit__(self, exc_type, exc_val, exc_tb):
371 try:
372 os.remove(self.path)
373 except OSError:
374 pass
375 return False
378 def file_path_remover():
379 for path in reversed(file_path_remover.paths):
380 try:
381 os.remove(path)
382 except OSError:
383 pass
386 def file_path(*names):
387 ''' Another way to get auto-generated filename that cleans itself up.
389 Use is as simple as:
391 img_a, img_b = file_path('a.img', 'b.img')
392 sock = file_path('socket')
395 if not hasattr(file_path_remover, 'paths'):
396 file_path_remover.paths = []
397 atexit.register(file_path_remover)
399 paths = []
400 for name in names:
401 filename = '{0}-{1}'.format(os.getpid(), name)
402 path = os.path.join(test_dir, filename)
403 file_path_remover.paths.append(path)
404 paths.append(path)
406 return paths[0] if len(paths) == 1 else paths
408 def remote_filename(path):
409 if imgproto == 'file':
410 return path
411 elif imgproto == 'ssh':
412 return "ssh://127.0.0.1%s" % (path)
413 else:
414 raise Exception("Protocol %s not supported" % (imgproto))
416 class VM(qtest.QEMUQtestMachine):
417 '''A QEMU VM'''
419 def __init__(self, path_suffix=''):
420 name = "qemu%s-%d" % (path_suffix, os.getpid())
421 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
422 test_dir=test_dir,
423 socket_scm_helper=socket_scm_helper)
424 self._num_drives = 0
426 def add_object(self, opts):
427 self._args.append('-object')
428 self._args.append(opts)
429 return self
431 def add_device(self, opts):
432 self._args.append('-device')
433 self._args.append(opts)
434 return self
436 def add_drive_raw(self, opts):
437 self._args.append('-drive')
438 self._args.append(opts)
439 return self
441 def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
442 '''Add a virtio-blk drive to the VM'''
443 options = ['if=%s' % interface,
444 'id=drive%d' % self._num_drives]
446 if path is not None:
447 options.append('file=%s' % path)
448 options.append('format=%s' % format)
449 options.append('cache=%s' % cachemode)
451 if opts:
452 options.append(opts)
454 if format == 'luks' and 'key-secret' not in opts:
455 # default luks support
456 if luks_default_secret_object not in self._args:
457 self.add_object(luks_default_secret_object)
459 options.append(luks_default_key_secret_opt)
461 self._args.append('-drive')
462 self._args.append(','.join(options))
463 self._num_drives += 1
464 return self
466 def add_blockdev(self, opts):
467 self._args.append('-blockdev')
468 if isinstance(opts, str):
469 self._args.append(opts)
470 else:
471 self._args.append(','.join(opts))
472 return self
474 def add_incoming(self, addr):
475 self._args.append('-incoming')
476 self._args.append(addr)
477 return self
479 def pause_drive(self, drive, event=None):
480 '''Pause drive r/w operations'''
481 if not event:
482 self.pause_drive(drive, "read_aio")
483 self.pause_drive(drive, "write_aio")
484 return
485 self.qmp('human-monitor-command',
486 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
488 def resume_drive(self, drive):
489 self.qmp('human-monitor-command',
490 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
492 def hmp_qemu_io(self, drive, cmd):
493 '''Write to a given drive using an HMP command'''
494 return self.qmp('human-monitor-command',
495 command_line='qemu-io %s "%s"' % (drive, cmd))
497 def flatten_qmp_object(self, obj, output=None, basestr=''):
498 if output is None:
499 output = dict()
500 if isinstance(obj, list):
501 for i in range(len(obj)):
502 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
503 elif isinstance(obj, dict):
504 for key in obj:
505 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
506 else:
507 output[basestr[:-1]] = obj # Strip trailing '.'
508 return output
510 def qmp_to_opts(self, obj):
511 obj = self.flatten_qmp_object(obj)
512 output_list = list()
513 for key in obj:
514 output_list += [key + '=' + obj[key]]
515 return ','.join(output_list)
517 def get_qmp_events_filtered(self, wait=True):
518 result = []
519 for ev in self.get_qmp_events(wait=wait):
520 result.append(filter_qmp_event(ev))
521 return result
523 def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
524 full_cmd = OrderedDict((
525 ("execute", cmd),
526 ("arguments", ordered_qmp(kwargs))
528 log(full_cmd, filters, indent=indent)
529 result = self.qmp(cmd, **kwargs)
530 log(result, filters, indent=indent)
531 return result
533 def run_job(self, job, auto_finalize=True, auto_dismiss=False):
534 while True:
535 for ev in self.get_qmp_events_filtered(wait=True):
536 if ev['event'] == 'JOB_STATUS_CHANGE':
537 status = ev['data']['status']
538 if status == 'aborting':
539 result = self.qmp('query-jobs')
540 for j in result['return']:
541 if j['id'] == job:
542 log('Job failed: %s' % (j['error']))
543 elif status == 'pending' and not auto_finalize:
544 self.qmp_log('job-finalize', id=job)
545 elif status == 'concluded' and not auto_dismiss:
546 self.qmp_log('job-dismiss', id=job)
547 elif status == 'null':
548 return
549 else:
550 iotests.log(ev)
552 def node_info(self, node_name):
553 nodes = self.qmp('query-named-block-nodes')
554 for x in nodes['return']:
555 if x['node-name'] == node_name:
556 return x
557 return None
560 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
562 class QMPTestCase(unittest.TestCase):
563 '''Abstract base class for QMP test cases'''
565 def dictpath(self, d, path):
566 '''Traverse a path in a nested dict'''
567 for component in path.split('/'):
568 m = index_re.match(component)
569 if m:
570 component, idx = m.groups()
571 idx = int(idx)
573 if not isinstance(d, dict) or component not in d:
574 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
575 d = d[component]
577 if m:
578 if not isinstance(d, list):
579 self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
580 try:
581 d = d[idx]
582 except IndexError:
583 self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
584 return d
586 def assert_qmp_absent(self, d, path):
587 try:
588 result = self.dictpath(d, path)
589 except AssertionError:
590 return
591 self.fail('path "%s" has value "%s"' % (path, str(result)))
593 def assert_qmp(self, d, path, value):
594 '''Assert that the value for a specific path in a QMP dict matches'''
595 result = self.dictpath(d, path)
596 self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
598 def assert_no_active_block_jobs(self):
599 result = self.vm.qmp('query-block-jobs')
600 self.assert_qmp(result, 'return', [])
602 def assert_has_block_node(self, node_name=None, file_name=None):
603 """Issue a query-named-block-nodes and assert node_name and/or
604 file_name is present in the result"""
605 def check_equal_or_none(a, b):
606 return a == None or b == None or a == b
607 assert node_name or file_name
608 result = self.vm.qmp('query-named-block-nodes')
609 for x in result["return"]:
610 if check_equal_or_none(x.get("node-name"), node_name) and \
611 check_equal_or_none(x.get("file"), file_name):
612 return
613 self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
614 (node_name, file_name, result))
616 def assert_json_filename_equal(self, json_filename, reference):
617 '''Asserts that the given filename is a json: filename and that its
618 content is equal to the given reference object'''
619 self.assertEqual(json_filename[:5], 'json:')
620 self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
621 self.vm.flatten_qmp_object(reference))
623 def cancel_and_wait(self, drive='drive0', force=False, resume=False):
624 '''Cancel a block job and wait for it to finish, returning the event'''
625 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
626 self.assert_qmp(result, 'return', {})
628 if resume:
629 self.vm.resume_drive(drive)
631 cancelled = False
632 result = None
633 while not cancelled:
634 for event in self.vm.get_qmp_events(wait=True):
635 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
636 event['event'] == 'BLOCK_JOB_CANCELLED':
637 self.assert_qmp(event, 'data/device', drive)
638 result = event
639 cancelled = True
640 elif event['event'] == 'JOB_STATUS_CHANGE':
641 self.assert_qmp(event, 'data/id', drive)
644 self.assert_no_active_block_jobs()
645 return result
647 def wait_until_completed(self, drive='drive0', check_offset=True):
648 '''Wait for a block job to finish, returning the event'''
649 while True:
650 for event in self.vm.get_qmp_events(wait=True):
651 if event['event'] == 'BLOCK_JOB_COMPLETED':
652 self.assert_qmp(event, 'data/device', drive)
653 self.assert_qmp_absent(event, 'data/error')
654 if check_offset:
655 self.assert_qmp(event, 'data/offset', event['data']['len'])
656 self.assert_no_active_block_jobs()
657 return event
658 elif event['event'] == 'JOB_STATUS_CHANGE':
659 self.assert_qmp(event, 'data/id', drive)
661 def wait_ready(self, drive='drive0'):
662 '''Wait until a block job BLOCK_JOB_READY event'''
663 f = {'data': {'type': 'mirror', 'device': drive } }
664 event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
666 def wait_ready_and_cancel(self, drive='drive0'):
667 self.wait_ready(drive=drive)
668 event = self.cancel_and_wait(drive=drive)
669 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
670 self.assert_qmp(event, 'data/type', 'mirror')
671 self.assert_qmp(event, 'data/offset', event['data']['len'])
673 def complete_and_wait(self, drive='drive0', wait_ready=True):
674 '''Complete a block job and wait for it to finish'''
675 if wait_ready:
676 self.wait_ready(drive=drive)
678 result = self.vm.qmp('block-job-complete', device=drive)
679 self.assert_qmp(result, 'return', {})
681 event = self.wait_until_completed(drive=drive)
682 self.assert_qmp(event, 'data/type', 'mirror')
684 def pause_wait(self, job_id='job0'):
685 with Timeout(1, "Timeout waiting for job to pause"):
686 while True:
687 result = self.vm.qmp('query-block-jobs')
688 found = False
689 for job in result['return']:
690 if job['device'] == job_id:
691 found = True
692 if job['paused'] == True and job['busy'] == False:
693 return job
694 break
695 assert found
697 def pause_job(self, job_id='job0', wait=True):
698 result = self.vm.qmp('block-job-pause', device=job_id)
699 self.assert_qmp(result, 'return', {})
700 if wait:
701 return self.pause_wait(job_id)
702 return result
705 def notrun(reason):
706 '''Skip this test suite'''
707 # Each test in qemu-iotests has a number ("seq")
708 seq = os.path.basename(sys.argv[0])
710 open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
711 print('%s not run: %s' % (seq, reason))
712 sys.exit(0)
714 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
715 assert not (supported_fmts and unsupported_fmts)
717 if 'generic' in supported_fmts and \
718 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
719 # similar to
720 # _supported_fmt generic
721 # for bash tests
722 return
724 not_sup = supported_fmts and (imgfmt not in supported_fmts)
725 if not_sup or (imgfmt in unsupported_fmts):
726 notrun('not suitable for this image format: %s' % imgfmt)
728 def verify_protocol(supported=[], unsupported=[]):
729 assert not (supported and unsupported)
731 if 'generic' in supported:
732 return
734 not_sup = supported and (imgproto not in supported)
735 if not_sup or (imgproto in unsupported):
736 notrun('not suitable for this protocol: %s' % imgproto)
738 def verify_platform(supported_oses=['linux']):
739 if True not in [sys.platform.startswith(x) for x in supported_oses]:
740 notrun('not suitable for this OS: %s' % sys.platform)
742 def verify_cache_mode(supported_cache_modes=[]):
743 if supported_cache_modes and (cachemode not in supported_cache_modes):
744 notrun('not suitable for this cache mode: %s' % cachemode)
746 def supports_quorum():
747 return 'quorum' in qemu_img_pipe('--help')
749 def verify_quorum():
750 '''Skip test suite if quorum support is not available'''
751 if not supports_quorum():
752 notrun('quorum support missing')
754 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
755 unsupported_fmts=[]):
756 '''Run tests'''
758 global debug
760 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
761 # indicate that we're not being run via "check". There may be
762 # other things set up by "check" that individual test cases rely
763 # on.
764 if test_dir is None or qemu_default_machine is None:
765 sys.stderr.write('Please run this test via the "check" script\n')
766 sys.exit(os.EX_USAGE)
768 debug = '-d' in sys.argv
769 verbosity = 1
770 verify_image_format(supported_fmts, unsupported_fmts)
771 verify_platform(supported_oses)
772 verify_cache_mode(supported_cache_modes)
774 if debug:
775 output = sys.stdout
776 verbosity = 2
777 sys.argv.remove('-d')
778 else:
779 # We need to filter out the time taken from the output so that
780 # qemu-iotest can reliably diff the results against master output.
781 if sys.version_info.major >= 3:
782 output = io.StringIO()
783 else:
784 # io.StringIO is for unicode strings, which is not what
785 # 2.x's test runner emits.
786 output = io.BytesIO()
788 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
790 class MyTestRunner(unittest.TextTestRunner):
791 def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
792 unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
794 # unittest.main() will use sys.exit() so expect a SystemExit exception
795 try:
796 unittest.main(testRunner=MyTestRunner)
797 finally:
798 if not debug:
799 sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))