hw/misc/mps2-scc: Support configurable number of OSCCLK values
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blob4e758308f24710fe32141692fc7dfd0fae728056
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 import time
32 from typing import (Any, Callable, Dict, Iterable,
33 List, Optional, Sequence, Tuple, TypeVar)
34 import unittest
36 from contextlib import contextmanager
38 # pylint: disable=import-error, wrong-import-position
39 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40 from qemu import qtest
41 from qemu.qmp import QMPMessage
# Have Python dump tracebacks if the interpreter receives a fatal signal.
faulthandler.enable()

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')

# Use this logger for logging messages directly from the iotests module.
# The NullHandler keeps the logger quiet until a real handler is attached.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())
def _split_env_args(var: str) -> List[str]:
    """Return the whitespace-separated words of environment variable *var*.

    An unset, empty or all-blank variable yields an empty list, and runs
    of whitespace never produce empty-string arguments.
    """
    return os.environ.get(var, '').split()


# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += _split_env_args('QEMU_IMG_OPTIONS')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += _split_env_args('QEMU_IO_OPTIONS')

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += _split_env_args('QEMU_IO_OPTIONS_NO_FMT')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += _split_env_args('QEMU_NBD_OPTIONS')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# An unset QEMU_OPTIONS gives [] rather than [''], so no empty argument
# is handed to the qemu binary.
qemu_opts = _split_env_args('QEMU_OPTIONS')

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')
try:
    (test_dir, sock_dir, cachemode, aiomode, qemu_default_machine) = [
        os.environ[var] for var in ('TEST_DIR', 'SOCK_DIR', 'CACHEMODE',
                                    'AIOMODE', 'QEMU_DEFAULT_MACHINE')
    ]
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)
# Helper binary name, overridable through the environment.
socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Default LUKS secret: the option that references it, and the object
# definition carrying the key taken from IMGKEYSECRET (empty if unset).
luks_default_key_secret_opt = 'key-secret=keysec0'
luks_default_secret_object = ''.join(
    ('secret,id=keysec0,data=', os.environ.get('IMGKEYSECRET', '')))
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name used only in the diagnostic printed when the
                 process is terminated by a signal.
    :param args: Full command line, program included.
    :param connect_stderr: When true, stderr is merged into the captured
                           stdout; otherwise stderr is inherited.
    :return: (captured stdout text, return code)
    """
    err_target = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args,
                          stdout=subprocess.PIPE,
                          stderr=err_target,
                          universal_newlines=True) as proc:
        captured = proc.communicate()[0]
    status = proc.returncode
    if status < 0:
        # A negative return code means the child was killed by a signal.
        cmd = ' '.join(args)
        sys.stderr.write(f'{tool} received signal {-status}: {cmd}\n')
    return (captured, status)
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code
    """
    return qemu_tool_pipe_and_status('qemu-img', qemu_img_args + list(args))
def qemu_img(*args: str) -> int:
    '''Run qemu-img and return the exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message with dict keys in sorted order.

    When conv_keys is true, '_' in the keys of this level is replaced by
    '-'. Dicts nested directly inside a dict keep their keys unchanged,
    while elements of a list are converted with the default setting.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        return OrderedDict(
            (key.replace('_', '-') if conv_keys else key,
             ordered_qmp(value, conv_keys=False))
            for key, value in sorted(qmsg.items()))
    if isinstance(qmsg, list):
        return [ordered_qmp(atom) for atom in qmsg]
    return qmsg
def qemu_img_create(*args):
    """
    Run 'qemu-img create' with the given arguments and return the exit code.

    For '-f luks' images without an explicit key-secret, the default
    secret object and key-secret option are added automatically.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a comma-separated option string, so the
                # default key-secret option is appended to that string.
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_measure(*args):
    '''Run "qemu-img measure" with JSON output and return the parsed result'''
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
def qemu_img_check(*args):
    '''Run "qemu-img check" with JSON output and return the parsed result'''
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    cmdline = qemu_img_args + list(args)
    exitcode = subprocess.call(cmdline)
    if exitcode < 0:
        # Negative exit code: the process was terminated by a signal.
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(cmdline)))
    return exitcode
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output with test file names filtered, and
    return the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''Log the filtered output of "qemu-img info" for filename.

    With imgopts, filename is passed via --image-opts; otherwise the
    default image format is given with -f. filter_path is the string
    replaced by TEST_IMG in the output and defaults to filename.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    args = ['info', *format_args, *extra_args, filename]

    output = qemu_img_pipe(*args)
    log(filter_img_info(output, filter_path or filename))
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    output, _status = qemu_tool_pipe_and_status('qemu-io',
                                                qemu_io_args + list(args))
    return output
def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    # When the caller selects the format itself, use the command line
    # that carries no default format option.
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL avoids opening (and leaking) a /dev/null handle.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0.

    Both stdout and stderr of the subprocess are discarded.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids opening (and leaking) a /dev/null handle.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name matching the default machine'''
    is_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if is_ccw else 'virtio-scsi-pci'
class QemuIoInteractive:
    '''Drive an interactive qemu-io process through its stdin/stdout.'''

    def __init__(self, *args):
        '''Start qemu-io with args and wait for its first prompt.

        Raises ValueError carrying the process output if the prompt
        does not appear.
        '''
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for the process to finish.'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read stdout up to the next prompt and return the text before it.'''
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # The mismatching character may itself start the prompt
                # (e.g. output ending in 'q' right before 'qemu-io> ').
                # The first character occurs nowhere else in the pattern,
                # so restarting at 1 or 0 is a complete fallback.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run one qemu-io command and return its output.'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    cmdline = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(cmdline)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
    and its output in case of an error'''
    cmdline = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    # On success the captured output is discarded.
    return returncode, ''
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    output = qemu_tool_pipe_and_status('qemu-nbd', cmdline)[0]
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = qemu_nbd_args + ['--persistent', '--pid-file', pid_file]
    cmd += args

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Poll until the pid file appears; bail out if the server
        # exits before creating it.
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))

            time.sleep(0.01)
        yield
    finally:
        if os.path.exists(pid_file):
            os.remove(pid_file)
        log('Kill NBD server')
        server.kill()
        server.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if two image files are identical'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers.

    Whole 512-byte sectors are written until at least size bytes are
    covered. Each sector carries its index as a big-endian 32-bit
    integer in its first and last four bytes, with zeros in between.
    '''
    # The context manager closes the file even if a write fails.
    with open(name, 'wb') as image:
        i = 0
        while i < size:
            sector = struct.pack('>l504xl', i // 512, i // 512)
            image.write(sector)
            i = i + 512
def image_size(img):
    '''Return image's virtual size'''
    info = json.loads(
        qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
def is_str(val: Any) -> bool:
    '''Return True if val is a str instance'''
    return isinstance(val, str)
# test_dir is a literal path, not a pattern: escape it so characters such
# as '.' or '+' in the directory name are matched verbatim.
test_dir_re = re.compile(re.escape(test_dir))


def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path by TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
win32_re = re.compile(r"\r")


def filter_win32(msg):
    '''Strip all carriage-return characters from msg'''
    stripped = win32_re.sub("", msg)
    return stripped
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")


def filter_qemu_io(msg):
    '''Remove carriage returns and mask qemu-io's ops/timing statistics'''
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, filter_win32(msg))
chown_re = re.compile(r"chown [0-9]+:[0-9]+")


def filter_chown(msg):
    '''Replace numeric "chown UID:GID" arguments by a fixed placeholder'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
def filter_qmp_event(event):
    '''Filter a QMP event dict.

    Returns a copy of event whose timestamp fields are replaced by the
    placeholders 'SECS' and 'USECS'. The event passed in, including its
    nested timestamp dict, is left unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so copy the nested
        # timestamp too before overwriting its fields.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    qmsg is modified in place and also returned. Nested dicts and lists
    are filtered recursively; for list elements the key is the index.
    '''
    # Iterate through either lists or dicts;
    pairs = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in pairs:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
def filter_testfiles(msg):
    '''Mask "<dir>/<pid>-" file name prefixes under the test and socket
    directories'''
    pid_prefix = "%s-" % (os.getpid())
    substitutions = ((test_dir, 'TEST_DIR/PID-'),
                     (sock_dir, 'SOCK_DIR/PID-'))
    for directory, placeholder in substitutions:
        msg = msg.replace(os.path.join(directory, pid_prefix), placeholder)
    return msg
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_virtio_scsi(output: str) -> str:
    '''Drop the -ccw/-pci suffix from virtio-scsi device names'''
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    '''Replace auto-generated "#block<N>" node names by NODE_NAME'''
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
def filter_img_info(output, filename):
    '''Filter "qemu-img info" output for use in reference output.

    Lines mentioning the disk size / actual-size are dropped. In the
    remaining lines the file name, test file prefixes, the image format
    and the iters/uuid/cid values are replaced by placeholders.
    '''
    masks = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in masks:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name by IMGFMT'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_nbd_exports(output: str) -> str:
    '''Mask the min/opt/max block sizes in qemu-nbd export listings'''
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)


def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    The filters are applied to msg in order before it is logged.
    """
    for flt in filters:
        msg = flt(msg)
    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return
    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    text = json.dumps(msg, sort_keys=not already_ordered, indent=indent)
    test_logger.info(text)
class Timeout:
    '''Context manager that raises an Exception via SIGALRM once the
    given number of seconds has elapsed inside the context.'''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds, self.errmsg = seconds, errmsg

    def __enter__(self):
        # Install the handler first, then arm the real-time timer.
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        # Disarm the timer; exceptions from the body are not suppressed.
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''Signal handler: raise the configured error message.'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return name prefixed with the current process id and a dash'''
    pid = os.getpid()
    return f"{pid}-{name}"
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.
    """

    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best-effort cleanup: files that cannot be removed are ignored.
        for path in self.paths:
            try:
                os.unlink(path)
            except OSError:
                continue
        return False
def try_remove(img):
    '''Remove the file img, ignoring any OSError'''
    try:
        os.unlink(img)
    except OSError:
        return
def file_path_remover():
    '''Remove the paths registered by file_path(), newest first'''
    for path in file_path_remover.paths[::-1]:
        try_remove(path)
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''
    # The list of paths lives on the remover function itself; the atexit
    # hook is registered the first time a path is requested.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
def remote_filename(path):
    '''Return the name under which path is reached with the configured
    image protocol; raises Exception for unsupported protocols'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        vm_name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=vm_name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Counter used to generate the "driveN" ids in add_drive().
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object opts" to the command line; returns self'''
        self._args.extend(('-object', opts))
        return self

    def add_device(self, opts):
        '''Append "-device opts" to the command line; returns self'''
        self._args.extend(('-device', opts))
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive opts" to the command line; returns self'''
        self._args.extend(('-drive', opts))
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options += ['file=%s' % path,
                        'format=%s' % img_format,
                        'cache=%s' % cachemode,
                        'aio=%s' % aiomode]

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.extend(('-drive', ','.join(options)))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev"; opts is a string or an iterable of strings
        that is joined with commas. Returns self.'''
        self._args.append('-blockdev')
        spec = opts if isinstance(opts, str) else ','.join(opts)
        self._args.append(spec)
        return self

    def add_incoming(self, addr):
        '''Append "-incoming addr" to the command line; returns self'''
        self._args.extend(('-incoming', addr))
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command through human-monitor-command, optionally
        logging the exchange'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        runner = self.qmp_log if use_log else self.qmp
        return runner(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        # Without an explicit event, break on both reads and writes.
        events = [event] if event else ["read_aio", "write_aio"]
        for name in events:
            self.hmp(f'qemu-io {drive} "break {name} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into a single dict whose keys are
        dot-separated paths (list elements use their index)'''
        if output is None:
            output = {}
        if isinstance(obj, list):
            children = ((str(i), item) for i, item in enumerate(obj))
        elif isinstance(obj, dict):
            children = obj.items()
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
            return output
        for label, child in children:
            self.flatten_qmp_object(child, output, basestr + label + '.')
        return output

    def qmp_to_opts(self, obj):
        '''Turn a QMP object into a comma-separated key=value string'''
        flat = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + flat[key] for key in flat)

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Execute a QMP command, logging both the command and its result'''
        full_cmd = OrderedDict([
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs)),
        ])
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # Other job events are only logged.
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                for j in self.qmp('query-jobs')['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                next_cmd = 'job-cancel' if cancel else 'job-finalize'
                self.qmp_log(next_cmd, id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and run the resulting job to completion'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' not in result:
            job_result = result['error']
        else:
            assert result['return'] == {}
            job_result = self.run_job(job_id)

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability, logging the result'''
        log('Enabling migration QMP events on %s...' % name)
        capability = {
            'capability': 'events',
            'state': True
        }
        log(self.qmp('migrate-set-capabilities', capabilities=[capability]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for migration to complete or fail.

        Returns False on failure. On completion, polls query-status
        until the run state equals expect_runstate, then returns True.
        '''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] != 'completed':
            return False

        # The event may occur in finish-migrate, so wait for the expected
        # post-migration runstate
        runstate = None
        while runstate != expect_runstate:
            runstate = self.qmp('query-status')['return']['status']
        return True

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None'''
        nodes = self.qmp('query-named-block-nodes')
        matches = (x for x in nodes['return'] if x['node-name'] == node_name)
        return next(matches, None)

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps lists,
        for nodes that report any'''
        res = self.qmp("query-named-block-nodes")
        bitmaps = {}
        for device in res['return']:
            if 'dirty-bitmaps' in device:
                bitmaps[device['node-name']] = device['dirty-bitmaps']
        return bitmaps

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') != bitmap_name:
                continue
            if recording is None or bitmap.get('recording') == recording:
                return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if every item of fields is present in the bitmap'''
        bitmap = self.get_bitmap(node_name, bitmap_name)

        return fields.items() <= bitmap.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((candidate for candidate in graph['nodes']
                     if candidate['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(candidate for candidate in graph['nodes']
                            if candidate['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
883 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
885 class QMPTestCase(unittest.TestCase):
886 '''Abstract base class for QMP test cases'''
888 def __init__(self, *args, **kwargs):
889 super().__init__(*args, **kwargs)
890 # Many users of this class set a VM property we rely on heavily
891 # in the methods below.
892 self.vm = None
894 def dictpath(self, d, path):
895 '''Traverse a path in a nested dict'''
896 for component in path.split('/'):
897 m = index_re.match(component)
898 if m:
899 component, idx = m.groups()
900 idx = int(idx)
902 if not isinstance(d, dict) or component not in d:
903 self.fail(f'failed path traversal for "{path}" in "{d}"')
904 d = d[component]
906 if m:
907 if not isinstance(d, list):
908 self.fail(f'path component "{component}" in "{path}" '
909 f'is not a list in "{d}"')
910 try:
911 d = d[idx]
912 except IndexError:
913 self.fail(f'invalid index "{idx}" in path "{path}" '
914 f'in "{d}"')
915 return d
917 def assert_qmp_absent(self, d, path):
918 try:
919 result = self.dictpath(d, path)
920 except AssertionError:
921 return
922 self.fail('path "%s" has value "%s"' % (path, str(result)))
924 def assert_qmp(self, d, path, value):
925 '''Assert that the value for a specific path in a QMP dict
926 matches. When given a list of values, assert that any of
927 them matches.'''
929 result = self.dictpath(d, path)
931 # [] makes no sense as a list of valid values, so treat it as
932 # an actual single value.
933 if isinstance(value, list) and value != []:
934 for v in value:
935 if result == v:
936 return
937 self.fail('no match for "%s" in %s' % (str(result), str(value)))
938 else:
939 self.assertEqual(result, value,
940 '"%s" is "%s", expected "%s"'
941 % (path, str(result), str(value)))
943 def assert_no_active_block_jobs(self):
944 result = self.vm.qmp('query-block-jobs')
945 self.assert_qmp(result, 'return', [])
947 def assert_has_block_node(self, node_name=None, file_name=None):
948 """Issue a query-named-block-nodes and assert node_name and/or
949 file_name is present in the result"""
950 def check_equal_or_none(a, b):
951 return a is None or b is None or a == b
952 assert node_name or file_name
953 result = self.vm.qmp('query-named-block-nodes')
954 for x in result["return"]:
955 if check_equal_or_none(x.get("node-name"), node_name) and \
956 check_equal_or_none(x.get("file"), file_name):
957 return
958 self.fail("Cannot find %s %s in result:\n%s" %
959 (node_name, file_name, result))
961 def assert_json_filename_equal(self, json_filename, reference):
962 '''Asserts that the given filename is a json: filename and that its
963 content is equal to the given reference object'''
964 self.assertEqual(json_filename[:5], 'json:')
965 self.assertEqual(
966 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
967 self.vm.flatten_qmp_object(reference)
970 def cancel_and_wait(self, drive='drive0', force=False,
971 resume=False, wait=60.0):
972 '''Cancel a block job and wait for it to finish, returning the event'''
973 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
974 self.assert_qmp(result, 'return', {})
976 if resume:
977 self.vm.resume_drive(drive)
979 cancelled = False
980 result = None
981 while not cancelled:
982 for event in self.vm.get_qmp_events(wait=wait):
983 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
984 event['event'] == 'BLOCK_JOB_CANCELLED':
985 self.assert_qmp(event, 'data/device', drive)
986 result = event
987 cancelled = True
988 elif event['event'] == 'JOB_STATUS_CHANGE':
989 self.assert_qmp(event, 'data/id', drive)
992 self.assert_no_active_block_jobs()
993 return result
995 def wait_until_completed(self, drive='drive0', check_offset=True,
996 wait=60.0, error=None):
997 '''Wait for a block job to finish, returning the event'''
998 while True:
999 for event in self.vm.get_qmp_events(wait=wait):
1000 if event['event'] == 'BLOCK_JOB_COMPLETED':
1001 self.assert_qmp(event, 'data/device', drive)
1002 if error is None:
1003 self.assert_qmp_absent(event, 'data/error')
1004 if check_offset:
1005 self.assert_qmp(event, 'data/offset',
1006 event['data']['len'])
1007 else:
1008 self.assert_qmp(event, 'data/error', error)
1009 self.assert_no_active_block_jobs()
1010 return event
1011 if event['event'] == 'JOB_STATUS_CHANGE':
1012 self.assert_qmp(event, 'data/id', drive)
1014 def wait_ready(self, drive='drive0'):
1015 """Wait until a BLOCK_JOB_READY event, and return the event."""
1016 return self.vm.events_wait([
1017 ('BLOCK_JOB_READY',
1018 {'data': {'type': 'mirror', 'device': drive}}),
1019 ('BLOCK_JOB_READY',
1020 {'data': {'type': 'commit', 'device': drive}})
def wait_ready_and_cancel(self, drive='drive0'):
    """
    Wait for the job on 'drive' to become ready, then cancel it.

    The resulting event must be a BLOCK_JOB_COMPLETED event of type
    'mirror' whose offset equals its len.
    """
    self.wait_ready(drive=drive)
    final = self.cancel_and_wait(drive=drive)
    self.assertEqual(final['event'], 'BLOCK_JOB_COMPLETED')
    self.assert_qmp(final, 'data/type', 'mirror')
    expected_offset = final['data']['len']
    self.assert_qmp(final, 'data/offset', expected_offset)
def complete_and_wait(self, drive='drive0', wait_ready=True,
                      completion_error=None):
    '''
    Complete a block job and wait for it to finish.

    :param drive: Device name of the job.
    :param wait_ready: Wait for the job to become ready before sending
                       block-job-complete.
    :param completion_error: Passed to wait_until_completed() as the
                             expected error of the completion event.
    '''
    if wait_ready:
        self.wait_ready(drive=drive)

    reply = self.vm.qmp('block-job-complete', device=drive)
    self.assert_qmp(reply, 'return', {})

    final = self.wait_until_completed(drive=drive, error=completion_error)
    job_type = final['data']['type']
    self.assertTrue(job_type in ['mirror', 'commit'])
def pause_wait(self, job_id='job0'):
    """
    Poll query-block-jobs until the job is paused and no longer busy.

    The polling runs inside a 3 second Timeout block. The job whose
    'device' equals job_id must be listed on every poll.

    :return: The job's query-block-jobs entry.
    """
    with Timeout(3, "Timeout waiting for job to pause"):
        while True:
            jobs = self.vm.qmp('query-block-jobs')['return']
            match = next((j for j in jobs if j['device'] == job_id), None)
            assert match is not None
            if match['paused'] and not match['busy']:
                return match
def pause_job(self, job_id='job0', wait=True):
    """
    Send block-job-pause for job_id and check that it was accepted.

    :param wait: When true, continue with pause_wait() and return its
                 result; otherwise return the QMP reply right away.
    """
    reply = self.vm.qmp('block-job-pause', device=job_id)
    self.assert_qmp(reply, 'return', {})
    return self.pause_wait(job_id) if wait else reply
def case_skip(self, reason):
    '''
    Skip this test case.

    The reason is first recorded through case_notrun() and then handed
    to skipTest().
    '''
    case_notrun(reason)
    self.skipTest(reason)
def notrun(reason):
    '''
    Skip this test suite.

    Writes the reason to "<output_dir>/<seq>.notrun", logs a warning and
    exits the process with status 0.

    :param reason: Text explaining why the suite is not run.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of relying on the garbage
    # collector; we are about to exit the interpreter.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as notrun_file:
        notrun_file.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    '''
    Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to "<output_dir>/<seq>.casenotrun".

    :param reason: Text explaining why the case is not run.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the appended line is flushed and the
    # handle is closed deterministically.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as notrun_file:
        notrun_file.write('    [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless the current image format is suitable.

    :param supported_fmts: Formats the test may run with; empty means no
                           restriction.
    :param unsupported_fmts: Formats the test must not run with.
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_enabled and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    unsuitable = bool(supported_fmts) and imgfmt not in supported_fmts
    if unsuitable or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless the current protocol is suitable.

    At most one of the two lists may be non-empty. A 'generic' entry in
    'supported' accepts every protocol.
    """
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    unsuitable = bool(supported) and imgproto not in supported
    if unsuitable or imgproto in unsupported:
        notrun('not suitable for this protocol: %s' % imgproto)
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless sys.platform is suitable.

    Entries are matched as prefixes of sys.platform. An empty
    'supported' list means no restriction.
    """
    def platform_matches(prefixes: Sequence[str]) -> bool:
        return any(sys.platform.startswith(prefix) for prefix in prefixes)

    if platform_matches(unsupported):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not platform_matches(supported):
        notrun('not suitable for this OS: %s' % sys.platform)
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless the current cache mode is listed.

    An empty list means no restriction.
    """
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless the current aio mode is listed.

    An empty list means no restriction.
    """
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Skip the test suite if any required format is missing from
    supported_formats().
    """
    missing = set(required_formats) - set(supported_formats())
    usf_list = list(missing)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
def _verify_virtio_blk() -> None:
    """
    Skip the test suite if the output of "-device help" does not
    mention virtio-blk.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
def supports_quorum():
    '''Return whether "quorum" appears in the qemu-img --help output'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    If not, return the reason why.

    :return: (True, '') on success, otherwise (False, reason). The
             reason is the text following "<image path>:" on the first
             output line containing it, or the whole output if no line
             does.
    """
    probe_path = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   probe_path, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; the image may not exist if creation failed.
    try:
        os.remove(probe_path)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = probe_path + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())
    return (False, output)
def verify_working_luks():
    """
    Skip test suite if LUKS does not work

    The reason reported by has_working_luks() is used as the skip
    message.
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    :param args: Extra command line arguments, appended after qemu_prog
                 and qemu_opts.
    :return: QEMU's stdout output.
    """
    command = [qemu_prog]
    command += qemu_opts
    command += list(args)
    stdout_text, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
def supported_formats(read_only=False):
    '''
    Return the list of whitelisted block formats.

    Set 'read_only' to True to check ro-whitelist
    Otherwise, rw-whitelist is checked

    The parsed lists are cached in the 'formats' attribute of this
    function, keyed by read_only, so qemu is run at most once per key.
    '''
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # Line 0 is used for the rw-whitelist, line 1 for the ro-whitelist.
        wanted = help_lines[1 if read_only else 0]
        cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    '''
    Skip Test Decorator
    Runs the test if all the required formats are whitelisted

    :param required_formats: Either a collection of format names, or a
                             callable that takes the test case and
                             returns such a collection.
    :param read_only: Passed to supported_formats() to select which
                      whitelist is consulted.
    '''
    # Imported locally so this block does not depend on a file-level import.
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the test's __name__ and __doc__ on the wrapper.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''
    Skip Test Decorator
    Skips the test for the given formats

    :param formats: Image formats for which the decorated test is
                    skipped via case_skip().
    '''
    # Imported locally so this block does not depend on a file-level import.
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the test's __name__ and __doc__ on the wrapper.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''
    Skip Test Decorator
    Runs the test only without root permissions

    When running as root the case is recorded through case_notrun() and
    the wrapper returns None without calling the test.
    '''
    # Imported locally so this block does not depend on a file-level import.
    import functools

    # wraps() keeps the test's __name__ and __doc__ on the wrapper.
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper
def execute_unittest(debug=False):
    """
    Executes unittests within the calling module.

    :param debug: When true, run with verbosity 2 and write the runner
                  output directly to stdout. Otherwise the output is
                  captured, filtered and written to stderr.
    """

    verbosity = 2 if debug else 1

    if debug:
        output = sys.stdout
    else:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        output = io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # partition() never raises, unlike unpacking split('\n', 1):
            # if nothing (or no newline) was captured, a ValueError here
            # would replace the exception already propagating.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Consumes a '-d' argument from sys.argv, configures logging, and then
    runs the environment checks in a fixed order.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # Only the first '-d' is consumed from the argument list.
    try:
        sys.argv.remove('-d')
        debug = True
    except ValueError:
        debug = False

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    :param test_function: Script-style entry point; when not given, the
                          unittest runner is used instead.

    All other arguments are forwarded to execute_setup_common().
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
def activate_logging():
    """
    Activate iotests.log() output to stdout for script-style tests.

    Attaches a message-only stdout handler to test_logger, sets its level
    to INFO and stops propagation to ancestor loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
# Entry point for script-style iotests that have no single point of entry
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Activates logging and forwards all arguments to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
# Entry point for script-style iotests that have a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework

    :param test_function: The callable that runs the test.

    Activates logging; all other arguments are forwarded to
    execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
# Entry point for unittest-style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework

    All arguments are forwarded to execute_test().
    """
    execute_test(*args, **kwargs)