#!/usr/bin/env python
#
# Tests for shrinking images
#
# Copyright (c) 2016-2017 Parallels International GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import random
import struct
import sys

import iotests
import qcow2
from iotests import qemu_img, qemu_io, image_size

# Python 2 compatibility: use the lazy range there as well.
if sys.version_info.major == 2:
    range = xrange

# Image that gets written to and shrunk by every test.
test_img = os.path.join(iotests.test_dir, 'test.img')
# Reference image created directly at the target (shrunk) size.
check_img = os.path.join(iotests.test_dir, 'check.img')


def size_to_int(str):
    """Convert a size string such as '16M' into a number of bytes.

    The last character is the unit suffix (one of B, K, M, G, T) and
    everything before it is the integer count; units are powers of 1024.
    Raises ValueError for an unknown suffix or a non-integer count.
    """
    # NOTE: the parameter name shadows the builtin; it is kept as is so
    # that existing callers are not affected.
    suff = ['B', 'K', 'M', 'G', 'T']
    return int(str[:-1]) * 1024**suff.index(str[-1:])


class ShrinkBaseClass(iotests.QMPTestCase):
    """Common shrink tests; subclasses provide cluster_size.

    Each test shrinks test_img from image_len down to shrink_size and
    compares the result against check_img.
    """

    image_len = '128M'      # initial size of test_img
    shrink_size = '10M'     # size after shrinking (and size of check_img)
    chunk_size = '16M'      # size of each write issued by the write tests
    refcount_bits = '16'    # refcount_bits creation option for test_img

    def __qcow2_check(self, filename):
        """Inspect the qcow2 metadata of filename after a shrink.

        Asserts that L1 entries beyond the ones needed for the current
        image size are zero, and that every refcount block referenced by
        the refcount table contains at least one non-zero byte.
        """
        entry_bits = 3                  # log2 of the table entry size
        entry_size = 1 << entry_bits    # table entries are 8 bytes each
        # Keeps bits 9-55 of an entry (the offset field per the qcow2
        # specification), dropping the flag and reserved bits.
        l1_mask = 0x00fffffffffffe00

        def div_roundup(n, d):
            return (n + d - 1) // d

        def split_by_n(data, n):
            # Yield the masked value of each big-endian 64-bit entry.
            for x in range(0, len(data), n):
                yield struct.unpack('>Q', data[x:x + n])[0] & l1_mask

        def check_l1_table(h, l1_data):
            l1_list = list(split_by_n(l1_data, entry_size))
            # One L1 entry points to one L2 table.  An L2 table occupies
            # one cluster and so holds 2**(cluster_bits - entry_bits)
            # entries, each mapping one cluster; an L1 entry therefore
            # covers 2**(2 * cluster_bits - entry_bits) bytes.  Using
            # entry_size (8) instead of entry_bits (3) in the exponent
            # would overestimate the expected L1 length 32 times and
            # leave the "unused" slice empty, i.e. nothing to verify.
            real_l1_size = div_roundup(h.size,
                                       1 << (h.cluster_bits*2 - entry_bits))
            used, unused = l1_list[:real_l1_size], l1_list[real_l1_size:]

            self.assertTrue(used, "Verifying l1 table content")
            self.assertFalse(any(unused), "Verifying l1 table content")

        def check_reftable(fd, h, reftable):
            for offset in split_by_n(reftable, entry_size):
                if offset != 0:
                    fd.seek(offset)
                    cluster = fd.read(1 << h.cluster_bits)
                    self.assertTrue(any(cluster),
                                    "Verifying reftable content")

        with open(filename, "rb") as fd:
            h = qcow2.QcowHeader(fd)

            fd.seek(h.l1_table_offset)
            l1_table = fd.read(h.l1_size << entry_bits)

            fd.seek(h.refcount_table_offset)
            reftable = fd.read(h.refcount_table_clusters << h.cluster_bits)

            check_l1_table(h, l1_table)
            # Needs the file to still be open: it reads refcount blocks.
            check_reftable(fd, h, reftable)

    def __raw_check(self, filename):
        """Raw images have no metadata, so there is nothing to check."""
        pass

    # Format-specific metadata check, looked up by iotests.imgfmt.  The
    # values are plain functions, so callers pass self explicitly.
    image_check = {
        'qcow2': __qcow2_check,
        'raw': __raw_check,
    }

    def setUp(self):
        """Create test_img (full size) and check_img (shrunk size).

        check_img is then filled with the 0xff pattern, which is what the
        write tests put into test_img before shrinking it.
        """
        if iotests.imgfmt == 'raw':
            qemu_img('create', '-f', iotests.imgfmt, test_img,
                     self.image_len)
            qemu_img('create', '-f', iotests.imgfmt, check_img,
                     self.shrink_size)
        else:
            qemu_img('create', '-f', iotests.imgfmt,
                     '-o', 'cluster_size=%s,refcount_bits=%s'
                           % (self.cluster_size, self.refcount_bits),
                     test_img, self.image_len)
            qemu_img('create', '-f', iotests.imgfmt,
                     '-o', 'cluster_size=%s' % self.cluster_size,
                     check_img, self.shrink_size)
        qemu_io('-c', 'write -P 0xff 0 ' + self.shrink_size, check_img)

    def tearDown(self):
        """Remove both images created by setUp."""
        os.remove(test_img)
        os.remove(check_img)

    def image_verify(self):
        """Check size and format metadata of the shrunk test_img.

        For non-raw formats 'qemu-img check' must also return 0.
        """
        self.assertEqual(image_size(test_img), image_size(check_img),
                         "Verifying image size")
        self.image_check[iotests.imgfmt](self, test_img)

        if iotests.imgfmt == 'raw':
            return
        self.assertEqual(qemu_img('check', test_img), 0,
                         "Verifying image corruption")

    # The test methods below are documented with comments rather than
    # docstrings so that the test names reported by unittest stay the same.

    def test_empty_image(self):
        # Shrink an image nothing was written to and compare the qemu-io
        # output for test_img with the output for check_img.
        qemu_img('resize', '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        # NOTE(review): qemu-io 'read' is normally given both an offset
        # and a length, but only one positional argument is passed here.
        # Confirm whether this compares real read results or merely two
        # identical error messages.
        self.assertEqual(
            qemu_io('-c', 'read -P 0x00 %s' % self.shrink_size, test_img),
            qemu_io('-c', 'read -P 0x00 %s' % self.shrink_size, check_img),
            "Verifying image content")

        self.image_verify()

    def test_sequential_write(self):
        # Fill the whole image chunk by chunk in ascending order, shrink
        # it, and compare it with check_img.
        for offs in range(0, size_to_int(self.image_len),
                          size_to_int(self.chunk_size)):
            qemu_io('-c', 'write -P 0xff %d %s' % (offs, self.chunk_size),
                    test_img)

        qemu_img('resize', '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(qemu_img("compare", test_img, check_img), 0,
                         "Verifying image content")

        self.image_verify()

    def test_random_write(self):
        # Same as test_sequential_write, but the chunks are written in a
        # shuffled order.
        offs_list = list(range(0, size_to_int(self.image_len),
                               size_to_int(self.chunk_size)))
        random.shuffle(offs_list)
        for offs in offs_list:
            qemu_io('-c', 'write -P 0xff %d %s' % (offs, self.chunk_size),
                    test_img)

        qemu_img('resize', '-f', iotests.imgfmt, '--shrink', test_img,
                 self.shrink_size)

        self.assertEqual(qemu_img("compare", test_img, check_img), 0,
                         "Verifying image content")

        self.image_verify()


class TestShrink512(ShrinkBaseClass):
    """Shrink tests with 512-byte clusters and 64-bit refcounts."""

    image_len = '3M'
    shrink_size = '1M'
    chunk_size = '256K'
    cluster_size = '512'
    refcount_bits = '64'


class TestShrink64K(ShrinkBaseClass):
    """Shrink tests with 64K clusters and the base class defaults."""

    cluster_size = '64K'


class TestShrink1M(ShrinkBaseClass):
    """Shrink tests with 1M clusters and 1-bit refcounts."""

    cluster_size = '1M'
    refcount_bits = '1'


# Drop the module-level name of the base class so that only the concrete
# subclasses remain visible as test cases in this module.
ShrinkBaseClass = None

if __name__ == '__main__':
    iotests.main(supported_fmts=['raw', 'qcow2'],
                 supported_protocols=['file'])