docs: rstfy vfio-ap documentation
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blob8815052eb538ad77edbed20f9c45727956368a6f
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import errno
20 import os
21 import re
22 import subprocess
23 import string
24 import unittest
25 import sys
26 import struct
27 import json
28 import signal
29 import logging
30 import atexit
31 import io
32 from collections import OrderedDict
34 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
35 from qemu import qtest
37 assert sys.version_info >= (3,6)
# The override options that ./check supports are split on whitespace, so
# a single argument that itself contains spaces cannot be expressed.
# str.split() without a separator is used on purpose: unlike split(' ')
# it yields no empty strings for an unset variable or for repeated
# spaces, so no empty argument ends up on a command line.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += os.environ.get('QEMU_IO_OPTIONS_NO_FMT', '').split()

qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

# Test environment settings; each one is read from the environment
# variable named in the call, with the default shown (None if absent).
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Default secret object definition and the option referring to it, used
# for luks images when the caller does not supply a key-secret.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_img(*args):
    '''Run qemu-img and return the exit code

    stdin and stdout of the child are connected to the null device.  If
    the exit code is negative (the child was killed by a signal), a
    message with the signal number and the command line is written to
    stderr.
    '''
    full_args = qemu_img_args + list(args)
    # subprocess.DEVNULL instead of open('/dev/null'): no file object is
    # left open after each invocation.
    exitcode = subprocess.call(full_args, stdin=subprocess.DEVNULL,
                               stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(full_args)))
    return exitcode
def ordered_qmp(qmsg, conv_keys=True):
    '''Return qmsg with every dict replaced by a key-sorted OrderedDict

    With conv_keys set, underscores in the keys of the dict passed in are
    replaced by dashes; dicts nested directly inside it keep their keys.
    Elements of a list are each processed with the default conv_keys.
    Anything that is neither a list nor a dict is returned unchanged.
    '''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key in sorted(qmsg):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(qmsg[key], conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(element) for element in qmsg]
    return qmsg
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments, return the exit code

    If the arguments select the luks format ("-f luks") and carry no
    key-secret option, the default key-secret option and the matching
    --object definition are added before qemu-img is run.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following -o is a comma-separated option
                # string; a str has no append(), so extend it by
                # concatenation.
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left visible; return the exit code'''
    command = qemu_img_args + list(args)
    status = subprocess.call(command)
    if status < 0:
        # A negative status means the child was terminated by a signal
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(command)))
    return status
def qemu_img_pipe(*args):
    '''Run qemu-img and return its output

    stderr is merged into stdout, and the combined text is returned.  If
    qemu-img was killed by a signal, a message with the signal number and
    the command line is written to stderr.
    '''
    full_args = qemu_img_args + list(args)
    subp = subprocess.Popen(full_args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # Drain the pipe before looking at the exit status: calling wait()
    # first can deadlock once the child has filled the pipe buffer.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-subp.returncode, ' '.join(full_args)))
    return output
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles and return
    the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
    '''Log the filtered output of "qemu-img info" for filename

    The image is addressed with --image-opts when imgopts is set and with
    "-f imgfmt" otherwise; extra_args are placed before the filename.
    filter_path is the string replaced in the output and falls back to
    filename when empty.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    command = ['info'] + format_args + list(extra_args) + [filename]

    info = qemu_img_pipe(*command)
    log(filter_img_info(info, filter_path or filename))
def qemu_io(*args):
    '''Run qemu-io and return the stdout data

    stderr is merged into stdout.  If qemu-io was killed by a signal, a
    message with the signal number and the command line is written to
    stderr.
    '''
    args = qemu_io_args + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # Read all output before checking the exit status: wait() on a child
    # with a piped stdout can deadlock once the pipe buffer is full.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-subp.returncode, ' '.join(args)))
    return output
def qemu_io_log(*args):
    '''Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout

    If qemu-io was killed by a signal, a message with the signal number
    and the command line is written to stderr.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL replaces an open('/dev/null', 'w') handle that
    # was never closed.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0

    Both stdout and stderr of the child are discarded.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL replaces an open('/dev/null', 'w') handle that
    # was never closed; stderr follows stdout to the null device.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name for the default machine: the
    ccw variant for s390-ccw-virtio, the pci variant otherwise'''
    is_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if is_ccw else 'virtio-scsi-pci'
class QemuIoInteractive:
    '''Run qemu-io as a child process and talk to its interactive prompt

    Commands are written to the child's stdin and the text it prints up
    to the next prompt is read back from its stdout (stderr is merged
    into stdout).
    '''

    # Prompt printed by qemu-io when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        '''Start qemu-io with the given extra arguments and consume the
        first prompt'''
        self.args = qemu_io_args + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        assert self._p.stdout.read(len(self._PROMPT)) == self._PROMPT

    def close(self):
        '''Send the quit command and wait for the process to finish'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to and including the next prompt; return the text that
        preceded it'''
        prompt = self._PROMPT
        n = len(prompt)
        chars = []
        tail = ''
        # Compare the last n characters read against the prompt.  A
        # matcher that simply restarts from zero on a mismatch misses the
        # prompt when the preceding output ends in a prefix of it (for
        # example a 'q' directly before 'qemu-io> ').
        while tail != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        '''Run one command and return its output

        cmd must be a single line and must not be the quit command, which
        is reserved for close().
        '''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd != 'q' and cmd != 'quit'
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    command = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(command)
def qemu_nbd_early_pipe(*args):
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
    and its output in case of an error

    The output is only collected when the exit code is non-zero; on
    success an empty string is returned in its place.
    '''
    command = qemu_nbd_args + ['--fork'] + list(args)
    proc = subprocess.Popen(command,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    status = proc.wait()
    if status < 0:
        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
                         (-status, ' '.join(command)))
    if status != 0:
        return status, proc.communicate()[0]
    return status, ''
def qemu_nbd_popen(*args):
    '''Start qemu-nbd with --persistent and return the subprocess.Popen
    object without waiting for it to exit'''
    command = [*qemu_nbd_args, '--persistent', *args]
    return subprocess.Popen(command)
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images,
    i.e. if they are identical'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors until at least size bytes
    have been written.  Each sector stores its own index as a big-endian
    32-bit integer in its first and in its last four bytes, with zero
    padding in between.
    '''
    # The with block closes the file even if a write fails.
    with open(name, 'wb') as image:
        i = 0
        while i < size:
            sector = struct.pack('>l504xl', i // 512, i // 512)
            image.write(sector)
            i = i + 512
def image_size(img):
    '''Return the "virtual-size" field that "qemu-img info" reports for
    the image in JSON output mode'''
    info_text = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_text)
    return info['virtual-size']
def is_str(val):
    '''Return True if val is a str instance'''
    result = isinstance(val, str)
    return result
# Match the test directory path literally: the path is escaped so that
# characters such as '.' or '+' are not treated as regex syntax.  When
# TEST_DIR is unset (or empty) a pattern that can never match is used
# instead of one built from the text "None".
test_dir_re = re.compile(re.escape(test_dir) if test_dir else r'(?!)')
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path in msg with
    TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
# Carriage returns, as found in Windows line endings
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return msg with all carriage return characters removed'''
    stripped = win32_re.sub("", msg)
    return stripped
# Matches the statistics line that qemu-io prints after an operation
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns from msg and replace the variable numbers
    in qemu-io statistics lines with fixed placeholders'''
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
# "chown" followed by numeric user and group ids
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown uid:gid" occurrences with "chown UID:GID"'''
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event in which the "seconds" and "microseconds"
    fields of the timestamp, if there is one, are replaced by the
    placeholders 'SECS' and 'USECS'.  The event passed in is left
    unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp
        # dict has to be copied as well before it is modified.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    qmsg is walked recursively through nested lists and dicts.  For each
    value that is neither, filter_fn(key, value) is called (key being the
    dict key or the list index) and its result stored in its place.  The
    modified qmsg itself is returned.'''
    if isinstance(qmsg, list):
        entries = enumerate(qmsg)
    else:
        entries = qmsg.items()

    for key, value in entries:
        if isinstance(value, (list, dict)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
def filter_testfiles(msg):
    '''Replace the test directory plus "PID-" file name prefix of this
    process in msg with TEST_DIR/PID-'''
    pid_prefix = "%s-" % (os.getpid())
    return msg.replace(os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-')
def filter_qmp_testfiles(qmsg):
    '''Run filter_testfiles over every string value of a QMP object'''
    def _filter(key, value):
        # Non-string values pass through untouched
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def filter_generated_node_ids(msg):
    '''Replace "#block" followed by digits in msg with NODE_NAME'''
    node_id_re = re.compile("#block[0-9]+")
    return node_id_re.sub("NODE_NAME", msg)
def filter_img_info(output, filename):
    '''Filter "qemu-img info" output for logging

    Lines mentioning the disk size or actual-size are dropped.  In the
    remaining lines filename becomes TEST_IMG, the image format name
    becomes IMGFMT, and the values after "iters:", "uuid:" and "cid:" are
    replaced by placeholders.
    '''
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for line in output.split('\n'):
        if any(tag in line for tag in ('disk size', 'actual-size')):
            continue
        line = line.replace(filename, 'TEST_IMG')
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in substitutions:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name in msg with
    IMGFMT'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
def filter_qmp_imgfmt(qmsg):
    '''Run filter_imgfmt over every string value of a QMP object'''
    def _filter(key, value):
        # Non-string values pass through untouched
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
def log(msg, filters=[], indent=None):
    '''Print a string message or a JSON serializable message (like QMP).

    Each callable in filters is applied to msg in order first.  Dicts and
    lists are then printed as JSON, pretty-printed if indent is given;
    anything else is printed as is.'''
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        print(msg)
        return

    # Python < 3.4 needs to know not to add whitespace when pretty-printing:
    if indent is None:
        separators = (', ', ': ')
    else:
        separators = (',', ': ')
    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    text = json.dumps(msg, sort_keys=not already_ordered,
                      indent=indent, separators=separators)
    print(text)
class Timeout:
    '''Context manager that raises Exception(errmsg) via SIGALRM if the
    with block is still running after the given number of seconds

    Entering installs the SIGALRM handler and arms the real-time interval
    timer; leaving disarms the timer and does not suppress exceptions.
    '''

    def __init__(self, seconds, errmsg = "Timeout"):
        self.seconds, self.errmsg = seconds, errmsg

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, type, value, traceback):
        # A value of 0 disarms the timer
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''SIGALRM handler'''
        raise Exception(self.errmsg)
def file_pattern(name):
    '''Return name prefixed with the current process id and a dash'''
    pid = os.getpid()
    return f"{pid}-{name}"
class FilePaths(object):
    """
    FilePaths generates a list of filenames that clean themselves up.

    Use this context manager to generate filenames and ensure that the
    files get deleted::

        with FilePaths(['test.img']) as (img_path,):
            qemu_img('create', img_path, '1G')
        # img_path is automatically deleted

    Each name is turned into a path inside base_dir, prefixed with the
    process id.  The with statement yields the list of paths.
    """
    def __init__(self, names, base_dir=test_dir):
        self.paths = [os.path.join(base_dir, file_pattern(name))
                      for name in names]

    def __enter__(self):
        return self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Removal is best effort, one file at a time: a file that cannot
        # be removed (for example because it was never created) must not
        # prevent the remaining files from being removed.
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                pass
        return False
class FilePath(FilePaths):
    """
    FilePath is the single-filename form of FilePaths: it takes one name
    and the with statement yields that one path instead of a list.
    """
    def __init__(self, name, base_dir=test_dir):
        super().__init__([name], base_dir)

    def __enter__(self):
        only_path = self.paths[0]
        return only_path
def file_path_remover():
    '''Remove the files listed in file_path_remover.paths, last one
    first, ignoring any file that cannot be removed'''
    pending = list(reversed(file_path_remover.paths))
    for target in pending:
        try:
            os.remove(target)
        except OSError:
            # Best effort only
            pass
def file_path(*names, base_dir=test_dir):
    ''' Generate file names that get removed when the process exits.

    Every name becomes a path inside base_dir, prefixed with the process
    id, and is recorded for removal by file_path_remover, which is
    registered with atexit on first use.  A single name returns a single
    path, otherwise a list of paths is returned:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
def remote_filename(path):
    '''Return the name under which path is reached through the image
    protocol in use: path itself for "file", an ssh URL on 127.0.0.1 for
    "ssh".  Any other protocol raises an Exception.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        vm_name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=vm_name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object opts" to the command line; returns self'''
        self._args.extend(['-object', opts])
        return self

    def add_device(self, opts):
        '''Append "-device opts" to the command line; returns self'''
        self._args.extend(['-device', opts])
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive opts" to the command line; returns self'''
        self._args.extend(['-drive', opts])
        return self

    def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        drive_opts = ['if=%s' % interface,
                      'id=drive%d' % self._num_drives]

        if path is not None:
            drive_opts += ['file=%s' % path,
                           'format=%s' % format,
                           'cache=%s' % cachemode,
                           'aio=%s' % aiomode]

        if opts:
            drive_opts.append(opts)

        if format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            drive_opts.append(luks_default_key_secret_opt)

        self._args.extend(['-drive', ','.join(drive_opts)])
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append a -blockdev option; opts is either the option string or
        an iterable of strings that get joined with commas'''
        value = opts if isinstance(opts, str) else ','.join(opts)
        self._args.extend(['-blockdev', value])
        return self

    def add_incoming(self, addr):
        '''Append "-incoming addr" to the command line; returns self'''
        self._args.extend(['-incoming', addr])
        return self

    def pause_drive(self, drive, event=None):
        '''Pause drive r/w operations'''
        if not event:
            # No event given: break on both reads and writes
            for default_event in ("read_aio", "write_aio"):
                self.pause_drive(drive, default_event)
            return
        hmp_line = 'qemu-io %s "break %s bp_%s"' % (drive, event, drive)
        self.qmp('human-monitor-command', command_line=hmp_line)

    def resume_drive(self, drive):
        '''Remove the breakpoint set by pause_drive()'''
        hmp_line = 'qemu-io %s "remove_break bp_%s"' % (drive, drive)
        self.qmp('human-monitor-command', command_line=hmp_line)

    def hmp_qemu_io(self, drive, cmd):
        '''Write to a given drive using an HMP command'''
        hmp_line = 'qemu-io %s "%s"' % (drive, cmd)
        return self.qmp('human-monitor-command', command_line=hmp_line)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested lists and dicts into one dict whose keys are the
        dot-separated paths (list indices included) of the leaf values'''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for index, element in enumerate(obj):
                self.flatten_qmp_object(element, output,
                                        basestr + str(index) + '.')
        elif isinstance(obj, dict):
            for key, value in obj.items():
                self.flatten_qmp_object(value, output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Flatten obj and join it into a "key=value,..." option string'''
        flat = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + value for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each run through
        filter_qmp_event'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
        '''Issue a QMP command, logging both the command and its result
        through the given filters; returns the unfiltered result'''
        full_cmd = OrderedDict()
        full_cmd["execute"] = cmd
        full_cmd["arguments"] = ordered_qmp(kwargs)
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, use_log=True, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param use_log: Bool. When false, does not log QMP messages.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        by_device = {'data': {'device': job}}
        by_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', by_device),
            ('BLOCK_JOB_CANCELLED', by_device),
            ('BLOCK_JOB_ERROR', by_device),
            ('BLOCK_JOB_READY', by_device),
            ('BLOCK_JOB_PENDING', by_id),
            ('JOB_STATUS_CHANGE', by_id)
        ]
        # Commands sent in response to a state change are logged only
        # when use_log is set
        issue = self.qmp_log if use_log else self.qmp
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                if use_log:
                    log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                jobs = self.qmp('query-jobs')
                for info in jobs['return']:
                    if info['id'] == job:
                        error = info['error']
                        if use_log:
                            log('Job failed: %s' % (info['error']))
            elif status == 'ready':
                # job-complete is always logged, regardless of use_log
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                issue('job-cancel' if cancel else 'job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                issue('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and, if it was accepted, run the job to
        its end; an empty line is logged afterwards'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' not in result:
            job_result = result['error']
        else:
            assert result['return'] == {}
            job_result = self.run_job(job_id)

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability and log the reply'''
        log('Enabling migration QMP events on %s...' % name)
        capabilities = [
            {
                'capability': 'events',
                'state': True
            }
        ]
        log(self.qmp('migrate-set-capabilities', capabilities=capabilities))

    def wait_migration(self, expect_runstate):
        '''Log MIGRATION events until one reports status "completed", then
        poll query-status until the VM is in expect_runstate'''
        completed = False
        while not completed:
            event = self.event_wait('MIGRATION')
            log(event, filters=[filter_qmp_event])
            completed = event['data']['status'] == 'completed'
        # The event may occur in finish-migrate, so wait for the expected
        # post-migration runstate
        while self.qmp('query-status')['return']['status'] != expect_runstate:
            pass

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None
        if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        matching = (entry for entry in nodes['return']
                    if entry['node-name'] == node_name)
        return next(matching, None)

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps lists,
        for all nodes that report any'''
        res = self.qmp("query-named-block-nodes")
        bitmaps = {}
        for device in res['return']:
            if 'dirty-bitmaps' in device:
                bitmaps[device['node-name']] = device['dirty-bitmaps']
        return bitmaps

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for candidate in bitmaps[node_name]:
            if candidate.get('name', '') != bitmap_name:
                continue
            if recording is None or candidate.get('recording') == recording:
                return candidate
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if every key/value pair in fields is present in the
        named bitmap's description'''
        bitmap = self.get_bitmap(node_name, bitmap_name)

        return fields.items() <= bitmap.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        first, *child_names = path.split('/')

        # Must start with a /
        assert first == ''

        node = next((n for n in graph['nodes'] if n['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in child_names:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(e['child'] for e in graph['edges']
                               if e['parent'] == node['id'] and
                                  e['name'] == child_name)

                node = next(n for n in graph['nodes']
                            if n['id'] == node_id)
            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
# A path component with a list index, e.g. "name[3]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict'''
        for component in path.split('/'):
            indexed = index_re.match(component)
            if indexed:
                component, idx = indexed.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
            d = d[component]

            if not indexed:
                continue
            if not isinstance(d, list):
                self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
            try:
                d = d[idx]
            except IndexError:
                self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that path cannot be traversed in d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            return
        self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if not isinstance(value, list) or value == []:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if any(actual == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        jobs = self.vm.qmp('query-block-jobs')
        self.assert_qmp(jobs, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for node in result["return"]:
            name_ok = check_equal_or_none(node.get("node-name"), node_name)
            if name_ok and check_equal_or_none(node.get("file"), file_name):
                return
        self.assertTrue(False, "Cannot find %s %s in result:\n%s" %
                        (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')
        self.assertEqual(self.vm.flatten_qmp_object(json.loads(payload)),
                         self.vm.flatten_qmp_object(reference))

    def cancel_and_wait(self, drive='drive0', force=False, resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_events = ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED')
        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] in final_events:
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True, wait=60.0,
                             error=None):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        '''Wait until a block job BLOCK_JOB_READY event'''
        match = {'data': {'type': 'mirror', 'device': drive } }
        self.vm.event_wait(name='BLOCK_JOB_READY', match=match)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the mirror job to become ready, cancel it and check
        that it ended with a BLOCK_JOB_COMPLETED event at full offset'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assert_qmp(event, 'data/type', 'mirror')

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy and
        return its entry; gives up after the one second Timeout'''
        with Timeout(1, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                job = next((entry for entry in result['return']
                            if entry['device'] == job_id), None)
                # The job must be listed on every poll
                assert job is not None
                if job['paused'] == True and job['busy'] == False:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job; with wait set, return its entry once it is
        actually paused, otherwise return the QMP reply'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        if not wait:
            return reply
        return self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
def notrun(reason):
    '''Skip this test suite

    The reason is written to the "seq.notrun" file in the output
    directory and printed, then the process exits with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # The with block makes sure the file is flushed and closed before
    # the process exits.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    print('%s not run: %s' % (seq, reason))
    sys.exit(0)
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to the "seq.casenotrun" file in the output
    directory.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # The with block closes the file right away instead of leaving that
    # to garbage collection.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
    '''Skip the test suite unless the image format in use is suitable

    At most one of the two lists may be non-empty.  'generic' in
    supported_fmts accepts any format as long as the IMGFMT_GENERIC
    environment variable is unset or 'true'.
    '''
    assert not (supported_fmts and unsupported_fmts)

    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_allowed:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        return

    if supported_fmts and imgfmt not in supported_fmts:
        unsuitable = True
    else:
        unsuitable = imgfmt in unsupported_fmts
    if unsuitable:
        notrun('not suitable for this image format: %s' % imgfmt)
def verify_protocol(supported=[], unsupported=[]):
    '''Skip the test suite unless the image protocol in use is suitable

    At most one of the two lists may be non-empty.  'generic' in
    supported accepts any protocol.
    '''
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    if supported and imgproto not in supported:
        unsuitable = True
    else:
        unsuitable = imgproto in unsupported
    if unsuitable:
        notrun('not suitable for this protocol: %s' % imgproto)
def verify_platform(supported=None, unsupported=None):
    '''Skip the test suite if sys.platform starts with one of the
    unsupported prefixes, or with none of the supported ones'''
    def platform_matches(prefixes):
        return any((sys.platform.startswith(prefix) for prefix in prefixes))

    if unsupported is not None and platform_matches(unsupported):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported is not None and not platform_matches(supported):
        notrun('not suitable for this OS: %s' % sys.platform)
def verify_cache_mode(supported_cache_modes=[]):
    '''Skip the test suite if a list of supported cache modes is given
    and the cache mode in use is not in it'''
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
def verify_aio_mode(supported_aio_modes=[]):
    '''Skip the test suite if the current aio mode is not listed.

    An empty list accepts every aio mode.
    '''
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
def supports_quorum():
    '''Return True if 'quorum' appears in the output of qemu-img --help'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def qemu_pipe(*args):
    '''Run qemu with an option to print something and exit (e.g. a help option),
    and return its output

    stderr is merged into stdout, so the returned string contains both.
    If qemu was terminated by a signal, a note is written to stderr.
    '''
    args = [qemu_prog] + qemu_opts + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # Drain the pipe *before* waiting for the process: calling wait()
    # first can deadlock once the child has filled the pipe buffer.
    # communicate() reads everything and then waits for termination.
    output = subp.communicate()[0]
    exitcode = subp.returncode
    if exitcode < 0:
        sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
                                                           ' '.join(args)))
    return output
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

    Results are cached per read_only value in the function attribute
    supported_formats.formats, so qemu is queried at most once for each.
    '''
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # The second output line is used for read_only, the first otherwise
        whitelist_line = help_lines[1 if read_only else 0]
        cache[read_only] = whitelist_line.split(":")[1].split()

    return cache[read_only]
def skip_if_unsupported(required_formats=[], read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

    required_formats is either a collection of format names or a
    callable that receives the test case and returns such a collection.
    read_only is passed on to supported_formats().
    '''
    import functools

    def skip_test_decorator(func):
        # Keep the name and docstring of the decorated test on the wrapper
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args, **kwargs):
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            # Sorted so that the skip message does not depend on set
            # iteration order
            usf_list = sorted(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                test_case.case_skip('{}: formats {} are not whitelisted'.format(
                    test_case, usf_list))
            else:
                return func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

    When os.getuid() is 0 the wrapped function is not called; the case
    is recorded through case_notrun() instead.
    '''
    import functools

    # Keep the name and docstring of the decorated test on the wrapper
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
        else:
            return func(*args, **kwargs)
    return func_wrapper
def execute_unittest(output, verbosity, debug):
    """Run the unittest-style tests and emit filtered output.

    output is the stream given to the TextTestRunner.  Unless debug is
    set, it must provide getvalue(); its contents are filtered and then
    written to stderr.
    """
    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # partition() instead of split()-and-unpack: output without
            # any newline must not raise here and mask the exception
            # that is already propagating through this finally block.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
def execute_test(test_function=None,
                 supported_fmts=[],
                 supported_platforms=None,
                 supported_cache_modes=[], supported_aio_modes={},
                 unsupported_fmts=[], supported_protocols=[],
                 unsupported_protocols=[]):
    """Check the environment, then run unittest or script-style tests.

    With a test_function, that function is called; otherwise the tests
    are run through execute_unittest().
    """

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    run_via_check = (test_dir is not None
                     and qemu_default_machine is not None)
    if not run_via_check:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    debug = '-d' in sys.argv

    verify_image_format(supported_fmts, unsupported_fmts)
    verify_protocol(supported_protocols, unsupported_protocols)
    verify_platform(supported=supported_platforms)
    verify_cache_mode(supported_cache_modes)
    verify_aio_mode(supported_aio_modes)

    if debug:
        sys.argv.remove('-d')
        output, verbosity = sys.stdout, 2
    else:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        output, verbosity = io.StringIO(), 1

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    if test_function:
        test_function()
    else:
        execute_unittest(output, verbosity, debug)
def script_main(test_function, *args, **kwargs):
    """Run a script-style test, i.e. one not using the unittest framework.

    All further arguments are forwarded to execute_test().
    """
    execute_test(test_function, *args, **kwargs)
def main(*args, **kwargs):
    """Run the tests through the unittest framework.

    All arguments are forwarded to execute_test(), with no test function.
    """
    execute_test(None, *args, **kwargs)