tests/acceptance: Ignore binary data sent on serial console
[qemu/ar7.git] / tests / acceptance / avocado_qemu / __init__.py
blobc3163af3b7b5d36b8d7222ae0e1ea5b183ca2b9c
1 # Test class and utilities for functional tests
3 # Copyright (c) 2018 Red Hat, Inc.
5 # Author:
6 # Cleber Rosa <crosa@redhat.com>
8 # This work is licensed under the terms of the GNU GPL, version 2 or
9 # later. See the COPYING file in the top-level directory.
11 import logging
12 import os
13 import shutil
14 import sys
15 import uuid
16 import tempfile
18 import avocado
20 from avocado.utils import cloudinit
21 from avocado.utils import datadrainer
22 from avocado.utils import network
23 from avocado.utils import ssh
24 from avocado.utils import vmimage
25 from avocado.utils.path import find_command
28 #: The QEMU build root directory. It may also be the source directory
29 #: if building from the source dir, but it's safer to use BUILD_DIR for
30 #: that purpose. Be aware that if this code is moved outside of a source
31 #: and build tree, it will not be accurate.
32 BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
34 if os.path.islink(os.path.dirname(os.path.dirname(__file__))):
35 # The link to the acceptance tests dir in the source code directory
36 lnk = os.path.dirname(os.path.dirname(__file__))
37 #: The QEMU root source directory
38 SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
39 else:
40 SOURCE_DIR = BUILD_DIR
42 sys.path.append(os.path.join(SOURCE_DIR, 'python'))
44 from qemu.machine import QEMUMachine
45 from qemu.utils import (
46 get_info_usernet_hostfwd_port,
47 kvm_available,
48 tcg_available,
51 def is_readable_executable_file(path):
52 return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
55 def pick_default_qemu_bin(arch=None):
56 """
57 Picks the path of a QEMU binary, starting either in the current working
58 directory or in the source tree root directory.
60 :param arch: the arch to use when looking for a QEMU binary (the target
61 will match the arch given). If None (the default), arch
62 will be the current host system arch (as given by
63 :func:`os.uname`).
64 :type arch: str
65 :returns: the path to the default QEMU binary or None if one could not
66 be found
67 :rtype: str or None
68 """
69 if arch is None:
70 arch = os.uname()[4]
71 # qemu binary path does not match arch for powerpc, handle it
72 if 'ppc64le' in arch:
73 arch = 'ppc64'
74 qemu_bin_relative_path = "./qemu-system-%s" % arch
75 if is_readable_executable_file(qemu_bin_relative_path):
76 return qemu_bin_relative_path
78 qemu_bin_from_bld_dir_path = os.path.join(BUILD_DIR,
79 qemu_bin_relative_path)
80 if is_readable_executable_file(qemu_bin_from_bld_dir_path):
81 return qemu_bin_from_bld_dir_path
84 def _console_interaction(test, success_message, failure_message,
85 send_string, keep_sending=False, vm=None):
86 assert not keep_sending or send_string
87 if vm is None:
88 vm = test.vm
89 console = vm.console_socket.makefile(mode='rb', encoding='utf-8')
90 console_logger = logging.getLogger('console')
91 while True:
92 if send_string:
93 vm.console_socket.sendall(send_string.encode())
94 if not keep_sending:
95 send_string = None # send only once
96 try:
97 msg = console.readline().decode().strip()
98 except UnicodeDecodeError:
99 msg = None
100 if not msg:
101 continue
102 console_logger.debug(msg)
103 if success_message is None or success_message in msg:
104 break
105 if failure_message and failure_message in msg:
106 console.close()
107 fail = 'Failure message found in console: "%s". Expected: "%s"' % \
108 (failure_message, success_message)
109 test.fail(fail)
111 def interrupt_interactive_console_until_pattern(test, success_message,
112 failure_message=None,
113 interrupt_string='\r'):
115 Keep sending a string to interrupt a console prompt, while logging the
116 console output. Typical use case is to break a boot loader prompt, such:
118 Press a key within 5 seconds to interrupt boot process.
124 Booting default image...
126 :param test: an Avocado test containing a VM that will have its console
127 read and probed for a success or failure message
128 :type test: :class:`avocado_qemu.Test`
129 :param success_message: if this message appears, test succeeds
130 :param failure_message: if this message appears, test fails
131 :param interrupt_string: a string to send to the console before trying
132 to read a new line
134 _console_interaction(test, success_message, failure_message,
135 interrupt_string, True)
137 def wait_for_console_pattern(test, success_message, failure_message=None,
138 vm=None):
140 Waits for messages to appear on the console, while logging the content
142 :param test: an Avocado test containing a VM that will have its console
143 read and probed for a success or failure message
144 :type test: :class:`avocado_qemu.Test`
145 :param success_message: if this message appears, test succeeds
146 :param failure_message: if this message appears, test fails
148 _console_interaction(test, success_message, failure_message, None, vm=vm)
150 def exec_command(test, command):
152 Send a command to a console (appending CRLF characters), while logging
153 the content.
155 :param test: an Avocado test containing a VM.
156 :type test: :class:`avocado_qemu.Test`
157 :param command: the command to send
158 :type command: str
160 _console_interaction(test, None, None, command + '\r')
162 def exec_command_and_wait_for_pattern(test, command,
163 success_message, failure_message=None):
165 Send a command to a console (appending CRLF characters), then wait
166 for success_message to appear on the console, while logging the.
167 content. Mark the test as failed if failure_message is found instead.
169 :param test: an Avocado test containing a VM that will have its console
170 read and probed for a success or failure message
171 :type test: :class:`avocado_qemu.Test`
172 :param command: the command to send
173 :param success_message: if this message appears, test succeeds
174 :param failure_message: if this message appears, test fails
176 _console_interaction(test, success_message, failure_message, command + '\r')
178 class Test(avocado.Test):
179 def _get_unique_tag_val(self, tag_name):
181 Gets a tag value, if unique for a key
183 vals = self.tags.get(tag_name, [])
184 if len(vals) == 1:
185 return vals.pop()
186 return None
188 def require_accelerator(self, accelerator):
190 Requires an accelerator to be available for the test to continue
192 It takes into account the currently set qemu binary.
194 If the check fails, the test is canceled. If the check itself
195 for the given accelerator is not available, the test is also
196 canceled.
198 :param accelerator: name of the accelerator, such as "kvm" or "tcg"
199 :type accelerator: str
201 checker = {'tcg': tcg_available,
202 'kvm': kvm_available}.get(accelerator)
203 if checker is None:
204 self.cancel("Don't know how to check for the presence "
205 "of accelerator %s" % accelerator)
206 if not checker(qemu_bin=self.qemu_bin):
207 self.cancel("%s accelerator does not seem to be "
208 "available" % accelerator)
210 def setUp(self):
211 self._vms = {}
213 self.arch = self.params.get('arch',
214 default=self._get_unique_tag_val('arch'))
216 self.machine = self.params.get('machine',
217 default=self._get_unique_tag_val('machine'))
219 default_qemu_bin = pick_default_qemu_bin(arch=self.arch)
220 self.qemu_bin = self.params.get('qemu_bin',
221 default=default_qemu_bin)
222 if self.qemu_bin is None:
223 self.cancel("No QEMU binary defined or found in the build tree")
225 def _new_vm(self, name, *args):
226 self._sd = tempfile.TemporaryDirectory(prefix="avo_qemu_sock_")
227 vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
228 sock_dir=self._sd.name, log_dir=self.logdir)
229 self.log.debug('QEMUMachine "%s" created', name)
230 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
231 self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
232 if args:
233 vm.add_args(*args)
234 return vm
236 @property
237 def vm(self):
238 return self.get_vm(name='default')
240 def get_vm(self, *args, name=None):
241 if not name:
242 name = str(uuid.uuid4())
243 if self._vms.get(name) is None:
244 self._vms[name] = self._new_vm(name, *args)
245 if self.machine is not None:
246 self._vms[name].set_machine(self.machine)
247 return self._vms[name]
249 def tearDown(self):
250 for vm in self._vms.values():
251 vm.shutdown()
252 self._sd = None
254 def fetch_asset(self, name,
255 asset_hash=None, algorithm=None,
256 locations=None, expire=None,
257 find_only=False, cancel_on_missing=True):
258 return super(Test, self).fetch_asset(name,
259 asset_hash=asset_hash,
260 algorithm=algorithm,
261 locations=locations,
262 expire=expire,
263 find_only=find_only,
264 cancel_on_missing=cancel_on_missing)
267 class LinuxSSHMixIn:
268 """Contains utility methods for interacting with a guest via SSH."""
270 def ssh_connect(self, username, credential, credential_is_key=True):
271 self.ssh_logger = logging.getLogger('ssh')
272 res = self.vm.command('human-monitor-command',
273 command_line='info usernet')
274 port = get_info_usernet_hostfwd_port(res)
275 self.assertIsNotNone(port)
276 self.assertGreater(port, 0)
277 self.log.debug('sshd listening on port: %d', port)
278 if credential_is_key:
279 self.ssh_session = ssh.Session('127.0.0.1', port=port,
280 user=username, key=credential)
281 else:
282 self.ssh_session = ssh.Session('127.0.0.1', port=port,
283 user=username, password=credential)
284 for i in range(10):
285 try:
286 self.ssh_session.connect()
287 return
288 except:
289 time.sleep(4)
290 pass
291 self.fail('ssh connection timeout')
293 def ssh_command(self, command):
294 self.ssh_logger.info(command)
295 result = self.ssh_session.cmd(command)
296 stdout_lines = [line.rstrip() for line
297 in result.stdout_text.splitlines()]
298 for line in stdout_lines:
299 self.ssh_logger.info(line)
300 stderr_lines = [line.rstrip() for line
301 in result.stderr_text.splitlines()]
302 for line in stderr_lines:
303 self.ssh_logger.warning(line)
305 self.assertEqual(result.exit_status, 0,
306 f'Guest command failed: {command}')
307 return stdout_lines, stderr_lines
310 #: A collection of known distros and their respective image checksum
311 KNOWN_DISTROS = {
312 'fedora': {
313 '31': {
314 'x86_64':
315 {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
316 '8a309c2d46215d8fc026954f3c5c27a0')},
317 'aarch64':
318 {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
319 'd2af0ad0329383d5639c997fdf16fe49')},
320 'ppc64':
321 {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
322 '3f991c506f2cc390dc4efa2026ad2f58')},
323 's390x':
324 {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
325 '1e354f834d355069ca982fdcaf5a122d')},
331 def get_known_distro_checksum(distro, distro_version, arch):
332 try:
333 return KNOWN_DISTROS.get(distro).get(distro_version).\
334 get(arch).get('checksum')
335 except AttributeError:
336 return None
339 class LinuxTest(Test, LinuxSSHMixIn):
340 """Facilitates having a cloud-image Linux based available.
342 For tests that indend to interact with guests, this is a better choice
343 to start with than the more vanilla `Test` class.
346 timeout = 900
347 distro_checksum = None
348 username = 'root'
349 password = 'password'
351 def _set_distro(self):
352 distro = self.params.get(
353 'distro',
354 default=self._get_unique_tag_val('distro'))
355 if not distro:
356 distro = 'fedora'
357 self.distro = distro
359 distro_version = self.params.get(
360 'distro_version',
361 default=self._get_unique_tag_val('distro_version'))
362 if not distro_version:
363 distro_version = '31'
364 self.distro_version = distro_version
366 # The distro checksum behaves differently than distro name and
367 # version. First, it does not respect a tag with the same
368 # name, given that it's not expected to be used for filtering
369 # (distro name versions are the natural choice). Second, the
370 # order of precedence is: parameter, attribute and then value
371 # from KNOWN_DISTROS.
372 distro_checksum = self.params.get('distro_checksum',
373 default=self.distro_checksum)
374 if not distro_checksum:
375 distro_checksum = get_known_distro_checksum(self.distro,
376 self.distro_version,
377 self.arch)
378 if distro_checksum:
379 self.distro_checksum = distro_checksum
381 def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
382 super(LinuxTest, self).setUp()
383 self._set_distro()
384 self.vm.add_args('-smp', '2')
385 self.vm.add_args('-m', '1024')
386 # The following network device allows for SSH connections
387 self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
388 '-device', '%s,netdev=vnet' % network_device_type)
389 self.set_up_boot()
390 if ssh_pubkey is None:
391 ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
392 self.set_up_cloudinit(ssh_pubkey)
394 def set_up_existing_ssh_keys(self):
395 ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
396 source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
397 ssh_dir = os.path.join(self.workdir, '.ssh')
398 os.mkdir(ssh_dir, mode=0o700)
399 ssh_private_key = os.path.join(ssh_dir,
400 os.path.basename(source_private_key))
401 shutil.copyfile(source_private_key, ssh_private_key)
402 os.chmod(ssh_private_key, 0o600)
403 return (ssh_public_key, ssh_private_key)
405 def download_boot(self):
406 self.log.debug('Looking for and selecting a qemu-img binary to be '
407 'used to create the bootable snapshot image')
408 # If qemu-img has been built, use it, otherwise the system wide one
409 # will be used. If none is available, the test will cancel.
410 qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
411 if not os.path.exists(qemu_img):
412 qemu_img = find_command('qemu-img', False)
413 if qemu_img is False:
414 self.cancel('Could not find "qemu-img", which is required to '
415 'create the bootable image')
416 vmimage.QEMU_IMG = qemu_img
418 self.log.info('Downloading/preparing boot image')
419 # Fedora 31 only provides ppc64le images
420 image_arch = self.arch
421 if self.distro == 'fedora':
422 if image_arch == 'ppc64':
423 image_arch = 'ppc64le'
425 try:
426 boot = vmimage.get(
427 self.distro, arch=image_arch, version=self.distro_version,
428 checksum=self.distro_checksum,
429 algorithm='sha256',
430 cache_dir=self.cache_dirs[0],
431 snapshot_dir=self.workdir)
432 except:
433 self.cancel('Failed to download/prepare boot image')
434 return boot.path
436 def prepare_cloudinit(self, ssh_pubkey=None):
437 self.log.info('Preparing cloudinit image')
438 try:
439 cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
440 self.phone_home_port = network.find_free_port()
441 pubkey_content = None
442 if ssh_pubkey:
443 with open(ssh_pubkey) as pubkey:
444 pubkey_content = pubkey.read()
445 cloudinit.iso(cloudinit_iso, self.name,
446 username=self.username,
447 password=self.password,
448 # QEMU's hard coded usermode router address
449 phone_home_host='10.0.2.2',
450 phone_home_port=self.phone_home_port,
451 authorized_key=pubkey_content)
452 except Exception:
453 self.cancel('Failed to prepare the cloudinit image')
454 return cloudinit_iso
456 def set_up_boot(self):
457 path = self.download_boot()
458 self.vm.add_args('-drive', 'file=%s' % path)
460 def set_up_cloudinit(self, ssh_pubkey=None):
461 cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
462 self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)
464 def launch_and_wait(self, set_up_ssh_connection=True):
465 self.vm.set_console()
466 self.vm.launch()
467 console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
468 logger=self.log.getChild('console'))
469 console_drainer.start()
470 self.log.info('VM launched, waiting for boot confirmation from guest')
471 cloudinit.wait_for_phone_home(('0.0.0.0', self.phone_home_port), self.name)
472 if set_up_ssh_connection:
473 self.log.info('Setting up the SSH connection')
474 self.ssh_connect(self.username, self.ssh_key)