python/qemu/qmp/legacy: Replace 'returns-whitelist' with the correct type
[qemu.git] / python / qemu / qmp / legacy.py
blob1951754455a92fde69f97d8420fc7f2f43fa2655
1 """
2 (Legacy) Sync QMP Wrapper
4 This module provides the `QEMUMonitorProtocol` class, which is a
5 synchronous wrapper around `QMPClient`.
7 Its design closely resembles that of the original QEMUMonitorProtocol
8 class, originally written by Luiz Capitulino. It is provided here for
9 compatibility with scripts inside the QEMU source tree that expect the
10 old interface.
11 """
14 # Copyright (C) 2009-2022 Red Hat Inc.
16 # Authors:
17 # Luiz Capitulino <lcapitulino@redhat.com>
18 # John Snow <jsnow@redhat.com>
20 # This work is licensed under the terms of the GNU GPL, version 2. See
21 # the COPYING file in the top-level directory.
24 import asyncio
25 from types import TracebackType
26 from typing import (
27 Any,
28 Awaitable,
29 Dict,
30 List,
31 Optional,
32 Type,
33 TypeVar,
34 Union,
37 from .error import QMPError
38 from .protocol import Runstate, SocketAddrT
39 from .qmp_client import QMPClient
# Terminology used by the aliases below:
#
#   * A QMPMessage is a complete message travelling in either
#     direction: an outgoing command, or an incoming event/return.
#   * A QMPReturnValue is only the payload stored under 'return'.
#     It is usually a dict (JSON object), but QAPI's
#     'command-returns-exceptions' means it can be any JSON value.
#
# Example: {'return': {}} is a QMPMessage; the inner {} is its
# QMPReturnValue.

#: QMPMessage is an entire QMP message of any kind.
QMPMessage = Dict[str, Any]

#: QMPReturnValue is the 'return' value of a command.
QMPReturnValue = object

#: QMPObject is any object in a QMP message.
QMPObject = Dict[str, object]
class QMPBadPortError(QMPError):
    """
    Raised when a socket address string cannot be parsed because the
    port component is not a number.
    """
class QEMUMonitorProtocol:
    """
    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
    and then allow to handle commands and events.

    This is a synchronous facade: every operation is an awaitable from
    the wrapped `QMPClient` that is run to completion on an asyncio
    event loop owned by this object.

    :param address:  QEMU address, can be either a unix socket path (string)
                     or a tuple in the form ( address, port ) for a TCP
                     connection
    :param server:   Act as the socket server. (See 'accept')
    :param nickname: Optional nickname used for logging.
    """

    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):

        self._qmp = QMPClient(nickname)
        self._aloop = asyncio.get_event_loop()
        self._address = address
        # Timeout applied to cmd(), cmd_obj() and command(); see settimeout().
        self._timeout: Optional[float] = None

        if server:
            self._sync(self._qmp.start_server(self._address))

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run an awaitable to completion on this object's event loop.

        :param future: The awaitable to execute.
        :param timeout: Seconds to wait, or None to wait indefinitely.

        :raise asyncio.TimeoutError: If the timeout period elapses.
        :return: Whatever the awaitable returns.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """Return the server greeting as a dict, or None if there is none."""
        if self._qmp.greeting is not None:
            # pylint: disable=protected-access
            return self._qmp.greeting._asdict()
        return None

    def __enter__(self: _T) -> _T:
        # Implement context manager enter function.
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> None:
        # Implement context manager exit function.
        self.close()

    @classmethod
    def parse_address(cls, address: str) -> SocketAddrT:
        """
        Parse a string into a QMP address.

        Figure out if the argument is in the host:port form.
        If it's not, it's probably a file path.

        :param address: Either "host:port" or a filesystem path.

        :raise QMPBadPortError:
            When the string has exactly one ':' but the text after it
            is not an integer.

        :return: A (host, port) tuple, or the unmodified path string.
        """
        components = address.split(':')
        if len(components) == 2:
            try:
                port = int(components[1])
            except ValueError:
                msg = f"Bad port: '{components[1]}' in '{address}'."
                raise QMPBadPortError(msg) from None
            return (components[0], port)

        # Treat as filepath.
        return address

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the QMP Monitor and perform capabilities negotiation.

        :param negotiate:
            When False, neither wait for the greeting nor negotiate.

        :return: QMP greeting dict, or None if negotiate is false
        :raise ConnectError: on connection errors
        """
        self._qmp.await_greeting = negotiate
        self._qmp.negotiate = negotiate

        self._sync(
            self._qmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Await connection from QMP Monitor and perform capabilities negotiation.

        :param timeout:
            timeout in seconds (nonnegative float number, or None).
            If None, there is no timeout, and this may block forever.

        :return: QMP greeting dict
        :raise ConnectError: on connection errors
        """
        self._qmp.await_greeting = True
        self._qmp.negotiate = True

        self._sync(self._qmp.accept(), timeout)

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a QMP command to the QMP Monitor.

        :param qmp_cmd: QMP command to be sent as a Python dict
        :return: QMP response as a Python dict
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._qmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    def cmd(self, name: str,
            args: Optional[Dict[str, object]] = None,
            cmd_id: Optional[object] = None) -> QMPMessage:
        """
        Build a QMP command and send it to the QMP Monitor.

        :param name: command name (string)
        :param args: command arguments (dict)
        :param cmd_id: command id (dict, list, string or int)

        :return: QMP response as a Python dict
        """
        qmp_cmd: QMPMessage = {'execute': name}
        if args:
            qmp_cmd['arguments'] = args
        # Compare against None rather than truthiness: falsy ids such
        # as 0 or "" are legitimate and must reach the wire.
        if cmd_id is not None:
            qmp_cmd['id'] = cmd_id
        return self.cmd_obj(qmp_cmd)

    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Build and send a QMP command to the monitor, report errors if any

        :param cmd: command name (string)
        :param kwds: command arguments, passed as keyword arguments

        :return: The value produced by executing the command.
        """
        return self._sync(
            self._qmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Pulls a single event.

        :param wait:
            If False or 0, do not wait. Return None if no events ready.
            If True, wait forever until the next event.
            Otherwise, wait for the specified number of seconds.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: The first available QMP event, or None.
        """
        if not wait:
            # wait is False/0: "do not wait, do not except."
            if self._qmp.events.empty():
                return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout: Optional[float] = None
        # Any non-bool, non-zero number is a timeout in seconds. Testing
        # for "not bool" (instead of "is float") means integer timeouts
        # are honoured rather than being treated as "wait forever".
        if wait and not isinstance(wait, bool):
            timeout = float(wait)

        return dict(
            self._sync(
                self._qmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Get a list of QMP events and clear all pending events.

        :param wait:
            If False or 0, do not wait. Return an empty list if no
            events are ready.
            If True, wait until we have at least one event.
            Otherwise, wait for up to the specified number of seconds for at
            least one event.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: A list of QMP events.
        """
        events = [dict(x) for x in self._qmp.events.clear()]
        if events:
            return events

        # Nothing was pending; fall back to (possibly) waiting for one.
        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Clear current list of pending events."""
        self._qmp.events.clear()

    def close(self) -> None:
        """Close the connection."""
        self._sync(
            self._qmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the timeout for QMP RPC execution.

        This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
        The `accept`, `pull_event` and `get_events` methods have their
        own configurable timeouts.

        :param timeout:
            timeout in seconds, or None.
            None will wait indefinitely.
        """
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """
        Send a file descriptor to the remote via SCM_RIGHTS.

        :param fd: The file descriptor to send.
        """
        self._qmp.send_fd_scm(fd)

    def __del__(self) -> None:
        # __init__ may have failed before '_qmp' was assigned; in that
        # case there is nothing to clean up, and touching the missing
        # attribute would only produce a confusing AttributeError.
        qmp = getattr(self, '_qmp', None)
        if qmp is None or qmp.runstate == Runstate.IDLE:
            return

        if not self._aloop.is_running():
            self.close()
        else:
            # Garbage collection ran while the event loop was running.
            # Nothing we can do about it now, but if we don't raise our
            # own error, the user will be treated to a lot of traceback
            # they might not understand.
            raise QMPError(
                "QEMUMonitorProtocol.close()"
                " was not called before object was garbage collected"
            )