1 # Library for manipulations with qcow2 image
3 # Copyright (c) 2020 Virtuozzo International GmbH.
4 # Copyright (C) 2012 Red Hat, Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 class ComplexEncoder(json
.JSONEncoder
):
26 def default(self
, obj
):
27 if hasattr(obj
, 'to_json'):
30 return json
.JSONEncoder
.default(self
, obj
)
35 def __init__(self
, value
):
39 return str(self
.value
)
42 class Flags64(Qcow2Field
):
47 if self
.value
& (1 << bit
):
52 class BitmapFlags(Qcow2Field
):
62 flag
= self
.value
& (1 << bit
)
64 bits
.append(self
.flags
.get(flag
, f
'bit-{bit}'))
65 return f
'{self.value:#x} ({bits})'
68 class Enum(Qcow2Field
):
71 return f
'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'
74 class Qcow2StructMeta(type):
76 # Mapping from c types to python struct format
84 def __init__(self
, name
, bases
, attrs
):
86 self
.fmt
= '>' + ''.join(self
.ctypes
[f
[0]] for f
in self
.fields
)
89 class Qcow2Struct(metaclass
=Qcow2StructMeta
):
91 """Qcow2Struct: base class for qcow2 data structures
93 Successors should define fields class variable, which is: list of tuples,
94 each of three elements:
95 - c-type (one of 'u8', 'u16', 'u32', 'u64')
96 - format (format_spec to use with .format() when dump or 'mask' to dump
101 def __init__(self
, fd
=None, offset
=None, data
=None):
104 1. Specify data. fd and offset must be None.
105 2. Specify fd and offset, data must be None. offset may be omitted
106 in this case, than current position of fd is used.
109 assert fd
is not None
110 buf_size
= struct
.calcsize(self
.fmt
)
111 if offset
is not None:
113 data
= fd
.read(buf_size
)
115 assert fd
is None and offset
is None
117 values
= struct
.unpack(self
.fmt
, data
)
118 self
.__dict
__ = dict((field
[2], values
[i
])
119 for i
, field
in enumerate(self
.fields
))
121 def dump(self
, is_json
=False):
123 print(json
.dumps(self
.to_json(), indent
=4, cls
=ComplexEncoder
))
126 for f
in self
.fields
:
127 value
= self
.__dict
__[f
[2]]
128 if isinstance(f
[1], str):
129 value_str
= f
[1].format(value
)
131 value_str
= str(f
[1](value
))
133 print('{:<25} {}'.format(f
[2], value_str
))
136 return dict((f
[2], self
.__dict
__[f
[2]]) for f
in self
.fields
)
139 class Qcow2BitmapExt(Qcow2Struct
):
142 ('u32', '{}', 'nb_bitmaps'),
143 ('u32', '{}', 'reserved32'),
144 ('u64', '{:#x}', 'bitmap_directory_size'),
145 ('u64', '{:#x}', 'bitmap_directory_offset')
148 def __init__(self
, fd
, cluster_size
):
149 super().__init
__(fd
=fd
)
150 tail
= struct
.calcsize(self
.fmt
) % 8
154 self
.cluster_size
= cluster_size
155 self
.read_bitmap_directory(fd
)
158 def read_bitmap_directory(self
, fd
):
159 fd
.seek(self
.bitmap_directory_offset
)
160 self
.bitmap_directory
= \
161 [Qcow2BitmapDirEntry(fd
, cluster_size
=self
.cluster_size
)
162 for _
in range(self
.nb_bitmaps
)]
166 for entry
in self
.bitmap_directory
:
171 fields_dict
= super().to_json()
172 fields_dict
['bitmap_directory'] = self
.bitmap_directory
176 class Qcow2BitmapDirEntry(Qcow2Struct
):
179 ('u64', '{:#x}', 'bitmap_table_offset'),
180 ('u32', '{}', 'bitmap_table_size'),
181 ('u32', BitmapFlags
, 'flags'),
182 ('u8', '{}', 'type'),
183 ('u8', '{}', 'granularity_bits'),
184 ('u16', '{}', 'name_size'),
185 ('u32', '{}', 'extra_data_size')
188 def __init__(self
, fd
, cluster_size
):
189 super().__init
__(fd
=fd
)
190 self
.cluster_size
= cluster_size
191 # Seek relative to the current position in the file
192 fd
.seek(self
.extra_data_size
, 1)
193 bitmap_name
= fd
.read(self
.name_size
)
194 self
.name
= bitmap_name
.decode('ascii')
195 # Move position to the end of the entry in the directory
196 entry_raw_size
= self
.bitmap_dir_entry_raw_size()
197 padding
= ((entry_raw_size
+ 7) & ~
7) - entry_raw_size
199 self
.bitmap_table
= Qcow2BitmapTable(fd
=fd
,
200 offset
=self
.bitmap_table_offset
,
201 nb_entries
=self
.bitmap_table_size
,
202 cluster_size
=self
.cluster_size
)
204 def bitmap_dir_entry_raw_size(self
):
205 return struct
.calcsize(self
.fmt
) + self
.name_size
+ \
209 print(f
'{"Bitmap name":<25} {self.name}')
210 super(Qcow2BitmapDirEntry
, self
).dump()
211 self
.bitmap_table
.dump()
214 # Put the name ahead of the dict
218 'bitmap_table': self
.bitmap_table
222 class Qcow2BitmapTableEntry(Qcow2Struct
):
225 ('u64', '{}', 'entry'),
228 BME_TABLE_ENTRY_RESERVED_MASK
= 0xff000000000001fe
229 BME_TABLE_ENTRY_OFFSET_MASK
= 0x00fffffffffffe00
230 BME_TABLE_ENTRY_FLAG_ALL_ONES
= 1
232 def __init__(self
, fd
):
233 super().__init
__(fd
=fd
)
234 self
.reserved
= self
.entry
& self
.BME_TABLE_ENTRY_RESERVED_MASK
235 self
.offset
= self
.entry
& self
.BME_TABLE_ENTRY_OFFSET_MASK
237 if self
.entry
& self
.BME_TABLE_ENTRY_FLAG_ALL_ONES
:
238 self
.type = 'invalid'
240 self
.type = 'serialized'
241 elif self
.entry
& self
.BME_TABLE_ENTRY_FLAG_ALL_ONES
:
242 self
.type = 'all-ones'
244 self
.type = 'all-zeroes'
247 return {'type': self
.type, 'offset': self
.offset
,
248 'reserved': self
.reserved
}
251 class Qcow2BitmapTable
:
253 def __init__(self
, fd
, offset
, nb_entries
, cluster_size
):
254 self
.cluster_size
= cluster_size
257 self
.entries
= [Qcow2BitmapTableEntry(fd
) for _
in range(nb_entries
)]
261 bitmap_table
= enumerate(self
.entries
)
262 print(f
'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}')
263 for i
, entry
in bitmap_table
:
264 if entry
.type == 'serialized':
265 size
= self
.cluster_size
268 print(f
'{i:<14} {entry.type:<15} {size:<12} {entry.offset}')
274 QCOW2_EXT_MAGIC_BITMAPS
= 0x23852875
277 class QcowHeaderExtension(Qcow2Struct
):
281 0xe2792aca: 'Backing format',
282 0x6803f857: 'Feature table',
283 0x0537be77: 'Crypto header',
284 QCOW2_EXT_MAGIC_BITMAPS
: 'Bitmaps',
285 0x44415441: 'Data file'
289 return self
.mapping
.get(self
.value
, "<unknown>")
292 ('u32', Magic
, 'magic'),
293 ('u32', '{}', 'length')
294 # length bytes of data follows
295 # then padding to next multiply of 8
298 def __init__(self
, magic
=None, length
=None, data
=None, fd
=None,
301 Support both loading from fd and creation from user data.
302 For fd-based creation current position in a file will be used to read
304 The cluster_size value may be obtained by dependent structures.
306 This should be somehow refactored and functionality should be moved to
307 superclass (to allow creation of any qcow2 struct), but then, fields
308 of variable length (data here) should be supported in base class
309 somehow. Note also, that we probably want to parse different
310 extensions. Should they be subclasses of this class, or how to do it
311 better? Should it be something like QAPI union with discriminator field
312 (magic here). So, it's a TODO. We'll see how to properly refactor this
313 when we have more qcow2 structures.
316 assert all(v
is not None for v
in (magic
, length
, data
))
320 padding
= 8 - (length
% 8)
321 data
+= b
'\0' * padding
324 assert all(v
is None for v
in (magic
, length
, data
))
325 super().__init
__(fd
=fd
)
326 if self
.magic
== QCOW2_EXT_MAGIC_BITMAPS
:
327 self
.obj
= Qcow2BitmapExt(fd
=fd
, cluster_size
=cluster_size
)
330 padded
= (self
.length
+ 7) & ~
7
331 self
.data
= fd
.read(padded
)
332 assert self
.data
is not None
335 if self
.data
is not None:
336 data_str
= self
.data
[:self
.length
]
337 if all(c
in string
.printable
.encode(
338 'ascii') for c
in data_str
):
339 data_str
= f
"'{ data_str.decode('ascii') }'"
341 data_str
= '<binary>'
342 self
.data_str
= data_str
349 print(f
'{"data":<25} {self.data_str}')
354 # Put the name ahead of the dict
355 res
= {'name': self
.Magic(self
.magic
), **super().to_json()}
356 if self
.obj
is not None:
357 res
['data'] = self
.obj
359 res
['data_str'] = self
.data_str
364 def create(cls
, magic
, data
):
365 return QcowHeaderExtension(magic
, len(data
), data
)
368 class QcowHeader(Qcow2Struct
):
371 # Version 2 header fields
372 ('u32', '{:#x}', 'magic'),
373 ('u32', '{}', 'version'),
374 ('u64', '{:#x}', 'backing_file_offset'),
375 ('u32', '{:#x}', 'backing_file_size'),
376 ('u32', '{}', 'cluster_bits'),
377 ('u64', '{}', 'size'),
378 ('u32', '{}', 'crypt_method'),
379 ('u32', '{}', 'l1_size'),
380 ('u64', '{:#x}', 'l1_table_offset'),
381 ('u64', '{:#x}', 'refcount_table_offset'),
382 ('u32', '{}', 'refcount_table_clusters'),
383 ('u32', '{}', 'nb_snapshots'),
384 ('u64', '{:#x}', 'snapshot_offset'),
386 # Version 3 header fields
387 ('u64', Flags64
, 'incompatible_features'),
388 ('u64', Flags64
, 'compatible_features'),
389 ('u64', Flags64
, 'autoclear_features'),
390 ('u32', '{}', 'refcount_order'),
391 ('u32', '{}', 'header_length'),
394 def __init__(self
, fd
):
395 super().__init
__(fd
=fd
, offset
=0)
398 self
.cluster_size
= 1 << self
.cluster_bits
400 fd
.seek(self
.header_length
)
401 self
.load_extensions(fd
)
403 if self
.backing_file_offset
:
404 fd
.seek(self
.backing_file_offset
)
405 self
.backing_file
= fd
.read(self
.backing_file_size
)
407 self
.backing_file
= None
409 def set_defaults(self
):
410 if self
.version
== 2:
411 self
.incompatible_features
= 0
412 self
.compatible_features
= 0
413 self
.autoclear_features
= 0
414 self
.refcount_order
= 4
415 self
.header_length
= 72
417 def load_extensions(self
, fd
):
420 if self
.backing_file_offset
!= 0:
421 end
= min(self
.cluster_size
, self
.backing_file_offset
)
423 end
= self
.cluster_size
425 while fd
.tell() < end
:
426 ext
= QcowHeaderExtension(fd
=fd
, cluster_size
=self
.cluster_size
)
430 self
.extensions
.append(ext
)
432 def update_extensions(self
, fd
):
434 fd
.seek(self
.header_length
)
435 extensions
= self
.extensions
436 extensions
.append(QcowHeaderExtension(0, 0, b
''))
437 for ex
in extensions
:
438 buf
= struct
.pack('>II', ex
.magic
, ex
.length
)
442 if self
.backing_file
is not None:
443 self
.backing_file_offset
= fd
.tell()
444 fd
.write(self
.backing_file
)
446 if fd
.tell() > self
.cluster_size
:
447 raise Exception('I think I just broke the image...')
449 def update(self
, fd
):
450 header_bytes
= self
.header_length
452 self
.update_extensions(fd
)
455 header
= tuple(self
.__dict
__[f
] for t
, p
, f
in QcowHeader
.fields
)
456 buf
= struct
.pack(QcowHeader
.fmt
, *header
)
457 buf
= buf
[0:header_bytes
-1]
460 def dump_extensions(self
, is_json
=False):
462 print(json
.dumps(self
.extensions
, indent
=4, cls
=ComplexEncoder
))
465 for ex
in self
.extensions
:
466 print('Header extension:')