Pull request

-----BEGIN PGP SIGNATURE-----
 
 iQIzBAABCAAdFiEE+ber27ys35W+dsvQfe+BBqr8OQ4FAl+PNNYACgkQfe+BBqr8
 OQ5WHA/8CxKN5vHw1Ob4rAc1vjp58plwgpFOnwOUlw11FQ5+qJhSFvWjIaqFMzJ5
 tCs6Qus+k2arpcFY/RcTHEkNfJR7t8o4u62cZh+CzcpxpaTleM7Ru32xaiiw1dKH
 3ujfjnT4U9dTDGg6vqQeaZj9RBSKd3cFCzzupO6iho9md3ZCJXsO05w0SNyAe1Ct
 o7y0jHAxJMRQJ4kDtp8e2F9w+jNP2zLQZxPB3Af3SF0CTkXymuEv+5Jv+aoJ5Z9d
 tZxGF8imi9Gq+JSMxp7Oatw5RMBqCKEqr81cddbiPafisNBQJ4fR/HiPxzGjNTIp
 TW61duK/hLMbQk7my6bmGr2FOAo4Xw2t3e0pGORi20W19Su/us07KwFx81Ls6Wok
 3n/biFi6pWeFwDXO8nsgs4eJEcD0blrEqDbjLrMhcKgnFUGJTIbnZ+AIjpNy8OLZ
 edT5DZRymIROiPopBSsvUM9KYjo9kvoFodPl9RtiWFvjrkw1gyqnUAaa10xKpZ0s
 WKFrktFoPXOyNjlmZO936X2jKuGGv7+yYBLGKNpUOSQu8P4Jan8GEGmOtHZAWKMH
 xRnDSEfkBQPRmSriI0je7P3WVqlwqBzVH+a4P0d4b00LKVLiKtYAqO/x3UcQ20CQ
 AlBglcRL96hxm+capuGOmm79Nl5+u3dxwpQZEBVa2M65kbbXdZ4=
 =iStV
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/jsnow-gitlab/tags/python-pull-request' into staging

Pull request

# gpg: Signature made Tue 20 Oct 2020 20:04:54 BST
# gpg:                using RSA key F9B7ABDBBCACDF95BE76CBD07DEF8106AAFC390E
# gpg: Good signature from "John Snow (John Huston) <jsnow@redhat.com>" [full]
# Primary key fingerprint: FAEB 9711 A12C F475 812F  18F2 88A9 064D 1835 61EB
#      Subkey fingerprint: F9B7 ABDB BCAC DF95 BE76  CBD0 7DEF 8106 AAFC 390E

* remotes/jsnow-gitlab/tags/python-pull-request: (21 commits)
  python/qemu/qmp.py: Fix settimeout operation
  python/qemu/qmp.py: re-raise OSError when encountered
  python: add mypy config
  python/qemu/qmp.py: Preserve error context on re-raise
  python/qemu/console_socket.py: avoid encoding to/from string
  python/qemu/console_socket.py: Add type hint annotations
  python/qemu/console_socket.py: Clarify type of drain_thread
  python/qemu/console_socket.py: fix typing of settimeout
  python/qemu/console_socket.py: Correct type of recv()
  python/qemu: Add mypy type annotations
  iotests.py: Adjust HMP kwargs typing
  python/qemu: make 'args' style arguments immutable
  python/machine.py: fix _popen access
  python/machine.py: Add _qmp access shim
  python/machine.py: use qmp.command
  python/machine.py: Handle None events in events_wait
  python/machine.py: Don't modify state in _base_args()
  python/machine.py: reorder __init__
  python/machine.py: Fix monitor address typing
  python/qemu: use isort to lay out imports
  ...

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2020-10-21 11:09:13 +01:00
commit 67e8498937
9 changed files with 326 additions and 211 deletions

View File

@ -2373,11 +2373,18 @@ S: Maintained
F: include/sysemu/cryptodev*.h
F: backends/cryptodev*.c
Python library
M: John Snow <jsnow@redhat.com>
M: Cleber Rosa <crosa@redhat.com>
R: Eduardo Habkost <ehabkost@redhat.com>
S: Maintained
F: python/
T: git https://gitlab.com/jsnow/qemu.git python
Python scripts
M: Eduardo Habkost <ehabkost@redhat.com>
M: Cleber Rosa <crosa@redhat.com>
S: Odd fixes
F: python/qemu/*py
F: scripts/*.py
F: tests/*.py

4
python/mypy.ini Normal file
View File

@ -0,0 +1,4 @@
[mypy]
strict = True
python_version = 3.6
warn_unused_configs = True

7
python/qemu/.isort.cfg Normal file
View File

@ -0,0 +1,7 @@
[settings]
force_grid_wrap=4
force_sort_within_sections=True
include_trailing_comma=True
line_length=72
lines_after_imports=2
multi_line_output=3

View File

@ -17,6 +17,8 @@ accelerators.
import logging
import os
import subprocess
from typing import List, Optional
LOG = logging.getLogger(__name__)
@ -29,7 +31,7 @@ ADDITIONAL_ARCHES = {
}
def list_accel(qemu_bin):
def list_accel(qemu_bin: str) -> List[str]:
"""
List accelerators enabled in the QEMU binary.
@ -49,7 +51,8 @@ def list_accel(qemu_bin):
return [acc.strip() for acc in out.splitlines()[1:]]
def kvm_available(target_arch=None, qemu_bin=None):
def kvm_available(target_arch: Optional[str] = None,
qemu_bin: Optional[str] = None) -> bool:
"""
Check if KVM is available using the following heuristic:
- Kernel module is present in the host;
@ -72,7 +75,7 @@ def kvm_available(target_arch=None, qemu_bin=None):
return True
def tcg_available(qemu_bin):
def tcg_available(qemu_bin: str) -> bool:
"""
Check if TCG is available.

View File

@ -13,10 +13,11 @@ which can drain a socket and optionally dump the bytes to file.
# the COPYING file in the top-level directory.
#
from collections import deque
import socket
import threading
from collections import deque
import time
from typing import Deque, Optional
class ConsoleSocket(socket.socket):
@ -29,22 +30,22 @@ class ConsoleSocket(socket.socket):
Optionally a file path can be passed in and we will also
dump the characters to this file for debugging purposes.
"""
def __init__(self, address, file=None, drain=False):
self._recv_timeout_sec = 300
def __init__(self, address: str, file: Optional[str] = None,
drain: bool = False):
self._recv_timeout_sec = 300.0
self._sleep_time = 0.5
self._buffer = deque()
self._buffer: Deque[int] = deque()
socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
self.connect(address)
self._logfile = None
if file:
self._logfile = open(file, "w")
self._logfile = open(file, "bw")
self._open = True
self._drain_thread = None
if drain:
self._drain_thread = self._thread_start()
else:
self._drain_thread = None
def _drain_fn(self):
def _drain_fn(self) -> None:
"""Drains the socket and runs while the socket is open."""
while self._open:
try:
@ -55,7 +56,7 @@ class ConsoleSocket(socket.socket):
# self._open is set to False.
time.sleep(self._sleep_time)
def _thread_start(self):
def _thread_start(self) -> threading.Thread:
"""Kick off a thread to drain the socket."""
# Configure socket to not block and timeout.
# This allows our drain thread to not block
@ -67,7 +68,7 @@ class ConsoleSocket(socket.socket):
drain_thread.start()
return drain_thread
def close(self):
def close(self) -> None:
"""Close the base object and wait for the thread to terminate"""
if self._open:
self._open = False
@ -79,51 +80,42 @@ class ConsoleSocket(socket.socket):
self._logfile.close()
self._logfile = None
def _drain_socket(self):
def _drain_socket(self) -> None:
"""process arriving characters into in memory _buffer"""
data = socket.socket.recv(self, 1)
# latin1 is needed since there are some chars
# we are receiving that cannot be encoded to utf-8
# such as 0xe2, 0x80, 0xA6.
string = data.decode("latin1")
if self._logfile:
self._logfile.write("{}".format(string))
self._logfile.write(data)
self._logfile.flush()
for c in string:
self._buffer.extend(c)
self._buffer.extend(data)
def recv(self, bufsize=1):
def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
"""Return chars from in memory buffer.
Maintains the same API as socket.socket.recv.
"""
if self._drain_thread is None:
# Not buffering the socket, pass thru to socket.
return socket.socket.recv(self, bufsize)
return socket.socket.recv(self, bufsize, flags)
assert not flags, "Cannot pass flags to recv() in drained mode"
start_time = time.time()
while len(self._buffer) < bufsize:
time.sleep(self._sleep_time)
elapsed_sec = time.time() - start_time
if elapsed_sec > self._recv_timeout_sec:
raise socket.timeout
chars = ''.join([self._buffer.popleft() for i in range(bufsize)])
# We choose to use latin1 to remain consistent with
# handle_read() and give back the same data as the user would
# receive if they were reading directly from the
# socket w/o our intervention.
return chars.encode("latin1")
return bytes((self._buffer.popleft() for i in range(bufsize)))
def setblocking(self, value):
def setblocking(self, value: bool) -> None:
"""When not draining we pass thru to the socket,
since when draining we control socket blocking.
"""
if self._drain_thread is None:
socket.socket.setblocking(self, value)
def settimeout(self, seconds):
def settimeout(self, value: Optional[float]) -> None:
"""When not draining we pass thru to the socket,
since when draining we control the timeout.
"""
if seconds is not None:
self._recv_timeout_sec = seconds
if value is not None:
self._recv_timeout_sec = value
if self._drain_thread is None:
socket.socket.settimeout(self, seconds)
socket.socket.settimeout(self, value)

View File

@ -18,17 +18,29 @@ which provides facilities for managing the lifetime of a QEMU VM.
#
import errno
from itertools import chain
import logging
import os
import subprocess
import shutil
import signal
import socket
import subprocess
import tempfile
from typing import Optional, Type
from types import TracebackType
from . import console_socket
from typing import (
Any,
BinaryIO,
Dict,
List,
Optional,
Sequence,
Tuple,
Type,
)
from . import console_socket, qmp
from .qmp import QMPMessage, QMPReturnValue, SocketAddrT
from . import qmp
LOG = logging.getLogger(__name__)
@ -57,7 +69,7 @@ class AbnormalShutdown(QEMUMachineError):
class QEMUMachine:
"""
A QEMU VM
A QEMU VM.
Use this object as a context manager to ensure
the QEMU process terminates::
@ -67,10 +79,17 @@ class QEMUMachine:
# vm is guaranteed to be shut down here
"""
def __init__(self, binary, args=None, wrapper=None, name=None,
test_dir="/var/tmp", monitor_address=None,
socket_scm_helper=None, sock_dir=None,
drain_console=False, console_log=None):
def __init__(self,
binary: str,
args: Sequence[str] = (),
wrapper: Sequence[str] = (),
name: Optional[str] = None,
test_dir: str = "/var/tmp",
monitor_address: Optional[SocketAddrT] = None,
socket_scm_helper: Optional[str] = None,
sock_dir: Optional[str] = None,
drain_console: bool = False,
console_log: Optional[str] = None):
'''
Initialize a QEMUMachine
@ -82,45 +101,30 @@ class QEMUMachine:
@param monitor_address: address for QMP monitor
@param socket_scm_helper: helper program, required for send_fd_scm()
@param sock_dir: where to create socket (overrides test_dir for sock)
@param console_log: (optional) path to console log file
@param drain_console: (optional) True to drain console socket to buffer
@param console_log: (optional) path to console log file
@note: Qemu process is not started until launch() is used.
'''
if args is None:
args = []
if wrapper is None:
wrapper = []
if name is None:
name = "qemu-%d" % os.getpid()
if sock_dir is None:
sock_dir = test_dir
self._name = name
self._monitor_address = monitor_address
self._vm_monitor = None
self._qemu_log_path = None
self._qemu_log_file = None
self._popen = None
# Direct user configuration
self._binary = binary
self._args = list(args) # Force copy args in case we modify them
self._args = list(args)
self._wrapper = wrapper
self._events = []
self._iolog = None
self._socket_scm_helper = socket_scm_helper
self._qmp_set = True # Enable QMP monitor by default.
self._qmp = None
self._qemu_full_args = None
self._name = name or "qemu-%d" % os.getpid()
self._test_dir = test_dir
self._temp_dir = None
self._sock_dir = sock_dir
self._launched = False
self._machine = None
self._console_index = 0
self._console_set = False
self._console_device_type = None
self._console_address = None
self._console_socket = None
self._remove_files = []
self._user_killed = False
self._sock_dir = sock_dir or self._test_dir
self._socket_scm_helper = socket_scm_helper
if monitor_address is not None:
self._monitor_address = monitor_address
self._remove_monitor_sockfile = False
else:
self._monitor_address = os.path.join(
self._sock_dir, f"{self._name}-monitor.sock"
)
self._remove_monitor_sockfile = True
self._console_log_path = console_log
if self._console_log_path:
# In order to log the console, buffering needs to be enabled.
@ -128,7 +132,29 @@ class QEMUMachine:
else:
self._drain_console = drain_console
def __enter__(self):
# Runstate
self._qemu_log_path: Optional[str] = None
self._qemu_log_file: Optional[BinaryIO] = None
self._popen: Optional['subprocess.Popen[bytes]'] = None
self._events: List[QMPMessage] = []
self._iolog: Optional[str] = None
self._qmp_set = True # Enable QMP monitor by default.
self._qmp_connection: Optional[qmp.QEMUMonitorProtocol] = None
self._qemu_full_args: Tuple[str, ...] = ()
self._temp_dir: Optional[str] = None
self._launched = False
self._machine: Optional[str] = None
self._console_index = 0
self._console_set = False
self._console_device_type: Optional[str] = None
self._console_address = os.path.join(
self._sock_dir, f"{self._name}-console.sock"
)
self._console_socket: Optional[socket.socket] = None
self._remove_files: List[str] = []
self._user_killed = False
def __enter__(self) -> 'QEMUMachine':
return self
def __exit__(self,
@ -137,14 +163,15 @@ class QEMUMachine:
exc_tb: Optional[TracebackType]) -> None:
self.shutdown()
def add_monitor_null(self):
def add_monitor_null(self) -> None:
"""
This can be used to add an unused monitor instance.
"""
self._args.append('-monitor')
self._args.append('null')
def add_fd(self, fd, fdset, opaque, opts=''):
def add_fd(self, fd: int, fdset: int,
opaque: str, opts: str = '') -> 'QEMUMachine':
"""
Pass a file descriptor to the VM
"""
@ -163,7 +190,8 @@ class QEMUMachine:
self._args.append(','.join(options))
return self
def send_fd_scm(self, fd=None, file_path=None):
def send_fd_scm(self, fd: Optional[int] = None,
file_path: Optional[str] = None) -> int:
"""
Send an fd or file_path to socket_scm_helper.
@ -207,7 +235,7 @@ class QEMUMachine:
return proc.returncode
@staticmethod
def _remove_if_exists(path):
def _remove_if_exists(path: str) -> None:
"""
Remove file object at path if it exists
"""
@ -218,46 +246,52 @@ class QEMUMachine:
return
raise
def is_running(self):
def is_running(self) -> bool:
"""Returns true if the VM is running."""
return self._popen is not None and self._popen.poll() is None
def exitcode(self):
@property
def _subp(self) -> 'subprocess.Popen[bytes]':
if self._popen is None:
raise QEMUMachineError('Subprocess pipe not present')
return self._popen
def exitcode(self) -> Optional[int]:
"""Returns the exit code if possible, or None."""
if self._popen is None:
return None
return self._popen.poll()
def get_pid(self):
def get_pid(self) -> Optional[int]:
"""Returns the PID of the running process, or None."""
if not self.is_running():
return None
return self._popen.pid
return self._subp.pid
def _load_io_log(self):
def _load_io_log(self) -> None:
if self._qemu_log_path is not None:
with open(self._qemu_log_path, "r") as iolog:
self._iolog = iolog.read()
def _base_args(self):
@property
def _base_args(self) -> List[str]:
args = ['-display', 'none', '-vga', 'none']
if self._qmp_set:
if isinstance(self._monitor_address, tuple):
moncdev = "socket,id=mon,host=%s,port=%s" % (
self._monitor_address[0],
self._monitor_address[1])
moncdev = "socket,id=mon,host={},port={}".format(
*self._monitor_address
)
else:
moncdev = 'socket,id=mon,path=%s' % self._vm_monitor
moncdev = f"socket,id=mon,path={self._monitor_address}"
args.extend(['-chardev', moncdev, '-mon',
'chardev=mon,mode=control'])
if self._machine is not None:
args.extend(['-machine', self._machine])
for _ in range(self._console_index):
args.extend(['-serial', 'null'])
if self._console_set:
self._console_address = os.path.join(self._sock_dir,
self._name + "-console.sock")
self._remove_files.append(self._console_address)
chardev = ('socket,id=console,path=%s,server,nowait' %
self._console_address)
args.extend(['-chardev', chardev])
@ -268,26 +302,29 @@ class QEMUMachine:
args.extend(['-device', device])
return args
def _pre_launch(self):
def _pre_launch(self) -> None:
self._temp_dir = tempfile.mkdtemp(dir=self._test_dir)
self._qemu_log_path = os.path.join(self._temp_dir, self._name + ".log")
self._qemu_log_file = open(self._qemu_log_path, 'wb')
if self._qmp_set:
if self._monitor_address is not None:
self._vm_monitor = self._monitor_address
else:
self._vm_monitor = os.path.join(self._sock_dir,
self._name + "-monitor.sock")
self._remove_files.append(self._vm_monitor)
self._qmp = qmp.QEMUMonitorProtocol(self._vm_monitor, server=True,
nickname=self._name)
if self._console_set:
self._remove_files.append(self._console_address)
def _post_launch(self):
if self._qmp:
if self._qmp_set:
if self._remove_monitor_sockfile:
assert isinstance(self._monitor_address, str)
self._remove_files.append(self._monitor_address)
self._qmp_connection = qmp.QEMUMonitorProtocol(
self._monitor_address,
server=True,
nickname=self._name
)
def _post_launch(self) -> None:
if self._qmp_connection:
self._qmp.accept()
def _post_shutdown(self):
def _post_shutdown(self) -> None:
"""
Called to cleanup the VM instance after the process has exited.
May also be called after a failed launch.
@ -295,9 +332,9 @@ class QEMUMachine:
# Comprehensive reset for the failed launch case:
self._early_cleanup()
if self._qmp:
if self._qmp_connection:
self._qmp.close()
self._qmp = None
self._qmp_connection = None
self._load_io_log()
@ -327,7 +364,7 @@ class QEMUMachine:
self._user_killed = False
self._launched = False
def launch(self):
def launch(self) -> None:
"""
Launch the VM and make sure we cleanup and expose the
command line/output in case of exception
@ -337,7 +374,7 @@ class QEMUMachine:
raise QEMUMachineError('VM already launched')
self._iolog = None
self._qemu_full_args = None
self._qemu_full_args = ()
try:
self._launch()
self._launched = True
@ -351,14 +388,18 @@ class QEMUMachine:
LOG.debug('Output: %r', self._iolog)
raise
def _launch(self):
def _launch(self) -> None:
"""
Launch the VM and establish a QMP connection
"""
devnull = open(os.path.devnull, 'rb')
self._pre_launch()
self._qemu_full_args = (self._wrapper + [self._binary] +
self._base_args() + self._args)
self._qemu_full_args = tuple(
chain(self._wrapper,
[self._binary],
self._base_args,
self._args)
)
LOG.debug('VM launch command: %r', ' '.join(self._qemu_full_args))
self._popen = subprocess.Popen(self._qemu_full_args,
stdin=devnull,
@ -390,8 +431,8 @@ class QEMUMachine:
waiting for the QEMU process to terminate.
"""
self._early_cleanup()
self._popen.kill()
self._popen.wait(timeout=60)
self._subp.kill()
self._subp.wait(timeout=60)
def _soft_shutdown(self, timeout: Optional[int],
has_quit: bool = False) -> None:
@ -409,13 +450,13 @@ class QEMUMachine:
"""
self._early_cleanup()
if self._qmp is not None:
if self._qmp_connection:
if not has_quit:
# Might raise ConnectionReset
self._qmp.cmd('quit')
# May raise subprocess.TimeoutExpired
self._popen.wait(timeout=timeout)
self._subp.wait(timeout=timeout)
def _do_shutdown(self, timeout: Optional[int],
has_quit: bool = False) -> None:
@ -466,7 +507,7 @@ class QEMUMachine:
finally:
self._post_shutdown()
def kill(self):
def kill(self) -> None:
"""
Terminate the VM forcefully, wait for it to exit, and perform cleanup.
"""
@ -481,7 +522,7 @@ class QEMUMachine:
"""
self.shutdown(has_quit=True, timeout=timeout)
def set_qmp_monitor(self, enabled=True):
def set_qmp_monitor(self, enabled: bool = True) -> None:
"""
Set the QMP monitor.
@ -490,39 +531,45 @@ class QEMUMachine:
line. Default is True.
@note: call this function before launch().
"""
if enabled:
self._qmp_set = True
else:
self._qmp_set = False
self._qmp = None
self._qmp_set = enabled
def qmp(self, cmd, conv_keys=True, **args):
"""
Invoke a QMP command and return the response dict
"""
@property
def _qmp(self) -> qmp.QEMUMonitorProtocol:
if self._qmp_connection is None:
raise QEMUMachineError("Attempt to access QMP with no connection")
return self._qmp_connection
@classmethod
def _qmp_args(cls, _conv_keys: bool = True, **args: Any) -> Dict[str, Any]:
qmp_args = dict()
for key, value in args.items():
if conv_keys:
if _conv_keys:
qmp_args[key.replace('_', '-')] = value
else:
qmp_args[key] = value
return qmp_args
def qmp(self, cmd: str,
conv_keys: bool = True,
**args: Any) -> QMPMessage:
"""
Invoke a QMP command and return the response dict
"""
qmp_args = self._qmp_args(conv_keys, **args)
return self._qmp.cmd(cmd, args=qmp_args)
def command(self, cmd, conv_keys=True, **args):
def command(self, cmd: str,
conv_keys: bool = True,
**args: Any) -> QMPReturnValue:
"""
Invoke a QMP command.
On success return the response dict.
On failure raise an exception.
"""
reply = self.qmp(cmd, conv_keys, **args)
if reply is None:
raise qmp.QMPError("Monitor is closed")
if "error" in reply:
raise qmp.QMPResponseError(reply)
return reply["return"]
qmp_args = self._qmp_args(conv_keys, **args)
return self._qmp.command(cmd, **qmp_args)
def get_qmp_event(self, wait=False):
def get_qmp_event(self, wait: bool = False) -> Optional[QMPMessage]:
"""
Poll for one queued QMP events and return it
"""
@ -530,7 +577,7 @@ class QEMUMachine:
return self._events.pop(0)
return self._qmp.pull_event(wait=wait)
def get_qmp_events(self, wait=False):
def get_qmp_events(self, wait: bool = False) -> List[QMPMessage]:
"""
Poll for queued QMP events and return a list of dicts
"""
@ -541,7 +588,7 @@ class QEMUMachine:
return events
@staticmethod
def event_match(event, match=None):
def event_match(event: Any, match: Optional[Any]) -> bool:
"""
Check if an event matches optional match criteria.
@ -571,9 +618,11 @@ class QEMUMachine:
return True
except TypeError:
# either match or event wasn't iterable (not a dict)
return match == event
return bool(match == event)
def event_wait(self, name, timeout=60.0, match=None):
def event_wait(self, name: str,
timeout: float = 60.0,
match: Optional[QMPMessage] = None) -> Optional[QMPMessage]:
"""
event_wait waits for and returns a named event from QMP with a timeout.
@ -583,22 +632,33 @@ class QEMUMachine:
"""
return self.events_wait([(name, match)], timeout)
def events_wait(self, events, timeout=60.0):
def events_wait(self,
events: Sequence[Tuple[str, Any]],
timeout: float = 60.0) -> Optional[QMPMessage]:
"""
events_wait waits for and returns a named event
from QMP with a timeout.
events_wait waits for and returns a single named event from QMP.
In the case of multiple qualifying events, this function returns the
first one.
events: a sequence of (name, match_criteria) tuples.
The match criteria are optional and may be None.
See event_match for details.
timeout: QEMUMonitorProtocol.pull_event timeout parameter.
:param events: A sequence of (name, match_criteria) tuples.
The match criteria are optional and may be None.
See event_match for details.
:param timeout: Optional timeout, in seconds.
See QEMUMonitorProtocol.pull_event.
:raise QMPTimeoutError: If timeout was non-zero and no matching events
were found.
:return: A QMP event matching the filter criteria.
If timeout was 0 and no event matched, None.
"""
def _match(event):
def _match(event: QMPMessage) -> bool:
for name, match in events:
if event['event'] == name and self.event_match(event, match):
return True
return False
event: Optional[QMPMessage]
# Search cached events
for event in self._events:
if _match(event):
@ -608,26 +668,30 @@ class QEMUMachine:
# Poll for new events
while True:
event = self._qmp.pull_event(wait=timeout)
if event is None:
# NB: None is only returned when timeout is false-ish.
# Timeouts raise QMPTimeoutError instead!
break
if _match(event):
return event
self._events.append(event)
return None
def get_log(self):
def get_log(self) -> Optional[str]:
"""
After self.shutdown or failed qemu execution, this returns the output
of the qemu process.
"""
return self._iolog
def add_args(self, *args):
def add_args(self, *args: str) -> None:
"""
Adds to the list of extra arguments to be given to the QEMU binary
"""
self._args.extend(args)
def set_machine(self, machine_type):
def set_machine(self, machine_type: str) -> None:
"""
Sets the machine type
@ -636,7 +700,9 @@ class QEMUMachine:
"""
self._machine = machine_type
def set_console(self, device_type=None, console_index=0):
def set_console(self,
device_type: Optional[str] = None,
console_index: int = 0) -> None:
"""
Sets the device type for a console device
@ -667,7 +733,7 @@ class QEMUMachine:
self._console_index = console_index
@property
def console_socket(self):
def console_socket(self) -> socket.socket:
"""
Returns a socket connected to the console
"""

View File

@ -7,21 +7,22 @@
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
import json
import errno
import socket
import json
import logging
import socket
from types import TracebackType
from typing import (
Any,
cast,
Dict,
List,
Optional,
TextIO,
Type,
Tuple,
Type,
Union,
cast,
)
from types import TracebackType
# QMPMessage is a QMP Message of any kind.
@ -90,7 +91,9 @@ class QEMUMonitorProtocol:
#: Logger object for debugging messages
logger = logging.getLogger('QMP')
def __init__(self, address, server=False, nickname=None):
def __init__(self, address: SocketAddrT,
server: bool = False,
nickname: Optional[str] = None):
"""
Create a QEMUMonitorProtocol class.
@ -102,7 +105,7 @@ class QEMUMonitorProtocol:
@note No connection is established, this is done by the connect() or
accept() methods
"""
self.__events = []
self.__events: List[QMPMessage] = []
self.__address = address
self.__sock = self.__get_sock()
self.__sockfile: Optional[TextIO] = None
@ -114,14 +117,14 @@ class QEMUMonitorProtocol:
self.__sock.bind(self.__address)
self.__sock.listen(1)
def __get_sock(self):
def __get_sock(self) -> socket.socket:
if isinstance(self.__address, tuple):
family = socket.AF_INET
else:
family = socket.AF_UNIX
return socket.socket(family, socket.SOCK_STREAM)
def __negotiate_capabilities(self):
def __negotiate_capabilities(self) -> QMPMessage:
greeting = self.__json_read()
if greeting is None or "QMP" not in greeting:
raise QMPConnectError
@ -131,7 +134,7 @@ class QEMUMonitorProtocol:
return greeting
raise QMPCapabilitiesError
def __json_read(self, only_event=False):
def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
assert self.__sockfile is not None
while True:
data = self.__sockfile.readline()
@ -148,7 +151,7 @@ class QEMUMonitorProtocol:
continue
return resp
def __get_events(self, wait=False):
def __get_events(self, wait: Union[bool, float] = False) -> None:
"""
Check for new events in the stream and cache them in __events.
@ -161,15 +164,19 @@ class QEMUMonitorProtocol:
retrieved or if some other error occurred.
"""
# Current timeout and blocking status
current_timeout = self.__sock.gettimeout()
# Check for new events regardless and pull them into the cache:
self.__sock.setblocking(False)
self.__sock.settimeout(0) # i.e. setblocking(False)
try:
self.__json_read()
except OSError as err:
if err.errno == errno.EAGAIN:
# No data available
pass
self.__sock.setblocking(True)
# EAGAIN: No data available; not critical
if err.errno != errno.EAGAIN:
raise
finally:
self.__sock.settimeout(current_timeout)
# Wait for new events, if needed.
# if wait is 0.0, this means "no wait" and is also implicitly false.
@ -178,15 +185,18 @@ class QEMUMonitorProtocol:
self.__sock.settimeout(wait)
try:
ret = self.__json_read(only_event=True)
except socket.timeout:
raise QMPTimeoutError("Timeout waiting for event")
except:
raise QMPConnectError("Error while reading from socket")
except socket.timeout as err:
raise QMPTimeoutError("Timeout waiting for event") from err
except Exception as err:
msg = "Error while reading from socket"
raise QMPConnectError(msg) from err
finally:
self.__sock.settimeout(current_timeout)
if ret is None:
raise QMPConnectError("Error while reading from socket")
self.__sock.settimeout(None)
def __enter__(self):
def __enter__(self) -> 'QEMUMonitorProtocol':
# Implement context manager enter function.
return self
@ -199,7 +209,7 @@ class QEMUMonitorProtocol:
# Implement context manager exit function.
self.close()
def connect(self, negotiate=True):
def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
"""
Connect to the QMP Monitor and perform capabilities negotiation.
@ -214,7 +224,7 @@ class QEMUMonitorProtocol:
return self.__negotiate_capabilities()
return None
def accept(self, timeout=15.0):
def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
"""
Await connection from QMP Monitor and perform capabilities negotiation.
@ -250,7 +260,9 @@ class QEMUMonitorProtocol:
self.logger.debug("<<< %s", resp)
return resp
def cmd(self, name, args=None, cmd_id=None):
def cmd(self, name: str,
args: Optional[Dict[str, Any]] = None,
cmd_id: Optional[Any] = None) -> QMPMessage:
"""
Build a QMP command and send it to the QMP Monitor.
@ -258,14 +270,14 @@ class QEMUMonitorProtocol:
@param args: command arguments (dict)
@param cmd_id: command id (dict, list, string or int)
"""
qmp_cmd = {'execute': name}
qmp_cmd: QMPMessage = {'execute': name}
if args:
qmp_cmd['arguments'] = args
if cmd_id:
qmp_cmd['id'] = cmd_id
return self.cmd_obj(qmp_cmd)
def command(self, cmd, **kwds):
def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
"""
Build and send a QMP command to the monitor, report errors if any
"""
@ -278,7 +290,8 @@ class QEMUMonitorProtocol:
)
return cast(QMPReturnValue, ret['return'])
def pull_event(self, wait=False):
def pull_event(self,
wait: Union[bool, float] = False) -> Optional[QMPMessage]:
"""
Pulls a single event.
@ -298,7 +311,7 @@ class QEMUMonitorProtocol:
return self.__events.pop(0)
return None
def get_events(self, wait=False):
def get_events(self, wait: bool = False) -> List[QMPMessage]:
"""
Get a list of available QMP events.
@ -315,13 +328,13 @@ class QEMUMonitorProtocol:
self.__get_events(wait)
return self.__events
def clear_events(self):
def clear_events(self) -> None:
"""
Clear current list of pending events.
"""
self.__events = []
def close(self):
def close(self) -> None:
"""
Close the socket and socket file.
"""
@ -330,16 +343,22 @@ class QEMUMonitorProtocol:
if self.__sockfile:
self.__sockfile.close()
def settimeout(self, timeout):
def settimeout(self, timeout: Optional[float]) -> None:
"""
Set the socket timeout.
@param timeout (float): timeout in seconds, or None.
@param timeout (float): timeout in seconds (non-zero), or None.
@note This is a wrap around socket.settimeout
@raise ValueError: if timeout was set to 0.
"""
if timeout == 0:
msg = "timeout cannot be 0; this engages non-blocking mode."
msg += " Use 'None' instead to disable timeouts."
raise ValueError(msg)
self.__sock.settimeout(timeout)
def get_sock_fd(self):
def get_sock_fd(self) -> int:
"""
Get the socket file descriptor.
@ -347,7 +366,7 @@ class QEMUMonitorProtocol:
"""
return self.__sock.fileno()
def is_scm_available(self):
def is_scm_available(self) -> bool:
"""
Check if the socket allows for SCM_RIGHTS.

View File

@ -17,11 +17,17 @@ subclass of QEMUMachine, respectively.
# Based on qmp.py.
#
import socket
import os
from typing import Optional, TextIO
import socket
from typing import (
List,
Optional,
Sequence,
TextIO,
)
from .machine import QEMUMachine
from .qmp import SocketAddrT
class QEMUQtestProtocol:
@ -38,7 +44,8 @@ class QEMUQtestProtocol:
No conection is estabalished by __init__(), this is done
by the connect() or accept() methods.
"""
def __init__(self, address, server=False):
def __init__(self, address: SocketAddrT,
server: bool = False):
self._address = address
self._sock = self._get_sock()
self._sockfile: Optional[TextIO] = None
@ -46,14 +53,14 @@ class QEMUQtestProtocol:
self._sock.bind(self._address)
self._sock.listen(1)
def _get_sock(self):
def _get_sock(self) -> socket.socket:
if isinstance(self._address, tuple):
family = socket.AF_INET
else:
family = socket.AF_UNIX
return socket.socket(family, socket.SOCK_STREAM)
def connect(self):
def connect(self) -> None:
"""
Connect to the qtest socket.
@ -62,7 +69,7 @@ class QEMUQtestProtocol:
self._sock.connect(self._address)
self._sockfile = self._sock.makefile(mode='r')
def accept(self):
def accept(self) -> None:
"""
Await connection from QEMU.
@ -71,7 +78,7 @@ class QEMUQtestProtocol:
self._sock, _ = self._sock.accept()
self._sockfile = self._sock.makefile(mode='r')
def cmd(self, qtest_cmd):
def cmd(self, qtest_cmd: str) -> str:
"""
Send a qtest command on the wire.
@ -82,14 +89,16 @@ class QEMUQtestProtocol:
resp = self._sockfile.readline()
return resp
def close(self):
"""Close this socket."""
def close(self) -> None:
"""
Close this socket.
"""
self._sock.close()
if self._sockfile:
self._sockfile.close()
self._sockfile = None
def settimeout(self, timeout):
def settimeout(self, timeout: Optional[float]) -> None:
"""Set a timeout, in seconds."""
self._sock.settimeout(timeout)
@ -99,8 +108,13 @@ class QEMUQtestMachine(QEMUMachine):
A QEMU VM, with a qtest socket available.
"""
def __init__(self, binary, args=None, name=None, test_dir="/var/tmp",
socket_scm_helper=None, sock_dir=None):
def __init__(self,
binary: str,
args: Sequence[str] = (),
name: Optional[str] = None,
test_dir: str = "/var/tmp",
socket_scm_helper: Optional[str] = None,
sock_dir: Optional[str] = None):
if name is None:
name = "qemu-%d" % os.getpid()
if sock_dir is None:
@ -108,16 +122,19 @@ class QEMUQtestMachine(QEMUMachine):
super().__init__(binary, args, name=name, test_dir=test_dir,
socket_scm_helper=socket_scm_helper,
sock_dir=sock_dir)
self._qtest = None
self._qtest: Optional[QEMUQtestProtocol] = None
self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
def _base_args(self):
args = super()._base_args()
args.extend(['-qtest', 'unix:path=' + self._qtest_path,
'-accel', 'qtest'])
@property
def _base_args(self) -> List[str]:
args = super()._base_args
args.extend([
'-qtest', f"unix:path={self._qtest_path}",
'-accel', 'qtest'
])
return args
def _pre_launch(self):
def _pre_launch(self) -> None:
super()._pre_launch()
self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
@ -126,7 +143,7 @@ class QEMUQtestMachine(QEMUMachine):
super()._post_launch()
self._qtest.accept()
def _post_shutdown(self):
def _post_shutdown(self) -> None:
super()._post_shutdown()
self._remove_if_exists(self._qtest_path)

View File

@ -605,7 +605,7 @@ class VM(qtest.QEMUQtestMachine):
def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
cmd = 'human-monitor-command'
kwargs = {'command-line': command_line}
kwargs: Dict[str, Any] = {'command-line': command_line}
if use_log:
return self.qmp_log(cmd, **kwargs)
else: