softmmu: List CPU types again
[qemu/rayw.git] / tests / qemu-iotests / iotests.py
blob508adade9e8fd891191f952dde0827c430c47395
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import argparse
20 import atexit
21 import bz2
22 from collections import OrderedDict
23 import faulthandler
24 import json
25 import logging
26 import os
27 import re
28 import shutil
29 import signal
30 import struct
31 import subprocess
32 import sys
33 import time
34 from typing import (Any, Callable, Dict, Iterable, Iterator,
35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36 import unittest
38 from contextlib import contextmanager
40 from qemu.machine import qtest
41 from qemu.qmp import QMPMessage
42 from qemu.aqmp.legacy import QEMUMonitorProtocol
# Use this logger for logging messages directly from the iotests module.
# A NullHandler is attached so that this logger always has a handler.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump tracebacks when the interpreter receives a fatal signal.
faulthandler.enable()
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
#
# The option strings are split on arbitrary whitespace (str.split() without
# a separator), so repeated blanks or an empty value never turn into
# empty-string arguments on the tool command lines.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += os.environ['QEMU_IO_OPTIONS_NO_FMT'].split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# An unset or blank QEMU_OPTIONS yields [] rather than [''].
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.split()

qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
    # Mandatory as well; read it here so that a missing value produces the
    # same hint as the other variables instead of a bare KeyError.
    sample_img_dir = os.environ['SAMPLE_IMG_DIR']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
        os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Temporarily override the level of the logger called `logger_name`.

    The level found on entry is put back when the context exits, whether
    or not the body raised. Handy for muting errors that are expected or
    uninteresting.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level
    target.setLevel(level)

    try:
        yield
    finally:
        target.setLevel(saved_level)
def unarchive_sample_image(sample, fname):
    """Decompress sample image `sample` (stored as <sample>.bz2) to `fname`."""
    archive_path = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive_path) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start `args` as a child process with its stdout captured as text.

    When `connect_stderr` is true, stderr is merged into the captured
    stdout; otherwise stderr is left alone.
    """
    popen_kwargs = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.STDOUT if connect_stderr else None,
        'universal_newlines': True,
    }
    # pylint: disable=consider-using-with
    return subprocess.Popen(args, **popen_kwargs)
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool, used only in the diagnostic message.
    :param args: Full command line, including the program itself.
    :param connect_stderr: Passed on to qemu_tool_popen().
    :param drop_successful_output: When true, the returned output is the
                                   empty string if the tool exited with 0.
    :return: Tuple of (captured output, exit code).
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # A negative return code means the child died from a signal.
            # Build the message on one logical line so that no source
            # indentation leaks into it.
            cmd = ' '.join(args)
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a 'qemu-img create' argument list with the test defaults.

    Argument lists for any other sub-command are returned as a plain copy.
    """
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, passthrough = parser.parse_known_args(args[1:])

    fmt = known.f
    create_opts = known.o

    cmdline = ['create']
    if fmt is not None:
        cmdline.extend(('-f', fmt))

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. Tests may want to create
    # additional images in other formats that don't support these options.
    # So, use IMGOPTS only for images created in imgfmt format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and fmt == imgfmt:
        create_opts.insert(0, imgopts)

    # default luks support
    if fmt == 'luks' and \
            not any('key-secret' in opts for opts in create_opts):
        cmdline.extend(('--object', luks_default_secret_object))
        create_opts.append(luks_default_key_secret_opt)

    for opts in create_opts:
        cmdline.extend(('-o', opts))

    cmdline.extend(passthrough)
    return cmdline
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code

    The output of a successful 'create' invocation is dropped.
    """
    creating = bool(args) and args[0] == 'create'
    cmdline = qemu_img_args + qemu_img_create_prepare_args(list(args))
    return qemu_tool_pipe_and_status('qemu-img', cmdline,
                                     drop_successful_output=creating)
def qemu_img(*args: str) -> int:
    '''Run qemu-img with the given arguments; return only the exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP object whose dicts are key-sorted OrderedDicts.

    With conv_keys set, underscores in the keys of the outermost dict are
    replaced by dashes; dicts nested directly inside another dict keep
    their keys. Elements of a list are converted with the default setting.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return list(map(ordered_qmp, qmsg))
    if not isinstance(qmsg, dict):
        return qmsg
    return OrderedDict(
        (key.replace('_', '-') if conv_keys else key,
         ordered_qmp(value, conv_keys=False))
        for key, value in sorted(qmsg.items()))
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments; return the exit code'''
    create_cmd = ('create',) + tuple(args)
    return qemu_img(*create_cmd)
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the decoded JSON'''
    raw_output = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw_output)
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the decoded JSON'''
    raw_output = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw_output)
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img with the given arguments; return only its output'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output (test file names filtered), return it'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, use_image_opts=False,
                 extra_args=()):
    '''
    Log the filtered output of "qemu-img info" for `filename`.

    The image is opened with --image-opts when use_image_opts is set and
    with "-f imgfmt" otherwise. `filter_path` is the string replaced in
    the output; it falls back to `filename` when not given.
    '''
    format_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    cmdline = ['info', *format_args, *extra_args, filename]

    output = qemu_img_pipe(*cmdline)
    log(filter_img_info(output, filter_path or filename))
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    '''
    Prefix `args` with the qemu-io program and its default options.

    The format-less defaults are used when the caller already selects a
    format itself through -f or --image-opts.
    '''
    picks_own_format = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if picks_own_format else qemu_io_args
    return prefix + list(args)
def qemu_io_popen(*args):
    '''Start qemu-io with the given arguments and return the Popen object'''
    cmdline = qemu_io_wrap_args(args)
    return qemu_tool_popen(cmdline)
def qemu_io(*args):
    '''Run qemu-io with the given arguments and return its captured output'''
    output, _status = qemu_tool_pipe_and_status('qemu-io',
                                                qemu_io_wrap_args(args))
    return output
def qemu_io_pipe_and_status(*args):
    '''Run qemu-io and return both its output and its exit code'''
    cmdline = qemu_io_wrap_args(args)
    return qemu_tool_pipe_and_status('qemu-io', cmdline)
def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''Run qemu-io with stdout discarded and return its exit code'''
    cmdline = qemu_io_wrap_args(args)
    status = subprocess.run(cmdline, stdout=subprocess.DEVNULL,
                            check=False).returncode
    if status < 0:
        # Negative exit status: the process was terminated by a signal
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(cmdline)))
    return status
def qemu_io_silent_check(*args):
    '''Run qemu-io with all output discarded; True if it exited with 0'''
    cmdline = qemu_io_wrap_args(args)
    completed = subprocess.run(cmdline, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    succeeded = completed.returncode == 0
    return succeeded
class QemuIoInteractive:
    '''
    Wrapper around a qemu-io process that is driven interactively: commands
    are written to its stdin and the output up to the next prompt is read
    back from its stdout (stderr is merged into stdout).
    '''

    # Prompt that qemu-io prints when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        '''
        Start qemu-io and wait for its first prompt.

        Raises ValueError carrying the collected output if something other
        than the prompt shows up first.
        '''
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for qemu-io to terminate.'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''
        Read stdout up to and including the next prompt and return the
        text that preceded the prompt.

        Raises AssertionError if stdout reaches EOF before a prompt.
        '''
        pattern = self._PROMPT
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF; raised explicitly so that the check
            # (and with it the loop's termination) survives "python -O"
            if c == '':
                raise AssertionError('unexpected EOF while reading from '
                                     'qemu-io')
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # The mismatching character may itself start the prompt
                # (e.g. "...qqemu-io> "). The first prompt character does
                # not occur anywhere else in the prompt, so this is the
                # only partial match that has to be kept.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''
        Run one qemu-io command and return the output it produced.

        `cmd` must be a single line and must not be the quit command.
        '''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
class QemuStorageDaemon:
    '''
    A running qemu-storage-daemon process, optionally with a QMP monitor.
    '''
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        '''
        Launch the daemon and wait until it has written its PID file.

        :param args: Extra command line arguments; must not contain
                     --pidfile, which is added here.
        :param instance_id: Used in the names of the PID file and the
                            QMP socket.
        :param qmp: When true, add a QMP monitor on a UNIX socket and wait
                    for the daemon to connect to it.
        :raise RuntimeError: If the daemon terminates before the PID file
                             shows up.
        '''
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args += ['--chardev',
                         f'socket,id=qmp-sock,path={self._qmpsock}',
                         '--monitor', 'qmp-sock']

            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        '''Send a QMP command to the daemon (requires qmp=True).'''
        assert self._qmp is not None
        return self._qmp.cmd(cmd, args)

    def stop(self, kill_signal=15):
        '''
        Signal the daemon, wait for it to exit and remove its QMP socket
        and PID file.

        Safe to call more than once: when no process is left, only the
        file cleanup is repeated.
        '''
        if self._p is not None:
            self._p.send_signal(kill_signal)
            self._p.wait()
            self._p = None

        if self._qmp:
            self._qmp.close()
            # Forget the connection so that it is not closed a second time
            self._qmp = None

        if self._qmpsock is not None:
            try:
                os.remove(self._qmpsock)
            except OSError:
                pass
        try:
            os.remove(self.pidfile)
        except OSError:
            pass

    def __del__(self):
        # Last-resort cleanup if the owner never called stop()
        if self._p is not None:
            self.stop(kill_signal=9)
def qemu_nbd(*args):
    '''Launch qemu-nbd with --fork; return the exit code of the parent'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Launch qemu-nbd with --fork and return (exit code, output), where
    output is the empty string unless the exit code is non-zero'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, status = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                               connect_stderr=False)
    if not status:
        output = ''
    return status, output
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L", log the filtered output, return the raw output'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    listing = qemu_tool_pipe_and_status('qemu-nbd', cmdline)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
@contextmanager
def qemu_nbd_popen(*args):
    '''
    Context manager that keeps a persistent qemu-nbd server running for
    the duration of the context and kills it on exit.

    Raises RuntimeError if qemu-nbd exits before its PID file appears.
    '''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # The PID file showing up tells us the server is ready
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers'''
    # One 512-byte sector: the sector number as a big-endian 32-bit value
    # at the start and again at the end, 504 zero bytes in between.
    sector_layout = struct.Struct('>l504xl')
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_number = offset // 512
            image.write(sector_layout.pack(sector_number, sector_number))
            offset += 512
def image_size(img):
    '''Return the virtual size that "qemu-img info" reports for the image'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
def is_str(val):
    '''Return True if val is an instance of str'''
    result = isinstance(val, str)
    return result
# test_dir is a literal filesystem path, not a regular expression, so it
# has to be escaped before it is compiled.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path with TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Strip all carriage return characters from msg'''
    without_cr = win32_re.sub("", msg)
    return without_cr
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Replace the statistics that qemu-io prints with fixed placeholders'''
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace the numeric IDs in "chown <uid>:<gid>" with UID:GID'''
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
def filter_qmp_event(event):
    '''
    Filter a QMP event dict

    Returns a copy of the event in which the 'seconds' and 'microseconds'
    fields of the timestamp, if there is one, are replaced by placeholders.
    The event passed in is left untouched.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy; the nested timestamp dict
        # must be copied as well or the caller's event would be modified.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    filter_fn is called as filter_fn(key, value), where key is the dict
    key or list index of the value. Nested dicts and lists are descended
    into. The (modified) object is returned.'''
    if isinstance(qmsg, list):
        keys = range(len(qmsg))
    else:
        keys = list(qmsg.keys())

    for key in keys:
        value = qmsg[key]
        if isinstance(value, (dict, list)):
            qmsg[key] = filter_qmp(value, filter_fn)
        else:
            qmsg[key] = filter_fn(key, value)
    return qmsg
def filter_testfiles(msg):
    '''Replace the "<pid>-" prefixed paths below the test and socket
    directories with TEST_DIR/PID- and SOCK_DIR/PID-'''
    pid_prefix = "%s-" % (os.getpid())
    replacements = (
        (os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-'),
        (os.path.join(sock_dir, pid_prefix), 'SOCK_DIR/PID-'),
    )
    for real_prefix, placeholder in replacements:
        msg = msg.replace(real_prefix, placeholder)
    return msg
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
def filter_virtio_scsi(output: str) -> str:
    '''Drop the -ccw/-pci suffix from virtio-scsi device names'''
    bus_suffix = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return bus_suffix.sub(r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names with NODE_NAME'''
    generated_id = re.compile("#block[0-9]+")
    return generated_id.sub("NODE_NAME", msg)
def filter_img_info(output, filename):
    '''
    Make "qemu-img info" output reproducible.

    Lines reporting the disk size are dropped. In the remaining lines
    `filename`, the test file prefixes, the image format name and several
    variable values (iters, uuid, cid, compression type) are replaced by
    placeholders.
    '''
    # Applied to every kept line, in this order
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        kept.append(line)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name with IMGFMT'''
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
def filter_nbd_exports(output: str) -> str:
    '''Replace the numbers after "min/opt/max block:" with XXX'''
    block_size = re.compile(r'((min|opt|max) block): [0-9]+')
    return block_size.sub(r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Write a message to the diff-output logger.

    The filters are applied to the message one after another first.
    A dict or list is then serialized as JSON (pretty-printed if indent
    is given); anything else is logged as it is.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sorting = not isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=needs_sorting, indent=indent))
class Timeout:
    '''
    Context manager that raises an Exception carrying `errmsg` if its body
    runs for longer than `seconds` (implemented with SIGALRM).

    It does nothing when QEMU runs under gdb or valgrind.
    '''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer again
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''SIGALRM handler'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return name prefixed with the PID of this process and a dash'''
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
class FilePath:
    """
    Context manager that hands out file names and removes the files again
    when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless another base_dir is
    given; for sockets use iotests.sock_dir:

        with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    With a single name the context yields that one path instead of a
    list holding one item.
    """

    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for stale in self.paths:
            # Best effort: the file may never have been created
            try:
                os.remove(stale)
            except OSError:
                pass
        return False
def try_remove(img):
    '''Remove the file img, ignoring any OSError (e.g. file not there)'''
    try:
        os.remove(img)
    except OSError:
        return
def file_path_remover():
    '''Remove all paths recorded by file_path(), most recent first'''
    recorded = file_path_remover.paths
    for index in range(len(recorded) - 1, -1, -1):
        try_remove(recorded[index])
def file_path(*names, base_dir=test_dir):
    ''' Generate file names that are cleaned up when the process exits.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path, several names yield a list.
    '''
    # The first call creates the bookkeeping list and registers the
    # remover to run at interpreter exit.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
def remote_filename(path):
    '''
    Return path in the form required by the image protocol in use.

    Raises Exception for any protocol other than file and ssh.
    '''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timeout when running under gdb or valgrind
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         sock_dir=sock_dir, qmp_timer=timer)
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        # 99 is the exit code requested through --error-exitcode: print
        # the valgrind log in that case, otherwise discard it
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        '''Add "-object <opts>" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Add "-device <opts>" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Add "-drive <opts>" to the command line; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Add "-blockdev <opts>" to the command line; opts is either a
        string or an iterable of strings that are joined with commas'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Add "-incoming <addr>" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command line through human-monitor-command'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into one dict whose keys are the
        dot-separated paths of the leaf values'''
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Turn a QMP object into a "key=value,..." option string; the
        leaf values have to be strings'''
        flat = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + value for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event()'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Run a QMP command, logging both the command and its result'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and, if it was accepted, run the job'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability, logging the result'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for a MIGRATION event with status completed or failed.
        Returns True on completion, once the VM has also reached
        expect_runstate; returns False on failure.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None
        if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty bitmap lists;
        nodes without a 'dirty-bitmaps' field are left out'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The bitmap, or None if there is no matching one. This
                 includes the case of a node that has no bitmaps at all.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # query_bitmaps() has no entry for nodes without bitmaps; treat
        # that as "not found" like any other miss instead of a KeyError
        for bitmap in bitmaps.get(node_name, []):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if the bitmap exists and every item of fields is
        present, with the same value, in the bitmap's description'''
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            # A bitmap that does not exist cannot match any field
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
# Matches a path component of the form "key[index]" (used by dictpath()).
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')


class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        @path consists of dict keys separated by slashes.  A component
        written as "key[idx]" looks up "key" and then indexes the list
        found there with the integer idx.

        Returns the value found at the end of the path.  Any step that
        cannot be followed fails the test.'''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail(f'path component "{component}" in "{path}" '
                              f'is not a list in "{d}"')
                try:
                    d = d[idx]
                except IndexError:
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be traversed in @d

        Passes when dictpath() fails for @path; fails the test (and
        reports the value found) when the traversal succeeds.'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            for v in value:
                if result == v:
                    return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event

        @drive is used as the job's device name, @force is forwarded to
        block-job-cancel, @resume makes the VM resume the drive after
        issuing the cancel, and @wait is forwarded to get_qmp_events().'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event

        With @error set to None the completion event must not carry an
        error (and, if @check_offset is set, its offset must equal its
        len); otherwise the event's error must match @error.'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is None:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    else:
                        self.assert_qmp(event, 'data/error', error)
                    self.assert_no_active_block_jobs()
                    return event
                if event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': 'mirror', 'device': drive}}),
            ('BLOCK_JOB_READY',
             {'data': {'type': 'commit', 'device': drive}})
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for a mirror job to become ready, then cancel it

        The cancel must be reported as BLOCK_JOB_COMPLETED for a mirror
        job whose offset equals its len.'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        # assertIn reports the unexpected job type on failure, whereas
        # assertTrue(x in ...) would only say "False is not true".
        self.assertIn(event['data']['type'], ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy

        Returns the job's entry from query-block-jobs.  The job must be
        listed on every poll.  Runs under a Timeout(3, ...) context.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] and not job['busy']:
                            return job
                        break
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause for the job

        Returns the result of pause_wait() if @wait is set, and the QMP
        reply of block-job-pause otherwise.'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
def notrun(reason):
    '''Skip this test suite

    Records @reason in "<seq>.notrun" inside test_dir, logs a warning
    and exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_file = f'{test_dir}/{seq}.notrun'

    with open(notrun_file, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')

    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
       skipping it, that is left to the caller).  See
       QMPTestCase.case_skip() for a variant that actually skips the
       current test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_file = f'{test_dir}/{seq}.casenotrun'

    # Append: several test cases of one test may be recorded.
    with open(casenotrun_file, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    '''Skip the test suite unless the current image format is suitable

    An empty @supported_fmts means no restriction.  For luks, also
    verify that LUKS actually works.'''
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_enabled and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    restricted = bool(supported_fmts) and imgfmt not in supported_fmts
    if restricted or imgfmt in unsupported_fmts:
        notrun(f'not suitable for this image format: {imgfmt}')

    if imgfmt == 'luks':
        verify_working_luks()
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    '''Skip the test suite unless the current protocol is suitable

    At most one of @supported and @unsupported may be non-empty.  A
    'generic' entry in @supported accepts every protocol.'''
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    restricted = bool(supported) and imgproto not in supported
    if restricted or imgproto in unsupported:
        notrun(f'not suitable for this protocol: {imgproto}')
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    '''Skip the test suite unless sys.platform is suitable

    Entries are matched as prefixes of sys.platform.  An empty
    @supported means no restriction.'''
    platform = sys.platform

    for prefix in unsupported:
        if platform.startswith(prefix):
            notrun(f'not suitable for this OS: {platform}')

    if supported:
        matches = [prefix for prefix in supported
                   if platform.startswith(prefix)]
        if not matches:
            notrun(f'not suitable for this OS: {platform}')
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    '''Skip the test suite unless the current cache mode is supported

    An empty @supported_cache_modes means no restriction.'''
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun(f'not suitable for this cache mode: {cachemode}')
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    '''Skip the test suite unless the current aio mode is supported

    An empty @supported_aio_modes means no restriction.'''
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun(f'not suitable for this aio mode: {aiomode}')
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    '''Skip the test suite if any required format is not in the list
       returned by supported_formats()'''
    missing = set(required_formats) - set(supported_formats())
    usf_list = list(missing)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
def _verify_virtio_blk() -> None:
    '''Skip the test suite if 'virtio-blk' does not appear in the
       output of "-device help"'''
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
def _verify_virtio_scsi_pci_or_ccw() -> None:
    '''Skip the test suite if neither 'virtio-scsi-pci' nor
       'virtio-scsi-ccw' appears in the output of "-device help"'''
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
    '''Skip the test suite if $IMGOPTS contains any unsupported substring'''
    imgopts = os.environ.get('IMGOPTS')
    if not imgopts:
        return

    # One usage example for IMGOPTS is "data_file=$TEST_IMG.ext_data_file",
    # but that is supported only for bash tests.  We don't have a concept of
    # a global TEST_IMG in iotests.py, let alone a way to parse $variables.
    # So, for simplicity, let's just not support any IMGOPTS with '$' inside.
    unsup = [*unsupported, '$']
    for fragment in unsup:
        if fragment in imgopts:
            notrun(f'not suitable for this imgopts: {imgopts}')
def supports_quorum():
    '''Return whether 'quorum' appears in the output of qemu-img --help'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    If not, return the reason why.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup: the image may not exist if creation failed.
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text following "<img_file>:" on the first line that has
    # it; fall back to the complete output otherwise.
    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in output.splitlines() if marker in line),
                  output)
    return (False, reason)
def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    :return: QEMU's stdout output.
    """
    command = [qemu_prog, *qemu_opts, *args]
    # The exit status is deliberately ignored; only the output matters.
    return qemu_tool_pipe_and_status('qemu', command)[0]
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked'''

    # Results are cached per read_only value in a 'formats' attribute on
    # this function, so QEMU is queried at most once for each list.
    cache = supported_formats.__dict__.setdefault('formats', {})

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # Second line for the read-only list, first line otherwise.
        wanted = help_lines[1 if read_only else 0]
        cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       @required_formats is either a sequence of format names or a
       callable that takes the test case and returns such a sequence.
       @read_only is passed on to supported_formats().'''
    # Imported locally so that this block does not depend on the
    # module-level imports.
    import functools

    def skip_test_decorator(func):
        # Keep the decorated test's name and docstring on the wrapper.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats'''
    # Imported locally so that this block does not depend on the
    # module-level imports.
    import functools

    def skip_test_decorator(func):
        # Keep the decorated test's name and docstring on the wrapper.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When run as root, the case is recorded via case_notrun() and the
       wrapper returns None without calling the test.'''
    # Imported locally so that this block does not depend on the
    # module-level imports.
    import functools

    # Keep the decorated test's name and docstring on the wrapper.
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        else:
            return func(*args, **kwargs)
    return func_wrapper
# We need to filter out the time taken from the output so that
# qemu-iotests can reliably diff the results against the reference output,
# and hide skipped tests from the reference output.
class ReproducibleTestResult(unittest.TextTestResult):
    '''Text test result that reports a skipped test like a passed one'''

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
class ReproducibleStreamWrapper:
    '''Stream proxy that strips run-dependent parts from unittest output

    Everything except write() is delegated to the wrapped stream.'''

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached for attributes not found normally.  Refuse 'stream'
        # itself: if it is not set yet, reading self.stream below would
        # re-enter __getattr__ endlessly.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        '''Write @arg to the wrapped stream, minus the elapsed time of the
           "Ran N tests in X.XXXs" line and any " (skipped=N)" note.

           Calling write() without an argument writes nothing.'''
        if arg is None:
            # The signature allows omitting arg; re.sub() would raise
            # TypeError on None.
            return
        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
        self.stream.write(arg)
class ReproducibleTestRunner(unittest.TextTestRunner):
    '''Text test runner whose output goes through
       ReproducibleStreamWrapper (sys.stdout if no stream is given)'''

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        target = stream if stream else sys.stdout
        rstream = ReproducibleStreamWrapper(target)
        super().__init__(stream=rstream,  # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module."""

    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings_setting = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings_setting)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # Consume the first '-d' from the command line, if there is one.
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARNING
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests."""

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    All arguments are passed on to execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    The remaining arguments are passed on to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are passed on to execute_test().
    """
    execute_test(*args, **kwargs)