1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from collections
import OrderedDict
34 from typing
import (Any
, Callable
, Dict
, Iterable
, Iterator
,
35 List
, Optional
, Sequence
, TextIO
, Tuple
, Type
, TypeVar
)
38 from contextlib
import contextmanager
40 from qemu
.machine
import qtest
41 from qemu
.qmp
.legacy
import QMPMessage
, QMPReturnValue
, QEMUMonitorProtocol
42 from qemu
.utils
import VerboseProcessError
44 # Use this logger for logging messages directly from the iotests module
45 logger
= logging
.getLogger('qemu.iotests')
46 logger
.addHandler(logging
.NullHandler())
48 # Use this logger for messages that ought to be used for diff output.
49 test_logger
= logging
.getLogger('qemu.iotests.diff_io')
54 # This will not work if arguments contain spaces but is necessary if we
55 # want to support the override options that ./check supports.
56 qemu_img_args
= [os
.environ
.get('QEMU_IMG_PROG', 'qemu-img')]
57 if os
.environ
.get('QEMU_IMG_OPTIONS'):
58 qemu_img_args
+= os
.environ
['QEMU_IMG_OPTIONS'].strip().split(' ')
60 qemu_io_args
= [os
.environ
.get('QEMU_IO_PROG', 'qemu-io')]
61 if os
.environ
.get('QEMU_IO_OPTIONS'):
62 qemu_io_args
+= os
.environ
['QEMU_IO_OPTIONS'].strip().split(' ')
64 qemu_io_args_no_fmt
= [os
.environ
.get('QEMU_IO_PROG', 'qemu-io')]
65 if os
.environ
.get('QEMU_IO_OPTIONS_NO_FMT'):
66 qemu_io_args_no_fmt
+= \
67 os
.environ
['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69 qemu_nbd_prog
= os
.environ
.get('QEMU_NBD_PROG', 'qemu-nbd')
70 qemu_nbd_args
= [qemu_nbd_prog
]
71 if os
.environ
.get('QEMU_NBD_OPTIONS'):
72 qemu_nbd_args
+= os
.environ
['QEMU_NBD_OPTIONS'].strip().split(' ')
74 qemu_prog
= os
.environ
.get('QEMU_PROG', 'qemu')
75 qemu_opts
= os
.environ
.get('QEMU_OPTIONS', '').strip().split(' ')
77 qsd_prog
= os
.environ
.get('QSD_PROG', 'qemu-storage-daemon')
79 gdb_qemu_env
= os
.environ
.get('GDB_OPTIONS')
82 qemu_gdb
= ['gdbserver'] + gdb_qemu_env
.strip().split(' ')
84 qemu_print
= os
.environ
.get('PRINT_QEMU', False)
86 imgfmt
= os
.environ
.get('IMGFMT', 'raw')
87 imgproto
= os
.environ
.get('IMGPROTO', 'file')
90 test_dir
= os
.environ
['TEST_DIR']
91 sock_dir
= os
.environ
['SOCK_DIR']
92 cachemode
= os
.environ
['CACHEMODE']
93 aiomode
= os
.environ
['AIOMODE']
94 qemu_default_machine
= os
.environ
['QEMU_DEFAULT_MACHINE']
96 # We are using these variables as proxies to indicate that we're
97 # not being run via "check". There may be other things set up by
98 # "check" that individual test cases rely on.
99 sys
.stderr
.write('Please run this test via the "check" script\n')
100 sys
.exit(os
.EX_USAGE
)
103 if os
.environ
.get('VALGRIND_QEMU') == "y" and \
104 os
.environ
.get('NO_VALGRIND') != "y":
105 valgrind_logfile
= "--log-file=" + test_dir
106 # %p allows to put the valgrind process PID, since
107 # we don't know it a priori (subprocess.Popen is
109 valgrind_logfile
+= "/%p.valgrind"
111 qemu_valgrind
= ['valgrind', valgrind_logfile
, '--error-exitcode=99']
113 luks_default_secret_object
= 'secret,id=keysec0,data=' + \
114 os
.environ
.get('IMGKEYSECRET', '')
115 luks_default_key_secret_opt
= 'key-secret=keysec0'
117 sample_img_dir
= os
.environ
['SAMPLE_IMG_DIR']
121 def change_log_level(
122 logger_name
: str, level
: int = logging
.CRITICAL
) -> Iterator
[None]:
124 Utility function for temporarily changing the log level of a logger.
126 This can be used to silence errors that are expected or uninteresting.
128 _logger
= logging
.getLogger(logger_name
)
129 current_level
= _logger
.level
130 _logger
.setLevel(level
)
135 _logger
.setLevel(current_level
)
138 def unarchive_sample_image(sample
, fname
):
139 sample_fname
= os
.path
.join(sample_img_dir
, sample
+ '.bz2')
140 with bz2
.open(sample_fname
) as f_in
, open(fname
, 'wb') as f_out
:
141 shutil
.copyfileobj(f_in
, f_out
)
144 def qemu_tool_popen(args
: Sequence
[str],
145 connect_stderr
: bool = True) -> 'subprocess.Popen[str]':
146 stderr
= subprocess
.STDOUT
if connect_stderr
else None
147 # pylint: disable=consider-using-with
148 return subprocess
.Popen(args
,
149 stdout
=subprocess
.PIPE
,
151 universal_newlines
=True)
154 def qemu_tool_pipe_and_status(tool
: str, args
: Sequence
[str],
155 connect_stderr
: bool = True,
156 drop_successful_output
: bool = False) \
159 Run a tool and return both its output and its exit code
161 with
qemu_tool_popen(args
, connect_stderr
) as subp
:
162 output
= subp
.communicate()[0]
163 if subp
.returncode
< 0:
165 sys
.stderr
.write(f
'{tool} received signal \
166 {-subp.returncode}: {cmd}\n')
167 if drop_successful_output
and subp
.returncode
== 0:
169 return (output
, subp
.returncode
)
171 def qemu_img_create_prepare_args(args
: List
[str]) -> List
[str]:
172 if not args
or args
[0] != 'create':
176 p
= argparse
.ArgumentParser(allow_abbrev
=False)
177 # -o option may be specified several times
178 p
.add_argument('-o', action
='append', default
=[])
180 parsed
, remaining
= p
.parse_known_args(args
)
185 if parsed
.f
is not None:
186 result
+= ['-f', parsed
.f
]
188 # IMGOPTS most probably contain options specific for the selected format,
189 # like extended_l2 or compression_type for qcow2. Test may want to create
190 # additional images in other formats that doesn't support these options.
191 # So, use IMGOPTS only for images created in imgfmt format.
192 imgopts
= os
.environ
.get('IMGOPTS')
193 if imgopts
and parsed
.f
== imgfmt
:
194 opts_list
.insert(0, imgopts
)
196 # default luks support
197 if parsed
.f
== 'luks' and \
198 all('key-secret' not in opts
for opts
in opts_list
):
199 result
+= ['--object', luks_default_secret_object
]
200 opts_list
.append(luks_default_key_secret_opt
)
202 for opts
in opts_list
:
203 result
+= ['-o', opts
]
210 def qemu_tool(*args
: str, check
: bool = True, combine_stdio
: bool = True
211 ) -> 'subprocess.CompletedProcess[str]':
213 Run a qemu tool and return its status code and console output.
215 :param args: full command line to run.
216 :param check: Enforce a return code of zero.
217 :param combine_stdio: set to False to keep stdout/stderr separated.
219 :raise VerboseProcessError:
220 When the return code is negative, or on any non-zero exit code
221 when 'check=True' was provided (the default). This exception has
222 'stdout', 'stderr', and 'returncode' properties that may be
223 inspected to show greater detail. If this exception is not
224 handled, the command-line, return code, and all console output
225 will be included at the bottom of the stack trace.
228 a CompletedProcess. This object has args, returncode, and stdout
229 properties. If streams are not combined, it will also have a
232 subp
= subprocess
.run(
234 stdout
=subprocess
.PIPE
,
235 stderr
=subprocess
.STDOUT
if combine_stdio
else subprocess
.PIPE
,
236 universal_newlines
=True,
240 if check
and subp
.returncode
or (subp
.returncode
< 0):
241 raise VerboseProcessError(
242 subp
.returncode
, args
,
250 def qemu_img(*args
: str, check
: bool = True, combine_stdio
: bool = True
251 ) -> 'subprocess.CompletedProcess[str]':
253 Run QEMU_IMG_PROG and return its status code and console output.
255 This function always prepends QEMU_IMG_OPTIONS and may further alter
256 the args for 'create' commands.
258 See `qemu_tool()` for greater detail.
260 full_args
= qemu_img_args
+ qemu_img_create_prepare_args(list(args
))
261 return qemu_tool(*full_args
, check
=check
, combine_stdio
=combine_stdio
)
264 def ordered_qmp(qmsg
, conv_keys
=True):
265 # Dictionaries are not ordered prior to 3.6, therefore:
266 if isinstance(qmsg
, list):
267 return [ordered_qmp(atom
) for atom
in qmsg
]
268 if isinstance(qmsg
, dict):
270 for k
, v
in sorted(qmsg
.items()):
272 k
= k
.replace('_', '-')
273 od
[k
] = ordered_qmp(v
, conv_keys
=False)
277 def qemu_img_create(*args
: str) -> 'subprocess.CompletedProcess[str]':
278 return qemu_img('create', *args
)
280 def qemu_img_json(*args
: str) -> Any
:
282 Run qemu-img and return its output as deserialized JSON.
284 :raise CalledProcessError:
285 When qemu-img crashes, or returns a non-zero exit code without
286 producing a valid JSON document to stdout.
287 :raise JSONDecoderError:
288 When qemu-img returns 0, but failed to produce a valid JSON document.
290 :return: A deserialized JSON object; probably a dict[str, Any].
293 res
= qemu_img(*args
, combine_stdio
=False)
294 except subprocess
.CalledProcessError
as exc
:
295 # Terminated due to signal. Don't bother.
296 if exc
.returncode
< 0:
299 # Commands like 'check' can return failure (exit codes 2 and 3)
300 # to indicate command completion, but with errors found. For
301 # multi-command flexibility, ignore the exact error codes and
302 # *try* to load JSON.
304 return json
.loads(exc
.stdout
)
305 except json
.JSONDecodeError
:
306 # Nope. This thing is toast. Raise the /process/ error.
310 return json
.loads(res
.stdout
)
312 def qemu_img_measure(*args
: str) -> Any
:
313 return qemu_img_json("measure", "--output", "json", *args
)
315 def qemu_img_check(*args
: str) -> Any
:
316 return qemu_img_json("check", "--output", "json", *args
)
318 def qemu_img_info(*args
: str) -> Any
:
319 return qemu_img_json('info', "--output", "json", *args
)
321 def qemu_img_map(*args
: str) -> Any
:
322 return qemu_img_json('map', "--output", "json", *args
)
324 def qemu_img_log(*args
: str, check
: bool = True
325 ) -> 'subprocess.CompletedProcess[str]':
326 result
= qemu_img(*args
, check
=check
)
327 log(result
.stdout
, filters
=[filter_testfiles
])
330 def img_info_log(filename
: str, filter_path
: Optional
[str] = None,
331 use_image_opts
: bool = False, extra_args
: Sequence
[str] = (),
332 check
: bool = True, drop_child_info
: bool = True,
336 args
.append('--image-opts')
338 args
+= ['-f', imgfmt
]
340 args
.append(filename
)
342 output
= qemu_img(*args
, check
=check
).stdout
344 filter_path
= filename
345 log(filter_img_info(output
, filter_path
, drop_child_info
))
347 def qemu_io_wrap_args(args
: Sequence
[str]) -> List
[str]:
348 if '-f' in args
or '--image-opts' in args
:
349 return qemu_io_args_no_fmt
+ list(args
)
351 return qemu_io_args
+ list(args
)
353 def qemu_io_popen(*args
):
354 return qemu_tool_popen(qemu_io_wrap_args(args
))
356 def qemu_io(*args
: str, check
: bool = True, combine_stdio
: bool = True
357 ) -> 'subprocess.CompletedProcess[str]':
359 Run QEMU_IO_PROG and return the status code and console output.
361 This function always prepends either QEMU_IO_OPTIONS or
362 QEMU_IO_OPTIONS_NO_FMT.
364 return qemu_tool(*qemu_io_wrap_args(args
),
365 check
=check
, combine_stdio
=combine_stdio
)
367 def qemu_io_log(*args
: str, check
: bool = True
368 ) -> 'subprocess.CompletedProcess[str]':
369 result
= qemu_io(*args
, check
=check
)
370 log(result
.stdout
, filters
=[filter_testfiles
, filter_qemu_io
])
373 class QemuIoInteractive
:
374 def __init__(self
, *args
):
375 self
.args
= qemu_io_wrap_args(args
)
376 # We need to keep the Popen objext around, and not
377 # close it immediately. Therefore, disable the pylint check:
378 # pylint: disable=consider-using-with
379 self
._p
= subprocess
.Popen(self
.args
, stdin
=subprocess
.PIPE
,
380 stdout
=subprocess
.PIPE
,
381 stderr
=subprocess
.STDOUT
,
382 universal_newlines
=True)
383 out
= self
._p
.stdout
.read(9)
384 if out
!= 'qemu-io> ':
385 # Most probably qemu-io just failed to start.
386 # Let's collect the whole output and exit.
387 out
+= self
._p
.stdout
.read()
388 self
._p
.wait(timeout
=1)
389 raise ValueError(out
)
392 self
._p
.communicate('q\n')
394 def _read_output(self
):
395 pattern
= 'qemu-io> '
400 c
= self
._p
.stdout
.read(1)
401 # check unexpected EOF
404 if c
== pattern
[pos
]:
409 return ''.join(s
[:-n
])
412 # quit command is in close(), '\n' is added automatically
413 assert '\n' not in cmd
415 assert cmd
not in ('q', 'quit')
416 self
._p
.stdin
.write(cmd
+ '\n')
417 self
._p
.stdin
.flush()
418 return self
._read
_output
()
421 class QemuStorageDaemon
:
422 _qmp
: Optional
[QEMUMonitorProtocol
] = None
423 _qmpsock
: Optional
[str] = None
424 # Python < 3.8 would complain if this type were not a string literal
425 # (importing `annotations` from `__future__` would work; but not on <= 3.6)
426 _p
: 'Optional[subprocess.Popen[bytes]]' = None
428 def __init__(self
, *args
: str, instance_id
: str = 'a', qmp
: bool = False):
429 assert '--pidfile' not in args
430 self
.pidfile
= os
.path
.join(test_dir
, f
'qsd-{instance_id}-pid')
431 all_args
= [qsd_prog
] + list(args
) + ['--pidfile', self
.pidfile
]
434 self
._qmpsock
= os
.path
.join(sock_dir
, f
'qsd-{instance_id}.sock')
435 all_args
+= ['--chardev',
436 f
'socket,id=qmp-sock,path={self._qmpsock}',
437 '--monitor', 'qmp-sock']
439 self
._qmp
= QEMUMonitorProtocol(self
._qmpsock
, server
=True)
441 # Cannot use with here, we want the subprocess to stay around
442 # pylint: disable=consider-using-with
443 self
._p
= subprocess
.Popen(all_args
)
444 if self
._qmp
is not None:
446 while not os
.path
.exists(self
.pidfile
):
447 if self
._p
.poll() is not None:
448 cmd
= ' '.join(all_args
)
450 'qemu-storage-daemon terminated with exit code ' +
451 f
'{self._p.returncode}: {cmd}')
455 with
open(self
.pidfile
, encoding
='utf-8') as f
:
456 self
._pid
= int(f
.read().strip())
458 assert self
._pid
== self
._p
.pid
460 def qmp(self
, cmd
: str, args
: Optional
[Dict
[str, object]] = None) \
462 assert self
._qmp
is not None
463 return self
._qmp
.cmd_raw(cmd
, args
)
465 def get_qmp(self
) -> QEMUMonitorProtocol
:
466 assert self
._qmp
is not None
469 def cmd(self
, cmd
: str, args
: Optional
[Dict
[str, object]] = None) \
471 assert self
._qmp
is not None
472 return self
._qmp
.cmd(cmd
, **(args
or {}))
474 def stop(self
, kill_signal
=15):
475 self
._p
.send_signal(kill_signal
)
482 if self
._qmpsock
is not None:
484 os
.remove(self
._qmpsock
)
488 os
.remove(self
.pidfile
)
493 if self
._p
is not None:
494 self
.stop(kill_signal
=9)
498 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
499 return subprocess
.call(qemu_nbd_args
+ ['--fork'] + list(args
))
501 def qemu_nbd_early_pipe(*args
: str) -> Tuple
[int, str]:
502 '''Run qemu-nbd in daemon mode and return both the parent's exit code
503 and its output in case of an error'''
504 full_args
= qemu_nbd_args
+ ['--fork'] + list(args
)
505 output
, returncode
= qemu_tool_pipe_and_status('qemu-nbd', full_args
,
506 connect_stderr
=False)
507 return returncode
, output
if returncode
else ''
509 def qemu_nbd_list_log(*args
: str) -> str:
510 '''Run qemu-nbd to list remote exports'''
511 full_args
= [qemu_nbd_prog
, '-L'] + list(args
)
512 output
, _
= qemu_tool_pipe_and_status('qemu-nbd', full_args
)
513 log(output
, filters
=[filter_testfiles
, filter_nbd_exports
])
517 def qemu_nbd_popen(*args
):
518 '''Context manager running qemu-nbd within the context'''
519 pid_file
= file_path("qemu_nbd_popen-nbd-pid-file")
521 assert not os
.path
.exists(pid_file
)
523 cmd
= list(qemu_nbd_args
)
524 cmd
.extend(('--persistent', '--pid-file', pid_file
))
527 log('Start NBD server')
528 with subprocess
.Popen(cmd
) as p
:
530 while not os
.path
.exists(pid_file
):
531 if p
.poll() is not None:
533 "qemu-nbd terminated with exit code {}: {}"
534 .format(p
.returncode
, ' '.join(cmd
)))
539 if os
.path
.exists(pid_file
):
541 log('Kill NBD server')
545 def compare_images(img1
: str, img2
: str,
546 fmt1
: str = imgfmt
, fmt2
: str = imgfmt
) -> bool:
548 Compare two images with QEMU_IMG; return True if they are identical.
550 :raise CalledProcessError:
551 when qemu-img crashes or returns a status code of anything other
552 than 0 (identical) or 1 (different).
555 qemu_img('compare', '-f', fmt1
, '-F', fmt2
, img1
, img2
)
557 except subprocess
.CalledProcessError
as exc
:
558 if exc
.returncode
== 1:
562 def create_image(name
, size
):
563 '''Create a fully-allocated raw image with sector markers'''
564 with
open(name
, 'wb') as file:
567 sector
= struct
.pack('>l504xl', i
// 512, i
// 512)
571 def image_size(img
: str) -> int:
572 """Return image's virtual size"""
573 value
= qemu_img_info('-f', imgfmt
, img
)['virtual-size']
574 if not isinstance(value
, int):
575 type_name
= type(value
).__name
__
576 raise TypeError("Expected 'int' for 'virtual-size', "
577 f
"got '{value}' of type '{type_name}'")
581 return isinstance(val
, str)
583 test_dir_re
= re
.compile(r
"%s" % test_dir
)
584 def filter_test_dir(msg
):
585 return test_dir_re
.sub("TEST_DIR", msg
)
587 win32_re
= re
.compile(r
"\r")
588 def filter_win32(msg
):
589 return win32_re
.sub("", msg
)
591 qemu_io_re
= re
.compile(r
"[0-9]* ops; [0-9\/:. sec]* "
592 r
"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
593 r
"and [0-9\/.inf]* ops\/sec\)")
594 def filter_qemu_io(msg
):
595 msg
= filter_win32(msg
)
596 return qemu_io_re
.sub("X ops; XX:XX:XX.X "
597 "(XXX YYY/sec and XXX ops/sec)", msg
)
599 chown_re
= re
.compile(r
"chown [0-9]+:[0-9]+")
600 def filter_chown(msg
):
601 return chown_re
.sub("chown UID:GID", msg
)
603 def filter_qmp_event(event
):
604 '''Filter a QMP event dict'''
606 if 'timestamp' in event
:
607 event
['timestamp']['seconds'] = 'SECS'
608 event
['timestamp']['microseconds'] = 'USECS'
611 def filter_qmp(qmsg
, filter_fn
):
612 '''Given a string filter, filter a QMP object's values.
613 filter_fn takes a (key, value) pair.'''
614 # Iterate through either lists or dicts;
615 if isinstance(qmsg
, list):
616 items
= enumerate(qmsg
)
617 elif isinstance(qmsg
, dict):
620 return filter_fn(None, qmsg
)
623 if isinstance(v
, (dict, list)):
624 qmsg
[k
] = filter_qmp(v
, filter_fn
)
626 qmsg
[k
] = filter_fn(k
, v
)
629 def filter_testfiles(msg
):
630 pref1
= os
.path
.join(test_dir
, "%s-" % (os
.getpid()))
631 pref2
= os
.path
.join(sock_dir
, "%s-" % (os
.getpid()))
632 return msg
.replace(pref1
, 'TEST_DIR/PID-').replace(pref2
, 'SOCK_DIR/PID-')
634 def filter_qmp_testfiles(qmsg
):
635 def _filter(_key
, value
):
637 return filter_testfiles(value
)
639 return filter_qmp(qmsg
, _filter
)
641 def filter_virtio_scsi(output
: str) -> str:
642 return re
.sub(r
'(virtio-scsi)-(ccw|pci)', r
'\1', output
)
644 def filter_qmp_virtio_scsi(qmsg
):
645 def _filter(_key
, value
):
647 return filter_virtio_scsi(value
)
649 return filter_qmp(qmsg
, _filter
)
651 def filter_generated_node_ids(msg
):
652 return re
.sub("#block[0-9]+", "NODE_NAME", msg
)
654 def filter_qmp_generated_node_ids(qmsg
):
655 def _filter(_key
, value
):
657 return filter_generated_node_ids(value
)
659 return filter_qmp(qmsg
, _filter
)
661 def filter_img_info(output
: str, filename
: str,
662 drop_child_info
: bool = True) -> str:
664 drop_indented
= False
665 for line
in output
.split('\n'):
666 if 'disk size' in line
or 'actual-size' in line
:
669 # Drop child node info
671 if line
.startswith(' '):
673 drop_indented
= False
674 if drop_child_info
and "Child node '/" in line
:
678 line
= line
.replace(filename
, 'TEST_IMG')
679 line
= filter_testfiles(line
)
680 line
= line
.replace(imgfmt
, 'IMGFMT')
681 line
= re
.sub('iters: [0-9]+', 'iters: XXX', line
)
682 line
= re
.sub('uuid: [-a-f0-9]+',
683 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
685 line
= re
.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line
)
686 line
= re
.sub('(compression type: )(zlib|zstd)', r
'\1COMPRESSION_TYPE',
689 return '\n'.join(lines
)
691 def filter_imgfmt(msg
):
692 return msg
.replace(imgfmt
, 'IMGFMT')
694 def filter_qmp_imgfmt(qmsg
):
695 def _filter(_key
, value
):
697 return filter_imgfmt(value
)
699 return filter_qmp(qmsg
, _filter
)
701 def filter_nbd_exports(output
: str) -> str:
702 return re
.sub(r
'((min|opt|max) block): [0-9]+', r
'\1: XXX', output
)
705 Msg
= TypeVar('Msg', Dict
[str, Any
], List
[Any
], str)
708 filters
: Iterable
[Callable
[[Msg
], Msg
]] = (),
709 indent
: Optional
[int] = None) -> None:
711 Logs either a string message or a JSON serializable message (like QMP).
712 If indent is provided, JSON serializable messages are pretty-printed.
716 if isinstance(msg
, (dict, list)):
717 # Don't sort if it's already sorted
718 do_sort
= not isinstance(msg
, OrderedDict
)
719 test_logger
.info(json
.dumps(msg
, sort_keys
=do_sort
, indent
=indent
))
721 test_logger
.info(msg
)
724 def __init__(self
, seconds
, errmsg
="Timeout"):
725 self
.seconds
= seconds
728 if qemu_gdb
or qemu_valgrind
:
730 signal
.signal(signal
.SIGALRM
, self
.timeout
)
731 signal
.setitimer(signal
.ITIMER_REAL
, self
.seconds
)
733 def __exit__(self
, exc_type
, value
, traceback
):
734 if qemu_gdb
or qemu_valgrind
:
736 signal
.setitimer(signal
.ITIMER_REAL
, 0)
738 def timeout(self
, signum
, frame
):
739 raise TimeoutError(self
.errmsg
)
741 def file_pattern(name
):
742 return "{0}-{1}".format(os
.getpid(), name
)
746 Context manager generating multiple file names. The generated files are
747 removed when exiting the context.
751 with FilePath('a.img', 'b.img') as (img_a, img_b):
752 # Use img_a and img_b here...
754 # a.img and b.img are automatically removed here.
756 By default images are created in iotests.test_dir. To create sockets use
759 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
761 For convenience, calling with one argument yields a single file instead of
762 a tuple with one item.
765 def __init__(self
, *names
, base_dir
=test_dir
):
766 self
.paths
= [os
.path
.join(base_dir
, file_pattern(name
))
770 if len(self
.paths
) == 1:
775 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
776 for path
in self
.paths
:
790 def file_path_remover():
791 for path
in reversed(file_path_remover
.paths
):
795 def file_path(*names
, base_dir
=test_dir
):
796 ''' Another way to get auto-generated filename that cleans itself up.
800 img_a, img_b = file_path('a.img', 'b.img')
801 sock = file_path('socket')
804 if not hasattr(file_path_remover
, 'paths'):
805 file_path_remover
.paths
= []
806 atexit
.register(file_path_remover
)
810 filename
= file_pattern(name
)
811 path
= os
.path
.join(base_dir
, filename
)
812 file_path_remover
.paths
.append(path
)
815 return paths
[0] if len(paths
) == 1 else paths
817 def remote_filename(path
):
818 if imgproto
== 'file':
820 elif imgproto
== 'ssh':
821 return "ssh://%s@127.0.0.1:22%s" % (os
.environ
.get('USER'), path
)
823 raise ValueError("Protocol %s not supported" % (imgproto
))
825 class VM(qtest
.QEMUQtestMachine
):
828 def __init__(self
, path_suffix
=''):
829 name
= "qemu%s-%d" % (path_suffix
, os
.getpid())
830 timer
= 15.0 if not (qemu_gdb
or qemu_valgrind
) else None
831 if qemu_gdb
and qemu_valgrind
:
832 sys
.stderr
.write('gdb and valgrind are mutually exclusive\n')
834 wrapper
= qemu_gdb
if qemu_gdb
else qemu_valgrind
835 super().__init
__(qemu_prog
, qemu_opts
, wrapper
=wrapper
,
837 base_temp_dir
=test_dir
,
841 def _post_shutdown(self
) -> None:
842 super()._post
_shutdown
()
843 if not qemu_valgrind
or not self
._popen
:
845 valgrind_filename
= f
"{test_dir}/{self._popen.pid}.valgrind"
846 if self
.exitcode() == 99:
847 with
open(valgrind_filename
, encoding
='utf-8') as f
:
850 os
.remove(valgrind_filename
)
852 def _pre_launch(self
) -> None:
853 super()._pre
_launch
()
855 # set QEMU binary output to stdout
856 self
._close
_qemu
_log
_file
()
858 def add_object(self
, opts
):
859 self
._args
.append('-object')
860 self
._args
.append(opts
)
863 def add_device(self
, opts
):
864 self
._args
.append('-device')
865 self
._args
.append(opts
)
868 def add_drive_raw(self
, opts
):
869 self
._args
.append('-drive')
870 self
._args
.append(opts
)
873 def add_drive(self
, path
, opts
='', interface
='virtio', img_format
=imgfmt
):
874 '''Add a virtio-blk drive to the VM'''
875 options
= ['if=%s' % interface
,
876 'id=drive%d' % self
._num
_drives
]
879 options
.append('file=%s' % path
)
880 options
.append('format=%s' % img_format
)
881 options
.append('cache=%s' % cachemode
)
882 options
.append('aio=%s' % aiomode
)
887 if img_format
== 'luks' and 'key-secret' not in opts
:
888 # default luks support
889 if luks_default_secret_object
not in self
._args
:
890 self
.add_object(luks_default_secret_object
)
892 options
.append(luks_default_key_secret_opt
)
894 self
._args
.append('-drive')
895 self
._args
.append(','.join(options
))
896 self
._num
_drives
+= 1
899 def add_blockdev(self
, opts
):
900 self
._args
.append('-blockdev')
901 if isinstance(opts
, str):
902 self
._args
.append(opts
)
904 self
._args
.append(','.join(opts
))
907 def add_incoming(self
, addr
):
908 self
._args
.append('-incoming')
909 self
._args
.append(addr
)
912 def hmp(self
, command_line
: str, use_log
: bool = False) -> QMPMessage
:
913 cmd
= 'human-monitor-command'
914 kwargs
: Dict
[str, Any
] = {'command-line': command_line
}
916 return self
.qmp_log(cmd
, **kwargs
)
918 return self
.qmp(cmd
, **kwargs
)
920 def pause_drive(self
, drive
: str, event
: Optional
[str] = None) -> None:
921 """Pause drive r/w operations"""
923 self
.pause_drive(drive
, "read_aio")
924 self
.pause_drive(drive
, "write_aio")
926 self
.hmp(f
'qemu-io {drive} "break {event} bp_{drive}"')
928 def resume_drive(self
, drive
: str) -> None:
929 """Resume drive r/w operations"""
930 self
.hmp(f
'qemu-io {drive} "remove_break bp_{drive}"')
932 def hmp_qemu_io(self
, drive
: str, cmd
: str,
933 use_log
: bool = False, qdev
: bool = False) -> QMPMessage
:
934 """Write to a given drive using an HMP command"""
935 d
= '-d ' if qdev
else ''
936 return self
.hmp(f
'qemu-io {d}{drive} "{cmd}"', use_log
=use_log
)
938 def flatten_qmp_object(self
, obj
, output
=None, basestr
=''):
941 if isinstance(obj
, list):
942 for i
, item
in enumerate(obj
):
943 self
.flatten_qmp_object(item
, output
, basestr
+ str(i
) + '.')
944 elif isinstance(obj
, dict):
946 self
.flatten_qmp_object(obj
[key
], output
, basestr
+ key
+ '.')
948 output
[basestr
[:-1]] = obj
# Strip trailing '.'
951 def qmp_to_opts(self
, obj
):
952 obj
= self
.flatten_qmp_object(obj
)
955 output_list
+= [key
+ '=' + obj
[key
]]
956 return ','.join(output_list
)
958 def get_qmp_events_filtered(self
, wait
=60.0):
960 for ev
in self
.get_qmp_events(wait
=wait
):
961 result
.append(filter_qmp_event(ev
))
964 def qmp_log(self
, cmd
, filters
=(), indent
=None, **kwargs
):
965 full_cmd
= OrderedDict((
967 ("arguments", ordered_qmp(kwargs
))
969 log(full_cmd
, filters
, indent
=indent
)
970 result
= self
.qmp(cmd
, **kwargs
)
971 log(result
, filters
, indent
=indent
)
974 # Returns None on success, and an error string on failure
975 def run_job(self
, job
: str, auto_finalize
: bool = True,
976 auto_dismiss
: bool = False,
977 pre_finalize
: Optional
[Callable
[[], None]] = None,
978 cancel
: bool = False, wait
: float = 60.0,
979 filters
: Iterable
[Callable
[[Any
], Any
]] = (),
982 run_job moves a job from creation through to dismissal.
984 :param job: String. ID of recently-launched job
985 :param auto_finalize: Bool. True if the job was launched with
986 auto_finalize. Defaults to True.
987 :param auto_dismiss: Bool. True if the job was launched with
988 auto_dismiss=True. Defaults to False.
989 :param pre_finalize: Callback. A callable that takes no arguments to be
990 invoked prior to issuing job-finalize, if any.
991 :param cancel: Bool. When true, cancels the job after the pre_finalize
993 :param wait: Float. Timeout value specifying how long to wait for any
994 event, in seconds. Defaults to 60.0.
996 match_device
= {'data': {'device': job
}}
997 match_id
= {'data': {'id': job
}}
999 ('BLOCK_JOB_COMPLETED', match_device
),
1000 ('BLOCK_JOB_CANCELLED', match_device
),
1001 ('BLOCK_JOB_ERROR', match_device
),
1002 ('BLOCK_JOB_READY', match_device
),
1003 ('BLOCK_JOB_PENDING', match_id
),
1004 ('JOB_STATUS_CHANGE', match_id
)
1008 ev
= filter_qmp_event(self
.events_wait(events
, timeout
=wait
))
1009 if ev
['event'] != 'JOB_STATUS_CHANGE':
1010 log(ev
, filters
=filters
)
1012 status
= ev
['data']['status']
1013 if status
== 'aborting':
1014 result
= self
.qmp('query-jobs')
1015 for j
in result
['return']:
1018 log('Job failed: %s' % (j
['error']), filters
=filters
)
1019 elif status
== 'ready':
1020 self
.qmp_log('job-complete', id=job
, filters
=filters
)
1021 elif status
== 'pending' and not auto_finalize
:
1025 self
.qmp_log('job-cancel', id=job
, filters
=filters
)
1027 self
.qmp_log('job-finalize', id=job
, filters
=filters
)
1028 elif status
== 'concluded' and not auto_dismiss
:
1029 self
.qmp_log('job-dismiss', id=job
, filters
=filters
)
1030 elif status
== 'null':
1033 # Returns None on success, and an error string on failure
1034 def blockdev_create(self
, options
, job_id
='job0', filters
=None):
1036 filters
= [filter_qmp_testfiles
]
1037 result
= self
.qmp_log('blockdev-create', filters
=filters
,
1038 job_id
=job_id
, options
=options
)
1040 if 'return' in result
:
1041 assert result
['return'] == {}
1042 job_result
= self
.run_job(job_id
, filters
=filters
)
1044 job_result
= result
['error']
1049 def enable_migration_events(self
, name
):
1050 log('Enabling migration QMP events on %s...' % name
)
1051 log(self
.qmp('migrate-set-capabilities', capabilities
=[
1053 'capability': 'events',
1058 def wait_migration(self
, expect_runstate
: Optional
[str]) -> bool:
1060 event
= self
.event_wait('MIGRATION')
1061 # We use the default timeout, and with a timeout, event_wait()
1062 # never returns None
1065 log(event
, filters
=[filter_qmp_event
])
1066 if event
['data']['status'] in ('completed', 'failed'):
1069 if event
['data']['status'] == 'completed':
1070 # The event may occur in finish-migrate, so wait for the expected
1071 # post-migration runstate
1073 while runstate
!= expect_runstate
:
1074 runstate
= self
.qmp('query-status')['return']['status']
1079 def node_info(self
, node_name
):
1080 nodes
= self
.qmp('query-named-block-nodes')
1081 for x
in nodes
['return']:
1082 if x
['node-name'] == node_name
:
1086 def query_bitmaps(self
):
1087 res
= self
.qmp("query-named-block-nodes")
1088 return {device
['node-name']: device
['dirty-bitmaps']
1089 for device
in res
['return'] if 'dirty-bitmaps' in device
}
1091 def get_bitmap(self
, node_name
, bitmap_name
, recording
=None, bitmaps
=None):
1093 get a specific bitmap from the object returned by query_bitmaps.
1094 :param recording: If specified, filter results by the specified value.
1095 :param bitmaps: If specified, use it instead of call query_bitmaps()
1098 bitmaps
= self
.query_bitmaps()
1100 for bitmap
in bitmaps
[node_name
]:
1101 if bitmap
.get('name', '') == bitmap_name
:
1102 if recording
is None or bitmap
.get('recording') == recording
:
1106 def check_bitmap_status(self
, node_name
, bitmap_name
, fields
):
1107 ret
= self
.get_bitmap(node_name
, bitmap_name
)
1109 return fields
.items() <= ret
.items()
1111 def assert_block_path(self
, root
, path
, expected_node
, graph
=None):
1113 Check whether the node under the given path in the block graph
1116 @root is the node name of the node where the @path is rooted.
1118 @path is a string that consists of child names separated by
1119 slashes. It must begin with a slash.
1121 Examples for @root + @path:
1122 - root="qcow2-node", path="/backing/file"
1123 - root="quorum-node", path="/children.2/file"
1125 Hypothetically, @path could be empty, in which case it would
1126 point to @root. However, in practice this case is not useful
1127 and hence not allowed.
1129 @expected_node may be None. (All elements of the path but the
1130 leaf must still exist.)
1132 @graph may be None or the result of an x-debug-query-block-graph
1133 call that has already been performed.
1136 graph
= self
.qmp('x-debug-query-block-graph')['return']
1138 iter_path
= iter(path
.split('/'))
1140 # Must start with a /
1141 assert next(iter_path
) == ''
1143 node
= next((node
for node
in graph
['nodes'] if node
['name'] == root
),
1146 # An empty @path is not allowed, so the root node must be present
1147 assert node
is not None, 'Root node %s not found' % root
1149 for child_name
in iter_path
:
1150 assert node
is not None, 'Cannot follow path %s%s' % (root
, path
)
1153 node_id
= next(edge
['child'] for edge
in graph
['edges']
1154 if (edge
['parent'] == node
['id'] and
1155 edge
['name'] == child_name
))
1157 node
= next(node
for node
in graph
['nodes']
1158 if node
['id'] == node_id
)
1160 except StopIteration:
1164 assert expected_node
is None, \
1165 'No node found under %s (but expected %s)' % \
1166 (path
, expected_node
)
1168 assert node
['name'] == expected_node
, \
1169 'Found node %s under %s (but expected %s)' % \
1170 (node
['name'], path
, expected_node
)
1172 index_re
= re
.compile(r
'([^\[]+)\[([^\]]+)\]')
1174 class QMPTestCase(unittest
.TestCase
):
1175 '''Abstract base class for QMP test cases'''
1177 def __init__(self
, *args
, **kwargs
):
1178 super().__init
__(*args
, **kwargs
)
1179 # Many users of this class set a VM property we rely on heavily
1180 # in the methods below.
1183 def dictpath(self
, d
, path
):
1184 '''Traverse a path in a nested dict'''
1185 for component
in path
.split('/'):
1186 m
= index_re
.match(component
)
1188 component
, idx
= m
.groups()
1191 if not isinstance(d
, dict) or component
not in d
:
1192 self
.fail(f
'failed path traversal for "{path}" in "{d}"')
1196 if not isinstance(d
, list):
1197 self
.fail(f
'path component "{component}" in "{path}" '
1198 f
'is not a list in "{d}"')
1202 self
.fail(f
'invalid index "{idx}" in path "{path}" '
1206 def assert_qmp_absent(self
, d
, path
):
1208 result
= self
.dictpath(d
, path
)
1209 except AssertionError:
1211 self
.fail('path "%s" has value "%s"' % (path
, str(result
)))
1213 def assert_qmp(self
, d
, path
, value
):
1214 '''Assert that the value for a specific path in a QMP dict
1215 matches. When given a list of values, assert that any of
1218 result
= self
.dictpath(d
, path
)
1220 # [] makes no sense as a list of valid values, so treat it as
1221 # an actual single value.
1222 if isinstance(value
, list) and value
!= []:
1226 self
.fail('no match for "%s" in %s' % (str(result
), str(value
)))
1228 self
.assertEqual(result
, value
,
1229 '"%s" is "%s", expected "%s"'
1230 % (path
, str(result
), str(value
)))
1232 def assert_no_active_block_jobs(self
):
1233 result
= self
.vm
.qmp('query-block-jobs')
1234 self
.assert_qmp(result
, 'return', [])
1236 def assert_has_block_node(self
, node_name
=None, file_name
=None):
1237 """Issue a query-named-block-nodes and assert node_name and/or
1238 file_name is present in the result"""
1239 def check_equal_or_none(a
, b
):
1240 return a
is None or b
is None or a
== b
1241 assert node_name
or file_name
1242 result
= self
.vm
.qmp('query-named-block-nodes')
1243 for x
in result
["return"]:
1244 if check_equal_or_none(x
.get("node-name"), node_name
) and \
1245 check_equal_or_none(x
.get("file"), file_name
):
1247 self
.fail("Cannot find %s %s in result:\n%s" %
1248 (node_name
, file_name
, result
))
1250 def assert_json_filename_equal(self
, json_filename
, reference
):
1251 '''Asserts that the given filename is a json: filename and that its
1252 content is equal to the given reference object'''
1253 self
.assertEqual(json_filename
[:5], 'json:')
1255 self
.vm
.flatten_qmp_object(json
.loads(json_filename
[5:])),
1256 self
.vm
.flatten_qmp_object(reference
)
1259 def cancel_and_wait(self
, drive
='drive0', force
=False,
1260 resume
=False, wait
=60.0):
1261 '''Cancel a block job and wait for it to finish, returning the event'''
1262 self
.vm
.cmd('block-job-cancel', device
=drive
, force
=force
)
1265 self
.vm
.resume_drive(drive
)
1269 while not cancelled
:
1270 for event
in self
.vm
.get_qmp_events(wait
=wait
):
1271 if event
['event'] == 'BLOCK_JOB_COMPLETED' or \
1272 event
['event'] == 'BLOCK_JOB_CANCELLED':
1273 self
.assert_qmp(event
, 'data/device', drive
)
1276 elif event
['event'] == 'JOB_STATUS_CHANGE':
1277 self
.assert_qmp(event
, 'data/id', drive
)
1280 self
.assert_no_active_block_jobs()
1283 def wait_until_completed(self
, drive
='drive0', check_offset
=True,
1284 wait
=60.0, error
=None):
1285 '''Wait for a block job to finish, returning the event'''
1287 for event
in self
.vm
.get_qmp_events(wait
=wait
):
1288 if event
['event'] == 'BLOCK_JOB_COMPLETED':
1289 self
.assert_qmp(event
, 'data/device', drive
)
1291 self
.assert_qmp_absent(event
, 'data/error')
1293 self
.assert_qmp(event
, 'data/offset',
1294 event
['data']['len'])
1296 self
.assert_qmp(event
, 'data/error', error
)
1297 self
.assert_no_active_block_jobs()
1299 if event
['event'] == 'JOB_STATUS_CHANGE':
1300 self
.assert_qmp(event
, 'data/id', drive
)
1302 def wait_ready(self
, drive
='drive0'):
1303 """Wait until a BLOCK_JOB_READY event, and return the event."""
1304 return self
.vm
.events_wait([
1306 {'data': {'type': 'mirror', 'device': drive
}}),
1308 {'data': {'type': 'commit', 'device': drive
}})
1311 def wait_ready_and_cancel(self
, drive
='drive0'):
1312 self
.wait_ready(drive
=drive
)
1313 event
= self
.cancel_and_wait(drive
=drive
)
1314 self
.assertEqual(event
['event'], 'BLOCK_JOB_COMPLETED')
1315 self
.assert_qmp(event
, 'data/type', 'mirror')
1316 self
.assert_qmp(event
, 'data/offset', event
['data']['len'])
1318 def complete_and_wait(self
, drive
='drive0', wait_ready
=True,
1319 completion_error
=None):
1320 '''Complete a block job and wait for it to finish'''
1322 self
.wait_ready(drive
=drive
)
1324 self
.vm
.cmd('block-job-complete', device
=drive
)
1326 event
= self
.wait_until_completed(drive
=drive
, error
=completion_error
)
1327 self
.assertTrue(event
['data']['type'] in ['mirror', 'commit'])
1329 def pause_wait(self
, job_id
='job0'):
1330 with
Timeout(3, "Timeout waiting for job to pause"):
1332 result
= self
.vm
.qmp('query-block-jobs')
1334 for job
in result
['return']:
1335 if job
['device'] == job_id
:
1337 if job
['paused'] and not job
['busy']:
1342 def pause_job(self
, job_id
='job0', wait
=True):
1343 self
.vm
.cmd('block-job-pause', device
=job_id
)
1345 self
.pause_wait(job_id
)
1347 def case_skip(self
, reason
):
1348 '''Skip this test case'''
1350 self
.skipTest(reason
)
1354 '''Skip this test suite'''
1355 # Each test in qemu-iotests has a number ("seq")
1356 seq
= os
.path
.basename(sys
.argv
[0])
1358 with
open('%s/%s.notrun' % (test_dir
, seq
), 'w', encoding
='utf-8') \
1360 outfile
.write(reason
+ '\n')
1361 logger
.warning("%s not run: %s", seq
, reason
)
1364 def case_notrun(reason
):
1365 '''Mark this test case as not having been run (without actually
1366 skipping it, that is left to the caller). See
1367 QMPTestCase.case_skip() for a variant that actually skips the
1368 current test case.'''
1370 # Each test in qemu-iotests has a number ("seq")
1371 seq
= os
.path
.basename(sys
.argv
[0])
1373 with
open('%s/%s.casenotrun' % (test_dir
, seq
), 'a', encoding
='utf-8') \
1375 outfile
.write(' [case not run] ' + reason
+ '\n')
1377 def _verify_image_format(supported_fmts
: Sequence
[str] = (),
1378 unsupported_fmts
: Sequence
[str] = ()) -> None:
1379 if 'generic' in supported_fmts
and \
1380 os
.environ
.get('IMGFMT_GENERIC', 'true') == 'true':
1382 # _supported_fmt generic
1386 not_sup
= supported_fmts
and (imgfmt
not in supported_fmts
)
1387 if not_sup
or (imgfmt
in unsupported_fmts
):
1388 notrun('not suitable for this image format: %s' % imgfmt
)
1390 if imgfmt
== 'luks':
1391 verify_working_luks()
1393 def _verify_protocol(supported
: Sequence
[str] = (),
1394 unsupported
: Sequence
[str] = ()) -> None:
1395 assert not (supported
and unsupported
)
1397 if 'generic' in supported
:
1400 not_sup
= supported
and (imgproto
not in supported
)
1401 if not_sup
or (imgproto
in unsupported
):
1402 notrun('not suitable for this protocol: %s' % imgproto
)
1404 def _verify_platform(supported
: Sequence
[str] = (),
1405 unsupported
: Sequence
[str] = ()) -> None:
1406 if any((sys
.platform
.startswith(x
) for x
in unsupported
)):
1407 notrun('not suitable for this OS: %s' % sys
.platform
)
1410 if not any((sys
.platform
.startswith(x
) for x
in supported
)):
1411 notrun('not suitable for this OS: %s' % sys
.platform
)
1413 def _verify_cache_mode(supported_cache_modes
: Sequence
[str] = ()) -> None:
1414 if supported_cache_modes
and (cachemode
not in supported_cache_modes
):
1415 notrun('not suitable for this cache mode: %s' % cachemode
)
1417 def _verify_aio_mode(supported_aio_modes
: Sequence
[str] = ()) -> None:
1418 if supported_aio_modes
and (aiomode
not in supported_aio_modes
):
1419 notrun('not suitable for this aio mode: %s' % aiomode
)
1421 def _verify_formats(required_formats
: Sequence
[str] = ()) -> None:
1422 usf_list
= list(set(required_formats
) - set(supported_formats()))
1424 notrun(f
'formats {usf_list} are not whitelisted')
1427 def _verify_virtio_blk() -> None:
1428 out
= qemu_pipe('-M', 'none', '-device', 'help')
1429 if 'virtio-blk' not in out
:
1430 notrun('Missing virtio-blk in QEMU binary')
1432 def verify_virtio_scsi_pci_or_ccw() -> None:
1433 out
= qemu_pipe('-M', 'none', '-device', 'help')
1434 if 'virtio-scsi-pci' not in out
and 'virtio-scsi-ccw' not in out
:
1435 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1438 def _verify_imgopts(unsupported
: Sequence
[str] = ()) -> None:
1439 imgopts
= os
.environ
.get('IMGOPTS')
1440 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1441 # but it supported only for bash tests. We don't have a concept of global
1442 # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1443 # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1444 unsup
= list(unsupported
) + ['$']
1445 if imgopts
and any(x
in imgopts
for x
in unsup
):
1446 notrun(f
'not suitable for this imgopts: {imgopts}')
1449 def supports_quorum() -> bool:
1450 return 'quorum' in qemu_img('--help').stdout
1452 def verify_quorum():
1453 '''Skip test suite if quorum support is not available'''
1454 if not supports_quorum():
1455 notrun('quorum support missing')
1457 def has_working_luks() -> Tuple
[bool, str]:
1459 Check whether our LUKS driver can actually create images
1460 (this extends to LUKS encryption for qcow2).
1462 If not, return the reason why.
1465 img_file
= f
'{test_dir}/luks-test.luks'
1466 res
= qemu_img('create', '-f', 'luks',
1467 '--object', luks_default_secret_object
,
1468 '-o', luks_default_key_secret_opt
,
1469 '-o', 'iter-time=10',
1479 for line
in res
.stdout
.splitlines():
1480 if img_file
+ ':' in line
:
1481 reason
= line
.split(img_file
+ ':', 1)[1].strip()
1484 return (False, reason
)
1488 def verify_working_luks():
1490 Skip test suite if LUKS does not work
1492 (working
, reason
) = has_working_luks()
1496 def supports_qcow2_zstd_compression() -> bool:
1497 img_file
= f
'{test_dir}/qcow2-zstd-test.qcow2'
1498 res
= qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
1506 if res
.returncode
== 1 and \
1507 "'compression-type' does not accept value 'zstd'" in res
.stdout
:
1512 def verify_qcow2_zstd_compression():
1513 if not supports_qcow2_zstd_compression():
1514 notrun('zstd compression not supported')
1516 def qemu_pipe(*args
: str) -> str:
1518 Run qemu with an option to print something and exit (e.g. a help option).
1520 :return: QEMU's stdout output.
1522 full_args
= [qemu_prog
] + qemu_opts
+ list(args
)
1523 output
, _
= qemu_tool_pipe_and_status('qemu', full_args
)
1526 def supported_formats(read_only
=False):
1527 '''Set 'read_only' to True to check ro-whitelist
1528 Otherwise, rw-whitelist is checked'''
1530 if not hasattr(supported_formats
, "formats"):
1531 supported_formats
.formats
= {}
1533 if read_only
not in supported_formats
.formats
:
1534 format_message
= qemu_pipe("-drive", "format=help")
1535 line
= 1 if read_only
else 0
1536 supported_formats
.formats
[read_only
] = \
1537 format_message
.splitlines()[line
].split(":")[1].split()
1539 return supported_formats
.formats
[read_only
]
1541 def skip_if_unsupported(required_formats
=(), read_only
=False):
1542 '''Skip Test Decorator
1543 Runs the test if all the required formats are whitelisted'''
1544 def skip_test_decorator(func
):
1545 def func_wrapper(test_case
: QMPTestCase
, *args
: List
[Any
],
1546 **kwargs
: Dict
[str, Any
]) -> None:
1547 if callable(required_formats
):
1548 fmts
= required_formats(test_case
)
1550 fmts
= required_formats
1552 usf_list
= list(set(fmts
) - set(supported_formats(read_only
)))
1554 msg
= f
'{test_case}: formats {usf_list} are not whitelisted'
1555 test_case
.case_skip(msg
)
1557 func(test_case
, *args
, **kwargs
)
1559 return skip_test_decorator
1561 def skip_for_formats(formats
: Sequence
[str] = ()) \
1562 -> Callable
[[Callable
[[QMPTestCase
, List
[Any
], Dict
[str, Any
]], None]],
1563 Callable
[[QMPTestCase
, List
[Any
], Dict
[str, Any
]], None]]:
1564 '''Skip Test Decorator
1565 Skips the test for the given formats'''
1566 def skip_test_decorator(func
):
1567 def func_wrapper(test_case
: QMPTestCase
, *args
: List
[Any
],
1568 **kwargs
: Dict
[str, Any
]) -> None:
1569 if imgfmt
in formats
:
1570 msg
= f
'{test_case}: Skipped for format {imgfmt}'
1571 test_case
.case_skip(msg
)
1573 func(test_case
, *args
, **kwargs
)
1575 return skip_test_decorator
1577 def skip_if_user_is_root(func
):
1578 '''Skip Test Decorator
1579 Runs the test only without root permissions'''
1580 def func_wrapper(*args
, **kwargs
):
1581 if os
.getuid() == 0:
1582 case_notrun('{}: cannot be run as root'.format(args
[0]))
1585 return func(*args
, **kwargs
)
1588 # We need to filter out the time taken from the output so that
1589 # qemu-iotest can reliably diff the results against master output,
1590 # and hide skipped tests from the reference output.
1592 class ReproducibleTestResult(unittest
.TextTestResult
):
1593 def addSkip(self
, test
, reason
):
1594 # Same as TextTestResult, but print dot instead of "s"
1595 unittest
.TestResult
.addSkip(self
, test
, reason
)
1597 self
.stream
.writeln("skipped {0!r}".format(reason
))
1599 self
.stream
.write(".")
1602 class ReproducibleStreamWrapper
:
1603 def __init__(self
, stream
: TextIO
):
1604 self
.stream
= stream
1606 def __getattr__(self
, attr
):
1607 if attr
in ('stream', '__getstate__'):
1608 raise AttributeError(attr
)
1609 return getattr(self
.stream
, attr
)
1611 def write(self
, arg
=None):
1612 arg
= re
.sub(r
'Ran (\d+) tests? in [\d.]+s', r
'Ran \1 tests', arg
)
1613 arg
= re
.sub(r
' \(skipped=\d+\)', r
'', arg
)
1614 self
.stream
.write(arg
)
1616 class ReproducibleTestRunner(unittest
.TextTestRunner
):
1617 def __init__(self
, stream
: Optional
[TextIO
] = None,
1618 resultclass
: Type
[unittest
.TestResult
] =
1619 ReproducibleTestResult
,
1620 **kwargs
: Any
) -> None:
1621 rstream
= ReproducibleStreamWrapper(stream
or sys
.stdout
)
1622 super().__init
__(stream
=rstream
, # type: ignore
1624 resultclass
=resultclass
,
1627 def execute_unittest(argv
: List
[str], debug
: bool = False) -> None:
1628 """Executes unittests within the calling module."""
1630 # Some tests have warnings, especially ResourceWarnings for unclosed
1631 # files and sockets. Ignore them for now to ensure reproducibility of
1633 unittest
.main(argv
=argv
,
1634 testRunner
=ReproducibleTestRunner
,
1635 verbosity
=2 if debug
else 1,
1636 warnings
=None if sys
.warnoptions
else 'ignore')
1638 def execute_setup_common(supported_fmts
: Sequence
[str] = (),
1639 supported_platforms
: Sequence
[str] = (),
1640 supported_cache_modes
: Sequence
[str] = (),
1641 supported_aio_modes
: Sequence
[str] = (),
1642 unsupported_fmts
: Sequence
[str] = (),
1643 supported_protocols
: Sequence
[str] = (),
1644 unsupported_protocols
: Sequence
[str] = (),
1645 required_fmts
: Sequence
[str] = (),
1646 unsupported_imgopts
: Sequence
[str] = ()) -> bool:
1648 Perform necessary setup for either script-style or unittest-style tests.
1650 :return: Bool; Whether or not debug mode has been requested via the CLI.
1652 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1654 debug
= '-d' in sys
.argv
1656 sys
.argv
.remove('-d')
1657 logging
.basicConfig(level
=(logging
.DEBUG
if debug
else logging
.WARN
))
1659 _verify_image_format(supported_fmts
, unsupported_fmts
)
1660 _verify_protocol(supported_protocols
, unsupported_protocols
)
1661 _verify_platform(supported
=supported_platforms
)
1662 _verify_cache_mode(supported_cache_modes
)
1663 _verify_aio_mode(supported_aio_modes
)
1664 _verify_formats(required_fmts
)
1665 _verify_virtio_blk()
1666 _verify_imgopts(unsupported_imgopts
)
1670 def execute_test(*args
, test_function
=None, **kwargs
):
1671 """Run either unittest or script-style tests."""
1673 debug
= execute_setup_common(*args
, **kwargs
)
1674 if not test_function
:
1675 execute_unittest(sys
.argv
, debug
)
1679 def activate_logging():
1680 """Activate iotests.log() output to stdout for script-style tests."""
1681 handler
= logging
.StreamHandler(stream
=sys
.stdout
)
1682 formatter
= logging
.Formatter('%(message)s')
1683 handler
.setFormatter(formatter
)
1684 test_logger
.addHandler(handler
)
1685 test_logger
.setLevel(logging
.INFO
)
1686 test_logger
.propagate
= False
1688 # This is called from script-style iotests without a single point of entry
1689 def script_initialize(*args
, **kwargs
):
1690 """Initialize script-style tests without running any tests."""
1692 execute_setup_common(*args
, **kwargs
)
1694 # This is called from script-style iotests with a single point of entry
1695 def script_main(test_function
, *args
, **kwargs
):
1696 """Run script-style tests outside of the unittest framework"""
1698 execute_test(*args
, test_function
=test_function
, **kwargs
)
1700 # This is called from unittest style iotests
1701 def main(*args
, **kwargs
):
1702 """Run tests using the unittest framework"""
1703 execute_test(*args
, **kwargs
)