# QEMU Object Model test tools
#
# Copyright IBM, Corp. 2012
# Copyright (C) 2020 Red Hat, Inc.
#
# Authors:
#  Anthony Liguori <aliguori@us.ibm.com>
#  Markus Armbruster <armbru@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later. See
# the COPYING file in the top-level directory.
import os
import stat
import sys
from errno import EINVAL, ENOENT, EPERM

import fuse
from fuse import FUSE, FuseOSError, Operations

sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.qmp import QEMUMonitorProtocol

fuse.fuse_python_api = (0, 2)
class QOMFS(Operations):
    """Expose the QEMU Object Model (QOM) tree through FUSE.

    Objects are presented as directories, ``link<...>`` properties as
    symbolic links, and every other property as a regular file holding the
    property's value.  Everything is fetched on demand with the QMP
    commands 'qom-list' and 'qom-get'; nothing is cached except the inode
    numbers handed out by get_ino().
    """

    def __init__(self, qmp):
        """Create the filesystem on top of the QMP monitor *qmp*.

        NOTE(review): the connection is opened here on the assumption that
        the caller hands over a monitor that is not yet connected (the
        script entry point constructs one and never connects it) -- confirm.
        """
        self.qmp = qmp
        self.qmp.connect()
        # path -> inode number, filled lazily by get_ino().
        self.ino_map = {}
        # Next inode number to hand out.
        self.ino_count = 1

    def get_ino(self, path):
        """Return the inode number of *path*, allocating one on first use."""
        if path not in self.ino_map:
            self.ino_map[path] = self.ino_count
            # Advance the counter so that no two paths share an inode.
            self.ino_count += 1
        return self.ino_map[path]

    @staticmethod
    def _split(path):
        """Split *path* into (parent object path, property name).

        Raises ValueError when *path* contains no '/'.
        """
        parent, prop = path.rsplit('/', 1)
        # rsplit() yields '' as the parent of a top-level entry such as
        # '/machine'; the QOM root itself is addressed as '/'.
        return parent or '/', prop

    def _lookup(self, path):
        """Return the 'qom-list' entry describing *path*, or None.

        Any failure to list the parent object is reported as "no such
        entry" rather than propagated.
        """
        try:
            parent, prop = self._split(path)
            items = self.qmp.command('qom-list', path=parent)
        except Exception:
            return None
        for item in items:
            if item['name'] == prop:
                return item
        return None

    def is_object(self, path):
        """Return True if *path* can be listed with 'qom-list'."""
        try:
            self.qmp.command('qom-list', path=path)
        except Exception:
            return False
        return True

    def is_property(self, path):
        """Return True if *path* names a property of its parent object."""
        return self._lookup(path) is not None

    def is_link(self, path):
        """Return True if *path* names a property whose type is link<...>."""
        item = self._lookup(path)
        return item is not None and item['type'].startswith('link<')

    def read(self, path, length, offset, fh):
        """Return up to *length* bytes of the property value at *offset*.

        The value is rendered as text followed by a newline and encoded as
        UTF-8.  *fh* is unused.

        Raises:
            FuseOSError(ENOENT): *path* is not a property.
            FuseOSError(EPERM): 'qom-get' failed for the property.
        """
        if not self.is_property(path):
            raise FuseOSError(ENOENT)

        parent, prop = self._split(path)
        try:
            value = self.qmp.command('qom-get', path=parent, property=prop)
        except Exception as err:
            raise FuseOSError(EPERM) from err

        # str() because the value is not necessarily a string; the trailing
        # newline makes values shell friendly.
        data = (str(value) + '\n').encode('utf-8')
        # offset and length count bytes, so slice after encoding.  Slicing
        # past the end naturally yields b''.
        return data[offset:offset + length]

    def readlink(self, path):
        """Return the target of the link property *path* as a relative path.

        Raises:
            FuseOSError(EINVAL): *path* is not a link property.
        """
        if not self.is_link(path):
            raise FuseOSError(EINVAL)

        parent, prop = self._split(path)
        target = str(self.qmp.command('qom-get', path=parent, property=prop))
        # The target is appended directly to the prefix, so it is expected
        # to be an absolute QOM path.  Climb one '..' per directory between
        # the link and the mount root so the result stays inside the mount.
        depth = path.count('/') - 1
        prefix = '/'.join(['..'] * depth) or '.'
        return prefix + target

    def getattr(self, path, fh=None):
        """Return the stat dictionary for *path*.

        Links are checked first because a link is also a property.  *fh*
        is unused.

        Raises:
            FuseOSError(ENOENT): *path* is neither a link, an object nor a
                property.
        """
        if self.is_link(path):
            mode = 0o755 | stat.S_IFLNK
        elif self.is_object(path):
            mode = 0o755 | stat.S_IFDIR
        elif self.is_property(path):
            mode = 0o644 | stat.S_IFREG
        else:
            raise FuseOSError(ENOENT)

        # Only the mode and the inode are derived from QOM; the remaining
        # fields are fixed placeholders.  st_size must be non-zero, or the
        # kernel treats every property file as empty and never calls read().
        # NOTE(review): confirm the placeholder link count, owner and size.
        return {
            'st_mode': mode,
            'st_ino': self.get_ino(path),
            'st_nlink': 2,
            'st_uid': os.getuid(),
            'st_gid': os.getgid(),
            'st_size': 4096,
        }

    def readdir(self, path, fh):
        """Yield the directory entries of the object at *path*.

        *fh* is unused.
        """
        yield '.'
        yield '..'
        for item in self.qmp.command('qom-list', path=path):
            yield str(item['name'])
if __name__ == '__main__':
    # Usage: QMP_SOCKET=<QMP socket address> qom-fuse <mountpoint>
    #
    # Both inputs come from the caller; report what is missing instead of
    # dying with a bare IndexError/KeyError traceback.
    if len(sys.argv) < 2 or 'QMP_SOCKET' not in os.environ:
        sys.exit('usage: QMP_SOCKET=<socket> %s <mountpoint>' % sys.argv[0])

    # foreground=True keeps the process attached to the terminal; the call
    # returns only once the filesystem has been unmounted.
    fuse = FUSE(QOMFS(QEMUMonitorProtocol(os.environ['QMP_SOCKET'])),
                sys.argv[1], foreground=True)