iotests: rebase qemu_io() on top of qemu_tool()
[qemu.git] / tests / qemu-iotests / iotests.py
blob06d35af21a064af1fc6333cfdfcdad6c92686410
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import argparse
20 import atexit
21 import bz2
22 from collections import OrderedDict
23 import faulthandler
24 import json
25 import logging
26 import os
27 import re
28 import shutil
29 import signal
30 import struct
31 import subprocess
32 import sys
33 import time
34 from typing import (Any, Callable, Dict, Iterable, Iterator,
35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36 import unittest
38 from contextlib import contextmanager
40 from qemu.machine import qtest
41 from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol
42 from qemu.utils import VerboseProcessError
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
# A NullHandler keeps this logger silent unless some other handler is
# attached to it (or to one of its ancestors).
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have the interpreter dump Python tracebacks when a fatal signal
# (SIGSEGV, SIGABRT, ...) is received.
faulthandler.enable()
# Command lines and settings for the QEMU tools, taken from the environment.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

# qemu-io command line used when the caller does not pick a format itself.
qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# qemu-io command line used when the caller passes '-f' or '--image-opts'
# (see qemu_io_wrap_args()).
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE(review): unlike the lists above this split is unconditional, so an
# unset or empty QEMU_OPTIONS yields [''] rather than [].
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# Optional gdbserver wrapper, built from GDB_OPTIONS.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# False when PRINT_QEMU is unset, otherwise the variable's string value
# (any non-empty string is truthy).
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
# Settings that have no default: all of them must be present in the
# environment, otherwise the module exits at import time.
try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)
# Optional valgrind wrapper: enabled by VALGRIND_QEMU=y unless
# NO_VALGRIND=y is set as well.
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    # Exit code 99 is what VM._post_shutdown() checks for to decide
    # whether the valgrind log gets printed.
    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Default secret object / option added for LUKS images that do not
# specify a key-secret themselves.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# Directory holding the '<name>.bz2' sample images; raises KeyError at
# import time when SAMPLE_IMG_DIR is unset.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Temporarily switch the named logger to a different log level.

    Handy for silencing errors that are expected or uninteresting. The
    level that was set on the logger before entering the context is put
    back on exit, even if the body raises.

    :param logger_name: name passed to `logging.getLogger()`.
    :param level: level to apply inside the context.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level
    target.setLevel(level)
    try:
        yield
    finally:
        target.setLevel(saved_level)
def unarchive_sample_image(sample, fname):
    """
    Decompress the bzip2 sample image `sample` into the file `fname`.

    The archive is looked up as '<sample>.bz2' inside sample_img_dir.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start a tool and return its Popen object with stdout piped in text mode.

    :param args: full command line to run.
    :param connect_stderr: when True, stderr is merged into the stdout
                           pipe; otherwise stderr is not redirected.
    """
    if connect_stderr:
        stderr_target = subprocess.STDOUT
    else:
        stderr_target = None
    # The caller owns the process, so it cannot be wrapped in 'with' here.
    # pylint: disable=consider-using-with
    return subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        universal_newlines=True,
    )
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: human-readable tool name, only used in the message
                 written when the tool is killed by a signal.
    :param args: full command line to run.
    :param connect_stderr: merge stderr into the captured output.
    :param drop_successful_output: return '' as output when the tool
                                   exits with status 0.
    :return: tuple of (captured output, return code).
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Keep the message in one literal: a backslash continuation
            # inside the string would splice the next source line's
            # indentation into the text.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a qemu-img argument list for 'create' commands.

    Any other command (or an empty list) is returned as an unmodified
    copy. For 'create', IMGOPTS from the environment is prepended to the
    -o options when the image uses the tested format, and a default
    key-secret is supplied for LUKS images that lack one.
    """
    is_create = bool(args) and args[0] == 'create'
    if not is_create:
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, leftover = parser.parse_known_args(args[1:])

    fmt = known.f
    option_strings = known.o

    cmdline = ['create']
    if fmt is not None:
        cmdline.extend(['-f', fmt])

    # IMGOPTS most probably contains options specific to the selected
    # format, like extended_l2 or compression_type for qcow2. Tests may
    # want to create additional images in other formats that do not
    # support these options, so IMGOPTS is only applied to images created
    # in the imgfmt format.
    env_opts = os.environ.get('IMGOPTS')
    if env_opts and fmt == imgfmt:
        option_strings.insert(0, env_opts)

    # default luks support
    if fmt == 'luks' and \
            not any('key-secret' in item for item in option_strings):
        cmdline.extend(['--object', luks_default_secret_object])
        option_strings.append(luks_default_key_secret_opt)

    for item in option_strings:
        cmdline.extend(['-o', item])

    cmdline.extend(leftover)
    return cmdline
def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
              ) -> 'subprocess.CompletedProcess[str]':
    """
    Run a qemu tool and return its status code and console output.

    :param args: full command line to run.
    :param check: Enforce a return code of zero.
    :param combine_stdio: set to False to keep stdout/stderr separated.

    :raise VerboseProcessError:
        When the return code is negative, or on any non-zero exit code
        when 'check=True' was provided (the default). The exception is
        constructed with the return code, the command line and the
        captured stdout/stderr.

    :return:
        a CompletedProcess. This object has args, returncode, and stdout
        properties. If streams are not combined, it will also have a
        stderr property.
    """
    stderr_target = subprocess.STDOUT if combine_stdio else subprocess.PIPE
    completed = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        universal_newlines=True,
        check=False,
    )

    status = completed.returncode
    # A negative status means the tool was killed by a signal; that is
    # always an error, independent of 'check'.
    if status < 0 or (check and status):
        raise VerboseProcessError(
            status, args,
            output=completed.stdout,
            stderr=completed.stderr,
        )

    return completed
def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IMG_PROG and return its status code and console output.

    QEMU_IMG_OPTIONS is always prepended, and the arguments of 'create'
    commands may be altered further by qemu_img_create_prepare_args().

    See `qemu_tool()` for greater detail.
    """
    prepared = qemu_img_create_prepare_args(list(args))
    command = qemu_img_args + prepared
    return qemu_tool(*command, check=check, combine_stdio=combine_stdio)
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message with all dicts sorted by key.

    :param qmsg: a (possibly nested) structure of dicts, lists and scalars.
    :param conv_keys: replace '_' with '-' in the keys of the outermost
                      dict(s) only. Nested dicts keep their keys, and the
                      setting is carried through lists unchanged, so a
                      dict inside a nested list is treated like any other
                      nested dict.
    :return: OrderedDicts for dicts, lists for lists, scalars unchanged.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        # Propagate conv_keys: resetting it to the default here would
        # convert keys of dicts inside a nested list even though keys of
        # directly nested dicts are left alone.
        return [ordered_qmp(atom, conv_keys=conv_keys) for atom in qmsg]
    if isinstance(qmsg, dict):
        od = OrderedDict()
        for k, v in sorted(qmsg.items()):
            if conv_keys:
                k = k.replace('_', '-')
            od[k] = ordered_qmp(v, conv_keys=False)
        return od
    return qmsg
def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """Run 'qemu-img create' with the given arguments via qemu_img()."""
    create_args = ('create',) + args
    return qemu_img(*create_args)
def qemu_img_json(*args: str) -> Any:
    """
    Run qemu-img and return its output as deserialized JSON.

    :raise CalledProcessError:
        When qemu-img crashes, or returns a non-zero exit code without
        producing a valid JSON document to stdout.
    :raise JSONDecodeError:
        When qemu-img returns 0, but failed to produce a valid JSON document.

    :return: A deserialized JSON object; probably a dict[str, Any].
    """
    try:
        completed = qemu_img(*args, combine_stdio=False)
    except subprocess.CalledProcessError as exc:
        # A negative return code means termination by signal: don't
        # bother looking at the output.
        if exc.returncode >= 0:
            # Commands like 'check' can return failure (exit codes 2 and
            # 3) to indicate command completion, but with errors found.
            # For multi-command flexibility, ignore the exact error codes
            # and *try* to load JSON.
            try:
                return json.loads(exc.stdout)
            except json.JSONDecodeError:
                # Nope. This thing is toast. Raise the /process/ error.
                pass
        raise

    return json.loads(completed.stdout)
def qemu_img_measure(*args: str) -> Any:
    """Run 'qemu-img measure --output json' and return the parsed JSON."""
    command = ("measure", "--output", "json") + args
    return qemu_img_json(*command)
def qemu_img_check(*args: str) -> Any:
    """Run 'qemu-img check --output json' and return the parsed JSON."""
    command = ("check", "--output", "json") + args
    return qemu_img_json(*command)
def qemu_img_info(*args: str) -> Any:
    """Run 'qemu-img info --output json' and return the parsed JSON."""
    command = ('info', "--output", "json") + args
    return qemu_img_json(*command)
def qemu_img_map(*args: str) -> Any:
    """Run 'qemu-img map --output json' and return the parsed JSON."""
    command = ('map', "--output", "json") + args
    return qemu_img_json(*command)
def qemu_img_log(*args: str, check: bool = True
                 ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img and log its output with test file names filtered.

    :param check: forwarded to qemu_img().
    :return: the CompletedProcess returned by qemu_img().
    """
    completed = qemu_img(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles])
    return completed
def img_info_log(filename: str, filter_path: Optional[str] = None,
                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
                 check: bool = True,
                 ) -> None:
    """
    Run 'qemu-img info' on an image and log the filtered output.

    :param filename: image (or image options string) to inspect.
    :param filter_path: path replaced by TEST_IMG in the output; defaults
                        to `filename`.
    :param use_image_opts: pass '--image-opts' instead of '-f <imgfmt>'.
    :param extra_args: further arguments inserted before the filename.
    :param check: forwarded to qemu_img().
    """
    if use_image_opts:
        format_args = ['--image-opts']
    else:
        format_args = ['-f', imgfmt]
    command = ['info'] + format_args
    command += extra_args
    command.append(filename)

    output = qemu_img(*command, check=check).stdout
    log(filter_img_info(output, filter_path or filename))
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prefix qemu-io arguments with the program and its default options.

    When the caller selects the format itself ('-f' or '--image-opts'),
    the options from QEMU_IO_OPTIONS_NO_FMT are used instead of
    QEMU_IO_OPTIONS.
    """
    caller_picks_format = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if caller_picks_format else qemu_io_args
    return prefix + list(args)
def qemu_io_popen(*args):
    """Start qemu-io with the wrapped arguments and return its Popen."""
    command = qemu_io_wrap_args(args)
    return qemu_tool_popen(command)
def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
            ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IO_PROG and return the status code and console output.

    Either QEMU_IO_OPTIONS or QEMU_IO_OPTIONS_NO_FMT is always prepended
    (see qemu_io_wrap_args()); `check` and `combine_stdio` are forwarded
    to qemu_tool().
    """
    command = qemu_io_wrap_args(args)
    return qemu_tool(*command, check=check, combine_stdio=combine_stdio)
def qemu_io_pipe_and_status(*args):
    """Run qemu-io and return a tuple of (output, exit code)."""
    command = qemu_io_wrap_args(args)
    return qemu_tool_pipe_and_status('qemu-io', command)
def qemu_io_log(*args: str) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-io without enforcing a zero exit code and log its output.

    The output is passed through filter_testfiles and filter_qemu_io
    before logging; the CompletedProcess is returned unchanged.
    """
    completed = qemu_io(*args, check=False)
    output_filters = [filter_testfiles, filter_qemu_io]
    log(completed.stdout, filters=output_filters)
    return completed
def qemu_io_silent(*args):
    """
    Run qemu-io and return the exit code, suppressing stdout.

    A note is written to stderr when qemu-io was killed by a signal.
    """
    command = qemu_io_wrap_args(args)
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               check=False)
    status = completed.returncode
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(command)))
    return status
def qemu_io_silent_check(*args):
    """
    Run qemu-io and return True if the subprocess returned 0.

    Both stdout and stderr are discarded.
    """
    command = qemu_io_wrap_args(args)
    completed = subprocess.run(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
        check=False,
    )
    return completed.returncode == 0
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin/stdout pipes.

    The constructor starts qemu-io and waits for the first prompt; cmd()
    sends one command and returns everything printed before the next
    prompt; close() sends 'q' and waits for the process to finish.
    """

    # Prompt printed by qemu-io whenever it is ready for the next command.
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with the wrapped `args`.

        :raise ValueError: carrying the collected output when the process
                           does not start with the expected prompt.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to exit."""
        self._p.communicate('q\n')

    def _read_output(self):
        """Read up to the next prompt and return the text preceding it."""
        pattern = self._PROMPT
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            # One character at a time: reading more could block waiting
            # for output that only arrives after the next command.
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # Restart the match, but let the mismatching character
                # start a new one. Otherwise output ending in the
                # prompt's first character right before the prompt
                # (e.g. 'qqemu-io> ') is never recognised. Checking only
                # pattern[0] is sufficient because that character does
                # not occur anywhere else in the prompt.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Execute one qemu-io command and return its output.

        The command must be a single line and must not be the quit
        command (use close() for that).
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
class QemuStorageDaemon:
    """
    Run a qemu-storage-daemon process, optionally with a QMP monitor.

    The constructor starts the daemon and waits until it has written its
    pidfile; stop() terminates it and removes the pidfile and QMP socket.
    """
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        :param args: extra daemon arguments; must not contain '--pidfile'.
        :param instance_id: used in the pidfile and socket file names.
        :param qmp: create a QMP monitor socket and connect to it.

        :raise RuntimeError: when the daemon exits before its pidfile
                             shows up.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args += ['--chardev',
                         f'socket,id=qmp-sock,path={self._qmpsock}',
                         '--monitor', 'qmp-sock']

            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()
        # Poll for the pidfile, bailing out if the daemon dies first.
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """Send a QMP command; requires the daemon to be started with qmp."""
        assert self._qmp is not None
        return self._qmp.cmd(cmd, args)

    def stop(self, kill_signal=15):
        """
        Signal the daemon, wait for it to exit and clean up its files.

        Safe to call more than once: after the first call there is no
        process left to signal and the clean-up steps ignore files that
        are already gone.
        """
        proc = self._p
        if proc is not None:
            proc.send_signal(kill_signal)
            proc.wait()
            self._p = None

        if self._qmp:
            self._qmp.close()
            # Forget the closed monitor so a repeated stop() does not
            # close it a second time.
            self._qmp = None

        if self._qmpsock is not None:
            try:
                os.remove(self._qmpsock)
            except OSError:
                pass
        try:
            os.remove(self.pidfile)
        except OSError:
            pass

    def __del__(self):
        # Last-resort clean-up for daemons that were never stopped.
        if self._p is not None:
            self.stop(kill_signal=9)
def qemu_nbd(*args):
    """Run qemu-nbd in daemon mode and return the parent's exit code"""
    command = qemu_nbd_args + ['--fork']
    command.extend(args)
    return subprocess.call(command)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    """
    Run qemu-nbd in daemon mode and return both the parent's exit code
    and its output in case of an error.

    The output element is '' when the exit code is 0.
    """
    command = qemu_nbd_args + ['--fork'] + list(args)
    output, status = qemu_tool_pipe_and_status('qemu-nbd', command,
                                               connect_stderr=False)
    if not status:
        output = ''
    return status, output
def qemu_nbd_list_log(*args: str) -> str:
    """
    Run qemu-nbd to list remote exports.

    The output is logged with test file names and block sizes filtered,
    and returned unfiltered.
    """
    command = [qemu_nbd_prog, '-L']
    command.extend(args)
    output = qemu_tool_pipe_and_status('qemu-nbd', command)[0]
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
@contextmanager
def qemu_nbd_popen(*args):
    """
    Context manager running qemu-nbd within the context.

    The server is started with '--persistent' and a pid file; the body
    runs once the pid file exists. On exit the pid file is removed and
    the server is killed.

    :raise RuntimeError: when qemu-nbd exits before creating its pid file.
    """
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Wait for the pid file, bailing out if the server dies first.
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
def compare_images(img1: str, img2: str,
                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
    """
    Compare two images with QEMU_IMG; return True if they are identical.

    :raise CalledProcessError:
        when qemu-img crashes or returns a status code of anything other
        than 0 (identical) or 1 (different).
    """
    try:
        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    except subprocess.CalledProcessError as exc:
        if exc.returncode != 1:
            raise
        return False
    return True
def create_image(name, size):
    """
    Create a fully-allocated raw image with sector markers.

    Every 512-byte sector carries its own sector number as a big-endian
    32-bit integer in its first and last four bytes. Whole sectors are
    written until at least `size` bytes have been produced.
    """
    sector_layout = struct.Struct('>l504xl')
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_no = offset // 512
            image.write(sector_layout.pack(sector_no, sector_no))
            offset += 512
def image_size(img: str) -> int:
    """
    Return image's virtual size

    :raise TypeError: when qemu-img reports a 'virtual-size' that is not
                      an int.
    """
    info = qemu_img_info('-f', imgfmt, img)
    value = info['virtual-size']
    if isinstance(value, int):
        return value
    type_name = type(value).__name__
    raise TypeError("Expected 'int' for 'virtual-size', "
                    f"got '{value}' of type '{type_name}'")
def is_str(val):
    """Return True when `val` is a str instance."""
    result = isinstance(val, str)
    return result
# TEST_DIR is a file system path, not a regular expression: escape it so
# that characters such as '.' or '+' in the path only match themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory with 'TEST_DIR'."""
    return test_dir_re.sub("TEST_DIR", msg)
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Strip all carriage returns from `msg`."""
    stripped = win32_re.sub("", msg)
    return stripped
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Replace the statistics line of qemu-io output with fixed placeholders.

    Carriage returns are stripped first (see filter_win32()).
    """
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    cleaned = filter_win32(msg)
    return qemu_io_re.sub(placeholder, cleaned)
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric 'chown <uid>:<gid>' with 'chown UID:GID'."""
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
def filter_qmp_event(event):
    """
    Filter a QMP event dict

    Returns a copy of `event` whose timestamp (if any) has its 'seconds'
    and 'microseconds' replaced by the placeholders 'SECS' and 'USECS'.
    The event passed in is left untouched.
    """
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: copy the nested timestamp
        # as well, otherwise the caller's event would be modified too.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_qmp(qmsg, filter_fn):
    """
    Given a string filter, filter a QMP object's values.

    filter_fn takes a (key, value) pair. Lists and dicts are walked
    recursively and modified in place (list indices serve as keys); any
    other value is passed to filter_fn with a key of None.
    """
    if not isinstance(qmsg, (list, dict)):
        return filter_fn(None, qmsg)

    # Iterate through either lists or dicts;
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()
    for key, value in entries:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
def filter_testfiles(msg):
    """
    Replace the per-process file name prefixes in `msg`.

    '<test_dir>/<pid>-' becomes 'TEST_DIR/PID-' and '<sock_dir>/<pid>-'
    becomes 'SOCK_DIR/PID-'.
    """
    test_prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
    sock_prefix = os.path.join(sock_dir, "%s-" % (os.getpid()))
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
def filter_qmp_testfiles(qmsg):
    """Apply filter_testfiles() to every string value of a QMP object."""
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_virtio_scsi(output: str) -> str:
    """Drop the '-ccw' / '-pci' suffix from 'virtio-scsi' device names."""
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    """Apply filter_virtio_scsi() to every string value of a QMP object."""
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    """Replace auto-generated node names ('#block<N>') with 'NODE_NAME'."""
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
def filter_img_info(output, filename):
    """
    Filter 'qemu-img info' output so that it is stable across runs.

    Lines mentioning the disk size are dropped; the image file name, test
    file prefixes, the image format and a few volatile values (iters,
    uuid, cid, compression type) are replaced by placeholders.
    """
    # Applied in order, after the plain string replacements below.
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    kept = []
    for text in output.split('\n'):
        if 'disk size' in text or 'actual-size' in text:
            continue
        text = text.replace(filename, 'TEST_IMG')
        text = filter_testfiles(text)
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            text = re.sub(pattern, replacement, text)
        kept.append(text)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name with 'IMGFMT'."""
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
def filter_qmp_imgfmt(qmsg):
    """Apply filter_imgfmt() to every string value of a QMP object."""
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_nbd_exports(output: str) -> str:
    """Replace the min/opt/max block sizes in the output with 'XXX'."""
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    :param msg: the message; dicts and lists are serialized to JSON.
    :param filters: callables applied to the message, in order, before
                    it is logged.
    :param indent: forwarded to json.dumps() for dicts and lists.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sorting = not isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=needs_sorting, indent=indent))
class Timeout:
    """
    Context manager raising an exception when its body runs too long.

    A SIGALRM handler and a real-time interval timer are armed on entry
    and the timer is cleared on exit. Nothing is armed when QEMU runs
    under gdb or valgrind.
    """

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # A delay of 0 disarms the timer.
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """Signal handler: raise an Exception carrying `errmsg`."""
        raise Exception(self.errmsg)
def file_pattern(name):
    """Return `name` prefixed with the current process ID and a dash."""
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.
    """

    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        # A single name yields the bare path rather than a sequence.
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for generated in self.paths:
            try:
                os.remove(generated)
            except OSError:
                # Best effort: the file may never have been created.
                pass
        return False
def try_remove(img):
    """Remove the file `img`, ignoring any OSError."""
    try:
        os.remove(img)
    except OSError:
        # Best effort only, e.g. the file may not exist.
        return
def file_path_remover():
    """
    Remove all files recorded in file_path_remover.paths.

    Paths are removed in reverse order of registration.
    """
    registered = file_path_remover.paths
    for path in reversed(registered):
        try_remove(path)
def file_path(*names, base_dir=test_dir):
    """
    Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    The generated paths are recorded on file_path_remover, which is
    registered with atexit on first use. A single name yields the bare
    path instead of a list.
    """
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
def remote_filename(path):
    """
    Return the name under which `path` is accessed via the test protocol.

    'file' returns the path unchanged and 'ssh' builds an ssh:// URL for
    the current USER on 127.0.0.1; any other protocol raises an Exception.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
815 class VM(qtest.QEMUQtestMachine):
816 '''A QEMU VM'''
def __init__(self, path_suffix=''):
    """
    Set up a VM named 'qemu<path_suffix>-<pid>'.

    QEMU is wrapped in gdbserver or valgrind when one of them is
    configured; in that case the QMP timer is disabled. Configuring both
    at once terminates the process with exit status 1.
    """
    name = "qemu%s-%d" % (path_suffix, os.getpid())
    wrapped = bool(qemu_gdb or qemu_valgrind)
    timer = None if wrapped else 15.0
    if qemu_gdb and qemu_valgrind:
        sys.stderr.write('gdb and valgrind are mutually exclusive\n')
        sys.exit(1)
    wrapper = qemu_gdb or qemu_valgrind
    super().__init__(qemu_prog, qemu_opts,
                     wrapper=wrapper,
                     name=name,
                     base_temp_dir=test_dir,
                     sock_dir=sock_dir,
                     qmp_timer=timer)
    self._num_drives = 0
def _post_shutdown(self) -> None:
    """
    Handle the valgrind log after the base class shutdown steps.

    The log is printed when QEMU exited with valgrind's error exit code
    (99) and deleted otherwise. Nothing is done without valgrind or
    without a process object.
    """
    super()._post_shutdown()
    if qemu_valgrind and self._popen:
        log_path = f"{test_dir}/{self._popen.pid}.valgrind"
        if self.exitcode() != 99:
            os.remove(log_path)
        else:
            with open(log_path, encoding='utf-8') as report:
                print(report.read())
def _pre_launch(self) -> None:
    """Run the base class steps, then honour the PRINT_QEMU setting."""
    super()._pre_launch()
    if not qemu_print:
        return
    # set QEMU binary output to stdout
    self._close_qemu_log_file()
def add_object(self, opts):
    """Append '-object <opts>' to the QEMU arguments; returns self."""
    self._args.extend(('-object', opts))
    return self
def add_device(self, opts):
    """Append '-device <opts>' to the QEMU arguments; returns self."""
    self._args.extend(('-device', opts))
    return self
def add_drive_raw(self, opts):
    """Append '-drive <opts>' to the QEMU arguments; returns self."""
    self._args.extend(('-drive', opts))
    return self
def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
    """
    Add a drive to the VM (a virtio drive by default).

    :param path: image file; when None no file/format/cache/aio options
                 are generated.
    :param opts: extra comma-separated -drive options.
    :param interface: value of the 'if=' option.
    :param img_format: image format; for 'luks' a default key-secret is
                       added unless `opts` already mentions one.
    :return: self
    """
    options = [
        'if=%s' % interface,
        'id=drive%d' % self._num_drives,
    ]

    if path is not None:
        options += [
            'file=%s' % path,
            'format=%s' % img_format,
            'cache=%s' % cachemode,
            'aio=%s' % aiomode,
        ]

    if opts:
        options.append(opts)

    needs_default_secret = img_format == 'luks' and 'key-secret' not in opts
    if needs_default_secret:
        # default luks support
        if luks_default_secret_object not in self._args:
            self.add_object(luks_default_secret_object)

        options.append(luks_default_key_secret_opt)

    self._args.extend(('-drive', ','.join(options)))
    self._num_drives += 1
    return self
def add_blockdev(self, opts):
    """
    Append '-blockdev <opts>' to the QEMU arguments; returns self.

    `opts` is either the option string itself or an iterable of strings
    that is joined with commas.
    """
    spec = opts if isinstance(opts, str) else ','.join(opts)
    self._args.extend(('-blockdev', spec))
    return self
def add_incoming(self, addr):
    """Append '-incoming <addr>' to the QEMU arguments; returns self."""
    self._args.extend(('-incoming', addr))
    return self
def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
    """
    Run an HMP command through QMP's 'human-monitor-command'.

    :param use_log: issue the command with qmp_log() instead of qmp().
    """
    kwargs: Dict[str, Any] = {'command-line': command_line}
    issue = self.qmp_log if use_log else self.qmp
    return issue('human-monitor-command', **kwargs)
def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
    """
    Pause drive r/w operations

    Sets a breakpoint named 'bp_<drive>' on `event`; without an event
    both 'read_aio' and 'write_aio' are used.
    """
    if event:
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
        return
    for default_event in ("read_aio", "write_aio"):
        self.pause_drive(drive, default_event)
def resume_drive(self, drive: str) -> None:
    """
    Resume drive r/w operations

    Removes the 'bp_<drive>' breakpoint through an HMP qemu-io command.
    """
    command = f'qemu-io {drive} "remove_break bp_{drive}"'
    self.hmp(command)
def hmp_qemu_io(self, drive: str, cmd: str,
                use_log: bool = False, qdev: bool = False) -> QMPMessage:
    """
    Run a qemu-io command on a given drive using an HMP command.

    :param qdev: pass '-d' to the HMP qemu-io command.
    :param use_log: forwarded to hmp().
    """
    flag = '-d ' if qdev else ''
    command_line = f'qemu-io {flag}{drive} "{cmd}"'
    return self.hmp(command_line, use_log=use_log)
def flatten_qmp_object(self, obj, output=None, basestr=''):
    """
    Flatten nested lists/dicts into a single dict with dotted keys.

    List indices and dict keys along the path are joined with '.', e.g.
    {'a': [1, {'b': 2}]} becomes {'a.0': 1, 'a.1.b': 2}.

    :param output: dict to fill; a new one is created when omitted.
    :param basestr: key prefix accumulated so far (ends in '.').
    :return: the output dict.
    """
    if output is None:
        output = {}

    if isinstance(obj, list):
        children = ((str(index), item) for index, item in enumerate(obj))
    elif isinstance(obj, dict):
        children = ((key, obj[key]) for key in obj)
    else:
        output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    for label, child in children:
        self.flatten_qmp_object(child, output, basestr + label + '.')
    return output
def qmp_to_opts(self, obj):
    """
    Convert a QMP object into a 'key=value,key=value' option string.

    The object is flattened with flatten_qmp_object() first; values are
    concatenated as-is, so they need to be strings.
    """
    flat = self.flatten_qmp_object(obj)
    return ','.join(key + '=' + flat[key] for key in flat)
def get_qmp_events_filtered(self, wait=60.0):
    """Return the pending QMP events, each passed through filter_qmp_event()."""
    return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]
def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
    """
    Issue a QMP command, logging both the command and its result.

    :param filters: filters applied to both logged messages.
    :param indent: forwarded to log() for pretty-printing.
    :param kwargs: the command's arguments, passed on to qmp().
    :return: the QMP result.
    """
    full_cmd = OrderedDict()
    full_cmd["execute"] = cmd
    full_cmd["arguments"] = ordered_qmp(kwargs)
    log(full_cmd, filters, indent=indent)

    result = self.qmp(cmd, **kwargs)
    log(result, filters, indent=indent)
    return result
# Returns None on success, and an error string on failure
def run_job(self, job: str, auto_finalize: bool = True,
            auto_dismiss: bool = False,
            pre_finalize: Optional[Callable[[], None]] = None,
            cancel: bool = False, wait: float = 60.0,
            filters: Iterable[Callable[[Any], Any]] = (),
            ) -> Optional[str]:
    """
    run_job moves a job from creation through to dismissal.

    :param job: String. ID of recently-launched job
    :param auto_finalize: Bool. True if the job was launched with
                          auto_finalize. Defaults to True.
    :param auto_dismiss: Bool. True if the job was launched with
                         auto_dismiss=True. Defaults to False.
    :param pre_finalize: Callback. A callable that takes no arguments to be
                         invoked prior to issuing job-finalize, if any.
    :param cancel: Bool. When true, cancels the job after the pre_finalize
                   callback.
    :param wait: Float. Timeout value specifying how long to wait for any
                 event, in seconds. Defaults to 60.0.
    :param filters: Filters applied to every event and command that is
                    logged while the job runs.

    :return: The 'error' field reported by query-jobs for this job when
             it went through the 'aborting' state, None otherwise.
    """
    # Block job events identify the job by 'device', job events by 'id'.
    match_device = {'data': {'device': job}}
    match_id = {'data': {'id': job}}
    events = [
        ('BLOCK_JOB_COMPLETED', match_device),
        ('BLOCK_JOB_CANCELLED', match_device),
        ('BLOCK_JOB_ERROR', match_device),
        ('BLOCK_JOB_READY', match_device),
        ('BLOCK_JOB_PENDING', match_id),
        ('JOB_STATUS_CHANGE', match_id)
    ]
    error = None
    while True:
        ev = filter_qmp_event(self.events_wait(events, timeout=wait))
        # Everything but status changes is only logged.
        if ev['event'] != 'JOB_STATUS_CHANGE':
            log(ev, filters=filters)
            continue
        status = ev['data']['status']
        if status == 'aborting':
            # Remember the job's error; it is returned once the job is gone.
            result = self.qmp('query-jobs')
            for j in result['return']:
                if j['id'] == job:
                    error = j['error']
                    log('Job failed: %s' % (j['error']), filters=filters)
        elif status == 'ready':
            self.qmp_log('job-complete', id=job, filters=filters)
        elif status == 'pending' and not auto_finalize:
            if pre_finalize:
                pre_finalize()
            if cancel:
                self.qmp_log('job-cancel', id=job, filters=filters)
            else:
                self.qmp_log('job-finalize', id=job, filters=filters)
        elif status == 'concluded' and not auto_dismiss:
            self.qmp_log('job-dismiss', id=job, filters=filters)
        elif status == 'null':
            # The job no longer exists: done.
            return error
def blockdev_create(self, options, job_id='job0', filters=None):
    """
    Issue a logged 'blockdev-create' command and, if it is accepted,
    drive the resulting job through run_job().

    :param options: Value passed as the command's 'options' argument.
    :param job_id: ID given to the creation job. Defaults to 'job0'.
    :param filters: Log filters used for the command and the job; when
                    None, [filter_qmp_testfiles] is used.
    :return: The result of run_job() when the command was accepted,
             otherwise the 'error' member of the QMP reply.
    """
    if filters is None:
        filters = [filter_qmp_testfiles]

    reply = self.qmp_log('blockdev-create', filters=filters,
                         job_id=job_id, options=options)

    if 'return' not in reply:
        outcome = reply['error']
    else:
        # An accepted blockdev-create carries no payload of its own
        assert reply['return'] == {}
        outcome = self.run_job(job_id, filters=filters)

    # Empty line separating this operation in the log output
    log("")
    return outcome
def enable_migration_events(self, name):
    """
    Turn on the 'events' migration capability and log the QMP reply.

    :param name: Label used only in the log message.
    """
    log('Enabling migration QMP events on %s...' % name)
    events_capability = {
        'capability': 'events',
        'state': True
    }
    reply = self.qmp('migrate-set-capabilities',
                     capabilities=[events_capability])
    log(reply)
def wait_migration(self, expect_runstate: Optional[str]) -> bool:
    """
    Log MIGRATION events until one reports 'completed' or 'failed'.

    :param expect_runstate: Runstate that query-status must report
                            before returning after a completed migration.
    :return: True if the migration completed, False if it failed.
    """
    final_states = ('completed', 'failed')
    status = None
    while status not in final_states:
        event = self.event_wait('MIGRATION')
        # We use the default timeout, and with a timeout, event_wait()
        # never returns None
        assert event

        log(event, filters=[filter_qmp_event])
        status = event['data']['status']

    if status != 'completed':
        return False

    # The event may occur in finish-migrate, so wait for the expected
    # post-migration runstate
    runstate = None
    while runstate != expect_runstate:
        runstate = self.qmp('query-status')['return']['status']
    return True
def node_info(self, node_name):
    """
    Return the query-named-block-nodes entry whose 'node-name' equals
    @node_name, or None if there is no such entry.
    """
    reply = self.qmp('query-named-block-nodes')
    return next((entry for entry in reply['return']
                 if entry['node-name'] == node_name),
                None)
def query_bitmaps(self):
    """
    Map node names to their 'dirty-bitmaps' lists, as reported by
    query-named-block-nodes.  Nodes without a 'dirty-bitmaps' member are
    left out of the result.
    """
    reply = self.qmp("query-named-block-nodes")
    bitmaps_by_node = {}
    for node in reply['return']:
        if 'dirty-bitmaps' in node:
            bitmaps_by_node[node['node-name']] = node['dirty-bitmaps']
    return bitmaps_by_node
def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
    """
    Get a specific bitmap from the object returned by query_bitmaps().

    :param node_name: Name of the node the bitmap belongs to.
    :param bitmap_name: Name of the bitmap to look for.
    :param recording: If specified, only return a bitmap whose
                      'recording' member equals this value.
    :param bitmaps: If specified, use it instead of calling
                    query_bitmaps().
    :return: The matching bitmap dict, or None if there is none.
    """
    if bitmaps is None:
        bitmaps = self.query_bitmaps()

    # A node that has no bitmaps is absent from the mapping altogether;
    # treat that like any other "not found" instead of raising KeyError.
    for bitmap in bitmaps.get(node_name, []):
        if bitmap.get('name', '') == bitmap_name:
            if recording is None or bitmap.get('recording') == recording:
                return bitmap
    return None
def check_bitmap_status(self, node_name, bitmap_name, fields):
    """
    Check whether a bitmap exists and carries all the given fields.

    :param node_name: Name of the node the bitmap belongs to.
    :param bitmap_name: Name of the bitmap to inspect.
    :param fields: Dict of key/value pairs the bitmap must contain.
    :return: True if every item of @fields is present in the bitmap,
             False otherwise (including when the bitmap is not found).
    """
    bitmap = self.get_bitmap(node_name, bitmap_name)
    if bitmap is None:
        # get_bitmap() found nothing: no field can match
        return False

    return fields.items() <= bitmap.items()
def assert_block_path(self, root, path, expected_node, graph=None):
    """
    Check whether the node under the given path in the block graph
    is @expected_node.

    @root is the node name of the node where the @path is rooted.

    @path is a string that consists of child names separated by
    slashes.  It must begin with a slash.

    Examples for @root + @path:
      - root="qcow2-node", path="/backing/file"
      - root="quorum-node", path="/children.2/file"

    Hypothetically, @path could be empty, in which case it would
    point to @root.  However, in practice this case is not useful
    and hence not allowed.

    @expected_node may be None.  (All elements of the path but the
    leaf must still exist.)

    @graph may be None or the result of an x-debug-query-block-graph
    call that has already been performed.
    """
    if graph is None:
        graph = self.qmp('x-debug-query-block-graph')['return']

    components = path.split('/')

    # Must start with a /, i.e. the first component is empty
    assert components[0] == ''

    current = None
    for candidate in graph['nodes']:
        if candidate['name'] == root:
            current = candidate
            break

    # An empty @path is not allowed, so the root node must be present
    assert current is not None, 'Root node %s not found' % root

    for child_name in components[1:]:
        # Only the leaf may be missing; intermediate nodes must exist
        assert current is not None, 'Cannot follow path %s%s' % (root, path)

        # Find the edge called @child_name that leaves the current node
        for edge in graph['edges']:
            if (edge['parent'] == current['id'] and
                    edge['name'] == child_name):
                child_id = edge['child']
                break
        else:
            current = None
            continue

        # Resolve the edge's child ID to the node itself
        for candidate in graph['nodes']:
            if candidate['id'] == child_id:
                current = candidate
                break
        else:
            current = None

    if current is None:
        assert expected_node is None, \
            'No node found under %s (but expected %s)' % \
            (path, expected_node)
    else:
        assert current['name'] == expected_node, \
            'Found node %s under %s (but expected %s)' % \
            (current['name'], path, expected_node)
# Matches a path component of the form "name[index]"; group 1 is the name,
# group 2 is the text between the brackets.
index_re = re.compile(
    r'([^\[]+)\[([^\]]+)\]'
)
class QMPTestCase(unittest.TestCase):
    """Abstract base class for QMP test cases"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        """
        Traverse a '/'-separated path in a nested dict.

        A component of the form "key[N]" looks up "key" and then takes
        element N of the list stored there.  Any step that cannot be
        taken fails the test.
        """
        for component in path.split('/'):
            indexed = index_re.match(component)
            idx = None
            if indexed:
                component = indexed.group(1)
                idx = int(indexed.group(2))

            if not (isinstance(d, dict) and component in d):
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if not indexed:
                continue

            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        """Assert that @path cannot be traversed in the QMP dict @d."""
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed the traversal, so the path is absent
            return
        self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        """
        Assert that the value for a specific path in a QMP dict
        matches.  When given a list of values, assert that any of
        them matches.
        """
        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if not isinstance(value, list) or value == []:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if any(actual == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        """Assert that query-block-jobs returns an empty list."""
        reply = self.vm.qmp('query-block-jobs')
        self.assert_qmp(reply, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """
        Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result
        """
        def matches(actual, wanted):
            # A missing value on either side counts as a match
            return actual is None or wanted is None or actual == wanted

        assert node_name or file_name
        reply = self.vm.qmp('query-named-block-nodes')
        for entry in reply["return"]:
            if not matches(entry.get("node-name"), node_name):
                continue
            if matches(entry.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, reply))

    def assert_json_filename_equal(self, json_filename, reference):
        """
        Assert that the given filename is a json: filename and that its
        content is equal to the given reference object
        """
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        actual = self.vm.flatten_qmp_object(json.loads(payload))
        expected = self.vm.flatten_qmp_object(reference)
        self.assertEqual(actual, expected)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        """Cancel a block job and wait for it to finish, returning the event"""
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # Every event of a batch is inspected, even after the final
            # one has been seen.
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        """Wait for a block job to finish, returning the event"""
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        """
        Wait for the job on @drive to become ready, cancel it, and assert
        that it finished as a fully completed mirror job.
        """
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        """Complete a block job and wait for it to finish"""
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertTrue(event['data']['type'] in ('mirror', 'commit'))

    def pause_wait(self, job_id='job0'):
        """
        Poll query-block-jobs until the job is paused and no longer busy,
        then return its entry.  The job must be listed on every poll.
        """
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((j for j in jobs if j['device'] == job_id), None)
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        """
        Pause a block job.  Returns the result of pause_wait() when @wait
        is set, otherwise the reply to block-job-pause.
        """
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        return self.pause_wait(job_id) if wait else reply

    def case_skip(self, reason):
        """Skip this test case"""
        case_notrun(reason)
        self.skipTest(reason)
def notrun(reason):
    """
    Skip this test suite: record @reason in the test's .notrun file,
    log a warning and exit with status 0.
    """
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    notrun_path = '%s/%s.notrun' % (test_dir, seq)
    with open(notrun_path, 'w', encoding='utf-8') as notrun_file:
        notrun_file.write(reason + '\n')

    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    """
    Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.
    """
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Opened in append mode, so entries accumulate across calls
    casenotrun_path = '%s/%s.casenotrun' % (test_dir, seq)
    with open(casenotrun_path, 'a', encoding='utf-8') as casenotrun_file:
        casenotrun_file.write(' [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless the current image format is acceptable.

    :param supported_fmts: If non-empty, the format must be listed here.
                           'generic' lifts this restriction unless the
                           IMGFMT_GENERIC environment variable is set to
                           something other than 'true'.
    :param unsupported_fmts: Formats the test must not run with.
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_enabled and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    not_listed = bool(supported_fmts) and imgfmt not in supported_fmts
    if not_listed or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1387 def _verify_protocol(supported: Sequence[str] = (),
1388 unsupported: Sequence[str] = ()) -> None:
1389 assert not (supported and unsupported)
1391 if 'generic' in supported:
1392 return
1394 not_sup = supported and (imgproto not in supported)
1395 if not_sup or (imgproto in unsupported):
1396 notrun('not suitable for this protocol: %s' % imgproto)
1398 def _verify_platform(supported: Sequence[str] = (),
1399 unsupported: Sequence[str] = ()) -> None:
1400 if any((sys.platform.startswith(x) for x in unsupported)):
1401 notrun('not suitable for this OS: %s' % sys.platform)
1403 if supported:
1404 if not any((sys.platform.startswith(x) for x in supported)):
1405 notrun('not suitable for this OS: %s' % sys.platform)
1407 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1408 if supported_cache_modes and (cachemode not in supported_cache_modes):
1409 notrun('not suitable for this cache mode: %s' % cachemode)
1411 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1412 if supported_aio_modes and (aiomode not in supported_aio_modes):
1413 notrun('not suitable for this aio mode: %s' % aiomode)
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Skip the test suite if any of @required_formats is missing from
    supported_formats().
    """
    missing = list(set(required_formats) - set(supported_formats()))
    if not missing:
        return
    notrun(f'formats {missing} are not whitelisted')
def _verify_virtio_blk() -> None:
    """
    Skip the test suite if 'virtio-blk' does not appear in the output of
    qemu's '-device help'.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """
    Skip the test suite if neither 'virtio-scsi-pci' nor 'virtio-scsi-ccw'
    appears in the output of qemu's '-device help'.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-scsi-pci' in device_help or 'virtio-scsi-ccw' in device_help:
        return
    notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1432 def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1433 imgopts = os.environ.get('IMGOPTS')
1434 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1435 # but it supported only for bash tests. We don't have a concept of global
1436 # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1437 # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1438 unsup = list(unsupported) + ['$']
1439 if imgopts and any(x in imgopts for x in unsup):
1440 notrun(f'not suitable for this imgopts: {imgopts}')
def supports_quorum() -> bool:
    """Return whether 'quorum' appears in the output of qemu-img --help."""
    help_text = qemu_img('--help').stdout
    return 'quorum' in help_text
def verify_quorum():
    """Skip test suite if quorum support is not available"""
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    :return: (True, '') if a test image could be created, otherwise
             (False, reason).
    """
    img_file = f'{test_dir}/luks-test.luks'
    res = qemu_img('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G',
                   check=False)

    # Best-effort cleanup; the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if not res.returncode:
        return (True, '')

    # Prefer the text following "<img_file>:" on the first line mentioning
    # the image; fall back to the whole output.
    marker = img_file + ':'
    reason = res.stdout
    for line in res.stdout.splitlines():
        if marker in line:
            reason = line.split(marker, 1)[1].strip()
            break

    return (False, reason)
def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
def supports_qcow2_zstd_compression() -> bool:
    """
    Probe for zstd support by creating a qcow2 test image with
    compression_type=zstd.

    :return: False only if qemu-img exits with status 1 and reports that
             'compression-type' does not accept 'zstd'; True otherwise.
    """
    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
                   img_file, '0',
                   check=False)

    # Best-effort cleanup; the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    rejected = (
        res.returncode == 1 and
        "'compression-type' does not accept value 'zstd'" in res.stdout
    )
    return not rejected
def verify_qcow2_zstd_compression():
    """Skip test suite if qcow2 zstd compression is not supported"""
    if supports_qcow2_zstd_compression():
        return
    notrun('zstd compression not supported')
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    :param args: Extra arguments appended after qemu_prog and qemu_opts.
    :return: QEMU's stdout output.
    """
    cmdline = [qemu_prog, *qemu_opts, *args]
    stdout, _status = qemu_tool_pipe_and_status('qemu', cmdline)
    return stdout
def supported_formats(read_only=False):
    """
    Set 'read_only' to True to check ro-whitelist
    Otherwise, rw-whitelist is checked

    Results are cached per 'read_only' value in the function's own
    'formats' attribute, so qemu is queried at most once for each.
    """
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    try:
        return cache[read_only]
    except KeyError:
        pass

    help_lines = qemu_pipe("-drive", "format=help").splitlines()
    # Line 0 is used for the rw-whitelist, line 1 for the ro-whitelist;
    # the format names follow the first colon.
    wanted_line = help_lines[1 if read_only else 0]
    cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    """
    Skip Test Decorator
    Runs the test if all the required formats are whitelisted

    :param required_formats: Either a collection of format names, or a
                             callable that takes the test case and
                             returns such a collection.
    :param read_only: Passed on to supported_formats().
    """
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats)
                    else required_formats)

            missing = list(set(fmts) - set(supported_formats(read_only)))
            if not missing:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(
                f'{test_case}: formats {missing} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    """
    Skip Test Decorator
    Skips the test for the given formats

    :param formats: Image formats for which the decorated test is skipped.
    """
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    """
    Skip Test Decorator
    Runs the test only without root permissions

    When run as root, the case is recorded via case_notrun() and the
    wrapper returns None without calling @func.
    """
    def func_wrapper(*args, **kwargs):
        running_as_root = os.getuid() == 0
        if not running_as_root:
            return func(*args, **kwargs)

        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
# We need to filter out the time taken from the output so that
# qemu-iotest can reliably diff the results against master output,
# and hide skipped tests from the reference output.

class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult that reports a skipped test as a dot."""

    def addSkip(self, test, reason):
        """Record the skip, printing a dot instead of "s" in dots mode."""
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
class ReproducibleStreamWrapper:
    """
    Text stream proxy that strips the elapsed time and the skip count
    from everything written through it.  All other attributes are
    forwarded to the wrapped stream.
    """

    # "Ran 3 tests in 0.123s" -> "Ran 3 tests"
    _elapsed_re = re.compile(r'Ran (\d+) tests? in [\d.]+s')
    # "OK (skipped=2)" -> "OK"
    _skipped_re = re.compile(r' \(skipped=\d+\)')

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # __getattr__ only runs when normal lookup fails.  Refusing these
        # two names keeps a lookup of 'stream' on a not-yet-initialized
        # instance from recursing through getattr(self.stream, ...).
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """
        Write @arg to the wrapped stream after filtering it.

        :param arg: Text to write.  None (the default) writes nothing.
        :return: Whatever the wrapped stream's write() returns, or 0 when
                 @arg is None.
        """
        if arg is None:
            # The default must not reach re.sub(), which rejects None
            return 0
        arg = self._elapsed_re.sub(r'Ran \1 tests', arg)
        arg = self._skipped_re.sub(r'', arg)
        return self.stream.write(arg)
class ReproducibleTestRunner(unittest.TextTestRunner):
    """
    TextTestRunner that writes through a ReproducibleStreamWrapper and
    uses ReproducibleTestResult unless told otherwise.
    """

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        """
        :param stream: Output stream; sys.stdout is used when this is
                       None (or otherwise falsy).
        :param resultclass: Result class handed to TextTestRunner.
        :param kwargs: Forwarded to TextTestRunner.
        """
        target = stream if stream else sys.stdout
        filtered = ReproducibleStreamWrapper(target)
        super().__init__(stream=filtered,  # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Run with verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Removes the first '-d' from sys.argv if present, configures logging
    accordingly, then runs the environment checks in a fixed order.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    :param test_function: Callable to run for a script-style test; when
                          not given, unittest-style tests are executed.
    :param args, kwargs: Forwarded to execute_setup_common().
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    # Keep these records away from the handlers of ancestor loggers
    test_logger.propagate = False
# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Activates log output, then forwards all arguments to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework

    Activates log output, then hands @test_function and all remaining
    arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework

    All arguments are forwarded to execute_test().
    """
    execute_test(*args, **kwargs)