From 40196c23939758abc5300e85333e676196e3ba6d Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:39 -0500 Subject: [PATCH 01/11] python/aqmp: add _session_guard() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In _new_session, there's a fairly complex except clause that's used to give semantic errors to callers of accept() and connect(). We need to create a new two-step replacement for accept(), so factoring out this piece of logic will be useful. Bolster the comments and docstring here to try and demystify what's going on in this fairly delicate piece of Python magic. (If we were using Python 3.7+, this would be an @asynccontextmanager. We don't have that very nice piece of magic, however, so this must take an Awaitable to manage the Exception contexts properly. We pay the price for platform compatibility.) Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-2-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 89 +++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 27 deletions(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 33358f5cd7..009883f64d 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -317,6 +317,62 @@ class AsyncProtocol(Generic[T]): # Section: Session machinery # -------------------------- + async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None: + """ + Async guard function used to roll back to `IDLE` on any error. + + On any Exception, the state machine will be reset back to + `IDLE`. Most Exceptions will be wrapped with `ConnectError`, but + `BaseException` events will be left alone (This includes + asyncio.CancelledError, even prior to Python 3.8). + + :param error_message: + Human-readable string describing what connection phase failed. 
+ + :raise BaseException: + When `BaseException` occurs in the guarded block. + :raise ConnectError: + When any other error is encountered in the guarded block. + """ + # Note: After Python 3.6 support is removed, this should be an + # @asynccontextmanager instead of accepting a callback. + try: + await coro + except BaseException as err: + self.logger.error("%s: %s", emsg, exception_summary(err)) + self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) + try: + # Reset the runstate back to IDLE. + await self.disconnect() + except: + # We don't expect any Exceptions from the disconnect function + # here, because we failed to connect in the first place. + # The disconnect() function is intended to perform + # only cannot-fail cleanup here, but you never know. + emsg = ( + "Unexpected bottom half exception. " + "This is a bug in the QMP library. " + "Please report it to and " + "CC: John Snow ." + ) + self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) + raise + + # CancelledError is an Exception with special semantic meaning; + # We do NOT want to wrap it up under ConnectError. + # NB: CancelledError is not a BaseException before Python 3.8 + if isinstance(err, asyncio.CancelledError): + raise + + # Any other kind of error can be treated as some kind of connection + # failure broadly. Inspect the 'exc' field to explore the root + # cause in greater detail. + if isinstance(err, Exception): + raise ConnectError(emsg, err) from err + + # Raise BaseExceptions un-wrapped, they're more important. 
+ raise + @property def _runstate_event(self) -> asyncio.Event: # asyncio.Event() objects should not be created prior to entrance into @@ -371,34 +427,13 @@ class AsyncProtocol(Generic[T]): """ assert self.runstate == Runstate.IDLE - try: - phase = "connection" - await self._establish_connection(address, ssl, accept) + await self._session_guard( + self._establish_connection(address, ssl, accept), + 'Failed to establish connection') - phase = "session" - await self._establish_session() - - except BaseException as err: - emsg = f"Failed to establish {phase}" - self.logger.error("%s: %s", emsg, exception_summary(err)) - self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) - try: - # Reset from CONNECTING back to IDLE. - await self.disconnect() - except: - emsg = "Unexpected bottom half exception" - self.logger.critical("%s:\n%s\n", emsg, pretty_traceback()) - raise - - # NB: CancelledError is not a BaseException before Python 3.8 - if isinstance(err, asyncio.CancelledError): - raise - - if isinstance(err, Exception): - raise ConnectError(emsg, err) from err - - # Raise BaseExceptions un-wrapped, they're more important. - raise + await self._session_guard( + self._establish_session(), + 'Failed to establish session') assert self.runstate == Runstate.RUNNING From 0ba4e76b23fed77d09be7f56da783ab3f0b2d497 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:40 -0500 Subject: [PATCH 02/11] python/aqmp: rename 'accept()' to 'start_server_and_accept()' MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, I had a method named "accept()" that under-the-hood calls bind(2), listen(2) *and* accept(2). I meant this as a simplification and counterpart to the one-shot "connect()" method. This is confusing to readers who expect accept() to mean *just* accept(2). 
Since I need to split apart the "accept()" method into multiple methods anyway (one of which strongly resembles accept(2)), it feels pertinent to rename this method *now*. Rename this all-in-one method "start_server_and_accept()" instead. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-3-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/legacy.py | 2 +- python/qemu/aqmp/protocol.py | 6 ++++-- python/tests/protocol.py | 24 ++++++++++++------------ 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py index 6baa5f3409..dca1e76ed4 100644 --- a/python/qemu/aqmp/legacy.py +++ b/python/qemu/aqmp/legacy.py @@ -91,7 +91,7 @@ class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol): self._aqmp.negotiate = True self._sync( - self._aqmp.accept(self._address), + self._aqmp.start_server_and_accept(self._address), timeout ) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 009883f64d..73719257e0 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -265,8 +265,10 @@ class AsyncProtocol(Generic[T]): @upper_half @require(Runstate.IDLE) - async def accept(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: + async def start_server_and_accept( + self, address: SocketAddrT, + ssl: Optional[SSLContext] = None + ) -> None: """ Accept a connection and begin processing message queues. 
diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 5cd7938be3..354d6559b9 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -413,14 +413,14 @@ class Accept(Connect): assert family in ('INET', 'UNIX') if family == 'INET': - await self.proto.accept(('example.com', 1)) + await self.proto.start_server_and_accept(('example.com', 1)) elif family == 'UNIX': - await self.proto.accept('/dev/null') + await self.proto.start_server_and_accept('/dev/null') async def _hanging_connection(self): with TemporaryDirectory(suffix='.aqmp') as tmpdir: sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock") - await self.proto.accept(sock) + await self.proto.start_server_and_accept(sock) class FakeSession(TestBase): @@ -449,13 +449,13 @@ class FakeSession(TestBase): @TestBase.async_test async def testFakeAccept(self): """Test the full state lifecycle (via accept) with a no-op session.""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') self.assertEqual(self.proto.runstate, Runstate.RUNNING) @TestBase.async_test async def testFakeRecv(self): """Test receiving a fake/null message.""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') logname = self.proto.logger.name with self.assertLogs(logname, level='DEBUG') as context: @@ -471,7 +471,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testFakeSend(self): """Test sending a fake/null message.""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') logname = self.proto.logger.name with self.assertLogs(logname, level='DEBUG') as context: @@ -493,7 +493,7 @@ class FakeSession(TestBase): ): with self.assertRaises(StateError) as context: if accept: - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') else: await self.proto.connect('/not/a/real/path') @@ 
-504,7 +504,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testAcceptRequireRunning(self): """Test that accept() cannot be called when Runstate=RUNNING""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') await self._prod_session_api( Runstate.RUNNING, @@ -515,7 +515,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testConnectRequireRunning(self): """Test that connect() cannot be called when Runstate=RUNNING""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') await self._prod_session_api( Runstate.RUNNING, @@ -526,7 +526,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testAcceptRequireDisconnecting(self): """Test that accept() cannot be called when Runstate=DISCONNECTING""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') # Cheat: force a disconnect. await self.proto.simulate_disconnect() @@ -541,7 +541,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testConnectRequireDisconnecting(self): """Test that connect() cannot be called when Runstate=DISCONNECTING""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') # Cheat: force a disconnect. await self.proto.simulate_disconnect() @@ -576,7 +576,7 @@ class SimpleSession(TestBase): async def testSmoke(self): with TemporaryDirectory(suffix='.aqmp') as tmpdir: sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock") - server_task = create_task(self.server.accept(sock)) + server_task = create_task(self.server.start_server_and_accept(sock)) # give the server a chance to start listening [...] 
await asyncio.sleep(0) From 68a6cf3ffe3532c0655efbbf5910bd99a1b4a3fa Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:41 -0500 Subject: [PATCH 03/11] python/aqmp: remove _new_session and _establish_connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These two methods attempted to entirely envelop the logic of establishing a connection to a peer start to finish. However, we need to break apart the incoming connection step into more granular steps. We will no longer be able to reasonably constrain the logic inside of these helper functions. So, remove them - with _session_guard(), they no longer serve a real purpose. Although the public API doesn't change, the internal API does. Now that there are no intermediary methods between e.g. connect() and _do_connect(), there's no hook where the runstate is set. As a result, the test suite changes a little to cope with the new semantics of _do_accept() and _do_connect(). Lastly, take some pieces of the now-deleted docstrings and move them up to the public interface level. They were a little more detailed, and it won't hurt to keep them. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-4-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 117 ++++++++++++++--------------------- python/tests/protocol.py | 10 ++- 2 files changed, 53 insertions(+), 74 deletions(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 73719257e0..b7e5e635d8 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -275,13 +275,25 @@ class AsyncProtocol(Generic[T]): If this call fails, `runstate` is guaranteed to be set back to `IDLE`. :param address: - Address to listen to; UNIX socket path or TCP address/port. + Address to listen on; UNIX socket path or TCP address/port. :param ssl: SSL context to use, if any. 
:raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: If a connection could not be accepted. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. """ - await self._new_session(address, ssl, accept=True) + await self._session_guard( + self._do_accept(address, ssl), + 'Failed to establish connection') + await self._session_guard( + self._establish_session(), + 'Failed to establish session') + assert self.runstate == Runstate.RUNNING @upper_half @require(Runstate.IDLE) @@ -297,9 +309,21 @@ class AsyncProtocol(Generic[T]): :param ssl: SSL context to use, if any. :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: If a connection cannot be made to the server. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. """ - await self._new_session(address, ssl) + await self._session_guard( + self._do_connect(address, ssl), + 'Failed to establish connection') + await self._session_guard( + self._establish_session(), + 'Failed to establish session') + assert self.runstate == Runstate.RUNNING @upper_half async def disconnect(self) -> None: @@ -401,73 +425,6 @@ class AsyncProtocol(Generic[T]): self._runstate_event.set() self._runstate_event.clear() - @upper_half - async def _new_session(self, - address: SocketAddrT, - ssl: Optional[SSLContext] = None, - accept: bool = False) -> None: - """ - Establish a new connection and initialize the session. 
- - Connect or accept a new connection, then begin the protocol - session machinery. If this call fails, `runstate` is guaranteed - to be set back to `IDLE`. - - :param address: - Address to connect to/listen on; - UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - :param accept: Accept a connection instead of connecting when `True`. - - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - assert self.runstate == Runstate.IDLE - - await self._session_guard( - self._establish_connection(address, ssl, accept), - 'Failed to establish connection') - - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - - assert self.runstate == Runstate.RUNNING - - @upper_half - async def _establish_connection( - self, - address: SocketAddrT, - ssl: Optional[SSLContext] = None, - accept: bool = False - ) -> None: - """ - Establish a new connection. - - :param address: - Address to connect to/listen on; - UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - :param accept: Accept a connection instead of connecting when `True`. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - if accept: - await self._do_accept(address, ssl) - else: - await self._do_connect(address, ssl) - def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None: """ Used to create a socket in advance of accept(). @@ -508,6 +465,9 @@ class AsyncProtocol(Generic[T]): :raise OSError: For stream-related errors. 
""" + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + self.logger.debug("Awaiting connection on %s ...", address) connected = asyncio.Event() server: Optional[asyncio.AbstractServer] = None @@ -550,6 +510,11 @@ class AsyncProtocol(Generic[T]): sock=self._sock, ) + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + server = await coro # Starts listening await connected.wait() # Waits for the callback to fire (and finish) assert server is None @@ -569,6 +534,14 @@ class AsyncProtocol(Generic[T]): :raise OSError: For stream-related errors. """ + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + self.logger.debug("Connecting to %s ...", address) if isinstance(address, tuple): diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 354d6559b9..8dd26c4ed1 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -42,11 +42,17 @@ class NullProtocol(AsyncProtocol[None]): await super()._establish_session() async def _do_accept(self, address, ssl=None): - if not self.fake_session: + if self.fake_session: + self._set_state(Runstate.CONNECTING) + await asyncio.sleep(0) + else: await super()._do_accept(address, ssl) async def _do_connect(self, address, ssl=None): - if not self.fake_session: + if self.fake_session: + self._set_state(Runstate.CONNECTING) + await asyncio.sleep(0) + else: await super()._do_connect(address, ssl) async def _do_recv(self) -> None: From 830e6fd36e2aef37b158a10dea6c3853ce43b20c Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:42 -0500 Subject: [PATCH 04/11] python/aqmp: split _client_connected_cb() out as _incoming() MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As part of disentangling the monolithic nature of _do_accept(), split out the incoming callback to prepare for factoring out the "wait for a peer" step. Namely, this means using an event signal we can wait on from outside of this method. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-5-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 83 +++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index b7e5e635d8..56f05b9030 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -242,6 +242,10 @@ class AsyncProtocol(Generic[T]): # Workaround for bind() self._sock: Optional[socket.socket] = None + # Server state for start_server() and _incoming() + self._server: Optional[asyncio.AbstractServer] = None + self._accepted: Optional[asyncio.Event] = None + def __repr__(self) -> str: cls_name = type(self).__name__ tokens = [] @@ -425,6 +429,54 @@ class AsyncProtocol(Generic[T]): self._runstate_event.set() self._runstate_event.clear() + @bottom_half # However, it does not run from the R/W tasks. + async def _stop_server(self) -> None: + """ + Stop listening for / accepting new incoming connections. + """ + if self._server is None: + return + + try: + self.logger.debug("Stopping server.") + self._server.close() + await self._server.wait_closed() + self.logger.debug("Server stopped.") + finally: + self._server = None + + @bottom_half # However, it does not run from the R/W tasks. + async def _incoming(self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter) -> None: + """ + Accept an incoming connection and signal the upper_half. + + This method does the minimum necessary to accept a single + incoming connection. 
It signals back to the upper_half ASAP so + that any errors during session initialization can occur + naturally in the caller's stack. + + :param reader: Incoming `asyncio.StreamReader` + :param writer: Incoming `asyncio.StreamWriter` + """ + peer = writer.get_extra_info('peername', 'Unknown peer') + self.logger.debug("Incoming connection from %s", peer) + + if self._reader or self._writer: + # Sadly, we can have more than one pending connection + # because of https://bugs.python.org/issue46715 + # Close any extra connections we don't actually want. + self.logger.warning("Extraneous connection inadvertently accepted") + writer.close() + return + + # A connection has been accepted; stop listening for new ones. + assert self._accepted is not None + await self._stop_server() + self._reader, self._writer = (reader, writer) + self._accepted.set() + def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None: """ Used to create a socket in advance of accept(). @@ -469,30 +521,11 @@ class AsyncProtocol(Generic[T]): self._set_state(Runstate.CONNECTING) self.logger.debug("Awaiting connection on %s ...", address) - connected = asyncio.Event() - server: Optional[asyncio.AbstractServer] = None - - async def _client_connected_cb(reader: asyncio.StreamReader, - writer: asyncio.StreamWriter) -> None: - """Used to accept a single incoming connection, see below.""" - nonlocal server - nonlocal connected - - # A connection has been accepted; stop listening for new ones. - assert server is not None - server.close() - await server.wait_closed() - server = None - - # Register this client as being connected - self._reader, self._writer = (reader, writer) - - # Signal back: We've accepted a client! 
- connected.set() + self._accepted = asyncio.Event() if isinstance(address, tuple): coro = asyncio.start_server( - _client_connected_cb, + self._incoming, host=None if self._sock else address[0], port=None if self._sock else address[1], ssl=ssl, @@ -502,7 +535,7 @@ class AsyncProtocol(Generic[T]): ) else: coro = asyncio.start_unix_server( - _client_connected_cb, + self._incoming, path=None if self._sock else address, ssl=ssl, backlog=1, @@ -515,9 +548,9 @@ class AsyncProtocol(Generic[T]): # otherwise yield. await asyncio.sleep(0) - server = await coro # Starts listening - await connected.wait() # Waits for the callback to fire (and finish) - assert server is None + self._server = await coro # Starts listening + await self._accepted.wait() # Waits for the callback to finish + assert self._server is None self._sock = None self.logger.debug("Connection accepted.") From 1b9c8cb6ce6b5c5911eb715b2d5b0a2671999dde Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:43 -0500 Subject: [PATCH 05/11] python/aqmp: squelch pylint warning for too many lines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I would really like to keep this under 1000 lines, I promise. Doesn't look like it's gonna happen. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-6-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 56f05b9030..631bcdaa55 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -10,6 +10,9 @@ In this package, it is used as the implementation for the `QMPClient` class. """ +# It's all the docstrings ... ! 
It's long for a good reason ^_^; +# pylint: disable=too-many-lines + import asyncio from asyncio import StreamReader, StreamWriter from enum import Enum From 5e9902a030ab832b0b6577764c65ce6a6f874af6 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:44 -0500 Subject: [PATCH 06/11] python/aqmp: refactor _do_accept() into two distinct steps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor _do_accept() into _do_start_server() and _do_accept(). As of this commit, the former calls the latter, but in subsequent commits they'll be split apart. (So please forgive the misnomer for _do_start_server(); it will live up to its name shortly, and the docstring will be updated then too. I'm just cutting down on some churn.) Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-7-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 29 ++++++++++++++++++++++++----- python/tests/protocol.py | 4 ++-- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 631bcdaa55..e2bdad542d 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -295,7 +295,7 @@ class AsyncProtocol(Generic[T]): session, the wrapped error may also be an `QMPError`. """ await self._session_guard( - self._do_accept(address, ssl), + self._do_start_server(address, ssl), 'Failed to establish connection') await self._session_guard( self._establish_session(), @@ -509,8 +509,8 @@ class AsyncProtocol(Generic[T]): self._sock = sock @upper_half - async def _do_accept(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: + async def _do_start_server(self, address: SocketAddrT, + ssl: Optional[SSLContext] = None) -> None: """ Acting as the transport server, accept a single connection. @@ -551,9 +551,28 @@ class AsyncProtocol(Generic[T]): # otherwise yield. 
await asyncio.sleep(0) - self._server = await coro # Starts listening - await self._accepted.wait() # Waits for the callback to finish + # This will start the server (bind(2), listen(2)). It will also + # call accept(2) if we yield, but we don't block on that here. + self._server = await coro + + # Just for this one commit, wait for a peer. + # This gets split out in the next patch. + await self._do_accept() + + @upper_half + async def _do_accept(self) -> None: + """ + Wait for and accept an incoming connection. + + Requires that we have not yet accepted an incoming connection + from the upper_half, but it's OK if the server is no longer + running because the bottom_half has already accepted the + connection. + """ + assert self._accepted is not None + await self._accepted.wait() assert self._server is None + self._accepted = None self._sock = None self.logger.debug("Connection accepted.") diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 8dd26c4ed1..5e442e1efb 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -41,12 +41,12 @@ class NullProtocol(AsyncProtocol[None]): self.trigger_input = asyncio.Event() await super()._establish_session() - async def _do_accept(self, address, ssl=None): + async def _do_start_server(self, address, ssl=None): if self.fake_session: self._set_state(Runstate.CONNECTING) await asyncio.sleep(0) else: - await super()._do_accept(address, ssl) + await super()._do_start_server(address, ssl) async def _do_connect(self, address, ssl=None): if self.fake_session: From 32c5abf051d06ff103d9d30eb6a7f3e8bf582334 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:45 -0500 Subject: [PATCH 07/11] python/aqmp: stop the server during disconnect() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before we allow the full separation of starting the server and accepting new connections, make sure that the disconnect cleans up the server and its new state, 
too. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-8-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index e2bdad542d..cdbc9cba0d 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -432,7 +432,7 @@ class AsyncProtocol(Generic[T]): self._runstate_event.set() self._runstate_event.clear() - @bottom_half # However, it does not run from the R/W tasks. + @bottom_half async def _stop_server(self) -> None: """ Stop listening for / accepting new incoming connections. @@ -709,6 +709,7 @@ class AsyncProtocol(Generic[T]): self._reader = None self._writer = None + self._accepted = None # NB: _runstate_changed cannot be cleared because we still need it to # send the final runstate changed event ...! @@ -732,6 +733,9 @@ class AsyncProtocol(Generic[T]): def _done(task: Optional['asyncio.Future[Any]']) -> bool: return task is not None and task.done() + # If the server is running, stop it. + await self._stop_server() + # Are we already in an error pathway? If either of the tasks are # already done, or if we have no tasks but a reader/writer; we # must be. From 481607c7d35de2bc4d9bec7f4734036fc467f330 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:46 -0500 Subject: [PATCH 08/11] python/aqmp: add start_server() and accept() methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add start_server() and accept() methods that can be used instead of start_server_and_accept() to allow more fine-grained control over the incoming connection process. (Eagle-eyed reviewers will surely notice that it's a bit weird that "CONNECTING" is a state that's shared between both the start_server() and connect() states. 
That's absolutely true, and it's very true that checking on the presence of _accepted as an indicator of state is a hack. That's also very certainly true. But ... this keeps client code an awful lot simpler, as it doesn't have to care exactly *how* the connection is being made, just that it *is*. Is it worth disrupting that simplicity in order to provide a better state guard on `accept()`? Hm.) Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-9-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 67 +++++++++++++++++++++++++++++++++--- python/tests/protocol.py | 7 ++++ 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index cdbc9cba0d..2ecba14555 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -280,6 +280,8 @@ class AsyncProtocol(Generic[T]): Accept a connection and begin processing message queues. If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + This method is precisely equivalent to calling `start_server()` + followed by `accept()`. :param address: Address to listen on; UNIX socket path or TCP address/port. @@ -294,9 +296,62 @@ class AsyncProtocol(Generic[T]): protocol-level failure occurs while establishing a new session, the wrapped error may also be an `QMPError`. """ + await self.start_server(address, ssl) + await self.accept() + assert self.runstate == Runstate.RUNNING + + @upper_half + @require(Runstate.IDLE) + async def start_server(self, address: SocketAddrT, + ssl: Optional[SSLContext] = None) -> None: + """ + Start listening for an incoming connection, but do not wait for a peer. + + This method starts listening for an incoming connection, but + does not block waiting for a peer. This call will return + immediately after binding and listening on a socket. 
A later + call to `accept()` must be made in order to finalize the + incoming connection. + + :param address: + Address to listen on; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise StateError: When the `Runstate` is not `IDLE`. + :raise ConnectError: + When the server could not start listening on this address. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError`. + """ await self._session_guard( self._do_start_server(address, ssl), 'Failed to establish connection') + assert self.runstate == Runstate.CONNECTING + + @upper_half + @require(Runstate.CONNECTING) + async def accept(self) -> None: + """ + Accept an incoming connection and begin processing message queues. + + If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + + :raise StateError: When the `Runstate` is not `CONNECTING`. + :raise QMPError: When `start_server()` was not called yet. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. + """ + if self._accepted is None: + raise QMPError("Cannot call accept() before start_server().") + await self._session_guard( + self._do_accept(), + 'Failed to establish connection') await self._session_guard( self._establish_session(), 'Failed to establish session') @@ -512,7 +567,12 @@ class AsyncProtocol(Generic[T]): async def _do_start_server(self, address: SocketAddrT, ssl: Optional[SSLContext] = None) -> None: """ - Acting as the transport server, accept a single connection. + Start listening for an incoming connection, but do not wait for a peer. + + This method starts listening for an incoming connection, but does not + block waiting for a peer. 
This call will return immediately after + binding and listening to a socket. A later call to accept() must be + made in order to finalize the incoming connection. :param address: Address to listen on; UNIX socket path or TCP address/port. @@ -554,10 +614,7 @@ class AsyncProtocol(Generic[T]): # This will start the server (bind(2), listen(2)). It will also # call accept(2) if we yield, but we don't block on that here. self._server = await coro - - # Just for this one commit, wait for a peer. - # This gets split out in the next patch. - await self._do_accept() + self.logger.debug("Server listening on %s", address) @upper_half async def _do_accept(self) -> None: diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 5e442e1efb..d6849ad306 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -43,11 +43,18 @@ class NullProtocol(AsyncProtocol[None]): async def _do_start_server(self, address, ssl=None): if self.fake_session: + self._accepted = asyncio.Event() self._set_state(Runstate.CONNECTING) await asyncio.sleep(0) else: await super()._do_start_server(address, ssl) + async def _do_accept(self): + if self.fake_session: + self._accepted = None + else: + await super()._do_accept() + async def _do_connect(self, address, ssl=None): if self.fake_session: self._set_state(Runstate.CONNECTING) From 673856f9d889dc50b6a1a7964df960c4f00c7c93 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:47 -0500 Subject: [PATCH 09/11] python/aqmp: fix race condition in legacy.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit legacy.py provides a synchronous model. iotests frequently uses this paradigm: - create QMP client object - start QEMU process - await connection from QEMU process In the switch from sync to async QMP, the QMP client object stopped calling bind() and listen() during the QMP object creation step, which creates a race condition if the QEMU process dials in too quickly. 
With refactoring out of the way, restore the former behavior of calling bind() and listen() during __init__() to fix this race condition. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-10-jsnow@redhat.com [Expanded commit message. --js] Signed-off-by: John Snow --- python/qemu/aqmp/legacy.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py index dca1e76ed4..cb50e60564 100644 --- a/python/qemu/aqmp/legacy.py +++ b/python/qemu/aqmp/legacy.py @@ -57,7 +57,7 @@ class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol): self._timeout: Optional[float] = None if server: - self._aqmp._bind_hack(address) # pylint: disable=protected-access + self._sync(self._aqmp.start_server(address)) _T = TypeVar('_T') @@ -90,10 +90,7 @@ class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol): self._aqmp.await_greeting = True self._aqmp.negotiate = True - self._sync( - self._aqmp.start_server_and_accept(self._address), - timeout - ) + self._sync(self._aqmp.accept(), timeout) ret = self._get_greeting() assert ret is not None From 4c1fe7003c9b373acb0791b4356e2285a10365c0 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:48 -0500 Subject: [PATCH 10/11] python/aqmp: drop _bind_hack() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _bind_hack() was a quick fix to allow async QMP to call bind(2) prior to calling listen(2) and accept(2). This wasn't sufficient to fully address the race condition present in synchronous clients. With the race condition in legacy.py fixed (see the previous commit), there are no longer any users of _bind_hack(). Drop it. Fixes: b0b662bb2b3 Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-11-jsnow@redhat.com [Expanded commit message. 
--js] Signed-off-by: John Snow --- python/qemu/aqmp/legacy.py | 2 +- python/qemu/aqmp/protocol.py | 41 +++--------------------------------- 2 files changed, 4 insertions(+), 39 deletions(-) diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py index cb50e60564..46026e9fdc 100644 --- a/python/qemu/aqmp/legacy.py +++ b/python/qemu/aqmp/legacy.py @@ -57,7 +57,7 @@ class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol): self._timeout: Optional[float] = None if server: - self._sync(self._aqmp.start_server(address)) + self._sync(self._aqmp.start_server(self._address)) _T = TypeVar('_T') diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 2ecba14555..36fae57f27 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -18,7 +18,6 @@ from asyncio import StreamReader, StreamWriter from enum import Enum from functools import wraps import logging -import socket from ssl import SSLContext from typing import ( Any, @@ -242,9 +241,6 @@ class AsyncProtocol(Generic[T]): self._runstate = Runstate.IDLE self._runstate_changed: Optional[asyncio.Event] = None - # Workaround for bind() - self._sock: Optional[socket.socket] = None - # Server state for start_server() and _incoming() self._server: Optional[asyncio.AbstractServer] = None self._accepted: Optional[asyncio.Event] = None @@ -535,34 +531,6 @@ class AsyncProtocol(Generic[T]): self._reader, self._writer = (reader, writer) self._accepted.set() - def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None: - """ - Used to create a socket in advance of accept(). - - This is a workaround to ensure that we can guarantee timing of - precisely when a socket exists to avoid a connection attempt - bouncing off of nothing. - - Python 3.7+ adds a feature to separate the server creation and - listening phases instead, and should be used instead of this - hack. 
- """ - if isinstance(address, tuple): - family = socket.AF_INET - else: - family = socket.AF_UNIX - - sock = socket.socket(family, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - sock.bind(address) - except: - sock.close() - raise - - self._sock = sock - @upper_half async def _do_start_server(self, address: SocketAddrT, ssl: Optional[SSLContext] = None) -> None: @@ -589,21 +557,19 @@ class AsyncProtocol(Generic[T]): if isinstance(address, tuple): coro = asyncio.start_server( self._incoming, - host=None if self._sock else address[0], - port=None if self._sock else address[1], + host=address[0], + port=address[1], ssl=ssl, backlog=1, limit=self._limit, - sock=self._sock, ) else: coro = asyncio.start_unix_server( self._incoming, - path=None if self._sock else address, + path=address, ssl=ssl, backlog=1, limit=self._limit, - sock=self._sock, ) # Allow runstate watchers to witness 'CONNECTING' state; some @@ -630,7 +596,6 @@ class AsyncProtocol(Generic[T]): await self._accepted.wait() assert self._server is None self._accepted = None - self._sock = None self.logger.debug("Connection accepted.") From 7cba010e821bf227e5fa016d0df06f2a33a0c318 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 12:08:28 -0500 Subject: [PATCH 11/11] scripts/qmp-shell-wrap: Fix import path Mea culpa. Dan's patch wound up with the wrong import path because I re-ordered my most recent pull request and missed that this needed a fix on rebase. 
Fixes: 43912529 Reported-by: Kashyap Chamarthy Signed-off-by: John Snow Tested-by: Kashyap Chamarthy Message-id: 20220225170828.3418305-1-jsnow@redhat.com Signed-off-by: John Snow --- scripts/qmp/qmp-shell-wrap | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/qmp/qmp-shell-wrap b/scripts/qmp/qmp-shell-wrap index 9e94da114f..66846e36d1 100755 --- a/scripts/qmp/qmp-shell-wrap +++ b/scripts/qmp/qmp-shell-wrap @@ -4,7 +4,7 @@ import os import sys sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) -from qemu.qmp import qmp_shell +from qemu.aqmp import qmp_shell if __name__ == '__main__':