python/aqmp: add QMP event support
This class was designed as a "mix-in" primarily so that the feature could be given its own treatment in its own python module. It gets quite a bit too long otherwise. Signed-off-by: John Snow <jsnow@redhat.com> Message-id: 20210915162955.333025-16-jsnow@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
parent
ad07299941
commit
b3cda213a7
@ -22,6 +22,7 @@ managing QMP events.
|
||||
# the COPYING file in the top-level directory.
|
||||
|
||||
from .error import AQMPError
|
||||
from .events import EventListener
|
||||
from .message import Message
|
||||
from .protocol import ConnectError, Runstate, StateError
|
||||
|
||||
@ -30,6 +31,7 @@ from .protocol import ConnectError, Runstate, StateError
|
||||
__all__ = (
|
||||
# Classes, most to least important
|
||||
'Message',
|
||||
'EventListener',
|
||||
'Runstate',
|
||||
|
||||
# Exceptions, most generic to most explicit
|
||||
|
706
python/qemu/aqmp/events.py
Normal file
706
python/qemu/aqmp/events.py
Normal file
@ -0,0 +1,706 @@
|
||||
"""
|
||||
AQMP Events and EventListeners
|
||||
|
||||
Asynchronous QMP uses `EventListener` objects to listen for events. An
|
||||
`EventListener` is a FIFO event queue that can be pre-filtered to listen
|
||||
for only specific events. Each `EventListener` instance receives its own
|
||||
copy of events that it hears, so events may be consumed without fear or
|
||||
worry for depriving other listeners of events they need to hear.
|
||||
|
||||
|
||||
EventListener Tutorial
|
||||
----------------------
|
||||
|
||||
In all of the following examples, we assume that we have a `QMPClient`
|
||||
instantiated named ``qmp`` that is already connected.
|
||||
|
||||
|
||||
`listener()` context blocks with one name
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The most basic usage is by using the `listener()` context manager to
|
||||
construct them:
|
||||
|
||||
.. code:: python
|
||||
|
||||
with qmp.listener('STOP') as listener:
|
||||
await qmp.execute('stop')
|
||||
await listener.get()
|
||||
|
||||
The listener is active only for the duration of the ‘with’ block. This
|
||||
instance listens only for ‘STOP’ events.
|
||||
|
||||
|
||||
`listener()` context blocks with two or more names
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Multiple events can be selected for by providing any ``Iterable[str]``:
|
||||
|
||||
.. code:: python
|
||||
|
||||
with qmp.listener(('STOP', 'RESUME')) as listener:
|
||||
await qmp.execute('stop')
|
||||
event = await listener.get()
|
||||
assert event['event'] == 'STOP'
|
||||
|
||||
await qmp.execute('cont')
|
||||
event = await listener.get()
|
||||
assert event['event'] == 'RESUME'
|
||||
|
||||
|
||||
`listener()` context blocks with no names
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
By omitting names entirely, you can listen to ALL events.
|
||||
|
||||
.. code:: python
|
||||
|
||||
with qmp.listener() as listener:
|
||||
await qmp.execute('stop')
|
||||
event = await listener.get()
|
||||
assert event['event'] == 'STOP'
|
||||
|
||||
This isn’t a very good use case for this feature: In a non-trivial
|
||||
running system, we may not know what event will arrive next. Grabbing
|
||||
the top of a FIFO queue returning multiple kinds of events may be prone
|
||||
to error.
|
||||
|
||||
|
||||
Using async iterators to retrieve events
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If you’d like to simply watch what events happen to arrive, you can use
|
||||
the listener as an async iterator:
|
||||
|
||||
.. code:: python
|
||||
|
||||
with qmp.listener() as listener:
|
||||
async for event in listener:
|
||||
print(f"Event arrived: {event['event']}")
|
||||
|
||||
This is analogous to the following code:
|
||||
|
||||
.. code:: python
|
||||
|
||||
with qmp.listener() as listener:
|
||||
while True:
|
||||
event = listener.get()
|
||||
print(f"Event arrived: {event['event']}")
|
||||
|
||||
This event stream will never end, so these blocks will never terminate.
|
||||
|
||||
|
||||
Using asyncio.Task to concurrently retrieve events
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Since a listener’s event stream will never terminate, it is not likely
|
||||
useful to use that form in a script. For longer-running clients, we can
|
||||
create event handlers by using `asyncio.Task` to create concurrent
|
||||
coroutines:
|
||||
|
||||
.. code:: python
|
||||
|
||||
async def print_events(listener):
|
||||
try:
|
||||
async for event in listener:
|
||||
print(f"Event arrived: {event['event']}")
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
|
||||
with qmp.listener() as listener:
|
||||
task = asyncio.Task(print_events(listener))
|
||||
await qmp.execute('stop')
|
||||
await qmp.execute('cont')
|
||||
task.cancel()
|
||||
await task
|
||||
|
||||
However, there is no guarantee that these events will be received by the
|
||||
time we leave this context block. Once the context block is exited, the
|
||||
listener will cease to hear any new events, and becomes inert.
|
||||
|
||||
Be mindful of the timing: the above example will *probably*– but does
|
||||
not *guarantee*– that both STOP/RESUMED events will be printed. The
|
||||
example below outlines how to use listeners outside of a context block.
|
||||
|
||||
|
||||
Using `register_listener()` and `remove_listener()`
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
To create a listener with a longer lifetime, beyond the scope of a
|
||||
single block, create a listener and then call `register_listener()`:
|
||||
|
||||
.. code:: python
|
||||
|
||||
class MyClient:
|
||||
def __init__(self, qmp):
|
||||
self.qmp = qmp
|
||||
self.listener = EventListener()
|
||||
|
||||
async def print_events(self):
|
||||
try:
|
||||
async for event in self.listener:
|
||||
print(f"Event arrived: {event['event']}")
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
|
||||
async def run(self):
|
||||
self.task = asyncio.Task(self.print_events)
|
||||
self.qmp.register_listener(self.listener)
|
||||
await qmp.execute('stop')
|
||||
await qmp.execute('cont')
|
||||
|
||||
async def stop(self):
|
||||
self.task.cancel()
|
||||
await self.task
|
||||
self.qmp.remove_listener(self.listener)
|
||||
|
||||
The listener can be deactivated by using `remove_listener()`. When it is
|
||||
removed, any possible pending events are cleared and it can be
|
||||
re-registered at a later time.
|
||||
|
||||
|
||||
Using the built-in all events listener
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The `QMPClient` object creates its own default listener named
|
||||
:py:obj:`~Events.events` that can be used for the same purpose without
|
||||
having to create your own:
|
||||
|
||||
.. code:: python
|
||||
|
||||
async def print_events(listener):
|
||||
try:
|
||||
async for event in listener:
|
||||
print(f"Event arrived: {event['event']}")
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
|
||||
task = asyncio.Task(print_events(qmp.events))
|
||||
|
||||
await qmp.execute('stop')
|
||||
await qmp.execute('cont')
|
||||
|
||||
task.cancel()
|
||||
await task
|
||||
|
||||
|
||||
Using both .get() and async iterators
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The async iterator and `get()` methods pull events from the same FIFO
|
||||
queue. If you mix the usage of both, be aware: Events are emitted
|
||||
precisely once per listener.
|
||||
|
||||
If multiple contexts try to pull events from the same listener instance,
|
||||
events are still emitted only precisely once.
|
||||
|
||||
This restriction can be lifted by creating additional listeners.
|
||||
|
||||
|
||||
Creating multiple listeners
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Additional `EventListener` objects can be created at-will. Each one
|
||||
receives its own copy of events, with separate FIFO event queues.
|
||||
|
||||
.. code:: python
|
||||
|
||||
my_listener = EventListener()
|
||||
qmp.register_listener(my_listener)
|
||||
|
||||
await qmp.execute('stop')
|
||||
copy1 = await my_listener.get()
|
||||
copy2 = await qmp.events.get()
|
||||
|
||||
assert copy1 == copy2
|
||||
|
||||
In this example, we await an event from both a user-created
|
||||
`EventListener` and the built-in events listener. Both receive the same
|
||||
event.
|
||||
|
||||
|
||||
Clearing listeners
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
`EventListener` objects can be cleared, clearing all events seen thus far:
|
||||
|
||||
.. code:: python
|
||||
|
||||
await qmp.execute('stop')
|
||||
qmp.events.clear()
|
||||
await qmp.execute('cont')
|
||||
event = await qmp.events.get()
|
||||
assert event['event'] == 'RESUME'
|
||||
|
||||
`EventListener` objects are FIFO queues. If events are not consumed,
|
||||
they will remain in the queue until they are witnessed or discarded via
|
||||
`clear()`. FIFO queues will be drained automatically upon leaving a
|
||||
context block, or when calling `remove_listener()`.
|
||||
|
||||
|
||||
Accessing listener history
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
`EventListener` objects record their history. Even after being cleared,
|
||||
you can obtain a record of all events seen so far:
|
||||
|
||||
.. code:: python
|
||||
|
||||
await qmp.execute('stop')
|
||||
await qmp.execute('cont')
|
||||
qmp.events.clear()
|
||||
|
||||
assert len(qmp.events.history) == 2
|
||||
assert qmp.events.history[0]['event'] == 'STOP'
|
||||
assert qmp.events.history[1]['event'] == 'RESUME'
|
||||
|
||||
The history is updated immediately and does not require the event to be
|
||||
witnessed first.
|
||||
|
||||
|
||||
Using event filters
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
`EventListener` objects can be given complex filtering criteria if names
|
||||
are not sufficient:
|
||||
|
||||
.. code:: python
|
||||
|
||||
def job1_filter(event) -> bool:
|
||||
event_data = event.get('data', {})
|
||||
event_job_id = event_data.get('id')
|
||||
return event_job_id == "job1"
|
||||
|
||||
with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
|
||||
await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
|
||||
async for event in listener:
|
||||
if event['data']['status'] == 'concluded':
|
||||
break
|
||||
|
||||
These filters might be most useful when parameterized. `EventListener`
|
||||
objects expect a function that takes only a single argument (the raw
|
||||
event, as a `Message`) and returns a bool; True if the event should be
|
||||
accepted into the stream. You can create a function that adapts this
|
||||
signature to accept configuration parameters:
|
||||
|
||||
.. code:: python
|
||||
|
||||
def job_filter(job_id: str) -> EventFilter:
|
||||
def filter(event: Message) -> bool:
|
||||
return event['data']['id'] == job_id
|
||||
return filter
|
||||
|
||||
with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
|
||||
await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
|
||||
async for event in listener:
|
||||
if event['data']['status'] == 'concluded':
|
||||
break
|
||||
|
||||
|
||||
Activating an existing listener with `listen()`
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Listeners with complex, long configurations can also be created manually
|
||||
and activated temporarily by using `listen()` instead of `listener()`:
|
||||
|
||||
.. code:: python
|
||||
|
||||
listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
|
||||
'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
|
||||
'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
|
||||
|
||||
with qmp.listen(listener):
|
||||
await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
|
||||
async for event in listener:
|
||||
print(event)
|
||||
if event['event'] == 'BLOCK_JOB_COMPLETED':
|
||||
break
|
||||
|
||||
Any events that are not witnessed by the time the block is left will be
|
||||
cleared from the queue; entering the block is an implicit
|
||||
`register_listener()` and leaving the block is an implicit
|
||||
`remove_listener()`.
|
||||
|
||||
|
||||
Activating multiple existing listeners with `listen()`
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
While `listener()` is only capable of creating a single listener,
|
||||
`listen()` is capable of activating multiple listeners simultaneously:
|
||||
|
||||
.. code:: python
|
||||
|
||||
def job_filter(job_id: str) -> EventFilter:
|
||||
def filter(event: Message) -> bool:
|
||||
return event['data']['id'] == job_id
|
||||
return filter
|
||||
|
||||
jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
|
||||
jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
|
||||
|
||||
with qmp.listen(jobA, jobB):
|
||||
qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
|
||||
qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
|
||||
|
||||
async for event in jobA.get():
|
||||
if event['data']['status'] == 'concluded':
|
||||
break
|
||||
async for event in jobB.get():
|
||||
if event['data']['status'] == 'concluded':
|
||||
break
|
||||
|
||||
|
||||
Extending the `EventListener` class
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
In the case that a more specialized `EventListener` is desired to
|
||||
provide either more functionality or more compact syntax for specialized
|
||||
cases, it can be extended.
|
||||
|
||||
One of the key methods to extend or override is
|
||||
:py:meth:`~EventListener.accept()`. The default implementation checks an
|
||||
incoming message for:
|
||||
|
||||
1. A qualifying name, if any :py:obj:`~EventListener.names` were
|
||||
specified at initialization time
|
||||
2. That :py:obj:`~EventListener.event_filter()` returns True.
|
||||
|
||||
This can be modified however you see fit to change the criteria for
|
||||
inclusion in the stream.
|
||||
|
||||
For convenience, a ``JobListener`` class could be created that simply
|
||||
bakes in configuration so it does not need to be repeated:
|
||||
|
||||
.. code:: python
|
||||
|
||||
class JobListener(EventListener):
|
||||
def __init__(self, job_id: str):
|
||||
super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
|
||||
'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
|
||||
'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
|
||||
self.job_id = job_id
|
||||
|
||||
def accept(self, event) -> bool:
|
||||
if not super().accept(event):
|
||||
return False
|
||||
if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
|
||||
return event['data']['id'] == job_id
|
||||
return event['data']['device'] == job_id
|
||||
|
||||
From here on out, you can conjure up a custom-purpose listener that
|
||||
listens only for job-related events for a specific job-id easily:
|
||||
|
||||
.. code:: python
|
||||
|
||||
listener = JobListener('job4')
|
||||
with qmp.listener(listener):
|
||||
await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
|
||||
async for event in listener:
|
||||
print(event)
|
||||
if event['event'] == 'BLOCK_JOB_COMPLETED':
|
||||
break
|
||||
|
||||
|
||||
Experimental Interfaces & Design Issues
|
||||
---------------------------------------
|
||||
|
||||
These interfaces are not ones I am sure I will keep or otherwise modify
|
||||
heavily.
|
||||
|
||||
qmp.listener()’s type signature
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
`listener()` does not return anything, because it was assumed the caller
|
||||
already had a handle to the listener. However, for
|
||||
``qmp.listener(EventListener())`` forms, the caller will not have saved
|
||||
a handle to the listener.
|
||||
|
||||
Because this function can accept *many* listeners, I found it hard to
|
||||
accurately type in a way where it could be used in both “one” or “many”
|
||||
forms conveniently and in a statically type-safe manner.
|
||||
|
||||
Ultimately, I removed the return altogether, but perhaps with more time
|
||||
I can work out a way to re-add it.
|
||||
|
||||
|
||||
API Reference
|
||||
-------------
|
||||
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from contextlib import contextmanager
|
||||
import logging
|
||||
from typing import (
|
||||
AsyncIterator,
|
||||
Callable,
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
||||
from .error import AQMPError
|
||||
from .message import Message
|
||||
|
||||
|
||||
EventNames = Union[str, Iterable[str], None]
|
||||
EventFilter = Callable[[Message], bool]
|
||||
|
||||
|
||||
class ListenerError(AQMPError):
|
||||
"""
|
||||
Generic error class for `EventListener`-related problems.
|
||||
"""
|
||||
|
||||
|
||||
class EventListener:
|
||||
"""
|
||||
Selectively listens for events with runtime configurable filtering.
|
||||
|
||||
This class is designed to be directly usable for the most common cases,
|
||||
but it can be extended to provide more rigorous control.
|
||||
|
||||
:param names:
|
||||
One or more names of events to listen for.
|
||||
When not provided, listen for ALL events.
|
||||
:param event_filter:
|
||||
An optional event filtering function.
|
||||
When names are also provided, this acts as a secondary filter.
|
||||
|
||||
When ``names`` and ``event_filter`` are both provided, the names
|
||||
will be filtered first, and then the filter function will be called
|
||||
second. The event filter function can assume that the format of the
|
||||
event is a known format.
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
names: EventNames = None,
|
||||
event_filter: Optional[EventFilter] = None,
|
||||
):
|
||||
# Queue of 'heard' events yet to be witnessed by a caller.
|
||||
self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
|
||||
|
||||
# Intended as a historical record, NOT a processing queue or backlog.
|
||||
self._history: List[Message] = []
|
||||
|
||||
#: Primary event filter, based on one or more event names.
|
||||
self.names: Set[str] = set()
|
||||
if isinstance(names, str):
|
||||
self.names.add(names)
|
||||
elif names is not None:
|
||||
self.names.update(names)
|
||||
|
||||
#: Optional, secondary event filter.
|
||||
self.event_filter: Optional[EventFilter] = event_filter
|
||||
|
||||
@property
|
||||
def history(self) -> Tuple[Message, ...]:
|
||||
"""
|
||||
A read-only history of all events seen so far.
|
||||
|
||||
This represents *every* event, including those not yet witnessed
|
||||
via `get()` or ``async for``. It persists between `clear()`
|
||||
calls and is immutable.
|
||||
"""
|
||||
return tuple(self._history)
|
||||
|
||||
def accept(self, event: Message) -> bool:
|
||||
"""
|
||||
Determine if this listener accepts this event.
|
||||
|
||||
This method determines which events will appear in the stream.
|
||||
The default implementation simply checks the event against the
|
||||
list of names and the event_filter to decide if this
|
||||
`EventListener` accepts a given event. It can be
|
||||
overridden/extended to provide custom listener behavior.
|
||||
|
||||
User code is not expected to need to invoke this method.
|
||||
|
||||
:param event: The event under consideration.
|
||||
:return: `True`, if this listener accepts this event.
|
||||
"""
|
||||
name_ok = (not self.names) or (event['event'] in self.names)
|
||||
return name_ok and (
|
||||
(not self.event_filter) or self.event_filter(event)
|
||||
)
|
||||
|
||||
async def put(self, event: Message) -> None:
|
||||
"""
|
||||
Conditionally put a new event into the FIFO queue.
|
||||
|
||||
This method is not designed to be invoked from user code, and it
|
||||
should not need to be overridden. It is a public interface so
|
||||
that `QMPClient` has an interface by which it can inform
|
||||
registered listeners of new events.
|
||||
|
||||
The event will be put into the queue if
|
||||
:py:meth:`~EventListener.accept()` returns `True`.
|
||||
|
||||
:param event: The new event to put into the FIFO queue.
|
||||
"""
|
||||
if not self.accept(event):
|
||||
return
|
||||
|
||||
self._history.append(event)
|
||||
await self._queue.put(event)
|
||||
|
||||
async def get(self) -> Message:
|
||||
"""
|
||||
Wait for the very next event in this stream.
|
||||
|
||||
If one is already available, return that one.
|
||||
"""
|
||||
return await self._queue.get()
|
||||
|
||||
def clear(self) -> None:
|
||||
"""
|
||||
Clear this listener of all pending events.
|
||||
|
||||
Called when an `EventListener` is being unregistered, this clears the
|
||||
pending FIFO queue synchronously. It can be also be used to
|
||||
manually clear any pending events, if desired.
|
||||
|
||||
.. warning::
|
||||
Take care when discarding events. Cleared events will be
|
||||
silently tossed on the floor. All events that were ever
|
||||
accepted by this listener are visible in `history()`.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
self._queue.get_nowait()
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
|
||||
def __aiter__(self) -> AsyncIterator[Message]:
|
||||
return self
|
||||
|
||||
async def __anext__(self) -> Message:
|
||||
"""
|
||||
Enables the `EventListener` to function as an async iterator.
|
||||
|
||||
It may be used like this:
|
||||
|
||||
.. code:: python
|
||||
|
||||
async for event in listener:
|
||||
print(event)
|
||||
|
||||
These iterators will never terminate of their own accord; you
|
||||
must provide break conditions or otherwise prepare to run them
|
||||
in an `asyncio.Task` that can be cancelled.
|
||||
"""
|
||||
return await self.get()
|
||||
|
||||
|
||||
class Events:
|
||||
"""
|
||||
Events is a mix-in class that adds event functionality to the QMP class.
|
||||
|
||||
It's designed specifically as a mix-in for `QMPClient`, and it
|
||||
relies upon the class it is being mixed into having a 'logger'
|
||||
property.
|
||||
"""
|
||||
def __init__(self) -> None:
|
||||
self._listeners: List[EventListener] = []
|
||||
|
||||
#: Default, all-events `EventListener`.
|
||||
self.events: EventListener = EventListener()
|
||||
self.register_listener(self.events)
|
||||
|
||||
# Parent class needs to have a logger
|
||||
self.logger: logging.Logger
|
||||
|
||||
async def _event_dispatch(self, msg: Message) -> None:
|
||||
"""
|
||||
Given a new event, propagate it to all of the active listeners.
|
||||
|
||||
:param msg: The event to propagate.
|
||||
"""
|
||||
for listener in self._listeners:
|
||||
await listener.put(msg)
|
||||
|
||||
def register_listener(self, listener: EventListener) -> None:
|
||||
"""
|
||||
Register and activate an `EventListener`.
|
||||
|
||||
:param listener: The listener to activate.
|
||||
:raise ListenerError: If the given listener is already registered.
|
||||
"""
|
||||
if listener in self._listeners:
|
||||
raise ListenerError("Attempted to re-register existing listener")
|
||||
self.logger.debug("Registering %s.", str(listener))
|
||||
self._listeners.append(listener)
|
||||
|
||||
def remove_listener(self, listener: EventListener) -> None:
|
||||
"""
|
||||
Unregister and deactivate an `EventListener`.
|
||||
|
||||
The removed listener will have its pending events cleared via
|
||||
`clear()`. The listener can be re-registered later when
|
||||
desired.
|
||||
|
||||
:param listener: The listener to deactivate.
|
||||
:raise ListenerError: If the given listener is not registered.
|
||||
"""
|
||||
if listener == self.events:
|
||||
raise ListenerError("Cannot remove the default listener.")
|
||||
self.logger.debug("Removing %s.", str(listener))
|
||||
listener.clear()
|
||||
self._listeners.remove(listener)
|
||||
|
||||
@contextmanager
|
||||
def listen(self, *listeners: EventListener) -> Iterator[None]:
|
||||
r"""
|
||||
Context manager: Temporarily listen with an `EventListener`.
|
||||
|
||||
Accepts one or more `EventListener` objects and registers them,
|
||||
activating them for the duration of the context block.
|
||||
|
||||
`EventListener` objects will have any pending events in their
|
||||
FIFO queue cleared upon exiting the context block, when they are
|
||||
deactivated.
|
||||
|
||||
:param \*listeners: One or more EventListeners to activate.
|
||||
:raise ListenerError: If the given listener(s) are already active.
|
||||
"""
|
||||
_added = []
|
||||
|
||||
try:
|
||||
for listener in listeners:
|
||||
self.register_listener(listener)
|
||||
_added.append(listener)
|
||||
|
||||
yield
|
||||
|
||||
finally:
|
||||
for listener in _added:
|
||||
self.remove_listener(listener)
|
||||
|
||||
@contextmanager
|
||||
def listener(
|
||||
self,
|
||||
names: EventNames = (),
|
||||
event_filter: Optional[EventFilter] = None
|
||||
) -> Iterator[EventListener]:
|
||||
"""
|
||||
Context manager: Temporarily listen with a new `EventListener`.
|
||||
|
||||
Creates an `EventListener` object and registers it, activating
|
||||
it for the duration of the context block.
|
||||
|
||||
:param names:
|
||||
One or more names of events to listen for.
|
||||
When not provided, listen for ALL events.
|
||||
:param event_filter:
|
||||
An optional event filtering function.
|
||||
When names are also provided, this acts as a secondary filter.
|
||||
|
||||
:return: The newly created and active `EventListener`.
|
||||
"""
|
||||
listener = EventListener(names, event_filter)
|
||||
with self.listen(listener):
|
||||
yield listener
|
Loading…
Reference in New Issue
Block a user