iotests: Write test output to TEST_DIR
[qemu.git] / tests / qemu-iotests / iotests.py
blobaff1b5d3053805488761df1e84302d3d5721c2a7
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import argparse
20 import atexit
21 import bz2
22 from collections import OrderedDict
23 import faulthandler
24 import json
25 import logging
26 import os
27 import re
28 import shutil
29 import signal
30 import struct
31 import subprocess
32 import sys
33 import time
34 from typing import (Any, Callable, Dict, Iterable, Iterator,
35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36 import unittest
38 from contextlib import contextmanager
40 from qemu.machine import qtest
41 from qemu.qmp import QMPMessage
42 from qemu.aqmp.legacy import QEMUMonitorProtocol
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


faulthandler.enable()

# Splitting the *_OPTIONS variables on whitespace will not work if
# arguments contain spaces but is necessary if we want to support the
# override options that ./check supports.
# str.split() without a separator is used on purpose: unlike split(' ') it
# never yields empty-string arguments for repeated/stray whitespace or for
# an empty variable.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += os.environ.get('QEMU_IO_OPTIONS_NO_FMT', '').split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# When GDB_OPTIONS is set, qemu_gdb holds the gdbserver wrapper command
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.split()

qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
        os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

sample_img_dir = os.environ['SAMPLE_IMG_DIR']
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Temporarily override the level of the logger named *logger_name*.

    Useful for silencing errors that are expected or uninteresting.  The
    level the logger had on entry is put back when the ``with`` block is
    left, whether normally or through an exception.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level
    target.setLevel(level)

    try:
        yield
    finally:
        target.setLevel(saved_level)
def unarchive_sample_image(sample, fname):
    """
    Decompress the sample image *sample* into the file *fname*.

    The source is "<sample>.bz2" inside sample_img_dir.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start the command *args* with its stdout captured as text.

    :param args: Full command line, program name first.
    :param connect_stderr: When true, stderr is merged into the captured
                           stdout; otherwise stderr is inherited.
    :return: The running Popen object; the caller is responsible for
             waiting on it.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    # pylint: disable=consider-using-with
    return subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr,
                            universal_newlines=True)


def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name used for the tool in diagnostics.
    :param args: Full command line, program name first.
    :param connect_stderr: Passed on to qemu_tool_popen().
    :param drop_successful_output: When true, the output is replaced by an
                                   empty string if the exit code is 0.
    :return: (output, exit code).  A negative exit code means the tool was
             killed by a signal; this is also reported on stderr.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Keep the message on one logical string: a backslash
            # continuation inside the literal would embed the source
            # indentation in the output.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a 'qemu-img create' argument list with the iotests defaults.

    Argument lists for any other subcommand (or an empty list) are
    returned as an unmodified copy.  For 'create', the result is
    'create', then '-f FMT' if given, then every '-o' option, then all
    remaining arguments in their original order.
    """
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, passthrough = parser.parse_known_args(args[1:])

    fmt = known.f
    creation_opts = known.o

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. Test may want to create
    # additional images in other formats that don't support these options.
    # So, use IMGOPTS only for images created in imgfmt format.
    env_opts = os.environ.get('IMGOPTS')
    if env_opts and fmt == imgfmt:
        creation_opts.insert(0, env_opts)

    command = ['create']
    if fmt is not None:
        command += ['-f', fmt]

    # default luks support
    if fmt == 'luks' and \
            not any('key-secret' in opt for opt in creation_opts):
        command += ['--object', luks_default_secret_object]
        creation_opts.append(luks_default_key_secret_opt)

    for opt in creation_opts:
        command += ['-o', opt]

    return command + passthrough
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code

    The output of a successful 'create' invocation is dropped.
    """
    command = qemu_img_args + qemu_img_create_prepare_args(list(args))
    creating = len(args) > 0 and args[0] == 'create'
    return qemu_tool_pipe_and_status('qemu-img', command,
                                     drop_successful_output=creating)
def qemu_img(*args: str) -> int:
    """Run qemu-img with *args*, discarding its output; return the exit code"""
    _output, status = qemu_img_pipe_and_status(*args)
    return status
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of the QMP message *qmsg* with all dicts key-sorted.

    Dicts become OrderedDicts sorted by key and lists are processed
    element by element; any other value is returned unchanged.  With
    conv_keys, '_' is replaced by '-' in the keys of the dict being
    processed; dicts nested directly inside it keep their keys, while
    each element of a list is processed with conv_keys enabled again.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    return qmsg
def qemu_img_create(*args):
    """Run 'qemu-img create' with *args* and return the exit code"""
    create_args = ('create',) + args
    return qemu_img(*create_args)
def qemu_img_measure(*args):
    """Run 'qemu-img measure --output json' and return the parsed JSON"""
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
def qemu_img_check(*args):
    """Run 'qemu-img check --output json' and return the parsed JSON"""
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
def qemu_img_pipe(*args: str) -> str:
    """Run qemu-img with *args*, ignoring the exit code; return its output"""
    output, _status = qemu_img_pipe_and_status(*args)
    return output
def qemu_img_log(*args):
    """
    Run qemu-img, log its output filtered through filter_testfiles, and
    return the unfiltered output.
    """
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, use_image_opts=False,
                 extra_args=()):
    """
    Log the filtered output of 'qemu-img info' for *filename*.

    :param filename: Image to inspect (last command line argument).
    :param filter_path: Path that filter_img_info() replaces in the
                        output; defaults to *filename*.
    :param use_image_opts: Pass '--image-opts' instead of '-f <imgfmt>'.
    :param extra_args: Further arguments inserted before *filename*.
    """
    fmt_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    args = ['info', *fmt_args, *extra_args, filename]

    output = qemu_img_pipe(*args)
    log(filter_img_info(output, filter_path or filename))
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prefix *args* with the qemu-io program and its default options.

    If the caller already selects a format ('-f') or uses '--image-opts',
    the default options without a format are used instead.
    """
    explicit_format = '-f' in args or '--image-opts' in args
    base = qemu_io_args_no_fmt if explicit_format else qemu_io_args
    return base + list(args)
def qemu_io_popen(*args):
    """Start qemu-io with *args* and return the running Popen object"""
    command = qemu_io_wrap_args(args)
    return qemu_tool_popen(command)
def qemu_io(*args):
    """Run qemu-io and return the stdout data"""
    command = qemu_io_wrap_args(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', command)
    return output
def qemu_io_log(*args):
    """
    Run qemu-io, log its output filtered through filter_testfiles and
    filter_qemu_io, and return the unfiltered output.
    """
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    """
    Run qemu-io and return the exit code, suppressing stdout

    If qemu-io is killed by a signal, this is reported on stderr.
    """
    command = qemu_io_wrap_args(args)
    status = subprocess.run(command, stdout=subprocess.DEVNULL,
                            check=False).returncode
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(command)))
    return status
def qemu_io_silent_check(*args):
    """
    Run qemu-io with stdout and stderr suppressed and return True if the
    subprocess returned 0
    """
    command = qemu_io_wrap_args(args)
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    succeeded = completed.returncode == 0
    return succeeded
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin/stdout.

    The process is started in the constructor, commands are sent with
    cmd() and the process is asked to quit with close().
    """

    def __init__(self, *args):
        """
        Start qemu-io with *args* and wait for its first prompt.

        :raise ValueError: If the process does not start with the
                           'qemu-io> ' prompt; the exception carries
                           everything the process printed.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        # 9 == len('qemu-io> ')
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read stdout up to and including the next prompt.

        :return: Everything read before the prompt (prompt excluded).
        """
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # The mismatching character may itself start the prompt
                # (e.g. output ending in 'q' right before 'qemu-io> ').
                # The first prompt character occurs nowhere else in the
                # prompt, so a one-character restart is sufficient.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Run one qemu-io command and return its output.

        *cmd* must be a single line and must not be the quit command.
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
class QemuStorageDaemon:
    """
    A running qemu-storage-daemon process, optionally with a QMP monitor.

    The daemon is started by the constructor and stopped by stop(); if it
    is still running when the object is deleted, it is killed.
    """
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Launch the daemon and wait until it has written its pidfile.

        :param args: Daemon arguments; must not contain '--pidfile'.
        :param instance_id: Used in the pidfile and QMP socket names.
        :param qmp: Whether to set up a QMP monitor on a socket in
                    sock_dir.
        :raise RuntimeError: If the daemon exits before the pidfile
                             appears.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        command = [qsd_prog, *args, '--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            command.extend(('--chardev',
                            f'socket,id=qmp-sock,path={self._qmpsock}',
                            '--monitor', 'qmp-sock'))

            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(command)
        if self._qmp is not None:
            self._qmp.accept()
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(command)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as pid_stream:
            self._pid = int(pid_stream.read().strip())

        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """Send the QMP command *cmd* with *args*; requires qmp=True."""
        assert self._qmp is not None
        return self._qmp.cmd(cmd, args)

    def stop(self, kill_signal=15):
        """
        Send *kill_signal* to the daemon, wait for it to exit, then close
        the QMP connection and remove the QMP socket and the pidfile.
        """
        self._p.send_signal(kill_signal)
        self._p.wait()
        self._p = None

        if self._qmp:
            self._qmp.close()

        # Best-effort cleanup; files that are already gone are fine
        for leftover in (self._qmpsock, self.pidfile):
            if leftover is None:
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass

    def __del__(self):
        if self._p is not None:
            self.stop(kill_signal=9)
def qemu_nbd(*args):
    """Run qemu-nbd in daemon mode and return the parent's exit code"""
    command = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(command)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    """
    Run qemu-nbd in daemon mode and return both the parent's exit code
    and its output in case of an error (an empty string on success)
    """
    full_args = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
                                                   connect_stderr=False)
    if returncode == 0:
        output = ''
    return returncode, output
def qemu_nbd_list_log(*args: str) -> str:
    """
    Run qemu-nbd to list remote exports

    The output is logged through filter_testfiles and filter_nbd_exports
    and returned unfiltered.
    """
    command = [qemu_nbd_prog, '-L', *args]
    listing, _status = qemu_tool_pipe_and_status('qemu-nbd', command)
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
@contextmanager
def qemu_nbd_popen(*args):
    """
    Context manager running qemu-nbd within the context

    The body runs once the server has written its pid file.  On exit the
    pid file is removed and the server is killed.

    :raise RuntimeError: If qemu-nbd exits before writing its pid file.
    """
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    """
    Return True if two image files are identical

    That is, if 'qemu-img compare' exits with status 0.
    """
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
def create_image(name, size):
    """
    Create a fully-allocated raw image with sector markers

    Each 512-byte sector holds its own index as a big-endian 32-bit
    integer in its first and last four bytes, with zeroes in between.
    """
    marker = struct.Struct('>l504xl')
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            index = offset // 512
            image.write(marker.pack(index, index))
            offset += 512
def image_size(img):
    """Return image's virtual size as reported by 'qemu-img info'"""
    raw = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(raw)
    return info['virtual-size']
def is_str(val):
    """Return True if *val* is a str (or an instance of a str subclass)"""
    result = isinstance(val, str)
    return result
# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters in the directory name (e.g. '.', '+', '(') only match
# themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory in *msg* by TEST_DIR"""
    return test_dir_re.sub("TEST_DIR", msg)
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Strip all carriage returns from *msg*"""
    stripped = win32_re.sub("", msg)
    return stripped
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Strip carriage returns from *msg* and replace the qemu-io statistics
    (operation count, elapsed time, throughput) by fixed placeholders
    """
    without_cr = filter_win32(msg)
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, without_cr)
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace the numeric IDs of 'chown <uid>:<gid>' by UID:GID"""
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
def filter_qmp_event(event):
    """
    Filter a QMP event dict

    Returns a copy of *event* in which the 'seconds' and 'microseconds'
    fields of 'timestamp' (if present) are replaced by the placeholders
    'SECS' and 'USECS'.  The event passed in is left unmodified.
    """
    event = dict(event)
    if 'timestamp' in event:
        # dict() above is only a shallow copy, so copy the nested
        # timestamp too instead of editing the caller's object.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_qmp(qmsg, filter_fn):
    """
    Given a filter function, filter a QMP object's values in place.

    filter_fn takes a (key, value) pair and returns the new value; for
    list elements the key is the index.  Nested dicts and lists are
    descended into rather than passed to filter_fn.  Returns *qmsg*.
    """
    # Lists are walked by index, dicts by key
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
def filter_testfiles(msg):
    """
    Replace the per-process file name prefixes "<test_dir>/<pid>-" and
    "<sock_dir>/<pid>-" in *msg* by TEST_DIR/PID- and SOCK_DIR/PID-
    """
    pid_prefix = "%s-" % (os.getpid())
    msg = msg.replace(os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-')
    return msg.replace(os.path.join(sock_dir, pid_prefix), 'SOCK_DIR/PID-')
def filter_qmp_testfiles(qmsg):
    """Apply filter_testfiles() to every string value of a QMP object"""
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_virtio_scsi(output: str) -> str:
    """Drop the '-ccw' / '-pci' bus suffix from virtio-scsi device names"""
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    """Apply filter_virtio_scsi() to every string value of a QMP object"""
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    """Replace node names of the form '#block<number>' by NODE_NAME"""
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
def filter_img_info(output, filename):
    """
    Make 'qemu-img info' output reproducible.

    Lines mentioning the disk size are dropped; *filename*, test file
    prefixes and the image format are replaced by placeholders, as are
    the iteration count, UUID, CID and compression type.
    """
    # Applied to every line, in this order
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    filtered = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        filtered.append(line)
    return '\n'.join(filtered)
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name in *msg* by IMGFMT"""
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
def filter_qmp_imgfmt(qmsg):
    """Apply filter_imgfmt() to every string value of a QMP object"""
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_nbd_exports(output: str) -> str:
    """Replace the values of 'min block', 'opt block' and 'max block' by XXX"""
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    The filters are applied to the message in order before it is logged.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sorting = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=needs_sorting, indent=indent)
    test_logger.info(serialized)
class Timeout:
    """
    Context manager that raises an Exception carrying *errmsg* if its
    body runs for longer than *seconds*, using SIGALRM and the real-time
    interval timer.

    No timer is armed when QEMU is wrapped by gdb or valgrind.
    """

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """SIGALRM handler"""
        raise Exception(self.errmsg)
def file_pattern(name):
    """Return *name* prefixed with the current process ID and a dash"""
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a list with one item.
    """

    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best effort: files that were never created are simply skipped
        for generated in self.paths:
            try:
                os.remove(generated)
            except OSError:
                pass
        return False
def try_remove(img):
    """Remove the file *img*, ignoring any OSError (e.g. a missing file)"""
    try:
        os.unlink(img)
    except OSError:
        pass
def file_path_remover():
    """
    Remove every path recorded in file_path_remover.paths, most recently
    added first
    """
    for recorded in file_path_remover.paths[::-1]:
        try_remove(recorded)
def file_path(*names, base_dir=test_dir):
    """
    Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    The generated paths are removed at interpreter exit.  A single name
    yields a single path, several names yield a list of paths.
    """
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    return paths[0] if len(paths) == 1 else paths
def remote_filename(path):
    """
    Return the file name to use for *path* with the image protocol under
    test: the path itself for 'file', an ssh:// URL to 127.0.0.1 for 'ssh'.

    :raise Exception: For any other protocol.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        """
        Set up a (not yet launched) VM named "qemu<path_suffix>-<pid>".

        QEMU is wrapped by gdbserver or valgrind if either is enabled; in
        that case no QMP timeout is used.  Enabling both is an error and
        terminates the process.
        """
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         sock_dir=sock_dir, qmp_timer=timer)
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        """
        After shutdown under valgrind, print the valgrind log if the exit
        code is 99 (the configured valgrind error exit code), otherwise
        delete the log.
        """
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        """Append '-object <opts>' to the command line; returns self"""
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        """Append '-device <opts>' to the command line; returns self"""
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        """Append '-drive <opts>' verbatim to the command line; returns self"""
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        """
        Append '-blockdev <opts>' to the command line; *opts* is either a
        string or an iterable of strings that is joined with commas.
        Returns self.
        """
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        """Append '-incoming <addr>' to the command line; returns self"""
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        """
        Run an HMP command through QMP's human-monitor-command, logging
        the exchange if *use_log* is set.
        """
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        """
        Flatten nested dicts/lists into a single dict whose keys are the
        dot-separated paths to the leaf values (list indices included).
        """
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        """
        Convert a QMP object into a comma-separated "key=value" option
        string using flatten_qmp_object().  Leaf values must be strings.
        """
        obj = self.flatten_qmp_object(obj)
        output_list = []
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=60.0):
        """Return the pending QMP events, passed through filter_qmp_event()"""
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        """
        Execute a QMP command, logging both the command and its result
        through *filters*; returns the result.
        """
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        """
        Issue blockdev-create with *options* and run the resulting job to
        completion.  The command is logged through *filters* (by default
        filter_qmp_testfiles).
        """
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        """Enable the 'events' migration capability and log the result"""
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        """
        Log MIGRATION events until migration has completed or failed.

        On completion, additionally poll query-status until the VM is in
        *expect_runstate*.

        :return: True if migration completed, False if it failed.
        """
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        """
        Return the query-named-block-nodes entry for *node_name*, or None
        if there is no such node.
        """
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        """
        Return a dict mapping node names to their list of dirty bitmaps.
        Nodes that report no 'dirty-bitmaps' are not included.
        """
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The bitmap's dict, or None if there is no matching bitmap.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # query_bitmaps() omits nodes without any bitmap; treat those like
        # any other "bitmap not found" case instead of raising KeyError.
        for bitmap in bitmaps.get(node_name, ()):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        """
        Return True if the bitmap exists and all key/value pairs in
        *fields* are present in its description; False otherwise.
        """
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
# Matches a path component of the form "name[index]" and captures the
# name and the index separately (used by QMPTestCase.dictpath()).
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')


class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        @path consists of dict keys separated by slashes; a component
        written as "key[N]" additionally selects element N of the list
        stored under that key.  The test fails if the path cannot be
        followed.'''
        cursor = d
        for raw_component in path.split('/'):
            indexed = index_re.match(raw_component)
            if indexed is None:
                key, position = raw_component, None
            else:
                key = indexed.group(1)
                position = int(indexed.group(2))

            if not (isinstance(cursor, dict) and key in cursor):
                self.fail(f'failed path traversal for "{path}" in "{cursor}"')
            cursor = cursor[key]

            if indexed is None:
                continue

            if not isinstance(cursor, list):
                self.fail(f'path component "{key}" in "{path}" '
                          f'is not a list in "{cursor}"')
            try:
                cursor = cursor[position]
            except IndexError:
                self.fail(f'invalid index "{position}" in path "{path}" '
                          f'in "{cursor}"')
        return cursor

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be resolved in the QMP dict @d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a failed traversal through self.fail(),
            # which is exactly the outcome we are hoping for here.
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if not (isinstance(value, list) and value != []):
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))
            return

        if any(result == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def compatible(actual, wanted):
            # A missing value on either side counts as a match
            if actual is None or wanted is None:
                return True
            return actual == wanted

        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for entry in result["return"]:
            if not compatible(entry.get("node-name"), node_name):
                continue
            if compatible(entry.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(payload)),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # The whole batch is processed even after the final event has
            # been seen, so trailing events are still checked.
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        accepted = [
            ('BLOCK_JOB_READY', {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ]
        return self.vm.events_wait(accepted)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it, and check that it
           finished as a fully completed mirror job'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
           then return its entry'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((entry for entry in jobs
                            if entry['device'] == job_id), None)
                # The job must be listed on every poll
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job; with @wait, return the paused job entry
           instead of the QMP reply'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        return self.pause_wait(job_id) if wait else result

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
def notrun(reason):
    '''Skip this test suite

    Records @reason in "<seq>.notrun" under test_dir, logs a warning and
    exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    marker_path = '%s/%s.notrun' % (test_dir, seq)

    with open(marker_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')

    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
       skipping it, that is left to the caller).  See
       QMPTestCase.case_skip() for a variant that actually skips the
       current test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    marker_path = '%s/%s.casenotrun' % (test_dir, seq)

    # Append, so that several skipped cases of one test accumulate
    # NOTE(review): the leading whitespace of the literal below may have
    # been collapsed in the copy under review -- confirm before applying.
    with open(marker_path, 'a', encoding='utf-8') as outfile:
        outfile.write(' [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format is not suitable.

    :param supported_fmts: If non-empty, the only formats to run with.
                           'generic' lifts the restriction unless the
                           IMGFMT_GENERIC environment variable is set to
                           something other than 'true'.
    :param unsupported_fmts: Formats the test must not run with.
    """
    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_allowed and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    excluded = bool(supported_fmts) and imgfmt not in supported_fmts
    if excluded or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current protocol is not suitable.

    At most one of @supported and @unsupported may be non-empty.  A
    @supported list containing 'generic' accepts every protocol.
    """
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    excluded = bool(supported) and imgproto not in supported
    if excluded or imgproto in unsupported:
        notrun('not suitable for this protocol: %s' % imgproto)
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if sys.platform is not suitable.

    Entries are matched as prefixes of sys.platform.  A match in
    @unsupported skips the suite; if @supported is non-empty, so does the
    lack of a match in it.
    """
    # str.startswith() accepts a tuple of prefixes and is False for ()
    if sys.platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not sys.platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % sys.platform)
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """Skip the test suite if the cache mode is not among the supported
    ones.  An empty sequence accepts every cache mode."""
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """Skip the test suite if the aio mode is not among the supported
    ones.  An empty sequence accepts every aio mode."""
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Skip the test suite unless every required format is reported by
    supported_formats()."""
    missing = list(set(required_formats) - set(supported_formats()))
    if not missing:
        return
    notrun(f'formats {missing} are not whitelisted')
def _verify_virtio_blk() -> None:
    """Skip the test suite if the QEMU device help does not mention
    virtio-blk."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """Skip the test suite if the QEMU device help mentions neither
    virtio-scsi-pci nor virtio-scsi-ccw."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
    """Skip the test suite if the IMGOPTS environment variable contains
    any of the @unsupported substrings, or a '$'."""
    imgopts = os.environ.get('IMGOPTS')
    if not imgopts:
        return

    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
    # but it supported only for bash tests. We don't have a concept of global
    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
    rejected = list(unsupported) + ['$']
    if any(fragment in imgopts for fragment in rejected):
        notrun(f'not suitable for this imgopts: {imgopts}')
def supports_quorum():
    '''Return whether the qemu-img help output mentions quorum'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    If not, return the reason why.

    :return: (True, '') on success, otherwise (False, reason).
    """
    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup: the image does not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text following "<img_file>:" on the first line that
    # mentions it; fall back to the complete output otherwise.
    marker = img_file + ':'
    reason = output
    for line in output.splitlines():
        if marker in line:
            reason = line.split(marker, 1)[1].strip()
            break

    return (False, reason)
def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    :param args: Extra command line arguments, appended after qemu_opts.
    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

    The result of each query is cached in supported_formats.formats, so
    QEMU is run at most once per value of 'read_only'.'''

    cache = getattr(supported_formats, "formats", None)
    if cache is None:
        cache = {}
        supported_formats.formats = cache

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # The second line is used for the read-only list, the first
        # line for the read-write list.
        wanted_line = help_lines[1 if read_only else 0]
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

    :param required_formats: Sequence of format names, or a callable
                             that takes the test case and returns one.
    :param read_only: Passed on to supported_formats().
    '''
    # Imported locally so that this decorator does not depend on the
    # module-level import list.
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the name and docstring of the decorated test, so
        # test reports still describe the real test method.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

    :param formats: Image formats for which the decorated test is
                    skipped.
    '''
    # Imported locally so that this decorator does not depend on the
    # module-level import list.
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the name and docstring of the decorated test, so
        # test reports still describe the real test method.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

    When run as root, the case is recorded through case_notrun() and the
    wrapper returns None without calling the test.
    '''
    # Imported locally so that this decorator does not depend on the
    # module-level import list.
    import functools

    # wraps() keeps the name and docstring of the decorated test, so test
    # reports still describe the real test method.
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper
# The time taken has to be filtered out of the output so that
# qemu-iotests can reliably diff the results against the reference
# output, and skipped tests have to be hidden from that output too.

class ReproducibleTestResult(unittest.TextTestResult):
    '''TextTestResult that reports a skipped test like a passed one'''

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
class ReproducibleStreamWrapper:
    '''Stream proxy that strips run-dependent details from what is
       written: the elapsed time of the "Ran N tests" line and the
       " (skipped=N)" suffix.'''

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  These two names are
        # never forwarded to the wrapped stream.
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        substitutions = (
            (r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests'),
            (r' \(skipped=\d+\)', r''),
        )
        for pattern, replacement in substitutions:
            arg = re.sub(pattern, replacement, arg)
        self.stream.write(arg)
class ReproducibleTestRunner(unittest.TextTestRunner):
    '''TextTestRunner that writes through a ReproducibleStreamWrapper
       and uses ReproducibleTestResult unless told otherwise'''

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Output goes to stdout when no stream is given
        target = stream if stream else sys.stdout
        rstream = ReproducibleStreamWrapper(target)
        super().__init__(stream=rstream,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    :param argv: Command line handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warning_filter = None if sys.warnoptions else 'ignore'
    verbosity = 2 if debug else 1

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_filter)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Strips a '-d' argument from sys.argv, configures logging and runs
    the environment checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # list.remove() raises ValueError when '-d' is absent, which tells us
    # at the same time that debug mode was not requested.
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments are handed to execute_setup_common().  When
    @test_function is given it is called after the setup; otherwise the
    unittest runner is started with sys.argv.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only, without any logging decoration
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
# Entry point for script-style iotests that have no single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    All arguments are handed to execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
# Entry point for script-style iotests that have a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    @test_function is called once the common setup has been performed;
    the remaining arguments are handed to that setup.
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
# Entry point for unittest-style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are handed to execute_test().
    """
    execute_test(*args, **kwargs)