3 # Tests for shrinking images
5 # Copyright (c) 2016-2017 Parallels International GmbH
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import os
, random
, iotests
, struct
, qcow2
, sys
22 from iotests
import qemu_img
, qemu_io
, image_size
24 if sys
.version_info
.major
== 2:
# Paths of the two scratch images inside the iotests working directory:
# test_img is the image the tests resize, check_img is the reference image
# it gets compared against.
test_img, check_img = (
    os.path.join(iotests.test_dir, image_name)
    for image_name in ('test.img', 'check.img')
)
31 suff
= ['B', 'K', 'M', 'G', 'T']
32 return int(str[:-1]) * 1024**suff
.index(str[-1:])
34 class ShrinkBaseClass(iotests
.QMPTestCase
):
40 def __qcow2_check(self
, filename
):
42 entry_size
= 1 << entry_bits
43 l1_mask
= 0x00fffffffffffe00
44 div_roundup
= lambda n
, d
: (n
+ d
- 1) // d
def split_by_n(data, n):
    """Yield the table entries stored in data, one per n-byte slice.

    Each slice is decoded as a big-endian unsigned 64-bit integer and
    masked with l1_mask (taken from the enclosing scope).  The '>Q'
    format needs exactly 8 bytes per slice, so struct.error is raised
    when n is not 8 or when data ends with a short slice.
    """
    unpack_be64 = struct.Struct('>Q').unpack
    for start in range(0, len(data), n):
        # unpack always returns a 1-tuple for a single-field format.
        (entry,) = unpack_be64(data[start:start + n])
        yield entry & l1_mask
def check_l1_table(h, l1_data):
    """Verify that only the L1 entries the image needs are populated.

    h is the parsed image header; its size and cluster_bits fields are
    read.  l1_data is the raw L1 table as read from the image file.

    The table is split into the entries required to map h.size bytes
    ("used") and everything after them ("unused").  The check fails if
    there are no used entries or if any unused entry is non-zero.
    """
    l1_list = list(split_by_n(l1_data, entry_size))

    # One L1 entry points at one L2 table, i.e. one cluster filled with
    # entry_size-byte entries that each map one cluster.  It therefore
    # covers 2 ** (2 * cluster_bits - entry_bits) bytes: the exponent has
    # to subtract log2 of the entry size (entry_bits), not the entry size
    # in bytes, otherwise real_l1_size is overestimated and the "unused"
    # slice below is empty, making its check vacuous.
    l1_entry_coverage = 1 << (h.cluster_bits * 2 - entry_bits)
    real_l1_size = div_roundup(h.size, l1_entry_coverage)
    used, unused = l1_list[:real_l1_size], l1_list[real_l1_size:]

    self.assertTrue(len(used) != 0, "Verifying l1 table content")
    self.assertFalse(any(unused), "Verifying l1 table content")
59 def check_reftable(fd
, h
, reftable
):
60 for offset
in split_by_n(reftable
, entry_size
):
63 cluster
= fd
.read(1 << h
.cluster_bits
)
64 self
.assertTrue(any(cluster
), "Verifying reftable content")
66 with
open(filename
, "rb") as fd
:
67 h
= qcow2
.QcowHeader(fd
)
69 fd
.seek(h
.l1_table_offset
)
70 l1_table
= fd
.read(h
.l1_size
<< entry_bits
)
72 fd
.seek(h
.refcount_table_offset
)
73 reftable
= fd
.read(h
.refcount_table_clusters
<< h
.cluster_bits
)
75 check_l1_table(h
, l1_table
)
76 check_reftable(fd
, h
, reftable
)
78 def __raw_check(self
, filename
):
82 'qcow2' : __qcow2_check
,
87 if iotests
.imgfmt
== 'raw':
88 qemu_img('create', '-f', iotests
.imgfmt
, test_img
, self
.image_len
)
89 qemu_img('create', '-f', iotests
.imgfmt
, check_img
,
92 qemu_img('create', '-f', iotests
.imgfmt
,
93 '-o', 'cluster_size=' + self
.cluster_size
+
94 ',refcount_bits=' + self
.refcount_bits
,
95 test_img
, self
.image_len
)
96 qemu_img('create', '-f', iotests
.imgfmt
,
97 '-o', 'cluster_size=%s'% self
.cluster_size
,
98 check_img
, self
.shrink_size
)
99 qemu_io('-c', 'write -P 0xff 0 ' + self
.shrink_size
, check_img
)
105 def image_verify(self
):
106 self
.assertEqual(image_size(test_img
), image_size(check_img
),
107 "Verifying image size")
108 self
.image_check
[iotests
.imgfmt
](self
, test_img
)
110 if iotests
.imgfmt
== 'raw':
112 self
.assertEqual(qemu_img('check', test_img
), 0,
113 "Verifying image corruption")
115 def test_empty_image(self
):
116 qemu_img('resize', '-f', iotests
.imgfmt
, '--shrink', test_img
,
120 qemu_io('-c', 'read -P 0x00 %s'%self
.shrink_size
, test_img
),
121 qemu_io('-c', 'read -P 0x00 %s'%self
.shrink_size
, check_img
),
122 "Verifying image content")
126 def test_sequential_write(self
):
127 for offs
in range(0, size_to_int(self
.image_len
),
128 size_to_int(self
.chunk_size
)):
129 qemu_io('-c', 'write -P 0xff %d %s' % (offs
, self
.chunk_size
),
132 qemu_img('resize', '-f', iotests
.imgfmt
, '--shrink', test_img
,
135 self
.assertEqual(qemu_img("compare", test_img
, check_img
), 0,
136 "Verifying image content")
140 def test_random_write(self
):
141 offs_list
= list(range(0, size_to_int(self
.image_len
),
142 size_to_int(self
.chunk_size
)))
143 random
.shuffle(offs_list
)
144 for offs
in offs_list
:
145 qemu_io('-c', 'write -P 0xff %d %s' % (offs
, self
.chunk_size
),
148 qemu_img('resize', '-f', iotests
.imgfmt
, '--shrink', test_img
,
151 self
.assertEqual(qemu_img("compare", test_img
, check_img
), 0,
152 "Verifying image content")
156 class TestShrink512(ShrinkBaseClass
):
163 class TestShrink64K(ShrinkBaseClass
):
166 class TestShrink1M(ShrinkBaseClass
):
# Rebind the module-level name of the base class to None once the concrete
# subclasses exist.
# NOTE(review): presumably this keeps the test loader from collecting and
# running the base class itself -- confirm against iotests.main().
ShrinkBaseClass = None

# Run the suite only when executed as a script; the tests are declared for
# the raw and qcow2 image formats.
if __name__ == '__main__':
    iotests.main(
        supported_fmts=['raw', 'qcow2'],
    )