hw/arm/boot: If booting a kernel in EL2, set SCR_EL3.HCE
[qemu/ar7.git] / tests / qemu-iotests / iotests.py
blobb5d7945af882a0e96ba428118990222ad465f956
1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import errno
20 import os
21 import re
22 import subprocess
23 import string
24 import unittest
25 import sys
26 import struct
27 import json
28 import signal
29 import logging
30 import atexit
32 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
33 import qtest
36 # This will not work if arguments contain spaces but is necessary if we
37 # want to support the override options that ./check supports.
38 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
39 if os.environ.get('QEMU_IMG_OPTIONS'):
40 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
42 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
43 if os.environ.get('QEMU_IO_OPTIONS'):
44 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
46 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
47 if os.environ.get('QEMU_NBD_OPTIONS'):
48 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
50 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
51 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
53 imgfmt = os.environ.get('IMGFMT', 'raw')
54 imgproto = os.environ.get('IMGPROTO', 'file')
55 test_dir = os.environ.get('TEST_DIR')
56 output_dir = os.environ.get('OUTPUT_DIR', '.')
57 cachemode = os.environ.get('CACHEMODE')
58 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
60 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
61 debug = False
63 luks_default_secret_object = 'secret,id=keysec0,data=' + \
64 os.environ['IMGKEYSECRET']
65 luks_default_key_secret_opt = 'key-secret=keysec0'
68 def qemu_img(*args):
69 '''Run qemu-img and return the exit code'''
70 devnull = open('/dev/null', 'r+')
71 exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
72 if exitcode < 0:
73 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
74 return exitcode
76 def qemu_img_create(*args):
77 args = list(args)
79 # default luks support
80 if '-f' in args and args[args.index('-f') + 1] == 'luks':
81 if '-o' in args:
82 i = args.index('-o')
83 if 'key-secret' not in args[i + 1]:
84 args[i + 1].append(luks_default_key_secret_opt)
85 args.insert(i + 2, '--object')
86 args.insert(i + 3, luks_default_secret_object)
87 else:
88 args = ['-o', luks_default_key_secret_opt,
89 '--object', luks_default_secret_object] + args
91 args.insert(0, 'create')
93 return qemu_img(*args)
95 def qemu_img_verbose(*args):
96 '''Run qemu-img without suppressing its output and return the exit code'''
97 exitcode = subprocess.call(qemu_img_args + list(args))
98 if exitcode < 0:
99 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
100 return exitcode
102 def qemu_img_pipe(*args):
103 '''Run qemu-img and return its output'''
104 subp = subprocess.Popen(qemu_img_args + list(args),
105 stdout=subprocess.PIPE,
106 stderr=subprocess.STDOUT)
107 exitcode = subp.wait()
108 if exitcode < 0:
109 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
110 return subp.communicate()[0]
112 def qemu_io(*args):
113 '''Run qemu-io and return the stdout data'''
114 args = qemu_io_args + list(args)
115 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
116 stderr=subprocess.STDOUT)
117 exitcode = subp.wait()
118 if exitcode < 0:
119 sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
120 return subp.communicate()[0]
123 class QemuIoInteractive:
124 def __init__(self, *args):
125 self.args = qemu_io_args + list(args)
126 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
127 stdout=subprocess.PIPE,
128 stderr=subprocess.STDOUT)
129 assert self._p.stdout.read(9) == 'qemu-io> '
131 def close(self):
132 self._p.communicate('q\n')
134 def _read_output(self):
135 pattern = 'qemu-io> '
136 n = len(pattern)
137 pos = 0
138 s = []
139 while pos != n:
140 c = self._p.stdout.read(1)
141 # check unexpected EOF
142 assert c != ''
143 s.append(c)
144 if c == pattern[pos]:
145 pos += 1
146 else:
147 pos = 0
149 return ''.join(s[:-n])
151 def cmd(self, cmd):
152 # quit command is in close(), '\n' is added automatically
153 assert '\n' not in cmd
154 cmd = cmd.strip()
155 assert cmd != 'q' and cmd != 'quit'
156 self._p.stdin.write(cmd + '\n')
157 return self._read_output()
160 def qemu_nbd(*args):
161 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
162 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
164 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
165 '''Return True if two image files are identical'''
166 return qemu_img('compare', '-f', fmt1,
167 '-F', fmt2, img1, img2) == 0
169 def create_image(name, size):
170 '''Create a fully-allocated raw image with sector markers'''
171 file = open(name, 'w')
172 i = 0
173 while i < size:
174 sector = struct.pack('>l504xl', i / 512, i / 512)
175 file.write(sector)
176 i = i + 512
177 file.close()
179 def image_size(img):
180 '''Return image's virtual size'''
181 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
182 return json.loads(r)['virtual-size']
184 test_dir_re = re.compile(r"%s" % test_dir)
185 def filter_test_dir(msg):
186 return test_dir_re.sub("TEST_DIR", msg)
188 win32_re = re.compile(r"\r")
189 def filter_win32(msg):
190 return win32_re.sub("", msg)
192 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
193 def filter_qemu_io(msg):
194 msg = filter_win32(msg)
195 return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
197 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
198 def filter_chown(msg):
199 return chown_re.sub("chown UID:GID", msg)
201 def filter_qmp_event(event):
202 '''Filter a QMP event dict'''
203 event = dict(event)
204 if 'timestamp' in event:
205 event['timestamp']['seconds'] = 'SECS'
206 event['timestamp']['microseconds'] = 'USECS'
207 return event
209 def log(msg, filters=[]):
210 for flt in filters:
211 msg = flt(msg)
212 print msg
214 class Timeout:
215 def __init__(self, seconds, errmsg = "Timeout"):
216 self.seconds = seconds
217 self.errmsg = errmsg
218 def __enter__(self):
219 signal.signal(signal.SIGALRM, self.timeout)
220 signal.setitimer(signal.ITIMER_REAL, self.seconds)
221 return self
222 def __exit__(self, type, value, traceback):
223 signal.setitimer(signal.ITIMER_REAL, 0)
224 return False
225 def timeout(self, signum, frame):
226 raise Exception(self.errmsg)
229 class FilePath(object):
230 '''An auto-generated filename that cleans itself up.
232 Use this context manager to generate filenames and ensure that the file
233 gets deleted::
235 with TestFilePath('test.img') as img_path:
236 qemu_img('create', img_path, '1G')
237 # migration_sock_path is automatically deleted
239 def __init__(self, name):
240 filename = '{0}-{1}'.format(os.getpid(), name)
241 self.path = os.path.join(test_dir, filename)
243 def __enter__(self):
244 return self.path
246 def __exit__(self, exc_type, exc_val, exc_tb):
247 try:
248 os.remove(self.path)
249 except OSError:
250 pass
251 return False
254 def file_path_remover():
255 for path in reversed(file_path_remover.paths):
256 try:
257 os.remove(path)
258 except OSError:
259 pass
262 def file_path(*names):
263 ''' Another way to get auto-generated filename that cleans itself up.
265 Use is as simple as:
267 img_a, img_b = file_path('a.img', 'b.img')
268 sock = file_path('socket')
271 if not hasattr(file_path_remover, 'paths'):
272 file_path_remover.paths = []
273 atexit.register(file_path_remover)
275 paths = []
276 for name in names:
277 filename = '{0}-{1}'.format(os.getpid(), name)
278 path = os.path.join(test_dir, filename)
279 file_path_remover.paths.append(path)
280 paths.append(path)
282 return paths[0] if len(paths) == 1 else paths
285 class VM(qtest.QEMUQtestMachine):
286 '''A QEMU VM'''
288 def __init__(self, path_suffix=''):
289 name = "qemu%s-%d" % (path_suffix, os.getpid())
290 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
291 test_dir=test_dir,
292 socket_scm_helper=socket_scm_helper)
293 self._num_drives = 0
295 def add_object(self, opts):
296 self._args.append('-object')
297 self._args.append(opts)
298 return self
300 def add_device(self, opts):
301 self._args.append('-device')
302 self._args.append(opts)
303 return self
305 def add_drive_raw(self, opts):
306 self._args.append('-drive')
307 self._args.append(opts)
308 return self
310 def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
311 '''Add a virtio-blk drive to the VM'''
312 options = ['if=%s' % interface,
313 'id=drive%d' % self._num_drives]
315 if path is not None:
316 options.append('file=%s' % path)
317 options.append('format=%s' % format)
318 options.append('cache=%s' % cachemode)
320 if opts:
321 options.append(opts)
323 if format == 'luks' and 'key-secret' not in opts:
324 # default luks support
325 if luks_default_secret_object not in self._args:
326 self.add_object(luks_default_secret_object)
328 options.append(luks_default_key_secret_opt)
330 self._args.append('-drive')
331 self._args.append(','.join(options))
332 self._num_drives += 1
333 return self
335 def add_blockdev(self, opts):
336 self._args.append('-blockdev')
337 if isinstance(opts, str):
338 self._args.append(opts)
339 else:
340 self._args.append(','.join(opts))
341 return self
343 def add_incoming(self, addr):
344 self._args.append('-incoming')
345 self._args.append(addr)
346 return self
348 def pause_drive(self, drive, event=None):
349 '''Pause drive r/w operations'''
350 if not event:
351 self.pause_drive(drive, "read_aio")
352 self.pause_drive(drive, "write_aio")
353 return
354 self.qmp('human-monitor-command',
355 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
357 def resume_drive(self, drive):
358 self.qmp('human-monitor-command',
359 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
361 def hmp_qemu_io(self, drive, cmd):
362 '''Write to a given drive using an HMP command'''
363 return self.qmp('human-monitor-command',
364 command_line='qemu-io %s "%s"' % (drive, cmd))
367 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
369 class QMPTestCase(unittest.TestCase):
370 '''Abstract base class for QMP test cases'''
372 def dictpath(self, d, path):
373 '''Traverse a path in a nested dict'''
374 for component in path.split('/'):
375 m = index_re.match(component)
376 if m:
377 component, idx = m.groups()
378 idx = int(idx)
380 if not isinstance(d, dict) or component not in d:
381 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
382 d = d[component]
384 if m:
385 if not isinstance(d, list):
386 self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
387 try:
388 d = d[idx]
389 except IndexError:
390 self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
391 return d
393 def flatten_qmp_object(self, obj, output=None, basestr=''):
394 if output is None:
395 output = dict()
396 if isinstance(obj, list):
397 for i in range(len(obj)):
398 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
399 elif isinstance(obj, dict):
400 for key in obj:
401 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
402 else:
403 output[basestr[:-1]] = obj # Strip trailing '.'
404 return output
406 def qmp_to_opts(self, obj):
407 obj = self.flatten_qmp_object(obj)
408 output_list = list()
409 for key in obj:
410 output_list += [key + '=' + obj[key]]
411 return ','.join(output_list)
413 def assert_qmp_absent(self, d, path):
414 try:
415 result = self.dictpath(d, path)
416 except AssertionError:
417 return
418 self.fail('path "%s" has value "%s"' % (path, str(result)))
420 def assert_qmp(self, d, path, value):
421 '''Assert that the value for a specific path in a QMP dict matches'''
422 result = self.dictpath(d, path)
423 self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
425 def assert_no_active_block_jobs(self):
426 result = self.vm.qmp('query-block-jobs')
427 self.assert_qmp(result, 'return', [])
429 def assert_has_block_node(self, node_name=None, file_name=None):
430 """Issue a query-named-block-nodes and assert node_name and/or
431 file_name is present in the result"""
432 def check_equal_or_none(a, b):
433 return a == None or b == None or a == b
434 assert node_name or file_name
435 result = self.vm.qmp('query-named-block-nodes')
436 for x in result["return"]:
437 if check_equal_or_none(x.get("node-name"), node_name) and \
438 check_equal_or_none(x.get("file"), file_name):
439 return
440 self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
441 (node_name, file_name, result))
443 def assert_json_filename_equal(self, json_filename, reference):
444 '''Asserts that the given filename is a json: filename and that its
445 content is equal to the given reference object'''
446 self.assertEqual(json_filename[:5], 'json:')
447 self.assertEqual(self.flatten_qmp_object(json.loads(json_filename[5:])),
448 self.flatten_qmp_object(reference))
450 def cancel_and_wait(self, drive='drive0', force=False, resume=False):
451 '''Cancel a block job and wait for it to finish, returning the event'''
452 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
453 self.assert_qmp(result, 'return', {})
455 if resume:
456 self.vm.resume_drive(drive)
458 cancelled = False
459 result = None
460 while not cancelled:
461 for event in self.vm.get_qmp_events(wait=True):
462 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
463 event['event'] == 'BLOCK_JOB_CANCELLED':
464 self.assert_qmp(event, 'data/device', drive)
465 result = event
466 cancelled = True
468 self.assert_no_active_block_jobs()
469 return result
471 def wait_until_completed(self, drive='drive0', check_offset=True):
472 '''Wait for a block job to finish, returning the event'''
473 completed = False
474 while not completed:
475 for event in self.vm.get_qmp_events(wait=True):
476 if event['event'] == 'BLOCK_JOB_COMPLETED':
477 self.assert_qmp(event, 'data/device', drive)
478 self.assert_qmp_absent(event, 'data/error')
479 if check_offset:
480 self.assert_qmp(event, 'data/offset', event['data']['len'])
481 completed = True
483 self.assert_no_active_block_jobs()
484 return event
486 def wait_ready(self, drive='drive0'):
487 '''Wait until a block job BLOCK_JOB_READY event'''
488 f = {'data': {'type': 'mirror', 'device': drive } }
489 event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
491 def wait_ready_and_cancel(self, drive='drive0'):
492 self.wait_ready(drive=drive)
493 event = self.cancel_and_wait(drive=drive)
494 self.assertEquals(event['event'], 'BLOCK_JOB_COMPLETED')
495 self.assert_qmp(event, 'data/type', 'mirror')
496 self.assert_qmp(event, 'data/offset', event['data']['len'])
498 def complete_and_wait(self, drive='drive0', wait_ready=True):
499 '''Complete a block job and wait for it to finish'''
500 if wait_ready:
501 self.wait_ready(drive=drive)
503 result = self.vm.qmp('block-job-complete', device=drive)
504 self.assert_qmp(result, 'return', {})
506 event = self.wait_until_completed(drive=drive)
507 self.assert_qmp(event, 'data/type', 'mirror')
509 def pause_wait(self, job_id='job0'):
510 with Timeout(1, "Timeout waiting for job to pause"):
511 while True:
512 result = self.vm.qmp('query-block-jobs')
513 for job in result['return']:
514 if job['device'] == job_id and job['paused'] == True and job['busy'] == False:
515 return job
517 def pause_job(self, job_id='job0', wait=True):
518 result = self.vm.qmp('block-job-pause', device=job_id)
519 self.assert_qmp(result, 'return', {})
520 if wait:
521 return self.pause_wait(job_id)
522 return result
525 def notrun(reason):
526 '''Skip this test suite'''
527 # Each test in qemu-iotests has a number ("seq")
528 seq = os.path.basename(sys.argv[0])
530 open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
531 print '%s not run: %s' % (seq, reason)
532 sys.exit(0)
534 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
535 if supported_fmts and (imgfmt not in supported_fmts):
536 notrun('not suitable for this image format: %s' % imgfmt)
537 if unsupported_fmts and (imgfmt in unsupported_fmts):
538 notrun('not suitable for this image format: %s' % imgfmt)
540 def verify_platform(supported_oses=['linux']):
541 if True not in [sys.platform.startswith(x) for x in supported_oses]:
542 notrun('not suitable for this OS: %s' % sys.platform)
544 def verify_cache_mode(supported_cache_modes=[]):
545 if supported_cache_modes and (cachemode not in supported_cache_modes):
546 notrun('not suitable for this cache mode: %s' % cachemode)
548 def supports_quorum():
549 return 'quorum' in qemu_img_pipe('--help')
551 def verify_quorum():
552 '''Skip test suite if quorum support is not available'''
553 if not supports_quorum():
554 notrun('quorum support missing')
556 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[]):
557 '''Run tests'''
559 global debug
561 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
562 # indicate that we're not being run via "check". There may be
563 # other things set up by "check" that individual test cases rely
564 # on.
565 if test_dir is None or qemu_default_machine is None:
566 sys.stderr.write('Please run this test via the "check" script\n')
567 sys.exit(os.EX_USAGE)
569 debug = '-d' in sys.argv
570 verbosity = 1
571 verify_image_format(supported_fmts)
572 verify_platform(supported_oses)
573 verify_cache_mode(supported_cache_modes)
575 # We need to filter out the time taken from the output so that qemu-iotest
576 # can reliably diff the results against master output.
577 import StringIO
578 if debug:
579 output = sys.stdout
580 verbosity = 2
581 sys.argv.remove('-d')
582 else:
583 output = StringIO.StringIO()
585 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
587 class MyTestRunner(unittest.TextTestRunner):
588 def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
589 unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
591 # unittest.main() will use sys.exit() so expect a SystemExit exception
592 try:
593 unittest.main(testRunner=MyTestRunner)
594 finally:
595 if not debug:
596 sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))