qcow2_format.py: support dumping metadata in JSON format
[qemu/ar7.git] / tests / qemu-iotests / qcow2_format.py
blob8adc9959e10bf637c72a67263186b04cac59bb3c
1 # Library for manipulations with qcow2 image
3 # Copyright (c) 2020 Virtuozzo International GmbH.
4 # Copyright (C) 2012 Red Hat, Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import struct
21 import string
22 import json
class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that understands objects exposing a to_json() method."""

    def default(self, obj):
        """Serialize obj through its to_json() method when it has one.

        Objects without a to_json attribute are handed over to the stock
        JSONEncoder.default() implementation.
        """
        if not hasattr(obj, 'to_json'):
            return json.JSONEncoder.default(self, obj)
        return obj.to_json()
class Qcow2Field:
    """Holder of one raw field value together with its text rendering."""

    def __init__(self, value):
        # Keep the raw value untouched; __str__ decides how to render it.
        self.value = value

    def __str__(self):
        """Return the plain str() form of the stored value."""
        text = str(self.value)
        return text
class Flags64(Qcow2Field):
    """Field rendered as the list of bit positions (0..63) that are set."""

    def __str__(self):
        """Return str() of the list of bit indexes set in self.value."""
        set_bits = [pos for pos in range(64) if self.value & (1 << pos)]
        return str(set_bits)
class BitmapFlags(Qcow2Field):
    """Field rendered as a hex value followed by the names of its set bits."""

    # Names of the known flag masks; any other set bit is shown as 'bit-N'.
    flags = {
        0x1: 'in-use',
        0x2: 'auto'
    }

    def __str__(self):
        """Return '<hex value> ([<names of set bits>])'."""
        masked = ((pos, self.value & (1 << pos)) for pos in range(64))
        bits = [self.flags.get(mask, f'bit-{pos}')
                for pos, mask in masked if mask]
        return f'{self.value:#x} ({bits})'
class Enum(Qcow2Field):
    """Field rendered as a hex value plus a name taken from `mapping`.

    `mapping` is not defined here; it is looked up on self, so it has to be
    supplied by whoever uses this class (e.g. a subclass attribute).
    """

    def __str__(self):
        """Return '<hex value> (<name>)', using '<unknown>' for no match."""
        label = self.mapping.get(self.value, "<unknown>")
        return f'{self.value:#x} ({label})'
class Qcow2StructMeta(type):
    """Metaclass that derives a struct format string from `fields`.

    A class whose body defines `fields` gets a `fmt` attribute: '>' followed
    by one struct format character per field, chosen by the field's c-type.
    """

    # Mapping from c types to python struct format
    ctypes = {
        'u8': 'B',
        'u16': 'H',
        'u32': 'I',
        'u64': 'Q'
    }

    def __init__(self, name, bases, attrs):
        # Only classes that declare their own `fields` get a format string.
        if 'fields' not in attrs:
            return
        codes = [self.ctypes[field[0]] for field in self.fields]
        self.fmt = '>' + ''.join(codes)
class Qcow2Struct(metaclass=Qcow2StructMeta):

    """Qcow2Struct: base class for qcow2 data structures

    Successors should define fields class variable, which is: list of tuples,
    each of three elements:
        - c-type (one of 'u8', 'u16', 'u32', 'u64')
        - format (format_spec to use with .format() when dump or 'mask' to dump
                  bitmasks)
        - field name
    """

    def __init__(self, fd=None, offset=None, data=None):
        """
        Two variants:
            1. Specify data. fd and offset must be None.
            2. Specify fd and offset, data must be None. offset may be omitted
               in this case, then current position of fd is used.
        """
        if data is not None:
            assert fd is None and offset is None
        else:
            assert fd is not None
            if offset is not None:
                fd.seek(offset)
            data = fd.read(struct.calcsize(self.fmt))

        unpacked = struct.unpack(self.fmt, data)
        # The instance dictionary is replaced wholesale by the parsed fields.
        self.__dict__ = {field[2]: unpacked[idx]
                         for idx, field in enumerate(self.fields)}

    def dump(self, is_json=False):
        """Print the fields, as JSON if is_json is set, else one per line."""
        if is_json:
            print(json.dumps(self.to_json(), indent=4, cls=ComplexEncoder))
            return

        for field in self.fields:
            formatter, name = field[1], field[2]
            value = self.__dict__[name]
            # A str formatter is a format string; anything else is called
            # with the value and the result is converted with str().
            if isinstance(formatter, str):
                value_str = formatter.format(value)
            else:
                value_str = str(formatter(value))

            print('{:<25} {}'.format(name, value_str))

    def to_json(self):
        """Return a dict mapping each declared field name to its value."""
        return {field[2]: self.__dict__[field[2]] for field in self.fields}
class Qcow2BitmapExt(Qcow2Struct):
    """Bitmaps header extension body plus the bitmap directory it points to.

    The fixed fields are read from the current position of fd; the bitmap
    directory is then loaded from bitmap_directory_offset and the file
    position is restored to just after the (8-byte aligned) fixed part.
    """

    fields = (
        ('u32', '{}', 'nb_bitmaps'),
        ('u32', '{}', 'reserved32'),
        ('u64', '{:#x}', 'bitmap_directory_size'),
        ('u64', '{:#x}', 'bitmap_directory_offset')
    )

    def __init__(self, fd, cluster_size):
        """Parse the extension at the current position of fd.

        cluster_size is stored and passed on to the directory entries.
        """
        super().__init__(fd=fd)
        # Skip padding up to the next multiple of 8 bytes, if any.
        tail = struct.calcsize(self.fmt) % 8
        if tail:
            fd.seek(8 - tail, 1)
        position = fd.tell()
        self.cluster_size = cluster_size
        self.read_bitmap_directory(fd)
        # Reading the directory moved the file position; put it back.
        fd.seek(position)

    def read_bitmap_directory(self, fd):
        """Load nb_bitmaps directory entries from bitmap_directory_offset."""
        fd.seek(self.bitmap_directory_offset)
        self.bitmap_directory = \
            [Qcow2BitmapDirEntry(fd, cluster_size=self.cluster_size)
             for _ in range(self.nb_bitmaps)]

    def dump(self, is_json=False):
        """Print the extension fields followed by every directory entry.

        is_json is accepted so that the signature matches
        Qcow2Struct.dump(); in JSON mode the output produced by the base
        class already contains the directory (see to_json()).
        """
        super().dump(is_json=is_json)
        if is_json:
            return
        for entry in self.bitmap_directory:
            print()
            entry.dump()

    def to_json(self):
        """Return the field dict extended with the 'bitmap_directory' list."""
        fields_dict = super().to_json()
        fields_dict['bitmap_directory'] = self.bitmap_directory
        return fields_dict
class Qcow2BitmapDirEntry(Qcow2Struct):
    """One bitmap directory entry: fixed fields, name and bitmap table."""

    fields = (
        ('u64', '{:#x}', 'bitmap_table_offset'),
        ('u32', '{}', 'bitmap_table_size'),
        ('u32', BitmapFlags, 'flags'),
        ('u8', '{}', 'type'),
        ('u8', '{}', 'granularity_bits'),
        ('u16', '{}', 'name_size'),
        ('u32', '{}', 'extra_data_size')
    )

    def __init__(self, fd, cluster_size):
        """Parse the entry at the current position of fd.

        On return fd is positioned after the entry's trailing padding, so
        the next entry can be read immediately.
        """
        super().__init__(fd=fd)
        self.cluster_size = cluster_size
        # Seek relative to the current position in the file
        fd.seek(self.extra_data_size, 1)
        bitmap_name = fd.read(self.name_size)
        self.name = bitmap_name.decode('ascii')
        # Move position to the end of the entry in the directory
        entry_raw_size = self.bitmap_dir_entry_raw_size()
        padding = ((entry_raw_size + 7) & ~7) - entry_raw_size
        fd.seek(padding, 1)
        self.bitmap_table = Qcow2BitmapTable(fd=fd,
                                             offset=self.bitmap_table_offset,
                                             nb_entries=self.bitmap_table_size,
                                             cluster_size=self.cluster_size)

    def bitmap_dir_entry_raw_size(self):
        """Return the unpadded size: fixed part + name + extra data."""
        return struct.calcsize(self.fmt) + self.name_size + \
            self.extra_data_size

    def dump(self, is_json=False):
        """Print the bitmap name, the entry fields and the bitmap table.

        is_json is accepted so that the signature matches
        Qcow2Struct.dump(); in JSON mode the base class prints to_json(),
        which already carries the name and the bitmap table.
        """
        if is_json:
            super().dump(is_json=True)
            return
        print(f'{"Bitmap name":<25} {self.name}')
        super().dump()
        self.bitmap_table.dump()

    def to_json(self):
        """Return the fields with 'name' first and 'bitmap_table' last."""
        # Put the name ahead of the dict
        return {
            'name': self.name,
            **super().to_json(),
            'bitmap_table': self.bitmap_table
        }
class Qcow2BitmapTableEntry(Qcow2Struct):
    """One 64-bit bitmap table entry, split into offset, flag and reserved.

    The entry type is derived from the offset bits and the all-ones flag:
    offset and flag -> 'invalid', offset only -> 'serialized',
    flag only -> 'all-ones', neither -> 'all-zeroes'.
    """

    fields = (
        ('u64', '{}', 'entry'),
    )

    BME_TABLE_ENTRY_RESERVED_MASK = 0xff000000000001fe
    BME_TABLE_ENTRY_OFFSET_MASK = 0x00fffffffffffe00
    BME_TABLE_ENTRY_FLAG_ALL_ONES = 1

    def __init__(self, fd):
        """Read one entry from the current position of fd and classify it."""
        super().__init__(fd=fd)
        raw = self.entry
        self.reserved = raw & self.BME_TABLE_ENTRY_RESERVED_MASK
        self.offset = raw & self.BME_TABLE_ENTRY_OFFSET_MASK
        all_ones = raw & self.BME_TABLE_ENTRY_FLAG_ALL_ONES
        if self.offset and all_ones:
            self.type = 'invalid'
        elif self.offset:
            self.type = 'serialized'
        elif all_ones:
            self.type = 'all-ones'
        else:
            self.type = 'all-zeroes'

    def to_json(self):
        """Return the derived type, offset and reserved bits as a dict."""
        return {'type': self.type, 'offset': self.offset,
                'reserved': self.reserved}
class Qcow2BitmapTable:
    """List of bitmap table entries read from a given file offset."""

    def __init__(self, fd, offset, nb_entries, cluster_size):
        """Read nb_entries entries at offset, preserving fd's position."""
        self.cluster_size = cluster_size
        saved_position = fd.tell()
        fd.seek(offset)
        self.entries = [Qcow2BitmapTableEntry(fd) for _ in range(nb_entries)]
        fd.seek(saved_position)

    def dump(self):
        """Print a header line and one row per entry.

        The size column shows cluster_size for 'serialized' entries and 0
        for every other entry type.
        """
        print(f'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}')
        for index, item in enumerate(self.entries):
            size = self.cluster_size if item.type == 'serialized' else 0
            print(f'{index:<14} {item.type:<15} {size:<12} {item.offset}')

    def to_json(self):
        """Return the list of entries (each serializes itself)."""
        return self.entries
# Header extension magic that selects the bitmaps extension; extensions with
# this magic are parsed as Qcow2BitmapExt instead of being kept as raw data.
QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
class QcowHeaderExtension(Qcow2Struct):
    """A qcow2 header extension: magic, length and payload.

    Instances carry either raw (padded) payload bytes in `data`, or, for the
    bitmaps extension loaded from a file, a parsed object in `obj`.
    """

    class Magic(Enum):
        """Extension magic rendered with its human-readable name."""

        mapping = {
            0xe2792aca: 'Backing format',
            0x6803f857: 'Feature table',
            0x0537be77: 'Crypto header',
            QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
            0x44415441: 'Data file'
        }

        def to_json(self):
            """Return the extension name, or "<unknown>" for no match."""
            return self.mapping.get(self.value, "<unknown>")

    fields = (
        ('u32', Magic, 'magic'),
        ('u32', '{}', 'length')
        # length bytes of data follows
        # then padding to next multiply of 8
    )

    def __init__(self, magic=None, length=None, data=None, fd=None,
                 cluster_size=None):
        """
        Support both loading from fd and creation from user data.
        For fd-based creation current position in a file will be used to read
        the data.
        The cluster_size value may be obtained by dependent structures.

        This should be somehow refactored and functionality should be moved to
        superclass (to allow creation of any qcow2 struct), but then, fields
        of variable length (data here) should be supported in base class
        somehow. Note also, that we probably want to parse different
        extensions. Should they be subclasses of this class, or how to do it
        better? Should it be something like QAPI union with discriminator field
        (magic here). So, it's a TODO. We'll see how to properly refactor this
        when we have more qcow2 structures.
        """
        if fd is None:
            assert all(v is not None for v in (magic, length, data))
            self.magic = magic
            self.length = length
            # Pad the payload with zero bytes up to a multiple of 8.
            if length % 8 != 0:
                padding = 8 - (length % 8)
                data += b'\0' * padding
            self.data = data
            # dump() and to_json() test self.obj, so it has to exist for
            # extensions created from user data as well.
            self.obj = None
        else:
            assert all(v is None for v in (magic, length, data))
            super().__init__(fd=fd)
            if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
                self.obj = Qcow2BitmapExt(fd=fd, cluster_size=cluster_size)
                self.data = None
            else:
                padded = (self.length + 7) & ~7
                self.data = fd.read(padded)
                assert self.data is not None
                self.obj = None

        if self.data is not None:
            data_str = self.data[:self.length]
            # Show the payload as text only if every byte is printable.
            printable = string.printable.encode('ascii')
            if all(c in printable for c in data_str):
                data_str = f"'{data_str.decode('ascii')}'"
            else:
                data_str = '<binary>'
            self.data_str = data_str

    def dump(self):
        """Print magic and length, then the payload text or parsed object."""
        super().dump()

        if self.obj is None:
            print(f'{"data":<25} {self.data_str}')
        else:
            self.obj.dump()

    def to_json(self):
        """Return a dict with 'name', the fields and 'data' or 'data_str'."""
        # Put the name ahead of the dict
        res = {'name': self.Magic(self.magic), **super().to_json()}
        if self.obj is not None:
            res['data'] = self.obj
        else:
            res['data_str'] = self.data_str

        return res

    @classmethod
    def create(cls, magic, data):
        """Build an extension from magic and payload; length is len(data)."""
        return cls(magic, len(data), data)
class QcowHeader(Qcow2Struct):
    """The qcow2 image header with its extensions and backing file name."""

    fields = (
        # Version 2 header fields
        ('u32', '{:#x}', 'magic'),
        ('u32', '{}', 'version'),
        ('u64', '{:#x}', 'backing_file_offset'),
        ('u32', '{:#x}', 'backing_file_size'),
        ('u32', '{}', 'cluster_bits'),
        ('u64', '{}', 'size'),
        ('u32', '{}', 'crypt_method'),
        ('u32', '{}', 'l1_size'),
        ('u64', '{:#x}', 'l1_table_offset'),
        ('u64', '{:#x}', 'refcount_table_offset'),
        ('u32', '{}', 'refcount_table_clusters'),
        ('u32', '{}', 'nb_snapshots'),
        ('u64', '{:#x}', 'snapshot_offset'),

        # Version 3 header fields
        ('u64', Flags64, 'incompatible_features'),
        ('u64', Flags64, 'compatible_features'),
        ('u64', Flags64, 'autoclear_features'),
        ('u32', '{}', 'refcount_order'),
        ('u32', '{}', 'header_length'),
    )

    def __init__(self, fd):
        """Load the header from offset 0 of fd, then extensions and the
        backing file name (None if backing_file_offset is zero)."""
        super().__init__(fd=fd, offset=0)

        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits

        fd.seek(self.header_length)
        self.load_extensions(fd)

        if self.backing_file_offset:
            fd.seek(self.backing_file_offset)
            self.backing_file = fd.read(self.backing_file_size)
        else:
            self.backing_file = None

    def set_defaults(self):
        """For version 2 images, overwrite the version 3 fields with fixed
        defaults (the values unpacked for them are discarded)."""
        if self.version == 2:
            self.incompatible_features = 0
            self.compatible_features = 0
            self.autoclear_features = 0
            self.refcount_order = 4
            self.header_length = 72

    def load_extensions(self, fd):
        """Read header extensions starting at the current position of fd.

        Reading stops at a zero magic, or at the end of the first cluster
        or the backing file name offset, whichever comes first.
        """
        self.extensions = []

        if self.backing_file_offset != 0:
            end = min(self.cluster_size, self.backing_file_offset)
        else:
            end = self.cluster_size

        while fd.tell() < end:
            ext = QcowHeaderExtension(fd=fd, cluster_size=self.cluster_size)
            if ext.magic == 0:
                break
            else:
                self.extensions.append(ext)

    def update_extensions(self, fd):
        """Write all extensions plus a terminator after the header, followed
        by the backing file name if there is one.

        Raises Exception if the written data ends beyond the first cluster.
        """
        fd.seek(self.header_length)
        # Append the terminator to a copy: self.extensions must not grow by
        # one end marker on every call.
        extensions = [*self.extensions, QcowHeaderExtension(0, 0, b'')]
        for ex in extensions:
            buf = struct.pack('>II', ex.magic, ex.length)
            fd.write(buf)
            fd.write(ex.data)

        if self.backing_file is not None:
            self.backing_file_offset = fd.tell()
            fd.write(self.backing_file)

        if fd.tell() > self.cluster_size:
            raise Exception('I think I just broke the image...')

    def update(self, fd):
        """Write the extensions and then the header fields back to fd.

        Only the first header_length bytes of the packed header are written,
        so that a shorter header does not overwrite what follows it.
        """
        header_bytes = self.header_length

        self.update_extensions(fd)

        fd.seek(0)
        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
        buf = struct.pack(QcowHeader.fmt, *header)
        # Slice end is exclusive: [0:header_bytes] is exactly header_bytes
        # bytes, so the last byte of the header is written too.
        buf = buf[0:header_bytes]
        fd.write(buf)

    def dump_extensions(self, is_json=False):
        """Print every loaded extension, as one JSON list if is_json."""
        if is_json:
            print(json.dumps(self.extensions, indent=4, cls=ComplexEncoder))
            return

        for ex in self.extensions:
            print('Header extension:')
            ex.dump()
            print()