iotests.py: filter out successful output of qemu-img create
[qemu.git] / tests / qemu-iotests / iotests.py
blob65780c60989ae969c6e47f78c963da13aca99bbd
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import argparse
20 import atexit
21 import bz2
22 from collections import OrderedDict
23 import faulthandler
24 import json
25 import logging
26 import os
27 import re
28 import shutil
29 import signal
30 import struct
31 import subprocess
32 import sys
33 import time
34 from typing import (Any, Callable, Dict, Iterable, Iterator,
35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36 import unittest
38 from contextlib import contextmanager
40 from qemu.machine import qtest
41 from qemu.qmp import QMPMessage
# Logger for messages emitted directly by the iotests module itself.
# Only a NullHandler is attached here; tests decide where output goes.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Logger for messages that are meant to end up in the diffed test output.
test_logger = logging.getLogger('qemu.iotests.diff_io')

# Have Python dump tracebacks when the interpreter hits a fatal error.
faulthandler.enable()
# Command lines of the external tools.  Each *_OPTIONS variable is split on
# single spaces, so arguments containing spaces are not supported; this is
# nevertheless necessary to support the override options of ./check.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args.extend(os.environ['QEMU_IMG_OPTIONS'].strip().split(' '))

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args.extend(os.environ['QEMU_IO_OPTIONS'].strip().split(' '))

# Variant of the qemu-io command line built from QEMU_IO_OPTIONS_NO_FMT.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt.extend(
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' '))

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args.extend(os.environ['QEMU_NBD_OPTIONS'].strip().split(' '))

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Optional gdbserver wrapper, configured through GDB_OPTIONS.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = (['gdbserver'] + gdb_qemu_env.strip().split(' ')
            if gdb_qemu_env else [])

qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')
# Settings that must be present in the environment.
try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # These variables serve as proxies: if one is missing we are not being
    # run via "check".  There may be other things set up by "check" that
    # individual test cases rely on, so refuse to continue.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)
# Optional valgrind wrapper: enabled by VALGRIND_QEMU=y unless NO_VALGRIND=y.
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
        os.environ.get('NO_VALGRIND') != "y":
    # %p lets valgrind insert the process PID, which is not known a priori
    # (subprocess.Popen has not been invoked yet).
    valgrind_logfile = "--log-file=" + test_dir + "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Defaults used when a luks image is handled without an explicit key-secret.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
    os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

sample_img_dir = os.environ['SAMPLE_IMG_DIR']
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Temporarily switch the named logger to a different log level.

    This can be used to silence errors that are expected or uninteresting.
    The level the logger had on entry is put back on exit, also when the
    body of the ``with`` statement raises.

    :param logger_name: Name passed to logging.getLogger().
    :param level: Level in effect inside the context (default: CRITICAL).
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level
    target.setLevel(level)

    try:
        yield
    finally:
        target.setLevel(saved_level)
def unarchive_sample_image(sample, fname):
    '''Decompress <sample>.bz2 from sample_img_dir and write it to fname'''
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed, open(fname, 'wb') as target:
        shutil.copyfileobj(compressed, target)
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    '''
    Start a tool with its stdout connected to a text-mode pipe.

    :param args: Full command line, program name first.
    :param connect_stderr: When True, stderr is merged into the stdout pipe;
                           otherwise stderr is inherited from this process.
    :return: The running Popen object; the caller has to reap it.
    '''
    if connect_stderr:
        stderr = subprocess.STDOUT
    else:
        stderr = None
    # pylint: disable=consider-using-with
    return subprocess.Popen(args, stdout=subprocess.PIPE, stderr=stderr,
                            universal_newlines=True)
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool, used only in the diagnostic that is
                 written to stderr when the tool is killed by a signal.
    :param args: Full command line, program name first.
    :param connect_stderr: Passed on to qemu_tool_popen().
    :param drop_successful_output: When True, the output is replaced by an
                                   empty string if the exit code is 0.
    :return: Tuple of (output, exit code).
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Keep the message on one logical line: a backslash-newline
            # inside the literal would pull the next line's indentation
            # into the text.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    '''
    Adjust the arguments of a "qemu-img create" invocation.

    Anything that is not a "create" command is returned as an unmodified
    copy.  For "create", the -f and -o options are parsed and then
    re-emitted in front of the remaining arguments, after adding IMGOPTS
    and the default luks secret where applicable.
    '''
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    parsed, remaining = parser.parse_known_args(args[1:])

    fmt = parsed.f
    opts_list = parsed.o

    result = ['create']
    if fmt is not None:
        result.extend(['-f', fmt])

    # IMGOPTS most probably contain options specific to the selected
    # format, like extended_l2 or compression_type for qcow2.  Tests may
    # want to create additional images in other formats that don't support
    # these options.  So, use IMGOPTS only for images created in imgfmt
    # format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and fmt == imgfmt:
        opts_list.insert(0, imgopts)

    # default luks support
    if fmt == 'luks' and \
            all('key-secret' not in opts for opts in opts_list):
        result.extend(['--object', luks_default_secret_object])
        opts_list.append(luks_default_key_secret_opt)

    for opts in opts_list:
        result.extend(['-o', opts])

    result.extend(remaining)

    return result
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code

    For the "create" subcommand the output of a successful run is dropped.
    """
    is_create = len(args) > 0 and args[0] == 'create'
    command = qemu_img_args + qemu_img_create_prepare_args(list(args))
    return qemu_tool_pipe_and_status('qemu-img', command,
                                     drop_successful_output=is_create)
def qemu_img(*args: str) -> int:
    '''Run qemu-img and return the exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
def ordered_qmp(qmsg, conv_keys=True):
    '''
    Return a copy of a QMP object with dictionaries sorted by key.

    Dictionaries become OrderedDicts (they are not ordered prior to
    Python 3.6).  With conv_keys set, '_' in the keys of the dictionary
    being converted is replaced by '-'; dictionaries nested directly inside
    it keep their keys, while elements of a list are converted with the
    default conv_keys again.  Other values are returned unchanged.
    '''
    if isinstance(qmsg, list):
        return [ordered_qmp(atom) for atom in qmsg]
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key, value in sorted(qmsg.items()):
        name = key.replace('_', '-') if conv_keys else key
        ordered[name] = ordered_qmp(value, conv_keys=False)
    return ordered
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments; return the exit code'''
    create_args = ('create',) + args
    return qemu_img(*create_args)
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the decoded JSON'''
    output = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(output)
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the decoded JSON'''
    output = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(output)
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles, return it raw'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, use_image_opts=False,
                 extra_args=()):
    '''
    Run "qemu-img info" on filename and log the filtered output.

    :param filename: Image (or image options string) to inspect.
    :param filter_path: Path replaced by filter_img_info(); defaults to
                        filename when empty.
    :param use_image_opts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Extra arguments inserted before the filename.
    '''
    if use_image_opts:
        args = ['info', '--image-opts']
    else:
        args = ['info', '-f', imgfmt]
    args.extend(extra_args)
    args.append(filename)

    output = qemu_img_pipe(*args)
    log(filter_img_info(output, filter_path or filename))
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    '''
    Prefix args with the qemu-io command line.

    The qemu_io_args_no_fmt prefix is used when args already contains
    '-f' or '--image-opts'; qemu_io_args is used otherwise.
    '''
    has_own_format = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if has_own_format else qemu_io_args
    return prefix + list(args)
def qemu_io_popen(*args):
    '''Start qemu-io with the given arguments and return the Popen object'''
    command = qemu_io_wrap_args(args)
    return qemu_tool_popen(command)
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    command = qemu_io_wrap_args(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', command)
    return output
def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    command = qemu_io_wrap_args(args)
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               check=False)
    status = completed.returncode
    if status < 0:
        # A negative return code means the process was killed by a signal.
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(command)))
    return status
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0'''
    command = qemu_io_wrap_args(args)
    # stderr is merged into stdout, which is discarded.
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    return completed.returncode == 0
class QemuIoInteractive:
    '''Drive an interactive qemu-io process through its stdin and stdout.'''

    def __init__(self, *args):
        '''
        Start qemu-io and wait for its first prompt.

        Raises ValueError carrying the collected output if the process does
        not begin with the "qemu-io> " prompt.
        '''
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for qemu-io to finish.'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to the next prompt; return the text preceding it.'''
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            elif c == pattern[0]:
                # The mismatching character may itself start the prompt
                # (e.g. output ending in 'q' right before "qemu-io> ").
                # 'q' occurs only at the start of the pattern, so a restart
                # at position 1 is the only fallback that is ever needed.
                pos = 1
            else:
                pos = 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run one command line and return the output it produced.'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    command = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(command)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    command = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', command,
                                                   connect_stderr=False)
    if not returncode:
        # Output of a successful start is not reported.
        output = ''
    return returncode, output
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports, log the filtered output and
       return the unfiltered output'''
    command = [qemu_nbd_prog, '-L'] + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu-nbd', command)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = qemu_nbd_args + ['--persistent', '--pid-file', pid_file]
    cmd += list(args)

    log('Start NBD server')
    with subprocess.Popen(cmd) as p:
        try:
            # Poll until qemu-nbd has created its pid file; give up if the
            # process exits before that happens.
            while not os.path.exists(pid_file):
                if p.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(p.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            p.kill()
            p.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if two image files are identical'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Whole 512-byte sectors are written until at least size bytes exist;
    every sector carries its own number as a big-endian 32-bit value in
    its first and last four bytes.
    '''
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            offset += 512
def image_size(img):
    '''Return image's virtual size'''
    info = json.loads(
        qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
def is_str(val):
    '''Return True if val is a str instance'''
    result = isinstance(val, str)
    return result
# test_dir is a literal path, not a pattern: escape it so that characters
# such as '.' or '+' in the directory name only match themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory by "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)
# Matches the carriage returns that filter_win32() strips.
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Remove all carriage return characters from msg'''
    cleaned = win32_re.sub("", msg)
    return cleaned
# Matches the statistics part of a qemu-io result line (operation count,
# elapsed time, throughput and ops/sec).
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and mask qemu-io statistics in msg'''
    without_cr = filter_win32(msg)
    return qemu_io_re.sub("X ops; XX:XX:XX.X "
                          "(XXX YYY/sec and XXX ops/sec)", without_cr)
# Matches "chown <uid>:<gid>" with numeric ids.
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown UID:GID" pairs by the literal placeholder'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event whose timestamp fields, if any, are replaced
    by the placeholders 'SECS' and 'USECS'.  The event passed in is left
    untouched.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy; copy the nested timestamp as
        # well so that the caller's event keeps its real values.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    Nested dicts and lists are filtered recursively.  qmsg is modified in
    place and returned; for lists the key handed to filter_fn is the index.
    '''
    # Iterate through either lists or dicts;
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        if isinstance(value, (dict, list)):
            qmsg[key] = filter_qmp(value, filter_fn)
        else:
            qmsg[key] = filter_fn(key, value)
    return qmsg
def filter_testfiles(msg):
    '''Mask the "<pid>-" file name prefixes in test_dir and sock_dir'''
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_virtio_scsi(output: str) -> str:
    '''Drop the "-ccw" / "-pci" suffix from "virtio-scsi" device names'''
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names by "NODE_NAME"'''
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
def filter_img_info(output, filename):
    '''
    Filter "qemu-img info" output.

    Lines mentioning 'disk size' or 'actual-size' are dropped.  In the
    remaining lines filename becomes TEST_IMG, test file prefixes are
    masked, imgfmt becomes IMGFMT and iters/uuid/cid values are replaced by
    placeholders.
    '''
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in substitutions:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    '''Replace every occurrence of imgfmt in msg by "IMGFMT"'''
    masked = msg.replace(imgfmt, 'IMGFMT')
    return masked
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_nbd_exports(output: str) -> str:
    '''Mask the numbers reported after "min block", "opt block" and
       "max block"'''
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    :param msg: Message to log.
    :param filters: Callables applied to msg, in order, before logging.
    :param indent: Passed to json.dumps() for dict and list messages.
    """
    for flt in filters:
        msg = flt(msg)
    if isinstance(msg, (dict, list)):
        # Don't sort if it's already sorted
        do_sort = not isinstance(msg, OrderedDict)
        rendered = json.dumps(msg, sort_keys=do_sort, indent=indent)
    else:
        rendered = msg
    test_logger.info(rendered)
class Timeout:
    '''
    Context manager that raises an exception when its body takes too long.

    A SIGALRM handler is installed and a real-time interval timer is armed
    for the given number of seconds on entry; the timer is disarmed on
    exit.  Nothing is armed when qemu_gdb or qemu_valgrind is set.
    '''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            signal.setitimer(signal.ITIMER_REAL, 0)
        # Never suppress an exception raised inside the context.
        return False

    def timeout(self, signum, frame):
        '''Signal handler: raise Exception(errmsg)'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return name prefixed with "<pid>-", pid being this process' id'''
    return f"{os.getpid()}-{name}"
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a list with one item.
    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            # Best effort: the file may never have been created.
            try:
                os.remove(path)
            except OSError:
                continue
        return False
def try_remove(img):
    '''Remove img, silently ignoring any OSError (e.g. a missing file)'''
    try:
        os.remove(img)
    except OSError:
        return
def file_path_remover():
    '''Remove the files recorded by file_path(), newest entry first'''
    recorded = file_path_remover.paths
    for path in reversed(recorded):
        try_remove(path)
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    The generated paths are recorded and removed at interpreter exit.
    A single name yields a single path instead of a list.
    '''
    if not hasattr(file_path_remover, 'paths'):
        # First use: create the registry and schedule the cleanup once.
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
def remote_filename(path):
    '''
    Return the name under which path is reached via imgproto.

    'file' yields path itself, 'ssh' an ssh:// URL for $USER on 127.0.0.1
    port 22; any other protocol raises an Exception.
    '''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timeout when running under gdb or valgrind.
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         sock_dir=sock_dir, qmp_timer=timer)
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        # 99 is the --error-exitcode handed to valgrind: print its log in
        # that case, otherwise discard it.
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        '''Append "-object opts" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device opts" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive opts" to the command line; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev" with opts (a string, or an iterable of strings
        that is joined with commas); returns self'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming addr" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command through human-monitor-command, optionally
        logging the command and its result'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            # Without an explicit event, break on both reads and writes.
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into one dict with dotted keys'''
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Render a QMP object as a "key=value,..." option string'''
        obj = self.flatten_qmp_object(obj)
        output_list = []
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each run through
        filter_qmp_event()'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Run a QMP command, logging both the command and its result'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and, if it was accepted, run the job'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability; name is only used in
        the log message'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for migration to complete or fail.

        Returns True once migration completed and the VM reports
        expect_runstate, False if migration failed.
        '''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] != 'completed':
            return False

        # The event may occur in finish-migrate, so wait for the expected
        # post-migration runstate
        runstate = None
        while runstate != expect_runstate:
            runstate = self.qmp('query-status')['return']['status']
        return True

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Map node names to their dirty bitmaps; nodes that report no
        'dirty-bitmaps' are left out'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The matching bitmap, or None if there is none.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # query_bitmaps() omits nodes without bitmaps; treat a missing node
        # like a node without bitmaps instead of raising KeyError.
        for bitmap in bitmaps.get(node_name, []):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if the bitmap exists and contains all of fields'''
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            # No such bitmap: report a mismatch rather than failing on None.
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
# Splits a path component of the form "name[index]" into name and index.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
982 class QMPTestCase(unittest.TestCase):
983 '''Abstract base class for QMP test cases'''
def __init__(self, *args, **kwargs):
    '''Forward all arguments to the base class and initialise self.vm'''
    super().__init__(*args, **kwargs)
    # Many users of this class set a VM property we rely on heavily
    # in the methods below.
    self.vm = None
def dictpath(self, d, path):
    '''Traverse a path in a nested dict

    path consists of '/'-separated keys; a component written as
    "key[idx]" additionally indexes into the list stored under key.
    The test fails if a component cannot be followed.
    '''
    for component in path.split('/'):
        m = index_re.match(component)
        if m:
            component, idx = m.groups()
            idx = int(idx)

        if not isinstance(d, dict) or component not in d:
            self.fail(f'failed path traversal for "{path}" in "{d}"')
        d = d[component]

        if not m:
            continue

        if not isinstance(d, list):
            self.fail(f'path component "{component}" in "{path}" '
                      f'is not a list in "{d}"')
        try:
            d = d[idx]
        except IndexError:
            self.fail(f'invalid index "{idx}" in path "{path}" '
                      f'in "{d}"')
    return d
def assert_qmp_absent(self, d, path):
    '''Assert that path cannot be traversed in d'''
    try:
        result = self.dictpath(d, path)
    except AssertionError:
        # dictpath() reported a failed traversal: the path is absent.
        return
    self.fail('path "%s" has value "%s"' % (path, str(result)))
def assert_qmp(self, d, path, value):
    '''Assert that the value for a specific path in a QMP dict
       matches.  When given a list of values, assert that any of
       them matches.'''

    result = self.dictpath(d, path)

    # [] makes no sense as a list of valid values, so treat it as
    # an actual single value.
    if not isinstance(value, list) or value == []:
        self.assertEqual(result, value,
                         '"%s" is "%s", expected "%s"'
                         % (path, str(result), str(value)))
        return

    for v in value:
        if result == v:
            return
    self.fail('no match for "%s" in %s' % (str(result), str(value)))
1040 def assert_no_active_block_jobs(self):
1041 result = self.vm.qmp('query-block-jobs')
1042 self.assert_qmp(result, 'return', [])
1044 def assert_has_block_node(self, node_name=None, file_name=None):
1045 """Issue a query-named-block-nodes and assert node_name and/or
1046 file_name is present in the result"""
1047 def check_equal_or_none(a, b):
1048 return a is None or b is None or a == b
1049 assert node_name or file_name
1050 result = self.vm.qmp('query-named-block-nodes')
1051 for x in result["return"]:
1052 if check_equal_or_none(x.get("node-name"), node_name) and \
1053 check_equal_or_none(x.get("file"), file_name):
1054 return
1055 self.fail("Cannot find %s %s in result:\n%s" %
1056 (node_name, file_name, result))
1058 def assert_json_filename_equal(self, json_filename, reference):
1059 '''Asserts that the given filename is a json: filename and that its
1060 content is equal to the given reference object'''
1061 self.assertEqual(json_filename[:5], 'json:')
1062 self.assertEqual(
1063 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1064 self.vm.flatten_qmp_object(reference)
1067 def cancel_and_wait(self, drive='drive0', force=False,
1068 resume=False, wait=60.0):
1069 '''Cancel a block job and wait for it to finish, returning the event'''
1070 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1071 self.assert_qmp(result, 'return', {})
1073 if resume:
1074 self.vm.resume_drive(drive)
1076 cancelled = False
1077 result = None
1078 while not cancelled:
1079 for event in self.vm.get_qmp_events(wait=wait):
1080 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1081 event['event'] == 'BLOCK_JOB_CANCELLED':
1082 self.assert_qmp(event, 'data/device', drive)
1083 result = event
1084 cancelled = True
1085 elif event['event'] == 'JOB_STATUS_CHANGE':
1086 self.assert_qmp(event, 'data/id', drive)
1089 self.assert_no_active_block_jobs()
1090 return result
1092 def wait_until_completed(self, drive='drive0', check_offset=True,
1093 wait=60.0, error=None):
1094 '''Wait for a block job to finish, returning the event'''
1095 while True:
1096 for event in self.vm.get_qmp_events(wait=wait):
1097 if event['event'] == 'BLOCK_JOB_COMPLETED':
1098 self.assert_qmp(event, 'data/device', drive)
1099 if error is None:
1100 self.assert_qmp_absent(event, 'data/error')
1101 if check_offset:
1102 self.assert_qmp(event, 'data/offset',
1103 event['data']['len'])
1104 else:
1105 self.assert_qmp(event, 'data/error', error)
1106 self.assert_no_active_block_jobs()
1107 return event
1108 if event['event'] == 'JOB_STATUS_CHANGE':
1109 self.assert_qmp(event, 'data/id', drive)
1111 def wait_ready(self, drive='drive0'):
1112 """Wait until a BLOCK_JOB_READY event, and return the event."""
1113 return self.vm.events_wait([
1114 ('BLOCK_JOB_READY',
1115 {'data': {'type': 'mirror', 'device': drive}}),
1116 ('BLOCK_JOB_READY',
1117 {'data': {'type': 'commit', 'device': drive}})
1120 def wait_ready_and_cancel(self, drive='drive0'):
1121 self.wait_ready(drive=drive)
1122 event = self.cancel_and_wait(drive=drive)
1123 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1124 self.assert_qmp(event, 'data/type', 'mirror')
1125 self.assert_qmp(event, 'data/offset', event['data']['len'])
1127 def complete_and_wait(self, drive='drive0', wait_ready=True,
1128 completion_error=None):
1129 '''Complete a block job and wait for it to finish'''
1130 if wait_ready:
1131 self.wait_ready(drive=drive)
1133 result = self.vm.qmp('block-job-complete', device=drive)
1134 self.assert_qmp(result, 'return', {})
1136 event = self.wait_until_completed(drive=drive, error=completion_error)
1137 self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1139 def pause_wait(self, job_id='job0'):
1140 with Timeout(3, "Timeout waiting for job to pause"):
1141 while True:
1142 result = self.vm.qmp('query-block-jobs')
1143 found = False
1144 for job in result['return']:
1145 if job['device'] == job_id:
1146 found = True
1147 if job['paused'] and not job['busy']:
1148 return job
1149 break
1150 assert found
1152 def pause_job(self, job_id='job0', wait=True):
1153 result = self.vm.qmp('block-job-pause', device=job_id)
1154 self.assert_qmp(result, 'return', {})
1155 if wait:
1156 return self.pause_wait(job_id)
1157 return result
1159 def case_skip(self, reason):
1160 '''Skip this test case'''
1161 case_notrun(reason)
1162 self.skipTest(reason)
def notrun(reason):
    '''Skip this test suite

    Writes the reason to "<output_dir>/<seq>.notrun", logs a warning
    and exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    marker_path = '%s/%s.notrun' % (output_dir, seq)

    with open(marker_path, 'w', encoding='utf-8') as marker:
        marker.write(reason + '\n')

    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    '''Record that the current test case has not been run.

    Only the record is written; actually skipping the case is left to
    the caller.  QMPTestCase.case_skip() is the variant that also
    skips the current test case.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    record_path = '%s/%s.casenotrun' % (output_dir, seq)

    # Opened for append, so every call adds one more line to the file.
    # NOTE(review): confirm the leading whitespace of the line prefix
    # below against the checked-in file; it may have been collapsed in
    # the copy this was reviewed from.
    with open(record_path, 'a', encoding='utf-8') as record:
        record.write(' [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    '''Skip the test suite if the current image format is not suitable.

    A non-empty supported_fmts acts as an allow list and
    unsupported_fmts as a deny list.  For luks, a working LUKS driver
    is verified as well.'''
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_enabled:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    not_allowed = bool(supported_fmts) and imgfmt not in supported_fmts
    if not_allowed or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1205 def _verify_protocol(supported: Sequence[str] = (),
1206 unsupported: Sequence[str] = ()) -> None:
1207 assert not (supported and unsupported)
1209 if 'generic' in supported:
1210 return
1212 not_sup = supported and (imgproto not in supported)
1213 if not_sup or (imgproto in unsupported):
1214 notrun('not suitable for this protocol: %s' % imgproto)
1216 def _verify_platform(supported: Sequence[str] = (),
1217 unsupported: Sequence[str] = ()) -> None:
1218 if any((sys.platform.startswith(x) for x in unsupported)):
1219 notrun('not suitable for this OS: %s' % sys.platform)
1221 if supported:
1222 if not any((sys.platform.startswith(x) for x in supported)):
1223 notrun('not suitable for this OS: %s' % sys.platform)
1225 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1226 if supported_cache_modes and (cachemode not in supported_cache_modes):
1227 notrun('not suitable for this cache mode: %s' % cachemode)
1229 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1230 if supported_aio_modes and (aiomode not in supported_aio_modes):
1231 notrun('not suitable for this aio mode: %s' % aiomode)
1233 def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1234 usf_list = list(set(required_formats) - set(supported_formats()))
1235 if usf_list:
1236 notrun(f'formats {usf_list} are not whitelisted')
def _verify_virtio_blk() -> None:
    '''Skip the test suite if 'virtio-blk' does not appear in the
    output of "-device help"'''
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
def _verify_virtio_scsi_pci_or_ccw() -> None:
    '''Skip the test suite if neither 'virtio-scsi-pci' nor
    'virtio-scsi-ccw' appears in the output of "-device help"'''
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    wanted = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in wanted):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1250 def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1251 imgopts = os.environ.get('IMGOPTS')
1252 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1253 # but it supported only for bash tests. We don't have a concept of global
1254 # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1255 # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1256 unsup = list(unsupported) + ['$']
1257 if imgopts and any(x in imgopts for x in unsup):
1258 notrun(f'not suitable for this imgopts: {imgopts}')
def supports_quorum():
    '''Return whether 'quorum' appears in the output of qemu-img --help'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    :return: (True, '') when creating a LUKS image succeeded, otherwise
             (False, reason).
    """
    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text following "<img_file>:" on the first line that has
    # it; fall back to the complete output otherwise.
    marker = img_file + ':'
    for line in output.splitlines():
        _, found, detail = line.partition(marker)
        if found:
            return (False, detail.strip())
    return (False, output)
def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    :param args: Arguments appended after the program name and the
                 global qemu_opts.
    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts
    command.extend(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked'''
    # Results are cached per read_only value on the function object, so
    # QEMU is queried only on the first call for each whitelist.
    cache = getattr(supported_formats, "formats", None)
    if cache is None:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # Line 0 is used for the rw-whitelist, line 1 for the ro-whitelist
        wanted_line = help_lines[1 if read_only else 0]
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

    :param required_formats: Either a collection of format names, or a
                             callable that takes the test case and
                             returns such a collection.
    :param read_only: Passed on to supported_formats() to select the
                      whitelist to check against.
    '''
    # Imported locally so that this block does not depend on the
    # module's import list.
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the test method's name and docstring visible on
        # the wrapper.
        @functools.wraps(func)
        def func_wrapper(test_case: 'QMPTestCase', *args: Any,
                         **kwargs: Any) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            # Sorted so that the skip message is reproducible; plain set
            # ordering is not.
            usf_list = sorted(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[..., None]], Callable[..., None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

    :param formats: Image format names for which the decorated test is
                    skipped (compared against the current imgfmt).
    '''
    # Imported locally so that this block does not depend on the
    # module's import list.
    import functools

    def skip_test_decorator(func: Callable[..., None]) -> Callable[..., None]:
        # wraps() keeps the test method's name and docstring visible on
        # the wrapper.
        @functools.wraps(func)
        def func_wrapper(test_case: 'QMPTestCase', *args: Any,
                         **kwargs: Any) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

    When running as root, the case is recorded through case_notrun()
    and the wrapper returns None without calling the test.
    '''
    # Imported locally so that this block does not depend on the
    # module's import list.
    import functools

    # wraps() keeps the test method's name and docstring visible on the
    # wrapper.
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            # args[0] is the first positional argument of the call;
            # presumably the test case instance -- confirm with callers.
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper
1380 # We need to filter out the time taken from the output so that
1381 # qemu-iotest can reliably diff the results against master output,
1382 # and hide skipped tests from the reference output.
class ReproducibleTestResult(unittest.TextTestResult):
    '''TextTestResult that reports a skipped test with a dot'''

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
class ReproducibleStreamWrapper:
    '''Stream proxy that strips the elapsed time and the skipped-test
    count from the text written through it'''

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Refuse to proxy 'stream' and '__getstate__'; everything else
        # is looked up on the wrapped stream.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        without_time = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                              r'Ran \1 tests', arg)
        filtered = re.sub(r' \(skipped=\d+\)', r'', without_time)
        self.stream.write(filtered)
class ReproducibleTestRunner(unittest.TextTestRunner):
    '''TextTestRunner that writes through a ReproducibleStreamWrapper and
    uses ReproducibleTestResult as its default result class'''

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no (truthy) stream is given
        target = stream if stream else sys.stdout
        wrapped_stream = ReproducibleStreamWrapper(target)
        super().__init__(stream=wrapped_stream,  # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module."""
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings_mode = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings_mode)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Removes the first '-d' from sys.argv if present, configures logging,
    then runs the _verify_*() checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests."""
    # All other arguments are passed on to execute_setup_common()
    debug = execute_setup_common(*args, **kwargs)

    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1480 # This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests."""
    # Arguments are passed on unchanged to execute_setup_common()
    activate_logging()
    execute_setup_common(*args, **kwargs)
1486 # This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework"""
    # Remaining arguments are passed on unchanged to execute_test()
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1492 # This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework"""
    # No test_function is given, so execute_test() takes its unittest path
    execute_test(*args, **kwargs)