scripts/qmp-shell: remove double-underscores
[qemu/ar7.git] / scripts / qmp / qmp-shell
blob40ff9e0a82bdd9ce961d7a5a9e6d96b7fc28b415
1 #!/usr/bin/env python3
3 # Copyright (C) 2009, 2010 Red Hat Inc.
5 # Authors:
6 #  Luiz Capitulino <lcapitulino@redhat.com>
8 # This work is licensed under the terms of the GNU GPL, version 2.  See
9 # the COPYING file in the top-level directory.
12 """
13 Low-level QEMU shell on top of QMP.
15 usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
17 positional arguments:
18   qmp_server            < UNIX socket path | TCP address:port >
20 optional arguments:
21   -h, --help            show this help message and exit
22   -H, --hmp             Use HMP interface
23   -N, --skip-negotiation
24                         Skip negotiate (for qemu-ga)
25   -v, --verbose         Verbose (echo commands sent and received)
26   -p, --pretty          Pretty-print JSON
29 Start QEMU with:
31 # qemu [...] -qmp unix:./qmp-sock,server
33 Run the shell:
35 $ qmp-shell ./qmp-sock
37 Commands have the following format:
39    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
41 For example:
43 (QEMU) device_add driver=e1000 id=net1
44 {'return': {}}
45 (QEMU)
47 key=value pairs also support Python or JSON object literal subset notations,
48 without spaces. Dictionaries/objects {} are supported as are arrays [].
50    example-command arg-name1={'key':'value','obj'={'prop':"value"}}
52 Both JSON and Python formatting should work, including both styles of
53 string literal quotes. Both paradigms of literal values should work,
54 including null/true/false for JSON and None/True/False for Python.
57 Transactions have the following multi-line format:
59    transaction(
60    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
61    ...
62    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
63    )
65 One line transactions are also supported:
67    transaction( action-name1 ... )
69 For example:
71     (QEMU) transaction(
72     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
73     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
74     TRANS> )
75     {"return": {}}
76     (QEMU)
78 Use the -v and -p options to activate the verbose and pretty-print options,
79 which will echo back the properly formatted JSON-compliant QMP that is being
80 sent to QEMU, which is useful for debugging and documentation generation.
81 """
83 import argparse
84 import ast
85 import json
86 import logging
87 import os
88 import re
89 import readline
90 import sys
91 from typing import (
92     Iterator,
93     List,
94     NoReturn,
95     Optional,
96     Sequence,
100 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
101 from qemu import qmp
102 from qemu.qmp import QMPMessage
105 LOG = logging.getLogger(__name__)
108 class QMPCompleter:
109     # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
110     # but pylint as of today does not know that List[str] is simply 'list'.
111     def __init__(self) -> None:
112         self._matches: List[str] = []
114     def append(self, value: str) -> None:
115         return self._matches.append(value)
117     def complete(self, text: str, state: int) -> Optional[str]:
118         for cmd in self._matches:
119             if cmd.startswith(text):
120                 if state == 0:
121                     return cmd
122                 state -= 1
123         return None
126 class QMPShellError(Exception):
127     pass
130 class FuzzyJSON(ast.NodeTransformer):
131     """
132     This extension of ast.NodeTransformer filters literal "true/false/null"
133     values in a Python AST and replaces them by proper "True/False/None" values
134     that Python can properly evaluate.
135     """
137     @classmethod
138     def visit_Name(cls,  # pylint: disable=invalid-name
139                    node: ast.Name) -> ast.AST:
140         if node.id == 'true':
141             return ast.Constant(value=True)
142         if node.id == 'false':
143             return ast.Constant(value=False)
144         if node.id == 'null':
145             return ast.Constant(value=None)
146         return node
149 class QMPShell(qmp.QEMUMonitorProtocol):
150     def __init__(self, address: qmp.SocketAddrT,
151                  pretty: bool = False, verbose: bool = False):
152         super().__init__(address)
153         self._greeting: Optional[QMPMessage] = None
154         self._completer = QMPCompleter()
155         self._transmode = False
156         self._actions: List[QMPMessage] = []
157         self._histfile = os.path.join(os.path.expanduser('~'),
158                                       '.qmp-shell_history')
159         self.pretty = pretty
160         self.verbose = verbose
162     def close(self) -> None:
163         # Hook into context manager of parent to save shell history.
164         self._save_history()
165         super().close()
167     def _fill_completion(self) -> None:
168         cmds = self.cmd('query-commands')
169         if 'error' in cmds:
170             return
171         for cmd in cmds['return']:
172             self._completer.append(cmd['name'])
174     def _completer_setup(self) -> None:
175         self._completer = QMPCompleter()
176         self._fill_completion()
177         readline.set_history_length(1024)
178         readline.set_completer(self._completer.complete)
179         readline.parse_and_bind("tab: complete")
180         # NB: default delimiters conflict with some command names
181         # (eg. query-), clearing everything as it doesn't seem to matter
182         readline.set_completer_delims('')
183         try:
184             readline.read_history_file(self._histfile)
185         except FileNotFoundError:
186             pass
187         except IOError as err:
188             msg = f"Failed to read history '{self._histfile}': {err!s}"
189             LOG.warning(msg)
191     def _save_history(self) -> None:
192         try:
193             readline.write_history_file(self._histfile)
194         except IOError as err:
195             msg = f"Failed to save history file '{self._histfile}': {err!s}"
196             LOG.warning(msg)
198     @classmethod
199     def _parse_value(cls, val: str) -> object:
200         try:
201             return int(val)
202         except ValueError:
203             pass
205         if val.lower() == 'true':
206             return True
207         if val.lower() == 'false':
208             return False
209         if val.startswith(('{', '[')):
210             # Try first as pure JSON:
211             try:
212                 return json.loads(val)
213             except ValueError:
214                 pass
215             # Try once again as FuzzyJSON:
216             try:
217                 tree = ast.parse(val, mode='eval')
218                 transformed = FuzzyJSON().visit(tree)
219                 return ast.literal_eval(transformed)
220             except (SyntaxError, ValueError):
221                 pass
222         return val
224     def _cli_expr(self,
225                   tokens: Sequence[str],
226                   parent: qmp.QMPObject) -> None:
227         for arg in tokens:
228             (key, sep, val) = arg.partition('=')
229             if sep != '=':
230                 raise QMPShellError(
231                     f"Expected a key=value pair, got '{arg!s}'"
232                 )
234             value = self._parse_value(val)
235             optpath = key.split('.')
236             curpath = []
237             for path in optpath[:-1]:
238                 curpath.append(path)
239                 obj = parent.get(path, {})
240                 if not isinstance(obj, dict):
241                     msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
242                     raise QMPShellError(msg.format('.'.join(curpath)))
243                 parent[path] = obj
244                 parent = obj
245             if optpath[-1] in parent:
246                 if isinstance(parent[optpath[-1]], dict):
247                     msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
248                     raise QMPShellError(msg.format('.'.join(curpath)))
249                 raise QMPShellError(f'Cannot set "{key}" multiple times')
250             parent[optpath[-1]] = value
252     def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
253         """
254         Build a QMP input object from a user provided command-line in the
255         following format:
257             < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
258         """
259         argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
260         cmdargs = re.findall(argument_regex, cmdline)
261         qmpcmd: QMPMessage
263         # Transactional CLI entry:
264         if cmdargs and cmdargs[0] == 'transaction(':
265             self._transmode = True
266             self._actions = []
267             cmdargs.pop(0)
269         # Transactional CLI exit:
270         if cmdargs and cmdargs[0] == ')' and self._transmode:
271             self._transmode = False
272             if len(cmdargs) > 1:
273                 msg = 'Unexpected input after close of Transaction sub-shell'
274                 raise QMPShellError(msg)
275             qmpcmd = {
276                 'execute': 'transaction',
277                 'arguments': {'actions': self._actions}
278             }
279             return qmpcmd
281         # No args, or no args remaining
282         if not cmdargs:
283             return None
285         if self._transmode:
286             # Parse and cache this Transactional Action
287             finalize = False
288             action = {'type': cmdargs[0], 'data': {}}
289             if cmdargs[-1] == ')':
290                 cmdargs.pop(-1)
291                 finalize = True
292             self._cli_expr(cmdargs[1:], action['data'])
293             self._actions.append(action)
294             return self._build_cmd(')') if finalize else None
296         # Standard command: parse and return it to be executed.
297         qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
298         self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
299         return qmpcmd
301     def _print(self, qmp_message: object) -> None:
302         jsobj = json.dumps(qmp_message,
303                            indent=4 if self.pretty else None,
304                            sort_keys=self.pretty)
305         print(str(jsobj))
307     def _execute_cmd(self, cmdline: str) -> bool:
308         try:
309             qmpcmd = self._build_cmd(cmdline)
310         except QMPShellError as err:
311             print(
312                 f"Error while parsing command line: {err!s}\n"
313                 "command format: <command-name> "
314                 "[arg-name1=arg1] ... [arg-nameN=argN",
315                 file=sys.stderr
316             )
317             return True
318         # For transaction mode, we may have just cached the action:
319         if qmpcmd is None:
320             return True
321         if self.verbose:
322             self._print(qmpcmd)
323         resp = self.cmd_obj(qmpcmd)
324         if resp is None:
325             print('Disconnected')
326             return False
327         self._print(resp)
328         return True
330     def connect(self, negotiate: bool = True) -> None:
331         self._greeting = super().connect(negotiate)
332         self._completer_setup()
334     def show_banner(self,
335                     msg: str = 'Welcome to the QMP low-level shell!') -> None:
336         print(msg)
337         if not self._greeting:
338             print('Connected')
339             return
340         version = self._greeting['QMP']['version']['qemu']
341         print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
343     @property
344     def prompt(self) -> str:
345         if self._transmode:
346             return 'TRANS> '
347         return '(QEMU) '
349     def read_exec_command(self) -> bool:
350         """
351         Read and execute a command.
353         @return True if execution was ok, return False if disconnected.
354         """
355         try:
356             cmdline = input(self.prompt)
357         except EOFError:
358             print()
359             return False
361         if cmdline == '':
362             for event in self.get_events():
363                 print(event)
364             self.clear_events()
365             return True
367         return self._execute_cmd(cmdline)
369     def repl(self) -> Iterator[None]:
370         self.show_banner()
371         while self.read_exec_command():
372             yield
373         self.close()
376 class HMPShell(QMPShell):
377     def __init__(self, address: qmp.SocketAddrT,
378                  pretty: bool = False, verbose: bool = False):
379         super().__init__(address, pretty, verbose)
380         self._cpu_index = 0
382     def _cmd_completion(self) -> None:
383         for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
384             if cmd and cmd[0] != '[' and cmd[0] != '\t':
385                 name = cmd.split()[0]  # drop help text
386                 if name == 'info':
387                     continue
388                 if name.find('|') != -1:
389                     # Command in the form 'foobar|f' or 'f|foobar', take the
390                     # full name
391                     opt = name.split('|')
392                     if len(opt[0]) == 1:
393                         name = opt[1]
394                     else:
395                         name = opt[0]
396                 self._completer.append(name)
397                 self._completer.append('help ' + name)  # help completion
399     def _info_completion(self) -> None:
400         for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
401             if cmd:
402                 self._completer.append('info ' + cmd.split()[1])
404     def _other_completion(self) -> None:
405         # special cases
406         self._completer.append('help info')
408     def _fill_completion(self) -> None:
409         self._cmd_completion()
410         self._info_completion()
411         self._other_completion()
413     def _cmd_passthrough(self, cmdline: str,
414                          cpu_index: int = 0) -> QMPMessage:
415         return self.cmd_obj({
416             'execute': 'human-monitor-command',
417             'arguments': {
418                 'command-line': cmdline,
419                 'cpu-index': cpu_index
420             }
421         })
423     def _execute_cmd(self, cmdline: str) -> bool:
424         if cmdline.split()[0] == "cpu":
425             # trap the cpu command, it requires special setting
426             try:
427                 idx = int(cmdline.split()[1])
428                 if 'return' not in self._cmd_passthrough('info version', idx):
429                     print('bad CPU index')
430                     return True
431                 self._cpu_index = idx
432             except ValueError:
433                 print('cpu command takes an integer argument')
434                 return True
435         resp = self._cmd_passthrough(cmdline, self._cpu_index)
436         if resp is None:
437             print('Disconnected')
438             return False
439         assert 'return' in resp or 'error' in resp
440         if 'return' in resp:
441             # Success
442             if len(resp['return']) > 0:
443                 print(resp['return'], end=' ')
444         else:
445             # Error
446             print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
447         return True
449     def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
450         QMPShell.show_banner(self, msg)
453 def die(msg: str) -> NoReturn:
454     sys.stderr.write('ERROR: %s\n' % msg)
455     sys.exit(1)
458 def main() -> None:
459     parser = argparse.ArgumentParser()
460     parser.add_argument('-H', '--hmp', action='store_true',
461                         help='Use HMP interface')
462     parser.add_argument('-N', '--skip-negotiation', action='store_true',
463                         help='Skip negotiate (for qemu-ga)')
464     parser.add_argument('-v', '--verbose', action='store_true',
465                         help='Verbose (echo commands sent and received)')
466     parser.add_argument('-p', '--pretty', action='store_true',
467                         help='Pretty-print JSON')
469     default_server = os.environ.get('QMP_SOCKET')
470     parser.add_argument('qmp_server', action='store',
471                         default=default_server,
472                         help='< UNIX socket path | TCP address:port >')
474     args = parser.parse_args()
475     if args.qmp_server is None:
476         parser.error("QMP socket or TCP address must be specified")
478     shell_class = HMPShell if args.hmp else QMPShell
480     try:
481         address = shell_class.parse_address(args.qmp_server)
482     except qmp.QMPBadPortError:
483         parser.error(f"Bad port number: {args.qmp_server}")
484         return  # pycharm doesn't know error() is noreturn
486     with shell_class(address, args.pretty, args.verbose) as qemu:
487         try:
488             qemu.connect(negotiate=not args.skip_negotiation)
489         except qmp.QMPConnectError:
490             die("Didn't get QMP greeting message")
491         except qmp.QMPCapabilitiesError:
492             die("Couldn't negotiate capabilities")
493         except OSError as err:
494             die(f"Couldn't connect to {args.qmp_server}: {err!s}")
496         for _ in qemu.repl():
497             pass
500 if __name__ == '__main__':
501     main()