qcow2: Add list of bitmaps to ImageInfoSpecificQCow2
[qemu/ar7.git] / scripts / qtest.py
blob afac3fe900ab5b443ccb618907f3d45cf0dfdf39
1 # QEMU qtest library
3 # Copyright (C) 2015 Red Hat Inc.
5 # Authors:
6 # Fam Zheng <famz@redhat.com>
8 # This work is licensed under the terms of the GNU GPL, version 2. See
9 # the COPYING file in the top-level directory.
11 # Based on qmp.py.
14 import socket
15 import os
16 import qemu
class QEMUQtestProtocol(object):
    """
    Text-line endpoint for a QEMU qtest socket.

    Each command is sent as one newline-terminated UTF-8 line and answered
    by reading one line back from the socket.
    """

    def __init__(self, address, server=False):
        """
        Create a QEMUQtestProtocol object.

        @param address: QEMU address, can be either a unix socket path (string)
                        or a tuple in the form ( address, port ) for a TCP
                        connection
        @param server: server mode, listens on the socket (bool)
        @raise socket.error on socket connection errors
        @note No connection is established, this is done by the connect() or
              accept() methods
        """
        self._address = address
        self._sock = self._get_sock()
        self._sockfile = None
        if server:
            try:
                self._sock.bind(self._address)
                self._sock.listen(1)
            except Exception:
                # Do not leak the freshly created socket if we cannot listen.
                self._sock.close()
                raise

    def _get_sock(self):
        """
        Create an unconnected stream socket matching the address type.

        @return AF_INET socket for tuple addresses, AF_UNIX socket otherwise
        """
        if isinstance(self._address, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        return socket.socket(family, socket.SOCK_STREAM)

    def connect(self):
        """
        Connect to the qtest socket.

        @raise socket.error on socket connection errors
        """
        self._sock.connect(self._address)
        self._sockfile = self._sock.makefile()

    def accept(self):
        """
        Await connection from QEMU.

        On success the accepted connection replaces the listening socket,
        which is closed because it is no longer reachable through this object.

        @raise socket.error on socket connection errors
        """
        listener = self._sock
        # If accept() raises, the listener stays in place so callers may retry.
        conn, _ = listener.accept()
        self._sock = conn
        self._sockfile = conn.makefile()
        listener.close()

    def cmd(self, qtest_cmd):
        """
        Send a qtest command on the wire.

        @param qtest_cmd: qtest command text to be sent
        @return the next line read from the socket (including its newline)
        """
        self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
        return self._sockfile.readline()

    def close(self):
        """
        Close the socket and, if a connection was established, its file
        wrapper.  Safe to call before connect()/accept() and more than once.
        """
        self._sock.close()
        # _sockfile only exists after connect()/accept() succeeded.
        if self._sockfile is not None:
            self._sockfile.close()
            self._sockfile = None

    def settimeout(self, timeout):
        """
        Set the timeout of the underlying socket.

        @param timeout: timeout value passed to socket.settimeout()
        """
        self._sock.settimeout(timeout)
class QEMUQtestMachine(qemu.QEMUMachine):
    """A QEMU VM started with a qtest unix socket and the qtest accelerator."""

    def __init__(self, binary, args=None, name=None, test_dir="/var/tmp",
                 socket_scm_helper=None):
        """
        Set up the machine and work out where its qtest socket will live.

        @param binary: forwarded to the base class constructor
        @param args: forwarded to the base class constructor
        @param name: machine name; defaults to "qemu-<pid>" of this process
        @param test_dir: directory in which the qtest socket path is placed
        @param socket_scm_helper: forwarded to the base class constructor
        """
        if name is None:
            name = "qemu-%d" % os.getpid()
        parent = super(QEMUQtestMachine, self)
        parent.__init__(binary, args, name=name, test_dir=test_dir,
                        socket_scm_helper=socket_scm_helper)
        # The protocol object is only created in _pre_launch().
        self._qtest = None
        sock_name = name + "-qtest.sock"
        self._qtest_path = os.path.join(test_dir, sock_name)

    def _base_args(self):
        """Return the base class arguments plus the qtest-specific options."""
        base = super(QEMUQtestMachine, self)._base_args()
        qtest_opts = ['-qtest', 'unix:path=' + self._qtest_path,
                      '-machine', 'accel=qtest']
        base.extend(qtest_opts)
        return base

    def _pre_launch(self):
        """Run the base class hook, then start listening on the qtest socket."""
        super(QEMUQtestMachine, self)._pre_launch()
        self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)

    def _post_launch(self):
        """Run the base class hook, then accept the qtest connection."""
        super(QEMUQtestMachine, self)._post_launch()
        self._qtest.accept()

    def _post_shutdown(self):
        """Run the base class hook, then remove the qtest socket path."""
        super(QEMUQtestMachine, self)._post_shutdown()
        # _remove_if_exists is not defined in this class; presumably inherited.
        self._remove_if_exists(self._qtest_path)

    def qtest(self, cmd):
        """
        Send a qtest command to guest.

        @param cmd: qtest command text
        @return the response line read back from the qtest socket
        """
        response = self._qtest.cmd(cmd)
        return response