060b5a9323
QAPISchema.lookup_entity() takes an optional type argument, a subtype of QAPISchemaDefinition, and returns that type or None. Callers can use this to save themselves an isinstance() test. The only remaining user of this convenience feature is .lookup_type(). But we don't actually save anything anymore there: we still need the isinstance() to help mypy over the hump. Drop the .lookup_entity() argument, and adjust .lookup_type(). Signed-off-by: Markus Armbruster <armbru@redhat.com> Message-ID: <20240315152301.3621858-26-armbru@redhat.com> Reviewed-by: John Snow <jsnow@redhat.com> [Commit message typo fixed]
1484 lines
50 KiB
Python
1484 lines
50 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# QAPI schema internal representation
|
|
#
|
|
# Copyright (c) 2015-2019 Red Hat Inc.
|
|
#
|
|
# Authors:
|
|
# Markus Armbruster <armbru@redhat.com>
|
|
# Eric Blake <eblake@redhat.com>
|
|
# Marc-André Lureau <marcandre.lureau@redhat.com>
|
|
#
|
|
# This work is licensed under the terms of the GNU GPL, version 2.
|
|
# See the COPYING file in the top-level directory.
|
|
|
|
# pylint: disable=too-many-lines
|
|
|
|
# TODO catching name collisions in generated code would be nice
|
|
|
|
from __future__ import annotations
|
|
|
|
from abc import ABC, abstractmethod
|
|
from collections import OrderedDict
|
|
import os
|
|
import re
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
from .common import (
|
|
POINTER_SUFFIX,
|
|
c_name,
|
|
cgen_ifcond,
|
|
docgen_ifcond,
|
|
gen_endif,
|
|
gen_if,
|
|
)
|
|
from .error import QAPIError, QAPISemError, QAPISourceError
|
|
from .expr import check_exprs
|
|
from .parser import QAPIDoc, QAPIExpression, QAPISchemaParser
|
|
from .source import QAPISourceInfo
|
|
|
|
|
|
class QAPISchemaIfCond:
|
|
def __init__(
|
|
self,
|
|
ifcond: Optional[Union[str, Dict[str, object]]] = None,
|
|
) -> None:
|
|
self.ifcond = ifcond
|
|
|
|
def _cgen(self) -> str:
|
|
return cgen_ifcond(self.ifcond)
|
|
|
|
def gen_if(self) -> str:
|
|
return gen_if(self._cgen())
|
|
|
|
def gen_endif(self) -> str:
|
|
return gen_endif(self._cgen())
|
|
|
|
def docgen(self) -> str:
|
|
return docgen_ifcond(self.ifcond)
|
|
|
|
def is_present(self) -> bool:
|
|
return bool(self.ifcond)
|
|
|
|
|
|
class QAPISchemaEntity:
|
|
"""
|
|
A schema entity.
|
|
|
|
This is either a directive, such as include, or a definition.
|
|
The latter uses sub-class `QAPISchemaDefinition`.
|
|
"""
|
|
def __init__(self, info: Optional[QAPISourceInfo]):
|
|
self._module: Optional[QAPISchemaModule] = None
|
|
# For explicitly defined entities, info points to the (explicit)
|
|
# definition. For builtins (and their arrays), info is None.
|
|
# For implicitly defined entities, info points to a place that
|
|
# triggered the implicit definition (there may be more than one
|
|
# such place).
|
|
self.info = info
|
|
self._checked = False
|
|
|
|
def __repr__(self) -> str:
|
|
return "<%s at 0x%x>" % (type(self).__name__, id(self))
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
# pylint: disable=unused-argument
|
|
self._checked = True
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
|
|
pass
|
|
|
|
def _set_module(
|
|
self, schema: QAPISchema, info: Optional[QAPISourceInfo]
|
|
) -> None:
|
|
assert self._checked
|
|
fname = info.fname if info else QAPISchemaModule.BUILTIN_MODULE_NAME
|
|
self._module = schema.module_by_fname(fname)
|
|
self._module.add_entity(self)
|
|
|
|
def set_module(self, schema: QAPISchema) -> None:
|
|
self._set_module(schema, self.info)
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
# pylint: disable=unused-argument
|
|
assert self._checked
|
|
|
|
|
|
class QAPISchemaDefinition(QAPISchemaEntity):
|
|
meta: str
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
doc: Optional[QAPIDoc],
|
|
ifcond: Optional[QAPISchemaIfCond] = None,
|
|
features: Optional[List[QAPISchemaFeature]] = None,
|
|
):
|
|
super().__init__(info)
|
|
for f in features or []:
|
|
f.set_defined_in(name)
|
|
self.name = name
|
|
self.doc = doc
|
|
self._ifcond = ifcond or QAPISchemaIfCond()
|
|
self.features = features or []
|
|
|
|
def __repr__(self) -> str:
|
|
return "<%s:%s at 0x%x>" % (type(self).__name__, self.name,
|
|
id(self))
|
|
|
|
def c_name(self) -> str:
|
|
return c_name(self.name)
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
assert not self._checked
|
|
super().check(schema)
|
|
seen: Dict[str, QAPISchemaMember] = {}
|
|
for f in self.features:
|
|
f.check_clash(self.info, seen)
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
|
|
super().connect_doc(doc)
|
|
doc = doc or self.doc
|
|
if doc:
|
|
for f in self.features:
|
|
doc.connect_feature(f)
|
|
|
|
@property
|
|
def ifcond(self) -> QAPISchemaIfCond:
|
|
assert self._checked
|
|
return self._ifcond
|
|
|
|
def is_implicit(self) -> bool:
|
|
return not self.info
|
|
|
|
def describe(self) -> str:
|
|
return "%s '%s'" % (self.meta, self.name)
|
|
|
|
|
|
class QAPISchemaVisitor:
|
|
def visit_begin(self, schema: QAPISchema) -> None:
|
|
pass
|
|
|
|
def visit_end(self) -> None:
|
|
pass
|
|
|
|
def visit_module(self, name: str) -> None:
|
|
pass
|
|
|
|
def visit_needed(self, entity: QAPISchemaEntity) -> bool:
|
|
# pylint: disable=unused-argument
|
|
# Default to visiting everything
|
|
return True
|
|
|
|
def visit_include(self, name: str, info: Optional[QAPISourceInfo]) -> None:
|
|
pass
|
|
|
|
def visit_builtin_type(
|
|
self, name: str, info: Optional[QAPISourceInfo], json_type: str
|
|
) -> None:
|
|
pass
|
|
|
|
def visit_enum_type(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: List[QAPISchemaFeature],
|
|
members: List[QAPISchemaEnumMember],
|
|
prefix: Optional[str],
|
|
) -> None:
|
|
pass
|
|
|
|
def visit_array_type(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: QAPISchemaIfCond,
|
|
element_type: QAPISchemaType,
|
|
) -> None:
|
|
pass
|
|
|
|
def visit_object_type(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: List[QAPISchemaFeature],
|
|
base: Optional[QAPISchemaObjectType],
|
|
members: List[QAPISchemaObjectTypeMember],
|
|
variants: Optional[QAPISchemaVariants],
|
|
) -> None:
|
|
pass
|
|
|
|
def visit_object_type_flat(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: List[QAPISchemaFeature],
|
|
members: List[QAPISchemaObjectTypeMember],
|
|
variants: Optional[QAPISchemaVariants],
|
|
) -> None:
|
|
pass
|
|
|
|
def visit_alternate_type(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: List[QAPISchemaFeature],
|
|
variants: QAPISchemaVariants,
|
|
) -> None:
|
|
pass
|
|
|
|
def visit_command(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: List[QAPISchemaFeature],
|
|
arg_type: Optional[QAPISchemaObjectType],
|
|
ret_type: Optional[QAPISchemaType],
|
|
gen: bool,
|
|
success_response: bool,
|
|
boxed: bool,
|
|
allow_oob: bool,
|
|
allow_preconfig: bool,
|
|
coroutine: bool,
|
|
) -> None:
|
|
pass
|
|
|
|
def visit_event(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: List[QAPISchemaFeature],
|
|
arg_type: Optional[QAPISchemaObjectType],
|
|
boxed: bool,
|
|
) -> None:
|
|
pass
|
|
|
|
|
|
class QAPISchemaModule:
|
|
|
|
BUILTIN_MODULE_NAME = './builtin'
|
|
|
|
def __init__(self, name: str):
|
|
self.name = name
|
|
self._entity_list: List[QAPISchemaEntity] = []
|
|
|
|
@staticmethod
|
|
def is_system_module(name: str) -> bool:
|
|
"""
|
|
System modules are internally defined modules.
|
|
|
|
Their names start with the "./" prefix.
|
|
"""
|
|
return name.startswith('./')
|
|
|
|
@classmethod
|
|
def is_user_module(cls, name: str) -> bool:
|
|
"""
|
|
User modules are those defined by the user in qapi JSON files.
|
|
|
|
They do not start with the "./" prefix.
|
|
"""
|
|
return not cls.is_system_module(name)
|
|
|
|
@classmethod
|
|
def is_builtin_module(cls, name: str) -> bool:
|
|
"""
|
|
The built-in module is a single System module for the built-in types.
|
|
|
|
It is always "./builtin".
|
|
"""
|
|
return name == cls.BUILTIN_MODULE_NAME
|
|
|
|
def add_entity(self, ent: QAPISchemaEntity) -> None:
|
|
self._entity_list.append(ent)
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
visitor.visit_module(self.name)
|
|
for entity in self._entity_list:
|
|
if visitor.visit_needed(entity):
|
|
entity.visit(visitor)
|
|
|
|
|
|
class QAPISchemaInclude(QAPISchemaEntity):
|
|
def __init__(self, sub_module: QAPISchemaModule, info: QAPISourceInfo):
|
|
super().__init__(info)
|
|
self._sub_module = sub_module
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
super().visit(visitor)
|
|
visitor.visit_include(self._sub_module.name, self.info)
|
|
|
|
|
|
class QAPISchemaType(QAPISchemaDefinition, ABC):
|
|
# Return the C type for common use.
|
|
# For the types we commonly box, this is a pointer type.
|
|
@abstractmethod
|
|
def c_type(self) -> str:
|
|
pass
|
|
|
|
# Return the C type to be used in a parameter list.
|
|
def c_param_type(self) -> str:
|
|
return self.c_type()
|
|
|
|
# Return the C type to be used where we suppress boxing.
|
|
def c_unboxed_type(self) -> str:
|
|
return self.c_type()
|
|
|
|
@abstractmethod
|
|
def json_type(self) -> str:
|
|
pass
|
|
|
|
def alternate_qtype(self) -> Optional[str]:
|
|
json2qtype = {
|
|
'null': 'QTYPE_QNULL',
|
|
'string': 'QTYPE_QSTRING',
|
|
'number': 'QTYPE_QNUM',
|
|
'int': 'QTYPE_QNUM',
|
|
'boolean': 'QTYPE_QBOOL',
|
|
'array': 'QTYPE_QLIST',
|
|
'object': 'QTYPE_QDICT'
|
|
}
|
|
return json2qtype.get(self.json_type())
|
|
|
|
def doc_type(self) -> Optional[str]:
|
|
if self.is_implicit():
|
|
return None
|
|
return self.name
|
|
|
|
def need_has_if_optional(self) -> bool:
|
|
# When FOO is a pointer, has_FOO == !!FOO, i.e. has_FOO is redundant.
|
|
# Except for arrays; see QAPISchemaArrayType.need_has_if_optional().
|
|
return not self.c_type().endswith(POINTER_SUFFIX)
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
super().check(schema)
|
|
for feat in self.features:
|
|
if feat.is_special():
|
|
raise QAPISemError(
|
|
self.info,
|
|
f"feature '{feat.name}' is not supported for types")
|
|
|
|
def describe(self) -> str:
|
|
return "%s type '%s'" % (self.meta, self.name)
|
|
|
|
|
|
class QAPISchemaBuiltinType(QAPISchemaType):
|
|
meta = 'built-in'
|
|
|
|
def __init__(self, name: str, json_type: str, c_type: str):
|
|
super().__init__(name, None, None)
|
|
assert json_type in ('string', 'number', 'int', 'boolean', 'null',
|
|
'value')
|
|
self._json_type_name = json_type
|
|
self._c_type_name = c_type
|
|
|
|
def c_name(self) -> str:
|
|
return self.name
|
|
|
|
def c_type(self) -> str:
|
|
return self._c_type_name
|
|
|
|
def c_param_type(self) -> str:
|
|
if self.name == 'str':
|
|
return 'const ' + self._c_type_name
|
|
return self._c_type_name
|
|
|
|
def json_type(self) -> str:
|
|
return self._json_type_name
|
|
|
|
def doc_type(self) -> str:
|
|
return self.json_type()
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
super().visit(visitor)
|
|
visitor.visit_builtin_type(self.name, self.info, self.json_type())
|
|
|
|
|
|
class QAPISchemaEnumType(QAPISchemaType):
|
|
meta = 'enum'
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
doc: Optional[QAPIDoc],
|
|
ifcond: Optional[QAPISchemaIfCond],
|
|
features: Optional[List[QAPISchemaFeature]],
|
|
members: List[QAPISchemaEnumMember],
|
|
prefix: Optional[str],
|
|
):
|
|
super().__init__(name, info, doc, ifcond, features)
|
|
for m in members:
|
|
m.set_defined_in(name)
|
|
self.members = members
|
|
self.prefix = prefix
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
super().check(schema)
|
|
seen: Dict[str, QAPISchemaMember] = {}
|
|
for m in self.members:
|
|
m.check_clash(self.info, seen)
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
|
|
super().connect_doc(doc)
|
|
doc = doc or self.doc
|
|
for m in self.members:
|
|
m.connect_doc(doc)
|
|
|
|
def is_implicit(self) -> bool:
|
|
# See QAPISchema._def_predefineds()
|
|
return self.name == 'QType'
|
|
|
|
def c_type(self) -> str:
|
|
return c_name(self.name)
|
|
|
|
def member_names(self) -> List[str]:
|
|
return [m.name for m in self.members]
|
|
|
|
def json_type(self) -> str:
|
|
return 'string'
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
super().visit(visitor)
|
|
visitor.visit_enum_type(
|
|
self.name, self.info, self.ifcond, self.features,
|
|
self.members, self.prefix)
|
|
|
|
|
|
class QAPISchemaArrayType(QAPISchemaType):
|
|
meta = 'array'
|
|
|
|
def __init__(
|
|
self, name: str, info: Optional[QAPISourceInfo], element_type: str
|
|
):
|
|
super().__init__(name, info, None)
|
|
self._element_type_name = element_type
|
|
self.element_type: QAPISchemaType
|
|
|
|
def need_has_if_optional(self) -> bool:
|
|
# When FOO is an array, we still need has_FOO to distinguish
|
|
# absent (!has_FOO) from present and empty (has_FOO && !FOO).
|
|
return True
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
super().check(schema)
|
|
self.element_type = schema.resolve_type(
|
|
self._element_type_name, self.info,
|
|
self.info.defn_meta if self.info else None)
|
|
assert not isinstance(self.element_type, QAPISchemaArrayType)
|
|
|
|
def set_module(self, schema: QAPISchema) -> None:
|
|
self._set_module(schema, self.element_type.info)
|
|
|
|
@property
|
|
def ifcond(self) -> QAPISchemaIfCond:
|
|
assert self._checked
|
|
return self.element_type.ifcond
|
|
|
|
def is_implicit(self) -> bool:
|
|
return True
|
|
|
|
def c_type(self) -> str:
|
|
return c_name(self.name) + POINTER_SUFFIX
|
|
|
|
def json_type(self) -> str:
|
|
return 'array'
|
|
|
|
def doc_type(self) -> Optional[str]:
|
|
elt_doc_type = self.element_type.doc_type()
|
|
if not elt_doc_type:
|
|
return None
|
|
return 'array of ' + elt_doc_type
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
super().visit(visitor)
|
|
visitor.visit_array_type(self.name, self.info, self.ifcond,
|
|
self.element_type)
|
|
|
|
def describe(self) -> str:
|
|
return "%s type ['%s']" % (self.meta, self._element_type_name)
|
|
|
|
|
|
class QAPISchemaObjectType(QAPISchemaType):
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
doc: Optional[QAPIDoc],
|
|
ifcond: Optional[QAPISchemaIfCond],
|
|
features: Optional[List[QAPISchemaFeature]],
|
|
base: Optional[str],
|
|
local_members: List[QAPISchemaObjectTypeMember],
|
|
variants: Optional[QAPISchemaVariants],
|
|
):
|
|
# struct has local_members, optional base, and no variants
|
|
# union has base, variants, and no local_members
|
|
super().__init__(name, info, doc, ifcond, features)
|
|
self.meta = 'union' if variants else 'struct'
|
|
for m in local_members:
|
|
m.set_defined_in(name)
|
|
if variants is not None:
|
|
variants.set_defined_in(name)
|
|
self._base_name = base
|
|
self.base = None
|
|
self.local_members = local_members
|
|
self.variants = variants
|
|
self.members: List[QAPISchemaObjectTypeMember]
|
|
self._check_complete = False
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
# This calls another type T's .check() exactly when the C
|
|
# struct emitted by gen_object() contains that T's C struct
|
|
# (pointers don't count).
|
|
if self._check_complete:
|
|
# A previous .check() completed: nothing to do
|
|
return
|
|
if self._checked:
|
|
# Recursed: C struct contains itself
|
|
raise QAPISemError(self.info,
|
|
"object %s contains itself" % self.name)
|
|
|
|
super().check(schema)
|
|
assert self._checked and not self._check_complete
|
|
|
|
seen = OrderedDict()
|
|
if self._base_name:
|
|
self.base = schema.resolve_type(self._base_name, self.info,
|
|
"'base'")
|
|
if (not isinstance(self.base, QAPISchemaObjectType)
|
|
or self.base.variants):
|
|
raise QAPISemError(
|
|
self.info,
|
|
"'base' requires a struct type, %s isn't"
|
|
% self.base.describe())
|
|
self.base.check(schema)
|
|
self.base.check_clash(self.info, seen)
|
|
for m in self.local_members:
|
|
m.check(schema)
|
|
m.check_clash(self.info, seen)
|
|
|
|
# self.check_clash() works in terms of the supertype, but
|
|
# self.members is declared List[QAPISchemaObjectTypeMember].
|
|
# Cast down to the subtype.
|
|
members = cast(List[QAPISchemaObjectTypeMember], list(seen.values()))
|
|
|
|
if self.variants:
|
|
self.variants.check(schema, seen)
|
|
self.variants.check_clash(self.info, seen)
|
|
|
|
self.members = members
|
|
self._check_complete = True # mark completed
|
|
|
|
# Check that the members of this type do not cause duplicate JSON members,
|
|
# and update seen to track the members seen so far. Report any errors
|
|
# on behalf of info, which is not necessarily self.info
|
|
def check_clash(
|
|
self,
|
|
info: Optional[QAPISourceInfo],
|
|
seen: Dict[str, QAPISchemaMember],
|
|
) -> None:
|
|
assert self._checked
|
|
for m in self.members:
|
|
m.check_clash(info, seen)
|
|
if self.variants:
|
|
self.variants.check_clash(info, seen)
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
|
|
super().connect_doc(doc)
|
|
doc = doc or self.doc
|
|
if self.base and self.base.is_implicit():
|
|
self.base.connect_doc(doc)
|
|
for m in self.local_members:
|
|
m.connect_doc(doc)
|
|
|
|
def is_implicit(self) -> bool:
|
|
# See QAPISchema._make_implicit_object_type(), as well as
|
|
# _def_predefineds()
|
|
return self.name.startswith('q_')
|
|
|
|
def is_empty(self) -> bool:
|
|
return not self.members and not self.variants
|
|
|
|
def has_conditional_members(self) -> bool:
|
|
return any(m.ifcond.is_present() for m in self.members)
|
|
|
|
def c_name(self) -> str:
|
|
assert self.name != 'q_empty'
|
|
return super().c_name()
|
|
|
|
def c_type(self) -> str:
|
|
assert not self.is_implicit()
|
|
return c_name(self.name) + POINTER_SUFFIX
|
|
|
|
def c_unboxed_type(self) -> str:
|
|
return c_name(self.name)
|
|
|
|
def json_type(self) -> str:
|
|
return 'object'
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
super().visit(visitor)
|
|
visitor.visit_object_type(
|
|
self.name, self.info, self.ifcond, self.features,
|
|
self.base, self.local_members, self.variants)
|
|
visitor.visit_object_type_flat(
|
|
self.name, self.info, self.ifcond, self.features,
|
|
self.members, self.variants)
|
|
|
|
|
|
class QAPISchemaAlternateType(QAPISchemaType):
|
|
meta = 'alternate'
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: QAPISourceInfo,
|
|
doc: Optional[QAPIDoc],
|
|
ifcond: Optional[QAPISchemaIfCond],
|
|
features: List[QAPISchemaFeature],
|
|
variants: QAPISchemaVariants,
|
|
):
|
|
super().__init__(name, info, doc, ifcond, features)
|
|
assert variants.tag_member
|
|
variants.set_defined_in(name)
|
|
variants.tag_member.set_defined_in(self.name)
|
|
self.variants = variants
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
super().check(schema)
|
|
self.variants.tag_member.check(schema)
|
|
# Not calling self.variants.check_clash(), because there's nothing
|
|
# to clash with
|
|
self.variants.check(schema, {})
|
|
# Alternate branch names have no relation to the tag enum values;
|
|
# so we have to check for potential name collisions ourselves.
|
|
seen: Dict[str, QAPISchemaMember] = {}
|
|
types_seen: Dict[str, str] = {}
|
|
for v in self.variants.variants:
|
|
v.check_clash(self.info, seen)
|
|
qtype = v.type.alternate_qtype()
|
|
if not qtype:
|
|
raise QAPISemError(
|
|
self.info,
|
|
"%s cannot use %s"
|
|
% (v.describe(self.info), v.type.describe()))
|
|
conflicting = set([qtype])
|
|
if qtype == 'QTYPE_QSTRING':
|
|
if isinstance(v.type, QAPISchemaEnumType):
|
|
for m in v.type.members:
|
|
if m.name in ['on', 'off']:
|
|
conflicting.add('QTYPE_QBOOL')
|
|
if re.match(r'[-+0-9.]', m.name):
|
|
# lazy, could be tightened
|
|
conflicting.add('QTYPE_QNUM')
|
|
else:
|
|
conflicting.add('QTYPE_QNUM')
|
|
conflicting.add('QTYPE_QBOOL')
|
|
for qt in conflicting:
|
|
if qt in types_seen:
|
|
raise QAPISemError(
|
|
self.info,
|
|
"%s can't be distinguished from '%s'"
|
|
% (v.describe(self.info), types_seen[qt]))
|
|
types_seen[qt] = v.name
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
|
|
super().connect_doc(doc)
|
|
doc = doc or self.doc
|
|
for v in self.variants.variants:
|
|
v.connect_doc(doc)
|
|
|
|
def c_type(self) -> str:
|
|
return c_name(self.name) + POINTER_SUFFIX
|
|
|
|
def json_type(self) -> str:
|
|
return 'value'
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
super().visit(visitor)
|
|
visitor.visit_alternate_type(
|
|
self.name, self.info, self.ifcond, self.features, self.variants)
|
|
|
|
|
|
class QAPISchemaVariants:
|
|
def __init__(
|
|
self,
|
|
tag_name: Optional[str],
|
|
info: QAPISourceInfo,
|
|
tag_member: Optional[QAPISchemaObjectTypeMember],
|
|
variants: List[QAPISchemaVariant],
|
|
):
|
|
# Unions pass tag_name but not tag_member.
|
|
# Alternates pass tag_member but not tag_name.
|
|
# After check(), tag_member is always set.
|
|
assert bool(tag_member) != bool(tag_name)
|
|
assert (isinstance(tag_name, str) or
|
|
isinstance(tag_member, QAPISchemaObjectTypeMember))
|
|
self._tag_name = tag_name
|
|
self.info = info
|
|
self._tag_member = tag_member
|
|
self.variants = variants
|
|
|
|
@property
|
|
def tag_member(self) -> QAPISchemaObjectTypeMember:
|
|
if self._tag_member is None:
|
|
raise RuntimeError(
|
|
"QAPISchemaVariants has no tag_member property until "
|
|
"after check() has been run."
|
|
)
|
|
return self._tag_member
|
|
|
|
def set_defined_in(self, name: str) -> None:
|
|
for v in self.variants:
|
|
v.set_defined_in(name)
|
|
|
|
def check(
|
|
self, schema: QAPISchema, seen: Dict[str, QAPISchemaMember]
|
|
) -> None:
|
|
if self._tag_name: # union
|
|
# We need to narrow the member type:
|
|
tmp = seen.get(c_name(self._tag_name))
|
|
assert tmp is None or isinstance(tmp, QAPISchemaObjectTypeMember)
|
|
self._tag_member = tmp
|
|
|
|
base = "'base'"
|
|
# Pointing to the base type when not implicit would be
|
|
# nice, but we don't know it here
|
|
if not self._tag_member or self._tag_name != self._tag_member.name:
|
|
raise QAPISemError(
|
|
self.info,
|
|
"discriminator '%s' is not a member of %s"
|
|
% (self._tag_name, base))
|
|
# Here we do:
|
|
assert self.tag_member.defined_in
|
|
base_type = schema.lookup_type(self.tag_member.defined_in)
|
|
assert base_type
|
|
if not base_type.is_implicit():
|
|
base = "base type '%s'" % self.tag_member.defined_in
|
|
if not isinstance(self.tag_member.type, QAPISchemaEnumType):
|
|
raise QAPISemError(
|
|
self.info,
|
|
"discriminator member '%s' of %s must be of enum type"
|
|
% (self._tag_name, base))
|
|
if self.tag_member.optional:
|
|
raise QAPISemError(
|
|
self.info,
|
|
"discriminator member '%s' of %s must not be optional"
|
|
% (self._tag_name, base))
|
|
if self.tag_member.ifcond.is_present():
|
|
raise QAPISemError(
|
|
self.info,
|
|
"discriminator member '%s' of %s must not be conditional"
|
|
% (self._tag_name, base))
|
|
else: # alternate
|
|
assert self._tag_member
|
|
assert isinstance(self.tag_member.type, QAPISchemaEnumType)
|
|
assert not self.tag_member.optional
|
|
assert not self.tag_member.ifcond.is_present()
|
|
if self._tag_name: # union
|
|
# branches that are not explicitly covered get an empty type
|
|
assert self.tag_member.defined_in
|
|
cases = {v.name for v in self.variants}
|
|
for m in self.tag_member.type.members:
|
|
if m.name not in cases:
|
|
v = QAPISchemaVariant(m.name, self.info,
|
|
'q_empty', m.ifcond)
|
|
v.set_defined_in(self.tag_member.defined_in)
|
|
self.variants.append(v)
|
|
if not self.variants:
|
|
raise QAPISemError(self.info, "union has no branches")
|
|
for v in self.variants:
|
|
v.check(schema)
|
|
# Union names must match enum values; alternate names are
|
|
# checked separately. Use 'seen' to tell the two apart.
|
|
if seen:
|
|
if v.name not in self.tag_member.type.member_names():
|
|
raise QAPISemError(
|
|
self.info,
|
|
"branch '%s' is not a value of %s"
|
|
% (v.name, self.tag_member.type.describe()))
|
|
if not isinstance(v.type, QAPISchemaObjectType):
|
|
raise QAPISemError(
|
|
self.info,
|
|
"%s cannot use %s"
|
|
% (v.describe(self.info), v.type.describe()))
|
|
v.type.check(schema)
|
|
|
|
def check_clash(
|
|
self,
|
|
info: Optional[QAPISourceInfo],
|
|
seen: Dict[str, QAPISchemaMember],
|
|
) -> None:
|
|
for v in self.variants:
|
|
# Reset seen map for each variant, since qapi names from one
|
|
# branch do not affect another branch.
|
|
#
|
|
# v.type's typing is enforced in check() above.
|
|
assert isinstance(v.type, QAPISchemaObjectType)
|
|
v.type.check_clash(info, dict(seen))
|
|
|
|
|
|
class QAPISchemaMember:
|
|
""" Represents object members, enum members and features """
|
|
role = 'member'
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: Optional[QAPISchemaIfCond] = None,
|
|
):
|
|
self.name = name
|
|
self.info = info
|
|
self.ifcond = ifcond or QAPISchemaIfCond()
|
|
self.defined_in: Optional[str] = None
|
|
|
|
def set_defined_in(self, name: str) -> None:
|
|
assert not self.defined_in
|
|
self.defined_in = name
|
|
|
|
def check_clash(
|
|
self,
|
|
info: Optional[QAPISourceInfo],
|
|
seen: Dict[str, QAPISchemaMember],
|
|
) -> None:
|
|
cname = c_name(self.name)
|
|
if cname in seen:
|
|
raise QAPISemError(
|
|
info,
|
|
"%s collides with %s"
|
|
% (self.describe(info), seen[cname].describe(info)))
|
|
seen[cname] = self
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc]) -> None:
|
|
if doc:
|
|
doc.connect_member(self)
|
|
|
|
def describe(self, info: Optional[QAPISourceInfo]) -> str:
|
|
role = self.role
|
|
meta = 'type'
|
|
defined_in = self.defined_in
|
|
assert defined_in
|
|
|
|
if defined_in.startswith('q_obj_'):
|
|
# See QAPISchema._make_implicit_object_type() - reverse the
|
|
# mapping there to create a nice human-readable description
|
|
defined_in = defined_in[6:]
|
|
if defined_in.endswith('-arg'):
|
|
# Implicit type created for a command's dict 'data'
|
|
assert role == 'member'
|
|
role = 'parameter'
|
|
meta = 'command'
|
|
defined_in = defined_in[:-4]
|
|
elif defined_in.endswith('-base'):
|
|
# Implicit type created for a union's dict 'base'
|
|
role = 'base ' + role
|
|
defined_in = defined_in[:-5]
|
|
else:
|
|
assert False
|
|
|
|
assert info is not None
|
|
if defined_in != info.defn_name:
|
|
return "%s '%s' of %s '%s'" % (role, self.name, meta, defined_in)
|
|
return "%s '%s'" % (role, self.name)
|
|
|
|
|
|
class QAPISchemaEnumMember(QAPISchemaMember):
|
|
role = 'value'
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
ifcond: Optional[QAPISchemaIfCond] = None,
|
|
features: Optional[List[QAPISchemaFeature]] = None,
|
|
):
|
|
super().__init__(name, info, ifcond)
|
|
for f in features or []:
|
|
f.set_defined_in(name)
|
|
self.features = features or []
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc]) -> None:
|
|
super().connect_doc(doc)
|
|
if doc:
|
|
for f in self.features:
|
|
doc.connect_feature(f)
|
|
|
|
|
|
class QAPISchemaFeature(QAPISchemaMember):
|
|
role = 'feature'
|
|
|
|
def is_special(self) -> bool:
|
|
return self.name in ('deprecated', 'unstable')
|
|
|
|
|
|
class QAPISchemaObjectTypeMember(QAPISchemaMember):
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: QAPISourceInfo,
|
|
typ: str,
|
|
optional: bool,
|
|
ifcond: Optional[QAPISchemaIfCond] = None,
|
|
features: Optional[List[QAPISchemaFeature]] = None,
|
|
):
|
|
super().__init__(name, info, ifcond)
|
|
for f in features or []:
|
|
f.set_defined_in(name)
|
|
self._type_name = typ
|
|
self.type: QAPISchemaType # set during check()
|
|
self.optional = optional
|
|
self.features = features or []
|
|
|
|
def need_has(self) -> bool:
|
|
return self.optional and self.type.need_has_if_optional()
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
assert self.defined_in
|
|
self.type = schema.resolve_type(self._type_name, self.info,
|
|
self.describe)
|
|
seen: Dict[str, QAPISchemaMember] = {}
|
|
for f in self.features:
|
|
f.check_clash(self.info, seen)
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc]) -> None:
|
|
super().connect_doc(doc)
|
|
if doc:
|
|
for f in self.features:
|
|
doc.connect_feature(f)
|
|
|
|
|
|
class QAPISchemaVariant(QAPISchemaObjectTypeMember):
|
|
role = 'branch'
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: QAPISourceInfo,
|
|
typ: str,
|
|
ifcond: QAPISchemaIfCond,
|
|
):
|
|
super().__init__(name, info, typ, False, ifcond)
|
|
|
|
|
|
class QAPISchemaCommand(QAPISchemaDefinition):
|
|
meta = 'command'
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: QAPISourceInfo,
|
|
doc: Optional[QAPIDoc],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: List[QAPISchemaFeature],
|
|
arg_type: Optional[str],
|
|
ret_type: Optional[str],
|
|
gen: bool,
|
|
success_response: bool,
|
|
boxed: bool,
|
|
allow_oob: bool,
|
|
allow_preconfig: bool,
|
|
coroutine: bool,
|
|
):
|
|
super().__init__(name, info, doc, ifcond, features)
|
|
self._arg_type_name = arg_type
|
|
self.arg_type: Optional[QAPISchemaObjectType] = None
|
|
self._ret_type_name = ret_type
|
|
self.ret_type: Optional[QAPISchemaType] = None
|
|
self.gen = gen
|
|
self.success_response = success_response
|
|
self.boxed = boxed
|
|
self.allow_oob = allow_oob
|
|
self.allow_preconfig = allow_preconfig
|
|
self.coroutine = coroutine
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
assert self.info is not None
|
|
super().check(schema)
|
|
if self._arg_type_name:
|
|
arg_type = schema.resolve_type(
|
|
self._arg_type_name, self.info, "command's 'data'")
|
|
if not isinstance(arg_type, QAPISchemaObjectType):
|
|
raise QAPISemError(
|
|
self.info,
|
|
"command's 'data' cannot take %s"
|
|
% arg_type.describe())
|
|
self.arg_type = arg_type
|
|
if self.arg_type.variants and not self.boxed:
|
|
raise QAPISemError(
|
|
self.info,
|
|
"command's 'data' can take %s only with 'boxed': true"
|
|
% self.arg_type.describe())
|
|
self.arg_type.check(schema)
|
|
if self.arg_type.has_conditional_members() and not self.boxed:
|
|
raise QAPISemError(
|
|
self.info,
|
|
"conditional command arguments require 'boxed': true")
|
|
if self._ret_type_name:
|
|
self.ret_type = schema.resolve_type(
|
|
self._ret_type_name, self.info, "command's 'returns'")
|
|
if self.name not in self.info.pragma.command_returns_exceptions:
|
|
typ = self.ret_type
|
|
if isinstance(typ, QAPISchemaArrayType):
|
|
typ = typ.element_type
|
|
if not isinstance(typ, QAPISchemaObjectType):
|
|
raise QAPISemError(
|
|
self.info,
|
|
"command's 'returns' cannot take %s"
|
|
% self.ret_type.describe())
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
|
|
super().connect_doc(doc)
|
|
doc = doc or self.doc
|
|
if doc:
|
|
if self.arg_type and self.arg_type.is_implicit():
|
|
self.arg_type.connect_doc(doc)
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
super().visit(visitor)
|
|
visitor.visit_command(
|
|
self.name, self.info, self.ifcond, self.features,
|
|
self.arg_type, self.ret_type, self.gen, self.success_response,
|
|
self.boxed, self.allow_oob, self.allow_preconfig,
|
|
self.coroutine)
|
|
|
|
|
|
class QAPISchemaEvent(QAPISchemaDefinition):
|
|
meta = 'event'
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
info: QAPISourceInfo,
|
|
doc: Optional[QAPIDoc],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: List[QAPISchemaFeature],
|
|
arg_type: Optional[str],
|
|
boxed: bool,
|
|
):
|
|
super().__init__(name, info, doc, ifcond, features)
|
|
self._arg_type_name = arg_type
|
|
self.arg_type: Optional[QAPISchemaObjectType] = None
|
|
self.boxed = boxed
|
|
|
|
def check(self, schema: QAPISchema) -> None:
|
|
super().check(schema)
|
|
if self._arg_type_name:
|
|
typ = schema.resolve_type(
|
|
self._arg_type_name, self.info, "event's 'data'")
|
|
if not isinstance(typ, QAPISchemaObjectType):
|
|
raise QAPISemError(
|
|
self.info,
|
|
"event's 'data' cannot take %s"
|
|
% typ.describe())
|
|
self.arg_type = typ
|
|
if self.arg_type.variants and not self.boxed:
|
|
raise QAPISemError(
|
|
self.info,
|
|
"event's 'data' can take %s only with 'boxed': true"
|
|
% self.arg_type.describe())
|
|
self.arg_type.check(schema)
|
|
if self.arg_type.has_conditional_members() and not self.boxed:
|
|
raise QAPISemError(
|
|
self.info,
|
|
"conditional event arguments require 'boxed': true")
|
|
|
|
def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
|
|
super().connect_doc(doc)
|
|
doc = doc or self.doc
|
|
if doc:
|
|
if self.arg_type and self.arg_type.is_implicit():
|
|
self.arg_type.connect_doc(doc)
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
super().visit(visitor)
|
|
visitor.visit_event(
|
|
self.name, self.info, self.ifcond, self.features,
|
|
self.arg_type, self.boxed)
|
|
|
|
|
|
class QAPISchema:
|
|
def __init__(self, fname: str):
|
|
self.fname = fname
|
|
|
|
try:
|
|
parser = QAPISchemaParser(fname)
|
|
except OSError as err:
|
|
raise QAPIError(
|
|
f"can't read schema file '{fname}': {err.strerror}"
|
|
) from err
|
|
|
|
exprs = check_exprs(parser.exprs)
|
|
self.docs = parser.docs
|
|
self._entity_list: List[QAPISchemaEntity] = []
|
|
self._entity_dict: Dict[str, QAPISchemaDefinition] = {}
|
|
self._module_dict: Dict[str, QAPISchemaModule] = OrderedDict()
|
|
self._schema_dir = os.path.dirname(fname)
|
|
self._make_module(QAPISchemaModule.BUILTIN_MODULE_NAME)
|
|
self._make_module(fname)
|
|
self._predefining = True
|
|
self._def_predefineds()
|
|
self._predefining = False
|
|
self._def_exprs(exprs)
|
|
self.check()
|
|
|
|
def _def_entity(self, ent: QAPISchemaEntity) -> None:
|
|
self._entity_list.append(ent)
|
|
|
|
def _def_definition(self, defn: QAPISchemaDefinition) -> None:
|
|
# Only the predefined types are allowed to not have info
|
|
assert defn.info or self._predefining
|
|
self._def_entity(defn)
|
|
# TODO reject names that differ only in '_' vs. '.' vs. '-',
|
|
# because they're liable to clash in generated C.
|
|
other_defn = self._entity_dict.get(defn.name)
|
|
if other_defn:
|
|
if other_defn.info:
|
|
where = QAPISourceError(other_defn.info, "previous definition")
|
|
raise QAPISemError(
|
|
defn.info,
|
|
"'%s' is already defined\n%s" % (defn.name, where))
|
|
raise QAPISemError(
|
|
defn.info, "%s is already defined" % other_defn.describe())
|
|
self._entity_dict[defn.name] = defn
|
|
|
|
def lookup_entity(self,name: str) -> Optional[QAPISchemaEntity]:
|
|
return self._entity_dict.get(name)
|
|
|
|
def lookup_type(self, name: str) -> Optional[QAPISchemaType]:
|
|
typ = self.lookup_entity(name)
|
|
if isinstance(typ, QAPISchemaType):
|
|
return typ
|
|
return None
|
|
|
|
def resolve_type(
|
|
self,
|
|
name: str,
|
|
info: Optional[QAPISourceInfo],
|
|
what: Union[None, str, Callable[[QAPISourceInfo], str]],
|
|
) -> QAPISchemaType:
|
|
typ = self.lookup_type(name)
|
|
if not typ:
|
|
assert info and what # built-in types must not fail lookup
|
|
if callable(what):
|
|
what = what(info)
|
|
raise QAPISemError(
|
|
info, "%s uses unknown type '%s'" % (what, name))
|
|
return typ
|
|
|
|
def _module_name(self, fname: str) -> str:
|
|
if QAPISchemaModule.is_system_module(fname):
|
|
return fname
|
|
return os.path.relpath(fname, self._schema_dir)
|
|
|
|
def _make_module(self, fname: str) -> QAPISchemaModule:
|
|
name = self._module_name(fname)
|
|
if name not in self._module_dict:
|
|
self._module_dict[name] = QAPISchemaModule(name)
|
|
return self._module_dict[name]
|
|
|
|
def module_by_fname(self, fname: str) -> QAPISchemaModule:
|
|
name = self._module_name(fname)
|
|
return self._module_dict[name]
|
|
|
|
def _def_include(self, expr: QAPIExpression) -> None:
|
|
include = expr['include']
|
|
assert expr.doc is None
|
|
self._def_entity(
|
|
QAPISchemaInclude(self._make_module(include), expr.info))
|
|
|
|
def _def_builtin_type(
|
|
self, name: str, json_type: str, c_type: str
|
|
) -> None:
|
|
self._def_definition(QAPISchemaBuiltinType(name, json_type, c_type))
|
|
# Instantiating only the arrays that are actually used would
|
|
# be nice, but we can't as long as their generated code
|
|
# (qapi-builtin-types.[ch]) may be shared by some other
|
|
# schema.
|
|
self._make_array_type(name, None)
|
|
|
|
def _def_predefineds(self) -> None:
|
|
for t in [('str', 'string', 'char' + POINTER_SUFFIX),
|
|
('number', 'number', 'double'),
|
|
('int', 'int', 'int64_t'),
|
|
('int8', 'int', 'int8_t'),
|
|
('int16', 'int', 'int16_t'),
|
|
('int32', 'int', 'int32_t'),
|
|
('int64', 'int', 'int64_t'),
|
|
('uint8', 'int', 'uint8_t'),
|
|
('uint16', 'int', 'uint16_t'),
|
|
('uint32', 'int', 'uint32_t'),
|
|
('uint64', 'int', 'uint64_t'),
|
|
('size', 'int', 'uint64_t'),
|
|
('bool', 'boolean', 'bool'),
|
|
('any', 'value', 'QObject' + POINTER_SUFFIX),
|
|
('null', 'null', 'QNull' + POINTER_SUFFIX)]:
|
|
self._def_builtin_type(*t)
|
|
self.the_empty_object_type = QAPISchemaObjectType(
|
|
'q_empty', None, None, None, None, None, [], None)
|
|
self._def_definition(self.the_empty_object_type)
|
|
|
|
qtypes = ['none', 'qnull', 'qnum', 'qstring', 'qdict', 'qlist',
|
|
'qbool']
|
|
qtype_values = self._make_enum_members(
|
|
[{'name': n} for n in qtypes], None)
|
|
|
|
self._def_definition(QAPISchemaEnumType(
|
|
'QType', None, None, None, None, qtype_values, 'QTYPE'))
|
|
|
|
def _make_features(
|
|
self,
|
|
features: Optional[List[Dict[str, Any]]],
|
|
info: Optional[QAPISourceInfo],
|
|
) -> List[QAPISchemaFeature]:
|
|
if features is None:
|
|
return []
|
|
return [QAPISchemaFeature(f['name'], info,
|
|
QAPISchemaIfCond(f.get('if')))
|
|
for f in features]
|
|
|
|
def _make_enum_member(
|
|
self,
|
|
name: str,
|
|
ifcond: Optional[Union[str, Dict[str, Any]]],
|
|
features: Optional[List[Dict[str, Any]]],
|
|
info: Optional[QAPISourceInfo],
|
|
) -> QAPISchemaEnumMember:
|
|
return QAPISchemaEnumMember(name, info,
|
|
QAPISchemaIfCond(ifcond),
|
|
self._make_features(features, info))
|
|
|
|
def _make_enum_members(
|
|
self, values: List[Dict[str, Any]], info: Optional[QAPISourceInfo]
|
|
) -> List[QAPISchemaEnumMember]:
|
|
return [self._make_enum_member(v['name'], v.get('if'),
|
|
v.get('features'), info)
|
|
for v in values]
|
|
|
|
def _make_array_type(
|
|
self, element_type: str, info: Optional[QAPISourceInfo]
|
|
) -> str:
|
|
name = element_type + 'List' # reserved by check_defn_name_str()
|
|
if not self.lookup_type(name):
|
|
self._def_definition(QAPISchemaArrayType(
|
|
name, info, element_type))
|
|
return name
|
|
|
|
def _make_implicit_object_type(
|
|
self,
|
|
name: str,
|
|
info: QAPISourceInfo,
|
|
ifcond: QAPISchemaIfCond,
|
|
role: str,
|
|
members: List[QAPISchemaObjectTypeMember],
|
|
) -> Optional[str]:
|
|
if not members:
|
|
return None
|
|
# See also QAPISchemaObjectTypeMember.describe()
|
|
name = 'q_obj_%s-%s' % (name, role)
|
|
typ = self.lookup_entity(name)
|
|
if typ:
|
|
assert(isinstance(typ, QAPISchemaObjectType))
|
|
# The implicit object type has multiple users. This can
|
|
# only be a duplicate definition, which will be flagged
|
|
# later.
|
|
pass
|
|
else:
|
|
self._def_definition(QAPISchemaObjectType(
|
|
name, info, None, ifcond, None, None, members, None))
|
|
return name
|
|
|
|
def _def_enum_type(self, expr: QAPIExpression) -> None:
|
|
name = expr['enum']
|
|
data = expr['data']
|
|
prefix = expr.get('prefix')
|
|
ifcond = QAPISchemaIfCond(expr.get('if'))
|
|
info = expr.info
|
|
features = self._make_features(expr.get('features'), info)
|
|
self._def_definition(QAPISchemaEnumType(
|
|
name, info, expr.doc, ifcond, features,
|
|
self._make_enum_members(data, info), prefix))
|
|
|
|
def _make_member(
|
|
self,
|
|
name: str,
|
|
typ: Union[List[str], str],
|
|
ifcond: QAPISchemaIfCond,
|
|
features: Optional[List[Dict[str, Any]]],
|
|
info: QAPISourceInfo,
|
|
) -> QAPISchemaObjectTypeMember:
|
|
optional = False
|
|
if name.startswith('*'):
|
|
name = name[1:]
|
|
optional = True
|
|
if isinstance(typ, list):
|
|
assert len(typ) == 1
|
|
typ = self._make_array_type(typ[0], info)
|
|
return QAPISchemaObjectTypeMember(name, info, typ, optional, ifcond,
|
|
self._make_features(features, info))
|
|
|
|
def _make_members(
|
|
self,
|
|
data: Dict[str, Any],
|
|
info: QAPISourceInfo,
|
|
) -> List[QAPISchemaObjectTypeMember]:
|
|
return [self._make_member(key, value['type'],
|
|
QAPISchemaIfCond(value.get('if')),
|
|
value.get('features'), info)
|
|
for (key, value) in data.items()]
|
|
|
|
def _def_struct_type(self, expr: QAPIExpression) -> None:
|
|
name = expr['struct']
|
|
base = expr.get('base')
|
|
data = expr['data']
|
|
info = expr.info
|
|
ifcond = QAPISchemaIfCond(expr.get('if'))
|
|
features = self._make_features(expr.get('features'), info)
|
|
self._def_definition(QAPISchemaObjectType(
|
|
name, info, expr.doc, ifcond, features, base,
|
|
self._make_members(data, info),
|
|
None))
|
|
|
|
def _make_variant(
|
|
self,
|
|
case: str,
|
|
typ: str,
|
|
ifcond: QAPISchemaIfCond,
|
|
info: QAPISourceInfo,
|
|
) -> QAPISchemaVariant:
|
|
if isinstance(typ, list):
|
|
assert len(typ) == 1
|
|
typ = self._make_array_type(typ[0], info)
|
|
return QAPISchemaVariant(case, info, typ, ifcond)
|
|
|
|
def _def_union_type(self, expr: QAPIExpression) -> None:
|
|
name = expr['union']
|
|
base = expr['base']
|
|
tag_name = expr['discriminator']
|
|
data = expr['data']
|
|
assert isinstance(data, dict)
|
|
info = expr.info
|
|
ifcond = QAPISchemaIfCond(expr.get('if'))
|
|
features = self._make_features(expr.get('features'), info)
|
|
if isinstance(base, dict):
|
|
base = self._make_implicit_object_type(
|
|
name, info, ifcond,
|
|
'base', self._make_members(base, info))
|
|
variants = [
|
|
self._make_variant(key, value['type'],
|
|
QAPISchemaIfCond(value.get('if')),
|
|
info)
|
|
for (key, value) in data.items()]
|
|
members: List[QAPISchemaObjectTypeMember] = []
|
|
self._def_definition(
|
|
QAPISchemaObjectType(name, info, expr.doc, ifcond, features,
|
|
base, members,
|
|
QAPISchemaVariants(
|
|
tag_name, info, None, variants)))
|
|
|
|
def _def_alternate_type(self, expr: QAPIExpression) -> None:
|
|
name = expr['alternate']
|
|
data = expr['data']
|
|
assert isinstance(data, dict)
|
|
ifcond = QAPISchemaIfCond(expr.get('if'))
|
|
info = expr.info
|
|
features = self._make_features(expr.get('features'), info)
|
|
variants = [
|
|
self._make_variant(key, value['type'],
|
|
QAPISchemaIfCond(value.get('if')),
|
|
info)
|
|
for (key, value) in data.items()]
|
|
tag_member = QAPISchemaObjectTypeMember('type', info, 'QType', False)
|
|
self._def_definition(
|
|
QAPISchemaAlternateType(
|
|
name, info, expr.doc, ifcond, features,
|
|
QAPISchemaVariants(None, info, tag_member, variants)))
|
|
|
|
def _def_command(self, expr: QAPIExpression) -> None:
|
|
name = expr['command']
|
|
data = expr.get('data')
|
|
rets = expr.get('returns')
|
|
gen = expr.get('gen', True)
|
|
success_response = expr.get('success-response', True)
|
|
boxed = expr.get('boxed', False)
|
|
allow_oob = expr.get('allow-oob', False)
|
|
allow_preconfig = expr.get('allow-preconfig', False)
|
|
coroutine = expr.get('coroutine', False)
|
|
ifcond = QAPISchemaIfCond(expr.get('if'))
|
|
info = expr.info
|
|
features = self._make_features(expr.get('features'), info)
|
|
if isinstance(data, OrderedDict):
|
|
data = self._make_implicit_object_type(
|
|
name, info, ifcond,
|
|
'arg', self._make_members(data, info))
|
|
if isinstance(rets, list):
|
|
assert len(rets) == 1
|
|
rets = self._make_array_type(rets[0], info)
|
|
self._def_definition(
|
|
QAPISchemaCommand(name, info, expr.doc, ifcond, features, data,
|
|
rets, gen, success_response, boxed, allow_oob,
|
|
allow_preconfig, coroutine))
|
|
|
|
def _def_event(self, expr: QAPIExpression) -> None:
|
|
name = expr['event']
|
|
data = expr.get('data')
|
|
boxed = expr.get('boxed', False)
|
|
ifcond = QAPISchemaIfCond(expr.get('if'))
|
|
info = expr.info
|
|
features = self._make_features(expr.get('features'), info)
|
|
if isinstance(data, OrderedDict):
|
|
data = self._make_implicit_object_type(
|
|
name, info, ifcond,
|
|
'arg', self._make_members(data, info))
|
|
self._def_definition(QAPISchemaEvent(name, info, expr.doc, ifcond,
|
|
features, data, boxed))
|
|
|
|
def _def_exprs(self, exprs: List[QAPIExpression]) -> None:
|
|
for expr in exprs:
|
|
if 'enum' in expr:
|
|
self._def_enum_type(expr)
|
|
elif 'struct' in expr:
|
|
self._def_struct_type(expr)
|
|
elif 'union' in expr:
|
|
self._def_union_type(expr)
|
|
elif 'alternate' in expr:
|
|
self._def_alternate_type(expr)
|
|
elif 'command' in expr:
|
|
self._def_command(expr)
|
|
elif 'event' in expr:
|
|
self._def_event(expr)
|
|
elif 'include' in expr:
|
|
self._def_include(expr)
|
|
else:
|
|
assert False
|
|
|
|
def check(self) -> None:
|
|
for ent in self._entity_list:
|
|
ent.check(self)
|
|
ent.connect_doc()
|
|
for ent in self._entity_list:
|
|
ent.set_module(self)
|
|
for doc in self.docs:
|
|
doc.check()
|
|
|
|
def visit(self, visitor: QAPISchemaVisitor) -> None:
|
|
visitor.visit_begin(self)
|
|
for mod in self._module_dict.values():
|
|
mod.visit(visitor)
|
|
visitor.visit_end()
|