sdl2: move opts assignment into loop
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blobb25d48a91b37a463b6acd16b5e7c319fe8742d0c
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import errno
20 import os
21 import re
22 import subprocess
23 import string
24 import unittest
25 import sys
26 import struct
27 import json
28 import signal
29 import logging
30 import atexit
32 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
33 import qtest
# Tool command lines and test parameters, taken from the environment that
# the ./check script sets up.
#
# The *_OPTIONS values are split on single spaces.  This will not work if
# arguments contain spaces but is necessary if we want to support the
# override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: when QEMU_OPTIONS is unset or empty this evaluates to [''].
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
debug = False

# Use .get() so that merely importing this module outside of ./check does
# not die with a KeyError before main() can print its usage hint.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_img(*args):
    '''Run qemu-img and return the exit code

    The child's stdin and stdout are connected to /dev/null.  A negative
    exit code (the child was killed by a signal) is reported on stderr
    together with the full command line.
    '''
    cmd = qemu_img_args + list(args)
    # The context manager closes the /dev/null handle once the child is done
    # instead of leaking one descriptor per call.
    with open('/dev/null', 'r+') as devnull:
        exitcode = subprocess.call(cmd, stdin=devnull, stdout=devnull)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(cmd)))
    return exitcode
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments and return the exit code

    When the arguments select the luks format (-f luks) and no key-secret
    option is given, the default key secret option and the matching
    --object definition are added automatically.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a comma-separated option string, so the
                # default key secret is concatenated (str has no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left visible and return the exit code'''
    cmd = qemu_img_args + list(args)
    exitcode = subprocess.call(cmd)
    # A negative exit code means the child was terminated by a signal.
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(cmd)))
    return exitcode
def qemu_img_pipe(*args):
    '''Run qemu-img and return its output

    stderr is merged into stdout.  If qemu-img is killed by a signal, a
    message with the signal number and command line is written to stderr.
    '''
    cmd = qemu_img_args + list(args)
    subp = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # Drain the pipe before looking at the exit status: calling wait() first
    # can deadlock once the child fills the OS pipe buffer.
    output = subp.communicate()[0]
    exitcode = subp.returncode
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(cmd)))
    return output
def qemu_io(*args):
    '''Run qemu-io and return the stdout data

    stderr is merged into stdout.  If qemu-io is killed by a signal, a
    message with the signal number and command line is written to stderr.
    '''
    args = qemu_io_args + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # Drain the pipe before looking at the exit status: calling wait() first
    # can deadlock once the child fills the OS pipe buffer.
    output = subp.communicate()[0]
    exitcode = subp.returncode
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
    return output
class QemuIoInteractive:
    '''Drive an interactive qemu-io process through its stdin/stdout pipes

    The constructor starts qemu-io and consumes the first prompt; cmd()
    sends one command and returns the output printed before the next
    prompt; close() sends the quit command.
    '''

    def __init__(self, *args):
        self.args = qemu_io_args + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        assert self._p.stdout.read(9) == 'qemu-io> '

    def close(self):
        '''Send the quit command and wait for qemu-io to exit'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to the next prompt and return the text preceding it'''
        pattern = 'qemu-io> '
        n = len(pattern)
        s = []
        # Sliding window over the last n characters read.  Unlike a match
        # counter that is reset to zero on a mismatch, this still finds a
        # prompt that directly follows a partial match (e.g. "qqemu-io> ").
        tail = ''
        while tail != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            tail = (tail + c)[-n:]

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run a single qemu-io command and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd != 'q' and cmd != 'quit'
        self._p.stdin.write(cmd + '\n')
        return self._read_output()
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork and return the exit code of the parent'''
    cmd = qemu_nbd_args + ['--fork']
    cmd.extend(args)
    return subprocess.call(cmd)
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images

    fmt1 and fmt2 are the image formats passed via -f and -F respectively.
    '''
    exitcode = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return exitcode == 0
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors; each sector starts and ends
    with its own sector number as a big-endian 32-bit integer, with zero
    padding in between.
    '''
    # Binary mode: the payload is packed bytes, not text.  The context
    # manager closes the file even if a write fails.
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            # Floor division keeps the sector number an integer on both
            # Python 2 and Python 3.
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            offset += 512
def image_size(img):
    '''Return the "virtual-size" field that qemu-img info reports for img'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
# Escape the directory so that regex metacharacters in the path ('.', '+',
# ...) are matched literally.  If TEST_DIR is unset, use a pattern that can
# never match rather than compiling the literal text "None".
test_dir_re = re.compile(re.escape(test_dir) if test_dir else r'(?!)')
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path with "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)
# Matches the carriage return of a "\r\n" line ending (and any lone "\r").
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return msg with all carriage return characters removed'''
    without_cr = win32_re.sub("", msg)
    return without_cr
# Matches the statistics line that qemu-io prints after an operation
# (operation count, elapsed time and throughput).
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and mask the qemu-io statistics line in msg'''
    cleaned = filter_win32(msg)
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, cleaned)
# Matches "chown" followed by a numeric uid:gid pair.
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown uid:gid" occurrences with "chown UID:GID"'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event whose timestamp fields (if present) are
    replaced by the fixed strings 'SECS' and 'USECS'.  The event passed in
    is left unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy; copy the nested timestamp dict
        # as well so that the caller's event is not changed behind its back.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
def log(msg, filters=()):
    '''Print msg after passing it through each filter in order

    filters is an iterable of callables; each one receives the current
    message and returns the filtered message.
    '''
    for flt in filters:
        msg = flt(msg)
    # Single-argument call form: valid as both the Python 2 print statement
    # and the Python 3 print function.
    print(msg)
class Timeout:
    '''Context manager that raises an Exception after a number of seconds

    On entry a SIGALRM handler is installed and a real-time interval timer
    is armed; on exit the timer is disarmed.
    '''

    def __init__(self, seconds, errmsg = "Timeout"):
        self.errmsg = errmsg
        self.seconds = seconds

    def __enter__(self):
        # Install the handler before arming the timer.
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, type, value, traceback):
        # Disarm the timer; returning False lets exceptions propagate.
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''SIGALRM handler: raise an Exception carrying the error message'''
        raise Exception(self.errmsg)
class FilePath(object):
    '''An auto-generated filename that cleans itself up.

    Use this context manager to generate filenames and ensure that the file
    gets deleted::

        with FilePath('test.img') as img_path:
            qemu_img('create', img_path, '1G')
        # img_path is automatically deleted
    '''

    def __init__(self, name):
        # Prefix the name with the PID and place it in the test directory.
        pid_prefixed = '{0}-{1}'.format(os.getpid(), name)
        self.path = os.path.join(test_dir, pid_prefixed)

    def __enter__(self):
        return self.path

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best-effort removal: the file may never have been created.
        try:
            os.remove(self.path)
        except OSError:
            pass
        # Never suppress an exception raised inside the with block.
        return False
def file_path_remover():
    '''Delete the files recorded in file_path_remover.paths, newest first

    Files that cannot be removed (for example because they were never
    created) are skipped silently.
    '''
    for recorded in file_path_remover.paths[::-1]:
        try:
            os.remove(recorded)
        except OSError:
            pass
def file_path(*names):
    ''' Another way to get auto-generated filename that cleans itself up.

    Each name is prefixed with the PID and placed in the test directory.
    The resulting paths are recorded so that file_path_remover() deletes
    them at interpreter exit.  A single name returns a single path; any
    other number of names returns a list of paths.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    # First call: create the registry and hook the remover into atexit.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(test_dir, '{0}-{1}'.format(os.getpid(), name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        vm_name = "qemu%s-%d" % (path_suffix, os.getpid())
        super(VM, self).__init__(qemu_prog, qemu_opts, name=vm_name,
                                 test_dir=test_dir,
                                 socket_scm_helper=socket_scm_helper)
        # Number of drives added through add_drive(); used for drive ids.
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line; returns self'''
        self._args.extend(['-object', opts])
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line; returns self'''
        self._args.extend(['-device', opts])
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" verbatim to the command line; returns self'''
        self._args.extend(['-drive', opts])
        return self

    def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
        '''Add a drive to the VM (virtio interface by default); returns self

        The drive gets the id "drive<N>", where N counts the drives added
        so far.  For the luks format without an explicit key-secret in
        opts, the default secret object and key-secret option are added.
        '''
        drive_opts = ['if=%s' % interface,
                      'id=drive%d' % self._num_drives]

        if path is not None:
            drive_opts += ['file=%s' % path,
                           'format=%s' % format,
                           'cache=%s' % cachemode]

        if opts:
            drive_opts.append(opts)

        if format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            drive_opts.append(luks_default_key_secret_opt)

        self._args.extend(['-drive', ','.join(drive_opts)])
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev" with opts (a string or a sequence of strings
        that is joined with commas); returns self'''
        self._args.append('-blockdev')
        joined = opts if isinstance(opts, str) else ','.join(opts)
        self._args.append(joined)
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line; returns self'''
        self._args.extend(['-incoming', addr])
        return self

    def pause_drive(self, drive, event=None):
        '''Pause drive r/w operations'''
        if not event:
            # No specific event requested: break on both reads and writes.
            for default_event in ("read_aio", "write_aio"):
                self.pause_drive(drive, default_event)
            return
        hmp_cmd = 'qemu-io %s "break %s bp_%s"' % (drive, event, drive)
        self.qmp('human-monitor-command', command_line=hmp_cmd)

    def resume_drive(self, drive):
        '''Remove the breakpoint set by pause_drive() for the drive'''
        hmp_cmd = 'qemu-io %s "remove_break bp_%s"' % (drive, drive)
        self.qmp('human-monitor-command', command_line=hmp_cmd)

    def hmp_qemu_io(self, drive, cmd):
        '''Run a qemu-io command on a drive through HMP; returns the QMP reply'''
        hmp_cmd = 'qemu-io %s "%s"' % (drive, cmd)
        return self.qmp('human-monitor-command', command_line=hmp_cmd)
# Matches a path component with a list index, e.g. "return[0]".
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases

    The block job helpers expect the test case to provide the VM under
    test as self.vm.
    '''

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        path is a '/'-separated list of keys; a key may carry a list index
        ("key[2]").  The test fails if the path cannot be followed.
        '''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
                try:
                    d = d[idx]
                except IndexError:
                    self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
        return d

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into one dict with dotted keys

        For example {'a': [1, {'b': 2}]} becomes {'a.0': 1, 'a.1.b': 2}.
        '''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a comma-separated key=value string'''
        obj = self.flatten_qmp_object(obj)
        # str() lets non-string leaves (e.g. integers) be formatted instead
        # of raising TypeError on concatenation; strings are unaffected.
        return ','.join(key + '=' + str(obj[key]) for key in obj)

    def assert_qmp_absent(self, d, path):
        '''Assert that the given path does not exist in a QMP dict'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a missing path through self.fail()
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict matches'''
        result = self.dictpath(d, path)
        self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
        content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(self.flatten_qmp_object(json.loads(json_filename[5:])),
                         self.flatten_qmp_object(reference))

    def cancel_and_wait(self, drive='drive0', force=False, resume=False):
        '''Cancel a block job and wait for it to finish, returning the event'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset', event['data']['len'])
                    self.assert_no_active_block_jobs()
                    return event

    def wait_ready(self, drive='drive0'):
        '''Wait until a block job BLOCK_JOB_READY event'''
        f = {'data': {'type': 'mirror', 'device': drive } }
        self.vm.event_wait(name='BLOCK_JOB_READY', match=f)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for BLOCK_JOB_READY, cancel the job and check the final event'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        # assertEqual: the assertEquals alias is deprecated
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive)
        self.assert_qmp(event, 'data/type', 'mirror')

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy

        Returns the job's entry; the polling is bounded by a one second
        Timeout.
        '''
        with Timeout(1, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                for job in result['return']:
                    if job['device'] == job_id and job['paused'] == True and job['busy'] == False:
                        return job

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job

        Returns the paused job's query-block-jobs entry when wait is true,
        otherwise the reply to block-job-pause.
        '''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result
def notrun(reason):
    '''Skip this test suite

    Writes the reason to <output_dir>/<seq>.notrun, prints a notice and
    exits the process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Text mode, because reason is a text string; the context manager makes
    # sure the file is flushed and closed before sys.exit().
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as notrun_file:
        notrun_file.write(reason + '\n')
    # Single-argument call form works as Python 2 statement and Python 3
    # function alike.
    print('%s not run: %s' % (seq, reason))
    sys.exit(0)
def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
    '''Skip the test suite unless the current image format is suitable

    At most one of supported_fmts (whitelist) and unsupported_fmts
    (blacklist) may be non-empty.
    '''
    assert not (supported_fmts and unsupported_fmts)

    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_allowed:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        return

    if supported_fmts and (imgfmt not in supported_fmts):
        unsuitable = True
    else:
        unsuitable = imgfmt in unsupported_fmts

    if unsuitable:
        notrun('not suitable for this image format: %s' % imgfmt)
def verify_platform(supported_oses=['linux']):
    '''Skip the test suite unless sys.platform starts with a supported OS name'''
    platform_ok = any(sys.platform.startswith(prefix)
                      for prefix in supported_oses)
    if not platform_ok:
        notrun('not suitable for this OS: %s' % sys.platform)
def verify_cache_mode(supported_cache_modes=[]):
    '''Skip the test suite if the cache mode is not among the supported ones

    An empty list places no restriction on the cache mode.
    '''
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
def supports_quorum():
    '''Return True if "quorum" appears in the output of qemu-img --help'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
         unsupported_fmts=[]):
    '''Run tests

    Skips the suite (via notrun) when the image format, platform or cache
    mode is unsuitable, then hands over to unittest.main().  Unless "-d"
    is on the command line, the runner output is buffered and written to
    stderr with the elapsed time removed.
    '''

    global debug

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    launched_by_check = (test_dir is not None and
                         qemu_default_machine is not None)
    if not launched_by_check:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    debug = '-d' in sys.argv
    verbosity = 1
    verify_image_format(supported_fmts, unsupported_fmts)
    verify_platform(supported_oses)
    verify_cache_mode(supported_cache_modes)

    # We need to filter out the time taken from the output so that qemu-iotest
    # can reliably diff the results against master output.
    import StringIO
    if not debug:
        output = StringIO.StringIO()
    else:
        output = sys.stdout
        verbosity = 2
        # unittest.main() parses sys.argv itself, so drop our private flag
        sys.argv.remove('-d')

    if debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.WARN
    logging.basicConfig(level=log_level)

    # The defaults capture the stream and verbosity chosen above.
    class MyTestRunner(unittest.TextTestRunner):
        def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
            unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)

    # unittest.main() will use sys.exit() so expect a SystemExit exception
    try:
        unittest.main(testRunner=MyTestRunner)
    finally:
        if not debug:
            captured = output.getvalue()
            sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', captured))