diff --git a/external/devlib/devlib/utils/asyn.py b/external/devlib/devlib/utils/asyn.py index b2d54afb536c0cb50a2ff4229623188041b027d6..c9d1e41ef45b6af71ad0ce855eba8585e570a2f8 100644 --- a/external/devlib/devlib/utils/asyn.py +++ b/external/devlib/devlib/utils/asyn.py @@ -21,6 +21,7 @@ Async-related utilities import abc import asyncio import asyncio.events +import atexit import functools import itertools import contextlib @@ -46,6 +47,20 @@ def create_task(awaitable, name=None): return task +def _close_loop(loop): + if loop is not None: + try: + loop.run_until_complete(loop.shutdown_asyncgens()) + try: + shutdown_default_executor = loop.shutdown_default_executor + except AttributeError: + pass + else: + loop.run_until_complete(shutdown_default_executor()) + finally: + loop.close() + + class AsyncManager: def __init__(self): self.task_tree = dict() @@ -321,10 +336,11 @@ class _Genlet(greenlet): except StopIteration as e: return e.value else: + parent = self.parent # Switch back to the consumer that returns the values via # send() try: - value = self.consumer_genlet.switch(future) + value = parent.switch(future) except BaseException as e: excep = e value = None @@ -344,7 +360,7 @@ class _Genlet(greenlet): return g def _send_throw(self, value, excep): - self.consumer_genlet = greenlet.getcurrent() + self.parent = greenlet.getcurrent() # Switch back to the function yielding values if excep is None: @@ -377,15 +393,23 @@ class _AwaitableGenlet: @classmethod def wrap_coro(cls, coro): - if _Genlet.get_enclosing() is None: - # Create a top-level _Genlet that all nested runs will use to yield - # their futures - aw = cls(coro) - async def coro_f(): - return await aw - return coro_f() - else: - return coro + async def coro_f(): + # Make sure every new task will be instrumented since a task cannot + # yield futures on behalf of another task. If that were to happen, + # the task B trying to do a nested yield would switch back to task + # A, asking to yield on its behalf. Since the event loop would be + # currently handling task B, nothing would handle task A trying to + # yield on behalf of B, leading to a deadlock. + loop = asyncio.get_running_loop() + _install_task_factory(loop) + + # Create a top-level _AwaitableGenlet that all nested runs will use + # to yield their futures + _coro = cls(coro) + + return await _coro + + return coro_f() def __init__(self, coro): self._coro = coro @@ -433,6 +457,13 @@ class _AwaitableGenlet: return gen +def _allow_nested_run(coro): + if _Genlet.get_enclosing() is None: + return _AwaitableGenlet.wrap_coro(coro) + else: + return coro + + def allow_nested_run(coro): """ Wrap the coroutine ``coro`` such that nested calls to :func:`run` will be @@ -441,15 +472,7 @@ def allow_nested_run(coro): .. warning:: The coroutine needs to be consumed in the same OS thread it was created in. """ - return _allow_nested_run(coro, loop=None) - - -def _allow_nested_run(coro, loop=None): - return _do_allow_nested_run(coro) - - -def _do_allow_nested_run(coro): - return _AwaitableGenlet.wrap_coro(coro) + return _allow_nested_run(coro) # This thread runs coroutines that cannot be ran on the event loop in the @@ -457,26 +480,64 @@ def _do_allow_nested_run(coro): # another event loop has been setup, so we can wrap coroutines before # dispatching them there. _CORO_THREAD_EXECUTOR = ThreadPoolExecutor(max_workers=1) -def _coro_thread_f(coro): +_CORO_THREAD_EXECUTOR_LOOP = None + + +def _check_executor_alive(executor): try: - loop = asyncio.get_running_loop() + executor.submit(lambda: None) except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + return False + else: + return True + + +def _shutdown_thread_loop(): + global _CORO_THREAD_EXECUTOR_LOOP + + loop = _CORO_THREAD_EXECUTOR_LOOP + _CORO_THREAD_EXECUTOR_LOOP = None + # As per the documentation, ThreadPoolExecutor will .shutdown() before any + # atexit handler get to run, so we can safely close the event loop it would + # be using. + # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor + assert not _check_executor_alive(_CORO_THREAD_EXECUTOR) + _close_loop(loop) + + +atexit.register(_shutdown_thread_loop) + + +def _coro_thread_f(coro): + global _CORO_THREAD_EXECUTOR_LOOP + + if _CORO_THREAD_EXECUTOR_LOOP is None: + _CORO_THREAD_EXECUTOR_LOOP = asyncio.new_event_loop() + + loop = _CORO_THREAD_EXECUTOR_LOOP + asyncio.set_event_loop(loop) - _install_task_factory(loop) # The coroutine needs to be wrapped in the same thread that will consume it, - coro = _allow_nested_run(coro, loop) + coro = _allow_nested_run(coro) return loop.run_until_complete(coro) def _run_in_thread(coro): + executor = _CORO_THREAD_EXECUTOR + # This is a truly blocking operation, which will block the caller's event # loop. However, this also prevents most thread safety issues as the # calling code will not run concurrently with the coroutine. We also don't # have a choice anyway. - future = _CORO_THREAD_EXECUTOR.submit(_coro_thread_f, coro) - return future.result() + try: + future = executor.submit(_coro_thread_f, coro) + except RuntimeError as e: + if _check_executor_alive(executor): + raise e + else: + raise RuntimeError('Devlib relies on nested asyncio implementation requiring threads. These threads are not available while shutting down the interpreter.') + else: + return future.result() _PATCHED_LOOP_LOCK = threading.Lock() @@ -498,7 +559,9 @@ def _install_task_factory(loop): make_task = loop.get_task_factory() or default_factory def factory(loop, coro, context=None): - coro = _allow_nested_run(coro, loop) + # Make sure each Task will be able to yield on behalf of its nested + # await beneath blocking layers + coro = _AwaitableGenlet.wrap_coro(coro) return make_task(loop, coro, context=context) loop.set_task_factory(factory) @@ -511,27 +574,21 @@ def _install_task_factory(loop): _PATCHED_LOOP.add(loop) -def _patch_current_loop(): - try: - loop = asyncio.get_running_loop() - except RuntimeError: - pass - else: - _install_task_factory(loop) - - -# Patch the currently running event loop if any, to increase the chances of not -# having to use the _CORO_THREAD_EXECUTOR -_patch_current_loop() - - def run(coro): """ Similar to :func:`asyncio.run` but can be called while an event loop is running if a coroutine higher in the callstack has been wrapped using :func:`allow_nested_run`. """ + is_loop_owned, loop, go = _run(coro) + try: + return go() + finally: + if is_loop_owned: + _close_loop(loop) + +def _run(coro): # Ensure we have a fresh coroutine. inspect.getcoroutinestate() does not # work on all objects that asyncio creates on some version of Python, such # as iterable_coroutine @@ -540,20 +597,21 @@ def run(coro): try: loop = asyncio.get_running_loop() except RuntimeError: - # We are not currently running an event loop, so it's ok to just use - # asyncio.run() and let it create one. - # Once the coroutine is wrapped, we will be able to yield across - # blocking function boundaries thanks to _Genlet - return asyncio.run(_do_allow_nested_run(coro)) + loop = asyncio.new_event_loop() + def go(): + # Once the coroutine is wrapped, we will be able to yield across + # blocking function boundaries thanks to _Genlet + asyncio.set_event_loop(loop) + _coro = _allow_nested_run(coro) + return loop.run_until_complete(_coro) + return (True, loop, go) else: - return _run_in_loop(loop, coro) + def go(): + return _run_in_loop(loop, coro) + return (False, loop, go) def _run_in_loop(loop, coro): - # Increase the odds that in the future, we have a wrapped coroutine in - # our callstack to avoid the _run_in_thread() path. - _install_task_factory(loop) - if loop.is_running(): g = _Genlet.get_enclosing() if g is None: @@ -569,10 +627,7 @@ def _run_in_loop(loop, coro): # able to use as a conduit to yield the futures. return g.consume_coro(coro, None) else: - # In the odd case a loop was installed but is not running, we just - # use it. With _install_task_factory(), we should have the - # top-level Task run an instrumented coroutine (wrapped with - # allow_nested_run()) + coro = _allow_nested_run(coro) return loop.run_until_complete(coro) @@ -627,44 +682,49 @@ def asyncf(f): ) +class _AsyncPolymorphicCMState: + def __init__(self): + self.nesting = 0 + self.loop = None + self.is_loop_owned = False + + class _AsyncPolymorphicCM: """ Wrap an async context manager such that it exposes a synchronous API as well for backward compatibility. """ - _nested = threading.local() - def _get_nesting(self): + def __init__(self, async_cm): + self.cm = async_cm + self._state = threading.local() + + def _get_state(self): try: - return self._nested.x + return self._state.x except AttributeError: - self._nested.x = 0 - return 0 + state = _AsyncPolymorphicCMState() + self._state.x = state + return state + + def _delete_state(self): + state = self._get_state() + try: + loop = state.loop + is_loop_owned = state.is_loop_owned + if loop is not None and is_loop_owned: + _close_loop(loop) + finally: + del self._state.x def _update_nesting(self, n): - x = self._get_nesting() + n - self._nested.x = x + state = self._get_state() + x = state.nesting + assert x >= 0 + x = x + n + state.nesting = x return bool(x) - def __init__(self, async_cm): - self.cm = async_cm - self._loop = None - - def _close_loop(self): - reentered = self._update_nesting(0) - if not reentered: - loop = self._loop - self._loop = None - if loop is not None: - loop.run_until_complete(loop.shutdown_asyncgens()) - try: - shutdown_default_executor = loop.shutdown_default_executor - except AttributeError: - pass - else: - loop.run_until_complete(shutdown_default_executor()) - loop.close() - def __aenter__(self, *args, **kwargs): return self.cm.__aenter__(*args, **kwargs) @@ -672,37 +732,48 @@ class _AsyncPolymorphicCM: return self.cm.__aexit__(*args, **kwargs) def __enter__(self, *args, **kwargs): - self._update_nesting(1) coro = self.cm.__aenter__(*args, **kwargs) - # If there is already a running loop, no need to create a new one + is_loop_owned, loop, go = _run(coro) + self._set_loop(is_loop_owned, loop) + + # Increase the nesting count _before_ we start running the + # coroutine, in case it is a recursive context manager + self._update_nesting(1) + try: - asyncio.get_running_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - self._loop = loop - try: - asyncio.set_event_loop(loop) - return _run_in_loop(loop, coro) - except BaseException: - self._close_loop() - raise - else: - return run(coro) + return go() + except BaseException: + self._close_loop() + raise def __exit__(self, *args, **kwargs): + coro = self.cm.__aexit__(*args, **kwargs) + state = self._get_state() + loop = state.loop try: - self._update_nesting(-1) - coro = self.cm.__aexit__(*args, **kwargs) - loop = self._loop - if loop is None: - return run(coro) - else: - return _run_in_loop(loop, coro) + return _run_in_loop(loop, coro) finally: + nesting = self._update_nesting(-1) self._close_loop() + def _set_loop(self, is_loop_owned, loop): + nesting = self._update_nesting(0) + state = self._get_state() + if nesting: + assert not is_loop_owned + assert loop is state.loop + else: + assert state.loop is None + state.loop = loop + state.is_loop_owned = is_loop_owned + + def _close_loop(self, check_nesting=True): + nesting = self._update_nesting(0) + if (not check_nesting) or (not nesting): + self._delete_state() + def __del__(self): - self._close_loop() + self._close_loop(check_nesting=False) def asynccontextmanager(f): diff --git a/external/devlib/tests/test_asyn.py b/external/devlib/tests/test_asyn.py index b743084bbe1d0225b742b4bbe1bb5af53cca55e0..2ae3757d491d909ab1ff0ff655921035c234b45d 100644 --- a/external/devlib/tests/test_asyn.py +++ b/external/devlib/tests/test_asyn.py @@ -508,14 +508,18 @@ def _test_run_with_setup(setup): asyncio.set_event_loop(loop) # Simulate case where devlib is ran in a context where the main app has # set an event loop at some point - return asyncio.run(coro) + try: + return asyncio.run(coro) + finally: + loop.close() def run_with_existing_loop2(coro): # This is similar to how things are executed on IPython/jupyterlab loop = asyncio.new_event_loop() - x = loop.run_until_complete(coro) - loop.close() - return x + try: + return loop.run_until_complete(coro) + finally: + loop.close() def run_with_to_thread(top_run, coro): # Add a layer of asyncio.to_thread(), to simulate a case where users @@ -553,7 +557,10 @@ def test_run_stdlib(): def setup(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - yield asyncio.run + try: + yield asyncio.run + finally: + loop.close() _test_run_with_setup(setup)