Docs/RCU: Correct sample code of qatomic_rcu_set
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blobdcdcd0387f20dca7e957930d557c46250a643bde
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 import time
32 from typing import (Any, Callable, Dict, Iterable,
33 List, Optional, Sequence, Tuple, TypeVar)
34 import unittest
36 from contextlib import contextmanager
38 # pylint: disable=import-error, wrong-import-position
39 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40 from qemu import qtest
41 from qemu.qmp import QMPMessage
# Logger for messages emitted directly by the iotests module itself
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Logger for messages that are meant to end up in the diffed test output
test_logger = logging.getLogger('qemu.iotests.diff_io')

faulthandler.enable()


def _tool_args(prog_var, default_prog, opts_var):
    """
    Build a command line prefix for a tool from the environment.

    The program comes from the environment variable prog_var (falling back
    to default_prog); if opts_var is set and non-empty, its stripped value
    is split on single spaces and appended.
    """
    cmdline = [os.environ.get(prog_var, default_prog)]
    extra = os.environ.get(opts_var)
    if extra:
        cmdline += extra.strip().split(' ')
    return cmdline


# Splitting on spaces will not work if arguments contain spaces, but it is
# necessary if we want to support the override options that ./check supports.
qemu_img_args = _tool_args('QEMU_IMG_PROG', 'qemu-img', 'QEMU_IMG_OPTIONS')
qemu_io_args = _tool_args('QEMU_IO_PROG', 'qemu-io', 'QEMU_IO_OPTIONS')
qemu_io_args_no_fmt = _tool_args('QEMU_IO_PROG', 'qemu-io',
                                 'QEMU_IO_OPTIONS_NO_FMT')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = _tool_args('QEMU_NBD_PROG', 'qemu-nbd', 'QEMU_NBD_OPTIONS')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Test environment settings; the ones without a default are None when unset
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used when a luks image is handled without an explicit key secret
luks_default_secret_object = ('secret,id=keysec0,data=' +
                              os.environ.get('IMGKEYSECRET', ''))
luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool; only used in the diagnostic printed when
                 the process is terminated by a signal.
    :param args: The complete command line, including the program itself.
    :param connect_stderr: When true, stderr is merged into the captured
                           output; otherwise it is not redirected.
    :return: Tuple of (captured stdout text, process return code).
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args,
                          stdout=subprocess.PIPE,
                          stderr=stderr,
                          universal_newlines=True) as subp:
        output = subp.communicate()[0]
    if subp.returncode < 0:
        # args already is the full command line of whatever tool was run,
        # so it must not be prefixed with the qemu-img arguments.
        sys.stderr.write('%s received signal %i: %s\n'
                         % (tool, -subp.returncode, ' '.join(args)))
    return (output, subp.returncode)
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img with the given arguments appended to qemu_img_args and
    return both its output and its exit code
    """
    return qemu_tool_pipe_and_status('qemu-img', [*qemu_img_args, *args])
def qemu_img(*args: str) -> int:
    '''Run qemu-img, discard its output and return the exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
def ordered_qmp(qmsg, conv_keys=True):
    '''
    Return a copy of a QMP object with dicts turned into key-sorted
    OrderedDicts (dictionaries are not ordered prior to 3.6).

    With conv_keys set, '_' in the keys of a dict is replaced by '-'; the
    values of that dict are converted with conv_keys=False. List elements
    are always converted with the default conv_keys. Anything that is
    neither a list nor a dict is returned unchanged.
    '''
    if isinstance(qmsg, dict):
        return OrderedDict(
            (key.replace('_', '-') if conv_keys else key,
             ordered_qmp(qmsg[key], conv_keys=False))
            for key in sorted(qmsg))
    if isinstance(qmsg, list):
        return [ordered_qmp(element) for element in qmsg]
    return qmsg
def qemu_img_create(*args):
    '''
    Run "qemu-img create" with the given arguments and return the exit code.

    When the arguments select the luks format ('-f luks') and no key-secret
    is given, the default key-secret option and the matching secret object
    are added automatically.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a comma-separated option string, so the
                # default has to be joined onto it (str has no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_measure(*args):
    '''Run "qemu-img measure" with JSON output and return the parsed result'''
    json_text = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(json_text)
def qemu_img_check(*args):
    '''Run "qemu-img check" with JSON output and return the parsed result'''
    json_text = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(json_text)
def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    cmdline = qemu_img_args + list(args)
    status = subprocess.call(cmdline)
    if status < 0:
        # A negative status means the process was terminated by a signal
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(cmdline)))
    return status
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output, ignoring the exit code'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output (test file names filtered), return it'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''
    Log the filtered output of "qemu-img info" for filename.

    :param filter_path: Path replaced in the output by filter_img_info();
                        falls back to filename when not given.
    :param imgopts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments placed before the filename.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    output = qemu_img_pipe('info', *format_args, *extra_args, filename)
    log(filter_img_info(output, filter_path or filename))
def qemu_io(*args):
    '''Run qemu-io and return the stdout data (stderr is merged into it)'''
    cmdline = qemu_io_args + list(args)
    proc = subprocess.Popen(cmdline,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    captured, _ = proc.communicate()
    if proc.returncode < 0:
        # A negative return code means termination by a signal
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-proc.returncode, ' '.join(cmdline)))
    return captured
def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''
    Run qemu-io and return the exit code, suppressing stdout.

    If the caller already selects a format ('-f') or uses '--image-opts',
    the qemu-io command line without a default format is used.
    '''
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL avoids opening (and leaking) a /dev/null file object
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
def qemu_io_silent_check(*args):
    '''
    Run qemu-io with both stdout and stderr suppressed and return True if
    the subprocess returned 0
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids opening (and leaking) a /dev/null file object
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name matching the default machine'''
    is_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if is_ccw else 'virtio-scsi-pci'
class QemuIoInteractive:
    '''
    Wrapper around a qemu-io process that is driven interactively through
    its stdin/stdout pipes.
    '''

    # Prompt printed by qemu-io when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        '''
        Start qemu-io with the given arguments and wait for its first prompt.

        Raises ValueError carrying the collected output if anything other
        than the prompt is read first.
        '''
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for the process to finish'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''
        Read from the process until the next prompt and return everything
        that was printed before it.
        '''
        pattern = self._PROMPT
        n = len(pattern)
        chars = []
        tail = ''
        # Compare a sliding window of the last n characters against the
        # prompt.  Restarting a prefix match from scratch on a mismatch
        # would miss a prompt directly preceded by a partial prompt prefix
        # (e.g. "qqemu-io> ").
        while tail != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        '''
        Execute a single qemu-io command and return its output.

        The command must not contain a newline and must not be a quit
        command; use close() to quit.
        '''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error (an empty string on success)'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, status = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                               connect_stderr=False)
    if status:
        return status, output
    return status, ''
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports, log the filtered output and
       return the unfiltered output'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    listing, _status = qemu_tool_pipe_and_status('qemu-nbd', cmdline)
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
@contextmanager
def qemu_nbd_popen(*args):
    '''
    Context manager running qemu-nbd within the context.

    The context is entered once the server has created its pid file; a
    RuntimeError is raised if the process exits before that. The server is
    killed and reaped when the context is left.
    '''
    pid_file = file_path("pid")
    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Poll until the pid file shows up, bailing out on an early exit
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                message = "qemu-nbd terminated with exit code {}: {}".format(
                    server.returncode, ' '.join(cmd))
                raise RuntimeError(message)
            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
def create_image(name, size):
    '''
    Create a fully-allocated raw image with sector markers.

    Each 512-byte sector starts and ends with its own sector number as a
    big-endian 32-bit integer; the bytes in between are zero. Whole sectors
    are written until at least size bytes have been produced.
    '''
    # The context manager closes the file even if a write fails
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            offset += 512
def image_size(img):
    '''Return image's virtual size as reported by "qemu-img info"'''
    info = json.loads(qemu_img_pipe('info', '--output=json',
                                    '-f', imgfmt, img))
    return info['virtual-size']
def is_str(val):
    '''Return True if val is an instance of str'''
    result = isinstance(val, str)
    return result
# NOTE: test_dir is used as a regular expression as-is (it is not escaped)
test_dir_re = re.compile(r"%s" % test_dir)


def filter_test_dir(msg):
    '''Replace every match of the test directory pattern by TEST_DIR'''
    return re.sub(test_dir_re, "TEST_DIR", msg)
win32_re = re.compile(r"\r")


def filter_win32(msg):
    '''Strip all carriage return characters from msg'''
    return re.sub(win32_re, "", msg)
# Matches the statistics summary line ("N ops; ... (X/sec and Y ops/sec)")
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")


def filter_qemu_io(msg):
    '''Drop carriage returns and replace the qemu-io statistics summary
       with fixed placeholders'''
    without_cr = filter_win32(msg)
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, without_cr)
chown_re = re.compile(r"chown [0-9]+:[0-9]+")


def filter_chown(msg):
    '''Replace numeric "chown uid:gid" occurrences by "chown UID:GID"'''
    return re.sub(chown_re, "chown UID:GID", msg)
def filter_qmp_event(event):
    '''
    Filter a QMP event dict.

    Returns a copy of event in which the timestamp values (if a timestamp
    is present) are replaced by the placeholders 'SECS' and 'USECS'. The
    event passed in is left untouched.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp dict
        # needs its own copy to avoid modifying the caller's event.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    Nested dicts and lists are processed recursively. qmsg is modified in
    place and returned.'''
    # Lists are addressed by index, dicts by key
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
def filter_testfiles(msg):
    '''Replace the PID-prefixed test and socket file prefixes in msg by
       TEST_DIR/PID- and SOCK_DIR/PID-'''
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_virtio_scsi(output: str) -> str:
    '''Strip the -ccw/-pci suffix from virtio-scsi device names'''
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names by NODE_NAME'''
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
def filter_img_info(output, filename):
    '''
    Filter image info output for logging.

    Lines containing 'disk size' or 'actual-size' are dropped. In the
    remaining lines, filename, test file prefixes, the image format name
    and the iters/uuid/cid values are replaced by placeholders.
    '''
    value_filters = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )

    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in value_filters:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name by IMGFMT'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_nbd_exports(output: str) -> str:
    '''Replace the min/opt/max block size values by XXX'''
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)


def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    The filters are applied to the message in order before it is logged.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sorting = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=needs_sorting, indent=indent)
    test_logger.info(serialized)
class Timeout:
    '''
    Context manager that raises an Exception carrying errmsg from a SIGALRM
    handler if the block takes longer than the given number of seconds.
    '''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        # Install our handler, then arm the real-time interval timer
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        # Disarm the timer; exceptions from the block are not suppressed
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''Signal handler: raise the timeout exception'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return name prefixed with the PID of the current process and "-"'''
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.
    """

    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            # Best effort: the file may never have been created
            try:
                os.remove(path)
            except OSError:
                pass
        return False
def file_path_remover():
    '''Remove the paths recorded in file_path_remover.paths, last one first;
       paths that cannot be removed are silently skipped'''
    recorded = file_path_remover.paths
    for index in range(len(recorded) - 1, -1, -1):
        try:
            os.remove(recorded[index])
        except OSError:
            pass
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    The generated paths are recorded on file_path_remover, which is
    registered with atexit on first use.
    '''
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
def remote_filename(path):
    '''
    Return the name under which path is accessed with the current protocol.

    Raises an Exception for protocols other than 'file' and 'ssh'.
    '''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        vm_name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=vm_name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Number of drives added through add_drive(); used for drive IDs
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line; returns self'''
        self._args.extend(['-object', opts])
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line; returns self'''
        self._args.extend(['-device', opts])
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" to the command line; returns self'''
        self._args.extend(['-drive', opts])
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        drive_opts = ['if=%s' % interface,
                      'id=drive%d' % self._num_drives]

        if path is not None:
            drive_opts += ['file=%s' % path,
                           'format=%s' % img_format,
                           'cache=%s' % cachemode,
                           'aio=%s' % aiomode]

        if opts:
            drive_opts.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            drive_opts.append(luks_default_key_secret_opt)

        self._args.extend(['-drive', ','.join(drive_opts)])
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev" with opts given either as a string or as an
           iterable of strings that is joined with commas; returns self'''
        self._args.append('-blockdev')
        self._args.append(opts if isinstance(opts, str) else ','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line; returns self'''
        self._args.extend(['-incoming', addr])
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command line through human-monitor-command, logging
           command and result when use_log is set'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        runner = self.qmp_log if use_log else self.qmp
        return runner(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        # Without an explicit event, break on both reads and writes
        break_events = (event,) if event else ("read_aio", "write_aio")
        for name in break_events:
            self.hmp(f'qemu-io {drive} "break {name} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested lists/dicts into a single dict whose keys are the
           dot-separated paths of the leaf values'''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for index, item in enumerate(obj):
                self.flatten_qmp_object(item, output, f'{basestr}{index}.')
        elif isinstance(obj, dict):
            for key, value in obj.items():
                self.flatten_qmp_object(value, output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a comma-separated key=value string'''
        flat = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + value for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each run through
           filter_qmp_event()'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Execute a QMP command, logging both the command and its result;
           returns the unfiltered result'''
        full_cmd = OrderedDict()
        full_cmd["execute"] = cmd
        full_cmd["arguments"] = ordered_qmp(kwargs)
        log(full_cmd, filters, indent=indent)
        response = self.qmp(cmd, **kwargs)
        log(response, filters, indent=indent)
        return response

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        by_device = {'data': {'device': job}}
        by_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', by_device),
            ('BLOCK_JOB_CANCELLED', by_device),
            ('BLOCK_JOB_ERROR', by_device),
            ('BLOCK_JOB_READY', by_device),
            ('BLOCK_JOB_PENDING', by_id),
            ('JOB_STATUS_CHANGE', by_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # Block job events are only logged
                log(ev)
                continue

            status = ev['data']['status']
            if status == 'null':
                return error
            if status == 'aborting':
                jobs = self.qmp('query-jobs')
                for entry in jobs['return']:
                    if entry['id'] == job:
                        error = entry['error']
                        log('Job failed: %s' % (entry['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                next_cmd = 'job-cancel' if cancel else 'job-finalize'
                self.qmp_log(next_cmd, id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create (logged with the given filters, by default
           filter_qmp_testfiles) and run the resulting job'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        response = self.qmp_log('blockdev-create', filters=filters,
                                job_id=job_id, options=options)

        if 'return' not in response:
            job_result = response['error']
        else:
            assert response['return'] == {}
            job_result = self.run_job(job_id)

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability, logging the result'''
        log('Enabling migration QMP events on %s...' % name)
        events_capability = {
            'capability': 'events',
            'state': True
        }
        log(self.qmp('migrate-set-capabilities',
                     capabilities=[events_capability]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for a MIGRATION event with status completed or failed.
           Returns True on completion (after the VM reached
           expect_runstate) and False on failure.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] != 'completed':
            return False

        # The event may occur in finish-migrate, so wait for the expected
        # post-migration runstate
        runstate = None
        while runstate != expect_runstate:
            runstate = self.qmp('query-status')['return']['status']
        return True

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry of the given node, or
           None if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        return next((entry for entry in nodes['return']
                     if entry['node-name'] == node_name), None)

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty bitmaps, for all
           nodes that report dirty bitmaps'''
        res = self.qmp("query-named-block-nodes")
        bitmaps = {}
        for device in res['return']:
            if 'dirty-bitmaps' in device:
                bitmaps[device['node-name']] = device['dirty-bitmaps']
        return bitmaps

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for candidate in bitmaps[node_name]:
            name_matches = candidate.get('name', '') == bitmap_name
            if name_matches and (recording is None or
                                 candidate.get('recording') == recording):
                return candidate
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if all items of fields are present in the bitmap'''
        bitmap = self.get_bitmap(node_name, bitmap_name)

        return fields.items() <= bitmap.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        components = iter(path.split('/'))

        # Must start with a /
        assert next(components) == ''

        current = next((candidate for candidate in graph['nodes']
                        if candidate['name'] == root),
                       None)

        # An empty @path is not allowed, so the root node must be present
        assert current is not None, 'Root node %s not found' % root

        for child_name in components:
            assert current is not None, \
                'Cannot follow path %s%s' % (root, path)

            try:
                child_id = next(edge['child'] for edge in graph['edges']
                                if (edge['parent'] == current['id'] and
                                    edge['name'] == child_name))

                current = next(candidate for candidate in graph['nodes']
                               if candidate['id'] == child_id)

            except StopIteration:
                current = None

        if current is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert current['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (current['name'], path, expected_node)
876 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
878 class QMPTestCase(unittest.TestCase):
879 '''Abstract base class for QMP test cases'''
881 def __init__(self, *args, **kwargs):
882 super().__init__(*args, **kwargs)
883 # Many users of this class set a VM property we rely on heavily
884 # in the methods below.
885 self.vm = None
887 def dictpath(self, d, path):
888 '''Traverse a path in a nested dict'''
889 for component in path.split('/'):
890 m = index_re.match(component)
891 if m:
892 component, idx = m.groups()
893 idx = int(idx)
895 if not isinstance(d, dict) or component not in d:
896 self.fail(f'failed path traversal for "{path}" in "{d}"')
897 d = d[component]
899 if m:
900 if not isinstance(d, list):
901 self.fail(f'path component "{component}" in "{path}" '
902 f'is not a list in "{d}"')
903 try:
904 d = d[idx]
905 except IndexError:
906 self.fail(f'invalid index "{idx}" in path "{path}" '
907 f'in "{d}"')
908 return d
910 def assert_qmp_absent(self, d, path):
911 try:
912 result = self.dictpath(d, path)
913 except AssertionError:
914 return
915 self.fail('path "%s" has value "%s"' % (path, str(result)))
917 def assert_qmp(self, d, path, value):
918 '''Assert that the value for a specific path in a QMP dict
919 matches. When given a list of values, assert that any of
920 them matches.'''
922 result = self.dictpath(d, path)
924 # [] makes no sense as a list of valid values, so treat it as
925 # an actual single value.
926 if isinstance(value, list) and value != []:
927 for v in value:
928 if result == v:
929 return
930 self.fail('no match for "%s" in %s' % (str(result), str(value)))
931 else:
932 self.assertEqual(result, value,
933 '"%s" is "%s", expected "%s"'
934 % (path, str(result), str(value)))
936 def assert_no_active_block_jobs(self):
937 result = self.vm.qmp('query-block-jobs')
938 self.assert_qmp(result, 'return', [])
940 def assert_has_block_node(self, node_name=None, file_name=None):
941 """Issue a query-named-block-nodes and assert node_name and/or
942 file_name is present in the result"""
943 def check_equal_or_none(a, b):
944 return a is None or b is None or a == b
945 assert node_name or file_name
946 result = self.vm.qmp('query-named-block-nodes')
947 for x in result["return"]:
948 if check_equal_or_none(x.get("node-name"), node_name) and \
949 check_equal_or_none(x.get("file"), file_name):
950 return
951 self.fail("Cannot find %s %s in result:\n%s" %
952 (node_name, file_name, result))
954 def assert_json_filename_equal(self, json_filename, reference):
955 '''Asserts that the given filename is a json: filename and that its
956 content is equal to the given reference object'''
957 self.assertEqual(json_filename[:5], 'json:')
958 self.assertEqual(
959 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
960 self.vm.flatten_qmp_object(reference)
963 def cancel_and_wait(self, drive='drive0', force=False,
964 resume=False, wait=60.0):
965 '''Cancel a block job and wait for it to finish, returning the event'''
966 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
967 self.assert_qmp(result, 'return', {})
969 if resume:
970 self.vm.resume_drive(drive)
972 cancelled = False
973 result = None
974 while not cancelled:
975 for event in self.vm.get_qmp_events(wait=wait):
976 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
977 event['event'] == 'BLOCK_JOB_CANCELLED':
978 self.assert_qmp(event, 'data/device', drive)
979 result = event
980 cancelled = True
981 elif event['event'] == 'JOB_STATUS_CHANGE':
982 self.assert_qmp(event, 'data/id', drive)
985 self.assert_no_active_block_jobs()
986 return result
988 def wait_until_completed(self, drive='drive0', check_offset=True,
989 wait=60.0, error=None):
990 '''Wait for a block job to finish, returning the event'''
991 while True:
992 for event in self.vm.get_qmp_events(wait=wait):
993 if event['event'] == 'BLOCK_JOB_COMPLETED':
994 self.assert_qmp(event, 'data/device', drive)
995 if error is None:
996 self.assert_qmp_absent(event, 'data/error')
997 if check_offset:
998 self.assert_qmp(event, 'data/offset',
999 event['data']['len'])
1000 else:
1001 self.assert_qmp(event, 'data/error', error)
1002 self.assert_no_active_block_jobs()
1003 return event
1004 if event['event'] == 'JOB_STATUS_CHANGE':
1005 self.assert_qmp(event, 'data/id', drive)
1007 def wait_ready(self, drive='drive0'):
1008 """Wait until a BLOCK_JOB_READY event, and return the event."""
1009 return self.vm.events_wait([
1010 ('BLOCK_JOB_READY',
1011 {'data': {'type': 'mirror', 'device': drive}}),
1012 ('BLOCK_JOB_READY',
1013 {'data': {'type': 'commit', 'device': drive}})
1016 def wait_ready_and_cancel(self, drive='drive0'):
1017 self.wait_ready(drive=drive)
1018 event = self.cancel_and_wait(drive=drive)
1019 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1020 self.assert_qmp(event, 'data/type', 'mirror')
1021 self.assert_qmp(event, 'data/offset', event['data']['len'])
def complete_and_wait(self, drive='drive0', wait_ready=True,
                      completion_error=None):
    '''Complete a block job and wait for it to finish

    With `wait_ready` set, the job's ready event is awaited first.
    `completion_error` is forwarded to wait_until_completed() as the
    expected error.  The finished job must be a mirror or commit job.
    '''
    if wait_ready:
        self.wait_ready(drive=drive)

    self.assert_qmp(self.vm.qmp('block-job-complete', device=drive),
                    'return', {})

    finished = self.wait_until_completed(drive=drive, error=completion_error)
    job_type = finished['data']['type']
    self.assertTrue(job_type in ['mirror', 'commit'])
def pause_wait(self, job_id='job0'):
    """Poll query-block-jobs until job `job_id` is paused and idle.

    Returns the job's entry from the query result once it reports
    'paused' set and 'busy' clear.  Asserts on every poll that a job whose
    'device' equals `job_id` is listed.  The polling runs inside a
    Timeout(3, ...) context (Timeout is defined elsewhere in this file).
    """
    with Timeout(3, "Timeout waiting for job to pause"):
        while True:
            listed = self.vm.qmp('query-block-jobs')['return']
            # Only the first entry with a matching device is considered
            candidate = next((entry for entry in listed
                              if entry['device'] == job_id), None)
            assert candidate is not None
            if candidate['paused'] and not candidate['busy']:
                return candidate
def pause_job(self, job_id='job0', wait=True):
    """Issue block-job-pause for `job_id` and check that it succeeded.

    With `wait` set, the result of pause_wait(job_id) is returned;
    otherwise the QMP reply to block-job-pause is returned.
    """
    reply = self.vm.qmp('block-job-pause', device=job_id)
    self.assert_qmp(reply, 'return', {})
    return self.pause_wait(job_id) if wait else reply
def case_skip(self, reason):
    '''Skip this test case

    The reason is first recorded through case_notrun(); the test case is
    then skipped with skipTest().
    '''
    case_notrun(reason)
    self.skipTest(reason)
def notrun(reason):
    '''Skip this test suite

    Writes `reason` to "<output_dir>/<seq>.notrun", logs a warning and
    terminates the process with exit status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly: the process exits right below, so do not
    # depend on the garbage collector to flush the reason to disk.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to "<output_dir>/<seq>.casenotrun".
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the handle is closed (and the line flushed)
    # right away instead of whenever the file object gets collected.
    # NOTE(review): the whitespace in front of "[case not run]" may have
    # been collapsed in the reviewed copy -- confirm against the reference
    # outputs before applying.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write(' [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """Skip the suite if the current image format is not acceptable.

    A non-empty `supported_fmts` acts as an allow-list, `unsupported_fmts`
    as a deny-list.  For the 'luks' format, verify_working_luks() is
    called as well.
    """
    if 'generic' in supported_fmts:
        if os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
            # similar to
            #   _supported_fmt generic
            # for bash tests
            supported_fmts = ()

    allowed = (not supported_fmts) or (imgfmt in supported_fmts)
    if (not allowed) or (imgfmt in unsupported_fmts):
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """Skip the suite if the current image protocol is not acceptable.

    `supported` and `unsupported` must not both be given.  A `supported`
    list containing 'generic' accepts every protocol.
    """
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    allowed = (not supported) or (imgproto in supported)
    if (not allowed) or (imgproto in unsupported):
        notrun('not suitable for this protocol: %s' % imgproto)
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """Skip the suite if sys.platform is not acceptable.

    Entries are matched as prefixes of sys.platform.  Any match in
    `unsupported` skips; a non-empty `supported` without a match skips
    as well.
    """
    if sys.platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not sys.platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % sys.platform)
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """Skip the suite if the cache mode is not in the given allow-list.

    An empty `supported_cache_modes` accepts every cache mode.
    """
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """Skip the suite if the aio mode is not in the given allow-list.

    An empty `supported_aio_modes` accepts every aio mode.
    """
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Skip the suite unless supported_formats() lists every required format."""
    wanted = set(required_formats)
    available = set(supported_formats())
    usf_list = list(wanted - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
def supports_quorum():
    """Return whether 'quorum' occurs in the output of qemu_img_pipe('--help')."""
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Probe whether our LUKS driver is able to create an image
    (this extends to LUKS encryption for qcow2).

    :return: (True, '') if image creation succeeded, otherwise
             (False, reason).  The reason is the text following
             "<image file>:" on the first output line containing that
             marker, or the complete output if no line does.
    """
    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    (output, status) = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    culprit = next((text for text in output.splitlines() if marker in text),
                   None)
    if culprit is None:
        return (False, output)
    return (False, culprit.split(marker, 1)[1].strip())
def verify_working_luks():
    """
    Skip test suite if LUKS does not work; the reason reported by
    has_working_luks() is passed on to notrun().
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by `args`.

    :return: The output part of what qemu_tool_pipe_and_status() returns;
             the accompanying status is discarded.
    """
    command = [qemu_prog] + qemu_opts
    command.extend(args)
    stdout_text, _ = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       The list is parsed from the output of qemu_pipe("-drive",
       "format=help") (second line for read_only, first line otherwise)
       and remembered on the function object, keyed by read_only.'''
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        row = help_lines[1 if read_only else 0]
        # Everything between the first and second colon holds the names
        cache[read_only] = row.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       `required_formats` is either a sequence of format names or a
       callable that is given the test case and returns such a sequence.
       The wrapped test's return value is not passed on.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return
            msg = f'{test_case}: formats {usf_list} are not whitelisted'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The decision is taken when the test runs, based on the current
       image format.  The wrapped test's return value is not passed on.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return
            msg = f'{test_case}: Skipped for format {imgfmt}'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When the effective... process uid is 0 the test body is not run:
       the case is recorded through case_notrun() and None is returned.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
def execute_unittest(debug=False):
    """Executes unittests within the calling module.

    In debug mode the runner writes verbosely straight to stdout.
    Otherwise its output is buffered, normalized (see below) and written
    to stderr once unittest.main() is done, even if it raised.
    """

    verbosity = 2 if debug else 1

    if debug:
        output = sys.stdout
    else:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        output = io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        if not debug:
            out = output.getvalue()
            # Drop the elapsed time, it differs from run to run
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # partition() instead of split(): the buffer may hold no
            # newline at all (e.g. unittest.main() bailed out before the
            # runner wrote anything), and raising from this finally block
            # would hide the original exception.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Do the setup shared by script-style and unittest-style tests.

    Consumes a "-d" argument from sys.argv, configures logging accordingly
    and runs the image format, protocol, platform, cache mode, aio mode
    and required-format checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # TEST_DIR and QEMU_DEFAULT_MACHINE serve as proxies to tell whether
    # we are being run via "check".  There may be other things set up by
    # "check" that individual test cases rely on.
    if test_dir is None or qemu_default_machine is None:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Strip the debug switch so that it is not seen by later argv parsing
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments go to execute_setup_common().  A given
    `test_function` is then called without arguments; without one, the
    unittest runner is started instead.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests.

    Attaches a message-only stdout handler to test_logger, sets that
    logger to INFO and turns off propagation to its parent loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))
    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1337 # This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Turns on log output, then forwards all arguments to
    execute_setup_common(); its return value is discarded.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1343 # This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Turns on log output, then hands `test_function` and all remaining
    arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1349 # This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    Every argument is forwarded unchanged to execute_test().
    """
    execute_test(*args, **kwargs)