Merge remote-tracking branch 'remotes/mdroth/tags/qga-pull-2019-11-04-tag' into staging
[qemu/ar7.git] / tests / qemu-iotests / qed.py
blob8adaaf46c4ace924e6584cfaef19a13b3b593f22
1 #!/usr/bin/env python
3 # Tool to manipulate QED image files
5 # Copyright (C) 2010 IBM, Corp.
7 # Authors:
8 # Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
10 # This work is licensed under the terms of the GNU GPL, version 2 or later.
11 # See the COPYING file in the top-level directory.
13 from __future__ import print_function
14 import sys
15 import struct
16 import random
17 import optparse
19 # This can be used as a module
20 __all__ = ['QED_F_NEED_CHECK', 'QED']
22 QED_F_NEED_CHECK = 0x02
24 header_fmt = '<IIIIQQQQQII'
25 header_size = struct.calcsize(header_fmt)
26 field_names = ['magic', 'cluster_size', 'table_size',
27 'header_size', 'features', 'compat_features',
28 'autoclear_features', 'l1_table_offset', 'image_size',
29 'backing_filename_offset', 'backing_filename_size']
30 table_elem_fmt = '<Q'
31 table_elem_size = struct.calcsize(table_elem_fmt)
33 def err(msg):
34 sys.stderr.write(msg + '\n')
35 sys.exit(1)
37 def unpack_header(s):
38 fields = struct.unpack(header_fmt, s)
39 return dict((field_names[idx], val) for idx, val in enumerate(fields))
41 def pack_header(header):
42 fields = tuple(header[x] for x in field_names)
43 return struct.pack(header_fmt, *fields)
45 def unpack_table_elem(s):
46 return struct.unpack(table_elem_fmt, s)[0]
48 def pack_table_elem(elem):
49 return struct.pack(table_elem_fmt, elem)
51 class QED(object):
52 def __init__(self, f):
53 self.f = f
55 self.f.seek(0, 2)
56 self.filesize = f.tell()
58 self.load_header()
59 self.load_l1_table()
61 def raw_pread(self, offset, size):
62 self.f.seek(offset)
63 return self.f.read(size)
65 def raw_pwrite(self, offset, data):
66 self.f.seek(offset)
67 return self.f.write(data)
69 def load_header(self):
70 self.header = unpack_header(self.raw_pread(0, header_size))
72 def store_header(self):
73 self.raw_pwrite(0, pack_header(self.header))
75 def read_table(self, offset):
76 size = self.header['table_size'] * self.header['cluster_size']
77 s = self.raw_pread(offset, size)
78 table = [unpack_table_elem(s[i:i + table_elem_size]) for i in xrange(0, size, table_elem_size)]
79 return table
81 def load_l1_table(self):
82 self.l1_table = self.read_table(self.header['l1_table_offset'])
83 self.table_nelems = self.header['table_size'] * self.header['cluster_size'] // table_elem_size
85 def write_table(self, offset, table):
86 s = ''.join(pack_table_elem(x) for x in table)
87 self.raw_pwrite(offset, s)
89 def random_table_item(table):
90 vals = [(index, offset) for index, offset in enumerate(table) if offset != 0]
91 if not vals:
92 err('cannot pick random item because table is empty')
93 return random.choice(vals)
95 def corrupt_table_duplicate(table):
96 '''Corrupt a table by introducing a duplicate offset'''
97 victim_idx, victim_val = random_table_item(table)
98 unique_vals = set(table)
99 if len(unique_vals) == 1:
100 err('no duplication corruption possible in table')
101 dup_val = random.choice(list(unique_vals.difference([victim_val])))
102 table[victim_idx] = dup_val
104 def corrupt_table_invalidate(qed, table):
105 '''Corrupt a table by introducing an invalid offset'''
106 index, _ = random_table_item(table)
107 table[index] = qed.filesize + random.randint(0, 100 * 1024 * 1024 * 1024 * 1024)
109 def cmd_show(qed, *args):
110 '''show [header|l1|l2 <offset>]- Show header or l1/l2 tables'''
111 if not args or args[0] == 'header':
112 print(qed.header)
113 elif args[0] == 'l1':
114 print(qed.l1_table)
115 elif len(args) == 2 and args[0] == 'l2':
116 offset = int(args[1])
117 print(qed.read_table(offset))
118 else:
119 err('unrecognized sub-command')
121 def cmd_duplicate(qed, table_level):
122 '''duplicate l1|l2 - Duplicate a random table element'''
123 if table_level == 'l1':
124 offset = qed.header['l1_table_offset']
125 table = qed.l1_table
126 elif table_level == 'l2':
127 _, offset = random_table_item(qed.l1_table)
128 table = qed.read_table(offset)
129 else:
130 err('unrecognized sub-command')
131 corrupt_table_duplicate(table)
132 qed.write_table(offset, table)
134 def cmd_invalidate(qed, table_level):
135 '''invalidate l1|l2 - Plant an invalid table element at random'''
136 if table_level == 'l1':
137 offset = qed.header['l1_table_offset']
138 table = qed.l1_table
139 elif table_level == 'l2':
140 _, offset = random_table_item(qed.l1_table)
141 table = qed.read_table(offset)
142 else:
143 err('unrecognized sub-command')
144 corrupt_table_invalidate(qed, table)
145 qed.write_table(offset, table)
147 def cmd_need_check(qed, *args):
148 '''need-check [on|off] - Test, set, or clear the QED_F_NEED_CHECK header bit'''
149 if not args:
150 print(bool(qed.header['features'] & QED_F_NEED_CHECK))
151 return
153 if args[0] == 'on':
154 qed.header['features'] |= QED_F_NEED_CHECK
155 elif args[0] == 'off':
156 qed.header['features'] &= ~QED_F_NEED_CHECK
157 else:
158 err('unrecognized sub-command')
159 qed.store_header()
161 def cmd_zero_cluster(qed, pos, *args):
162 '''zero-cluster <pos> [<n>] - Zero data clusters'''
163 pos, n = int(pos), 1
164 if args:
165 if len(args) != 1:
166 err('expected one argument')
167 n = int(args[0])
169 for i in xrange(n):
170 l1_index = pos // qed.header['cluster_size'] // len(qed.l1_table)
171 if qed.l1_table[l1_index] == 0:
172 err('no l2 table allocated')
174 l2_offset = qed.l1_table[l1_index]
175 l2_table = qed.read_table(l2_offset)
177 l2_index = (pos // qed.header['cluster_size']) % len(qed.l1_table)
178 l2_table[l2_index] = 1 # zero the data cluster
179 qed.write_table(l2_offset, l2_table)
180 pos += qed.header['cluster_size']
182 def cmd_copy_metadata(qed, outfile):
183 '''copy-metadata <outfile> - Copy metadata only (for scrubbing corrupted images)'''
184 out = open(outfile, 'wb')
186 # Match file size
187 out.seek(qed.filesize - 1)
188 out.write('\0')
190 # Copy header clusters
191 out.seek(0)
192 header_size_bytes = qed.header['header_size'] * qed.header['cluster_size']
193 out.write(qed.raw_pread(0, header_size_bytes))
195 # Copy L1 table
196 out.seek(qed.header['l1_table_offset'])
197 s = ''.join(pack_table_elem(x) for x in qed.l1_table)
198 out.write(s)
200 # Copy L2 tables
201 for l2_offset in qed.l1_table:
202 if l2_offset == 0:
203 continue
204 l2_table = qed.read_table(l2_offset)
205 out.seek(l2_offset)
206 s = ''.join(pack_table_elem(x) for x in l2_table)
207 out.write(s)
209 out.close()
211 def usage():
212 print('Usage: %s <file> <cmd> [<arg>, ...]' % sys.argv[0])
213 print()
214 print('Supported commands:')
215 for cmd in sorted(x for x in globals() if x.startswith('cmd_')):
216 print(globals()[cmd].__doc__)
217 sys.exit(1)
219 def main():
220 if len(sys.argv) < 3:
221 usage()
222 filename, cmd = sys.argv[1:3]
224 cmd = 'cmd_' + cmd.replace('-', '_')
225 if cmd not in globals():
226 usage()
228 qed = QED(open(filename, 'r+b'))
229 try:
230 globals()[cmd](qed, *sys.argv[3:])
231 except TypeError as e:
232 sys.stderr.write(globals()[cmd].__doc__ + '\n')
233 sys.exit(1)
235 if __name__ == '__main__':
236 main()