iotests.py: Add skip_for_formats() decorator
[qemu.git] / tests / qemu-iotests / iotests.py
blob5ea4c4df8b62595e01889345e0511b7d7c32a780
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 from typing import (Any, Callable, Dict, Iterable,
32 List, Optional, Sequence, TypeVar)
33 import unittest
35 # pylint: disable=import-error, wrong-import-position
36 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
37 from qemu import qtest
39 assert sys.version_info >= (3, 6)
# Type Aliases
# A QMP reply as used by the annotations in this module: a dict keyed by
# strings with arbitrary values.
QMPResponse = Dict[str, Any]


# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
# A NullHandler is attached to this logger here; no other handler is
# configured at this point in the file.
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have the interpreter dump Python tracebacks on fatal errors.
faulthandler.enable()
def _env_words(name):
    '''Return the whitespace-separated words of environment variable @name.

    An unset or empty variable yields an empty list.  Splitting on runs
    of whitespace (instead of on single spaces) keeps empty strings out
    of the argument lists when the value contains consecutive spaces.
    '''
    return os.environ.get(name, '').split()


# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += _env_words('QEMU_IMG_OPTIONS')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += _env_words('QEMU_IO_OPTIONS')

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += _env_words('QEMU_IO_OPTIONS_NO_FMT')

qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
qemu_nbd_args += _env_words('QEMU_NBD_OPTIONS')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# An unset or empty QEMU_OPTIONS gives [] (not ['']), so no empty
# argument ends up on the QEMU command line.
qemu_opts = _env_words('QEMU_OPTIONS')

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used when a luks image is created or attached without an
# explicit key-secret option.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_img(*args):
    '''Run qemu-img and return the exit code.

    The child's stdin and stdout are connected to the null device; its
    stderr is inherited.  If qemu-img is killed by a signal, a message
    naming the signal and the command line is written to stderr.
    '''
    full_args = qemu_img_args + list(args)
    # subprocess.DEVNULL lets subprocess manage the null-device handle,
    # so no file object is left open after the call.
    exitcode = subprocess.call(full_args, stdin=subprocess.DEVNULL,
                               stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(full_args)))
    return exitcode
def ordered_qmp(qmsg, conv_keys=True):
    '''Return @qmsg with every dict replaced by a key-sorted OrderedDict.

    When @conv_keys is true, underscores in the keys of a dict passed
    directly as @qmsg are replaced by dashes.  Dicts nested as dict
    values keep their keys unchanged; dicts that are list elements are
    converted with the default (conv_keys=True).  Anything that is
    neither a list nor a dict is returned as is.
    '''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(element) for element in qmsg]
    return qmsg
def qemu_img_create(*args):
    '''Run "qemu-img create" with @args and return the exit code.

    If the arguments select the luks format ('-f luks') and do not
    already carry a key-secret option, the default secret object and
    key-secret option are added.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following -o is an option string, so the
                # default key-secret option is appended to it with a
                # comma (a str has no append() method).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left visible and return the exit code'''
    command = qemu_img_args + list(args)
    status = subprocess.call(command)
    if status < 0:
        # A negative status is the number of the signal that killed it
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(command)))
    return status
def qemu_img_pipe(*args):
    '''Run qemu-img and return its output (stdout and stderr combined).

    If qemu-img is killed by a signal, a message naming the signal and
    the command line is written to stderr.
    '''
    full_args = qemu_img_args + list(args)
    subp = subprocess.Popen(full_args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # communicate() drains the pipe while waiting for the child.  Calling
    # wait() first can deadlock once the child fills the pipe buffer.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-subp.returncode, ' '.join(full_args)))
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles, and
    return the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''Log the filtered output of "qemu-img info" for @filename.

    With @imgopts, @filename is passed with --image-opts; otherwise the
    image format is given with -f.  @extra_args are inserted before the
    filename.  @filter_path is the string replaced in the output; it
    defaults to @filename.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    command = ['info'] + format_args + list(extra_args) + [filename]

    info = qemu_img_pipe(*command)
    log(filter_img_info(info, filter_path or filename))
def qemu_io(*args):
    '''Run qemu-io and return its output (stdout and stderr combined).

    If qemu-io is killed by a signal, a message naming the signal and
    the command line is written to stderr.
    '''
    args = qemu_io_args + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # communicate() drains the pipe while waiting for the child.  Calling
    # wait() first can deadlock once the child fills the pipe buffer.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-subp.returncode, ' '.join(args)))
    return output
def qemu_io_log(*args):
    '''Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout.

    If qemu-io is killed by a signal, a message naming the signal and
    the command line is written to stderr.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids leaving an open /dev/null file object
    # behind after every call.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
def qemu_io_silent_check(*args):
    '''Run qemu-io with stdout and stderr suppressed and return True if
    the subprocess returned 0'''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids leaving an open /dev/null file object
    # behind after every call; stderr follows stdout into it.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name matching the default machine:
    the CCW variant for s390-ccw-virtio, the PCI variant otherwise'''
    is_s390_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if is_s390_ccw else 'virtio-scsi-pci'
class QemuIoInteractive:
    '''A qemu-io child process driven through its interactive prompt'''

    def __init__(self, *args):
        '''Start qemu-io with @args and consume its first prompt'''
        self.args = qemu_io_args + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        assert self._p.stdout.read(9) == 'qemu-io> '

    def close(self):
        '''Send the quit command and wait for the process to end'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read the child's output one character at a time up to the
        next prompt and return it without the prompt'''
        prompt = 'qemu-io> '
        prompt_len = len(prompt)
        matched = 0
        chars = []
        while matched != prompt_len:
            char = self._p.stdout.read(1)
            # check unexpected EOF
            assert char != ''
            chars.append(char)
            # A mismatching character restarts the prompt match at zero
            matched = matched + 1 if char == prompt[matched] else 0

        return ''.join(chars[:-prompt_len])

    def cmd(self, cmd):
        '''Send one command line and return the output it produced'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        stdin = self._p.stdin
        stdin.write(cmd + '\n')
        stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    command = qemu_nbd_args + ['--fork']
    command.extend(args)
    return subprocess.call(command)
def qemu_nbd_early_pipe(*args):
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    command = qemu_nbd_args + ['--fork'] + list(args)
    proc = subprocess.Popen(command,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    status = proc.wait()
    if status < 0:
        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
                         (-status, ' '.join(command)))

    # The pipe is only read when the parent reported a failure; on
    # success the output is an empty string.
    if not status:
        return status, ''
    return status, proc.communicate()[0]
def qemu_nbd_popen(*args):
    '''Start qemu-nbd with --persistent and return the Popen object
    without waiting for the process'''
    command = qemu_nbd_args + ['--persistent'] + list(args)
    return subprocess.Popen(command)
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two
    images, i.e. it reports them as identical'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers.

    The file @name is written in 512-byte sectors until at least @size
    bytes have been produced.  Each sector starts and ends with its own
    sector number as a big-endian 32-bit integer; the 504 bytes in
    between are zero.
    '''
    # The with statement closes the file even if a write fails.
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            offset += 512
def image_size(img):
    '''Return the virtual size of @img as reported by
    "qemu-img info --output=json"'''
    raw_info = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(raw_info)
    return info['virtual-size']
def is_str(val):
    '''Return True if @val is a str instance'''
    result = isinstance(val, str)
    return result
# The test directory is a literal path, not a pattern: escape it so that
# regex metacharacters in the path (such as '.' or '+') match only
# themselves.
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in @msg with
    "TEST_DIR"'''
    # A callable replacement keeps the replacement text literal too.
    return test_dir_re.sub(lambda _match: "TEST_DIR", msg)
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return @msg with all carriage returns removed'''
    cleaned = win32_re.sub("", msg)
    return cleaned
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Remove carriage returns from @msg and replace qemu-io's
    operation statistics (op count, time, throughput) with fixed
    placeholders'''
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, filter_win32(msg))
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" occurrences in @msg with
    "chown UID:GID"'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
def filter_qmp_event(event):
    '''Filter a QMP event dict.

    Returns a copy of @event in which the 'seconds' and 'microseconds'
    members of the timestamp (if there is one) are replaced by the
    placeholders 'SECS' and 'USECS'.  @event itself is not modified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp
        # dict must be copied as well before it is rewritten; otherwise
        # the caller's event would be changed too.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_qmp(qmsg, filter_fn):
    '''Apply @filter_fn to every leaf value of a QMP object.

    @qmsg is a list or a dict and is modified in place (and returned).
    Nested lists and dicts are processed recursively; every other value
    is replaced by filter_fn(key, value), where key is the dict key or
    the list index.
    '''
    # Iterate through either lists or dicts;
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        if is_container:
            qmsg[key] = filter_qmp(value, filter_fn)
        else:
            qmsg[key] = filter_fn(key, value)
    return qmsg
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" prefix of this process's test
    files in @msg with "TEST_DIR/PID-"'''
    pid_part = "%s-" % (os.getpid())
    return msg.replace(os.path.join(test_dir, pid_part), 'TEST_DIR/PID-')
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value in a QMP object'''
    def _hide_paths(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _hide_paths)
def filter_generated_node_ids(msg):
    '''Replace every "#block<number>" in @msg with "NODE_NAME"'''
    node_id_re = re.compile("#block[0-9]+")
    return node_id_re.sub("NODE_NAME", msg)
def filter_img_info(output, filename):
    '''Make "qemu-img info" output reproducible.

    Lines containing 'disk size' or 'actual-size' are dropped.  In the
    remaining lines, @filename, the test file prefix and the image
    format name are replaced by placeholders, and the iters, uuid and
    cid values are masked.
    '''
    masks = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for text in output.split('\n'):
        if 'disk size' in text or 'actual-size' in text:
            continue
        text = text.replace(filename, 'TEST_IMG')
        text = filter_testfiles(text)
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in masks:
            text = re.sub(pattern, replacement, text)
        kept.append(text)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    '''Replace the name of the image format in @msg with "IMGFMT"'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value in a QMP object'''
    def _hide_format(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _hide_format)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Log a message on the diff-output logger.

    The message is passed through each filter in order.  A dict or list
    result is serialized as JSON (pretty-printed when indent is given);
    anything else is logged as is.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_sorted = isinstance(msg, OrderedDict)
    text = json.dumps(msg, sort_keys=not already_sorted, indent=indent)
    test_logger.info(text)
class Timeout:
    '''Context manager that raises an exception if its body takes too long.

    On entry, a SIGALRM handler is installed and a real-time interval
    timer is armed for @seconds.  If the timer expires, the handler
    raises Exception(@errmsg).  On exit, the timer is disarmed and the
    SIGALRM handler that was active before entry is put back.
    '''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        # SIGALRM handler that was installed before __enter__
        self._old_handler = None

    def __enter__(self):
        # signal.signal() returns the handler it replaces
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        # Do not leave our handler installed for unrelated alarms.  A
        # previous handler of None was not installed from Python and
        # cannot be passed back to signal.signal().
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
        return False

    def timeout(self, signum, frame):
        '''SIGALRM handler: report the timeout as an exception'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return @name prefixed with the current process ID and a dash'''
    pid = os.getpid()
    return f"{pid}-{name}"
class FilePaths:
    """
    FilePaths is a list of auto-generated filenames that cleans itself up.

    Use this context manager to generate filenames and ensure that the
    files get deleted::

        with FilePaths(['a.img', 'b.img']) as (img_a, img_b):
            qemu_img('create', img_a, '1G')
        # img_a and img_b are automatically deleted
    """
    def __init__(self, names, base_dir=test_dir):
        # One path per name, made unique to this process by file_pattern()
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Removal is best-effort, and it is handled per path: a file that
        # was never created (or cannot be removed) must not keep the
        # remaining files from being deleted.
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                pass
        return False
class FilePath(FilePaths):
    """
    FilePath is the single-filename form of FilePaths: entering the
    context yields the one generated path instead of a list.
    """
    def __init__(self, name, base_dir=test_dir):
        super().__init__([name], base_dir)

    def __enter__(self):
        only_path = self.paths[0]
        return only_path
def file_path_remover():
    '''Remove the files listed in file_path_remover.paths, last one
    first, ignoring files that cannot be removed'''
    for victim in reversed(file_path_remover.paths):
        try:
            os.remove(victim)
        except OSError:
            # Best effort: skip this file and go on with the rest
            continue
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name returns a single path; several names return a list.
    The generated paths are removed by file_path_remover() at exit.
    '''

    # The first call sets up the shared path list and the exit hook
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    registry = file_path_remover.paths
    generated = []
    for name in names:
        full_path = os.path.join(base_dir, file_pattern(name))
        registry.append(full_path)
        generated.append(full_path)

    if len(generated) == 1:
        return generated[0]
    return generated
def remote_filename(path):
    '''Return @path in the form required by the image protocol in use:
    unchanged for "file", as an ssh:// URL to 127.0.0.1 for "ssh".
    Any other protocol raises an Exception.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        vm_name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=vm_name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Number of drives added with add_drive(); used for drive IDs
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object @opts" to the command line; returns self'''
        self._args.extend(['-object', opts])
        return self

    def add_device(self, opts):
        '''Append "-device @opts" to the command line; returns self'''
        self._args.extend(['-device', opts])
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive @opts" to the command line; returns self'''
        self._args.extend(['-drive', opts])
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a drive (virtio by default) to the VM.

        The drive gets the ID "drive<N>".  When @path is not None, file,
        format, cache and aio options are added for it.  For luks images
        without a key-secret in @opts, the default secret is used.
        '''
        drive_opts = ['if=%s' % interface,
                      'id=drive%d' % self._num_drives]

        if path is not None:
            drive_opts.extend(['file=%s' % path,
                               'format=%s' % img_format,
                               'cache=%s' % cachemode,
                               'aio=%s' % aiomode])

        if opts:
            drive_opts.append(opts)

        # default luks support
        if img_format == 'luks' and 'key-secret' not in opts:
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            drive_opts.append(luks_default_key_secret_opt)

        self._args.extend(['-drive', ','.join(drive_opts)])
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev" with @opts, which is either an option
        string or an iterable of options to be joined with commas'''
        self._args.append('-blockdev')
        self._args.append(opts if isinstance(opts, str) else ','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming @addr" to the command line; returns self'''
        self._args.extend(['-incoming', addr])
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse:
        '''Run an HMP command through human-monitor-command, logging the
        exchange when @use_log is set'''
        kwargs = {'command-line': command_line}
        run = self.qmp_log if use_log else self.qmp
        return run('human-monitor-command', **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if event:
            self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
            return
        # Without an explicit event, break on both reads and writes
        for aio_event in ("read_aio", "write_aio"):
            self.pause_drive(drive, aio_event)

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPResponse:
        """Run a qemu-io command on a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested lists/dicts into one dict whose keys are the
        dot-separated paths of the leaf values'''
        if output is None:
            output = {}
        if isinstance(obj, list):
            for index, element in enumerate(obj):
                self.flatten_qmp_object(element, output,
                                        basestr + str(index) + '.')
        elif isinstance(obj, dict):
            for key, value in obj.items():
                self.flatten_qmp_object(value, output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a "key=value,..." option string'''
        flat = self.flatten_qmp_object(obj)
        return ','.join([key + '=' + value for key, value in flat.items()])

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event()'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Run a QMP command, logging both the command and its result'''
        request = OrderedDict()
        request["execute"] = cmd
        request["arguments"] = ordered_qmp(kwargs)
        log(request, filters, indent=indent)
        reply = self.qmp(cmd, **kwargs)
        log(reply, filters, indent=indent)
        return reply

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        by_device = {'data': {'device': job}}
        by_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', by_device),
            ('BLOCK_JOB_CANCELLED', by_device),
            ('BLOCK_JOB_ERROR', by_device),
            ('BLOCK_JOB_READY', by_device),
            ('BLOCK_JOB_PENDING', by_id),
            ('JOB_STATUS_CHANGE', by_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # Other job events are only logged
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                jobs = self.qmp('query-jobs')['return']
                for info in jobs:
                    if info['id'] == job:
                        error = info['error']
                        log('Job failed: %s' % (info['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                next_cmd = 'job-cancel' if cancel else 'job-finalize'
                self.qmp_log(next_cmd, id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and run the resulting job to completion'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        reply = self.qmp_log('blockdev-create', filters=filters,
                             job_id=job_id, options=options)

        if 'return' not in reply:
            job_result = reply['error']
        else:
            assert reply['return'] == {}
            job_result = self.run_job(job_id)

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Turn on the "events" migration capability and log the reply'''
        log('Enabling migration QMP events on %s...' % name)
        events_cap = {
            'capability': 'events',
            'state': True
        }
        log(self.qmp('migrate-set-capabilities', capabilities=[events_cap]))

    def wait_migration(self, expect_runstate):
        '''Wait for (and log) MIGRATION events until one reports status
        "completed", then poll until the VM is in @expect_runstate'''
        completed = False
        while not completed:
            event = self.event_wait('MIGRATION')
            log(event, filters=[filter_qmp_event])
            completed = event['data']['status'] == 'completed'
        # The event may occur in finish-migrate, so wait for the expected
        # post-migration runstate
        while self.qmp('query-status')['return']['status'] != expect_runstate:
            pass

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for @node_name, or
        None if there is no such node'''
        reply = self.qmp('query-named-block-nodes')
        matches = (entry for entry in reply['return']
                   if entry['node-name'] == node_name)
        return next(matches, None)

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps list,
        for all nodes that report dirty bitmaps'''
        reply = self.qmp("query-named-block-nodes")
        bitmaps = {}
        for device in reply['return']:
            if 'dirty-bitmaps' in device:
                bitmaps[device['node-name']] = device['dirty-bitmaps']
        return bitmaps

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') != bitmap_name:
                continue
            if recording is None or bitmap.get('recording') == recording:
                return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if all key/value pairs of @fields are present in
        the named bitmap's description'''
        bitmap = self.get_bitmap(node_name, bitmap_name)

        return fields.items() <= bitmap.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        children = iter(path.split('/'))

        # Must start with a /
        assert next(children) == ''

        node = next((cand for cand in graph['nodes']
                     if cand['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in children:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                # Follow the edge named @child_name out of the current node
                child_id = next(edge['child'] for edge in graph['edges']
                                if (edge['parent'] == node['id'] and
                                    edge['name'] == child_name))

                node = next(cand for cand in graph['nodes']
                            if cand['id'] == child_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a slash-separated path in a nested dict; a component
        of the form "name[N]" additionally indexes into a list'''
        for component in path.split('/'):
            match = index_re.match(component)
            index = None
            if match:
                component, index = match.groups()
                index = int(index)

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if not match:
                continue
            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[index]
            except IndexError:
                self.fail(f'invalid index "{index}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be traversed in @d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            return
        self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if not isinstance(value, list) or value == []:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        for candidate in value:
            if actual == candidate:
                return
        self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        reply = self.vm.qmp('query-block-jobs')
        self.assert_qmp(reply, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def matches(actual, wanted):
            # A missing value on either side counts as a match
            return actual is None or wanted is None or actual == wanted
        assert node_name or file_name
        reply = self.vm.qmp('query-named-block-nodes')
        for entry in reply["return"]:
            if matches(entry.get("node-name"), node_name) and \
                    matches(entry.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, reply))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(payload)),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_events = ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED')
        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                name = event['event']
                if name in final_events:
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif name == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                name = event['event']
                if name == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is not None:
                        self.assert_qmp(event, 'data/error', error)
                    else:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    self.assert_no_active_block_jobs()
                    return event
                if name == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        wanted = {'data': {'type': 'mirror', 'device': drive}}
        return self.vm.event_wait(name='BLOCK_JOB_READY', match=wanted)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the mirror job to become ready, cancel it and check
        that it completed with offset equal to len'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assert_qmp(event, 'data/type', 'mirror')

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
        and return its entry; limited to three seconds by Timeout'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                found = False
                for job in jobs:
                    if job['device'] != job_id:
                        continue
                    found = True
                    if job['paused'] and not job['busy']:
                        return job
                    break
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job; with @wait, return the paused job's entry,
        otherwise the reply to block-job-pause'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        if not wait:
            return reply
        return self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
def notrun(reason):
    '''Skip this test suite.

    Writes @reason to "<output_dir>/<seq>.notrun", logs a warning and
    exits the process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # The with statement makes sure the reason is flushed and the file
    # closed before the process exits.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to "<output_dir>/<seq>.casenotrun".'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # The with statement closes the file right away instead of leaving
    # that to garbage collection.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    '''Call notrun() unless the current image format is acceptable.

    At most one of the two lists may be non-empty.  'generic' in
    @supported_fmts accepts every format as long as the IMGFMT_GENERIC
    environment variable is 'true' (its default).
    '''
    assert not (supported_fmts and unsupported_fmts)

    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_allowed:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        return

    missing = bool(supported_fmts) and imgfmt not in supported_fmts
    excluded = imgfmt in unsupported_fmts
    if missing or excluded:
        notrun('not suitable for this image format: %s' % imgfmt)
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    '''Call notrun() unless the current image protocol is acceptable.

    At most one of the two lists may be non-empty.  'generic' in
    @supported accepts every protocol.
    '''
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    missing = bool(supported) and imgproto not in supported
    excluded = imgproto in unsupported
    if missing or excluded:
        notrun('not suitable for this protocol: %s' % imgproto)
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    '''Call notrun() if sys.platform is unsuitable

    Entries are matched as prefixes of sys.platform.  The unsupported
    list is checked first; an empty supported list accepts everything
    that is not unsupported.
    '''
    def platform_matches(prefixes: Sequence[str]) -> bool:
        return any(sys.platform.startswith(prefix) for prefix in prefixes)

    if platform_matches(unsupported):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not platform_matches(supported):
        notrun('not suitable for this OS: %s' % sys.platform)
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    '''Call notrun() if cachemode is not among supported_cache_modes

    An empty sequence places no restriction on the cache mode.
    '''
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    '''Call notrun() if aiomode is not among supported_aio_modes

    An empty sequence places no restriction on the aio mode.
    '''
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
def supports_quorum():
    '''Return True if 'quorum' appears in the output of
    qemu_img_pipe('--help')'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def qemu_pipe(*args):
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by
    the given arguments.  If qemu is terminated by a signal, a note is
    written to stderr.

    :return: QEMU's stdout output (stderr is redirected into it).
    """
    args = [qemu_prog] + qemu_opts + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # Read the output *before* looking at the exit status: calling
    # wait() first can deadlock once the child has filled the pipe
    # buffer, because nobody is draining it.  communicate() reads the
    # pipe until EOF and then waits for the process to terminate.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu received signal %i: %s\n' %
                         (-subp.returncode, ' '.join(args)))
    return output
def supported_formats(read_only=False):
    '''Return the list of whitelisted block driver formats

    Set 'read_only' to True to check ro-whitelist
    Otherwise, rw-whitelist is checked

    The answer for each value of read_only is computed once from the
    output of qemu_pipe("-drive", "format=help") and then cached in the
    function attribute supported_formats.formats.
    '''
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # The second output line is used for the read-only list, the
        # first one otherwise.
        wanted = help_lines[1 if read_only else 0]
        # Everything between the first and second ':' is the
        # whitespace-separated list of format names.
        cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       required_formats is either a collection of format names or a
       callable that takes the test case and returns such a collection.
       read_only is forwarded to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            # At least one required format is missing from the whitelist
            test_case.case_skip(
                f'{test_case}: formats {usf_list} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The decorated test runs unless the current image format (imgfmt)
       is listed in formats.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(
                f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root (uid 0) the test is not executed; it is
       recorded through case_notrun() and the wrapper returns None.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)

        # args[0] is the first positional argument of the wrapped
        # callable (the test case instance when decorating a method).
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
def execute_unittest(debug=False):
    """Executes unittests within the calling module.

    In debug mode the runner writes directly to stdout with verbosity 2.
    Otherwise its output is captured, filtered so that it can be diffed
    against the reference output, and written to stderr.

    unittest.main() terminates through sys.exit(), so this function
    normally leaves via SystemExit.
    """

    verbosity = 2 if debug else 1

    if debug:
        output = sys.stdout
    else:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        output = io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        if not debug:
            out = output.getvalue()
            # Drop the run time, which differs between runs
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # partition() copes with output that contains no newline at
            # all (e.g. nothing was captured because unittest.main()
            # bailed out early); unpacking the result of split() would
            # raise ValueError here and mask the pending SystemExit.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' argument, if present, is removed from sys.argv and selects
    DEBUG-level logging.  Afterwards the image format, protocol,
    platform, cache mode and aio mode are checked, in that order,
    against the given restrictions.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    if any(value is None for value in (test_dir, qemu_default_machine)):
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True
    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments are forwarded to execute_setup_common().  If
    test_function is given it is called without arguments; otherwise
    the unittest runner is started through execute_unittest().
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
        return
    execute_unittest(debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests.

    Attaches a message-only stdout handler to test_logger, sets its
    level to INFO and turns off propagation to parent loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))
    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Enables log output on stdout and forwards all arguments to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Enables log output on stdout, then passes test_function and all
    remaining arguments on to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are forwarded unchanged to execute_test().
    """
    execute_test(*args, **kwargs)