target-ppc: add xscmp[eq,gt,ge,ne]dp instructions
[qemu/rayw.git] / scripts / qtest.py
blobd5aecb5f49b963870925c17a5fc31b6834c43eb8
1 # QEMU qtest library
3 # Copyright (C) 2015 Red Hat Inc.
5 # Authors:
6 # Fam Zheng <famz@redhat.com>
8 # This work is licensed under the terms of the GNU GPL, version 2. See
9 # the COPYING file in the top-level directory.
11 # Based on qmp.py.
14 import errno
15 import socket
16 import string
17 import os
18 import subprocess
19 import qmp.qmp
20 import qemu
class QEMUQtestProtocol(object):
    """Thin wrapper around a stream socket used to send qtest commands."""

    def __init__(self, address, server=False):
        """
        Create a QEMUQtestProtocol object.

        @param address: QEMU address, can be either a unix socket path (string)
                        or a tuple in the form ( address, port ) for a TCP
                        connection
        @param server: server mode, listens on the socket (bool)
        @raise socket.error on socket connection errors
        @note No connection is established, this is done by the connect() or
              accept() methods
        """
        self._address = address
        self._sock = self._get_sock()
        if server:
            try:
                self._sock.bind(self._address)
                self._sock.listen(1)
            except Exception:
                # Do not leak the descriptor when the address is unusable;
                # the original error is still propagated to the caller.
                self._sock.close()
                raise

    def _get_sock(self):
        """
        Create an unconnected stream socket matching the address type.

        A tuple address selects AF_INET; anything else selects AF_UNIX.
        """
        if isinstance(self._address, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        return socket.socket(family, socket.SOCK_STREAM)

    def connect(self):
        """
        Connect to the qtest socket.

        @raise socket.error on socket connection errors
        """
        self._sock.connect(self._address)

    def accept(self):
        """
        Await connection from QEMU.

        On success the accepted connection replaces the listening socket,
        which is closed explicitly because nothing references it afterwards.

        @raise socket.error on socket connection errors
        """
        listener = self._sock
        # If accept() raises, self._sock still refers to the open listener.
        self._sock, _ = listener.accept()
        listener.close()

    def cmd(self, qtest_cmd):
        """
        Send a qtest command on the wire.

        A newline terminator is appended. Text is encoded as UTF-8 before
        sending because socket.sendall() only accepts bytes on Python 3;
        byte strings are sent unchanged.

        @param qtest_cmd: qtest command text to be sent (str or bytes)
        """
        if isinstance(qtest_cmd, bytes):
            payload = qtest_cmd + b"\n"
        else:
            payload = (qtest_cmd + "\n").encode("utf-8")
        self._sock.sendall(payload)

    def close(self):
        """Close the underlying socket."""
        self._sock.close()

    def settimeout(self, timeout):
        """
        Set the timeout of the underlying socket.

        @param timeout: value passed straight to socket.settimeout()
        """
        self._sock.settimeout(timeout)
class QEMUQtestMachine(qemu.QEMUMachine):
    '''A QEMU VM'''

    def __init__(self, binary, args=None, name=None, test_dir="/var/tmp",
                 socket_scm_helper=None):
        '''
        Set up a QEMU machine whose qtest socket lives under test_dir.

        @param binary: forwarded to qemu.QEMUMachine
        @param args: extra command line arguments (list); defaults to a
                     fresh empty list for every instance
        @param name: machine name; defaults to "qemu-<pid>"
        @param test_dir: directory in which the qtest socket is created
        @param socket_scm_helper: forwarded to qemu.QEMUMachine
        '''
        # A literal [] default would be one list object shared by every
        # instance created without explicit args.
        if args is None:
            args = []
        if name is None:
            name = "qemu-%d" % os.getpid()
        super(QEMUQtestMachine, self).__init__(
            binary, args, name=name, test_dir=test_dir,
            socket_scm_helper=socket_scm_helper)
        # Set in _pre_launch(); initialised here so _post_shutdown() can
        # safely check it even if launch never got that far.
        self._qtest = None
        self._qtest_path = os.path.join(test_dir, name + "-qtest.sock")

    def _base_args(self):
        '''Return the base class arguments plus the qtest options.'''
        args = super(QEMUQtestMachine, self)._base_args()
        args.extend(['-qtest', 'unix:path=' + self._qtest_path,
                     '-machine', 'accel=qtest'])
        return args

    def _pre_launch(self):
        '''Start listening on the qtest socket path (server mode).'''
        super(QEMUQtestMachine, self)._pre_launch()
        self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)

    def _post_launch(self):
        '''Accept the incoming connection on the qtest socket.'''
        super(QEMUQtestMachine, self)._post_launch()
        self._qtest.accept()

    def _post_shutdown(self):
        '''Close the qtest socket and remove its path if it exists.'''
        super(QEMUQtestMachine, self)._post_shutdown()
        if self._qtest is not None:
            # Release the descriptor rather than waiting for garbage
            # collection.
            self._qtest.close()
        self._remove_if_exists(self._qtest_path)

    def qtest(self, cmd):
        '''Send a qtest command to guest'''
        return self._qtest.cmd(cmd)