hw: usb: hcd-ohci: check for processed TD before retire
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blob91e4a5712609424c4b79dd6604f13e61e7e376a5
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 import time
32 from typing import (Any, Callable, Dict, Iterable,
33 List, Optional, Sequence, Tuple, TypeVar)
34 import unittest
36 from contextlib import contextmanager
38 # pylint: disable=import-error, wrong-import-position
39 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40 from qemu import qtest
41 from qemu.qmp import QMPMessage
43 assert sys.version_info >= (3, 6)
45 # Use this logger for logging messages directly from the iotests module
46 logger = logging.getLogger('qemu.iotests')
47 logger.addHandler(logging.NullHandler())
49 # Use this logger for messages that ought to be used for diff output.
50 test_logger = logging.getLogger('qemu.iotests.diff_io')
53 faulthandler.enable()
55 # This will not work if arguments contain spaces but is necessary if we
56 # want to support the override options that ./check supports.
57 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
58 if os.environ.get('QEMU_IMG_OPTIONS'):
59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
61 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
62 if os.environ.get('QEMU_IO_OPTIONS'):
63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
65 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
66 if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
67 qemu_io_args_no_fmt += \
68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
70 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
71 if os.environ.get('QEMU_NBD_OPTIONS'):
72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
74 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
77 imgfmt = os.environ.get('IMGFMT', 'raw')
78 imgproto = os.environ.get('IMGPROTO', 'file')
79 test_dir = os.environ.get('TEST_DIR')
80 sock_dir = os.environ.get('SOCK_DIR')
81 output_dir = os.environ.get('OUTPUT_DIR', '.')
82 cachemode = os.environ.get('CACHEMODE')
83 aiomode = os.environ.get('AIOMODE')
84 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
86 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
88 luks_default_secret_object = 'secret,id=keysec0,data=' + \
89 os.environ.get('IMGKEYSECRET', '')
90 luks_default_key_secret_opt = 'key-secret=keysec0'
93 def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
94 """
95 Run qemu-img and return both its output and its exit code
96 """
97 subp = subprocess.Popen(qemu_img_args + list(args),
98 stdout=subprocess.PIPE,
99 stderr=subprocess.STDOUT,
100 universal_newlines=True)
101 output = subp.communicate()[0]
102 if subp.returncode < 0:
103 sys.stderr.write('qemu-img received signal %i: %s\n'
104 % (-subp.returncode,
105 ' '.join(qemu_img_args + list(args))))
106 return (output, subp.returncode)
108 def qemu_img(*args: str) -> int:
109 '''Run qemu-img and return the exit code'''
110 return qemu_img_pipe_and_status(*args)[1]
112 def ordered_qmp(qmsg, conv_keys=True):
113 # Dictionaries are not ordered prior to 3.6, therefore:
114 if isinstance(qmsg, list):
115 return [ordered_qmp(atom) for atom in qmsg]
116 if isinstance(qmsg, dict):
117 od = OrderedDict()
118 for k, v in sorted(qmsg.items()):
119 if conv_keys:
120 k = k.replace('_', '-')
121 od[k] = ordered_qmp(v, conv_keys=False)
122 return od
123 return qmsg
125 def qemu_img_create(*args):
126 args = list(args)
128 # default luks support
129 if '-f' in args and args[args.index('-f') + 1] == 'luks':
130 if '-o' in args:
131 i = args.index('-o')
132 if 'key-secret' not in args[i + 1]:
133 args[i + 1].append(luks_default_key_secret_opt)
134 args.insert(i + 2, '--object')
135 args.insert(i + 3, luks_default_secret_object)
136 else:
137 args = ['-o', luks_default_key_secret_opt,
138 '--object', luks_default_secret_object] + args
140 args.insert(0, 'create')
142 return qemu_img(*args)
144 def qemu_img_measure(*args):
145 return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
147 def qemu_img_check(*args):
148 return json.loads(qemu_img_pipe("check", "--output", "json", *args))
150 def qemu_img_verbose(*args):
151 '''Run qemu-img without suppressing its output and return the exit code'''
152 exitcode = subprocess.call(qemu_img_args + list(args))
153 if exitcode < 0:
154 sys.stderr.write('qemu-img received signal %i: %s\n'
155 % (-exitcode, ' '.join(qemu_img_args + list(args))))
156 return exitcode
158 def qemu_img_pipe(*args: str) -> str:
159 '''Run qemu-img and return its output'''
160 return qemu_img_pipe_and_status(*args)[0]
162 def qemu_img_log(*args):
163 result = qemu_img_pipe(*args)
164 log(result, filters=[filter_testfiles])
165 return result
167 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
168 args = ['info']
169 if imgopts:
170 args.append('--image-opts')
171 else:
172 args += ['-f', imgfmt]
173 args += extra_args
174 args.append(filename)
176 output = qemu_img_pipe(*args)
177 if not filter_path:
178 filter_path = filename
179 log(filter_img_info(output, filter_path))
181 def qemu_io(*args):
182 '''Run qemu-io and return the stdout data'''
183 args = qemu_io_args + list(args)
184 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
185 stderr=subprocess.STDOUT,
186 universal_newlines=True)
187 output = subp.communicate()[0]
188 if subp.returncode < 0:
189 sys.stderr.write('qemu-io received signal %i: %s\n'
190 % (-subp.returncode, ' '.join(args)))
191 return output
193 def qemu_io_log(*args):
194 result = qemu_io(*args)
195 log(result, filters=[filter_testfiles, filter_qemu_io])
196 return result
198 def qemu_io_silent(*args):
199 '''Run qemu-io and return the exit code, suppressing stdout'''
200 args = qemu_io_args + list(args)
201 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
202 if exitcode < 0:
203 sys.stderr.write('qemu-io received signal %i: %s\n' %
204 (-exitcode, ' '.join(args)))
205 return exitcode
207 def qemu_io_silent_check(*args):
208 '''Run qemu-io and return the true if subprocess returned 0'''
209 args = qemu_io_args + list(args)
210 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
211 stderr=subprocess.STDOUT)
212 return exitcode == 0
214 def get_virtio_scsi_device():
215 if qemu_default_machine == 's390-ccw-virtio':
216 return 'virtio-scsi-ccw'
217 return 'virtio-scsi-pci'
219 class QemuIoInteractive:
220 def __init__(self, *args):
221 self.args = qemu_io_args_no_fmt + list(args)
222 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
223 stdout=subprocess.PIPE,
224 stderr=subprocess.STDOUT,
225 universal_newlines=True)
226 out = self._p.stdout.read(9)
227 if out != 'qemu-io> ':
228 # Most probably qemu-io just failed to start.
229 # Let's collect the whole output and exit.
230 out += self._p.stdout.read()
231 self._p.wait(timeout=1)
232 raise ValueError(out)
234 def close(self):
235 self._p.communicate('q\n')
237 def _read_output(self):
238 pattern = 'qemu-io> '
239 n = len(pattern)
240 pos = 0
241 s = []
242 while pos != n:
243 c = self._p.stdout.read(1)
244 # check unexpected EOF
245 assert c != ''
246 s.append(c)
247 if c == pattern[pos]:
248 pos += 1
249 else:
250 pos = 0
252 return ''.join(s[:-n])
254 def cmd(self, cmd):
255 # quit command is in close(), '\n' is added automatically
256 assert '\n' not in cmd
257 cmd = cmd.strip()
258 assert cmd not in ('q', 'quit')
259 self._p.stdin.write(cmd + '\n')
260 self._p.stdin.flush()
261 return self._read_output()
264 def qemu_nbd(*args):
265 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
266 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
268 def qemu_nbd_early_pipe(*args):
269 '''Run qemu-nbd in daemon mode and return both the parent's exit code
270 and its output in case of an error'''
271 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
272 stdout=subprocess.PIPE,
273 universal_newlines=True)
274 output = subp.communicate()[0]
275 if subp.returncode < 0:
276 sys.stderr.write('qemu-nbd received signal %i: %s\n' %
277 (-subp.returncode,
278 ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
280 return subp.returncode, output if subp.returncode else ''
282 @contextmanager
283 def qemu_nbd_popen(*args):
284 '''Context manager running qemu-nbd within the context'''
285 pid_file = file_path("pid")
287 cmd = list(qemu_nbd_args)
288 cmd.extend(('--persistent', '--pid-file', pid_file))
289 cmd.extend(args)
291 log('Start NBD server')
292 p = subprocess.Popen(cmd)
293 try:
294 while not os.path.exists(pid_file):
295 if p.poll() is not None:
296 raise RuntimeError(
297 "qemu-nbd terminated with exit code {}: {}"
298 .format(p.returncode, ' '.join(cmd)))
300 time.sleep(0.01)
301 yield
302 finally:
303 log('Kill NBD server')
304 p.kill()
305 p.wait()
307 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
308 '''Return True if two image files are identical'''
309 return qemu_img('compare', '-f', fmt1,
310 '-F', fmt2, img1, img2) == 0
312 def create_image(name, size):
313 '''Create a fully-allocated raw image with sector markers'''
314 file = open(name, 'wb')
315 i = 0
316 while i < size:
317 sector = struct.pack('>l504xl', i // 512, i // 512)
318 file.write(sector)
319 i = i + 512
320 file.close()
322 def image_size(img):
323 '''Return image's virtual size'''
324 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
325 return json.loads(r)['virtual-size']
327 def is_str(val):
328 return isinstance(val, str)
330 test_dir_re = re.compile(r"%s" % test_dir)
331 def filter_test_dir(msg):
332 return test_dir_re.sub("TEST_DIR", msg)
334 win32_re = re.compile(r"\r")
335 def filter_win32(msg):
336 return win32_re.sub("", msg)
338 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
339 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
340 r"and [0-9\/.inf]* ops\/sec\)")
341 def filter_qemu_io(msg):
342 msg = filter_win32(msg)
343 return qemu_io_re.sub("X ops; XX:XX:XX.X "
344 "(XXX YYY/sec and XXX ops/sec)", msg)
346 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
347 def filter_chown(msg):
348 return chown_re.sub("chown UID:GID", msg)
350 def filter_qmp_event(event):
351 '''Filter a QMP event dict'''
352 event = dict(event)
353 if 'timestamp' in event:
354 event['timestamp']['seconds'] = 'SECS'
355 event['timestamp']['microseconds'] = 'USECS'
356 return event
358 def filter_qmp(qmsg, filter_fn):
359 '''Given a string filter, filter a QMP object's values.
360 filter_fn takes a (key, value) pair.'''
361 # Iterate through either lists or dicts;
362 if isinstance(qmsg, list):
363 items = enumerate(qmsg)
364 else:
365 items = qmsg.items()
367 for k, v in items:
368 if isinstance(v, (dict, list)):
369 qmsg[k] = filter_qmp(v, filter_fn)
370 else:
371 qmsg[k] = filter_fn(k, v)
372 return qmsg
374 def filter_testfiles(msg):
375 pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
376 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
377 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
379 def filter_qmp_testfiles(qmsg):
380 def _filter(_key, value):
381 if is_str(value):
382 return filter_testfiles(value)
383 return value
384 return filter_qmp(qmsg, _filter)
386 def filter_generated_node_ids(msg):
387 return re.sub("#block[0-9]+", "NODE_NAME", msg)
389 def filter_img_info(output, filename):
390 lines = []
391 for line in output.split('\n'):
392 if 'disk size' in line or 'actual-size' in line:
393 continue
394 line = line.replace(filename, 'TEST_IMG')
395 line = filter_testfiles(line)
396 line = line.replace(imgfmt, 'IMGFMT')
397 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
398 line = re.sub('uuid: [-a-f0-9]+',
399 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
400 line)
401 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
402 lines.append(line)
403 return '\n'.join(lines)
405 def filter_imgfmt(msg):
406 return msg.replace(imgfmt, 'IMGFMT')
408 def filter_qmp_imgfmt(qmsg):
409 def _filter(_key, value):
410 if is_str(value):
411 return filter_imgfmt(value)
412 return value
413 return filter_qmp(qmsg, _filter)
416 Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
418 def log(msg: Msg,
419 filters: Iterable[Callable[[Msg], Msg]] = (),
420 indent: Optional[int] = None) -> None:
422 Logs either a string message or a JSON serializable message (like QMP).
423 If indent is provided, JSON serializable messages are pretty-printed.
425 for flt in filters:
426 msg = flt(msg)
427 if isinstance(msg, (dict, list)):
428 # Don't sort if it's already sorted
429 do_sort = not isinstance(msg, OrderedDict)
430 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
431 else:
432 test_logger.info(msg)
434 class Timeout:
435 def __init__(self, seconds, errmsg="Timeout"):
436 self.seconds = seconds
437 self.errmsg = errmsg
438 def __enter__(self):
439 signal.signal(signal.SIGALRM, self.timeout)
440 signal.setitimer(signal.ITIMER_REAL, self.seconds)
441 return self
442 def __exit__(self, exc_type, value, traceback):
443 signal.setitimer(signal.ITIMER_REAL, 0)
444 return False
445 def timeout(self, signum, frame):
446 raise Exception(self.errmsg)
448 def file_pattern(name):
449 return "{0}-{1}".format(os.getpid(), name)
451 class FilePath:
453 Context manager generating multiple file names. The generated files are
454 removed when exiting the context.
456 Example usage:
458 with FilePath('a.img', 'b.img') as (img_a, img_b):
459 # Use img_a and img_b here...
461 # a.img and b.img are automatically removed here.
463 By default images are created in iotests.test_dir. To create sockets use
464 iotests.sock_dir:
466 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
468 For convenience, calling with one argument yields a single file instead of
469 a tuple with one item.
472 def __init__(self, *names, base_dir=test_dir):
473 self.paths = [os.path.join(base_dir, file_pattern(name))
474 for name in names]
476 def __enter__(self):
477 if len(self.paths) == 1:
478 return self.paths[0]
479 else:
480 return self.paths
482 def __exit__(self, exc_type, exc_val, exc_tb):
483 for path in self.paths:
484 try:
485 os.remove(path)
486 except OSError:
487 pass
488 return False
491 def file_path_remover():
492 for path in reversed(file_path_remover.paths):
493 try:
494 os.remove(path)
495 except OSError:
496 pass
499 def file_path(*names, base_dir=test_dir):
500 ''' Another way to get auto-generated filename that cleans itself up.
502 Use is as simple as:
504 img_a, img_b = file_path('a.img', 'b.img')
505 sock = file_path('socket')
508 if not hasattr(file_path_remover, 'paths'):
509 file_path_remover.paths = []
510 atexit.register(file_path_remover)
512 paths = []
513 for name in names:
514 filename = file_pattern(name)
515 path = os.path.join(base_dir, filename)
516 file_path_remover.paths.append(path)
517 paths.append(path)
519 return paths[0] if len(paths) == 1 else paths
521 def remote_filename(path):
522 if imgproto == 'file':
523 return path
524 elif imgproto == 'ssh':
525 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
526 else:
527 raise Exception("Protocol %s not supported" % (imgproto))
529 class VM(qtest.QEMUQtestMachine):
530 '''A QEMU VM'''
532 def __init__(self, path_suffix=''):
533 name = "qemu%s-%d" % (path_suffix, os.getpid())
534 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
535 test_dir=test_dir,
536 socket_scm_helper=socket_scm_helper,
537 sock_dir=sock_dir)
538 self._num_drives = 0
540 def add_object(self, opts):
541 self._args.append('-object')
542 self._args.append(opts)
543 return self
545 def add_device(self, opts):
546 self._args.append('-device')
547 self._args.append(opts)
548 return self
550 def add_drive_raw(self, opts):
551 self._args.append('-drive')
552 self._args.append(opts)
553 return self
555 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
556 '''Add a virtio-blk drive to the VM'''
557 options = ['if=%s' % interface,
558 'id=drive%d' % self._num_drives]
560 if path is not None:
561 options.append('file=%s' % path)
562 options.append('format=%s' % img_format)
563 options.append('cache=%s' % cachemode)
564 options.append('aio=%s' % aiomode)
566 if opts:
567 options.append(opts)
569 if img_format == 'luks' and 'key-secret' not in opts:
570 # default luks support
571 if luks_default_secret_object not in self._args:
572 self.add_object(luks_default_secret_object)
574 options.append(luks_default_key_secret_opt)
576 self._args.append('-drive')
577 self._args.append(','.join(options))
578 self._num_drives += 1
579 return self
581 def add_blockdev(self, opts):
582 self._args.append('-blockdev')
583 if isinstance(opts, str):
584 self._args.append(opts)
585 else:
586 self._args.append(','.join(opts))
587 return self
589 def add_incoming(self, addr):
590 self._args.append('-incoming')
591 self._args.append(addr)
592 return self
594 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
595 cmd = 'human-monitor-command'
596 kwargs = {'command-line': command_line}
597 if use_log:
598 return self.qmp_log(cmd, **kwargs)
599 else:
600 return self.qmp(cmd, **kwargs)
602 def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
603 """Pause drive r/w operations"""
604 if not event:
605 self.pause_drive(drive, "read_aio")
606 self.pause_drive(drive, "write_aio")
607 return
608 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
610 def resume_drive(self, drive: str) -> None:
611 """Resume drive r/w operations"""
612 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
614 def hmp_qemu_io(self, drive: str, cmd: str,
615 use_log: bool = False) -> QMPMessage:
616 """Write to a given drive using an HMP command"""
617 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
619 def flatten_qmp_object(self, obj, output=None, basestr=''):
620 if output is None:
621 output = dict()
622 if isinstance(obj, list):
623 for i, item in enumerate(obj):
624 self.flatten_qmp_object(item, output, basestr + str(i) + '.')
625 elif isinstance(obj, dict):
626 for key in obj:
627 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
628 else:
629 output[basestr[:-1]] = obj # Strip trailing '.'
630 return output
632 def qmp_to_opts(self, obj):
633 obj = self.flatten_qmp_object(obj)
634 output_list = list()
635 for key in obj:
636 output_list += [key + '=' + obj[key]]
637 return ','.join(output_list)
639 def get_qmp_events_filtered(self, wait=60.0):
640 result = []
641 for ev in self.get_qmp_events(wait=wait):
642 result.append(filter_qmp_event(ev))
643 return result
645 def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
646 full_cmd = OrderedDict((
647 ("execute", cmd),
648 ("arguments", ordered_qmp(kwargs))
650 log(full_cmd, filters, indent=indent)
651 result = self.qmp(cmd, **kwargs)
652 log(result, filters, indent=indent)
653 return result
655 # Returns None on success, and an error string on failure
656 def run_job(self, job, auto_finalize=True, auto_dismiss=False,
657 pre_finalize=None, cancel=False, wait=60.0):
659 run_job moves a job from creation through to dismissal.
661 :param job: String. ID of recently-launched job
662 :param auto_finalize: Bool. True if the job was launched with
663 auto_finalize. Defaults to True.
664 :param auto_dismiss: Bool. True if the job was launched with
665 auto_dismiss=True. Defaults to False.
666 :param pre_finalize: Callback. A callable that takes no arguments to be
667 invoked prior to issuing job-finalize, if any.
668 :param cancel: Bool. When true, cancels the job after the pre_finalize
669 callback.
670 :param wait: Float. Timeout value specifying how long to wait for any
671 event, in seconds. Defaults to 60.0.
673 match_device = {'data': {'device': job}}
674 match_id = {'data': {'id': job}}
675 events = [
676 ('BLOCK_JOB_COMPLETED', match_device),
677 ('BLOCK_JOB_CANCELLED', match_device),
678 ('BLOCK_JOB_ERROR', match_device),
679 ('BLOCK_JOB_READY', match_device),
680 ('BLOCK_JOB_PENDING', match_id),
681 ('JOB_STATUS_CHANGE', match_id)
683 error = None
684 while True:
685 ev = filter_qmp_event(self.events_wait(events, timeout=wait))
686 if ev['event'] != 'JOB_STATUS_CHANGE':
687 log(ev)
688 continue
689 status = ev['data']['status']
690 if status == 'aborting':
691 result = self.qmp('query-jobs')
692 for j in result['return']:
693 if j['id'] == job:
694 error = j['error']
695 log('Job failed: %s' % (j['error']))
696 elif status == 'ready':
697 self.qmp_log('job-complete', id=job)
698 elif status == 'pending' and not auto_finalize:
699 if pre_finalize:
700 pre_finalize()
701 if cancel:
702 self.qmp_log('job-cancel', id=job)
703 else:
704 self.qmp_log('job-finalize', id=job)
705 elif status == 'concluded' and not auto_dismiss:
706 self.qmp_log('job-dismiss', id=job)
707 elif status == 'null':
708 return error
710 # Returns None on success, and an error string on failure
711 def blockdev_create(self, options, job_id='job0', filters=None):
712 if filters is None:
713 filters = [filter_qmp_testfiles]
714 result = self.qmp_log('blockdev-create', filters=filters,
715 job_id=job_id, options=options)
717 if 'return' in result:
718 assert result['return'] == {}
719 job_result = self.run_job(job_id)
720 else:
721 job_result = result['error']
723 log("")
724 return job_result
726 def enable_migration_events(self, name):
727 log('Enabling migration QMP events on %s...' % name)
728 log(self.qmp('migrate-set-capabilities', capabilities=[
730 'capability': 'events',
731 'state': True
735 def wait_migration(self, expect_runstate: Optional[str]) -> bool:
736 while True:
737 event = self.event_wait('MIGRATION')
738 log(event, filters=[filter_qmp_event])
739 if event['data']['status'] in ('completed', 'failed'):
740 break
742 if event['data']['status'] == 'completed':
743 # The event may occur in finish-migrate, so wait for the expected
744 # post-migration runstate
745 runstate = None
746 while runstate != expect_runstate:
747 runstate = self.qmp('query-status')['return']['status']
748 return True
749 else:
750 return False
752 def node_info(self, node_name):
753 nodes = self.qmp('query-named-block-nodes')
754 for x in nodes['return']:
755 if x['node-name'] == node_name:
756 return x
757 return None
759 def query_bitmaps(self):
760 res = self.qmp("query-named-block-nodes")
761 return {device['node-name']: device['dirty-bitmaps']
762 for device in res['return'] if 'dirty-bitmaps' in device}
764 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
766 get a specific bitmap from the object returned by query_bitmaps.
767 :param recording: If specified, filter results by the specified value.
768 :param bitmaps: If specified, use it instead of call query_bitmaps()
770 if bitmaps is None:
771 bitmaps = self.query_bitmaps()
773 for bitmap in bitmaps[node_name]:
774 if bitmap.get('name', '') == bitmap_name:
775 if recording is None or bitmap.get('recording') == recording:
776 return bitmap
777 return None
779 def check_bitmap_status(self, node_name, bitmap_name, fields):
780 ret = self.get_bitmap(node_name, bitmap_name)
782 return fields.items() <= ret.items()
784 def assert_block_path(self, root, path, expected_node, graph=None):
786 Check whether the node under the given path in the block graph
787 is @expected_node.
789 @root is the node name of the node where the @path is rooted.
791 @path is a string that consists of child names separated by
792 slashes. It must begin with a slash.
794 Examples for @root + @path:
795 - root="qcow2-node", path="/backing/file"
796 - root="quorum-node", path="/children.2/file"
798 Hypothetically, @path could be empty, in which case it would
799 point to @root. However, in practice this case is not useful
800 and hence not allowed.
802 @expected_node may be None. (All elements of the path but the
803 leaf must still exist.)
805 @graph may be None or the result of an x-debug-query-block-graph
806 call that has already been performed.
808 if graph is None:
809 graph = self.qmp('x-debug-query-block-graph')['return']
811 iter_path = iter(path.split('/'))
813 # Must start with a /
814 assert next(iter_path) == ''
816 node = next((node for node in graph['nodes'] if node['name'] == root),
817 None)
819 # An empty @path is not allowed, so the root node must be present
820 assert node is not None, 'Root node %s not found' % root
822 for child_name in iter_path:
823 assert node is not None, 'Cannot follow path %s%s' % (root, path)
825 try:
826 node_id = next(edge['child'] for edge in graph['edges']
827 if (edge['parent'] == node['id'] and
828 edge['name'] == child_name))
830 node = next(node for node in graph['nodes']
831 if node['id'] == node_id)
833 except StopIteration:
834 node = None
836 if node is None:
837 assert expected_node is None, \
838 'No node found under %s (but expected %s)' % \
839 (path, expected_node)
840 else:
841 assert node['name'] == expected_node, \
842 'Found node %s under %s (but expected %s)' % \
843 (node['name'], path, expected_node)
845 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
847 class QMPTestCase(unittest.TestCase):
848 '''Abstract base class for QMP test cases'''
850 def __init__(self, *args, **kwargs):
851 super().__init__(*args, **kwargs)
852 # Many users of this class set a VM property we rely on heavily
853 # in the methods below.
854 self.vm = None
856 def dictpath(self, d, path):
857 '''Traverse a path in a nested dict'''
858 for component in path.split('/'):
859 m = index_re.match(component)
860 if m:
861 component, idx = m.groups()
862 idx = int(idx)
864 if not isinstance(d, dict) or component not in d:
865 self.fail(f'failed path traversal for "{path}" in "{d}"')
866 d = d[component]
868 if m:
869 if not isinstance(d, list):
870 self.fail(f'path component "{component}" in "{path}" '
871 f'is not a list in "{d}"')
872 try:
873 d = d[idx]
874 except IndexError:
875 self.fail(f'invalid index "{idx}" in path "{path}" '
876 f'in "{d}"')
877 return d
879 def assert_qmp_absent(self, d, path):
880 try:
881 result = self.dictpath(d, path)
882 except AssertionError:
883 return
884 self.fail('path "%s" has value "%s"' % (path, str(result)))
886 def assert_qmp(self, d, path, value):
887 '''Assert that the value for a specific path in a QMP dict
888 matches. When given a list of values, assert that any of
889 them matches.'''
891 result = self.dictpath(d, path)
893 # [] makes no sense as a list of valid values, so treat it as
894 # an actual single value.
895 if isinstance(value, list) and value != []:
896 for v in value:
897 if result == v:
898 return
899 self.fail('no match for "%s" in %s' % (str(result), str(value)))
900 else:
901 self.assertEqual(result, value,
902 '"%s" is "%s", expected "%s"'
903 % (path, str(result), str(value)))
905 def assert_no_active_block_jobs(self):
906 result = self.vm.qmp('query-block-jobs')
907 self.assert_qmp(result, 'return', [])
909 def assert_has_block_node(self, node_name=None, file_name=None):
910 """Issue a query-named-block-nodes and assert node_name and/or
911 file_name is present in the result"""
912 def check_equal_or_none(a, b):
913 return a is None or b is None or a == b
914 assert node_name or file_name
915 result = self.vm.qmp('query-named-block-nodes')
916 for x in result["return"]:
917 if check_equal_or_none(x.get("node-name"), node_name) and \
918 check_equal_or_none(x.get("file"), file_name):
919 return
920 self.fail("Cannot find %s %s in result:\n%s" %
921 (node_name, file_name, result))
923 def assert_json_filename_equal(self, json_filename, reference):
924 '''Asserts that the given filename is a json: filename and that its
925 content is equal to the given reference object'''
926 self.assertEqual(json_filename[:5], 'json:')
927 self.assertEqual(
928 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
929 self.vm.flatten_qmp_object(reference)
932 def cancel_and_wait(self, drive='drive0', force=False,
933 resume=False, wait=60.0):
934 '''Cancel a block job and wait for it to finish, returning the event'''
935 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
936 self.assert_qmp(result, 'return', {})
938 if resume:
939 self.vm.resume_drive(drive)
941 cancelled = False
942 result = None
943 while not cancelled:
944 for event in self.vm.get_qmp_events(wait=wait):
945 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
946 event['event'] == 'BLOCK_JOB_CANCELLED':
947 self.assert_qmp(event, 'data/device', drive)
948 result = event
949 cancelled = True
950 elif event['event'] == 'JOB_STATUS_CHANGE':
951 self.assert_qmp(event, 'data/id', drive)
954 self.assert_no_active_block_jobs()
955 return result
957 def wait_until_completed(self, drive='drive0', check_offset=True,
958 wait=60.0, error=None):
959 '''Wait for a block job to finish, returning the event'''
960 while True:
961 for event in self.vm.get_qmp_events(wait=wait):
962 if event['event'] == 'BLOCK_JOB_COMPLETED':
963 self.assert_qmp(event, 'data/device', drive)
964 if error is None:
965 self.assert_qmp_absent(event, 'data/error')
966 if check_offset:
967 self.assert_qmp(event, 'data/offset',
968 event['data']['len'])
969 else:
970 self.assert_qmp(event, 'data/error', error)
971 self.assert_no_active_block_jobs()
972 return event
973 if event['event'] == 'JOB_STATUS_CHANGE':
974 self.assert_qmp(event, 'data/id', drive)
976 def wait_ready(self, drive='drive0'):
977 """Wait until a BLOCK_JOB_READY event, and return the event."""
978 return self.vm.events_wait([
979 ('BLOCK_JOB_READY',
980 {'data': {'type': 'mirror', 'device': drive}}),
981 ('BLOCK_JOB_READY',
982 {'data': {'type': 'commit', 'device': drive}})
985 def wait_ready_and_cancel(self, drive='drive0'):
986 self.wait_ready(drive=drive)
987 event = self.cancel_and_wait(drive=drive)
988 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
989 self.assert_qmp(event, 'data/type', 'mirror')
990 self.assert_qmp(event, 'data/offset', event['data']['len'])
992 def complete_and_wait(self, drive='drive0', wait_ready=True,
993 completion_error=None):
994 '''Complete a block job and wait for it to finish'''
995 if wait_ready:
996 self.wait_ready(drive=drive)
998 result = self.vm.qmp('block-job-complete', device=drive)
999 self.assert_qmp(result, 'return', {})
1001 event = self.wait_until_completed(drive=drive, error=completion_error)
1002 self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1004 def pause_wait(self, job_id='job0'):
1005 with Timeout(3, "Timeout waiting for job to pause"):
1006 while True:
1007 result = self.vm.qmp('query-block-jobs')
1008 found = False
1009 for job in result['return']:
1010 if job['device'] == job_id:
1011 found = True
1012 if job['paused'] and not job['busy']:
1013 return job
1014 break
1015 assert found
1017 def pause_job(self, job_id='job0', wait=True):
1018 result = self.vm.qmp('block-job-pause', device=job_id)
1019 self.assert_qmp(result, 'return', {})
1020 if wait:
1021 return self.pause_wait(job_id)
1022 return result
1024 def case_skip(self, reason):
1025 '''Skip this test case'''
1026 case_notrun(reason)
1027 self.skipTest(reason)
1030 def notrun(reason):
1031 '''Skip this test suite'''
1032 # Each test in qemu-iotests has a number ("seq")
1033 seq = os.path.basename(sys.argv[0])
1035 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1036 logger.warning("%s not run: %s", seq, reason)
1037 sys.exit(0)
1039 def case_notrun(reason):
1040 '''Mark this test case as not having been run (without actually
1041 skipping it, that is left to the caller). See
1042 QMPTestCase.case_skip() for a variant that actually skips the
1043 current test case.'''
1045 # Each test in qemu-iotests has a number ("seq")
1046 seq = os.path.basename(sys.argv[0])
1048 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1049 ' [case not run] ' + reason + '\n')
1051 def _verify_image_format(supported_fmts: Sequence[str] = (),
1052 unsupported_fmts: Sequence[str] = ()) -> None:
1053 assert not (supported_fmts and unsupported_fmts)
1055 if 'generic' in supported_fmts and \
1056 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1057 # similar to
1058 # _supported_fmt generic
1059 # for bash tests
1060 if imgfmt == 'luks':
1061 verify_working_luks()
1062 return
1064 not_sup = supported_fmts and (imgfmt not in supported_fmts)
1065 if not_sup or (imgfmt in unsupported_fmts):
1066 notrun('not suitable for this image format: %s' % imgfmt)
1068 if imgfmt == 'luks':
1069 verify_working_luks()
1071 def _verify_protocol(supported: Sequence[str] = (),
1072 unsupported: Sequence[str] = ()) -> None:
1073 assert not (supported and unsupported)
1075 if 'generic' in supported:
1076 return
1078 not_sup = supported and (imgproto not in supported)
1079 if not_sup or (imgproto in unsupported):
1080 notrun('not suitable for this protocol: %s' % imgproto)
1082 def _verify_platform(supported: Sequence[str] = (),
1083 unsupported: Sequence[str] = ()) -> None:
1084 if any((sys.platform.startswith(x) for x in unsupported)):
1085 notrun('not suitable for this OS: %s' % sys.platform)
1087 if supported:
1088 if not any((sys.platform.startswith(x) for x in supported)):
1089 notrun('not suitable for this OS: %s' % sys.platform)
1091 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1092 if supported_cache_modes and (cachemode not in supported_cache_modes):
1093 notrun('not suitable for this cache mode: %s' % cachemode)
1095 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1096 if supported_aio_modes and (aiomode not in supported_aio_modes):
1097 notrun('not suitable for this aio mode: %s' % aiomode)
1099 def supports_quorum():
1100 return 'quorum' in qemu_img_pipe('--help')
1102 def verify_quorum():
1103 '''Skip test suite if quorum support is not available'''
1104 if not supports_quorum():
1105 notrun('quorum support missing')
1107 def has_working_luks() -> Tuple[bool, str]:
1109 Check whether our LUKS driver can actually create images
1110 (this extends to LUKS encryption for qcow2).
1112 If not, return the reason why.
1115 img_file = f'{test_dir}/luks-test.luks'
1116 (output, status) = \
1117 qemu_img_pipe_and_status('create', '-f', 'luks',
1118 '--object', luks_default_secret_object,
1119 '-o', luks_default_key_secret_opt,
1120 '-o', 'iter-time=10',
1121 img_file, '1G')
1122 try:
1123 os.remove(img_file)
1124 except OSError:
1125 pass
1127 if status != 0:
1128 reason = output
1129 for line in output.splitlines():
1130 if img_file + ':' in line:
1131 reason = line.split(img_file + ':', 1)[1].strip()
1132 break
1134 return (False, reason)
1135 else:
1136 return (True, '')
1138 def verify_working_luks():
1140 Skip test suite if LUKS does not work
1142 (working, reason) = has_working_luks()
1143 if not working:
1144 notrun(reason)
1146 def qemu_pipe(*args):
1148 Run qemu with an option to print something and exit (e.g. a help option).
1150 :return: QEMU's stdout output.
1152 args = [qemu_prog] + qemu_opts + list(args)
1153 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1154 stderr=subprocess.STDOUT,
1155 universal_newlines=True)
1156 output = subp.communicate()[0]
1157 if subp.returncode < 0:
1158 sys.stderr.write('qemu received signal %i: %s\n' %
1159 (-subp.returncode, ' '.join(args)))
1160 return output
1162 def supported_formats(read_only=False):
1163 '''Set 'read_only' to True to check ro-whitelist
1164 Otherwise, rw-whitelist is checked'''
1166 if not hasattr(supported_formats, "formats"):
1167 supported_formats.formats = {}
1169 if read_only not in supported_formats.formats:
1170 format_message = qemu_pipe("-drive", "format=help")
1171 line = 1 if read_only else 0
1172 supported_formats.formats[read_only] = \
1173 format_message.splitlines()[line].split(":")[1].split()
1175 return supported_formats.formats[read_only]
1177 def skip_if_unsupported(required_formats=(), read_only=False):
1178 '''Skip Test Decorator
1179 Runs the test if all the required formats are whitelisted'''
1180 def skip_test_decorator(func):
1181 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1182 **kwargs: Dict[str, Any]) -> None:
1183 if callable(required_formats):
1184 fmts = required_formats(test_case)
1185 else:
1186 fmts = required_formats
1188 usf_list = list(set(fmts) - set(supported_formats(read_only)))
1189 if usf_list:
1190 msg = f'{test_case}: formats {usf_list} are not whitelisted'
1191 test_case.case_skip(msg)
1192 else:
1193 func(test_case, *args, **kwargs)
1194 return func_wrapper
1195 return skip_test_decorator
1197 def skip_for_formats(formats: Sequence[str] = ()) \
1198 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1199 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1200 '''Skip Test Decorator
1201 Skips the test for the given formats'''
1202 def skip_test_decorator(func):
1203 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1204 **kwargs: Dict[str, Any]) -> None:
1205 if imgfmt in formats:
1206 msg = f'{test_case}: Skipped for format {imgfmt}'
1207 test_case.case_skip(msg)
1208 else:
1209 func(test_case, *args, **kwargs)
1210 return func_wrapper
1211 return skip_test_decorator
1213 def skip_if_user_is_root(func):
1214 '''Skip Test Decorator
1215 Runs the test only without root permissions'''
1216 def func_wrapper(*args, **kwargs):
1217 if os.getuid() == 0:
1218 case_notrun('{}: cannot be run as root'.format(args[0]))
1219 return None
1220 else:
1221 return func(*args, **kwargs)
1222 return func_wrapper
1224 def execute_unittest(debug=False):
1225 """Executes unittests within the calling module."""
1227 verbosity = 2 if debug else 1
1229 if debug:
1230 output = sys.stdout
1231 else:
1232 # We need to filter out the time taken from the output so that
1233 # qemu-iotest can reliably diff the results against master output.
1234 output = io.StringIO()
1236 runner = unittest.TextTestRunner(stream=output, descriptions=True,
1237 verbosity=verbosity)
1238 try:
1239 # unittest.main() will use sys.exit(); so expect a SystemExit
1240 # exception
1241 unittest.main(testRunner=runner)
1242 finally:
1243 # We need to filter out the time taken from the output so that
1244 # qemu-iotest can reliably diff the results against master output.
1245 if not debug:
1246 out = output.getvalue()
1247 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1249 # Hide skipped tests from the reference output
1250 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1251 out_first_line, out_rest = out.split('\n', 1)
1252 out = out_first_line.replace('s', '.') + '\n' + out_rest
1254 sys.stderr.write(out)
1256 def execute_setup_common(supported_fmts: Sequence[str] = (),
1257 supported_platforms: Sequence[str] = (),
1258 supported_cache_modes: Sequence[str] = (),
1259 supported_aio_modes: Sequence[str] = (),
1260 unsupported_fmts: Sequence[str] = (),
1261 supported_protocols: Sequence[str] = (),
1262 unsupported_protocols: Sequence[str] = ()) -> bool:
1264 Perform necessary setup for either script-style or unittest-style tests.
1266 :return: Bool; Whether or not debug mode has been requested via the CLI.
1268 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1270 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1271 # indicate that we're not being run via "check". There may be
1272 # other things set up by "check" that individual test cases rely
1273 # on.
1274 if test_dir is None or qemu_default_machine is None:
1275 sys.stderr.write('Please run this test via the "check" script\n')
1276 sys.exit(os.EX_USAGE)
1278 debug = '-d' in sys.argv
1279 if debug:
1280 sys.argv.remove('-d')
1281 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1283 _verify_image_format(supported_fmts, unsupported_fmts)
1284 _verify_protocol(supported_protocols, unsupported_protocols)
1285 _verify_platform(supported=supported_platforms)
1286 _verify_cache_mode(supported_cache_modes)
1287 _verify_aio_mode(supported_aio_modes)
1289 return debug
1291 def execute_test(*args, test_function=None, **kwargs):
1292 """Run either unittest or script-style tests."""
1294 debug = execute_setup_common(*args, **kwargs)
1295 if not test_function:
1296 execute_unittest(debug)
1297 else:
1298 test_function()
1300 def activate_logging():
1301 """Activate iotests.log() output to stdout for script-style tests."""
1302 handler = logging.StreamHandler(stream=sys.stdout)
1303 formatter = logging.Formatter('%(message)s')
1304 handler.setFormatter(formatter)
1305 test_logger.addHandler(handler)
1306 test_logger.setLevel(logging.INFO)
1307 test_logger.propagate = False
1309 # This is called from script-style iotests without a single point of entry
1310 def script_initialize(*args, **kwargs):
1311 """Initialize script-style tests without running any tests."""
1312 activate_logging()
1313 execute_setup_common(*args, **kwargs)
1315 # This is called from script-style iotests with a single point of entry
1316 def script_main(test_function, *args, **kwargs):
1317 """Run script-style tests outside of the unittest framework"""
1318 activate_logging()
1319 execute_test(*args, test_function=test_function, **kwargs)
1321 # This is called from unittest style iotests
1322 def main(*args, **kwargs):
1323 """Run tests using the unittest framework"""
1324 execute_test(*args, **kwargs)