osdep: powerpc64 align memory to allow 2MB radix THP page tables
[qemu/ar7.git] / scripts / qtest.py
blobdf0daf26ca34d0f6350efde48240c94567ffefc6
1 # QEMU qtest library
3 # Copyright (C) 2015 Red Hat Inc.
5 # Authors:
6 # Fam Zheng <famz@redhat.com>
8 # This work is licensed under the terms of the GNU GPL, version 2. See
9 # the COPYING file in the top-level directory.
11 # Based on qmp.py.
14 import socket
15 import os
16 import qemu
19 class QEMUQtestProtocol(object):
20 def __init__(self, address, server=False):
21 """
22 Create a QEMUQtestProtocol object.
24 @param address: QEMU address, can be either a unix socket path (string)
25 or a tuple in the form ( address, port ) for a TCP
26 connection
27 @param server: server mode, listens on the socket (bool)
28 @raise socket.error on socket connection errors
29 @note No connection is established, this is done by the connect() or
30 accept() methods
31 """
32 self._address = address
33 self._sock = self._get_sock()
34 if server:
35 self._sock.bind(self._address)
36 self._sock.listen(1)
38 def _get_sock(self):
39 if isinstance(self._address, tuple):
40 family = socket.AF_INET
41 else:
42 family = socket.AF_UNIX
43 return socket.socket(family, socket.SOCK_STREAM)
45 def connect(self):
46 """
47 Connect to the qtest socket.
49 @raise socket.error on socket connection errors
50 """
51 self._sock.connect(self._address)
53 def accept(self):
54 """
55 Await connection from QEMU.
57 @raise socket.error on socket connection errors
58 """
59 self._sock, _ = self._sock.accept()
61 def cmd(self, qtest_cmd):
62 """
63 Send a qtest command on the wire.
65 @param qtest_cmd: qtest command text to be sent
66 """
67 self._sock.sendall(qtest_cmd + "\n")
69 def close(self):
70 self._sock.close()
72 def settimeout(self, timeout):
73 self._sock.settimeout(timeout)
76 class QEMUQtestMachine(qemu.QEMUMachine):
77 '''A QEMU VM'''
79 def __init__(self, binary, args=None, name=None, test_dir="/var/tmp",
80 socket_scm_helper=None):
81 if name is None:
82 name = "qemu-%d" % os.getpid()
83 super(QEMUQtestMachine,
84 self).__init__(binary, args, name=name, test_dir=test_dir,
85 socket_scm_helper=socket_scm_helper)
86 self._qtest = None
87 self._qtest_path = os.path.join(test_dir, name + "-qtest.sock")
89 def _base_args(self):
90 args = super(QEMUQtestMachine, self)._base_args()
91 args.extend(['-qtest', 'unix:path=' + self._qtest_path,
92 '-machine', 'accel=qtest'])
93 return args
95 def _pre_launch(self):
96 super(QEMUQtestMachine, self)._pre_launch()
97 self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
99 def _post_launch(self):
100 super(QEMUQtestMachine, self)._post_launch()
101 self._qtest.accept()
103 def _post_shutdown(self):
104 super(QEMUQtestMachine, self)._post_shutdown()
105 self._remove_if_exists(self._qtest_path)
107 def qtest(self, cmd):
108 '''Send a qtest command to guest'''
109 return self._qtest.cmd(cmd)