qemu/scripts/qmp/qmp-shell
John Snow a64fe44d5a scripts/qmp-shell: remove double-underscores
They're not needed; single underscore is enough to express intent that
these methods are "internal". double underscore is used as a weak name
mangling, but that isn't beneficial for us here.

Signed-off-by: John Snow <jsnow@redhat.com>
Message-id: 20210607200649.1840382-38-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
2021-06-18 16:10:07 -04:00

502 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
# Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
#
"""
Low-level QEMU shell on top of QMP.
usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
positional arguments:
qmp_server < UNIX socket path | TCP address:port >
optional arguments:
-h, --help show this help message and exit
-H, --hmp Use HMP interface
-N, --skip-negotiation
Skip negotiate (for qemu-ga)
-v, --verbose Verbose (echo commands sent and received)
-p, --pretty Pretty-print JSON
Start QEMU with:
# qemu [...] -qmp unix:./qmp-sock,server
Run the shell:
$ qmp-shell ./qmp-sock
Commands have the following format:
< command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
For example:
(QEMU) device_add driver=e1000 id=net1
{'return': {}}
(QEMU)
key=value pairs also support Python or JSON object literal subset notations,
without spaces. Dictionaries/objects {} are supported as are arrays [].
example-command arg-name1={'key':'value','obj'={'prop':"value"}}
Both JSON and Python formatting should work, including both styles of
string literal quotes. Both paradigms of literal values should work,
including null/true/false for JSON and None/True/False for Python.
Transactions have the following multi-line format:
transaction(
action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
...
action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
)
One line transactions are also supported:
transaction( action-name1 ... )
For example:
(QEMU) transaction(
TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
TRANS> )
{"return": {}}
(QEMU)
Use the -v and -p options to activate the verbose and pretty-print options,
which will echo back the properly formatted JSON-compliant QMP that is being
sent to QEMU, which is useful for debugging and documentation generation.
"""
import argparse
import ast
import json
import logging
import os
import re
import readline
import sys
from typing import (
Iterator,
List,
NoReturn,
Optional,
Sequence,
)
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp
from qemu.qmp import QMPMessage
LOG = logging.getLogger(__name__)
class QMPCompleter:
# NB: Python 3.9+ will probably allow us to subclass list[str] directly,
# but pylint as of today does not know that List[str] is simply 'list'.
def __init__(self) -> None:
self._matches: List[str] = []
def append(self, value: str) -> None:
return self._matches.append(value)
def complete(self, text: str, state: int) -> Optional[str]:
for cmd in self._matches:
if cmd.startswith(text):
if state == 0:
return cmd
state -= 1
return None
class QMPShellError(Exception):
pass
class FuzzyJSON(ast.NodeTransformer):
"""
This extension of ast.NodeTransformer filters literal "true/false/null"
values in a Python AST and replaces them by proper "True/False/None" values
that Python can properly evaluate.
"""
@classmethod
def visit_Name(cls, # pylint: disable=invalid-name
node: ast.Name) -> ast.AST:
if node.id == 'true':
return ast.Constant(value=True)
if node.id == 'false':
return ast.Constant(value=False)
if node.id == 'null':
return ast.Constant(value=None)
return node
class QMPShell(qmp.QEMUMonitorProtocol):
def __init__(self, address: qmp.SocketAddrT,
pretty: bool = False, verbose: bool = False):
super().__init__(address)
self._greeting: Optional[QMPMessage] = None
self._completer = QMPCompleter()
self._transmode = False
self._actions: List[QMPMessage] = []
self._histfile = os.path.join(os.path.expanduser('~'),
'.qmp-shell_history')
self.pretty = pretty
self.verbose = verbose
def close(self) -> None:
# Hook into context manager of parent to save shell history.
self._save_history()
super().close()
def _fill_completion(self) -> None:
cmds = self.cmd('query-commands')
if 'error' in cmds:
return
for cmd in cmds['return']:
self._completer.append(cmd['name'])
def _completer_setup(self) -> None:
self._completer = QMPCompleter()
self._fill_completion()
readline.set_history_length(1024)
readline.set_completer(self._completer.complete)
readline.parse_and_bind("tab: complete")
# NB: default delimiters conflict with some command names
# (eg. query-), clearing everything as it doesn't seem to matter
readline.set_completer_delims('')
try:
readline.read_history_file(self._histfile)
except FileNotFoundError:
pass
except IOError as err:
msg = f"Failed to read history '{self._histfile}': {err!s}"
LOG.warning(msg)
def _save_history(self) -> None:
try:
readline.write_history_file(self._histfile)
except IOError as err:
msg = f"Failed to save history file '{self._histfile}': {err!s}"
LOG.warning(msg)
@classmethod
def _parse_value(cls, val: str) -> object:
try:
return int(val)
except ValueError:
pass
if val.lower() == 'true':
return True
if val.lower() == 'false':
return False
if val.startswith(('{', '[')):
# Try first as pure JSON:
try:
return json.loads(val)
except ValueError:
pass
# Try once again as FuzzyJSON:
try:
tree = ast.parse(val, mode='eval')
transformed = FuzzyJSON().visit(tree)
return ast.literal_eval(transformed)
except (SyntaxError, ValueError):
pass
return val
def _cli_expr(self,
tokens: Sequence[str],
parent: qmp.QMPObject) -> None:
for arg in tokens:
(key, sep, val) = arg.partition('=')
if sep != '=':
raise QMPShellError(
f"Expected a key=value pair, got '{arg!s}'"
)
value = self._parse_value(val)
optpath = key.split('.')
curpath = []
for path in optpath[:-1]:
curpath.append(path)
obj = parent.get(path, {})
if not isinstance(obj, dict):
msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
raise QMPShellError(msg.format('.'.join(curpath)))
parent[path] = obj
parent = obj
if optpath[-1] in parent:
if isinstance(parent[optpath[-1]], dict):
msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
raise QMPShellError(msg.format('.'.join(curpath)))
raise QMPShellError(f'Cannot set "{key}" multiple times')
parent[optpath[-1]] = value
def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
"""
Build a QMP input object from a user provided command-line in the
following format:
< command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
"""
argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
cmdargs = re.findall(argument_regex, cmdline)
qmpcmd: QMPMessage
# Transactional CLI entry:
if cmdargs and cmdargs[0] == 'transaction(':
self._transmode = True
self._actions = []
cmdargs.pop(0)
# Transactional CLI exit:
if cmdargs and cmdargs[0] == ')' and self._transmode:
self._transmode = False
if len(cmdargs) > 1:
msg = 'Unexpected input after close of Transaction sub-shell'
raise QMPShellError(msg)
qmpcmd = {
'execute': 'transaction',
'arguments': {'actions': self._actions}
}
return qmpcmd
# No args, or no args remaining
if not cmdargs:
return None
if self._transmode:
# Parse and cache this Transactional Action
finalize = False
action = {'type': cmdargs[0], 'data': {}}
if cmdargs[-1] == ')':
cmdargs.pop(-1)
finalize = True
self._cli_expr(cmdargs[1:], action['data'])
self._actions.append(action)
return self._build_cmd(')') if finalize else None
# Standard command: parse and return it to be executed.
qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
return qmpcmd
def _print(self, qmp_message: object) -> None:
jsobj = json.dumps(qmp_message,
indent=4 if self.pretty else None,
sort_keys=self.pretty)
print(str(jsobj))
def _execute_cmd(self, cmdline: str) -> bool:
try:
qmpcmd = self._build_cmd(cmdline)
except QMPShellError as err:
print(
f"Error while parsing command line: {err!s}\n"
"command format: <command-name> "
"[arg-name1=arg1] ... [arg-nameN=argN",
file=sys.stderr
)
return True
# For transaction mode, we may have just cached the action:
if qmpcmd is None:
return True
if self.verbose:
self._print(qmpcmd)
resp = self.cmd_obj(qmpcmd)
if resp is None:
print('Disconnected')
return False
self._print(resp)
return True
def connect(self, negotiate: bool = True) -> None:
self._greeting = super().connect(negotiate)
self._completer_setup()
def show_banner(self,
msg: str = 'Welcome to the QMP low-level shell!') -> None:
print(msg)
if not self._greeting:
print('Connected')
return
version = self._greeting['QMP']['version']['qemu']
print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
@property
def prompt(self) -> str:
if self._transmode:
return 'TRANS> '
return '(QEMU) '
def read_exec_command(self) -> bool:
"""
Read and execute a command.
@return True if execution was ok, return False if disconnected.
"""
try:
cmdline = input(self.prompt)
except EOFError:
print()
return False
if cmdline == '':
for event in self.get_events():
print(event)
self.clear_events()
return True
return self._execute_cmd(cmdline)
def repl(self) -> Iterator[None]:
self.show_banner()
while self.read_exec_command():
yield
self.close()
class HMPShell(QMPShell):
def __init__(self, address: qmp.SocketAddrT,
pretty: bool = False, verbose: bool = False):
super().__init__(address, pretty, verbose)
self._cpu_index = 0
def _cmd_completion(self) -> None:
for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
if cmd and cmd[0] != '[' and cmd[0] != '\t':
name = cmd.split()[0] # drop help text
if name == 'info':
continue
if name.find('|') != -1:
# Command in the form 'foobar|f' or 'f|foobar', take the
# full name
opt = name.split('|')
if len(opt[0]) == 1:
name = opt[1]
else:
name = opt[0]
self._completer.append(name)
self._completer.append('help ' + name) # help completion
def _info_completion(self) -> None:
for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
if cmd:
self._completer.append('info ' + cmd.split()[1])
def _other_completion(self) -> None:
# special cases
self._completer.append('help info')
def _fill_completion(self) -> None:
self._cmd_completion()
self._info_completion()
self._other_completion()
def _cmd_passthrough(self, cmdline: str,
cpu_index: int = 0) -> QMPMessage:
return self.cmd_obj({
'execute': 'human-monitor-command',
'arguments': {
'command-line': cmdline,
'cpu-index': cpu_index
}
})
def _execute_cmd(self, cmdline: str) -> bool:
if cmdline.split()[0] == "cpu":
# trap the cpu command, it requires special setting
try:
idx = int(cmdline.split()[1])
if 'return' not in self._cmd_passthrough('info version', idx):
print('bad CPU index')
return True
self._cpu_index = idx
except ValueError:
print('cpu command takes an integer argument')
return True
resp = self._cmd_passthrough(cmdline, self._cpu_index)
if resp is None:
print('Disconnected')
return False
assert 'return' in resp or 'error' in resp
if 'return' in resp:
# Success
if len(resp['return']) > 0:
print(resp['return'], end=' ')
else:
# Error
print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
return True
def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
QMPShell.show_banner(self, msg)
def die(msg: str) -> NoReturn:
sys.stderr.write('ERROR: %s\n' % msg)
sys.exit(1)
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--hmp', action='store_true',
help='Use HMP interface')
parser.add_argument('-N', '--skip-negotiation', action='store_true',
help='Skip negotiate (for qemu-ga)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose (echo commands sent and received)')
parser.add_argument('-p', '--pretty', action='store_true',
help='Pretty-print JSON')
default_server = os.environ.get('QMP_SOCKET')
parser.add_argument('qmp_server', action='store',
default=default_server,
help='< UNIX socket path | TCP address:port >')
args = parser.parse_args()
if args.qmp_server is None:
parser.error("QMP socket or TCP address must be specified")
shell_class = HMPShell if args.hmp else QMPShell
try:
address = shell_class.parse_address(args.qmp_server)
except qmp.QMPBadPortError:
parser.error(f"Bad port number: {args.qmp_server}")
return # pycharm doesn't know error() is noreturn
with shell_class(address, args.pretty, args.verbose) as qemu:
try:
qemu.connect(negotiate=not args.skip_negotiation)
except qmp.QMPConnectError:
die("Didn't get QMP greeting message")
except qmp.QMPCapabilitiesError:
die("Couldn't negotiate capabilities")
except OSError as err:
die(f"Couldn't connect to {args.qmp_server}: {err!s}")
for _ in qemu.repl():
pass
if __name__ == '__main__':
main()