qemu/scripts/qmp/qom-fuse
Markus Armbruster f713ed4f7e scripts/qmp/qom-fuse: Port to current Python module fuse
Signed-off-by: Markus Armbruster <armbru@redhat.com>
Message-Id: <20200723142738.1868568-3-armbru@redhat.com>
Reviewed-by: John Snow <jsnow@redhat.com>
2020-09-03 09:43:50 +02:00

142 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python3
##
# QEMU Object Model test tools
#
# Copyright IBM, Corp. 2012
# Copyright (C) 2020 Red Hat, Inc.
#
# Authors:
# Anthony Liguori <aliguori@us.ibm.com>
# Markus Armbruster <armbru@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later. See
# the COPYING file in the top-level directory.
##
import fuse, stat
from fuse import FUSE, FuseOSError, Operations
import os, posix, sys
from errno import *
# Add the 'python' directory two levels above this script to the module
# search path so that the qemu.qmp import below can be resolved.
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.qmp import QEMUMonitorProtocol
# NOTE(review): presumably a leftover from the API selection of an older
# FUSE binding -- confirm whether the current 'fuse' module still reads it.
fuse.fuse_python_api = (0, 2)
class QOMFS(Operations):
    """Expose the QEMU Object Model (QOM) tree as a FUSE filesystem.

    Objects are presented as directories, link<...> properties as symbolic
    links and every other property as a regular file.  All information is
    obtained through the 'qom-list' and 'qom-get' QMP commands issued on
    the monitor connection passed to the constructor.
    """

    def __init__(self, qmp):
        """Connect *qmp* and initialise the path -> inode number table."""
        self.qmp = qmp
        self.qmp.connect()
        self.ino_map = {}
        self.ino_count = 1

    @staticmethod
    def _split(path):
        """Split *path* into (object path, property name).

        rsplit() leaves an empty object path for a property that sits
        directly below the root; that case is mapped to '/'.
        """
        obj, prop = path.rsplit('/', 1)
        return obj or '/', prop

    def _lookup(self, path):
        """Return the 'qom-list' entry describing *path*, or None.

        Any failure (malformed path, QMP error, unexpected reply shape) is
        treated as "no such property".
        """
        try:
            obj, prop = self._split(path)
            for entry in self.qmp.command('qom-list', path=obj):
                if entry['name'] == prop:
                    return entry
        except Exception:
            return None
        return None

    def _stat(self, path, mode, nlink):
        """Build the attribute dict returned by getattr() for *path*."""
        return {'st_mode': mode,
                'st_ino': self.get_ino(path),
                'st_dev': 0,
                'st_nlink': nlink,
                'st_uid': 1000,
                'st_gid': 1000,
                'st_size': 4096,
                'st_atime': 0,
                'st_mtime': 0,
                'st_ctime': 0}

    def get_ino(self, path):
        """Return the inode number of *path*, allocating one on first use."""
        if path not in self.ino_map:
            self.ino_map[path] = self.ino_count
            self.ino_count += 1
        return self.ino_map[path]

    def is_object(self, path):
        """Return True if 'qom-list' succeeds for *path*."""
        try:
            self.qmp.command('qom-list', path=path)
        except Exception:
            return False
        return True

    def is_property(self, path):
        """Return True if *path* names a property of its parent object."""
        return self._lookup(path) is not None

    def is_link(self, path):
        """Return True if *path* names a property whose type is link<...>."""
        entry = self._lookup(path)
        if entry is None:
            return False
        try:
            return entry['type'].startswith('link<')
        except Exception:
            # Entry without a usable 'type' field: not a link.
            return False

    def read(self, path, length, offset, fh):
        """Return up to *length* bytes of the property value at *offset*.

        The value is fetched with 'qom-get', converted to text and
        terminated with a newline.

        Raises FuseOSError(ENOENT) if *path* is not a property and
        FuseOSError(EPERM) if the value cannot be fetched.
        """
        if not self.is_property(path):
            raise FuseOSError(ENOENT)
        obj, prop = self._split(path)
        try:
            value = self.qmp.command('qom-get', path=obj, property=prop)
        except Exception as err:
            raise FuseOSError(EPERM) from err
        # str() because qom-get may return non-string values; the trailing
        # newline makes values shell friendly.
        data = (str(value) + '\n').encode('utf-8')
        # Slice the encoded bytes: offset and length are byte counts.  An
        # offset past the end yields b''.
        return data[offset:offset + length]

    def readlink(self, path):
        """Return the target of the link property *path* as a relative path.

        Raises FuseOSError(EINVAL) if *path* is not a link and
        FuseOSError(EPERM) if the target cannot be fetched.
        """
        if not self.is_link(path):
            raise FuseOSError(EINVAL)
        directory, prop = path.rsplit('/', 1)
        # One '..' per directory level between the link and the filesystem
        # root; '.' for a link directly in the root, so the result never
        # becomes an absolute path.
        prefix = '/'.join(['..'] * directory.count('/')) or '.'
        try:
            target = self.qmp.command('qom-get', path=directory or '/',
                                      property=prop)
        except Exception as err:
            raise FuseOSError(EPERM) from err
        return prefix + str(target)

    def getattr(self, path, fh=None):
        """Return the attribute dict for *path*.

        Links are checked first, then objects, then plain properties.
        Raises FuseOSError(ENOENT) if *path* is none of these.
        """
        if self.is_link(path):
            return self._stat(path, 0o755 | stat.S_IFLNK, 2)
        if self.is_object(path):
            return self._stat(path, 0o755 | stat.S_IFDIR, 2)
        if self.is_property(path):
            return self._stat(path, 0o644 | stat.S_IFREG, 1)
        raise FuseOSError(ENOENT)

    def readdir(self, path, fh):
        """Yield '.', '..' and the name of every 'qom-list' entry of *path*."""
        yield '.'
        yield '..'
        for item in self.qmp.command('qom-list', path=path):
            yield str(item['name'])
if __name__ == '__main__':
    # Usage: QMP_SOCKET=<QMP socket> qom-fuse <mountpoint>
    # Fail with a readable message instead of an IndexError/KeyError
    # traceback when the mountpoint or the socket is not provided.
    if len(sys.argv) < 2:
        sys.exit('usage: %s <mountpoint>' % sys.argv[0])
    qmp_socket = os.environ.get('QMP_SOCKET')
    if not qmp_socket:
        sys.exit('%s: environment variable QMP_SOCKET is not set'
                 % sys.argv[0])
    # The result is not bound to a name, so the imported 'fuse' module is
    # no longer shadowed.
    FUSE(QOMFS(QEMUMonitorProtocol(qmp_socket)),
         sys.argv[1], foreground=True)