68474776f3
In Python 3, several functions now return iterators instead of lists. This includes range(), items(), map(), and filter(). This means that if we really want a list, we have to wrap those instances with list(). But then again, in the two instances where this is the case for map() and filter(), there are shorter expressions which work without either function.

On the other hand, sometimes we do just want an iterator, in which case we have sometimes used xrange() and iteritems(), which no longer exist in Python 3. Just changing these calls to range() and items() works in both Python 2 and 3, and is really what we want in 3 (which is what matters). But because it is so simple to do (and to find and remove once we completely switch to Python 3), make range() be an alias for xrange() in the two affected tests (044 and 163).

In one instance, we only wanted the first element of the result of a filter() call. Instead of using next(filter()), which would work only in Python 3, or list(filter())[0], which would work everywhere but is a bit weird, this instance is changed to use a generator expression with a next() wrapped around it, which works in both 2.7 and 3.

Signed-off-by: Max Reitz <mreitz@redhat.com>
Reviewed-by: Eduardo Habkost <ehabkost@redhat.com>
Reviewed-by: Cleber Rosa <crosa@redhat.com>
Message-Id: <20181022135307.14398-6-mreitz@redhat.com>
Signed-off-by: Eduardo Habkost <ehabkost@redhat.com>
362 lines
15 KiB
Python
Executable File
362 lines
15 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# Test cases for the QMP 'blockdev-del' command
|
|
#
|
|
# Copyright (C) 2015 Igalia, S.L.
|
|
# Author: Alberto Garcia <berto@igalia.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import os
|
|
import iotests
|
|
import time
|
|
|
|
# Scratch images used by the tests; both are placed in iotests.test_dir.
base_img = os.path.join(iotests.test_dir, 'base.img')
new_img = os.path.join(iotests.test_dir, 'new.img')

# Pick the virtio-blk device name that matches the default machine type:
# the CCW variant on 's390-ccw-virtio', the PCI variant everywhere else.
default_virtio_blk = ('virtio-blk-ccw'
                      if iotests.qemu_default_machine == 's390-ccw-virtio'
                      else 'virtio-blk-pci')
|
|
|
|
class TestBlockdevDel(iotests.QMPTestCase):
    """Test cases for the QMP 'blockdev-del' command.

    Each test builds a small graph of block nodes (and, where needed,
    guest devices) through QMP and then checks in which situations
    'blockdev-del' succeeds or is rejected with a GenericError.
    """

    def setUp(self):
        """Create the base image and launch a VM with a virtio-scsi bus."""
        iotests.qemu_img('create', '-f', iotests.imgfmt, base_img, '1M')
        self.vm = iotests.VM()
        # The controller name depends on the bus of the default machine.
        if iotests.qemu_default_machine == 's390-ccw-virtio':
            self.vm.add_device("virtio-scsi-ccw,id=virtio-scsi")
        else:
            self.vm.add_device("virtio-scsi-pci,id=virtio-scsi")

        self.vm.launch()

    def tearDown(self):
        """Shut the VM down and remove the images created by the test."""
        self.vm.shutdown()
        os.remove(base_img)
        # new_img is only created by some of the helpers, so it may be absent.
        if os.path.isfile(new_img):
            os.remove(new_img)

    def _blockdev_add(self, opts):
        """Issue 'blockdev-add' with the given options and expect success.

        opts is the dictionary of arguments for the command.
        """
        # NOTE(review): conv_keys=False presumably makes iotests send the
        # keys (e.g. 'node-name') exactly as written -- confirm against
        # iotests.VM.qmp.
        result = self.vm.qmp('blockdev-add', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})

    def checkBlockDriverState(self, node, must_exist=True):
        """Check whether a BlockDriverState exists.

        Queries 'query-named-block-nodes' and asserts that at most one
        node is called `node`, and that it is present exactly when
        `must_exist` is true.
        """
        result = self.vm.qmp('query-named-block-nodes')
        nodes = [x for x in result['return'] if x['node-name'] == node]
        self.assertLessEqual(len(nodes), 1)
        self.assertEqual(must_exist, len(nodes) == 1)

    def addBlockDriverState(self, node):
        """Add a BlockDriverState without a BlockBackend.

        The new node uses iotests.imgfmt on top of a 'file' node named
        '<node>_file' that opens base_img.  Neither node may exist
        beforehand, and both must exist afterwards.
        """
        file_node = '%s_file' % node
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(file_node, False)
        opts = {'driver': iotests.imgfmt,
                'node-name': node,
                'file': {'driver': 'file',
                         'node-name': file_node,
                         'filename': base_img}}
        self._blockdev_add(opts)
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(file_node)

    def addBlockDriverStateOverlay(self, node):
        """Add a BDS that will be used as overlay for the base_img BDS.

        new_img is created with base_img as its backing file, and the
        node is added with 'backing' set to None.
        """
        self.checkBlockDriverState(node, False)
        iotests.qemu_img('create', '-u', '-f', iotests.imgfmt,
                         '-b', base_img, new_img, '1M')
        opts = {'driver': iotests.imgfmt,
                'node-name': node,
                # NOTE(review): a null 'backing' presumably keeps QEMU from
                # opening the backing file itself -- confirm in the QMP docs.
                'backing': None,
                'file': {'driver': 'file',
                         'filename': new_img}}
        self._blockdev_add(opts)
        self.checkBlockDriverState(node)

    def delBlockDriverState(self, node, expect_error=False):
        """Delete a BlockDriverState with 'blockdev-del'.

        With `expect_error` set, the command must fail with a
        GenericError and the node must still exist afterwards; otherwise
        the command must succeed and the node must be gone.
        """
        self.checkBlockDriverState(node)
        result = self.vm.qmp('blockdev-del', node_name=node)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})
        # The node survives exactly when the deletion was rejected.
        self.checkBlockDriverState(node, expect_error)

    def addDeviceModel(self, device, backend, driver=default_virtio_blk):
        """Add a device model `device` of type `driver` using `backend`."""
        result = self.vm.qmp('device_add', id=device,
                             driver=driver, drive=backend)
        self.assert_qmp(result, 'return', {})

    def delDeviceModel(self, device, is_virtio_blk=True):
        """Delete a device model and wait until QEMU reports it deleted.

        After 'device_del' a 'system_reset' is issued, then the
        DEVICE_DELETED events are awaited.  For virtio-blk devices an
        extra event for the '<device>/virtio-backend' child is expected
        before the event for the device itself.
        """
        result = self.vm.qmp('device_del', id=device)
        self.assert_qmp(result, 'return', {})

        # NOTE(review): the reset presumably lets the unplug complete
        # without guest cooperation -- confirm.
        result = self.vm.qmp('system_reset')
        self.assert_qmp(result, 'return', {})

        if is_virtio_blk:
            device_path = '/machine/peripheral/%s/virtio-backend' % device
            event = self.vm.event_wait(name="DEVICE_DELETED",
                                       match={'data': {'path': device_path}})
            self.assertIsNotNone(event)

        event = self.vm.event_wait(name="DEVICE_DELETED",
                                   match={'data': {'device': device}})
        self.assertIsNotNone(event)

    def ejectDrive(self, device, node, expect_error=False,
                   destroys_media=True):
        """Remove a BlockDriverState from `device` with 'eject'.

        With `expect_error` set, the command must fail with a
        GenericError and `node` must still exist.  Otherwise the command
        must succeed, and `node` must still exist only if
        `destroys_media` is false.
        """
        self.checkBlockDriverState(node)
        result = self.vm.qmp('eject', id=device)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
            self.checkBlockDriverState(node)
        else:
            self.assert_qmp(result, 'return', {})
            self.checkBlockDriverState(node, not destroys_media)

    def insertDrive(self, device, node):
        """Insert the existing BlockDriverState `node` into `device`."""
        self.checkBlockDriverState(node)
        result = self.vm.qmp('blockdev-insert-medium',
                             id=device, node_name=node)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)

    def createSnapshotSync(self, node, overlay):
        """Create a snapshot using 'blockdev-snapshot-sync'.

        `node` must exist and `overlay` must not; the overlay image is
        written to new_img.  Both nodes must exist afterwards.
        """
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay, False)
        opts = {'node-name': node,
                'snapshot-file': new_img,
                'snapshot-node-name': overlay,
                'format': iotests.imgfmt}
        result = self.vm.qmp('blockdev-snapshot-sync', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    def createSnapshot(self, node, overlay):
        """Create a snapshot using 'blockdev-snapshot'.

        Unlike createSnapshotSync(), both `node` and `overlay` must
        already exist before the command is issued.
        """
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)
        result = self.vm.qmp('blockdev-snapshot',
                             node=node, overlay=overlay)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    def createMirror(self, node, new_node):
        """Create a mirror of `node` with 'drive-mirror'.

        The job uses `node` as its job ID, writes to new_img and names
        the target node `new_node`, which must not exist beforehand.
        """
        self.checkBlockDriverState(new_node, False)
        opts = {'device': node,
                'job-id': node,
                'target': new_img,
                'node-name': new_node,
                'sync': 'top',
                'format': iotests.imgfmt}
        result = self.vm.qmp('drive-mirror', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(new_node)

    def completeBlockJob(self, id, node_before, node_after):
        """Complete an existing block job and wait until it has finished.

        `id` is the job ID (the name shadows the builtin, but it is kept
        because it is part of the method's signature).  `node_before`
        and `node_after` are currently unused; they are retained so that
        existing callers keep working.
        """
        result = self.vm.qmp('block-job-complete', device=id)
        self.assert_qmp(result, 'return', {})
        self.wait_until_completed(id)

    def addBlkDebug(self, debug, node):
        """Add a blkdebug node `debug` on top of a new image node `node`.

        Note that the purpose of this is to test the blockdev-del
        sanity checks, not to create a usable blkdebug drive.
        """
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(debug, False)
        image = {'driver': iotests.imgfmt,
                 'node-name': node,
                 'file': {'driver': 'file',
                          'filename': base_img}}
        opts = {'driver': 'blkdebug',
                'node-name': debug,
                'image': image}
        self._blockdev_add(opts)
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(debug)

    def addBlkVerify(self, blkverify, test, raw):
        """Add a blkverify node with children `test` and `raw`.

        `test` opens base_img and `raw` opens a freshly created new_img.
        Note that the purpose of this is to test the blockdev-del
        sanity checks, not to create a usable blkverify drive.
        """
        self.checkBlockDriverState(test, False)
        self.checkBlockDriverState(raw, False)
        self.checkBlockDriverState(blkverify, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        node_0 = {'driver': iotests.imgfmt,
                  'node-name': test,
                  'file': {'driver': 'file',
                           'filename': base_img}}
        node_1 = {'driver': iotests.imgfmt,
                  'node-name': raw,
                  'file': {'driver': 'file',
                           'filename': new_img}}
        opts = {'driver': 'blkverify',
                'node-name': blkverify,
                'test': node_0,
                'raw': node_1}
        self._blockdev_add(opts)
        self.checkBlockDriverState(test)
        self.checkBlockDriverState(raw)
        self.checkBlockDriverState(blkverify)

    def addQuorum(self, quorum, child0, child1):
        """Add a Quorum node with two children and a vote threshold of 1.

        `child0` opens base_img and `child1` opens a freshly created
        new_img.
        """
        self.checkBlockDriverState(child0, False)
        self.checkBlockDriverState(child1, False)
        self.checkBlockDriverState(quorum, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        child_0 = {'driver': iotests.imgfmt,
                   'node-name': child0,
                   'file': {'driver': 'file',
                            'filename': base_img}}
        child_1 = {'driver': iotests.imgfmt,
                   'node-name': child1,
                   'file': {'driver': 'file',
                            'filename': new_img}}
        opts = {'driver': 'quorum',
                'node-name': quorum,
                'vote-threshold': 1,
                'children': [child_0, child_1]}
        self._blockdev_add(opts)
        self.checkBlockDriverState(child0)
        self.checkBlockDriverState(child1)
        self.checkBlockDriverState(quorum)

    ########################
    # The tests start here #
    ########################

    def testBlockDriverState(self):
        """A format node can be deleted, but not its file child."""
        self.addBlockDriverState('node0')
        # You cannot delete a file BDS directly
        self.delBlockDriverState('node0_file', expect_error=True)
        self.delBlockDriverState('node0')

    def testDeviceModel(self):
        """A node in use by a device can only be deleted after unplug."""
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0')
        self.ejectDrive('device0', 'node0', expect_error=True)
        self.delBlockDriverState('node0', expect_error=True)
        self.delDeviceModel('device0')
        self.delBlockDriverState('node0')

    def testAttachMedia(self):
        """Ejected media can be deleted; inserted media cannot."""
        # This creates a BlockBackend and removes its media
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.ejectDrive('device0', 'node0', destroys_media=False)
        self.delBlockDriverState('node0')

        # This creates a new BlockDriverState and inserts it into the device
        self.addBlockDriverState('node1')
        self.insertDrive('device0', 'node1')
        # The node can't be removed: the new device has an extra reference
        self.delBlockDriverState('node1', expect_error=True)
        # The BDS still exists after being ejected, but now it can be removed
        self.ejectDrive('device0', 'node1', destroys_media=False)
        self.delBlockDriverState('node1')
        self.delDeviceModel('device0', False)

    def testSnapshotSync(self):
        """Nodes in a 'blockdev-snapshot-sync' chain cannot be deleted."""
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0')
        self.createSnapshotSync('node0', 'overlay0')
        # This fails because node0 is now being used as a backing image
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0', expect_error=True)
        # This succeeds because device0 only has the backend reference
        self.delDeviceModel('device0')
        # FIXME Would still be there if blockdev-snapshot-sync took a ref
        self.checkBlockDriverState('overlay0', False)
        self.delBlockDriverState('node0')

    def testSnapshot(self):
        """Nodes in a 'blockdev-snapshot' chain are deleted top-down."""
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.addBlockDriverStateOverlay('overlay0')
        self.createSnapshot('node0', 'overlay0')
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0', expect_error=True)
        self.ejectDrive('device0', 'overlay0', destroys_media=False)
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0')
        self.delBlockDriverState('node0')

    def testMirror(self):
        """A running mirror job blocks deletion of source and target."""
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.createMirror('node0', 'mirror0')
        # The block job prevents removing the device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('mirror0', expect_error=True)
        self.wait_ready('node0')
        self.completeBlockJob('node0', 'node0', 'mirror0')
        self.assert_no_active_block_jobs()
        # This succeeds because the device now points to mirror0
        self.delBlockDriverState('node0')
        self.delBlockDriverState('mirror0', expect_error=True)
        self.delDeviceModel('device0', False)
        # FIXME mirror0 disappears, drive-mirror doesn't take a reference
        #self.delBlockDriverState('mirror0')

    def testBlkDebug(self):
        """Deleting a blkdebug node also removes its image child."""
        self.addBlkDebug('debug0', 'node0')
        # 'node0' is used by the blkdebug node
        self.delBlockDriverState('node0', expect_error=True)
        # But we can remove the blkdebug node directly
        self.delBlockDriverState('debug0')
        self.checkBlockDriverState('node0', False)

    def testBlkVerify(self):
        """Deleting a blkverify node also removes both of its children."""
        self.addBlkVerify('verify0', 'node0', 'node1')
        # We cannot remove the children of a blkverify device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('node1', expect_error=True)
        # But we can remove the blkverify node directly
        self.delBlockDriverState('verify0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)

    def testQuorum(self):
        """Deleting a Quorum node also removes both of its children."""
        # Return silently (rather than reporting a skip) when quorum is
        # unavailable, so the runner's summary output stays unchanged.
        if not iotests.supports_quorum():
            return

        self.addQuorum('quorum0', 'node0', 'node1')
        # We cannot remove the children of a Quorum device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('node1', expect_error=True)
        # But we can remove the Quorum node directly
        self.delBlockDriverState('quorum0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)
|
|
|
|
|
|
if __name__ == '__main__':
    # Hand control to the iotests runner; qcow2 is the only format this
    # test declares as supported.
    iotests.main(supported_fmts=["qcow2"])
|