s390x: Move diagnose 308 subcodes and rcs into ipl.h
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blob7bc4934cd23fa71da2e43a56f7124f9a537499c3
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import errno
20 import os
21 import re
22 import subprocess
23 import string
24 import unittest
25 import sys
26 import struct
27 import json
28 import signal
29 import logging
30 import atexit
31 import io
32 from collections import OrderedDict
33 import faulthandler
35 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
36 from qemu import qtest
38 assert sys.version_info >= (3,6)
40 faulthandler.enable()
42 # This will not work if arguments contain spaces but is necessary if we
43 # want to support the override options that ./check supports.
44 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
45 if os.environ.get('QEMU_IMG_OPTIONS'):
46 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
48 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
49 if os.environ.get('QEMU_IO_OPTIONS'):
50 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
52 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
53 if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
54 qemu_io_args_no_fmt += \
55 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
57 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
58 if os.environ.get('QEMU_NBD_OPTIONS'):
59 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
61 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
62 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
64 imgfmt = os.environ.get('IMGFMT', 'raw')
65 imgproto = os.environ.get('IMGPROTO', 'file')
66 test_dir = os.environ.get('TEST_DIR')
67 sock_dir = os.environ.get('SOCK_DIR')
68 output_dir = os.environ.get('OUTPUT_DIR', '.')
69 cachemode = os.environ.get('CACHEMODE')
70 aiomode = os.environ.get('AIOMODE')
71 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
73 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
75 luks_default_secret_object = 'secret,id=keysec0,data=' + \
76 os.environ.get('IMGKEYSECRET', '')
77 luks_default_key_secret_opt = 'key-secret=keysec0'
80 def qemu_img(*args):
81 '''Run qemu-img and return the exit code'''
82 devnull = open('/dev/null', 'r+')
83 exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
84 if exitcode < 0:
85 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
86 return exitcode
88 def ordered_qmp(qmsg, conv_keys=True):
89 # Dictionaries are not ordered prior to 3.6, therefore:
90 if isinstance(qmsg, list):
91 return [ordered_qmp(atom) for atom in qmsg]
92 if isinstance(qmsg, dict):
93 od = OrderedDict()
94 for k, v in sorted(qmsg.items()):
95 if conv_keys:
96 k = k.replace('_', '-')
97 od[k] = ordered_qmp(v, conv_keys=False)
98 return od
99 return qmsg
101 def qemu_img_create(*args):
102 args = list(args)
104 # default luks support
105 if '-f' in args and args[args.index('-f') + 1] == 'luks':
106 if '-o' in args:
107 i = args.index('-o')
108 if 'key-secret' not in args[i + 1]:
109 args[i + 1].append(luks_default_key_secret_opt)
110 args.insert(i + 2, '--object')
111 args.insert(i + 3, luks_default_secret_object)
112 else:
113 args = ['-o', luks_default_key_secret_opt,
114 '--object', luks_default_secret_object] + args
116 args.insert(0, 'create')
118 return qemu_img(*args)
120 def qemu_img_verbose(*args):
121 '''Run qemu-img without suppressing its output and return the exit code'''
122 exitcode = subprocess.call(qemu_img_args + list(args))
123 if exitcode < 0:
124 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
125 return exitcode
127 def qemu_img_pipe(*args):
128 '''Run qemu-img and return its output'''
129 subp = subprocess.Popen(qemu_img_args + list(args),
130 stdout=subprocess.PIPE,
131 stderr=subprocess.STDOUT,
132 universal_newlines=True)
133 exitcode = subp.wait()
134 if exitcode < 0:
135 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
136 return subp.communicate()[0]
138 def qemu_img_log(*args):
139 result = qemu_img_pipe(*args)
140 log(result, filters=[filter_testfiles])
141 return result
143 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
144 args = [ 'info' ]
145 if imgopts:
146 args.append('--image-opts')
147 else:
148 args += [ '-f', imgfmt ]
149 args += extra_args
150 args.append(filename)
152 output = qemu_img_pipe(*args)
153 if not filter_path:
154 filter_path = filename
155 log(filter_img_info(output, filter_path))
157 def qemu_io(*args):
158 '''Run qemu-io and return the stdout data'''
159 args = qemu_io_args + list(args)
160 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
161 stderr=subprocess.STDOUT,
162 universal_newlines=True)
163 exitcode = subp.wait()
164 if exitcode < 0:
165 sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
166 return subp.communicate()[0]
168 def qemu_io_log(*args):
169 result = qemu_io(*args)
170 log(result, filters=[filter_testfiles, filter_qemu_io])
171 return result
173 def qemu_io_silent(*args):
174 '''Run qemu-io and return the exit code, suppressing stdout'''
175 args = qemu_io_args + list(args)
176 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
177 if exitcode < 0:
178 sys.stderr.write('qemu-io received signal %i: %s\n' %
179 (-exitcode, ' '.join(args)))
180 return exitcode
182 def qemu_io_silent_check(*args):
183 '''Run qemu-io and return the true if subprocess returned 0'''
184 args = qemu_io_args + list(args)
185 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
186 stderr=subprocess.STDOUT)
187 return exitcode == 0
189 def get_virtio_scsi_device():
190 if qemu_default_machine == 's390-ccw-virtio':
191 return 'virtio-scsi-ccw'
192 return 'virtio-scsi-pci'
194 class QemuIoInteractive:
195 def __init__(self, *args):
196 self.args = qemu_io_args + list(args)
197 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
198 stdout=subprocess.PIPE,
199 stderr=subprocess.STDOUT,
200 universal_newlines=True)
201 assert self._p.stdout.read(9) == 'qemu-io> '
203 def close(self):
204 self._p.communicate('q\n')
206 def _read_output(self):
207 pattern = 'qemu-io> '
208 n = len(pattern)
209 pos = 0
210 s = []
211 while pos != n:
212 c = self._p.stdout.read(1)
213 # check unexpected EOF
214 assert c != ''
215 s.append(c)
216 if c == pattern[pos]:
217 pos += 1
218 else:
219 pos = 0
221 return ''.join(s[:-n])
223 def cmd(self, cmd):
224 # quit command is in close(), '\n' is added automatically
225 assert '\n' not in cmd
226 cmd = cmd.strip()
227 assert cmd != 'q' and cmd != 'quit'
228 self._p.stdin.write(cmd + '\n')
229 self._p.stdin.flush()
230 return self._read_output()
233 def qemu_nbd(*args):
234 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
235 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
237 def qemu_nbd_early_pipe(*args):
238 '''Run qemu-nbd in daemon mode and return both the parent's exit code
239 and its output in case of an error'''
240 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
241 stdout=subprocess.PIPE,
242 stderr=subprocess.STDOUT,
243 universal_newlines=True)
244 exitcode = subp.wait()
245 if exitcode < 0:
246 sys.stderr.write('qemu-nbd received signal %i: %s\n' %
247 (-exitcode,
248 ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
249 if exitcode == 0:
250 return exitcode, ''
251 else:
252 return exitcode, subp.communicate()[0]
254 def qemu_nbd_popen(*args):
255 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
256 return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args))
258 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
259 '''Return True if two image files are identical'''
260 return qemu_img('compare', '-f', fmt1,
261 '-F', fmt2, img1, img2) == 0
263 def create_image(name, size):
264 '''Create a fully-allocated raw image with sector markers'''
265 file = open(name, 'wb')
266 i = 0
267 while i < size:
268 sector = struct.pack('>l504xl', i // 512, i // 512)
269 file.write(sector)
270 i = i + 512
271 file.close()
273 def image_size(img):
274 '''Return image's virtual size'''
275 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
276 return json.loads(r)['virtual-size']
278 def is_str(val):
279 return isinstance(val, str)
281 test_dir_re = re.compile(r"%s" % test_dir)
282 def filter_test_dir(msg):
283 return test_dir_re.sub("TEST_DIR", msg)
285 win32_re = re.compile(r"\r")
286 def filter_win32(msg):
287 return win32_re.sub("", msg)
289 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
290 def filter_qemu_io(msg):
291 msg = filter_win32(msg)
292 return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
294 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
295 def filter_chown(msg):
296 return chown_re.sub("chown UID:GID", msg)
298 def filter_qmp_event(event):
299 '''Filter a QMP event dict'''
300 event = dict(event)
301 if 'timestamp' in event:
302 event['timestamp']['seconds'] = 'SECS'
303 event['timestamp']['microseconds'] = 'USECS'
304 return event
306 def filter_qmp(qmsg, filter_fn):
307 '''Given a string filter, filter a QMP object's values.
308 filter_fn takes a (key, value) pair.'''
309 # Iterate through either lists or dicts;
310 if isinstance(qmsg, list):
311 items = enumerate(qmsg)
312 else:
313 items = qmsg.items()
315 for k, v in items:
316 if isinstance(v, list) or isinstance(v, dict):
317 qmsg[k] = filter_qmp(v, filter_fn)
318 else:
319 qmsg[k] = filter_fn(k, v)
320 return qmsg
322 def filter_testfiles(msg):
323 prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
324 return msg.replace(prefix, 'TEST_DIR/PID-')
326 def filter_qmp_testfiles(qmsg):
327 def _filter(key, value):
328 if is_str(value):
329 return filter_testfiles(value)
330 return value
331 return filter_qmp(qmsg, _filter)
333 def filter_generated_node_ids(msg):
334 return re.sub("#block[0-9]+", "NODE_NAME", msg)
336 def filter_img_info(output, filename):
337 lines = []
338 for line in output.split('\n'):
339 if 'disk size' in line or 'actual-size' in line:
340 continue
341 line = line.replace(filename, 'TEST_IMG') \
342 .replace(imgfmt, 'IMGFMT')
343 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
344 line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
345 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
346 lines.append(line)
347 return '\n'.join(lines)
349 def filter_imgfmt(msg):
350 return msg.replace(imgfmt, 'IMGFMT')
352 def filter_qmp_imgfmt(qmsg):
353 def _filter(key, value):
354 if is_str(value):
355 return filter_imgfmt(value)
356 return value
357 return filter_qmp(qmsg, _filter)
359 def log(msg, filters=[], indent=None):
360 '''Logs either a string message or a JSON serializable message (like QMP).
361 If indent is provided, JSON serializable messages are pretty-printed.'''
362 for flt in filters:
363 msg = flt(msg)
364 if isinstance(msg, dict) or isinstance(msg, list):
365 # Python < 3.4 needs to know not to add whitespace when pretty-printing:
366 separators = (', ', ': ') if indent is None else (',', ': ')
367 # Don't sort if it's already sorted
368 do_sort = not isinstance(msg, OrderedDict)
369 print(json.dumps(msg, sort_keys=do_sort,
370 indent=indent, separators=separators))
371 else:
372 print(msg)
374 class Timeout:
375 def __init__(self, seconds, errmsg = "Timeout"):
376 self.seconds = seconds
377 self.errmsg = errmsg
378 def __enter__(self):
379 signal.signal(signal.SIGALRM, self.timeout)
380 signal.setitimer(signal.ITIMER_REAL, self.seconds)
381 return self
382 def __exit__(self, type, value, traceback):
383 signal.setitimer(signal.ITIMER_REAL, 0)
384 return False
385 def timeout(self, signum, frame):
386 raise Exception(self.errmsg)
388 def file_pattern(name):
389 return "{0}-{1}".format(os.getpid(), name)
391 class FilePaths(object):
393 FilePaths is an auto-generated filename that cleans itself up.
395 Use this context manager to generate filenames and ensure that the file
396 gets deleted::
398 with FilePaths(['test.img']) as img_path:
399 qemu_img('create', img_path, '1G')
400 # migration_sock_path is automatically deleted
402 def __init__(self, names, base_dir=test_dir):
403 self.paths = []
404 for name in names:
405 self.paths.append(os.path.join(base_dir, file_pattern(name)))
407 def __enter__(self):
408 return self.paths
410 def __exit__(self, exc_type, exc_val, exc_tb):
411 try:
412 for path in self.paths:
413 os.remove(path)
414 except OSError:
415 pass
416 return False
418 class FilePath(FilePaths):
420 FilePath is a specialization of FilePaths that takes a single filename.
422 def __init__(self, name, base_dir=test_dir):
423 super(FilePath, self).__init__([name], base_dir)
425 def __enter__(self):
426 return self.paths[0]
428 def file_path_remover():
429 for path in reversed(file_path_remover.paths):
430 try:
431 os.remove(path)
432 except OSError:
433 pass
436 def file_path(*names, base_dir=test_dir):
437 ''' Another way to get auto-generated filename that cleans itself up.
439 Use is as simple as:
441 img_a, img_b = file_path('a.img', 'b.img')
442 sock = file_path('socket')
445 if not hasattr(file_path_remover, 'paths'):
446 file_path_remover.paths = []
447 atexit.register(file_path_remover)
449 paths = []
450 for name in names:
451 filename = file_pattern(name)
452 path = os.path.join(base_dir, filename)
453 file_path_remover.paths.append(path)
454 paths.append(path)
456 return paths[0] if len(paths) == 1 else paths
458 def remote_filename(path):
459 if imgproto == 'file':
460 return path
461 elif imgproto == 'ssh':
462 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
463 else:
464 raise Exception("Protocol %s not supported" % (imgproto))
466 class VM(qtest.QEMUQtestMachine):
467 '''A QEMU VM'''
469 def __init__(self, path_suffix=''):
470 name = "qemu%s-%d" % (path_suffix, os.getpid())
471 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
472 test_dir=test_dir,
473 socket_scm_helper=socket_scm_helper,
474 sock_dir=sock_dir)
475 self._num_drives = 0
477 def add_object(self, opts):
478 self._args.append('-object')
479 self._args.append(opts)
480 return self
482 def add_device(self, opts):
483 self._args.append('-device')
484 self._args.append(opts)
485 return self
487 def add_drive_raw(self, opts):
488 self._args.append('-drive')
489 self._args.append(opts)
490 return self
492 def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
493 '''Add a virtio-blk drive to the VM'''
494 options = ['if=%s' % interface,
495 'id=drive%d' % self._num_drives]
497 if path is not None:
498 options.append('file=%s' % path)
499 options.append('format=%s' % format)
500 options.append('cache=%s' % cachemode)
501 options.append('aio=%s' % aiomode)
503 if opts:
504 options.append(opts)
506 if format == 'luks' and 'key-secret' not in opts:
507 # default luks support
508 if luks_default_secret_object not in self._args:
509 self.add_object(luks_default_secret_object)
511 options.append(luks_default_key_secret_opt)
513 self._args.append('-drive')
514 self._args.append(','.join(options))
515 self._num_drives += 1
516 return self
518 def add_blockdev(self, opts):
519 self._args.append('-blockdev')
520 if isinstance(opts, str):
521 self._args.append(opts)
522 else:
523 self._args.append(','.join(opts))
524 return self
526 def add_incoming(self, addr):
527 self._args.append('-incoming')
528 self._args.append(addr)
529 return self
531 def pause_drive(self, drive, event=None):
532 '''Pause drive r/w operations'''
533 if not event:
534 self.pause_drive(drive, "read_aio")
535 self.pause_drive(drive, "write_aio")
536 return
537 self.qmp('human-monitor-command',
538 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
540 def resume_drive(self, drive):
541 self.qmp('human-monitor-command',
542 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
544 def hmp_qemu_io(self, drive, cmd):
545 '''Write to a given drive using an HMP command'''
546 return self.qmp('human-monitor-command',
547 command_line='qemu-io %s "%s"' % (drive, cmd))
549 def flatten_qmp_object(self, obj, output=None, basestr=''):
550 if output is None:
551 output = dict()
552 if isinstance(obj, list):
553 for i in range(len(obj)):
554 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
555 elif isinstance(obj, dict):
556 for key in obj:
557 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
558 else:
559 output[basestr[:-1]] = obj # Strip trailing '.'
560 return output
562 def qmp_to_opts(self, obj):
563 obj = self.flatten_qmp_object(obj)
564 output_list = list()
565 for key in obj:
566 output_list += [key + '=' + obj[key]]
567 return ','.join(output_list)
569 def get_qmp_events_filtered(self, wait=60.0):
570 result = []
571 for ev in self.get_qmp_events(wait=wait):
572 result.append(filter_qmp_event(ev))
573 return result
575 def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
576 full_cmd = OrderedDict((
577 ("execute", cmd),
578 ("arguments", ordered_qmp(kwargs))
580 log(full_cmd, filters, indent=indent)
581 result = self.qmp(cmd, **kwargs)
582 log(result, filters, indent=indent)
583 return result
585 # Returns None on success, and an error string on failure
586 def run_job(self, job, auto_finalize=True, auto_dismiss=False,
587 pre_finalize=None, cancel=False, use_log=True, wait=60.0):
589 run_job moves a job from creation through to dismissal.
591 :param job: String. ID of recently-launched job
592 :param auto_finalize: Bool. True if the job was launched with
593 auto_finalize. Defaults to True.
594 :param auto_dismiss: Bool. True if the job was launched with
595 auto_dismiss=True. Defaults to False.
596 :param pre_finalize: Callback. A callable that takes no arguments to be
597 invoked prior to issuing job-finalize, if any.
598 :param cancel: Bool. When true, cancels the job after the pre_finalize
599 callback.
600 :param use_log: Bool. When false, does not log QMP messages.
601 :param wait: Float. Timeout value specifying how long to wait for any
602 event, in seconds. Defaults to 60.0.
604 match_device = {'data': {'device': job}}
605 match_id = {'data': {'id': job}}
606 events = [
607 ('BLOCK_JOB_COMPLETED', match_device),
608 ('BLOCK_JOB_CANCELLED', match_device),
609 ('BLOCK_JOB_ERROR', match_device),
610 ('BLOCK_JOB_READY', match_device),
611 ('BLOCK_JOB_PENDING', match_id),
612 ('JOB_STATUS_CHANGE', match_id)
614 error = None
615 while True:
616 ev = filter_qmp_event(self.events_wait(events, timeout=wait))
617 if ev['event'] != 'JOB_STATUS_CHANGE':
618 if use_log:
619 log(ev)
620 continue
621 status = ev['data']['status']
622 if status == 'aborting':
623 result = self.qmp('query-jobs')
624 for j in result['return']:
625 if j['id'] == job:
626 error = j['error']
627 if use_log:
628 log('Job failed: %s' % (j['error']))
629 elif status == 'ready':
630 if use_log:
631 self.qmp_log('job-complete', id=job)
632 else:
633 self.qmp('job-complete', id=job)
634 elif status == 'pending' and not auto_finalize:
635 if pre_finalize:
636 pre_finalize()
637 if cancel and use_log:
638 self.qmp_log('job-cancel', id=job)
639 elif cancel:
640 self.qmp('job-cancel', id=job)
641 elif use_log:
642 self.qmp_log('job-finalize', id=job)
643 else:
644 self.qmp('job-finalize', id=job)
645 elif status == 'concluded' and not auto_dismiss:
646 if use_log:
647 self.qmp_log('job-dismiss', id=job)
648 else:
649 self.qmp('job-dismiss', id=job)
650 elif status == 'null':
651 return error
653 # Returns None on success, and an error string on failure
654 def blockdev_create(self, options, job_id='job0', filters=None):
655 if filters is None:
656 filters = [filter_qmp_testfiles]
657 result = self.qmp_log('blockdev-create', filters=filters,
658 job_id=job_id, options=options)
660 if 'return' in result:
661 assert result['return'] == {}
662 job_result = self.run_job(job_id)
663 else:
664 job_result = result['error']
666 log("")
667 return job_result
669 def enable_migration_events(self, name):
670 log('Enabling migration QMP events on %s...' % name)
671 log(self.qmp('migrate-set-capabilities', capabilities=[
673 'capability': 'events',
674 'state': True
678 def wait_migration(self, expect_runstate):
679 while True:
680 event = self.event_wait('MIGRATION')
681 log(event, filters=[filter_qmp_event])
682 if event['data']['status'] == 'completed':
683 break
684 # The event may occur in finish-migrate, so wait for the expected
685 # post-migration runstate
686 while self.qmp('query-status')['return']['status'] != expect_runstate:
687 pass
689 def node_info(self, node_name):
690 nodes = self.qmp('query-named-block-nodes')
691 for x in nodes['return']:
692 if x['node-name'] == node_name:
693 return x
694 return None
696 def query_bitmaps(self):
697 res = self.qmp("query-named-block-nodes")
698 return {device['node-name']: device['dirty-bitmaps']
699 for device in res['return'] if 'dirty-bitmaps' in device}
701 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
703 get a specific bitmap from the object returned by query_bitmaps.
704 :param recording: If specified, filter results by the specified value.
705 :param bitmaps: If specified, use it instead of call query_bitmaps()
707 if bitmaps is None:
708 bitmaps = self.query_bitmaps()
710 for bitmap in bitmaps[node_name]:
711 if bitmap.get('name', '') == bitmap_name:
712 if recording is None:
713 return bitmap
714 elif bitmap.get('recording') == recording:
715 return bitmap
716 return None
718 def check_bitmap_status(self, node_name, bitmap_name, fields):
719 ret = self.get_bitmap(node_name, bitmap_name)
721 return fields.items() <= ret.items()
723 def assert_block_path(self, root, path, expected_node, graph=None):
725 Check whether the node under the given path in the block graph
726 is @expected_node.
728 @root is the node name of the node where the @path is rooted.
730 @path is a string that consists of child names separated by
731 slashes. It must begin with a slash.
733 Examples for @root + @path:
734 - root="qcow2-node", path="/backing/file"
735 - root="quorum-node", path="/children.2/file"
737 Hypothetically, @path could be empty, in which case it would
738 point to @root. However, in practice this case is not useful
739 and hence not allowed.
741 @expected_node may be None. (All elements of the path but the
742 leaf must still exist.)
744 @graph may be None or the result of an x-debug-query-block-graph
745 call that has already been performed.
747 if graph is None:
748 graph = self.qmp('x-debug-query-block-graph')['return']
750 iter_path = iter(path.split('/'))
752 # Must start with a /
753 assert next(iter_path) == ''
755 node = next((node for node in graph['nodes'] if node['name'] == root),
756 None)
758 # An empty @path is not allowed, so the root node must be present
759 assert node is not None, 'Root node %s not found' % root
761 for child_name in iter_path:
762 assert node is not None, 'Cannot follow path %s%s' % (root, path)
764 try:
765 node_id = next(edge['child'] for edge in graph['edges'] \
766 if edge['parent'] == node['id'] and
767 edge['name'] == child_name)
769 node = next(node for node in graph['nodes'] \
770 if node['id'] == node_id)
771 except StopIteration:
772 node = None
774 if node is None:
775 assert expected_node is None, \
776 'No node found under %s (but expected %s)' % \
777 (path, expected_node)
778 else:
779 assert node['name'] == expected_node, \
780 'Found node %s under %s (but expected %s)' % \
781 (node['name'], path, expected_node)
783 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
785 class QMPTestCase(unittest.TestCase):
786 '''Abstract base class for QMP test cases'''
788 def dictpath(self, d, path):
789 '''Traverse a path in a nested dict'''
790 for component in path.split('/'):
791 m = index_re.match(component)
792 if m:
793 component, idx = m.groups()
794 idx = int(idx)
796 if not isinstance(d, dict) or component not in d:
797 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
798 d = d[component]
800 if m:
801 if not isinstance(d, list):
802 self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
803 try:
804 d = d[idx]
805 except IndexError:
806 self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
807 return d
809 def assert_qmp_absent(self, d, path):
810 try:
811 result = self.dictpath(d, path)
812 except AssertionError:
813 return
814 self.fail('path "%s" has value "%s"' % (path, str(result)))
816 def assert_qmp(self, d, path, value):
817 '''Assert that the value for a specific path in a QMP dict
818 matches. When given a list of values, assert that any of
819 them matches.'''
821 result = self.dictpath(d, path)
823 # [] makes no sense as a list of valid values, so treat it as
824 # an actual single value.
825 if isinstance(value, list) and value != []:
826 for v in value:
827 if result == v:
828 return
829 self.fail('no match for "%s" in %s' % (str(result), str(value)))
830 else:
831 self.assertEqual(result, value,
832 '"%s" is "%s", expected "%s"'
833 % (path, str(result), str(value)))
835 def assert_no_active_block_jobs(self):
836 result = self.vm.qmp('query-block-jobs')
837 self.assert_qmp(result, 'return', [])
839 def assert_has_block_node(self, node_name=None, file_name=None):
840 """Issue a query-named-block-nodes and assert node_name and/or
841 file_name is present in the result"""
842 def check_equal_or_none(a, b):
843 return a == None or b == None or a == b
844 assert node_name or file_name
845 result = self.vm.qmp('query-named-block-nodes')
846 for x in result["return"]:
847 if check_equal_or_none(x.get("node-name"), node_name) and \
848 check_equal_or_none(x.get("file"), file_name):
849 return
850 self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
851 (node_name, file_name, result))
853 def assert_json_filename_equal(self, json_filename, reference):
854 '''Asserts that the given filename is a json: filename and that its
855 content is equal to the given reference object'''
856 self.assertEqual(json_filename[:5], 'json:')
857 self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
858 self.vm.flatten_qmp_object(reference))
860 def cancel_and_wait(self, drive='drive0', force=False, resume=False, wait=60.0):
861 '''Cancel a block job and wait for it to finish, returning the event'''
862 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
863 self.assert_qmp(result, 'return', {})
865 if resume:
866 self.vm.resume_drive(drive)
868 cancelled = False
869 result = None
870 while not cancelled:
871 for event in self.vm.get_qmp_events(wait=wait):
872 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
873 event['event'] == 'BLOCK_JOB_CANCELLED':
874 self.assert_qmp(event, 'data/device', drive)
875 result = event
876 cancelled = True
877 elif event['event'] == 'JOB_STATUS_CHANGE':
878 self.assert_qmp(event, 'data/id', drive)
881 self.assert_no_active_block_jobs()
882 return result
884 def wait_until_completed(self, drive='drive0', check_offset=True, wait=60.0,
885 error=None):
886 '''Wait for a block job to finish, returning the event'''
887 while True:
888 for event in self.vm.get_qmp_events(wait=wait):
889 if event['event'] == 'BLOCK_JOB_COMPLETED':
890 self.assert_qmp(event, 'data/device', drive)
891 if error is None:
892 self.assert_qmp_absent(event, 'data/error')
893 if check_offset:
894 self.assert_qmp(event, 'data/offset',
895 event['data']['len'])
896 else:
897 self.assert_qmp(event, 'data/error', error)
898 self.assert_no_active_block_jobs()
899 return event
900 elif event['event'] == 'JOB_STATUS_CHANGE':
901 self.assert_qmp(event, 'data/id', drive)
903 def wait_ready(self, drive='drive0'):
904 '''Wait until a block job BLOCK_JOB_READY event'''
905 f = {'data': {'type': 'mirror', 'device': drive } }
906 event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
908 def wait_ready_and_cancel(self, drive='drive0'):
909 self.wait_ready(drive=drive)
910 event = self.cancel_and_wait(drive=drive)
911 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
912 self.assert_qmp(event, 'data/type', 'mirror')
913 self.assert_qmp(event, 'data/offset', event['data']['len'])
915 def complete_and_wait(self, drive='drive0', wait_ready=True,
916 completion_error=None):
917 '''Complete a block job and wait for it to finish'''
918 if wait_ready:
919 self.wait_ready(drive=drive)
921 result = self.vm.qmp('block-job-complete', device=drive)
922 self.assert_qmp(result, 'return', {})
924 event = self.wait_until_completed(drive=drive, error=completion_error)
925 self.assert_qmp(event, 'data/type', 'mirror')
927 def pause_wait(self, job_id='job0'):
928 with Timeout(3, "Timeout waiting for job to pause"):
929 while True:
930 result = self.vm.qmp('query-block-jobs')
931 found = False
932 for job in result['return']:
933 if job['device'] == job_id:
934 found = True
935 if job['paused'] == True and job['busy'] == False:
936 return job
937 break
938 assert found
940 def pause_job(self, job_id='job0', wait=True):
941 result = self.vm.qmp('block-job-pause', device=job_id)
942 self.assert_qmp(result, 'return', {})
943 if wait:
944 return self.pause_wait(job_id)
945 return result
947 def case_skip(self, reason):
948 '''Skip this test case'''
949 case_notrun(reason)
950 self.skipTest(reason)
953 def notrun(reason):
954 '''Skip this test suite'''
955 # Each test in qemu-iotests has a number ("seq")
956 seq = os.path.basename(sys.argv[0])
958 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
959 print('%s not run: %s' % (seq, reason))
960 sys.exit(0)
962 def case_notrun(reason):
963 '''Mark this test case as not having been run (without actually
964 skipping it, that is left to the caller). See
965 QMPTestCase.case_skip() for a variant that actually skips the
966 current test case.'''
968 # Each test in qemu-iotests has a number ("seq")
969 seq = os.path.basename(sys.argv[0])
971 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
972 ' [case not run] ' + reason + '\n')
974 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
975 assert not (supported_fmts and unsupported_fmts)
977 if 'generic' in supported_fmts and \
978 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
979 # similar to
980 # _supported_fmt generic
981 # for bash tests
982 return
984 not_sup = supported_fmts and (imgfmt not in supported_fmts)
985 if not_sup or (imgfmt in unsupported_fmts):
986 notrun('not suitable for this image format: %s' % imgfmt)
988 def verify_protocol(supported=[], unsupported=[]):
989 assert not (supported and unsupported)
991 if 'generic' in supported:
992 return
994 not_sup = supported and (imgproto not in supported)
995 if not_sup or (imgproto in unsupported):
996 notrun('not suitable for this protocol: %s' % imgproto)
998 def verify_platform(supported=None, unsupported=None):
999 if unsupported is not None:
1000 if any((sys.platform.startswith(x) for x in unsupported)):
1001 notrun('not suitable for this OS: %s' % sys.platform)
1003 if supported is not None:
1004 if not any((sys.platform.startswith(x) for x in supported)):
1005 notrun('not suitable for this OS: %s' % sys.platform)
1007 def verify_cache_mode(supported_cache_modes=[]):
1008 if supported_cache_modes and (cachemode not in supported_cache_modes):
1009 notrun('not suitable for this cache mode: %s' % cachemode)
1011 def verify_aio_mode(supported_aio_modes=[]):
1012 if supported_aio_modes and (aiomode not in supported_aio_modes):
1013 notrun('not suitable for this aio mode: %s' % aiomode)
1015 def supports_quorum():
1016 return 'quorum' in qemu_img_pipe('--help')
1018 def verify_quorum():
1019 '''Skip test suite if quorum support is not available'''
1020 if not supports_quorum():
1021 notrun('quorum support missing')
1023 def qemu_pipe(*args):
1024 '''Run qemu with an option to print something and exit (e.g. a help option),
1025 and return its output'''
1026 args = [qemu_prog] + qemu_opts + list(args)
1027 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1028 stderr=subprocess.STDOUT,
1029 universal_newlines=True)
1030 exitcode = subp.wait()
1031 if exitcode < 0:
1032 sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
1033 ' '.join(args)))
1034 return subp.communicate()[0]
1036 def supported_formats(read_only=False):
1037 '''Set 'read_only' to True to check ro-whitelist
1038 Otherwise, rw-whitelist is checked'''
1040 if not hasattr(supported_formats, "formats"):
1041 supported_formats.formats = {}
1043 if read_only not in supported_formats.formats:
1044 format_message = qemu_pipe("-drive", "format=help")
1045 line = 1 if read_only else 0
1046 supported_formats.formats[read_only] = \
1047 format_message.splitlines()[line].split(":")[1].split()
1049 return supported_formats.formats[read_only]
1051 def skip_if_unsupported(required_formats=[], read_only=False):
1052 '''Skip Test Decorator
1053 Runs the test if all the required formats are whitelisted'''
1054 def skip_test_decorator(func):
1055 def func_wrapper(test_case: QMPTestCase, *args, **kwargs):
1056 if callable(required_formats):
1057 fmts = required_formats(test_case)
1058 else:
1059 fmts = required_formats
1061 usf_list = list(set(fmts) - set(supported_formats(read_only)))
1062 if usf_list:
1063 test_case.case_skip('{}: formats {} are not whitelisted'.format(
1064 test_case, usf_list))
1065 else:
1066 return func(test_case, *args, **kwargs)
1067 return func_wrapper
1068 return skip_test_decorator
1070 def skip_if_user_is_root(func):
1071 '''Skip Test Decorator
1072 Runs the test only without root permissions'''
1073 def func_wrapper(*args, **kwargs):
1074 if os.getuid() == 0:
1075 case_notrun('{}: cannot be run as root'.format(args[0]))
1076 else:
1077 return func(*args, **kwargs)
1078 return func_wrapper
1080 def execute_unittest(output, verbosity, debug):
1081 runner = unittest.TextTestRunner(stream=output, descriptions=True,
1082 verbosity=verbosity)
1083 try:
1084 # unittest.main() will use sys.exit(); so expect a SystemExit
1085 # exception
1086 unittest.main(testRunner=runner)
1087 finally:
1088 if not debug:
1089 out = output.getvalue()
1090 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1092 # Hide skipped tests from the reference output
1093 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1094 out_first_line, out_rest = out.split('\n', 1)
1095 out = out_first_line.replace('s', '.') + '\n' + out_rest
1097 sys.stderr.write(out)
1099 def execute_test(test_function=None,
1100 supported_fmts=[],
1101 supported_platforms=None,
1102 supported_cache_modes=[], supported_aio_modes={},
1103 unsupported_fmts=[], supported_protocols=[],
1104 unsupported_protocols=[]):
1105 """Run either unittest or script-style tests."""
1107 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1108 # indicate that we're not being run via "check". There may be
1109 # other things set up by "check" that individual test cases rely
1110 # on.
1111 if test_dir is None or qemu_default_machine is None:
1112 sys.stderr.write('Please run this test via the "check" script\n')
1113 sys.exit(os.EX_USAGE)
1115 debug = '-d' in sys.argv
1116 verbosity = 1
1117 verify_image_format(supported_fmts, unsupported_fmts)
1118 verify_protocol(supported_protocols, unsupported_protocols)
1119 verify_platform(supported=supported_platforms)
1120 verify_cache_mode(supported_cache_modes)
1121 verify_aio_mode(supported_aio_modes)
1123 if debug:
1124 output = sys.stdout
1125 verbosity = 2
1126 sys.argv.remove('-d')
1127 else:
1128 # We need to filter out the time taken from the output so that
1129 # qemu-iotest can reliably diff the results against master output.
1130 output = io.StringIO()
1132 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1134 if not test_function:
1135 execute_unittest(output, verbosity, debug)
1136 else:
1137 test_function()
1139 def script_main(test_function, *args, **kwargs):
1140 """Run script-style tests outside of the unittest framework"""
1141 execute_test(test_function, *args, **kwargs)
1143 def main(*args, **kwargs):
1144 """Run tests using the unittest framework"""
1145 execute_test(None, *args, **kwargs)