nbd/client: Work around 3.0 bug for listing meta contexts
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blobcbedfaf1df670ca29cf2838af370785d051142e6
1 from __future__ import print_function
2 # Common utilities and Python wrappers for qemu-iotests
4 # Copyright (C) 2012 IBM Corp.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import errno
21 import os
22 import re
23 import subprocess
24 import string
25 import unittest
26 import sys
27 import struct
28 import json
29 import signal
30 import logging
31 import atexit
32 import io
33 from collections import OrderedDict
35 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
36 import qtest
39 # This will not work if arguments contain spaces but is necessary if we
40 # want to support the override options that ./check supports.
41 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
42 if os.environ.get('QEMU_IMG_OPTIONS'):
43 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
45 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
46 if os.environ.get('QEMU_IO_OPTIONS'):
47 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
49 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
50 if os.environ.get('QEMU_NBD_OPTIONS'):
51 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
53 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
54 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
56 imgfmt = os.environ.get('IMGFMT', 'raw')
57 imgproto = os.environ.get('IMGPROTO', 'file')
58 test_dir = os.environ.get('TEST_DIR')
59 output_dir = os.environ.get('OUTPUT_DIR', '.')
60 cachemode = os.environ.get('CACHEMODE')
61 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
63 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
64 debug = False
66 luks_default_secret_object = 'secret,id=keysec0,data=' + \
67 os.environ.get('IMGKEYSECRET', '')
68 luks_default_key_secret_opt = 'key-secret=keysec0'
71 def qemu_img(*args):
72 '''Run qemu-img and return the exit code'''
73 devnull = open('/dev/null', 'r+')
74 exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
75 if exitcode < 0:
76 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
77 return exitcode
79 def ordered_kwargs(kwargs):
80 # kwargs prior to 3.6 are not ordered, so:
81 od = OrderedDict()
82 for k, v in sorted(kwargs.items()):
83 if isinstance(v, dict):
84 od[k] = ordered_kwargs(v)
85 else:
86 od[k] = v
87 return od
89 def qemu_img_create(*args):
90 args = list(args)
92 # default luks support
93 if '-f' in args and args[args.index('-f') + 1] == 'luks':
94 if '-o' in args:
95 i = args.index('-o')
96 if 'key-secret' not in args[i + 1]:
97 args[i + 1].append(luks_default_key_secret_opt)
98 args.insert(i + 2, '--object')
99 args.insert(i + 3, luks_default_secret_object)
100 else:
101 args = ['-o', luks_default_key_secret_opt,
102 '--object', luks_default_secret_object] + args
104 args.insert(0, 'create')
106 return qemu_img(*args)
108 def qemu_img_verbose(*args):
109 '''Run qemu-img without suppressing its output and return the exit code'''
110 exitcode = subprocess.call(qemu_img_args + list(args))
111 if exitcode < 0:
112 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
113 return exitcode
115 def qemu_img_pipe(*args):
116 '''Run qemu-img and return its output'''
117 subp = subprocess.Popen(qemu_img_args + list(args),
118 stdout=subprocess.PIPE,
119 stderr=subprocess.STDOUT,
120 universal_newlines=True)
121 exitcode = subp.wait()
122 if exitcode < 0:
123 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
124 return subp.communicate()[0]
126 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
127 args = [ 'info' ]
128 if imgopts:
129 args.append('--image-opts')
130 else:
131 args += [ '-f', imgfmt ]
132 args += extra_args
133 args.append(filename)
135 output = qemu_img_pipe(*args)
136 if not filter_path:
137 filter_path = filename
138 log(filter_img_info(output, filter_path))
140 def qemu_io(*args):
141 '''Run qemu-io and return the stdout data'''
142 args = qemu_io_args + list(args)
143 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
144 stderr=subprocess.STDOUT,
145 universal_newlines=True)
146 exitcode = subp.wait()
147 if exitcode < 0:
148 sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
149 return subp.communicate()[0]
151 def qemu_io_silent(*args):
152 '''Run qemu-io and return the exit code, suppressing stdout'''
153 args = qemu_io_args + list(args)
154 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
155 if exitcode < 0:
156 sys.stderr.write('qemu-io received signal %i: %s\n' %
157 (-exitcode, ' '.join(args)))
158 return exitcode
161 class QemuIoInteractive:
162 def __init__(self, *args):
163 self.args = qemu_io_args + list(args)
164 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
165 stdout=subprocess.PIPE,
166 stderr=subprocess.STDOUT,
167 universal_newlines=True)
168 assert self._p.stdout.read(9) == 'qemu-io> '
170 def close(self):
171 self._p.communicate('q\n')
173 def _read_output(self):
174 pattern = 'qemu-io> '
175 n = len(pattern)
176 pos = 0
177 s = []
178 while pos != n:
179 c = self._p.stdout.read(1)
180 # check unexpected EOF
181 assert c != ''
182 s.append(c)
183 if c == pattern[pos]:
184 pos += 1
185 else:
186 pos = 0
188 return ''.join(s[:-n])
190 def cmd(self, cmd):
191 # quit command is in close(), '\n' is added automatically
192 assert '\n' not in cmd
193 cmd = cmd.strip()
194 assert cmd != 'q' and cmd != 'quit'
195 self._p.stdin.write(cmd + '\n')
196 self._p.stdin.flush()
197 return self._read_output()
200 def qemu_nbd(*args):
201 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
202 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
204 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
205 '''Return True if two image files are identical'''
206 return qemu_img('compare', '-f', fmt1,
207 '-F', fmt2, img1, img2) == 0
209 def create_image(name, size):
210 '''Create a fully-allocated raw image with sector markers'''
211 file = open(name, 'wb')
212 i = 0
213 while i < size:
214 sector = struct.pack('>l504xl', i // 512, i // 512)
215 file.write(sector)
216 i = i + 512
217 file.close()
219 def image_size(img):
220 '''Return image's virtual size'''
221 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
222 return json.loads(r)['virtual-size']
224 test_dir_re = re.compile(r"%s" % test_dir)
225 def filter_test_dir(msg):
226 return test_dir_re.sub("TEST_DIR", msg)
228 win32_re = re.compile(r"\r")
229 def filter_win32(msg):
230 return win32_re.sub("", msg)
232 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
233 def filter_qemu_io(msg):
234 msg = filter_win32(msg)
235 return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
237 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
238 def filter_chown(msg):
239 return chown_re.sub("chown UID:GID", msg)
241 def filter_qmp_event(event):
242 '''Filter a QMP event dict'''
243 event = dict(event)
244 if 'timestamp' in event:
245 event['timestamp']['seconds'] = 'SECS'
246 event['timestamp']['microseconds'] = 'USECS'
247 return event
249 def filter_qmp(qmsg, filter_fn):
250 '''Given a string filter, filter a QMP object's values.
251 filter_fn takes a (key, value) pair.'''
252 # Iterate through either lists or dicts;
253 if isinstance(qmsg, list):
254 items = enumerate(qmsg)
255 else:
256 items = qmsg.items()
258 for k, v in items:
259 if isinstance(v, list) or isinstance(v, dict):
260 qmsg[k] = filter_qmp(v, filter_fn)
261 else:
262 qmsg[k] = filter_fn(k, v)
263 return qmsg
265 def filter_testfiles(msg):
266 prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
267 return msg.replace(prefix, 'TEST_DIR/PID-')
269 def filter_qmp_testfiles(qmsg):
270 def _filter(key, value):
271 if key == 'filename' or key == 'backing-file':
272 return filter_testfiles(value)
273 return value
274 return filter_qmp(qmsg, _filter)
276 def filter_generated_node_ids(msg):
277 return re.sub("#block[0-9]+", "NODE_NAME", msg)
279 def filter_img_info(output, filename):
280 lines = []
281 for line in output.split('\n'):
282 if 'disk size' in line or 'actual-size' in line:
283 continue
284 line = line.replace(filename, 'TEST_IMG') \
285 .replace(imgfmt, 'IMGFMT')
286 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
287 line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
288 lines.append(line)
289 return '\n'.join(lines)
291 def log(msg, filters=[], indent=None):
292 '''Logs either a string message or a JSON serializable message (like QMP).
293 If indent is provided, JSON serializable messages are pretty-printed.'''
294 for flt in filters:
295 msg = flt(msg)
296 if isinstance(msg, dict) or isinstance(msg, list):
297 # Python < 3.4 needs to know not to add whitespace when pretty-printing:
298 separators = (', ', ': ') if indent is None else (',', ': ')
299 # Don't sort if it's already sorted
300 do_sort = not isinstance(msg, OrderedDict)
301 print(json.dumps(msg, sort_keys=do_sort,
302 indent=indent, separators=separators))
303 else:
304 print(msg)
306 class Timeout:
307 def __init__(self, seconds, errmsg = "Timeout"):
308 self.seconds = seconds
309 self.errmsg = errmsg
310 def __enter__(self):
311 signal.signal(signal.SIGALRM, self.timeout)
312 signal.setitimer(signal.ITIMER_REAL, self.seconds)
313 return self
314 def __exit__(self, type, value, traceback):
315 signal.setitimer(signal.ITIMER_REAL, 0)
316 return False
317 def timeout(self, signum, frame):
318 raise Exception(self.errmsg)
321 class FilePath(object):
322 '''An auto-generated filename that cleans itself up.
324 Use this context manager to generate filenames and ensure that the file
325 gets deleted::
327 with TestFilePath('test.img') as img_path:
328 qemu_img('create', img_path, '1G')
329 # migration_sock_path is automatically deleted
331 def __init__(self, name):
332 filename = '{0}-{1}'.format(os.getpid(), name)
333 self.path = os.path.join(test_dir, filename)
335 def __enter__(self):
336 return self.path
338 def __exit__(self, exc_type, exc_val, exc_tb):
339 try:
340 os.remove(self.path)
341 except OSError:
342 pass
343 return False
346 def file_path_remover():
347 for path in reversed(file_path_remover.paths):
348 try:
349 os.remove(path)
350 except OSError:
351 pass
354 def file_path(*names):
355 ''' Another way to get auto-generated filename that cleans itself up.
357 Use is as simple as:
359 img_a, img_b = file_path('a.img', 'b.img')
360 sock = file_path('socket')
363 if not hasattr(file_path_remover, 'paths'):
364 file_path_remover.paths = []
365 atexit.register(file_path_remover)
367 paths = []
368 for name in names:
369 filename = '{0}-{1}'.format(os.getpid(), name)
370 path = os.path.join(test_dir, filename)
371 file_path_remover.paths.append(path)
372 paths.append(path)
374 return paths[0] if len(paths) == 1 else paths
376 def remote_filename(path):
377 if imgproto == 'file':
378 return path
379 elif imgproto == 'ssh':
380 return "ssh://127.0.0.1%s" % (path)
381 else:
382 raise Exception("Protocol %s not supported" % (imgproto))
384 class VM(qtest.QEMUQtestMachine):
385 '''A QEMU VM'''
387 def __init__(self, path_suffix=''):
388 name = "qemu%s-%d" % (path_suffix, os.getpid())
389 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
390 test_dir=test_dir,
391 socket_scm_helper=socket_scm_helper)
392 self._num_drives = 0
394 def add_object(self, opts):
395 self._args.append('-object')
396 self._args.append(opts)
397 return self
399 def add_device(self, opts):
400 self._args.append('-device')
401 self._args.append(opts)
402 return self
404 def add_drive_raw(self, opts):
405 self._args.append('-drive')
406 self._args.append(opts)
407 return self
409 def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
410 '''Add a virtio-blk drive to the VM'''
411 options = ['if=%s' % interface,
412 'id=drive%d' % self._num_drives]
414 if path is not None:
415 options.append('file=%s' % path)
416 options.append('format=%s' % format)
417 options.append('cache=%s' % cachemode)
419 if opts:
420 options.append(opts)
422 if format == 'luks' and 'key-secret' not in opts:
423 # default luks support
424 if luks_default_secret_object not in self._args:
425 self.add_object(luks_default_secret_object)
427 options.append(luks_default_key_secret_opt)
429 self._args.append('-drive')
430 self._args.append(','.join(options))
431 self._num_drives += 1
432 return self
434 def add_blockdev(self, opts):
435 self._args.append('-blockdev')
436 if isinstance(opts, str):
437 self._args.append(opts)
438 else:
439 self._args.append(','.join(opts))
440 return self
442 def add_incoming(self, addr):
443 self._args.append('-incoming')
444 self._args.append(addr)
445 return self
447 def pause_drive(self, drive, event=None):
448 '''Pause drive r/w operations'''
449 if not event:
450 self.pause_drive(drive, "read_aio")
451 self.pause_drive(drive, "write_aio")
452 return
453 self.qmp('human-monitor-command',
454 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
456 def resume_drive(self, drive):
457 self.qmp('human-monitor-command',
458 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
460 def hmp_qemu_io(self, drive, cmd):
461 '''Write to a given drive using an HMP command'''
462 return self.qmp('human-monitor-command',
463 command_line='qemu-io %s "%s"' % (drive, cmd))
465 def flatten_qmp_object(self, obj, output=None, basestr=''):
466 if output is None:
467 output = dict()
468 if isinstance(obj, list):
469 for i in range(len(obj)):
470 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
471 elif isinstance(obj, dict):
472 for key in obj:
473 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
474 else:
475 output[basestr[:-1]] = obj # Strip trailing '.'
476 return output
478 def qmp_to_opts(self, obj):
479 obj = self.flatten_qmp_object(obj)
480 output_list = list()
481 for key in obj:
482 output_list += [key + '=' + obj[key]]
483 return ','.join(output_list)
485 def get_qmp_events_filtered(self, wait=True):
486 result = []
487 for ev in self.get_qmp_events(wait=wait):
488 result.append(filter_qmp_event(ev))
489 return result
491 def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
492 full_cmd = OrderedDict((
493 ("execute", cmd),
494 ("arguments", ordered_kwargs(kwargs))
496 log(full_cmd, filters, indent=indent)
497 result = self.qmp(cmd, **kwargs)
498 log(result, filters, indent=indent)
499 return result
501 def run_job(self, job, auto_finalize=True, auto_dismiss=False):
502 while True:
503 for ev in self.get_qmp_events_filtered(wait=True):
504 if ev['event'] == 'JOB_STATUS_CHANGE':
505 status = ev['data']['status']
506 if status == 'aborting':
507 result = self.qmp('query-jobs')
508 for j in result['return']:
509 if j['id'] == job:
510 log('Job failed: %s' % (j['error']))
511 elif status == 'pending' and not auto_finalize:
512 self.qmp_log('job-finalize', id=job)
513 elif status == 'concluded' and not auto_dismiss:
514 self.qmp_log('job-dismiss', id=job)
515 elif status == 'null':
516 return
517 else:
518 iotests.log(ev)
521 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
523 class QMPTestCase(unittest.TestCase):
524 '''Abstract base class for QMP test cases'''
526 def dictpath(self, d, path):
527 '''Traverse a path in a nested dict'''
528 for component in path.split('/'):
529 m = index_re.match(component)
530 if m:
531 component, idx = m.groups()
532 idx = int(idx)
534 if not isinstance(d, dict) or component not in d:
535 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
536 d = d[component]
538 if m:
539 if not isinstance(d, list):
540 self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
541 try:
542 d = d[idx]
543 except IndexError:
544 self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
545 return d
547 def assert_qmp_absent(self, d, path):
548 try:
549 result = self.dictpath(d, path)
550 except AssertionError:
551 return
552 self.fail('path "%s" has value "%s"' % (path, str(result)))
554 def assert_qmp(self, d, path, value):
555 '''Assert that the value for a specific path in a QMP dict matches'''
556 result = self.dictpath(d, path)
557 self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
559 def assert_no_active_block_jobs(self):
560 result = self.vm.qmp('query-block-jobs')
561 self.assert_qmp(result, 'return', [])
563 def assert_has_block_node(self, node_name=None, file_name=None):
564 """Issue a query-named-block-nodes and assert node_name and/or
565 file_name is present in the result"""
566 def check_equal_or_none(a, b):
567 return a == None or b == None or a == b
568 assert node_name or file_name
569 result = self.vm.qmp('query-named-block-nodes')
570 for x in result["return"]:
571 if check_equal_or_none(x.get("node-name"), node_name) and \
572 check_equal_or_none(x.get("file"), file_name):
573 return
574 self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
575 (node_name, file_name, result))
577 def assert_json_filename_equal(self, json_filename, reference):
578 '''Asserts that the given filename is a json: filename and that its
579 content is equal to the given reference object'''
580 self.assertEqual(json_filename[:5], 'json:')
581 self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
582 self.vm.flatten_qmp_object(reference))
584 def cancel_and_wait(self, drive='drive0', force=False, resume=False):
585 '''Cancel a block job and wait for it to finish, returning the event'''
586 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
587 self.assert_qmp(result, 'return', {})
589 if resume:
590 self.vm.resume_drive(drive)
592 cancelled = False
593 result = None
594 while not cancelled:
595 for event in self.vm.get_qmp_events(wait=True):
596 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
597 event['event'] == 'BLOCK_JOB_CANCELLED':
598 self.assert_qmp(event, 'data/device', drive)
599 result = event
600 cancelled = True
601 elif event['event'] == 'JOB_STATUS_CHANGE':
602 self.assert_qmp(event, 'data/id', drive)
605 self.assert_no_active_block_jobs()
606 return result
608 def wait_until_completed(self, drive='drive0', check_offset=True):
609 '''Wait for a block job to finish, returning the event'''
610 while True:
611 for event in self.vm.get_qmp_events(wait=True):
612 if event['event'] == 'BLOCK_JOB_COMPLETED':
613 self.assert_qmp(event, 'data/device', drive)
614 self.assert_qmp_absent(event, 'data/error')
615 if check_offset:
616 self.assert_qmp(event, 'data/offset', event['data']['len'])
617 self.assert_no_active_block_jobs()
618 return event
619 elif event['event'] == 'JOB_STATUS_CHANGE':
620 self.assert_qmp(event, 'data/id', drive)
622 def wait_ready(self, drive='drive0'):
623 '''Wait until a block job BLOCK_JOB_READY event'''
624 f = {'data': {'type': 'mirror', 'device': drive } }
625 event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
627 def wait_ready_and_cancel(self, drive='drive0'):
628 self.wait_ready(drive=drive)
629 event = self.cancel_and_wait(drive=drive)
630 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
631 self.assert_qmp(event, 'data/type', 'mirror')
632 self.assert_qmp(event, 'data/offset', event['data']['len'])
634 def complete_and_wait(self, drive='drive0', wait_ready=True):
635 '''Complete a block job and wait for it to finish'''
636 if wait_ready:
637 self.wait_ready(drive=drive)
639 result = self.vm.qmp('block-job-complete', device=drive)
640 self.assert_qmp(result, 'return', {})
642 event = self.wait_until_completed(drive=drive)
643 self.assert_qmp(event, 'data/type', 'mirror')
645 def pause_wait(self, job_id='job0'):
646 with Timeout(1, "Timeout waiting for job to pause"):
647 while True:
648 result = self.vm.qmp('query-block-jobs')
649 found = False
650 for job in result['return']:
651 if job['device'] == job_id:
652 found = True
653 if job['paused'] == True and job['busy'] == False:
654 return job
655 break
656 assert found
658 def pause_job(self, job_id='job0', wait=True):
659 result = self.vm.qmp('block-job-pause', device=job_id)
660 self.assert_qmp(result, 'return', {})
661 if wait:
662 return self.pause_wait(job_id)
663 return result
666 def notrun(reason):
667 '''Skip this test suite'''
668 # Each test in qemu-iotests has a number ("seq")
669 seq = os.path.basename(sys.argv[0])
671 open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
672 print('%s not run: %s' % (seq, reason))
673 sys.exit(0)
675 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
676 assert not (supported_fmts and unsupported_fmts)
678 if 'generic' in supported_fmts and \
679 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
680 # similar to
681 # _supported_fmt generic
682 # for bash tests
683 return
685 not_sup = supported_fmts and (imgfmt not in supported_fmts)
686 if not_sup or (imgfmt in unsupported_fmts):
687 notrun('not suitable for this image format: %s' % imgfmt)
689 def verify_protocol(supported=[], unsupported=[]):
690 assert not (supported and unsupported)
692 if 'generic' in supported:
693 return
695 not_sup = supported and (imgproto not in supported)
696 if not_sup or (imgproto in unsupported):
697 notrun('not suitable for this protocol: %s' % imgproto)
699 def verify_platform(supported_oses=['linux']):
700 if True not in [sys.platform.startswith(x) for x in supported_oses]:
701 notrun('not suitable for this OS: %s' % sys.platform)
703 def verify_cache_mode(supported_cache_modes=[]):
704 if supported_cache_modes and (cachemode not in supported_cache_modes):
705 notrun('not suitable for this cache mode: %s' % cachemode)
707 def supports_quorum():
708 return 'quorum' in qemu_img_pipe('--help')
710 def verify_quorum():
711 '''Skip test suite if quorum support is not available'''
712 if not supports_quorum():
713 notrun('quorum support missing')
715 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
716 unsupported_fmts=[]):
717 '''Run tests'''
719 global debug
721 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
722 # indicate that we're not being run via "check". There may be
723 # other things set up by "check" that individual test cases rely
724 # on.
725 if test_dir is None or qemu_default_machine is None:
726 sys.stderr.write('Please run this test via the "check" script\n')
727 sys.exit(os.EX_USAGE)
729 debug = '-d' in sys.argv
730 verbosity = 1
731 verify_image_format(supported_fmts, unsupported_fmts)
732 verify_platform(supported_oses)
733 verify_cache_mode(supported_cache_modes)
735 if debug:
736 output = sys.stdout
737 verbosity = 2
738 sys.argv.remove('-d')
739 else:
740 # We need to filter out the time taken from the output so that
741 # qemu-iotest can reliably diff the results against master output.
742 if sys.version_info.major >= 3:
743 output = io.StringIO()
744 else:
745 # io.StringIO is for unicode strings, which is not what
746 # 2.x's test runner emits.
747 output = io.BytesIO()
749 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
751 class MyTestRunner(unittest.TextTestRunner):
752 def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
753 unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
755 # unittest.main() will use sys.exit() so expect a SystemExit exception
756 try:
757 unittest.main(testRunner=MyTestRunner)
758 finally:
759 if not debug:
760 sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))