tests/Makefile: test-image-locking needs CONFIG_POSIX
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blobe197c73ca5014ac71bcdcf0bff59011dc04ffd5f
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 import time
32 from typing import (Any, Callable, Dict, Iterable,
33 List, Optional, Sequence, Tuple, TypeVar)
34 import unittest
36 from contextlib import contextmanager
38 # pylint: disable=import-error, wrong-import-position
39 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40 from qemu import qtest
41 from qemu.qmp import QMPMessage
43 assert sys.version_info >= (3, 6)
45 # Use this logger for logging messages directly from the iotests module
46 logger = logging.getLogger('qemu.iotests')
47 logger.addHandler(logging.NullHandler())
49 # Use this logger for messages that ought to be used for diff output.
50 test_logger = logging.getLogger('qemu.iotests.diff_io')
53 faulthandler.enable()
55 # This will not work if arguments contain spaces but is necessary if we
56 # want to support the override options that ./check supports.
57 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
58 if os.environ.get('QEMU_IMG_OPTIONS'):
59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
61 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
62 if os.environ.get('QEMU_IO_OPTIONS'):
63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
65 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
66 if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
67 qemu_io_args_no_fmt += \
68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
70 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
71 if os.environ.get('QEMU_NBD_OPTIONS'):
72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
74 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
77 imgfmt = os.environ.get('IMGFMT', 'raw')
78 imgproto = os.environ.get('IMGPROTO', 'file')
79 test_dir = os.environ.get('TEST_DIR')
80 sock_dir = os.environ.get('SOCK_DIR')
81 output_dir = os.environ.get('OUTPUT_DIR', '.')
82 cachemode = os.environ.get('CACHEMODE')
83 aiomode = os.environ.get('AIOMODE')
84 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
86 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
88 luks_default_secret_object = 'secret,id=keysec0,data=' + \
89 os.environ.get('IMGKEYSECRET', '')
90 luks_default_key_secret_opt = 'key-secret=keysec0'
93 def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
94 """
95 Run qemu-img and return both its output and its exit code
96 """
97 subp = subprocess.Popen(qemu_img_args + list(args),
98 stdout=subprocess.PIPE,
99 stderr=subprocess.STDOUT,
100 universal_newlines=True)
101 output = subp.communicate()[0]
102 if subp.returncode < 0:
103 sys.stderr.write('qemu-img received signal %i: %s\n'
104 % (-subp.returncode,
105 ' '.join(qemu_img_args + list(args))))
106 return (output, subp.returncode)
108 def qemu_img(*args: str) -> int:
109 '''Run qemu-img and return the exit code'''
110 return qemu_img_pipe_and_status(*args)[1]
112 def ordered_qmp(qmsg, conv_keys=True):
113 # Dictionaries are not ordered prior to 3.6, therefore:
114 if isinstance(qmsg, list):
115 return [ordered_qmp(atom) for atom in qmsg]
116 if isinstance(qmsg, dict):
117 od = OrderedDict()
118 for k, v in sorted(qmsg.items()):
119 if conv_keys:
120 k = k.replace('_', '-')
121 od[k] = ordered_qmp(v, conv_keys=False)
122 return od
123 return qmsg
125 def qemu_img_create(*args):
126 args = list(args)
128 # default luks support
129 if '-f' in args and args[args.index('-f') + 1] == 'luks':
130 if '-o' in args:
131 i = args.index('-o')
132 if 'key-secret' not in args[i + 1]:
133 args[i + 1].append(luks_default_key_secret_opt)
134 args.insert(i + 2, '--object')
135 args.insert(i + 3, luks_default_secret_object)
136 else:
137 args = ['-o', luks_default_key_secret_opt,
138 '--object', luks_default_secret_object] + args
140 args.insert(0, 'create')
142 return qemu_img(*args)
144 def qemu_img_measure(*args):
145 return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
147 def qemu_img_check(*args):
148 return json.loads(qemu_img_pipe("check", "--output", "json", *args))
150 def qemu_img_verbose(*args):
151 '''Run qemu-img without suppressing its output and return the exit code'''
152 exitcode = subprocess.call(qemu_img_args + list(args))
153 if exitcode < 0:
154 sys.stderr.write('qemu-img received signal %i: %s\n'
155 % (-exitcode, ' '.join(qemu_img_args + list(args))))
156 return exitcode
158 def qemu_img_pipe(*args: str) -> str:
159 '''Run qemu-img and return its output'''
160 return qemu_img_pipe_and_status(*args)[0]
162 def qemu_img_log(*args):
163 result = qemu_img_pipe(*args)
164 log(result, filters=[filter_testfiles])
165 return result
167 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
168 args = ['info']
169 if imgopts:
170 args.append('--image-opts')
171 else:
172 args += ['-f', imgfmt]
173 args += extra_args
174 args.append(filename)
176 output = qemu_img_pipe(*args)
177 if not filter_path:
178 filter_path = filename
179 log(filter_img_info(output, filter_path))
181 def qemu_io(*args):
182 '''Run qemu-io and return the stdout data'''
183 args = qemu_io_args + list(args)
184 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
185 stderr=subprocess.STDOUT,
186 universal_newlines=True)
187 output = subp.communicate()[0]
188 if subp.returncode < 0:
189 sys.stderr.write('qemu-io received signal %i: %s\n'
190 % (-subp.returncode, ' '.join(args)))
191 return output
193 def qemu_io_log(*args):
194 result = qemu_io(*args)
195 log(result, filters=[filter_testfiles, filter_qemu_io])
196 return result
198 def qemu_io_silent(*args):
199 '''Run qemu-io and return the exit code, suppressing stdout'''
200 args = qemu_io_args + list(args)
201 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
202 if exitcode < 0:
203 sys.stderr.write('qemu-io received signal %i: %s\n' %
204 (-exitcode, ' '.join(args)))
205 return exitcode
207 def qemu_io_silent_check(*args):
208 '''Run qemu-io and return the true if subprocess returned 0'''
209 args = qemu_io_args + list(args)
210 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
211 stderr=subprocess.STDOUT)
212 return exitcode == 0
214 def get_virtio_scsi_device():
215 if qemu_default_machine == 's390-ccw-virtio':
216 return 'virtio-scsi-ccw'
217 return 'virtio-scsi-pci'
219 class QemuIoInteractive:
220 def __init__(self, *args):
221 self.args = qemu_io_args_no_fmt + list(args)
222 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
223 stdout=subprocess.PIPE,
224 stderr=subprocess.STDOUT,
225 universal_newlines=True)
226 out = self._p.stdout.read(9)
227 if out != 'qemu-io> ':
228 # Most probably qemu-io just failed to start.
229 # Let's collect the whole output and exit.
230 out += self._p.stdout.read()
231 self._p.wait(timeout=1)
232 raise ValueError(out)
234 def close(self):
235 self._p.communicate('q\n')
237 def _read_output(self):
238 pattern = 'qemu-io> '
239 n = len(pattern)
240 pos = 0
241 s = []
242 while pos != n:
243 c = self._p.stdout.read(1)
244 # check unexpected EOF
245 assert c != ''
246 s.append(c)
247 if c == pattern[pos]:
248 pos += 1
249 else:
250 pos = 0
252 return ''.join(s[:-n])
254 def cmd(self, cmd):
255 # quit command is in close(), '\n' is added automatically
256 assert '\n' not in cmd
257 cmd = cmd.strip()
258 assert cmd not in ('q', 'quit')
259 self._p.stdin.write(cmd + '\n')
260 self._p.stdin.flush()
261 return self._read_output()
264 def qemu_nbd(*args):
265 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
266 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
268 def qemu_nbd_early_pipe(*args):
269 '''Run qemu-nbd in daemon mode and return both the parent's exit code
270 and its output in case of an error'''
271 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
272 stdout=subprocess.PIPE,
273 universal_newlines=True)
274 output = subp.communicate()[0]
275 if subp.returncode < 0:
276 sys.stderr.write('qemu-nbd received signal %i: %s\n' %
277 (-subp.returncode,
278 ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
280 return subp.returncode, output if subp.returncode else ''
282 @contextmanager
283 def qemu_nbd_popen(*args):
284 '''Context manager running qemu-nbd within the context'''
285 pid_file = file_path("pid")
287 cmd = list(qemu_nbd_args)
288 cmd.extend(('--persistent', '--pid-file', pid_file))
289 cmd.extend(args)
291 log('Start NBD server')
292 p = subprocess.Popen(cmd)
293 try:
294 while not os.path.exists(pid_file):
295 if p.poll() is not None:
296 raise RuntimeError(
297 "qemu-nbd terminated with exit code {}: {}"
298 .format(p.returncode, ' '.join(cmd)))
300 time.sleep(0.01)
301 yield
302 finally:
303 log('Kill NBD server')
304 p.kill()
305 p.wait()
307 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
308 '''Return True if two image files are identical'''
309 return qemu_img('compare', '-f', fmt1,
310 '-F', fmt2, img1, img2) == 0
312 def create_image(name, size):
313 '''Create a fully-allocated raw image with sector markers'''
314 file = open(name, 'wb')
315 i = 0
316 while i < size:
317 sector = struct.pack('>l504xl', i // 512, i // 512)
318 file.write(sector)
319 i = i + 512
320 file.close()
322 def image_size(img):
323 '''Return image's virtual size'''
324 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
325 return json.loads(r)['virtual-size']
327 def is_str(val):
328 return isinstance(val, str)
330 test_dir_re = re.compile(r"%s" % test_dir)
331 def filter_test_dir(msg):
332 return test_dir_re.sub("TEST_DIR", msg)
334 win32_re = re.compile(r"\r")
335 def filter_win32(msg):
336 return win32_re.sub("", msg)
338 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
339 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
340 r"and [0-9\/.inf]* ops\/sec\)")
341 def filter_qemu_io(msg):
342 msg = filter_win32(msg)
343 return qemu_io_re.sub("X ops; XX:XX:XX.X "
344 "(XXX YYY/sec and XXX ops/sec)", msg)
346 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
347 def filter_chown(msg):
348 return chown_re.sub("chown UID:GID", msg)
350 def filter_qmp_event(event):
351 '''Filter a QMP event dict'''
352 event = dict(event)
353 if 'timestamp' in event:
354 event['timestamp']['seconds'] = 'SECS'
355 event['timestamp']['microseconds'] = 'USECS'
356 return event
358 def filter_qmp(qmsg, filter_fn):
359 '''Given a string filter, filter a QMP object's values.
360 filter_fn takes a (key, value) pair.'''
361 # Iterate through either lists or dicts;
362 if isinstance(qmsg, list):
363 items = enumerate(qmsg)
364 else:
365 items = qmsg.items()
367 for k, v in items:
368 if isinstance(v, (dict, list)):
369 qmsg[k] = filter_qmp(v, filter_fn)
370 else:
371 qmsg[k] = filter_fn(k, v)
372 return qmsg
374 def filter_testfiles(msg):
375 pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
376 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
377 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
379 def filter_qmp_testfiles(qmsg):
380 def _filter(_key, value):
381 if is_str(value):
382 return filter_testfiles(value)
383 return value
384 return filter_qmp(qmsg, _filter)
386 def filter_generated_node_ids(msg):
387 return re.sub("#block[0-9]+", "NODE_NAME", msg)
389 def filter_img_info(output, filename):
390 lines = []
391 for line in output.split('\n'):
392 if 'disk size' in line or 'actual-size' in line:
393 continue
394 line = line.replace(filename, 'TEST_IMG')
395 line = filter_testfiles(line)
396 line = line.replace(imgfmt, 'IMGFMT')
397 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
398 line = re.sub('uuid: [-a-f0-9]+',
399 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
400 line)
401 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
402 lines.append(line)
403 return '\n'.join(lines)
405 def filter_imgfmt(msg):
406 return msg.replace(imgfmt, 'IMGFMT')
408 def filter_qmp_imgfmt(qmsg):
409 def _filter(_key, value):
410 if is_str(value):
411 return filter_imgfmt(value)
412 return value
413 return filter_qmp(qmsg, _filter)
416 Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
418 def log(msg: Msg,
419 filters: Iterable[Callable[[Msg], Msg]] = (),
420 indent: Optional[int] = None) -> None:
422 Logs either a string message or a JSON serializable message (like QMP).
423 If indent is provided, JSON serializable messages are pretty-printed.
425 for flt in filters:
426 msg = flt(msg)
427 if isinstance(msg, (dict, list)):
428 # Don't sort if it's already sorted
429 do_sort = not isinstance(msg, OrderedDict)
430 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
431 else:
432 test_logger.info(msg)
434 class Timeout:
435 def __init__(self, seconds, errmsg="Timeout"):
436 self.seconds = seconds
437 self.errmsg = errmsg
438 def __enter__(self):
439 signal.signal(signal.SIGALRM, self.timeout)
440 signal.setitimer(signal.ITIMER_REAL, self.seconds)
441 return self
442 def __exit__(self, exc_type, value, traceback):
443 signal.setitimer(signal.ITIMER_REAL, 0)
444 return False
445 def timeout(self, signum, frame):
446 raise Exception(self.errmsg)
448 def file_pattern(name):
449 return "{0}-{1}".format(os.getpid(), name)
451 class FilePaths:
453 FilePaths is an auto-generated filename that cleans itself up.
455 Use this context manager to generate filenames and ensure that the file
456 gets deleted::
458 with FilePaths(['test.img']) as img_path:
459 qemu_img('create', img_path, '1G')
460 # migration_sock_path is automatically deleted
462 def __init__(self, names, base_dir=test_dir):
463 self.paths = []
464 for name in names:
465 self.paths.append(os.path.join(base_dir, file_pattern(name)))
467 def __enter__(self):
468 return self.paths
470 def __exit__(self, exc_type, exc_val, exc_tb):
471 try:
472 for path in self.paths:
473 os.remove(path)
474 except OSError:
475 pass
476 return False
478 class FilePath(FilePaths):
480 FilePath is a specialization of FilePaths that takes a single filename.
482 def __init__(self, name, base_dir=test_dir):
483 super(FilePath, self).__init__([name], base_dir)
485 def __enter__(self):
486 return self.paths[0]
488 def file_path_remover():
489 for path in reversed(file_path_remover.paths):
490 try:
491 os.remove(path)
492 except OSError:
493 pass
496 def file_path(*names, base_dir=test_dir):
497 ''' Another way to get auto-generated filename that cleans itself up.
499 Use is as simple as:
501 img_a, img_b = file_path('a.img', 'b.img')
502 sock = file_path('socket')
505 if not hasattr(file_path_remover, 'paths'):
506 file_path_remover.paths = []
507 atexit.register(file_path_remover)
509 paths = []
510 for name in names:
511 filename = file_pattern(name)
512 path = os.path.join(base_dir, filename)
513 file_path_remover.paths.append(path)
514 paths.append(path)
516 return paths[0] if len(paths) == 1 else paths
518 def remote_filename(path):
519 if imgproto == 'file':
520 return path
521 elif imgproto == 'ssh':
522 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
523 else:
524 raise Exception("Protocol %s not supported" % (imgproto))
526 class VM(qtest.QEMUQtestMachine):
527 '''A QEMU VM'''
529 def __init__(self, path_suffix=''):
530 name = "qemu%s-%d" % (path_suffix, os.getpid())
531 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
532 test_dir=test_dir,
533 socket_scm_helper=socket_scm_helper,
534 sock_dir=sock_dir)
535 self._num_drives = 0
537 def add_object(self, opts):
538 self._args.append('-object')
539 self._args.append(opts)
540 return self
542 def add_device(self, opts):
543 self._args.append('-device')
544 self._args.append(opts)
545 return self
547 def add_drive_raw(self, opts):
548 self._args.append('-drive')
549 self._args.append(opts)
550 return self
552 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
553 '''Add a virtio-blk drive to the VM'''
554 options = ['if=%s' % interface,
555 'id=drive%d' % self._num_drives]
557 if path is not None:
558 options.append('file=%s' % path)
559 options.append('format=%s' % img_format)
560 options.append('cache=%s' % cachemode)
561 options.append('aio=%s' % aiomode)
563 if opts:
564 options.append(opts)
566 if img_format == 'luks' and 'key-secret' not in opts:
567 # default luks support
568 if luks_default_secret_object not in self._args:
569 self.add_object(luks_default_secret_object)
571 options.append(luks_default_key_secret_opt)
573 self._args.append('-drive')
574 self._args.append(','.join(options))
575 self._num_drives += 1
576 return self
578 def add_blockdev(self, opts):
579 self._args.append('-blockdev')
580 if isinstance(opts, str):
581 self._args.append(opts)
582 else:
583 self._args.append(','.join(opts))
584 return self
586 def add_incoming(self, addr):
587 self._args.append('-incoming')
588 self._args.append(addr)
589 return self
591 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
592 cmd = 'human-monitor-command'
593 kwargs = {'command-line': command_line}
594 if use_log:
595 return self.qmp_log(cmd, **kwargs)
596 else:
597 return self.qmp(cmd, **kwargs)
599 def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
600 """Pause drive r/w operations"""
601 if not event:
602 self.pause_drive(drive, "read_aio")
603 self.pause_drive(drive, "write_aio")
604 return
605 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
607 def resume_drive(self, drive: str) -> None:
608 """Resume drive r/w operations"""
609 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
611 def hmp_qemu_io(self, drive: str, cmd: str,
612 use_log: bool = False) -> QMPMessage:
613 """Write to a given drive using an HMP command"""
614 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
616 def flatten_qmp_object(self, obj, output=None, basestr=''):
617 if output is None:
618 output = dict()
619 if isinstance(obj, list):
620 for i, item in enumerate(obj):
621 self.flatten_qmp_object(item, output, basestr + str(i) + '.')
622 elif isinstance(obj, dict):
623 for key in obj:
624 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
625 else:
626 output[basestr[:-1]] = obj # Strip trailing '.'
627 return output
629 def qmp_to_opts(self, obj):
630 obj = self.flatten_qmp_object(obj)
631 output_list = list()
632 for key in obj:
633 output_list += [key + '=' + obj[key]]
634 return ','.join(output_list)
636 def get_qmp_events_filtered(self, wait=60.0):
637 result = []
638 for ev in self.get_qmp_events(wait=wait):
639 result.append(filter_qmp_event(ev))
640 return result
642 def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
643 full_cmd = OrderedDict((
644 ("execute", cmd),
645 ("arguments", ordered_qmp(kwargs))
647 log(full_cmd, filters, indent=indent)
648 result = self.qmp(cmd, **kwargs)
649 log(result, filters, indent=indent)
650 return result
652 # Returns None on success, and an error string on failure
653 def run_job(self, job, auto_finalize=True, auto_dismiss=False,
654 pre_finalize=None, cancel=False, wait=60.0):
656 run_job moves a job from creation through to dismissal.
658 :param job: String. ID of recently-launched job
659 :param auto_finalize: Bool. True if the job was launched with
660 auto_finalize. Defaults to True.
661 :param auto_dismiss: Bool. True if the job was launched with
662 auto_dismiss=True. Defaults to False.
663 :param pre_finalize: Callback. A callable that takes no arguments to be
664 invoked prior to issuing job-finalize, if any.
665 :param cancel: Bool. When true, cancels the job after the pre_finalize
666 callback.
667 :param wait: Float. Timeout value specifying how long to wait for any
668 event, in seconds. Defaults to 60.0.
670 match_device = {'data': {'device': job}}
671 match_id = {'data': {'id': job}}
672 events = [
673 ('BLOCK_JOB_COMPLETED', match_device),
674 ('BLOCK_JOB_CANCELLED', match_device),
675 ('BLOCK_JOB_ERROR', match_device),
676 ('BLOCK_JOB_READY', match_device),
677 ('BLOCK_JOB_PENDING', match_id),
678 ('JOB_STATUS_CHANGE', match_id)
680 error = None
681 while True:
682 ev = filter_qmp_event(self.events_wait(events, timeout=wait))
683 if ev['event'] != 'JOB_STATUS_CHANGE':
684 log(ev)
685 continue
686 status = ev['data']['status']
687 if status == 'aborting':
688 result = self.qmp('query-jobs')
689 for j in result['return']:
690 if j['id'] == job:
691 error = j['error']
692 log('Job failed: %s' % (j['error']))
693 elif status == 'ready':
694 self.qmp_log('job-complete', id=job)
695 elif status == 'pending' and not auto_finalize:
696 if pre_finalize:
697 pre_finalize()
698 if cancel:
699 self.qmp_log('job-cancel', id=job)
700 else:
701 self.qmp_log('job-finalize', id=job)
702 elif status == 'concluded' and not auto_dismiss:
703 self.qmp_log('job-dismiss', id=job)
704 elif status == 'null':
705 return error
707 # Returns None on success, and an error string on failure
708 def blockdev_create(self, options, job_id='job0', filters=None):
709 if filters is None:
710 filters = [filter_qmp_testfiles]
711 result = self.qmp_log('blockdev-create', filters=filters,
712 job_id=job_id, options=options)
714 if 'return' in result:
715 assert result['return'] == {}
716 job_result = self.run_job(job_id)
717 else:
718 job_result = result['error']
720 log("")
721 return job_result
723 def enable_migration_events(self, name):
724 log('Enabling migration QMP events on %s...' % name)
725 log(self.qmp('migrate-set-capabilities', capabilities=[
727 'capability': 'events',
728 'state': True
732 def wait_migration(self, expect_runstate: Optional[str]) -> bool:
733 while True:
734 event = self.event_wait('MIGRATION')
735 log(event, filters=[filter_qmp_event])
736 if event['data']['status'] in ('completed', 'failed'):
737 break
739 if event['data']['status'] == 'completed':
740 # The event may occur in finish-migrate, so wait for the expected
741 # post-migration runstate
742 runstate = None
743 while runstate != expect_runstate:
744 runstate = self.qmp('query-status')['return']['status']
745 return True
746 else:
747 return False
749 def node_info(self, node_name):
750 nodes = self.qmp('query-named-block-nodes')
751 for x in nodes['return']:
752 if x['node-name'] == node_name:
753 return x
754 return None
756 def query_bitmaps(self):
757 res = self.qmp("query-named-block-nodes")
758 return {device['node-name']: device['dirty-bitmaps']
759 for device in res['return'] if 'dirty-bitmaps' in device}
761 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
763 get a specific bitmap from the object returned by query_bitmaps.
764 :param recording: If specified, filter results by the specified value.
765 :param bitmaps: If specified, use it instead of call query_bitmaps()
767 if bitmaps is None:
768 bitmaps = self.query_bitmaps()
770 for bitmap in bitmaps[node_name]:
771 if bitmap.get('name', '') == bitmap_name:
772 if recording is None or bitmap.get('recording') == recording:
773 return bitmap
774 return None
776 def check_bitmap_status(self, node_name, bitmap_name, fields):
777 ret = self.get_bitmap(node_name, bitmap_name)
779 return fields.items() <= ret.items()
781 def assert_block_path(self, root, path, expected_node, graph=None):
783 Check whether the node under the given path in the block graph
784 is @expected_node.
786 @root is the node name of the node where the @path is rooted.
788 @path is a string that consists of child names separated by
789 slashes. It must begin with a slash.
791 Examples for @root + @path:
792 - root="qcow2-node", path="/backing/file"
793 - root="quorum-node", path="/children.2/file"
795 Hypothetically, @path could be empty, in which case it would
796 point to @root. However, in practice this case is not useful
797 and hence not allowed.
799 @expected_node may be None. (All elements of the path but the
800 leaf must still exist.)
802 @graph may be None or the result of an x-debug-query-block-graph
803 call that has already been performed.
805 if graph is None:
806 graph = self.qmp('x-debug-query-block-graph')['return']
808 iter_path = iter(path.split('/'))
810 # Must start with a /
811 assert next(iter_path) == ''
813 node = next((node for node in graph['nodes'] if node['name'] == root),
814 None)
816 # An empty @path is not allowed, so the root node must be present
817 assert node is not None, 'Root node %s not found' % root
819 for child_name in iter_path:
820 assert node is not None, 'Cannot follow path %s%s' % (root, path)
822 try:
823 node_id = next(edge['child'] for edge in graph['edges']
824 if (edge['parent'] == node['id'] and
825 edge['name'] == child_name))
827 node = next(node for node in graph['nodes']
828 if node['id'] == node_id)
830 except StopIteration:
831 node = None
833 if node is None:
834 assert expected_node is None, \
835 'No node found under %s (but expected %s)' % \
836 (path, expected_node)
837 else:
838 assert node['name'] == expected_node, \
839 'Found node %s under %s (but expected %s)' % \
840 (node['name'], path, expected_node)
842 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
844 class QMPTestCase(unittest.TestCase):
845 '''Abstract base class for QMP test cases'''
847 def __init__(self, *args, **kwargs):
848 super().__init__(*args, **kwargs)
849 # Many users of this class set a VM property we rely on heavily
850 # in the methods below.
851 self.vm = None
853 def dictpath(self, d, path):
854 '''Traverse a path in a nested dict'''
855 for component in path.split('/'):
856 m = index_re.match(component)
857 if m:
858 component, idx = m.groups()
859 idx = int(idx)
861 if not isinstance(d, dict) or component not in d:
862 self.fail(f'failed path traversal for "{path}" in "{d}"')
863 d = d[component]
865 if m:
866 if not isinstance(d, list):
867 self.fail(f'path component "{component}" in "{path}" '
868 f'is not a list in "{d}"')
869 try:
870 d = d[idx]
871 except IndexError:
872 self.fail(f'invalid index "{idx}" in path "{path}" '
873 f'in "{d}"')
874 return d
876 def assert_qmp_absent(self, d, path):
877 try:
878 result = self.dictpath(d, path)
879 except AssertionError:
880 return
881 self.fail('path "%s" has value "%s"' % (path, str(result)))
883 def assert_qmp(self, d, path, value):
884 '''Assert that the value for a specific path in a QMP dict
885 matches. When given a list of values, assert that any of
886 them matches.'''
888 result = self.dictpath(d, path)
890 # [] makes no sense as a list of valid values, so treat it as
891 # an actual single value.
892 if isinstance(value, list) and value != []:
893 for v in value:
894 if result == v:
895 return
896 self.fail('no match for "%s" in %s' % (str(result), str(value)))
897 else:
898 self.assertEqual(result, value,
899 '"%s" is "%s", expected "%s"'
900 % (path, str(result), str(value)))
902 def assert_no_active_block_jobs(self):
903 result = self.vm.qmp('query-block-jobs')
904 self.assert_qmp(result, 'return', [])
906 def assert_has_block_node(self, node_name=None, file_name=None):
907 """Issue a query-named-block-nodes and assert node_name and/or
908 file_name is present in the result"""
909 def check_equal_or_none(a, b):
910 return a is None or b is None or a == b
911 assert node_name or file_name
912 result = self.vm.qmp('query-named-block-nodes')
913 for x in result["return"]:
914 if check_equal_or_none(x.get("node-name"), node_name) and \
915 check_equal_or_none(x.get("file"), file_name):
916 return
917 self.fail("Cannot find %s %s in result:\n%s" %
918 (node_name, file_name, result))
920 def assert_json_filename_equal(self, json_filename, reference):
921 '''Asserts that the given filename is a json: filename and that its
922 content is equal to the given reference object'''
923 self.assertEqual(json_filename[:5], 'json:')
924 self.assertEqual(
925 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
926 self.vm.flatten_qmp_object(reference)
929 def cancel_and_wait(self, drive='drive0', force=False,
930 resume=False, wait=60.0):
931 '''Cancel a block job and wait for it to finish, returning the event'''
932 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
933 self.assert_qmp(result, 'return', {})
935 if resume:
936 self.vm.resume_drive(drive)
938 cancelled = False
939 result = None
940 while not cancelled:
941 for event in self.vm.get_qmp_events(wait=wait):
942 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
943 event['event'] == 'BLOCK_JOB_CANCELLED':
944 self.assert_qmp(event, 'data/device', drive)
945 result = event
946 cancelled = True
947 elif event['event'] == 'JOB_STATUS_CHANGE':
948 self.assert_qmp(event, 'data/id', drive)
951 self.assert_no_active_block_jobs()
952 return result
954 def wait_until_completed(self, drive='drive0', check_offset=True,
955 wait=60.0, error=None):
956 '''Wait for a block job to finish, returning the event'''
957 while True:
958 for event in self.vm.get_qmp_events(wait=wait):
959 if event['event'] == 'BLOCK_JOB_COMPLETED':
960 self.assert_qmp(event, 'data/device', drive)
961 if error is None:
962 self.assert_qmp_absent(event, 'data/error')
963 if check_offset:
964 self.assert_qmp(event, 'data/offset',
965 event['data']['len'])
966 else:
967 self.assert_qmp(event, 'data/error', error)
968 self.assert_no_active_block_jobs()
969 return event
970 if event['event'] == 'JOB_STATUS_CHANGE':
971 self.assert_qmp(event, 'data/id', drive)
973 def wait_ready(self, drive='drive0'):
974 """Wait until a BLOCK_JOB_READY event, and return the event."""
975 f = {'data': {'type': 'mirror', 'device': drive}}
976 return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
978 def wait_ready_and_cancel(self, drive='drive0'):
979 self.wait_ready(drive=drive)
980 event = self.cancel_and_wait(drive=drive)
981 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
982 self.assert_qmp(event, 'data/type', 'mirror')
983 self.assert_qmp(event, 'data/offset', event['data']['len'])
985 def complete_and_wait(self, drive='drive0', wait_ready=True,
986 completion_error=None):
987 '''Complete a block job and wait for it to finish'''
988 if wait_ready:
989 self.wait_ready(drive=drive)
991 result = self.vm.qmp('block-job-complete', device=drive)
992 self.assert_qmp(result, 'return', {})
994 event = self.wait_until_completed(drive=drive, error=completion_error)
995 self.assert_qmp(event, 'data/type', 'mirror')
997 def pause_wait(self, job_id='job0'):
998 with Timeout(3, "Timeout waiting for job to pause"):
999 while True:
1000 result = self.vm.qmp('query-block-jobs')
1001 found = False
1002 for job in result['return']:
1003 if job['device'] == job_id:
1004 found = True
1005 if job['paused'] and not job['busy']:
1006 return job
1007 break
1008 assert found
1010 def pause_job(self, job_id='job0', wait=True):
1011 result = self.vm.qmp('block-job-pause', device=job_id)
1012 self.assert_qmp(result, 'return', {})
1013 if wait:
1014 return self.pause_wait(job_id)
1015 return result
1017 def case_skip(self, reason):
1018 '''Skip this test case'''
1019 case_notrun(reason)
1020 self.skipTest(reason)
1023 def notrun(reason):
1024 '''Skip this test suite'''
1025 # Each test in qemu-iotests has a number ("seq")
1026 seq = os.path.basename(sys.argv[0])
1028 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1029 logger.warning("%s not run: %s", seq, reason)
1030 sys.exit(0)
1032 def case_notrun(reason):
1033 '''Mark this test case as not having been run (without actually
1034 skipping it, that is left to the caller). See
1035 QMPTestCase.case_skip() for a variant that actually skips the
1036 current test case.'''
1038 # Each test in qemu-iotests has a number ("seq")
1039 seq = os.path.basename(sys.argv[0])
1041 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1042 ' [case not run] ' + reason + '\n')
1044 def _verify_image_format(supported_fmts: Sequence[str] = (),
1045 unsupported_fmts: Sequence[str] = ()) -> None:
1046 assert not (supported_fmts and unsupported_fmts)
1048 if 'generic' in supported_fmts and \
1049 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1050 # similar to
1051 # _supported_fmt generic
1052 # for bash tests
1053 if imgfmt == 'luks':
1054 verify_working_luks()
1055 return
1057 not_sup = supported_fmts and (imgfmt not in supported_fmts)
1058 if not_sup or (imgfmt in unsupported_fmts):
1059 notrun('not suitable for this image format: %s' % imgfmt)
1061 if imgfmt == 'luks':
1062 verify_working_luks()
1064 def _verify_protocol(supported: Sequence[str] = (),
1065 unsupported: Sequence[str] = ()) -> None:
1066 assert not (supported and unsupported)
1068 if 'generic' in supported:
1069 return
1071 not_sup = supported and (imgproto not in supported)
1072 if not_sup or (imgproto in unsupported):
1073 notrun('not suitable for this protocol: %s' % imgproto)
1075 def _verify_platform(supported: Sequence[str] = (),
1076 unsupported: Sequence[str] = ()) -> None:
1077 if any((sys.platform.startswith(x) for x in unsupported)):
1078 notrun('not suitable for this OS: %s' % sys.platform)
1080 if supported:
1081 if not any((sys.platform.startswith(x) for x in supported)):
1082 notrun('not suitable for this OS: %s' % sys.platform)
1084 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1085 if supported_cache_modes and (cachemode not in supported_cache_modes):
1086 notrun('not suitable for this cache mode: %s' % cachemode)
1088 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1089 if supported_aio_modes and (aiomode not in supported_aio_modes):
1090 notrun('not suitable for this aio mode: %s' % aiomode)
1092 def supports_quorum():
1093 return 'quorum' in qemu_img_pipe('--help')
1095 def verify_quorum():
1096 '''Skip test suite if quorum support is not available'''
1097 if not supports_quorum():
1098 notrun('quorum support missing')
1100 def has_working_luks() -> Tuple[bool, str]:
1102 Check whether our LUKS driver can actually create images
1103 (this extends to LUKS encryption for qcow2).
1105 If not, return the reason why.
1108 img_file = f'{test_dir}/luks-test.luks'
1109 (output, status) = \
1110 qemu_img_pipe_and_status('create', '-f', 'luks',
1111 '--object', luks_default_secret_object,
1112 '-o', luks_default_key_secret_opt,
1113 '-o', 'iter-time=10',
1114 img_file, '1G')
1115 try:
1116 os.remove(img_file)
1117 except OSError:
1118 pass
1120 if status != 0:
1121 reason = output
1122 for line in output.splitlines():
1123 if img_file + ':' in line:
1124 reason = line.split(img_file + ':', 1)[1].strip()
1125 break
1127 return (False, reason)
1128 else:
1129 return (True, '')
1131 def verify_working_luks():
1133 Skip test suite if LUKS does not work
1135 (working, reason) = has_working_luks()
1136 if not working:
1137 notrun(reason)
1139 def qemu_pipe(*args):
1141 Run qemu with an option to print something and exit (e.g. a help option).
1143 :return: QEMU's stdout output.
1145 args = [qemu_prog] + qemu_opts + list(args)
1146 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1147 stderr=subprocess.STDOUT,
1148 universal_newlines=True)
1149 output = subp.communicate()[0]
1150 if subp.returncode < 0:
1151 sys.stderr.write('qemu received signal %i: %s\n' %
1152 (-subp.returncode, ' '.join(args)))
1153 return output
1155 def supported_formats(read_only=False):
1156 '''Set 'read_only' to True to check ro-whitelist
1157 Otherwise, rw-whitelist is checked'''
1159 if not hasattr(supported_formats, "formats"):
1160 supported_formats.formats = {}
1162 if read_only not in supported_formats.formats:
1163 format_message = qemu_pipe("-drive", "format=help")
1164 line = 1 if read_only else 0
1165 supported_formats.formats[read_only] = \
1166 format_message.splitlines()[line].split(":")[1].split()
1168 return supported_formats.formats[read_only]
1170 def skip_if_unsupported(required_formats=(), read_only=False):
1171 '''Skip Test Decorator
1172 Runs the test if all the required formats are whitelisted'''
1173 def skip_test_decorator(func):
1174 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1175 **kwargs: Dict[str, Any]) -> None:
1176 if callable(required_formats):
1177 fmts = required_formats(test_case)
1178 else:
1179 fmts = required_formats
1181 usf_list = list(set(fmts) - set(supported_formats(read_only)))
1182 if usf_list:
1183 msg = f'{test_case}: formats {usf_list} are not whitelisted'
1184 test_case.case_skip(msg)
1185 else:
1186 func(test_case, *args, **kwargs)
1187 return func_wrapper
1188 return skip_test_decorator
1190 def skip_for_formats(formats: Sequence[str] = ()) \
1191 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1192 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1193 '''Skip Test Decorator
1194 Skips the test for the given formats'''
1195 def skip_test_decorator(func):
1196 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1197 **kwargs: Dict[str, Any]) -> None:
1198 if imgfmt in formats:
1199 msg = f'{test_case}: Skipped for format {imgfmt}'
1200 test_case.case_skip(msg)
1201 else:
1202 func(test_case, *args, **kwargs)
1203 return func_wrapper
1204 return skip_test_decorator
1206 def skip_if_user_is_root(func):
1207 '''Skip Test Decorator
1208 Runs the test only without root permissions'''
1209 def func_wrapper(*args, **kwargs):
1210 if os.getuid() == 0:
1211 case_notrun('{}: cannot be run as root'.format(args[0]))
1212 return None
1213 else:
1214 return func(*args, **kwargs)
1215 return func_wrapper
1217 def execute_unittest(debug=False):
1218 """Executes unittests within the calling module."""
1220 verbosity = 2 if debug else 1
1222 if debug:
1223 output = sys.stdout
1224 else:
1225 # We need to filter out the time taken from the output so that
1226 # qemu-iotest can reliably diff the results against master output.
1227 output = io.StringIO()
1229 runner = unittest.TextTestRunner(stream=output, descriptions=True,
1230 verbosity=verbosity)
1231 try:
1232 # unittest.main() will use sys.exit(); so expect a SystemExit
1233 # exception
1234 unittest.main(testRunner=runner)
1235 finally:
1236 # We need to filter out the time taken from the output so that
1237 # qemu-iotest can reliably diff the results against master output.
1238 if not debug:
1239 out = output.getvalue()
1240 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1242 # Hide skipped tests from the reference output
1243 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1244 out_first_line, out_rest = out.split('\n', 1)
1245 out = out_first_line.replace('s', '.') + '\n' + out_rest
1247 sys.stderr.write(out)
1249 def execute_setup_common(supported_fmts: Sequence[str] = (),
1250 supported_platforms: Sequence[str] = (),
1251 supported_cache_modes: Sequence[str] = (),
1252 supported_aio_modes: Sequence[str] = (),
1253 unsupported_fmts: Sequence[str] = (),
1254 supported_protocols: Sequence[str] = (),
1255 unsupported_protocols: Sequence[str] = ()) -> bool:
1257 Perform necessary setup for either script-style or unittest-style tests.
1259 :return: Bool; Whether or not debug mode has been requested via the CLI.
1261 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1263 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1264 # indicate that we're not being run via "check". There may be
1265 # other things set up by "check" that individual test cases rely
1266 # on.
1267 if test_dir is None or qemu_default_machine is None:
1268 sys.stderr.write('Please run this test via the "check" script\n')
1269 sys.exit(os.EX_USAGE)
1271 debug = '-d' in sys.argv
1272 if debug:
1273 sys.argv.remove('-d')
1274 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1276 _verify_image_format(supported_fmts, unsupported_fmts)
1277 _verify_protocol(supported_protocols, unsupported_protocols)
1278 _verify_platform(supported=supported_platforms)
1279 _verify_cache_mode(supported_cache_modes)
1280 _verify_aio_mode(supported_aio_modes)
1282 return debug
1284 def execute_test(*args, test_function=None, **kwargs):
1285 """Run either unittest or script-style tests."""
1287 debug = execute_setup_common(*args, **kwargs)
1288 if not test_function:
1289 execute_unittest(debug)
1290 else:
1291 test_function()
1293 def activate_logging():
1294 """Activate iotests.log() output to stdout for script-style tests."""
1295 handler = logging.StreamHandler(stream=sys.stdout)
1296 formatter = logging.Formatter('%(message)s')
1297 handler.setFormatter(formatter)
1298 test_logger.addHandler(handler)
1299 test_logger.setLevel(logging.INFO)
1300 test_logger.propagate = False
1302 # This is called from script-style iotests without a single point of entry
1303 def script_initialize(*args, **kwargs):
1304 """Initialize script-style tests without running any tests."""
1305 activate_logging()
1306 execute_setup_common(*args, **kwargs)
1308 # This is called from script-style iotests with a single point of entry
1309 def script_main(test_function, *args, **kwargs):
1310 """Run script-style tests outside of the unittest framework"""
1311 activate_logging()
1312 execute_test(*args, test_function=test_function, **kwargs)
1314 # This is called from unittest style iotests
1315 def main(*args, **kwargs):
1316 """Run tests using the unittest framework"""
1317 execute_test(*args, **kwargs)