b6aed193e5
In many cases we just want the effect of a QMP command and want to raise on failure. Use the vm.cmd() method, which does exactly this. The commit is generated by the command `git grep -l '\.qmp(' | xargs ./scripts/python_qmp_updater.py`, and then self.assertRaises is fixed to expect the ExecuteError exception in tests/qemu-iotests/124. Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru> Reviewed-by: Eric Blake <eblake@redhat.com> Message-id: 20231006154125.1068348-16-vsementsov@yandex-team.ru Signed-off-by: John Snow <jsnow@redhat.com>
280 lines
8.8 KiB
Python
Executable File
280 lines
8.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# group: rw
|
|
#
|
|
# Test case for encryption key management versus image sharing
|
|
#
|
|
# Copyright (C) 2019 Red Hat, Inc.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import iotests
|
|
import os
|
|
import time
|
|
import json
|
|
|
|
# Path of the scratch image, located inside the iotests test directory.
test_img = os.path.join(iotests.test_dir, 'test.img')
|
|
|
|
class Secret:
    """A QEMU 'secret' object carrying a test passphrase.

    Both the object id and the passphrase are derived from *index*, so
    two instances built from different indexes never collide.
    """

    def __init__(self, index):
        self._id = "keysec" + str(index)
        # you are not supposed to see the password...
        self._secret = "hunter" + str(index)

    def id(self):
        """Return the object id, "keysec<index>"."""
        return self._id

    def secret(self):
        """Return the passphrase, "hunter<index>"."""
        return self._secret

    def to_cmdline_object(self):
        """Return the object definition string wrapped in a list.

        The result is a one-element list so that callers can splat it
        right after a literal '--object' argument.
        """
        return ["secret,id=" + self._id + ",data=" + self._secret]

    def to_qmp_object(self):
        """Return the object definition as a dict of QMP arguments."""
        return {"qom-type": "secret", "id": self.id(),
                "data": self.secret()}
|
|
|
|
################################################################################
|
|
|
|
class EncryptionSetupTestCase(iotests.QMPTestCase):
    """Encryption key management while an image is shared by two VMs.

    Two VMs are started for every test and four Secret objects are
    loaded into each of them.  Operations that may legitimately fail
    are issued in a way that logs their outcome through iotests.log()
    instead of raising.
    """

    # test case startup
    def setUp(self):
        # start the VMs
        self.vm1 = iotests.VM(path_suffix='VM1')
        self.vm2 = iotests.VM(path_suffix='VM2')
        self.vm1.launch()
        self.vm2.launch()

        # create the secrets and load 'em into the VMs
        self.secrets = [Secret(i) for i in range(0, 4)]
        for secret in self.secrets:
            self.vm1.cmd("object-add", secret.to_qmp_object())
            self.vm2.cmd("object-add", secret.to_qmp_object())

    # test case shutdown
    def tearDown(self):
        # stop the VMs
        self.vm1.shutdown()
        self.vm2.shutdown()

    ###########################################################################
    # create the encrypted block device using qemu-img
    def createImg(self, file, secret):
        """Create a 1M image at *file*, encrypted with *secret*.

        An empty line is logged afterwards.
        """
        iotests.qemu_img(
            'create',
            '--object', *secret.to_cmdline_object(),
            '-f', iotests.imgfmt,
            '-o', 'key-secret=' + secret.id(),
            '-o', 'iter-time=10',
            file,
            '1M')
        iotests.log('')

    # attempts to add a key using qemu-img
    def addKey(self, file, secret, new_secret):
        """Run 'qemu-img amend' to add *new_secret* to the image at *file*.

        *secret* is the key used to open the image.  The command is run
        with check=False and its stdout is logged, with the test
        directory filtered out.
        """
        image_options = {
            'key-secret': secret.id(),
            'driver': iotests.imgfmt,
            'file': {
                'driver': 'file',
                'filename': file,
            }
        }

        output = iotests.qemu_img(
            'amend',
            '--object', *secret.to_cmdline_object(),
            '--object', *new_secret.to_cmdline_object(),

            '-o', 'state=active',
            '-o', 'new-secret=' + new_secret.id(),
            '-o', 'iter-time=10',

            "json:" + json.dumps(image_options),
            check=False  # Expected to fail. Log output.
        ).stdout

        iotests.log(output, filters=[iotests.filter_test_dir])

    ###########################################################################
    # open an encrypted block device
    def openImageQmp(self, vm, id, file, secret,
                     readOnly=False, reOpen=False):
        """Open (or reopen) the image at *file* in *vm* as node *id*.

        *secret* is the key used to unlock the image and *readOnly*
        becomes the node's 'read-only' option.  With *reOpen* set,
        'blockdev-reopen' is issued instead of 'blockdev-add'.
        """
        command = 'blockdev-reopen' if reOpen else 'blockdev-add'

        opts = {
            'driver': iotests.imgfmt,
            'node-name': id,
            'read-only': readOnly,
            'key-secret': secret.id(),
            'file': {
                'driver': 'file',
                # honour the caller's path instead of the module global
                'filename': file,
            }
        }

        if reOpen:
            # blockdev-reopen takes a list of option sets under 'options'
            vm.cmd(command, options=[opts])
        else:
            # blockdev-add takes the option set itself as its arguments
            vm.cmd(command, opts)

    ###########################################################################
    # add virtio-blk consumer for a block device
    def addImageUser(self, vm, id, disk_id, share_rw=False):
        """Attach a virtio-blk device *id* backed by node *disk_id*.

        The command goes through vm.qmp() and its reply is logged,
        whatever it is.
        """
        result = vm.qmp('device_add', {
            'driver': 'virtio-blk',
            'id': id,
            'drive': disk_id,
            'share-rw': share_rw
        })

        iotests.log(result)

    # close the encrypted block device
    def closeImageQmp(self, vm, id):
        """Delete the block node *id* from *vm*."""
        vm.cmd('blockdev-del', {'node-name': id})

    ###########################################################################

    # add a key to an encrypted block device
    def addKeyQmp(self, vm, id, new_secret):
        """Ask *vm* to add *new_secret* as a key of node *id*.

        The reply to 'x-blockdev-amend' is logged.  The resulting job
        'job0' is run only if its 'created' status event is seen.
        """
        args = {
            'node-name': id,
            'job-id': 'job0',
            'options': {
                'state': 'active',
                'driver': iotests.imgfmt,
                'new-secret': new_secret.id(),
                'iter-time': 10
            },
        }

        result = vm.qmp('x-blockdev-amend', args)
        iotests.log(result)
        # Run the job only if it was created
        event = ('JOB_STATUS_CHANGE',
                 {'data': {'id': 'job0', 'status': 'created'}})
        if vm.events_wait([event], timeout=0.0) is not None:
            vm.run_job('job0')

    # test that when the image is opened by two qemu processes,
    # neither of them can update the encryption keys
    def test1(self):
        self.createImg(test_img, self.secrets[0])

        # VM1 opens the image and adds a key
        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
        self.addKeyQmp(self.vm1, "testdev", new_secret=self.secrets[1])

        # VM2 opens the image
        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])

        # neither VM should now be able to add a key
        self.addKeyQmp(self.vm1, "testdev", new_secret=self.secrets[2])
        self.addKeyQmp(self.vm2, "testdev", new_secret=self.secrets[2])

        # VM1 closes the image
        self.closeImageQmp(self.vm1, "testdev")

        # now VM2 can add the key
        self.addKeyQmp(self.vm2, "testdev", new_secret=self.secrets[2])

        # qemu-img should also not be able to add a key
        self.addKey(test_img, self.secrets[0], self.secrets[2])

        # cleanup
        self.closeImageQmp(self.vm2, "testdev")
        os.remove(test_img)

    # test that when the image is opened by two qemu processes,
    # even if first VM opens it read-only, the second can't update encryption
    # keys
    def test2(self):
        self.createImg(test_img, self.secrets[0])

        # VM1 opens the image readonly
        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
                          readOnly=True)

        # VM2 opens the image
        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])

        # VM1 can't add a key since image is readonly
        self.addKeyQmp(self.vm1, "testdev", new_secret=self.secrets[2])

        # VM2 can't add a key since VM1 has the image opened
        self.addKeyQmp(self.vm2, "testdev", new_secret=self.secrets[2])

        # VM1 reopens the image read-write
        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
                          reOpen=True, readOnly=False)

        # VM1 still can't add the key
        self.addKeyQmp(self.vm1, "testdev", new_secret=self.secrets[2])

        # VM2 gets away
        self.closeImageQmp(self.vm2, "testdev")

        # VM1 now can add the key
        self.addKeyQmp(self.vm1, "testdev", new_secret=self.secrets[2])

        self.closeImageQmp(self.vm1, "testdev")
        os.remove(test_img)

    # test that two VMs can't open the same luks image by default
    # and attach it to a guest device
    def test3(self):
        self.createImg(test_img, self.secrets[0])

        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
        self.addImageUser(self.vm1, "testctrl", "testdev")

        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
        self.addImageUser(self.vm2, "testctrl", "testdev")

    # test that two VMs can attach the same luks image to a guest device,
    # if both use share-rw=on
    def test4(self):
        self.createImg(test_img, self.secrets[0])

        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
        self.addImageUser(self.vm1, "testctrl", "testdev", share_rw=True)

        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
        self.addImageUser(self.vm2, "testctrl", "testdev", share_rw=True)
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # support only raw luks since luks encrypted qcow2 is a proper
    # format driver which doesn't allow any sharing
    iotests.activate_logging()
    iotests.main(supported_fmts=['luks'])
|