scripts/qom-fuse: apply flake8 rules
[qemu.git] / scripts / qmp / qom-fuse
blobca30e928679ef8647e753822941fab30db803eb6
1 #!/usr/bin/env python3
2 ##
3 # QEMU Object Model test tools
5 # Copyright IBM, Corp. 2012
6 # Copyright (C) 2020 Red Hat, Inc.
8 # Authors:
9 #  Anthony Liguori   <aliguori@us.ibm.com>
10 #  Markus Armbruster <armbru@redhat.com>
12 # This work is licensed under the terms of the GNU GPL, version 2 or later.
13 # See the COPYING file in the top-level directory.
16 from errno import ENOENT, EPERM
17 import os
18 import stat
19 import sys
21 import fuse
22 from fuse import FUSE, FuseOSError, Operations
# Make the 'python' directory located two levels above this script
# importable, so that the 'qemu' package imported on the next line resolves.
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.qmp import QEMUMonitorProtocol
# NOTE(review): presumably selects the API version of the FUSE Python
# binding; confirm it is still required by the FUSE/Operations interface
# imported above.
fuse.fuse_python_api = (0, 2)
class QOMFS(Operations):
    """Read-only FUSE view of a QOM tree, fetched through QMP.

    Paths for which 'qom-list' succeeds are presented as directories,
    properties whose type starts with 'link<' as symbolic links, and all
    other properties as regular files holding the property value followed
    by a newline.
    """

    def __init__(self, qmp):
        """Connect *qmp* and start with an empty path -> inode table."""
        self.qmp = qmp
        self.qmp.connect()
        self.ino_map = {}
        self.ino_count = 1

    @staticmethod
    def _split(path):
        """Split *path* into (parent path, property name).

        The parent of a top-level entry is reported as '/', never ''.
        """
        parent, prop = path.rsplit('/', 1)
        return parent or '/', prop

    def _find_property(self, path):
        """Return the 'qom-list' entry of the parent that names *path*.

        Returns None when the parent has no such entry or when listing the
        parent fails for any reason.
        """
        parent, prop = self._split(path)
        try:
            for item in self.qmp.command('qom-list', path=parent):
                if item['name'] == prop:
                    return item
        except Exception:
            # Best effort: a failed listing means "no such property".
            # Unlike a bare 'except:', this lets KeyboardInterrupt and
            # SystemExit through.
            return None
        return None

    def _stat(self, path, mode, nlink):
        """Build the stat dictionary shared by every kind of entry."""
        return {
            'st_mode': mode,
            'st_ino': self.get_ino(path),
            'st_dev': 0,
            'st_nlink': nlink,
            'st_uid': 1000,
            'st_gid': 1000,
            'st_size': 4096,
            'st_atime': 0,
            'st_mtime': 0,
            'st_ctime': 0
        }

    def get_ino(self, path):
        """Return the inode number of *path*, allocating one on first use."""
        if path not in self.ino_map:
            self.ino_map[path] = self.ino_count
            self.ino_count += 1
        return self.ino_map[path]

    def is_object(self, path):
        """Return True if 'qom-list' succeeds for *path*."""
        try:
            self.qmp.command('qom-list', path=path)
        except Exception:
            # Any failure is treated as "not an object".
            return False
        return True

    def is_property(self, path):
        """Return True if the parent of *path* lists its last component."""
        return self._find_property(path) is not None

    def is_link(self, path):
        """Return True if *path* is a property whose type is 'link<...>'."""
        item = self._find_property(path)
        return item is not None and item.get('type', '').startswith('link<')

    def read(self, path, length, offset, fh):
        """Return up to *length* bytes of the property value at *offset*.

        Raises FuseOSError(ENOENT) if *path* is not a property and
        FuseOSError(EPERM) if 'qom-get' fails.
        """
        if not self.is_property(path):
            raise FuseOSError(ENOENT)

        parent, prop = self._split(path)
        try:
            value = self.qmp.command('qom-get', path=parent, property=prop)
        except Exception as err:
            raise FuseOSError(EPERM) from err

        # str() keeps non-string values (numbers, booleans) readable; the
        # trailing newline makes values shell friendly.
        data = (str(value) + '\n').encode('utf-8')
        # Slice the encoded bytes: offset and length are byte counts, and
        # an offset past the end naturally yields b''.
        return data[offset:offset + length]

    def readlink(self, path):
        """Return the target of link *path*, relative to the link itself.

        Raises FuseOSError(ENOENT) if *path* is not a link property.
        """
        if not self.is_link(path):
            raise FuseOSError(ENOENT)
        parent, prop = self._split(path)
        # One '..' per directory level between the mount root and the link.
        # A top-level link gets '.' so that the target stays inside the
        # mount instead of becoming an absolute path.
        depth = path.count('/') - 1
        prefix = '/'.join(['..'] * depth) or '.'
        target = self.qmp.command('qom-get', path=parent, property=prop)
        return prefix + str(target)

    def getattr(self, path, fh=None):
        """Return the stat dictionary for *path*.

        Links are checked first, then objects, then plain properties.
        Raises FuseOSError(ENOENT) if *path* is none of these.
        """
        if self.is_link(path):
            return self._stat(path, 0o755 | stat.S_IFLNK, 2)
        if self.is_object(path):
            return self._stat(path, 0o755 | stat.S_IFDIR, 2)
        if self.is_property(path):
            return self._stat(path, 0o644 | stat.S_IFREG, 1)
        raise FuseOSError(ENOENT)

    def readdir(self, path, fh):
        """Yield '.', '..' and the name of every 'qom-list' entry of *path*."""
        yield '.'
        yield '..'
        for item in self.qmp.command('qom-list', path=path):
            yield str(item['name'])
if __name__ == '__main__':
    # Usage: QMP_SOCKET=<address> qom-fuse <mountpoint>
    # Check the inputs up front so a missing one gives a readable message
    # rather than an IndexError/KeyError traceback.
    if len(sys.argv) < 2:
        sys.exit('usage: QMP_SOCKET=<address> %s <mountpoint>' % sys.argv[0])
    if 'QMP_SOCKET' not in os.environ:
        sys.exit('%s: the QMP_SOCKET environment variable is not set'
                 % sys.argv[0])

    # The result is deliberately not bound to a name: the original assigned
    # it to 'fuse', which rebinds the imported 'fuse' module.
    FUSE(QOMFS(QEMUMonitorProtocol(os.environ['QMP_SOCKET'])),
         sys.argv[1], foreground=True)