iotests: Move try_remove to iotests.py
[qemu.git] / tests / qemu-iotests / iotests.py
blob 335e6feb70328a238a36ecd257b4672b456e0b73
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 import time
32 from typing import (Any, Callable, Dict, Iterable,
33 List, Optional, Sequence, Tuple, TypeVar)
34 import unittest
36 from contextlib import contextmanager
38 # pylint: disable=import-error, wrong-import-position
39 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40 from qemu import qtest
41 from qemu.qmp import QMPMessage
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Enable the stdlib fault handler for the whole test process.
faulthandler.enable()

# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
# Each *_args list starts with the program name (overridable through the
# environment) followed by the space-separated extra options, if any.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the options from
# QEMU_IO_OPTIONS_NO_FMT instead of QEMU_IO_OPTIONS.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: when QEMU_OPTIONS is unset or empty this evaluates to [''],
# not to an empty list.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

# These variables have no default: a missing one aborts the import.
try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Default LUKS secret: the '--object' definition and the matching
# 'key-secret' option that refers to it by id.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name used only in the diagnostic printed when the
                 process is terminated by a signal.
    :param args: Full command line, program included.
    :param connect_stderr: When true, stderr is merged into the captured
                           output; otherwise it is left uncaptured.
    :return: Tuple of (captured output, exit code).
    """
    proc = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT if connect_stderr else None,
        universal_newlines=True,
    )
    captured, _ = proc.communicate()
    status = proc.returncode
    # A negative return code means the child died from a signal.
    if status < 0:
        cmdline = ' '.join(args)
        sys.stderr.write(f'{tool} received signal {-status}: {cmdline}\n')
    return (captured, status)
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code

    The given arguments are appended to the module-level qemu_img_args.
    """
    return qemu_tool_pipe_and_status('qemu-img', qemu_img_args + list(args))
def qemu_img(*args: str) -> int:
    '''Run qemu-img and return the exit code (the output is discarded)'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
def ordered_qmp(qmsg, conv_keys=True):
    '''Return a copy of a QMP message with dicts turned into key-sorted
    OrderedDicts.

    With conv_keys set, underscores in the keys of a dict are replaced by
    dashes. Values of a dict are converted with conv_keys=False, while
    list elements are always converted with the default (conv_keys=True).
    Anything that is neither a list nor a dict is returned unchanged.
    '''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key in sorted(qmsg):
        name = key.replace('_', '-') if conv_keys else key
        ordered[name] = ordered_qmp(qmsg[key], conv_keys=False)
    return ordered
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments and return the
    exit code.

    When the arguments select the luks format ('-f luks') and no
    key-secret is given, the default secret object and the matching
    key-secret option are added automatically.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following '-o' is a comma-separated option
                # string, so the default has to be joined with a comma
                # (a str has no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_measure(*args):
    '''Run "qemu-img measure" with JSON output and return the parsed
    result'''
    report = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(report)
def qemu_img_check(*args):
    '''Run "qemu-img check" with JSON output and return the parsed
    result'''
    report = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(report)
def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    cmd = qemu_img_args + list(args)
    exitcode = subprocess.call(cmd)
    # A negative exit code means qemu-img was terminated by a signal.
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(cmd)))
    return exitcode
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output (the exit code is discarded)'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output with test file names filtered, and
    return the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''Run "qemu-img info" on filename and log the filtered output.

    :param filename: Image to inspect.
    :param filter_path: Path replaced in the output; falls back to
                        filename when not given.
    :param imgopts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments placed before the filename.
    '''
    fmt_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    args = ['info', *fmt_args, *extra_args, filename]

    output = qemu_img_pipe(*args)
    log(filter_img_info(output, filter_path or filename))
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    cmd = qemu_io_args + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', cmd)
    return output
def qemu_io_log(*args):
    '''Run qemu-io, log its output after the test-file and qemu-io
    filters, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout

    If the arguments already select a format ('-f' or '--image-opts'),
    the qemu-io options without a format are used as the base.
    '''
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL avoids leaking an open handle on /dev/null
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0

    Both stdout and stderr of the subprocess are discarded.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids leaking an open handle on /dev/null
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name for the default machine type:
    the ccw variant for s390-ccw-virtio, the pci variant otherwise'''
    on_s390 = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if on_s390 else 'virtio-scsi-pci'
class QemuIoInteractive:
    '''An interactive qemu-io process driven through its stdin/stdout.

    Commands are sent with cmd(); the output up to the next "qemu-io> "
    prompt is returned. close() sends the quit command.
    '''

    def __init__(self, *args):
        '''Start qemu-io and wait for its first prompt.

        Raises ValueError carrying the collected output when the first
        nine characters read are not the prompt.
        '''
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for the process to finish'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read stdout up to and including the next prompt and return
        everything before the prompt'''
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # On a mismatch the current character may itself start
                # the prompt (e.g. output ending in 'q' right before
                # the prompt). The prompt's first character occurs
                # nowhere else in it, so restarting at 0 or 1 suffices.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Send one command line and return its output.

        The command must not contain a newline and must not be the quit
        command (use close() for that).
        '''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    cmd = qemu_nbd_args + ['--fork']
    cmd.extend(args)
    return subprocess.call(cmd)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
    and its output in case of an error'''
    cmd = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmd,
                                                   connect_stderr=False)
    # The output is only reported when the exit code is non-zero.
    error_output = output if returncode else ''
    return returncode, error_output
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports

    The output is logged after the test-file and NBD export filters;
    the unfiltered output is returned.
    '''
    cmd = [qemu_nbd_prog, '-L', *args]
    output, _status = qemu_tool_pipe_and_status('qemu-nbd', cmd)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context

    The context is entered once the pid file exists; a RuntimeError is
    raised if qemu-nbd terminates before that. The server is killed when
    the context is left.
    '''
    pid_file = file_path("pid")

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Poll until qemu-nbd has written its pid file.
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                cmdline = ' '.join(cmd)
                raise RuntimeError(
                    f"qemu-nbd terminated with exit code "
                    f"{server.returncode}: {cmdline}")

            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if two image files are identical'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Each 512-byte sector starts and ends with its own sector number as
    a big-endian 32-bit integer, with zero padding in between.

    :param name: Path of the file to create (truncated if it exists).
    :param size: Image size in bytes; whole 512-byte sectors are written
                 until at least this many bytes are covered.
    '''
    # The context manager closes the file even if a write fails.
    with open(name, 'wb') as img:
        i = 0
        while i < size:
            sector = struct.pack('>l504xl', i // 512, i // 512)
            img.write(sector)
            i = i + 512
def image_size(img):
    '''Return image's virtual size'''
    info = json.loads(
        qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
def is_str(val):
    '''Return True if val is a str instance'''
    result = isinstance(val, str)
    return result
# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters in the directory name ('.', '+', ...) match literally.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in msg with
    "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Remove all carriage return characters from msg'''
    cleaned = win32_re.sub("", msg)
    return cleaned
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and replace qemu-io's operation count,
    timing and throughput statistics with fixed placeholders'''
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" occurrences with
    "chown UID:GID"'''
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of the event in which the timestamp's seconds and
    microseconds are replaced by the placeholders 'SECS' and 'USECS'.
    The event passed in is left untouched.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp
        # must be copied as well or the caller's event would be changed.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS',
                                  microseconds='USECS')
    return event
def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    Nested dicts and lists are filtered recursively. The object is
    modified in place and also returned. For lists, the key passed to
    filter_fn is the element's index.
    '''
    # Iterate through either lists or dicts;
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, val in entries:
        if isinstance(val, (dict, list)):
            replacement = filter_qmp(val, filter_fn)
        else:
            replacement = filter_fn(key, val)
        qmsg[key] = replacement
    return qmsg
def filter_testfiles(msg):
    '''Replace the "<pid>-" file name prefixes under test_dir and
    sock_dir with 'TEST_DIR/PID-' and 'SOCK_DIR/PID-' respectively'''
    pid_prefix = "%s-" % (os.getpid())
    msg = msg.replace(os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-')
    return msg.replace(os.path.join(sock_dir, pid_prefix), 'SOCK_DIR/PID-')
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_virtio_scsi(output: str) -> str:
    '''Strip the -ccw / -pci suffix from virtio-scsi device names'''
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    '''Replace "#block<digits>" names in msg with "NODE_NAME"'''
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
def filter_img_info(output, filename):
    '''Filter "qemu-img info" output for use in reference output.

    Lines mentioning the disk size / actual-size are dropped. In the
    remaining lines the file name, test file prefixes, the image format
    and the iters, uuid and cid values are replaced by placeholders.
    '''
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in substitutions:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name with "IMGFMT"'''
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_nbd_exports(output: str) -> str:
    '''Replace the numeric min/opt/max block sizes with "XXX"'''
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    The filters are applied to the message in order before it is logged.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=not already_ordered, indent=indent)
    test_logger.info(serialized)
class Timeout:
    '''Context manager that raises an Exception carrying errmsg when the
    block does not finish within the given number of seconds.

    It installs a SIGALRM handler and arms ITIMER_REAL on entry, and
    disarms the timer on exit.
    '''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds, self.errmsg = seconds, errmsg

    def __enter__(self):
        handler = self.timeout
        signal.signal(signal.SIGALRM, handler)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        # Disarm the timer; exceptions from the block are not suppressed.
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''SIGALRM handler: raise the timeout exception'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return name prefixed with the current process ID and a dash'''
    return f"{os.getpid()}-{name}"
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.
    """

    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        # A single name yields the path itself, otherwise the whole list.
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best-effort cleanup: files that cannot be removed are ignored.
        for path in self.paths:
            try_remove(path)
        return False
def try_remove(img):
    '''Remove the file img, silently ignoring any OSError'''
    try:
        os.unlink(img)
    except OSError:
        return
def file_path_remover():
    '''Remove all files registered by file_path(), most recently
    registered first'''
    registered = file_path_remover.paths
    for path in registered[::-1]:
        try_remove(path)
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name returns the path itself, several names return a list.
    '''

    # The first call creates the registry and schedules the cleanup.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    return generated[0] if len(generated) == 1 else generated
def remote_filename(path):
    '''Return the file name to use for path with the current image
    protocol: the path itself for 'file', an ssh:// URL on 127.0.0.1
    for 'ssh'. Any other protocol raises an Exception.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        vm_name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts,
                         name=vm_name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Counter used to generate the "drive<N>" ids in add_drive()
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line; returns self'''
        for arg in ('-object', opts):
            self._args.append(arg)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line; returns self'''
        for arg in ('-device', opts):
            self._args.append(arg)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" verbatim to the command line; returns
        self'''
        for arg in ('-drive', opts):
            self._args.append(arg)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = [f'if={interface}', f'id=drive{self._num_drives}']

        if path is not None:
            options += [f'file={path}',
                        f'format={img_format}',
                        f'cache={cachemode}',
                        f'aio={aiomode}']

        if opts:
            options.append(opts)

        needs_default_secret = (img_format == 'luks'
                                and 'key-secret' not in opts)
        if needs_default_secret:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev <opts>"; opts is either a string or an
        iterable of strings that is joined with commas. Returns self.'''
        joined = opts if isinstance(opts, str) else ','.join(opts)
        self._args.append('-blockdev')
        self._args.append(joined)
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line; returns self'''
        for arg in ('-incoming', addr):
            self._args.append(arg)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command line through the human-monitor-command QMP
        command, optionally logging the command and its result'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        runner = self.qmp_log if use_log else self.qmp
        return runner(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if event:
            self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
            return
        # Without an explicit event, break on both reads and writes.
        for default_event in ("read_aio", "write_aio"):
            self.pause_drive(drive, default_event)

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        command_line = f'qemu-io {drive} "remove_break bp_{drive}"'
        self.hmp(command_line)

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        command_line = f'qemu-io {drive} "{cmd}"'
        return self.hmp(command_line, use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested lists/dicts into a single dict whose keys are
        the dot-separated paths to the leaf values'''
        if output is None:
            output = {}
        if isinstance(obj, list):
            for index, item in enumerate(obj):
                prefix = basestr + str(index) + '.'
                self.flatten_qmp_object(item, output, prefix)
        elif isinstance(obj, dict):
            for key, value in obj.items():
                self.flatten_qmp_object(value, output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a comma-separated "key=value"
        option string, using the flattened key paths'''
        flat = self.flatten_qmp_object(obj)
        pairs = [key + '=' + flat[key] for key in flat]
        return ','.join(pairs)

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event()'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Execute a QMP command, logging both the command and its
        result with the given filters; returns the result'''
        full_cmd = OrderedDict()
        full_cmd["execute"] = cmd
        full_cmd["arguments"] = ordered_qmp(kwargs)
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only status changes drive the state machine; all other
            # events are merely logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue

            status = ev['data']['status']
            if status == 'aborting':
                jobs = self.qmp('query-jobs')['return']
                for j in jobs:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                next_cmd = 'job-cancel' if cancel else 'job-finalize'
                self.qmp_log(next_cmd, id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create (logged) and, if it was accepted, run
        the resulting job to completion'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' not in result:
            job_result = result['error']
        else:
            assert result['return'] == {}
            job_result = self.run_job(job_id)

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability and log the result;
        name is only used in the log message'''
        log('Enabling migration QMP events on %s...' % name)
        events_capability = {
            'capability': 'events',
            'state': True
        }
        log(self.qmp('migrate-set-capabilities',
                     capabilities=[events_capability]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for a MIGRATION event reporting completion or failure,
        logging every MIGRATION event seen. On completion, additionally
        wait for the expected runstate and return True; on failure
        return False.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] != 'completed':
            return False

        # The event may occur in finish-migrate, so wait for the expected
        # post-migration runstate
        runstate = None
        while runstate != expect_runstate:
            runstate = self.qmp('query-status')['return']['status']
        return True

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or
        None if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        matching = (x for x in nodes['return']
                    if x['node-name'] == node_name)
        return next(matching, None)

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty bitmaps, for
        all nodes that report any'''
        res = self.qmp("query-named-block-nodes")
        bitmaps = {}
        for device in res['return']:
            if 'dirty-bitmaps' in device:
                bitmaps[device['node-name']] = device['dirty-bitmaps']
        return bitmaps

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') != bitmap_name:
                continue
            if recording is None or bitmap.get('recording') == recording:
                return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if all items of fields are present with the same
        values in the named bitmap'''
        bitmap = self.get_bitmap(node_name, bitmap_name)

        return fields.items() <= bitmap.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        leading, *child_names = path.split('/')

        # Must start with a /
        assert leading == ''

        node = None
        for candidate in graph['nodes']:
            if candidate['name'] == root:
                node = candidate
                break

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in child_names:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            # Follow the edge named child_name from the current node;
            # a missing edge or child node leaves node as None.
            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(candidate for candidate in graph['nodes']
                            if candidate['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
879 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
881 class QMPTestCase(unittest.TestCase):
882 '''Abstract base class for QMP test cases'''
884 def __init__(self, *args, **kwargs):
885 super().__init__(*args, **kwargs)
886 # Many users of this class set a VM property we rely on heavily
887 # in the methods below.
888 self.vm = None
890 def dictpath(self, d, path):
891 '''Traverse a path in a nested dict'''
892 for component in path.split('/'):
893 m = index_re.match(component)
894 if m:
895 component, idx = m.groups()
896 idx = int(idx)
898 if not isinstance(d, dict) or component not in d:
899 self.fail(f'failed path traversal for "{path}" in "{d}"')
900 d = d[component]
902 if m:
903 if not isinstance(d, list):
904 self.fail(f'path component "{component}" in "{path}" '
905 f'is not a list in "{d}"')
906 try:
907 d = d[idx]
908 except IndexError:
909 self.fail(f'invalid index "{idx}" in path "{path}" '
910 f'in "{d}"')
911 return d
913 def assert_qmp_absent(self, d, path):
914 try:
915 result = self.dictpath(d, path)
916 except AssertionError:
917 return
918 self.fail('path "%s" has value "%s"' % (path, str(result)))
920 def assert_qmp(self, d, path, value):
921 '''Assert that the value for a specific path in a QMP dict
922 matches. When given a list of values, assert that any of
923 them matches.'''
925 result = self.dictpath(d, path)
927 # [] makes no sense as a list of valid values, so treat it as
928 # an actual single value.
929 if isinstance(value, list) and value != []:
930 for v in value:
931 if result == v:
932 return
933 self.fail('no match for "%s" in %s' % (str(result), str(value)))
934 else:
935 self.assertEqual(result, value,
936 '"%s" is "%s", expected "%s"'
937 % (path, str(result), str(value)))
939 def assert_no_active_block_jobs(self):
940 result = self.vm.qmp('query-block-jobs')
941 self.assert_qmp(result, 'return', [])
943 def assert_has_block_node(self, node_name=None, file_name=None):
944 """Issue a query-named-block-nodes and assert node_name and/or
945 file_name is present in the result"""
946 def check_equal_or_none(a, b):
947 return a is None or b is None or a == b
948 assert node_name or file_name
949 result = self.vm.qmp('query-named-block-nodes')
950 for x in result["return"]:
951 if check_equal_or_none(x.get("node-name"), node_name) and \
952 check_equal_or_none(x.get("file"), file_name):
953 return
954 self.fail("Cannot find %s %s in result:\n%s" %
955 (node_name, file_name, result))
957 def assert_json_filename_equal(self, json_filename, reference):
958 '''Asserts that the given filename is a json: filename and that its
959 content is equal to the given reference object'''
960 self.assertEqual(json_filename[:5], 'json:')
961 self.assertEqual(
962 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
963 self.vm.flatten_qmp_object(reference)
966 def cancel_and_wait(self, drive='drive0', force=False,
967 resume=False, wait=60.0):
968 '''Cancel a block job and wait for it to finish, returning the event'''
969 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
970 self.assert_qmp(result, 'return', {})
972 if resume:
973 self.vm.resume_drive(drive)
975 cancelled = False
976 result = None
977 while not cancelled:
978 for event in self.vm.get_qmp_events(wait=wait):
979 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
980 event['event'] == 'BLOCK_JOB_CANCELLED':
981 self.assert_qmp(event, 'data/device', drive)
982 result = event
983 cancelled = True
984 elif event['event'] == 'JOB_STATUS_CHANGE':
985 self.assert_qmp(event, 'data/id', drive)
988 self.assert_no_active_block_jobs()
989 return result
991 def wait_until_completed(self, drive='drive0', check_offset=True,
992 wait=60.0, error=None):
993 '''Wait for a block job to finish, returning the event'''
994 while True:
995 for event in self.vm.get_qmp_events(wait=wait):
996 if event['event'] == 'BLOCK_JOB_COMPLETED':
997 self.assert_qmp(event, 'data/device', drive)
998 if error is None:
999 self.assert_qmp_absent(event, 'data/error')
1000 if check_offset:
1001 self.assert_qmp(event, 'data/offset',
1002 event['data']['len'])
1003 else:
1004 self.assert_qmp(event, 'data/error', error)
1005 self.assert_no_active_block_jobs()
1006 return event
1007 if event['event'] == 'JOB_STATUS_CHANGE':
1008 self.assert_qmp(event, 'data/id', drive)
1010 def wait_ready(self, drive='drive0'):
1011 """Wait until a BLOCK_JOB_READY event, and return the event."""
1012 return self.vm.events_wait([
1013 ('BLOCK_JOB_READY',
1014 {'data': {'type': 'mirror', 'device': drive}}),
1015 ('BLOCK_JOB_READY',
1016 {'data': {'type': 'commit', 'device': drive}})
1019 def wait_ready_and_cancel(self, drive='drive0'):
1020 self.wait_ready(drive=drive)
1021 event = self.cancel_and_wait(drive=drive)
1022 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1023 self.assert_qmp(event, 'data/type', 'mirror')
1024 self.assert_qmp(event, 'data/offset', event['data']['len'])
def complete_and_wait(self, drive='drive0', wait_ready=True,
                      completion_error=None):
    """Complete a block job and wait for it to finish.

    When ``wait_ready`` is true, first wait for the job on ``drive`` to
    become ready.  Then issue 'block-job-complete', require an empty
    'return', and wait for the completion event, handing
    ``completion_error`` on as the expected error.  The completed job
    must be of type 'mirror' or 'commit'.
    """
    if wait_ready:
        self.wait_ready(drive=drive)

    reply = self.vm.qmp('block-job-complete', device=drive)
    self.assert_qmp(reply, 'return', {})

    completion = self.wait_until_completed(drive=drive,
                                           error=completion_error)
    job_type = completion['data']['type']
    self.assertTrue(job_type in ['mirror', 'commit'])
def pause_wait(self, job_id='job0'):
    """Poll 'query-block-jobs' until the job has actually paused.

    Returns the job's query entry once it reports paused and not busy.
    The job is looked up by comparing each entry's 'device' field with
    ``job_id``; it is an assertion failure if no entry matches.  The
    whole poll runs under Timeout(3, ...).
    """
    with Timeout(3, "Timeout waiting for job to pause"):
        while True:
            jobs = self.vm.qmp('query-block-jobs')['return']
            # Only the first entry for this job is ever considered.
            entry = next((j for j in jobs if j['device'] == job_id), None)
            if entry is not None and entry['paused'] and not entry['busy']:
                return entry
            assert entry is not None
def pause_job(self, job_id='job0', wait=True):
    """Issue 'block-job-pause' for ``job_id`` and check that it succeeded.

    With ``wait`` true, return what pause_wait() returns for the job;
    otherwise return the QMP reply of the pause command itself.
    """
    reply = self.vm.qmp('block-job-pause', device=job_id)
    self.assert_qmp(reply, 'return', {})
    if not wait:
        return reply
    return self.pause_wait(job_id)
def case_skip(self, reason):
    """Skip this test case.

    Records ``reason`` through case_notrun() and then calls
    self.skipTest(reason).
    """
    case_notrun(reason)
    self.skipTest(reason)
def notrun(reason):
    """Skip this test suite.

    Writes ``reason`` (plus a newline) to ``<output_dir>/<seq>.notrun``,
    logs a warning and then terminates the process via sys.exit(0).

    :param reason: Human-readable explanation of why the test is skipped.
    """
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly so the reason is flushed to disk before
    # sys.exit() instead of depending on garbage collection.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    """Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    Appends a "[case not run]" line containing ``reason`` to
    ``<output_dir>/<seq>.casenotrun``.

    :param reason: Human-readable explanation of why the case is not run.
    """
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly so each appended line is flushed right
    # away instead of depending on garbage collection.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """Call notrun() unless the current image format is suitable.

    The format is unsuitable when ``supported_fmts`` is non-empty and
    does not contain it, or when ``unsupported_fmts`` contains it.  For
    the 'luks' format, verify_working_luks() is called as well.
    """
    if 'generic' in supported_fmts:
        if os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
            # Same idea as "_supported_fmt generic" in the bash tests:
            # drop the supported-format restriction entirely.
            supported_fmts = ()

    if (supported_fmts and imgfmt not in supported_fmts) \
            or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """Call notrun() unless the current image protocol is suitable.

    At most one of ``supported`` and ``unsupported`` may be non-empty.
    A 'generic' entry in ``supported`` accepts every protocol.
    """
    assert not supported or not unsupported

    if 'generic' in supported:
        return

    if (supported and imgproto not in supported) \
            or imgproto in unsupported:
        notrun('not suitable for this protocol: %s' % imgproto)
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """Call notrun() unless the test may run on this platform.

    Entries are matched as prefixes of sys.platform.  The platform is
    rejected when it matches any ``unsupported`` entry, or when
    ``supported`` is non-empty and none of its entries match.
    """
    platform = sys.platform

    # str.startswith() accepts a tuple of candidate prefixes.
    if platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % sys.platform)
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """Call notrun() if the current cache mode is not in the given list.

    An empty ``supported_cache_modes`` accepts every cache mode.
    """
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """Call notrun() if the current aio mode is not in the given list.

    An empty ``supported_aio_modes`` accepts every aio mode.
    """
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Call notrun() if any required format is not reported as supported.

    The check is made against the list returned by supported_formats()
    called with its default argument.
    """
    available = set(supported_formats())
    usf_list = list(set(required_formats) - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
def supports_quorum():
    """Return whether 'quorum' appears in the output of qemu-img --help."""
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    """Skip test suite if quorum support is not available"""
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    :return: ``(True, '')`` when a LUKS image could be created, else
             ``(False, reason)``.  The reason is the text following
             "<image file>:" on the first output line containing it, or
             the whole qemu-img output when no line does.
    """
    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # The image was only a probe; removal is best effort because the
    # file does not exist when creation failed.
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())
    return (False, output)
def verify_working_luks():
    """
    Skip test suite if LUKS does not work

    The reason reported by has_working_luks() is passed on to notrun().
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by
    ``args``.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts
    command += list(args)
    stdout_text, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
def supported_formats(read_only=False):
    """Return the list of whitelisted block driver formats.

    Set 'read_only' to True to check ro-whitelist
    Otherwise, rw-whitelist is checked

    The list is parsed from the output of ``qemu -drive format=help``
    (second line for read-only, first line otherwise) and is cached on
    the function object, so qemu is run at most once per value of
    ``read_only``.
    """
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        wanted = help_lines[1 if read_only else 0]
        # Everything after the first ':' is the whitespace-separated list.
        cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    """Skip Test Decorator
    Runs the test if all the required formats are whitelisted

    ``required_formats`` is either a collection of format names or a
    callable that receives the test case and returns such a collection.
    ``read_only`` is handed on to supported_formats().  When a format is
    missing, the test case is skipped through its case_skip() method.
    """
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(
                f'{test_case}: formats {usf_list} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    """Skip Test Decorator
    Skips the test for the given formats

    When the current image format is listed in ``formats``, the test
    case is skipped through its case_skip() method; otherwise the
    decorated test runs unchanged.
    """
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(
                f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    """Skip Test Decorator
    Runs the test only without root permissions

    When the process runs with uid 0, the case is recorded through
    case_notrun() (using the first positional argument as its label)
    and the wrapper returns None without calling ``func``.
    """
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
def execute_unittest(debug=False):
    """Execute the unittests within the calling module.

    In debug mode the runner writes verbosely and directly to stdout.
    Otherwise the runner output is captured, filtered so that it can be
    diffed against reference output, and written to stderr.

    :param debug: Whether debug mode has been requested.
    :raise SystemExit: unittest.main() terminates through sys.exit().
    """
    verbosity = 2 if debug else 1

    # Outside debug mode the output is buffered so that it can be
    # filtered before it is emitted.
    output = sys.stdout if debug else io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        if not debug:
            out = output.getvalue()

            # We need to filter out the time taken from the output so
            # that qemu-iotest can reliably diff the results against
            # master output.
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)

            # The first line holds the per-test progress characters;
            # turn 's' (skipped) into '.'.  partition() also copes with
            # output that has no newline (e.g. unittest.main() exiting
            # before any test ran), so the pending SystemExit is never
            # replaced by an unpacking error raised from this block.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' argument is removed from sys.argv if present, logging is
    configured accordingly, and the image format, protocol, platform,
    cache mode, aio mode and required formats are verified in that order.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments go to execute_setup_common().  When
    ``test_function`` is given it is called without arguments;
    otherwise the unittest runner is started.
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests.

    Attaches a message-only stdout handler to test_logger, sets its
    level to INFO and stops it from propagating to parent loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates log output via activate_logging() and passes all
    arguments on to execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Activates log output via activate_logging(), then hands
    ``test_function`` and all remaining arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are passed on to execute_test().
    """
    execute_test(*args, **kwargs)