hw/arm: add Xunlong Orange Pi PC machine
[qemu/ar7.git] / python / qemu / qtest.py
blobd24ad04256b7dd5889fbe7da1814ec1cfe15fa6a
1 # QEMU qtest library
3 # Copyright (C) 2015 Red Hat Inc.
5 # Authors:
6 # Fam Zheng <famz@redhat.com>
8 # This work is licensed under the terms of the GNU GPL, version 2. See
9 # the COPYING file in the top-level directory.
11 # Based on qmp.py.
14 import socket
15 import os
17 from .machine import QEMUMachine
class QEMUQtestProtocol(object):
    """
    Text-line client/server endpoint for a QEMU qtest socket.

    Each command is sent as UTF-8 text terminated by a newline, and one
    line is read back from the socket as the response.
    """

    def __init__(self, address, server=False):
        """
        Create a QEMUQtestProtocol object.

        @param address: QEMU address, can be either a unix socket path (string)
                        or a tuple in the form ( address, port ) for a TCP
                        connection
        @param server: server mode, listens on the socket (bool)
        @raise socket.error on socket connection errors
        @note No connection is established, this is done by the connect() or
              accept() methods
        """
        self._address = address
        self._sock = self._get_sock()
        # Buffered reader over the connected socket; stays None until
        # connect() or accept() has succeeded.
        self._sockfile = None
        if server:
            self._sock.bind(self._address)
            self._sock.listen(1)

    def _get_sock(self):
        """
        Create an unconnected stream socket matching the address type.

        A tuple address selects AF_INET, anything else selects AF_UNIX.
        """
        if isinstance(self._address, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        return socket.socket(family, socket.SOCK_STREAM)

    def connect(self):
        """
        Connect to the qtest socket.

        @raise socket.error on socket connection errors
        """
        self._sock.connect(self._address)
        self._sockfile = self._sock.makefile()

    def accept(self):
        """
        Await connection from QEMU.

        The accepted connection replaces the listening socket as the
        socket used by cmd(), settimeout() and close(); the listening
        socket is closed once the connection has been accepted.

        @raise socket.error on socket connection errors
        """
        listener = self._sock
        conn, _ = listener.accept()
        self._sock = conn
        self._sockfile = conn.makefile()
        # The listener is no longer reachable through this object, so
        # release it now instead of leaving it to the garbage collector.
        listener.close()

    def cmd(self, qtest_cmd):
        """
        Send a qtest command on the wire.

        @param qtest_cmd: qtest command text to be sent
        @return the line read back from the socket after sending the command
        """
        self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
        resp = self._sockfile.readline()
        return resp

    def close(self):
        """
        Close the buffered reader (if any) and the underlying socket.

        Safe to call on an object that was never connected.
        """
        # _sockfile is None until connect()/accept() has run; closing an
        # unconnected object must not fail.
        if self._sockfile is not None:
            self._sockfile.close()
            self._sockfile = None
        self._sock.close()

    def settimeout(self, timeout):
        """
        Set the timeout of the underlying socket.

        @param timeout: value passed through to socket.settimeout()
        """
        self._sock.settimeout(timeout)
class QEMUQtestMachine(QEMUMachine):
    """A QEMU VM launched with a qtest control socket attached."""

    def __init__(self, binary, args=None, name=None, test_dir="/var/tmp",
                 socket_scm_helper=None, sock_dir=None):
        """
        Set up the machine and compute the qtest socket path.

        @param binary: forwarded unchanged to QEMUMachine
        @param args: forwarded unchanged to QEMUMachine
        @param name: VM name; defaults to "qemu-<pid of this process>"
        @param test_dir: forwarded to QEMUMachine; also the fallback
                         for sock_dir
        @param socket_scm_helper: forwarded unchanged to QEMUMachine
        @param sock_dir: directory for the qtest socket; defaults to test_dir
        """
        vm_name = name if name is not None else "qemu-%d" % os.getpid()
        socket_dir = sock_dir if sock_dir is not None else test_dir
        super(QEMUQtestMachine, self).__init__(
            binary, args,
            name=vm_name,
            test_dir=test_dir,
            socket_scm_helper=socket_scm_helper,
            sock_dir=socket_dir)
        # Protocol object is created later, in _pre_launch().
        self._qtest = None
        self._qtest_path = os.path.join(socket_dir, vm_name + "-qtest.sock")

    def _base_args(self):
        """Return the base class arguments plus the qtest options."""
        qtest_opts = [
            '-qtest', 'unix:path=' + self._qtest_path,
            '-accel', 'qtest',
        ]
        base = super(QEMUQtestMachine, self)._base_args()
        base.extend(qtest_opts)
        return base

    def _pre_launch(self):
        """Run the base class hook, then listen on the qtest socket path."""
        super(QEMUQtestMachine, self)._pre_launch()
        self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)

    def _post_launch(self):
        """Run the base class hook, then accept the qtest connection."""
        super(QEMUQtestMachine, self)._post_launch()
        self._qtest.accept()

    def _post_shutdown(self):
        """Run the base class hook, then remove the qtest socket file."""
        super(QEMUQtestMachine, self)._post_shutdown()
        # NOTE(review): only the socket path is removed here; the
        # QEMUQtestProtocol object itself is not closed -- confirm whether
        # that is intentional.
        self._remove_if_exists(self._qtest_path)

    def qtest(self, cmd):
        """Send a qtest command to guest and return the response line."""
        return self._qtest.cmd(cmd)