extmod/asyncio: Make current_task raise exception when there is no task.

Matches CPython behaviour.

Fixes issue #11530.

Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
Damien George 2024-02-26 10:52:12 +11:00
parent 8fdcc25eb0
commit 8692d2602a
2 changed files with 16 additions and 0 deletions

View File

@ -164,6 +164,7 @@ def run_until_complete(main_task=None):
dt = max(0, ticks_diff(t.ph_key, ticks()))
elif not _io_queue.map:
# No tasks can be woken so finished running
cur_task = None
return
# print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)
@ -188,6 +189,7 @@ def run_until_complete(main_task=None):
assert t.data is None
# This task is done, check if it's the main task and then loop should stop
if t is main_task:
cur_task = None
if isinstance(er, StopIteration):
return er.value
raise er
@ -242,6 +244,7 @@ async def _stopper():
pass
cur_task = None
_stop_task = None
@ -291,6 +294,8 @@ def get_event_loop(runq_len=0, waitq_len=0):
def current_task():
if cur_task is None:
raise RuntimeError("no running event loop")
return cur_task

View File

@ -19,4 +19,15 @@ async def main():
print(t is result[0])
try:
print(asyncio.current_task())
except RuntimeError:
print("RuntimeError")
asyncio.run(main())
try:
print(asyncio.current_task())
except RuntimeError:
print("RuntimeError")