iotests.py: fix qemu_tool_pipe_and_status()
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blobea5c3c51624ea35ad10119f4d2e77ed8f7c8a059
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 import time
32 from typing import (Any, Callable, Dict, Iterable,
33 List, Optional, Sequence, Tuple, TypeVar)
34 import unittest
36 from contextlib import contextmanager
38 # pylint: disable=import-error, wrong-import-position
39 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40 from qemu import qtest
41 from qemu.qmp import QMPMessage
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


faulthandler.enable()


def _with_env_options(program, options_var):
    """Return [program] followed by the options found in $options_var."""
    # This will not work if arguments contain spaces but is necessary if we
    # want to support the override options that ./check supports.
    argv = [program]
    options = os.environ.get(options_var)
    if options:
        argv += options.strip().split(' ')
    return argv


qemu_img_args = _with_env_options(
    os.environ.get('QEMU_IMG_PROG', 'qemu-img'), 'QEMU_IMG_OPTIONS')

qemu_io_args = _with_env_options(
    os.environ.get('QEMU_IO_PROG', 'qemu-io'), 'QEMU_IO_OPTIONS')

qemu_io_args_no_fmt = _with_env_options(
    os.environ.get('QEMU_IO_PROG', 'qemu-io'), 'QEMU_IO_OPTIONS_NO_FMT')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = _with_env_options(qemu_nbd_prog, 'QEMU_NBD_OPTIONS')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Test environment; the entries without a fallback are None when unset.
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults injected for luks images when the caller gives no key-secret
luks_default_secret_object = \
    'secret,id=keysec0,data=' + os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name used in the diagnostic that is written to stderr
                 when the process was terminated by a signal.
    :param args: Complete command line, program first.
    :param connect_stderr: If true, the child's stderr is merged into the
                           captured output; otherwise it is inherited.
    :return: (captured output as text, return code of the process)
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    # The context manager closes the pipe and reaps the child even if
    # communicate() is interrupted by an exception.
    with subprocess.Popen(args,
                          stdout=subprocess.PIPE,
                          stderr=stderr,
                          universal_newlines=True) as subp:
        output = subp.communicate()[0]
        returncode = subp.returncode
    if returncode < 0:
        # A negative return code means "killed by signal -returncode"
        cmd = ' '.join(args)
        sys.stderr.write(f'{tool} received signal {-returncode}: {cmd}\n')
    return (output, returncode)
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code
    """
    # Prepend the configured qemu-img program and its default options
    command = [*qemu_img_args, *args]
    return qemu_tool_pipe_and_status('qemu-img', command)
def qemu_img(*args: str) -> int:
    '''Run qemu-img and return the exit code'''
    _output, exitcode = qemu_img_pipe_and_status(*args)
    return exitcode
def ordered_qmp(qmsg, conv_keys=True):
    '''Return a copy of qmsg in which every dict is an OrderedDict sorted
    by key. Keys of a dict handled with conv_keys=True get '_' replaced
    by '-'; dicts nested directly inside another dict keep their keys,
    while list elements are converted with the default again.'''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return list(map(ordered_qmp, qmsg))
    return qmsg
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments and return the exit
    code.

    For luks images ("-f luks") that do not name a key-secret, the default
    key-secret option and the matching --object are added.'''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value of -o is a comma-separated option string, so
                # the default has to be joined onto it (str has no
                # append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the parsed JSON'''
    report = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(report)
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the parsed JSON'''
    report = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(report)
def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    cmdline = qemu_img_args + list(args)
    exitcode = subprocess.call(cmdline)
    if exitcode < 0:
        # Negative exit code: the process was killed by signal -exitcode
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(cmdline)))
    return exitcode
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output'''
    output, _exitcode = qemu_img_pipe_and_status(*args)
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output with test file names filtered, and
    return the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''Log the filtered output of "qemu-img info" for filename.

    With imgopts, filename is passed with --image-opts, otherwise with
    "-f imgfmt". filter_path is the string replaced by TEST_IMG in the
    output; it defaults to filename.'''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmdline = ['info', *format_args, *extra_args, filename]

    output = qemu_img_pipe(*cmdline)
    log(filter_img_info(output, filter_path or filename))
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    # Same capture logic (stderr merged into the output, diagnostic on
    # death by signal) as the other tool wrappers, so share the helper.
    full_args = qemu_io_args + list(args)
    return qemu_tool_pipe_and_status('qemu-io', full_args)[0]
def qemu_io_log(*args):
    '''Run qemu-io, log its output through the test file and qemu-io
    filters, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    # When the caller specifies the format itself, use the argument list
    # without the default format option.
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL avoids opening (and never closing) /dev/null
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0'''
    args = qemu_io_args + list(args)
    # stdout and stderr are both discarded; subprocess.DEVNULL avoids
    # opening (and never closing) /dev/null ourselves.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name for the default machine type'''
    on_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if on_ccw else 'virtio-scsi-pci'
class QemuIoInteractive:
    '''An interactive qemu-io process driven through its stdin/stdout.

    Commands are sent with cmd(); the output up to the next prompt is
    returned. close() sends the quit command.'''

    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for the process to finish'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to and including the next prompt; return the text
        that preceded the prompt'''
        prompt = self._PROMPT
        n = len(prompt)
        chars = []
        while True:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            # Compare the tail of what has been read so far against the
            # prompt. Unlike a position counter that is reset on a
            # mismatch, this also finds a prompt directly preceded by a
            # prefix of itself (e.g. output ending in 'q').
            if ''.join(chars[-n:]) == prompt:
                return ''.join(chars[:-n])

    def cmd(self, cmd):
        '''Run one qemu-io command and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if not returncode:
        # Output is only reported for a failed start
        output = ''
    return returncode, output
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    listing = qemu_tool_pipe_and_status('qemu-nbd', cmdline)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("pid")

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # The pid file showing up is taken as the sign that the server is
        # running; bail out if the process exits before that.
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))

            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if two image files are identical'''
    exitcode = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return exitcode == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Every 512-byte sector carries its own sector number as a big-endian
    32-bit integer in both its first and its last four bytes. Whole
    sectors are written until at least size bytes are covered.'''
    # 'with' closes the file even if a write fails
    with open(name, 'wb') as image:
        i = 0
        while i < size:
            sector = struct.pack('>l504xl', i // 512, i // 512)
            image.write(sector)
            i = i + 512
def image_size(img):
    '''Return image's virtual size'''
    info = json.loads(qemu_img_pipe('info', '--output=json',
                                    '-f', imgfmt, img))
    return info['virtual-size']
def is_str(val):
    '''Return True if val is a str'''
    return isinstance(val, str)
# TEST_DIR is a plain path, not a pattern: escape it so that regex
# metacharacters in the path (such as '.' or '+') only match themselves.
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory by TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Strip all carriage returns from msg'''
    without_cr = win32_re.sub("", msg)
    return without_cr
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and replace the qemu-io statistics line
    (ops, time, throughput) by fixed placeholders'''
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" by "chown UID:GID"'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event whose timestamp (if present) has its seconds
    and microseconds replaced by the placeholders 'SECS' and 'USECS'.
    The event passed in is left unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp has
        # to be copied as well before it is changed.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS', microseconds='USECS')
    return event
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    filter_fn takes a (key, value) pair (for list elements the key is
    the index) and returns the replacement value. Nested dicts and lists
    are processed recursively. Returns qmsg itself.'''
    # Iterate through either lists or dicts;
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" file name
    prefixes of this process by TEST_DIR/PID- and SOCK_DIR/PID-'''
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_virtio_scsi(output: str) -> str:
    '''Drop the -ccw/-pci suffix from virtio-scsi device names'''
    suffixed = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(suffixed, r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    '''Replace generated node names ("#block<number>") by NODE_NAME'''
    generated_name = "#block[0-9]+"
    return re.sub(generated_name, "NODE_NAME", msg)
def filter_img_info(output, filename):
    '''Filter "qemu-img info" output: drop the disk size lines and
    replace the file name, test file prefixes, the image format and
    several varying values (iters, uuid, cid) by placeholders'''
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in substitutions:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name by IMGFMT'''
    masked = msg.replace(imgfmt, 'IMGFMT')
    return masked
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_nbd_exports(output: str) -> str:
    '''Replace the numbers after "min block", "opt block" and "max block"
    by XXX'''
    block_size = r'((min|opt|max) block): [0-9]+'
    return re.sub(block_size, r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    The filters are applied to msg in order before it is logged.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_sorted = isinstance(msg, OrderedDict)
    text = json.dumps(msg, sort_keys=not already_sorted, indent=indent)
    test_logger.info(text)
class Timeout:
    '''Context manager that raises Exception(errmsg) from a SIGALRM
    handler if the body runs for longer than the given number of seconds.

    The SIGALRM handler that was installed before entering the context is
    put back on exit.'''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        self._old_handler = None

    def __enter__(self):
        # signal.signal() returns the previous handler; keep it so that
        # __exit__ can restore it.
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        # Disarm the timer first so our handler cannot fire any more
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() returns None for handlers that were not
        # installed from Python; those cannot be reinstalled.
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
        return False

    def timeout(self, signum, frame):
        '''SIGALRM handler: abort the body of the context'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return name prefixed with the PID of this process ("<pid>-<name>")'''
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        # A single name is handed out directly rather than in a list
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for generated in self.paths:
            try:
                os.remove(generated)
            except OSError:
                # Best effort, e.g. the file may never have been created
                continue
        return False
def file_path_remover():
    '''Remove all paths registered in file_path_remover.paths, last
    registered first. Paths that cannot be removed are ignored.'''
    newest_first = file_path_remover.paths[::-1]
    for registered in newest_first:
        try:
            os.remove(registered)
        except OSError:
            continue
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    # The first call creates the registry and arranges for the cleanup
    # to run at interpreter exit.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
def remote_filename(path):
    '''Return the file name by which path is accessed through the
    configured image protocol (file or ssh)'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Number of drives added by add_drive(); used for the drive IDs
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the arguments; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the arguments; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" to the arguments; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev <opts>"; opts is either a string or an
        iterable of strings that is joined with commas. Returns self.'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the arguments; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command through human-monitor-command; with use_log
        the command and its result are logged as well'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            # No event given: break on both reads and writes
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into a single dict whose keys are
        the dot-separated paths of the leaf values (list elements use
        their index). Results are added to output, which is returned.'''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    @staticmethod
    def _opt_value(value):
        '''Render a flattened QMP leaf value for a key=value option'''
        if isinstance(value, bool):
            # NOTE(review): assumes the option parser accepts on/off for
            # booleans - confirm for the options this is used with.
            return 'on' if value else 'off'
        return str(value)

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a comma-separated key=value option
        string, using the flattened (dotted) key names.

        Non-string leaf values are converted to text instead of raising
        a TypeError on concatenation.'''
        obj = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + self._opt_value(value)
                        for key, value in obj.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event()'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Run a QMP command, logging both the command and its result
        (with filters applied); returns the unfiltered result'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # Block job events are only logged; the state machine
                # below is driven by JOB_STATUS_CHANGE alone.
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create (logged) and, if the command was
        accepted, run the job with run_job()'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability and log the result;
        name only appears in the log message'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Log MIGRATION events until one reports the status completed or
        failed. On completion, additionally poll query-status until the
        run state equals expect_runstate and return True; return False if
        the migration failed.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry of the node with the
        given name, or None if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps lists,
        for all nodes that report dirty bitmaps'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if the bitmap exists and all key/value pairs in
        fields are present in its description'''
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            # No such bitmap: it cannot have the requested fields
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
875 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
877 class QMPTestCase(unittest.TestCase):
878 '''Abstract base class for QMP test cases'''
880 def __init__(self, *args, **kwargs):
881 super().__init__(*args, **kwargs)
882 # Many users of this class set a VM property we rely on heavily
883 # in the methods below.
884 self.vm = None
886 def dictpath(self, d, path):
887 '''Traverse a path in a nested dict'''
888 for component in path.split('/'):
889 m = index_re.match(component)
890 if m:
891 component, idx = m.groups()
892 idx = int(idx)
894 if not isinstance(d, dict) or component not in d:
895 self.fail(f'failed path traversal for "{path}" in "{d}"')
896 d = d[component]
898 if m:
899 if not isinstance(d, list):
900 self.fail(f'path component "{component}" in "{path}" '
901 f'is not a list in "{d}"')
902 try:
903 d = d[idx]
904 except IndexError:
905 self.fail(f'invalid index "{idx}" in path "{path}" '
906 f'in "{d}"')
907 return d
909 def assert_qmp_absent(self, d, path):
910 try:
911 result = self.dictpath(d, path)
912 except AssertionError:
913 return
914 self.fail('path "%s" has value "%s"' % (path, str(result)))
916 def assert_qmp(self, d, path, value):
917 '''Assert that the value for a specific path in a QMP dict
918 matches. When given a list of values, assert that any of
919 them matches.'''
921 result = self.dictpath(d, path)
923 # [] makes no sense as a list of valid values, so treat it as
924 # an actual single value.
925 if isinstance(value, list) and value != []:
926 for v in value:
927 if result == v:
928 return
929 self.fail('no match for "%s" in %s' % (str(result), str(value)))
930 else:
931 self.assertEqual(result, value,
932 '"%s" is "%s", expected "%s"'
933 % (path, str(result), str(value)))
935 def assert_no_active_block_jobs(self):
936 result = self.vm.qmp('query-block-jobs')
937 self.assert_qmp(result, 'return', [])
939 def assert_has_block_node(self, node_name=None, file_name=None):
940 """Issue a query-named-block-nodes and assert node_name and/or
941 file_name is present in the result"""
942 def check_equal_or_none(a, b):
943 return a is None or b is None or a == b
944 assert node_name or file_name
945 result = self.vm.qmp('query-named-block-nodes')
946 for x in result["return"]:
947 if check_equal_or_none(x.get("node-name"), node_name) and \
948 check_equal_or_none(x.get("file"), file_name):
949 return
950 self.fail("Cannot find %s %s in result:\n%s" %
951 (node_name, file_name, result))
953 def assert_json_filename_equal(self, json_filename, reference):
954 '''Asserts that the given filename is a json: filename and that its
955 content is equal to the given reference object'''
956 self.assertEqual(json_filename[:5], 'json:')
957 self.assertEqual(
958 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
959 self.vm.flatten_qmp_object(reference)
962 def cancel_and_wait(self, drive='drive0', force=False,
963 resume=False, wait=60.0):
964 '''Cancel a block job and wait for it to finish, returning the event'''
965 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
966 self.assert_qmp(result, 'return', {})
968 if resume:
969 self.vm.resume_drive(drive)
971 cancelled = False
972 result = None
973 while not cancelled:
974 for event in self.vm.get_qmp_events(wait=wait):
975 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
976 event['event'] == 'BLOCK_JOB_CANCELLED':
977 self.assert_qmp(event, 'data/device', drive)
978 result = event
979 cancelled = True
980 elif event['event'] == 'JOB_STATUS_CHANGE':
981 self.assert_qmp(event, 'data/id', drive)
984 self.assert_no_active_block_jobs()
985 return result
987 def wait_until_completed(self, drive='drive0', check_offset=True,
988 wait=60.0, error=None):
989 '''Wait for a block job to finish, returning the event'''
990 while True:
991 for event in self.vm.get_qmp_events(wait=wait):
992 if event['event'] == 'BLOCK_JOB_COMPLETED':
993 self.assert_qmp(event, 'data/device', drive)
994 if error is None:
995 self.assert_qmp_absent(event, 'data/error')
996 if check_offset:
997 self.assert_qmp(event, 'data/offset',
998 event['data']['len'])
999 else:
1000 self.assert_qmp(event, 'data/error', error)
1001 self.assert_no_active_block_jobs()
1002 return event
1003 if event['event'] == 'JOB_STATUS_CHANGE':
1004 self.assert_qmp(event, 'data/id', drive)
1006 def wait_ready(self, drive='drive0'):
1007 """Wait until a BLOCK_JOB_READY event, and return the event."""
1008 return self.vm.events_wait([
1009 ('BLOCK_JOB_READY',
1010 {'data': {'type': 'mirror', 'device': drive}}),
1011 ('BLOCK_JOB_READY',
1012 {'data': {'type': 'commit', 'device': drive}})
1015 def wait_ready_and_cancel(self, drive='drive0'):
1016 self.wait_ready(drive=drive)
1017 event = self.cancel_and_wait(drive=drive)
1018 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1019 self.assert_qmp(event, 'data/type', 'mirror')
1020 self.assert_qmp(event, 'data/offset', event['data']['len'])
def complete_and_wait(self, drive='drive0', wait_ready=True,
                      completion_error=None):
    '''Complete a block job and wait for it to finish

    :param drive: Device name passed to 'block-job-complete' and to the
                  wait helpers.
    :param wait_ready: If true, first wait for the job to become ready.
    :param completion_error: Forwarded to wait_until_completed() as the
                             expected error (None: no error expected).
    '''
    if wait_ready:
        self.wait_ready(drive=drive)

    result = self.vm.qmp('block-job-complete', device=drive)
    self.assert_qmp(result, 'return', {})

    event = self.wait_until_completed(drive=drive, error=completion_error)
    # assertIn reports the unexpected job type on failure, which a bare
    # assertTrue(... in ...) would not.
    self.assertIn(event['data']['type'], ['mirror', 'commit'])
def pause_wait(self, job_id='job0'):
    '''Poll 'query-block-jobs' until the job is paused and idle

    Returns the job entry whose 'device' equals *job_id* once it reports
    paused and not busy.  The job must be present in every reply; the
    whole wait runs under a 3 second Timeout.
    '''
    with Timeout(3, "Timeout waiting for job to pause"):
        while True:
            reply = self.vm.qmp('query-block-jobs')
            # Only the first entry matching job_id is considered
            job = next((entry for entry in reply['return']
                        if entry['device'] == job_id), None)
            assert job is not None
            if job['paused'] and not job['busy']:
                return job
def pause_job(self, job_id='job0', wait=True):
    '''Issue 'block-job-pause' for *job_id*

    :param wait: If true, return the result of pause_wait(job_id);
                 otherwise return the QMP reply to the pause command.
    '''
    reply = self.vm.qmp('block-job-pause', device=job_id)
    self.assert_qmp(reply, 'return', {})
    if not wait:
        return reply
    return self.pause_wait(job_id)
def case_skip(self, reason):
    '''Skip this test case

    Records *reason* through case_notrun() and then skips the current
    test via self.skipTest().
    '''
    case_notrun(reason)
    self.skipTest(reason)
def notrun(reason):
    '''Skip this test suite

    Writes *reason* to "<output_dir>/<seq>.notrun", logs a warning and
    exits the process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly (via the context manager) so the reason
    # is on disk before sys.exit() instead of relying on the garbage
    # collector to flush the handle.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to "<output_dir>/<seq>.casenotrun".
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the handle is closed (and the line
    # flushed) deterministically rather than leaked.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    '''Call notrun() unless the current image format is acceptable

    The format (imgfmt) must be listed in *supported_fmts* when that is
    non-empty and must not be listed in *unsupported_fmts*.  For 'luks'
    the LUKS driver is additionally verified to work.
    '''
    wants_generic = 'generic' in supported_fmts
    if wants_generic and os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    # An empty supported list places no restriction on the format
    accepted = (not supported_fmts) or (imgfmt in supported_fmts)
    if not accepted or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1097 def _verify_protocol(supported: Sequence[str] = (),
1098 unsupported: Sequence[str] = ()) -> None:
1099 assert not (supported and unsupported)
1101 if 'generic' in supported:
1102 return
1104 not_sup = supported and (imgproto not in supported)
1105 if not_sup or (imgproto in unsupported):
1106 notrun('not suitable for this protocol: %s' % imgproto)
1108 def _verify_platform(supported: Sequence[str] = (),
1109 unsupported: Sequence[str] = ()) -> None:
1110 if any((sys.platform.startswith(x) for x in unsupported)):
1111 notrun('not suitable for this OS: %s' % sys.platform)
1113 if supported:
1114 if not any((sys.platform.startswith(x) for x in supported)):
1115 notrun('not suitable for this OS: %s' % sys.platform)
1117 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1118 if supported_cache_modes and (cachemode not in supported_cache_modes):
1119 notrun('not suitable for this cache mode: %s' % cachemode)
1121 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1122 if supported_aio_modes and (aiomode not in supported_aio_modes):
1123 notrun('not suitable for this aio mode: %s' % aiomode)
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    '''Call notrun() if any of *required_formats* is missing from
    supported_formats()'''
    missing = list(set(required_formats) - set(supported_formats()))
    if not missing:
        return
    notrun(f'formats {missing} are not whitelisted')
def supports_quorum():
    '''Return whether 'quorum' appears in the output of
    qemu_img_pipe('--help')'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    If not, return the reason why.

    :return: (True, '') on success, otherwise (False, reason).
    """
    img_file = f'{test_dir}/luks-test.luks'
    create_cmd = ('create', '-f', 'luks',
                  '--object', luks_default_secret_object,
                  '-o', luks_default_key_secret_opt,
                  '-o', 'iter-time=10',
                  img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_cmd)

    # Best-effort cleanup: the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text after "<img_file>:" on the first line mentioning
    # the image; fall back to the complete output otherwise.
    marker = img_file + ':'
    first_match = next((text for text in output.splitlines()
                        if marker in text), None)
    if first_match is None:
        return (False, output)
    return (False, first_match.split(marker, 1)[1].strip())
def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by *args*.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached per 'read_only' value in the function
       attribute supported_formats.formats.'''
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        listing = qemu_pipe("-drive", "format=help").splitlines()
        # Line 0 of the help text is used for read-write, line 1 for
        # read-only; the formats follow the first colon.
        row = listing[1 if read_only else 0]
        cache[read_only] = row.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       'required_formats' is either a collection of format names or a
       callable that receives the test case and returns one.
       'read_only' is passed on to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            wanted = (required_formats(test_case)
                      if callable(required_formats) else required_formats)

            missing = list(set(wanted) - set(supported_formats(read_only)))
            if not missing:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(
                f'{test_case}: formats {missing} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The wrapped test only runs when the current image format (imgfmt)
       is not listed in 'formats'.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as uid 0 the case is recorded through case_notrun()
       and the wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
def execute_unittest(debug=False):
    """Executes unittests within the calling module.

    :param debug: If true, run with verbosity 2 and stream the results
                  directly to stdout.  Otherwise the results are
                  buffered, filtered (timing and skip information
                  removed) and written to stderr.

    unittest.main() terminates via sys.exit(), so this function is
    expected to raise SystemExit.
    """

    verbosity = 2 if debug else 1

    if debug:
        output = sys.stdout
    else:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        output = io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # Use partition() rather than split(): if nothing (or no
            # newline) was captured, e.g. because unittest.main() exited
            # early, unpacking split()'s result would raise ValueError
            # here and mask the original SystemExit.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Consumes a '-d' argument from sys.argv, configures logging and runs
    all the _verify_*() environment checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    if test_dir is None or qemu_default_machine is None:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Strip the (first) '-d' so later argument parsers do not see it
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    if debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments go to execute_setup_common().  If a
    'test_function' is given it is called (script style); otherwise the
    unittest runner is started.
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests.

    Attaches a message-only stdout handler to test_logger, sets the
    logger to INFO level and disables propagation.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1336 # This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Turns on test logging, then hands all arguments to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1342 # This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Turns on test logging, then runs 'test_function' through
    execute_test() with the remaining arguments.
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1348 # This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are handed to execute_test() unchanged.
    """
    execute_test(*args, **kwargs)