qemu-iotests: allow valgrind to read/delete the generated log file
[qemu.git] / tests / qemu-iotests / iotests.py
blob85d8c0abbb17a50f67ccbcc4a8868ae2ec5e0fbf
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 import bz2
21 from collections import OrderedDict
22 import faulthandler
23 import json
24 import logging
25 import os
26 import re
27 import shutil
28 import signal
29 import struct
30 import subprocess
31 import sys
32 import time
33 from typing import (Any, Callable, Dict, Iterable,
34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35 import unittest
37 from contextlib import contextmanager
39 # pylint: disable=import-error, wrong-import-position
40 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
41 from qemu.machine import qtest
42 from qemu.qmp import QMPMessage
# Logger for messages emitted directly by the iotests module itself.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Logger for messages that are meant to end up in the diffed test output.
test_logger = logging.getLogger('qemu.iotests.diff_io')

faulthandler.enable()


def _env_option_list(var_name):
    '''Return the words of environment variable var_name, split on single
    spaces; an unset or empty variable yields an empty list.'''
    raw = os.environ.get(var_name)
    return raw.strip().split(' ') if raw else []


# Splitting on spaces will not work if arguments contain spaces, but it is
# necessary if we want to support the override options that ./check supports.
qemu_img_args = ([os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
                 + _env_option_list('QEMU_IMG_OPTIONS'))

qemu_io_args = ([os.environ.get('QEMU_IO_PROG', 'qemu-io')]
                + _env_option_list('QEMU_IO_OPTIONS'))

qemu_io_args_no_fmt = ([os.environ.get('QEMU_IO_PROG', 'qemu-io')]
                       + _env_option_list('QEMU_IO_OPTIONS_NO_FMT'))

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog] + _env_option_list('QEMU_NBD_OPTIONS')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# When GDB_OPTIONS is set, its words become arguments to a gdbserver wrapper.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = (['gdbserver'] + gdb_qemu_env.strip().split(' ')
            if gdb_qemu_env else [])

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

qemu_valgrind = []
if (os.environ.get('VALGRIND_QEMU') == "y"
        and os.environ.get('NO_VALGRIND') != "y"):
    # %p is expanded by valgrind to its own PID, which we cannot know
    # a priori (subprocess.Popen has not been invoked yet).
    valgrind_logfile = "--log-file=" + test_dir + "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

luks_default_secret_object = ('secret,id=keysec0,data='
                              + os.environ.get('IMGKEYSECRET', ''))
luks_default_key_secret_opt = 'key-secret=keysec0'

sample_img_dir = os.environ['SAMPLE_IMG_DIR']
def unarchive_sample_image(sample, fname):
    '''Decompress the bz2 sample image named sample (looked up in
    sample_img_dir with a ".bz2" suffix) into the file fname.'''
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as src:
        with open(fname, 'wb') as dst:
            shutil.copyfileobj(src, dst)
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool; only used in the diagnostic message.
    :param args: Full command line, including the program itself.
    :param connect_stderr: When true, the child's stderr is merged into the
                           captured stdout; otherwise it is inherited.
    :return: Tuple of (captured stdout text, process return code).
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args, stdout=subprocess.PIPE,
                          stderr=stderr, universal_newlines=True) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # A negative return code means the child was killed by a signal.
            # Build the message on one logical line: a backslash continuation
            # inside the literal would embed the source indentation into it.
            cmd = ' '.join(args)
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        return (output, subp.returncode)
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img with the given arguments appended to qemu_img_args and
    return both its output and its exit code
    """
    cmdline = [*qemu_img_args, *args]
    return qemu_tool_pipe_and_status('qemu-img', cmdline)
def qemu_img(*args: str) -> int:
    '''Run qemu-img, discard its output, and return the exit code'''
    _output, exitcode = qemu_img_pipe_and_status(*args)
    return exitcode
def ordered_qmp(qmsg, conv_keys=True):
    '''Return a copy of a QMP object with every dict replaced by an
    OrderedDict sorted by key.

    When conv_keys is true, underscores in the keys of this dict are turned
    into dashes; values nested inside a dict are processed with
    conv_keys=False, while list elements are processed with the default.
    Anything that is neither list nor dict is returned unchanged.'''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(element) for element in qmsg]
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key in sorted(qmsg):
        out_key = key.replace('_', '-') if conv_keys else key
        ordered[out_key] = ordered_qmp(qmsg[key], conv_keys=False)
    return ordered
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments and return the exit
    code.

    For the luks format a default secret object and key-secret option are
    supplied unless the caller's -o option string already mentions
    key-secret.'''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # args[i + 1] is the comma-separated option string, so the
                # default has to be appended to that string (a str has no
                # append() method).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the parsed JSON'''
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the parsed JSON'''
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    cmdline = qemu_img_args + list(args)
    exitcode = subprocess.call(cmdline)
    if exitcode < 0:
        # Negative exit code: the process was terminated by a signal
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(cmdline)))
    return exitcode
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output, ignoring the exit code'''
    output, _exitcode = qemu_img_pipe_and_status(*args)
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles, and return
    the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''Run "qemu-img info" on filename and log the filtered output.

    With imgopts, filename is passed with --image-opts; otherwise "-f imgfmt"
    is used. filter_path is the string replaced by TEST_IMG in the output and
    defaults to filename.'''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    args = ['info', *format_args, *extra_args, filename]

    output = qemu_img_pipe(*args)
    log(filter_img_info(output, filter_path or filename))
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    cmdline = qemu_io_args + list(args)
    output, _exitcode = qemu_tool_pipe_and_status('qemu-io', cmdline)
    return output
def qemu_io_log(*args):
    '''Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    # If the caller specifies the format itself, do not add the default one
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL avoids opening (and never closing) /dev/null
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
def qemu_io_silent_check(*args):
    '''Run qemu-io with stdout and stderr suppressed and return True if the
    subprocess returned 0'''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids opening (and never closing) /dev/null
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
class QemuIoInteractive:
    '''Drive an interactive qemu-io process through pipes.

    The process is started in __init__; cmd() sends one command and returns
    the output printed before the next prompt; close() sends the quit
    command.'''

    def __init__(self, *args):
        self.args = qemu_io_args_no_fmt + list(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        # 9 == len('qemu-io> '): the first thing printed must be the prompt
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for the process to finish'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read stdout up to and including the next prompt and return the
        text that preceded the prompt'''
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # Restart the match. The mismatching character may itself be
                # the first character of the prompt (e.g. output ending in
                # 'q' right before the prompt), so it must not be dropped.
                # The prompt's first character occurs nowhere else in it, so
                # this single check is sufficient.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Send a single-line command and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
    and its output in case of an error (an empty string on success)'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    return returncode, ''
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports, log the filtered output,
    and return the unfiltered output'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    output = qemu_tool_pipe_and_status('qemu-nbd', cmdline)[0]
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context.

    The context is entered once qemu-nbd has created its pid file; on exit
    the pid file is removed and the server is killed.'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Poll until the pid file shows up, bailing out if the
            # process dies first.
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images'''
    exitcode = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return exitcode == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers.

    Each 512-byte sector starts and ends with its own sector number as a
    big-endian 32-bit integer, with zero padding in between.'''
    with open(name, 'wb') as image:
        sector_num = 0
        while sector_num * 512 < size:
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            sector_num += 1
def image_size(img):
    '''Return the image's virtual size as reported by "qemu-img info"'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
def is_str(val):
    '''Return True if val is a str instance'''
    result = isinstance(val, str)
    return result
# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters in the directory name ('.', '+', '(' ...) match themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path in msg with
    the placeholder TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
# Matches the carriage returns found in Windows-style line endings
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Strip all carriage return characters from msg'''
    return re.sub(win32_re, "", msg)
# Matches the throughput summary that qemu-io prints after an operation
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and replace qemu-io's timing/throughput
    summary with a fixed placeholder'''
    without_cr = filter_win32(msg)
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, without_cr)
# Matches "chown <numeric uid>:<numeric gid>"
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown UID:GID" occurrences in msg with a fixed
    placeholder'''
    return re.sub(chown_re, "chown UID:GID", msg)
def filter_qmp_event(event):
    '''Filter a QMP event dict.

    Returns a copy of event in which the timestamp's seconds and
    microseconds are replaced by the placeholders SECS and USECS. The
    caller's event, including its nested timestamp dict, is left untouched.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp dict
        # must be copied as well before it is overwritten.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS',
                                  microseconds='USECS')
    return event
def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    qmsg (a list or dict) is modified in place and also returned; for lists
    the key passed to filter_fn is the element index. Nested lists and dicts
    are filtered recursively.'''
    # Iterate through either lists or dicts;
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, val in entries:
        is_container = isinstance(val, (dict, list))
        qmsg[key] = (filter_qmp(val, filter_fn) if is_container
                     else filter_fn(key, val))
    return qmsg
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" prefixes of
    this process' test files with TEST_DIR/PID- and SOCK_DIR/PID-'''
    pid_prefix = "%s-" % (os.getpid())
    replacements = ((test_dir, 'TEST_DIR/PID-'), (sock_dir, 'SOCK_DIR/PID-'))
    for directory, placeholder in replacements:
        msg = msg.replace(os.path.join(directory, pid_prefix), placeholder)
    return msg
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_virtio_scsi(output: str) -> str:
    '''Drop the -ccw / -pci bus suffix from virtio-scsi device names'''
    bus_suffix_re = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(bus_suffix_re, r'\1', output)
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    '''Replace "#block<digits>" node names in msg with NODE_NAME'''
    generated_name_re = "#block[0-9]+"
    return re.sub(generated_name_re, "NODE_NAME", msg)
def filter_img_info(output, filename):
    '''Filter "qemu-img info" output for stable comparison.

    Lines mentioning the on-disk size are dropped; filename, test file
    prefixes, the image format name, and the iters/uuid/cid values are
    replaced by placeholders.'''
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    filtered = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        filtered.append(line)
    return '\n'.join(filtered)
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name with IMGFMT'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_nbd_exports(output: str) -> str:
    '''Replace the min/opt/max block size values in output with XXX'''
    block_size_re = r'((min|opt|max) block): [0-9]+'
    return re.sub(block_size_re, r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    Each filter is applied to the message in order before it is logged to
    the test_logger.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sort = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=needs_sort, indent=indent)
    test_logger.info(serialized)
class Timeout:
    '''Context manager that raises an Exception carrying errmsg if its body
    runs for longer than the given number of seconds.

    The timer is not armed when a gdb or valgrind wrapper is configured.'''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''SIGALRM handler'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return name prefixed with the current process ID and a dash'''
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.
    """

    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best-effort cleanup: a file that cannot be removed is ignored
        for generated in self.paths:
            try:
                os.remove(generated)
            except OSError:
                pass
        return False
def try_remove(img):
    '''Remove the file img, ignoring any OSError raised by the removal'''
    try:
        os.remove(img)
    except OSError:
        return
def file_path_remover():
    '''Remove the files recorded in file_path_remover.paths, newest first'''
    registered = file_path_remover.paths
    for path in registered[::-1]:
        try_remove(path)
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    The generated paths are recorded on file_path_remover, which is
    registered with atexit on first use.
    '''

    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
def remote_filename(path):
    '''Return the name under which path is addressed with the configured
    image protocol; raises Exception for protocols other than file and ssh'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timer when running under a gdb or valgrind wrapper
        timer = None if (qemu_gdb or qemu_valgrind) else 15.0
        super().__init__(qemu_prog, qemu_opts, wrapper=qemu_gdb,
                         name=name,
                         base_temp_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir, qmp_timer=timer)
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        super()._post_shutdown()
        if not (qemu_valgrind and self._popen):
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        if self.exitcode() != 99:
            os.remove(valgrind_filename)
            return
        # 99 is the --error-exitcode configured for valgrind:
        # print the log instead of deleting it
        with open(valgrind_filename) as f:
            print(f.read())

    def add_object(self, opts):
        '''Append "-object opts" to the command line; returns self'''
        self._args.extend(('-object', opts))
        return self

    def add_device(self, opts):
        '''Append "-device opts" to the command line; returns self'''
        self._args.extend(('-device', opts))
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive opts" to the command line; returns self'''
        self._args.extend(('-drive', opts))
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options += ['file=%s' % path,
                        'format=%s' % img_format,
                        'cache=%s' % cachemode,
                        'aio=%s' % aiomode]

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.extend(('-drive', ','.join(options)))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append a -blockdev option; opts is either a string or an
        iterable of strings that is joined with commas. Returns self'''
        self._args.append('-blockdev')
        self._args.append(opts if isinstance(opts, str) else ','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming addr" to the command line; returns self'''
        self._args.extend(('-incoming', addr))
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command line through human-monitor-command,
        via qmp_log() when use_log is set and qmp() otherwise'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        issue = self.qmp_log if use_log else self.qmp
        return issue(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if event:
            self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
            return
        # No specific event requested: break on both reads and writes
        for default_event in ("read_aio", "write_aio"):
            self.pause_drive(drive, default_event)

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested lists/dicts into a single dict whose keys are the
        dot-separated paths to the leaf values'''
        if output is None:
            output = {}
        if isinstance(obj, list):
            for index, element in enumerate(obj):
                self.flatten_qmp_object(element, output,
                                        basestr + str(index) + '.')
        elif isinstance(obj, dict):
            for key, value in obj.items():
                self.flatten_qmp_object(value, output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Render a QMP object as a comma-separated key=value option string'''
        flat = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + value for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Issue a QMP command, logging both the command and its result
        through the given filters; returns the result'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # Block job events are only logged; the state machine
                # below is driven by JOB_STATUS_CHANGE alone.
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and run the resulting job to completion'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' not in result:
            job_result = result['error']
        else:
            assert result['return'] == {}
            job_result = self.run_job(job_id)

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Turn on the "events" migration capability, logging the result'''
        log('Enabling migration QMP events on %s...' % name)
        capability = {
            'capability': 'events',
            'state': True
        }
        log(self.qmp('migrate-set-capabilities', capabilities=[capability]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for a MIGRATION event with status completed or failed.
        Returns True on completion (after the VM reached expect_runstate)
        and False on failure'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] != 'completed':
            return False

        # The event may occur in finish-migrate, so wait for the expected
        # post-migration runstate
        runstate = None
        while runstate != expect_runstate:
            runstate = self.qmp('query-status')['return']['status']
        return True

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None'''
        nodes = self.qmp('query-named-block-nodes')
        return next((entry for entry in nodes['return']
                     if entry['node-name'] == node_name), None)

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps lists,
        for all nodes that report dirty bitmaps'''
        res = self.qmp("query-named-block-nodes")
        bitmaps = {}
        for device in res['return']:
            if 'dirty-bitmaps' in device:
                bitmaps[device['node-name']] = device['dirty-bitmaps']
        return bitmaps

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') != bitmap_name:
                continue
            if recording is None or bitmap.get('recording') == recording:
                return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if every item of fields is present in the bitmap'''
        ret = self.get_bitmap(node_name, bitmap_name)

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((candidate for candidate in graph['nodes']
                     if candidate['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(candidate for candidate in graph['nodes']
                            if candidate['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
# Splits a path component of the form "name[index]" into name and index
index_re = re.compile(
    r'([^\[]+)\[([^\]]+)\]')
923 class QMPTestCase(unittest.TestCase):
924 '''Abstract base class for QMP test cases'''
def __init__(self, *args, **kwargs):
    '''Forward all arguments to the base class and start without a VM'''
    super().__init__(*args, **kwargs)
    # Many users of this class set a VM property we rely on heavily
    # in the methods below.
    self.vm = None
def dictpath(self, d, path):
    '''Traverse a path in a nested dict.

    path is a slash-separated list of keys; a component of the form
    "key[N]" additionally indexes into the list stored under key. The test
    fails if the path cannot be followed.'''
    for component in path.split('/'):
        indexed = index_re.match(component)
        if indexed:
            component, idx = indexed.groups()
            idx = int(idx)

        if not isinstance(d, dict) or component not in d:
            self.fail(f'failed path traversal for "{path}" in "{d}"')
        d = d[component]

        if not indexed:
            continue

        if not isinstance(d, list):
            self.fail(f'path component "{component}" in "{path}" '
                      f'is not a list in "{d}"')
        try:
            d = d[idx]
        except IndexError:
            self.fail(f'invalid index "{idx}" in path "{path}" '
                      f'in "{d}"')
    return d
def assert_qmp_absent(self, d, path):
    '''Assert that path cannot be traversed in the QMP dict d'''
    try:
        found = self.dictpath(d, path)
    except AssertionError:
        # dictpath() failed the traversal, i.e. the path is absent
        return
    self.fail('path "%s" has value "%s"' % (path, str(found)))
def assert_qmp(self, d, path, value):
    '''Assert that the value for a specific path in a QMP dict
       matches.  When given a list of values, assert that any of
       them matches.'''

    result = self.dictpath(d, path)

    # [] makes no sense as a list of valid values, so treat it as
    # an actual single value.
    is_alternatives = isinstance(value, list) and value != []
    if not is_alternatives:
        self.assertEqual(result, value,
                         '"%s" is "%s", expected "%s"'
                         % (path, str(result), str(value)))
        return

    if any(result == candidate for candidate in value):
        return
    self.fail('no match for "%s" in %s' % (str(result), str(value)))
def assert_no_active_block_jobs(self):
    '''Assert that query-block-jobs returns an empty list'''
    jobs = self.vm.qmp('query-block-jobs')
    self.assert_qmp(jobs, 'return', [])
def assert_has_block_node(self, node_name=None, file_name=None):
    """Issue a query-named-block-nodes and assert node_name and/or
    file_name is present in the result"""
    def matches(actual, wanted):
        # A missing value on either side counts as a match
        return actual is None or wanted is None or actual == wanted

    assert node_name or file_name
    result = self.vm.qmp('query-named-block-nodes')
    for entry in result["return"]:
        if not matches(entry.get("node-name"), node_name):
            continue
        if matches(entry.get("file"), file_name):
            return
    self.fail("Cannot find %s %s in result:\n%s" %
              (node_name, file_name, result))
def assert_json_filename_equal(self, json_filename, reference):
    '''Asserts that the given filename is a json: filename and that its
       content is equal to the given reference object'''
    prefix, payload = json_filename[:5], json_filename[5:]
    self.assertEqual(prefix, 'json:')
    actual = self.vm.flatten_qmp_object(json.loads(payload))
    expected = self.vm.flatten_qmp_object(reference)
    self.assertEqual(actual, expected)
def cancel_and_wait(self, drive='drive0', force=False,
                    resume=False, wait=60.0):
    '''Cancel a block job and wait for it to finish, returning the event'''
    response = self.vm.qmp('block-job-cancel', device=drive, force=force)
    self.assert_qmp(response, 'return', {})

    if resume:
        self.vm.resume_drive(drive)

    final_event = None
    finished = False
    while not finished:
        for event in self.vm.get_qmp_events(wait=wait):
            kind = event['event']
            if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                self.assert_qmp(event, 'data/device', drive)
                final_event = event
                finished = True
            elif kind == 'JOB_STATUS_CHANGE':
                self.assert_qmp(event, 'data/id', drive)

    self.assert_no_active_block_jobs()
    return final_event
def wait_until_completed(self, drive='drive0', check_offset=True,
                         wait=60.0, error=None):
    '''Wait for a block job to finish, returning the event.

    Polls QMP events until BLOCK_JOB_COMPLETED arrives.  When no error
    is expected the event must carry no 'error' field and, if
    check_offset is set, its offset must equal its len.  When an error
    is expected the event's 'error' field must equal it.
    '''
    while True:
        for ev in self.vm.get_qmp_events(wait=wait):
            kind = ev['event']

            if kind == 'JOB_STATUS_CHANGE':
                self.assert_qmp(ev, 'data/id', drive)
                continue
            if kind != 'BLOCK_JOB_COMPLETED':
                continue

            self.assert_qmp(ev, 'data/device', drive)
            if error is not None:
                self.assert_qmp(ev, 'data/error', error)
            else:
                self.assert_qmp_absent(ev, 'data/error')
                if check_offset:
                    self.assert_qmp(ev, 'data/offset', ev['data']['len'])
            self.assert_no_active_block_jobs()
            return ev
def wait_ready(self, drive='drive0'):
    """Wait until a BLOCK_JOB_READY event, and return the event.

    A READY event of either a 'mirror' or a 'commit' job on the given
    device is accepted.
    """
    accepted = [
        ('BLOCK_JOB_READY', {'data': {'type': job_type, 'device': drive}})
        for job_type in ('mirror', 'commit')
    ]
    return self.vm.events_wait(accepted)
def wait_ready_and_cancel(self, drive='drive0'):
    """Wait for the job on 'drive' to become ready, then cancel it.

    The resulting event must be BLOCK_JOB_COMPLETED for a mirror job
    whose offset equals its len.
    """
    self.wait_ready(drive=drive)
    final = self.cancel_and_wait(drive=drive)

    self.assertEqual(final['event'], 'BLOCK_JOB_COMPLETED')
    expected_offset = final['data']['len']
    self.assert_qmp(final, 'data/type', 'mirror')
    self.assert_qmp(final, 'data/offset', expected_offset)
def complete_and_wait(self, drive='drive0', wait_ready=True,
                      completion_error=None):
    '''Complete a block job and wait for it to finish.

    Optionally waits for the job to become ready first, issues
    block-job-complete, then waits for completion (expecting
    completion_error, if given) and checks that the finished job was a
    mirror or commit job.
    '''
    if wait_ready:
        self.wait_ready(drive=drive)

    reply = self.vm.qmp('block-job-complete', device=drive)
    self.assert_qmp(reply, 'return', {})

    done = self.wait_until_completed(drive=drive, error=completion_error)
    job_type = done['data']['type']
    self.assertTrue(job_type in ['mirror', 'commit'])
def pause_wait(self, job_id='job0'):
    """Poll query-block-jobs until the job is paused and not busy.

    Returns the job's entry from the query result.  The job must be
    listed on every poll; the polling runs under a 3 second Timeout.
    """
    with Timeout(3, "Timeout waiting for job to pause"):
        while True:
            reply = self.vm.qmp('query-block-jobs')
            # Only the first entry for this device is considered.
            job = next((entry for entry in reply['return']
                        if entry['device'] == job_id), None)
            if job is not None and job['paused'] and not job['busy']:
                return job
            assert job is not None
def pause_job(self, job_id='job0', wait=True):
    """Issue block-job-pause for job_id.

    With wait set, block until the job is actually paused and return
    what pause_wait() returns; otherwise return the QMP reply.
    """
    reply = self.vm.qmp('block-job-pause', device=job_id)
    self.assert_qmp(reply, 'return', {})
    return self.pause_wait(job_id) if wait else reply
def case_skip(self, reason):
    '''Skip this test case.

    Records the reason via case_notrun() first, then skips through
    unittest's skipTest().
    '''
    case_notrun(reason)
    self.skipTest(reason)
def notrun(reason):
    '''Skip this test suite.

    Writes the reason to "<output_dir>/<seq>.notrun", logs a warning and
    exits the process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly: we exit right below, so do not rely on
    # garbage collection to flush and release the handle.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to "<output_dir>/<seq>.casenotrun".
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the handle is flushed and closed right
    # away instead of being left to the garbage collector.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write(' [case not run] ' + reason + '\n')
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """Skip the test suite unless the current image format is suitable.

    An empty supported_fmts places no restriction.  For the 'luks'
    format, additionally require a working LUKS driver.
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_enabled:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    not_listed = bool(supported_fmts) and imgfmt not in supported_fmts
    if not_listed or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """Skip the test suite unless the current protocol is suitable.

    Only one of 'supported' and 'unsupported' may be non-empty.  A
    'generic' entry in 'supported' accepts every protocol.
    """
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    not_listed = bool(supported) and imgproto not in supported
    if not_listed or imgproto in unsupported:
        notrun('not suitable for this protocol: %s' % imgproto)
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """Skip the test suite unless sys.platform is suitable.

    Entries are matched as prefixes of sys.platform.  An empty
    'supported' places no restriction.
    """
    platform = sys.platform

    if platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % sys.platform)
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """Skip the test suite if the current cache mode is not listed.

    An empty list places no restriction.
    """
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """Skip the test suite if the current aio mode is not listed.

    An empty list places no restriction.
    """
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Skip the test suite if a required format is not reported by
    supported_formats()."""
    required = set(required_formats)
    available = set(supported_formats())
    usf_list = list(required - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
def _verify_virtio_blk() -> None:
    """Skip the test suite if QEMU's device help does not mention
    virtio-blk."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """Skip the test suite if QEMU's device help mentions neither
    virtio-scsi-pci nor virtio-scsi-ccw."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
def supports_quorum():
    """Return whether 'quorum' appears in the qemu-img --help output."""
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    If not, return the reason why.

    :return: (True, '') on success, otherwise (False, reason).
    """
    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup: the image may not exist if creation failed.
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text after "<img_file>:" on the first line that has
    # it; otherwise report the whole output.
    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in output.splitlines() if marker in line),
                  output)
    return (False, reason)
def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts
    command += list(args)
    stdout, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       The result is cached per 'read_only' value on the function
       object, so QEMU is queried at most once for each list.'''
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_text = qemu_pipe("-drive", "format=help")
        # Line 0 is parsed for the rw list, line 1 for the ro list.
        row = help_text.splitlines()[1 if read_only else 0]
        cache[read_only] = row.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       'required_formats' is either a sequence of format names or a
       callable that takes the test case and returns such a sequence.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(
                f'{test_case}: formats {usf_list} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root the case is recorded via case_notrun() and
       the wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1307 # We need to filter out the time taken from the output so that
1308 # qemu-iotest can reliably diff the results against master output,
1309 # and hide skipped tests from the reference output.
class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult variant that reports skipped tests as a dot."""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        out = self.stream
        if self.showAll:
            out.writeln("skipped {0!r}".format(reason))
            return
        if self.dots:
            out.write(".")
            out.flush()
class ReproducibleStreamWrapper:
    """Stream proxy that removes run-dependent parts of unittest output.

    write() drops the elapsed time from the "Ran N tests in X.XXXs"
    line and removes " (skipped=N)"; every other attribute is looked up
    on the wrapped stream.
    """

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  Refuse these two names
        # instead of forwarding them (for 'stream' this also avoids
        # infinite recursion on an uninitialised instance).
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        without_time = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                              r'Ran \1 tests', arg)
        cleaned = re.sub(r' \(skipped=\d+\)', r'', without_time)
        self.stream.write(cleaned)
class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper
    and uses ReproducibleTestResult by default."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no (or a falsy) stream is given.
        target = stream or sys.stdout
        super().__init__(stream=ReproducibleStreamWrapper(target),  # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    Runs with verbosity 2 in debug mode and 1 otherwise.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings_mode = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings_mode)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Removes the first '-d' from sys.argv if present, configures logging
    accordingly and runs the environment checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True
    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    Positional and remaining keyword arguments go to
    execute_setup_common().  If test_function is given it is called;
    otherwise the unittest runner is started on sys.argv.
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    # Do not pass these records on to ancestor loggers' handlers.
    test_logger.propagate = False
1404 # This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates logging, then passes all arguments on to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1410 # This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework.

    Activates logging, then hands test_function and all other arguments
    to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1416 # This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework.

    All arguments are passed on to execute_test().
    """
    execute_test(*args, **kwargs)