qdev: New qdev_new(), qdev_realize(), etc.
[qemu/armbru.git] / tests / qemu-iotests / qcow2_format.py
blob0f65fd161d5befea33bd02f6f00827295cbd47ab
1 # Library for manipulations with qcow2 image
3 # Copyright (c) 2020 Virtuozzo International GmbH.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import struct
20 import string
23 class Qcow2Field:
25 def __init__(self, value):
26 self.value = value
28 def __str__(self):
29 return str(self.value)
32 class Flags64(Qcow2Field):
34 def __str__(self):
35 bits = []
36 for bit in range(64):
37 if self.value & (1 << bit):
38 bits.append(bit)
39 return str(bits)
42 class Enum(Qcow2Field):
44 def __str__(self):
45 return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'
48 class Qcow2StructMeta(type):
50 # Mapping from c types to python struct format
51 ctypes = {
52 'u8': 'B',
53 'u16': 'H',
54 'u32': 'I',
55 'u64': 'Q'
58 def __init__(self, name, bases, attrs):
59 if 'fields' in attrs:
60 self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)
63 class Qcow2Struct(metaclass=Qcow2StructMeta):
65 """Qcow2Struct: base class for qcow2 data structures
67 Successors should define fields class variable, which is: list of tuples,
68 each of three elements:
69 - c-type (one of 'u8', 'u16', 'u32', 'u64')
70 - format (format_spec to use with .format() when dump or 'mask' to dump
71 bitmasks)
72 - field name
73 """
75 def __init__(self, fd=None, offset=None, data=None):
76 """
77 Two variants:
78 1. Specify data. fd and offset must be None.
79 2. Specify fd and offset, data must be None. offset may be omitted
80 in this case, than current position of fd is used.
81 """
82 if data is None:
83 assert fd is not None
84 buf_size = struct.calcsize(self.fmt)
85 if offset is not None:
86 fd.seek(offset)
87 data = fd.read(buf_size)
88 else:
89 assert fd is None and offset is None
91 values = struct.unpack(self.fmt, data)
92 self.__dict__ = dict((field[2], values[i])
93 for i, field in enumerate(self.fields))
95 def dump(self):
96 for f in self.fields:
97 value = self.__dict__[f[2]]
98 if isinstance(f[1], str):
99 value_str = f[1].format(value)
100 else:
101 value_str = str(f[1](value))
103 print('{:<25} {}'.format(f[2], value_str))
106 class Qcow2BitmapExt(Qcow2Struct):
108 fields = (
109 ('u32', '{}', 'nb_bitmaps'),
110 ('u32', '{}', 'reserved32'),
111 ('u64', '{:#x}', 'bitmap_directory_size'),
112 ('u64', '{:#x}', 'bitmap_directory_offset')
116 QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
119 class QcowHeaderExtension(Qcow2Struct):
121 class Magic(Enum):
122 mapping = {
123 0xe2792aca: 'Backing format',
124 0x6803f857: 'Feature table',
125 0x0537be77: 'Crypto header',
126 QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
127 0x44415441: 'Data file'
130 fields = (
131 ('u32', Magic, 'magic'),
132 ('u32', '{}', 'length')
133 # length bytes of data follows
134 # then padding to next multiply of 8
137 def __init__(self, magic=None, length=None, data=None, fd=None):
139 Support both loading from fd and creation from user data.
140 For fd-based creation current position in a file will be used to read
141 the data.
143 This should be somehow refactored and functionality should be moved to
144 superclass (to allow creation of any qcow2 struct), but then, fields
145 of variable length (data here) should be supported in base class
146 somehow. Note also, that we probably want to parse different
147 extensions. Should they be subclasses of this class, or how to do it
148 better? Should it be something like QAPI union with discriminator field
149 (magic here). So, it's a TODO. We'll see how to properly refactor this
150 when we have more qcow2 structures.
152 if fd is None:
153 assert all(v is not None for v in (magic, length, data))
154 self.magic = magic
155 self.length = length
156 if length % 8 != 0:
157 padding = 8 - (length % 8)
158 data += b'\0' * padding
159 self.data = data
160 else:
161 assert all(v is None for v in (magic, length, data))
162 super().__init__(fd=fd)
163 padded = (self.length + 7) & ~7
164 self.data = fd.read(padded)
165 assert self.data is not None
167 if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
168 self.obj = Qcow2BitmapExt(data=self.data)
169 else:
170 self.obj = None
172 def dump(self):
173 super().dump()
175 if self.obj is None:
176 data = self.data[:self.length]
177 if all(c in string.printable.encode('ascii') for c in data):
178 data = f"'{ data.decode('ascii') }'"
179 else:
180 data = '<binary>'
181 print(f'{"data":<25} {data}')
182 else:
183 self.obj.dump()
185 @classmethod
186 def create(cls, magic, data):
187 return QcowHeaderExtension(magic, len(data), data)
190 class QcowHeader(Qcow2Struct):
192 fields = (
193 # Version 2 header fields
194 ('u32', '{:#x}', 'magic'),
195 ('u32', '{}', 'version'),
196 ('u64', '{:#x}', 'backing_file_offset'),
197 ('u32', '{:#x}', 'backing_file_size'),
198 ('u32', '{}', 'cluster_bits'),
199 ('u64', '{}', 'size'),
200 ('u32', '{}', 'crypt_method'),
201 ('u32', '{}', 'l1_size'),
202 ('u64', '{:#x}', 'l1_table_offset'),
203 ('u64', '{:#x}', 'refcount_table_offset'),
204 ('u32', '{}', 'refcount_table_clusters'),
205 ('u32', '{}', 'nb_snapshots'),
206 ('u64', '{:#x}', 'snapshot_offset'),
208 # Version 3 header fields
209 ('u64', Flags64, 'incompatible_features'),
210 ('u64', Flags64, 'compatible_features'),
211 ('u64', Flags64, 'autoclear_features'),
212 ('u32', '{}', 'refcount_order'),
213 ('u32', '{}', 'header_length'),
216 def __init__(self, fd):
217 super().__init__(fd=fd, offset=0)
219 self.set_defaults()
220 self.cluster_size = 1 << self.cluster_bits
222 fd.seek(self.header_length)
223 self.load_extensions(fd)
225 if self.backing_file_offset:
226 fd.seek(self.backing_file_offset)
227 self.backing_file = fd.read(self.backing_file_size)
228 else:
229 self.backing_file = None
231 def set_defaults(self):
232 if self.version == 2:
233 self.incompatible_features = 0
234 self.compatible_features = 0
235 self.autoclear_features = 0
236 self.refcount_order = 4
237 self.header_length = 72
239 def load_extensions(self, fd):
240 self.extensions = []
242 if self.backing_file_offset != 0:
243 end = min(self.cluster_size, self.backing_file_offset)
244 else:
245 end = self.cluster_size
247 while fd.tell() < end:
248 ext = QcowHeaderExtension(fd=fd)
249 if ext.magic == 0:
250 break
251 else:
252 self.extensions.append(ext)
254 def update_extensions(self, fd):
256 fd.seek(self.header_length)
257 extensions = self.extensions
258 extensions.append(QcowHeaderExtension(0, 0, b''))
259 for ex in extensions:
260 buf = struct.pack('>II', ex.magic, ex.length)
261 fd.write(buf)
262 fd.write(ex.data)
264 if self.backing_file is not None:
265 self.backing_file_offset = fd.tell()
266 fd.write(self.backing_file)
268 if fd.tell() > self.cluster_size:
269 raise Exception('I think I just broke the image...')
271 def update(self, fd):
272 header_bytes = self.header_length
274 self.update_extensions(fd)
276 fd.seek(0)
277 header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
278 buf = struct.pack(QcowHeader.fmt, *header)
279 buf = buf[0:header_bytes-1]
280 fd.write(buf)
282 def dump_extensions(self):
283 for ex in self.extensions:
284 print('Header extension:')
285 ex.dump()
286 print()