103cbc771e
Most of our Python unittest-style tests only support the file protocol. You can run them with any other protocol, but the test will simply ignore your choice and use file anyway. We should let them signal that they require the file protocol so they are skipped when you want to test some other protocol. Signed-off-by: Max Reitz <mreitz@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
180 lines
7.1 KiB
Python
Executable File
180 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# Tests for fdsets and getfd.
|
|
#
|
|
# Copyright (C) 2012 IBM Corp.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import os
|
|
import iotests
|
|
from iotests import qemu_img
|
|
|
|
image0 = os.path.join(iotests.test_dir, 'image0')
|
|
image1 = os.path.join(iotests.test_dir, 'image1')
|
|
image2 = os.path.join(iotests.test_dir, 'image2')
|
|
image3 = os.path.join(iotests.test_dir, 'image3')
|
|
image4 = os.path.join(iotests.test_dir, 'image4')
|
|
|
|
class TestFdSets(iotests.QMPTestCase):
|
|
|
|
def setUp(self):
|
|
self.vm = iotests.VM()
|
|
qemu_img('create', '-f', iotests.imgfmt, image0, '128K')
|
|
qemu_img('create', '-f', iotests.imgfmt, image1, '128K')
|
|
qemu_img('create', '-f', iotests.imgfmt, image2, '128K')
|
|
qemu_img('create', '-f', iotests.imgfmt, image3, '128K')
|
|
qemu_img('create', '-f', iotests.imgfmt, image4, '128K')
|
|
self.file0 = open(image0, 'r')
|
|
self.file1 = open(image1, 'w+')
|
|
self.file2 = open(image2, 'r')
|
|
self.file3 = open(image3, 'r')
|
|
self.file4 = open(image4, 'r')
|
|
self.vm.add_fd(self.file0.fileno(), 1, 'image0:r')
|
|
self.vm.add_fd(self.file1.fileno(), 1, 'image1:w+')
|
|
self.vm.add_fd(self.file2.fileno(), 0, 'image2:r')
|
|
self.vm.add_fd(self.file3.fileno(), 2, 'image3:r')
|
|
self.vm.add_fd(self.file4.fileno(), 2, 'image4:r')
|
|
self.vm.add_drive("/dev/fdset/1")
|
|
self.vm.launch()
|
|
|
|
def tearDown(self):
|
|
self.vm.shutdown()
|
|
self.file0.close()
|
|
self.file1.close()
|
|
self.file2.close()
|
|
self.file3.close()
|
|
self.file4.close()
|
|
os.remove(image0)
|
|
os.remove(image1)
|
|
os.remove(image2)
|
|
os.remove(image3)
|
|
os.remove(image4)
|
|
|
|
def test_query_fdset(self):
|
|
result = self.vm.qmp('query-fdsets')
|
|
self.assert_qmp(result, 'return[0]/fdset-id', 2)
|
|
self.assert_qmp(result, 'return[1]/fdset-id', 1)
|
|
self.assert_qmp(result, 'return[2]/fdset-id', 0)
|
|
self.assert_qmp(result, 'return[0]/fds[0]/opaque', 'image3:r')
|
|
self.assert_qmp(result, 'return[0]/fds[1]/opaque', 'image4:r')
|
|
self.assert_qmp(result, 'return[1]/fds[0]/opaque', 'image0:r')
|
|
self.assert_qmp(result, 'return[1]/fds[1]/opaque', 'image1:w+')
|
|
self.assert_qmp(result, 'return[2]/fds[0]/opaque', 'image2:r')
|
|
self.vm.shutdown()
|
|
|
|
def test_remove_fdset(self):
|
|
result = self.vm.qmp('remove-fd', fdset_id=2)
|
|
self.assert_qmp(result, 'return', {})
|
|
result = self.vm.qmp('query-fdsets')
|
|
self.assert_qmp(result, 'return[0]/fdset-id', 1)
|
|
self.assert_qmp(result, 'return[1]/fdset-id', 0)
|
|
self.assert_qmp(result, 'return[0]/fds[0]/opaque', 'image0:r')
|
|
self.assert_qmp(result, 'return[0]/fds[1]/opaque', 'image1:w+')
|
|
self.assert_qmp(result, 'return[1]/fds[0]/opaque', 'image2:r')
|
|
self.vm.shutdown()
|
|
|
|
def test_remove_fd(self):
|
|
result = self.vm.qmp('query-fdsets')
|
|
fd_image3 = result['return'][0]['fds'][0]['fd']
|
|
result = self.vm.qmp('remove-fd', fdset_id=2, fd=fd_image3)
|
|
self.assert_qmp(result, 'return', {})
|
|
result = self.vm.qmp('query-fdsets')
|
|
self.assert_qmp(result, 'return[0]/fdset-id', 2)
|
|
self.assert_qmp(result, 'return[1]/fdset-id', 1)
|
|
self.assert_qmp(result, 'return[2]/fdset-id', 0)
|
|
self.assert_qmp(result, 'return[0]/fds[0]/opaque', 'image4:r')
|
|
self.assert_qmp(result, 'return[1]/fds[0]/opaque', 'image0:r')
|
|
self.assert_qmp(result, 'return[1]/fds[1]/opaque', 'image1:w+')
|
|
self.assert_qmp(result, 'return[2]/fds[0]/opaque', 'image2:r')
|
|
self.vm.shutdown()
|
|
|
|
def test_remove_fd_invalid_fdset(self):
|
|
result = self.vm.qmp('query-fdsets')
|
|
fd_image3 = result['return'][0]['fds'][0]['fd']
|
|
result = self.vm.qmp('remove-fd', fdset_id=3, fd=fd_image3)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
'File descriptor named \'fdset-id:3, fd:%d\' not found' % fd_image3)
|
|
self.vm.shutdown()
|
|
|
|
def test_remove_fd_invalid_fd(self):
|
|
result = self.vm.qmp('query-fdsets')
|
|
result = self.vm.qmp('remove-fd', fdset_id=2, fd=999)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
'File descriptor named \'fdset-id:2, fd:999\' not found')
|
|
self.vm.shutdown()
|
|
|
|
def test_add_fd_invalid_fd(self):
|
|
result = self.vm.qmp('add-fd', fdset_id=2)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
'No file descriptor supplied via SCM_RIGHTS')
|
|
self.vm.shutdown()
|
|
|
|
# Add fd at runtime, there are two ways: monitor related or fdset related
|
|
class TestSCMFd(iotests.QMPTestCase):
|
|
def setUp(self):
|
|
self.vm = iotests.VM()
|
|
qemu_img('create', '-f', iotests.imgfmt, image0, '128K')
|
|
# Add an unused monitor, to verify it works fine when two monitor
|
|
# instances present
|
|
self.vm.add_monitor_null()
|
|
self.vm.launch()
|
|
|
|
def tearDown(self):
|
|
self.vm.shutdown()
|
|
os.remove(image0)
|
|
|
|
def _send_fd_by_SCM(self):
|
|
ret = self.vm.send_fd_scm(file_path=image0)
|
|
self.assertEqual(ret, 0, 'Failed to send fd with UNIX SCM')
|
|
|
|
def test_add_fd(self):
|
|
self._send_fd_by_SCM()
|
|
result = self.vm.qmp('add-fd', fdset_id=2, opaque='image0:r')
|
|
self.assert_qmp(result, 'return/fdset-id', 2)
|
|
|
|
def test_getfd(self):
|
|
self._send_fd_by_SCM()
|
|
result = self.vm.qmp('getfd', fdname='image0:r')
|
|
self.assert_qmp(result, 'return', {})
|
|
|
|
def test_getfd_invalid_fdname(self):
|
|
self._send_fd_by_SCM()
|
|
result = self.vm.qmp('getfd', fdname='0image0:r')
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
"Parameter 'fdname' expects a name not starting with a digit")
|
|
|
|
def test_closefd(self):
|
|
self._send_fd_by_SCM()
|
|
result = self.vm.qmp('getfd', fdname='image0:r')
|
|
self.assert_qmp(result, 'return', {})
|
|
result = self.vm.qmp('closefd', fdname='image0:r')
|
|
self.assert_qmp(result, 'return', {})
|
|
|
|
def test_closefd_fd_not_found(self):
|
|
fdname = 'image0:r'
|
|
result = self.vm.qmp('closefd', fdname=fdname)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.assert_qmp(result, 'error/desc',
|
|
"File descriptor named '%s' not found" % fdname)
|
|
|
|
if __name__ == '__main__':
|
|
iotests.main(supported_fmts=['raw'],
|
|
supported_protocols=['file'])
|