qemu-iotests: Add VM method qtest() to iotests.py

This will allow test cases to run command in qtest protocol. It's
write-only for now.

Signed-off-by: Fam Zheng <famz@redhat.com>
Reviewed-by: Max Reitz <mreitz@redhat.com>
Message-id: 1422586186-9925-4-git-send-email-famz@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
This commit is contained in:
Fam Zheng 2015-01-30 10:49:44 +08:00 committed by Stefan Hajnoczi
parent a628daa42d
commit ed338bb075

View File

@ -21,8 +21,11 @@ import re
import subprocess
import string
import unittest
import sys; sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts', 'qmp'))
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts', 'qmp'))
import qmp
import qtest
import struct
__all__ = ['imgfmt', 'imgproto', 'test_dir' 'qemu_img', 'qemu_io',
@ -81,10 +84,12 @@ class VM(object):
def __init__(self):
self._monitor_path = os.path.join(test_dir, 'qemu-mon.%d' % os.getpid())
self._qemu_log_path = os.path.join(test_dir, 'qemu-log.%d' % os.getpid())
self._qtest_path = os.path.join(test_dir, 'qemu-qtest.%d' % os.getpid())
self._args = qemu_args + ['-chardev',
'socket,id=mon,path=' + self._monitor_path,
'-mon', 'chardev=mon,mode=control',
'-qtest', 'stdio', '-machine', 'accel=qtest',
'-qtest', 'unix:path=' + self._qtest_path,
'-machine', 'accel=qtest',
'-display', 'none', '-vga', 'none']
self._num_drives = 0
@ -160,9 +165,11 @@ class VM(object):
qemulog = open(self._qemu_log_path, 'wb')
try:
self._qmp = qmp.QEMUMonitorProtocol(self._monitor_path, server=True)
self._qtest = qtest.QEMUQtestProtocol(self._qtest_path, server=True)
self._popen = subprocess.Popen(self._args, stdin=devnull, stdout=qemulog,
stderr=subprocess.STDOUT)
self._qmp.accept()
self._qtest.accept()
except:
os.remove(self._monitor_path)
raise
@ -173,6 +180,7 @@ class VM(object):
self._qmp.cmd('quit')
self._popen.wait()
os.remove(self._monitor_path)
os.remove(self._qtest_path)
os.remove(self._qemu_log_path)
self._popen = None
@ -185,6 +193,10 @@ class VM(object):
return self._qmp.cmd(cmd, args=qmp_args)
def qtest(self, cmd):
'''Send a qtest command to guest'''
return self._qtest.cmd(cmd)
def get_qmp_event(self, wait=False):
'''Poll for one queued QMP events and return it'''
return self._qmp.pull_event(wait=wait)