tests/acceptance/migration: Test EXEC transport when migrating
[qemu/ar7.git] / tests / acceptance / migration.py
blob41b13b9e0d6045d5f0ad907532c8dc003e0b14d0
1 # Migration test
3 # Copyright (c) 2019 Red Hat, Inc.
5 # Authors:
6 # Cleber Rosa <crosa@redhat.com>
7 # Caio Carrara <ccarrara@redhat.com>
9 # This work is licensed under the terms of the GNU GPL, version 2 or
10 # later. See the COPYING file in the top-level directory.
13 import tempfile
14 from avocado_qemu import Test
15 from avocado import skipUnless
17 from avocado.utils import network
18 from avocado.utils import wait
19 from avocado.utils.path import find_command
22 class Migration(Test):
24 timeout = 10
26 @staticmethod
27 def migration_finished(vm):
28 return vm.command('query-migrate')['status'] in ('completed', 'failed')
def assert_migration(self, src_vm, dst_vm):
    """
    Wait for the migration to settle on both VMs and check that it worked.

    :param src_vm: VM the 'migrate' command was issued on
    :param dst_vm: VM that receives the migration
    """
    # Poll the destination as well as the source: asserting on the
    # destination right after only the source reached a terminal state
    # races with the destination still finishing the incoming stream.
    for vm in (src_vm, dst_vm):
        wait.wait_for(self.migration_finished,
                      timeout=self.timeout,
                      step=0.1,
                      args=(vm,))
    # A timed-out wait is not checked here; the assertions below report
    # whatever state each side ended up in.
    self.assertEqual(src_vm.command('query-migrate')['status'], 'completed')
    self.assertEqual(dst_vm.command('query-migrate')['status'], 'completed')
    self.assertEqual(dst_vm.command('query-status')['status'], 'running')
    self.assertEqual(src_vm.command('query-status')['status'], 'postmigrate')
def do_migrate(self, dest_uri, src_uri=None):
    """
    Start a source and a destination VM, migrate, and verify the outcome.

    :param dest_uri: URI handed to the destination VM through '-incoming'
    :param src_uri: URI given to the source's 'migrate' command; when None
                    the destination URI is reused
    """
    sender = self.get_vm()
    receiver = self.get_vm('-incoming', dest_uri)
    # The destination is launched first, then the source.
    receiver.launch()
    outgoing_uri = dest_uri if src_uri is None else src_uri
    sender.launch()
    sender.qmp('migrate', uri=outgoing_uri)
    self.assert_migration(sender, receiver)
def _get_free_port(self):
    """
    Return a free port as reported by network.find_free_port().

    When no port is found, self.cancel() is called with an explanatory
    message before returning.
    """
    candidate = network.find_free_port()
    if candidate is not None:
        return candidate
    self.cancel('Failed to find a free port')
    # Only reached if cancel() returns; the value is None at this point.
    return candidate
def test_migration_with_tcp_localhost(self):
    """
    Migrate using a 'tcp:localhost:<port>' URI built from a free port.
    """
    port = self._get_free_port()
    self.do_migrate('tcp:localhost:%u' % port)
def test_migration_with_unix(self):
    """
    Migrate using a 'unix:' URI whose socket path lives in a temporary
    directory that is removed once the migration check returns.
    """
    with tempfile.TemporaryDirectory(prefix='socket_') as scratch_dir:
        self.do_migrate('unix:%s/qemu-test.sock' % scratch_dir)
66 @skipUnless(find_command('nc', default=False), "'nc' command not found")
67 def test_migration_with_exec(self):
68 """
69 The test works for both netcat-traditional and netcat-openbsd packages
70 """
71 free_port = self._get_free_port()
72 dest_uri = 'exec:nc -l localhost %u' % free_port