util: Implement debug-threads for macOS
[qemu/ar7.git] / tests / qemu-iotests / 147
blob05b374b7d3f366216b3ab1bc1b9919fd9c8d6b96
#!/usr/bin/env python
#
# Test case for NBD's blockdev-add interface
#
# Copyright (C) 2016 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
import os
import socket
import stat
import time
import iotests
from iotests import cachemode, imgfmt, qemu_img, qemu_nbd

# TCP port shared by every inet-based test in this file.
NBD_PORT = 10811

# Scratch files, both placed in the iotests working directory: the image
# that gets exported over NBD and the UNIX socket used by the unix/fd tests.
test_img = os.path.join(iotests.test_dir, 'test.img')
unix_socket = os.path.join(iotests.test_dir, 'nbd.socket')
def flatten_sock_addr(crumpled_address):
    """Flatten a nested socket address into a single-level dict.

    crumpled_address has the shape {'type': ..., 'data': {...}}.  The
    returned dict holds the 'type' entry followed by every key/value pair
    of the nested 'data' dict (a 'type' key inside 'data' would take
    precedence).  The input is not modified.
    """
    result = { 'type': crumpled_address['type'] }
    result.update(crumpled_address['data'])
    return result
class NBDBlockdevAddBase(iotests.QMPTestCase):
    """Client-side helpers shared by the NBD blockdev-add test cases.

    Subclasses are expected to assign the client VM to self.vm in setUp().
    """

    def blockdev_add_options(self, address, export, node_name):
        """Build the blockdev-add arguments for a raw node on top of NBD.

        address   -- flat socket address dict used as the NBD 'server'
        export    -- export name, or None to leave 'export' out entirely
        node_name -- node name given to the new raw node

        Returns the options dict; the NBD layer is always read-only.
        """
        options = { 'node-name': node_name,
                    'driver': 'raw',
                    'file': {
                        'driver': 'nbd',
                        'read-only': True,
                        'server': address
                    } }
        if export is not None:
            options['file']['export'] = export
        return options

    def client_test(self, filename, address, export=None,
                    node_name='nbd-blockdev', delete=True):
        """Attach an NBD node to self.vm and verify its reported filename.

        filename  -- expected image filename: a plain string is compared
                     via assert_qmp(), anything else is handed to
                     assert_json_filename_equal()
        address   -- flat socket address dict of the NBD server
        export    -- export name to request, or None for no 'export' option
        node_name -- node name to create and then look for
        delete    -- when true, remove the node again with blockdev-del
        """
        bao = self.blockdev_add_options(address, export, node_name)
        result = self.vm.qmp('blockdev-add', **bao)
        self.assert_qmp(result, 'return', {})

        # Locate the freshly added node and check its filename.
        found = False
        result = self.vm.qmp('query-named-block-nodes')
        for node in result['return']:
            if node['node-name'] == node_name:
                found = True
                if isinstance(filename, str):
                    self.assert_qmp(node, 'image/filename', filename)
                else:
                    self.assert_json_filename_equal(node['image']['filename'],
                                                    filename)
                break
        self.assertTrue(found,
                        'node %s missing from query-named-block-nodes'
                        % node_name)

        if delete:
            result = self.vm.qmp('blockdev-del', node_name=node_name)
            self.assert_qmp(result, 'return', {})
class QemuNBD(NBDBlockdevAddBase):
    """blockdev-add tests against an NBD server started through qemu_nbd()."""

    def setUp(self):
        """Create the 64k test image and launch the client VM."""
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()

    def tearDown(self):
        """Shut the client VM down and remove the scratch files."""
        self.vm.shutdown()
        os.remove(test_img)
        # Only test_unix asks the server for a UNIX socket, so the file may
        # legitimately be absent.
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    def _server_up(self, *args):
        """Run qemu_nbd() on the test image with extra arguments *args.

        Fails the test unless qemu_nbd() returns 0.
        """
        self.assertEqual(qemu_nbd('-f', imgfmt, test_img, *args), 0)

    def test_inet(self):
        """Connect over TCP to localhost:NBD_PORT without an export name."""
        self._server_up('-p', str(NBD_PORT))
        address = { 'type': 'inet',
                    'data': {
                        'host': 'localhost',
                        'port': str(NBD_PORT)
                    } }
        self.client_test('nbd://localhost:%i' % NBD_PORT,
                         flatten_sock_addr(address))

    def test_unix(self):
        """Connect through the UNIX socket without an export name."""
        self._server_up('-k', unix_socket)
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self.client_test('nbd+unix://?socket=' + unix_socket,
                         flatten_sock_addr(address))
class BuiltinNBD(NBDBlockdevAddBase):
    """blockdev-add tests against the NBD server built into a second VM.

    self.vm is the client; self.server is a second VM that holds the test
    image as drive 'nbd-export' and exports it through the QMP
    nbd-server-* commands.
    """

    def setUp(self):
        """Create the test image and launch both the client and server VM."""
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()
        # NOTE(review): the '.server' argument presumably keeps the second
        # instance's files apart from the first one's -- confirm in iotests.
        self.server = iotests.VM('.server')
        self.server.add_drive_raw('if=none,id=nbd-export,' +
                                  'file=%s,' % test_img +
                                  'format=%s,' % imgfmt +
                                  'cache=%s' % cachemode)
        self.server.launch()

    def tearDown(self):
        """Shut both VMs down and remove the scratch files."""
        self.vm.shutdown()
        self.server.shutdown()
        os.remove(test_img)
        # Only the unix/fd tests create the socket, so it may be absent.
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    def _server_up(self, address, export_name=None, export_name2=None):
        """Start the server VM's NBD server and export 'nbd-export'.

        address      -- nested socket address ({'type': ..., 'data': {...}})
        export_name  -- name for the export; None sends no 'name' argument
        export_name2 -- optional name of a second export of the same device
        """
        result = self.server.qmp('nbd-server-start', addr=address)
        self.assert_qmp(result, 'return', {})

        if export_name is None:
            result = self.server.qmp('nbd-server-add', device='nbd-export')
        else:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name)
        self.assert_qmp(result, 'return', {})

        if export_name2 is not None:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name2)
            self.assert_qmp(result, 'return', {})

    def _server_down(self):
        """Stop the server VM's NBD server."""
        result = self.server.qmp('nbd-server-stop')
        self.assert_qmp(result, 'return', {})

    def do_test_inet(self, export_name=None):
        """Run a TCP client test, optionally with an explicit export name.

        When export_name is None the server adds the export without a name
        and the client asks for 'nbd-export', i.e. the device id.
        """
        address = { 'type': 'inet',
                    'data': {
                        'host': 'localhost',
                        'port': str(NBD_PORT)
                    } }
        self._server_up(address, export_name)
        export_name = export_name or 'nbd-export'
        self.client_test('nbd://localhost:%i/%s' % (NBD_PORT, export_name),
                         flatten_sock_addr(address), export_name)
        self._server_down()

    def test_inet_default_export_name(self):
        """TCP test with the export added without an explicit name."""
        self.do_test_inet()

    def test_inet_same_export_name(self):
        """TCP test with the export explicitly named like the device."""
        self.do_test_inet('nbd-export')

    def test_inet_different_export_name(self):
        """TCP test with an export name that differs from the device id."""
        self.do_test_inet('shadow')

    def test_inet_two_exports(self):
        """Attach two client nodes to two differently named exports."""
        address = { 'type': 'inet',
                    'data': {
                        'host': 'localhost',
                        'port': str(NBD_PORT)
                    } }
        self._server_up(address, 'exp1', 'exp2')
        # delete=False: keep both nodes attached at the same time and only
        # remove them once both connections have been verified.
        self.client_test('nbd://localhost:%i/%s' % (NBD_PORT, 'exp1'),
                         flatten_sock_addr(address), 'exp1', 'node1', False)
        self.client_test('nbd://localhost:%i/%s' % (NBD_PORT, 'exp2'),
                         flatten_sock_addr(address), 'exp2', 'node2', False)
        result = self.vm.qmp('blockdev-del', node_name='node1')
        self.assert_qmp(result, 'return', {})
        result = self.vm.qmp('blockdev-del', node_name='node2')
        self.assert_qmp(result, 'return', {})
        self._server_down()

    def test_inet6(self):
        """TCP test over IPv6 ('::1'); returns early without IPv6 support."""
        try:
            socket.getaddrinfo("::0", "0", socket.AF_INET6,
                               socket.SOCK_STREAM, socket.IPPROTO_TCP,
                               socket.AI_ADDRCONFIG | socket.AI_CANONNAME)
        except socket.gaierror:
            # IPv6 not available, skip
            return
        address = { 'type': 'inet',
                    'data': {
                        'host': '::1',
                        'port': str(NBD_PORT),
                        'ipv4': False,
                        'ipv6': True
                    } }
        # Expected filename is given as a dict, so client_test() compares it
        # with assert_json_filename_equal() rather than as a plain string.
        filename = { 'driver': 'raw',
                     'file': {
                         'driver': 'nbd',
                         'export': 'nbd-export',
                         'server': flatten_sock_addr(address)
                     } }
        self._server_up(address)
        self.client_test(filename, flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_unix(self):
        """Client test through the UNIX socket."""
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self._server_up(address)
        self.client_test('nbd+unix:///nbd-export?socket=' + unix_socket,
                         flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_fd(self):
        """Client test over an already connected socket passed in as an fd."""
        self._server_up({ 'type': 'unix',
                          'data': { 'path': unix_socket } })

        sockfd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sockfd.connect(unix_socket)

            result = self.vm.send_fd_scm(fd=sockfd.fileno())
            self.assertEqual(result, 0, 'Failed to send socket FD')

            # Register the passed descriptor in the client VM as 'nbd-fifo'.
            result = self.vm.qmp('getfd', fdname='nbd-fifo')
            self.assert_qmp(result, 'return', {})
        finally:
            # Do not leak the local descriptor.  NOTE(review): relies on the
            # VM holding its own copy once the fd has been passed (as with
            # SCM_RIGHTS) -- confirm against iotests.VM.send_fd_scm.
            sockfd.close()

        address = { 'type': 'fd',
                    'data': { 'str': 'nbd-fifo' } }
        filename = { 'driver': 'raw',
                     'file': {
                         'driver': 'nbd',
                         'export': 'nbd-export',
                         'server': flatten_sock_addr(address)
                     } }
        self.client_test(filename, flatten_sock_addr(address), 'nbd-export')

        self._server_down()
if __name__ == '__main__':
    # Need to support image creation
    iotests.main(supported_fmts=['vpc', 'parallels', 'qcow', 'vdi', 'qcow2',
                                 'vmdk', 'raw', 'vhdx', 'qed'])