3 # Tests for fdsets and getfd.
5 # Copyright (C) 2012 IBM Corp.
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os

import iotests
from iotests import qemu_img
# Paths of the disk images used by the tests below; every image is placed
# directly under iotests.test_dir.
image0, image1, image2, image3, image4 = [
    os.path.join(iotests.test_dir, name)
    for name in ('image0', 'image1', 'image2', 'image3', 'image4')
]
class TestFdSets(iotests.QMPTestCase):
    """Tests for the fdset QMP commands (query-fdsets, remove-fd, add-fd).

    Every test runs against a VM that was handed five file descriptors up
    front, spread over three fdsets:

        fdset 0: image2
        fdset 1: image0, image1 (also used as the drive, /dev/fdset/1)
        fdset 2: image3, image4
    """

    def setUp(self):
        """Create the images, register their fds in fdsets, start the VM."""
        self.vm = iotests.VM()
        for image in (image0, image1, image2, image3, image4):
            qemu_img('create', '-f', iotests.imgfmt, image, '128K')

        self.file0 = open(image0, 'r')
        self.file1 = open(image1, 'w+')
        self.file2 = open(image2, 'r')
        self.file3 = open(image3, 'r')
        self.file4 = open(image4, 'r')

        # Arguments are (fd, fdset id, opaque string); the opaque strings
        # are what the tests later look for in the query-fdsets reply.
        self.vm.add_fd(self.file0.fileno(), 1, 'image0:r')
        self.vm.add_fd(self.file1.fileno(), 1, 'image1:w+')
        self.vm.add_fd(self.file2.fileno(), 0, 'image2:r')
        self.vm.add_fd(self.file3.fileno(), 2, 'image3:r')
        self.vm.add_fd(self.file4.fileno(), 2, 'image4:r')
        self.vm.add_drive("/dev/fdset/1")

        # The tests talk QMP to the VM, so it has to be running.
        self.vm.launch()

    def tearDown(self):
        """Stop the VM, close the image files and delete the images."""
        self.vm.shutdown()
        for image_file in (self.file0, self.file1, self.file2,
                           self.file3, self.file4):
            image_file.close()
        for image in (image0, image1, image2, image3, image4):
            os.remove(image)

    def _assert_fdsets(self, result, expected):
        """Check a query-fdsets reply against the expected layout.

        `expected` is a list of (fdset_id, [opaque, ...]) tuples in the
        order the fdsets are expected to appear in `result`.  All fdset
        ids are checked first, then all opaque strings.
        """
        for set_idx, (fdset_id, _) in enumerate(expected):
            self.assert_qmp(result, 'return[%d]/fdset-id' % set_idx,
                            fdset_id)
        for set_idx, (_, opaques) in enumerate(expected):
            for fd_idx, opaque in enumerate(opaques):
                self.assert_qmp(
                    result,
                    'return[%d]/fds[%d]/opaque' % (set_idx, fd_idx),
                    opaque)

    def test_query_fdset(self):
        """query-fdsets reports the three fdsets in the order 2, 1, 0."""
        result = self.vm.qmp('query-fdsets')
        self._assert_fdsets(result, [
            (2, ['image3:r', 'image4:r']),
            (1, ['image0:r', 'image1:w+']),
            (0, ['image2:r']),
        ])

    def test_remove_fdset(self):
        """remove-fd without an fd argument drops fdset 2 entirely."""
        result = self.vm.qmp('remove-fd', fdset_id=2)
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-fdsets')
        self._assert_fdsets(result, [
            (1, ['image0:r', 'image1:w+']),
            (0, ['image2:r']),
        ])

    def test_remove_fd(self):
        """remove-fd with an fd argument removes only that fd from fdset 2."""
        result = self.vm.qmp('query-fdsets')
        # First fd of the first reported fdset, i.e. the one for image3.
        fd_image3 = result['return'][0]['fds'][0]['fd']

        result = self.vm.qmp('remove-fd', fdset_id=2, fd=fd_image3)
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('query-fdsets')
        self._assert_fdsets(result, [
            (2, ['image4:r']),
            (1, ['image0:r', 'image1:w+']),
            (0, ['image2:r']),
        ])

    def test_remove_fd_invalid_fdset(self):
        """remove-fd on fdset 3, which was never created, is an error."""
        result = self.vm.qmp('query-fdsets')
        fd_image3 = result['return'][0]['fds'][0]['fd']

        result = self.vm.qmp('remove-fd', fdset_id=3, fd=fd_image3)
        self.assert_qmp(result, 'error/class', 'GenericError')
        self.assert_qmp(
            result, 'error/desc',
            "File descriptor named 'fdset-id:3, fd:%d' not found"
            % fd_image3)

    def test_remove_fd_invalid_fd(self):
        """remove-fd with fd 999 on the existing fdset 2 is an error."""
        result = self.vm.qmp('remove-fd', fdset_id=2, fd=999)
        self.assert_qmp(result, 'error/class', 'GenericError')
        self.assert_qmp(
            result, 'error/desc',
            "File descriptor named 'fdset-id:2, fd:999' not found")

    def test_add_fd_invalid_fd(self):
        """add-fd fails when no fd was passed via SCM_RIGHTS beforehand."""
        result = self.vm.qmp('add-fd', fdset_id=2)
        self.assert_qmp(result, 'error/class', 'GenericError')
        self.assert_qmp(result, 'error/desc',
                        'No file descriptor supplied via SCM_RIGHTS')
# Add an fd at runtime. There are two ways to do this: monitor-related
# (getfd/closefd) or fdset-related (add-fd).
class TestSCMFd(iotests.QMPTestCase):
    """Tests for handing an fd to a running VM (add-fd, getfd, closefd)."""

    def setUp(self):
        """Create image0 and start a VM that has an extra, unused monitor."""
        self.vm = iotests.VM()
        qemu_img('create', '-f', iotests.imgfmt, image0, '128K')
        # Add an unused monitor, to verify that things work fine when two
        # monitors are present.
        self.vm.add_monitor_null()
        # The tests talk QMP to the VM, so it has to be running.
        self.vm.launch()

    def tearDown(self):
        """Stop the VM and delete image0."""
        self.vm.shutdown()
        os.remove(image0)

    def _send_fd_by_SCM(self):
        """Send an fd for image0 to the VM and require that this succeeds."""
        ret = self.vm.send_fd_scm(file_path=image0)
        self.assertEqual(ret, 0, 'Failed to send fd with UNIX SCM')

    def test_add_fd(self):
        """add-fd places a previously sent fd into fdset 2."""
        self._send_fd_by_SCM()
        result = self.vm.qmp('add-fd', fdset_id=2, opaque='image0:r')
        self.assert_qmp(result, 'return/fdset-id', 2)

    def test_getfd(self):
        """getfd accepts a previously sent fd under the given name."""
        self._send_fd_by_SCM()
        result = self.vm.qmp('getfd', fdname='image0:r')
        self.assert_qmp(result, 'return', {})

    def test_getfd_invalid_fdname(self):
        """getfd rejects an fd name that starts with a digit."""
        self._send_fd_by_SCM()
        result = self.vm.qmp('getfd', fdname='0image0:r')
        self.assert_qmp(result, 'error/class', 'GenericError')
        self.assert_qmp(
            result, 'error/desc',
            "Parameter 'fdname' expects a name not starting with a digit")

    def test_closefd(self):
        """closefd succeeds for a name that getfd registered before."""
        self._send_fd_by_SCM()
        result = self.vm.qmp('getfd', fdname='image0:r')
        self.assert_qmp(result, 'return', {})
        result = self.vm.qmp('closefd', fdname='image0:r')
        self.assert_qmp(result, 'return', {})

    def test_closefd_fd_not_found(self):
        """closefd fails for a name that was never registered with getfd."""
        # No fd is sent and getfd is never called in this test, so this
        # name has not been registered with the monitor.
        fdname = 'image0:r'
        result = self.vm.qmp('closefd', fdname=fdname)
        self.assert_qmp(result, 'error/class', 'GenericError')
        self.assert_qmp(result, 'error/desc',
                        "File descriptor named '%s' not found" % fdname)
if __name__ == '__main__':
    # Hand control to the iotests runner, declaring 'raw' as the supported
    # image format and 'file' as the supported protocol.
    iotests.main(
        supported_fmts=['raw'],
        supported_protocols=['file'],
    )