From a7d766908a6d5f92354c812a924348bf58fc2781 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 1 Aug 2024 18:09:24 +0100 Subject: [PATCH] Squashed 'external/devlib/' changes from 2e5efb815..4fe9bb047 4fe9bb047 ftrace: Add write-to-disk mode 95df75062 connection: Support all signals in BackgroundCommand.send_signal() d07cc5868 utils/asyn: Replace nest_asyncio with greenlet 3ada269ce utils/asyn: Factor out the calls to asyncio.run 9f5c19dad target: Allow reuse of a connection once the owning thread is terminated f343642de tests: Add tests for nested async support d63ea8e6e utils/asyn: Fix memoized_method.__set_name__ REVERT: 2e5efb815 ftrace: Add write-to-disk mode REVERT: ee9317ec1 connection: Support all signals in BackgroundCommand.send_signal() REVERT: b9d2317a6 utils/asyn: Replace nest_asyncio with greenlet REVERT: 7e0c3e862 Revert "utils/asyn: Replace nest_asyncio with greenback" REVERT: 3ca1bb105 utils/asyn: Replace nest_asyncio with greenback REVERT: 83a566a49 utils/asyn: Factor out the calls to asyncio.run REVERT: 54b39a403 target: Allow reuse of a connection once the owning thread is terminated REVERT: 11fc36e7c tests: Add tests for nested async support git-subtree-dir: external/devlib git-subtree-split: 4fe9bb047f46b004953c0cb760f934c0e966c002 --- devlib/utils/asyn.py | 420 ++++++++++++++++++++++++++++--------------- tests/test_asyn.py | 25 +++ 2 files changed, 302 insertions(+), 143 deletions(-) diff --git a/devlib/utils/asyn.py b/devlib/utils/asyn.py index c9d1e41ef..fd518d905 100644 --- a/devlib/utils/asyn.py +++ b/devlib/utils/asyn.py @@ -20,12 +20,12 @@ Async-related utilities import abc import asyncio -import asyncio.events -import atexit +import contextvars import functools import itertools import contextlib import pathlib +import queue import os.path import inspect import sys @@ -300,7 +300,7 @@ class memoized_method: raise RuntimeError("Cannot monkey-patch a memoized function") def __set_name__(self, owner, name): - self.name = name + self._name = name class _Genlet(greenlet): @@ -309,6 +309,14 @@ class _Genlet(greenlet): to make their parent yield on their behalf, as if callees could decide to be annotated ``yield from`` without modifying the caller. """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # Forward the context variables to the greenlet, which will not happen + # by default: + # https://greenlet.readthedocs.io/en/latest/contextvars.html + self.gr_context = contextvars.copy_context() + @classmethod def from_coro(cls, coro): """ @@ -347,7 +355,6 @@ class _Genlet(greenlet): else: excep = None - @classmethod def get_enclosing(cls): """ @@ -440,6 +447,8 @@ class _AwaitableGenlet: future = gen.gen_throw(excep) except StopIteration as e: return e.value + finally: + _set_current_context(gen.gr_context) try: value = yield future @@ -479,8 +488,12 @@ def allow_nested_run(coro): # current thread. Instead, they are scheduled in a separate thread where # another event loop has been setup, so we can wrap coroutines before # dispatching them there. -_CORO_THREAD_EXECUTOR = ThreadPoolExecutor(max_workers=1) -_CORO_THREAD_EXECUTOR_LOOP = None +_CORO_THREAD_EXECUTOR = ThreadPoolExecutor( + # Allow for a ridiculously large number so that we will never end up + # queuing one job after another. This is critical as we could otherwise end + # up in deadlock, if a job triggers another job and waits for it. + max_workers=2**64, +) def _check_executor_alive(executor): @@ -492,57 +505,8 @@ def _check_executor_alive(executor): return True -def _shutdown_thread_loop(): - global _CORO_THREAD_EXECUTOR_LOOP - - loop = _CORO_THREAD_EXECUTOR_LOOP - _CORO_THREAD_EXECUTOR_LOOP = None - # As per the documentation, ThreadPoolExecutor will .shutdown() before any - # atexit handler get to run, so we can safely close the event loop it would - # be using. - # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor - assert not _check_executor_alive(_CORO_THREAD_EXECUTOR) - _close_loop(loop) - - -atexit.register(_shutdown_thread_loop) - - -def _coro_thread_f(coro): - global _CORO_THREAD_EXECUTOR_LOOP - - if _CORO_THREAD_EXECUTOR_LOOP is None: - _CORO_THREAD_EXECUTOR_LOOP = asyncio.new_event_loop() - - loop = _CORO_THREAD_EXECUTOR_LOOP - asyncio.set_event_loop(loop) - - # The coroutine needs to be wrapped in the same thread that will consume it, - coro = _allow_nested_run(coro) - return loop.run_until_complete(coro) - - -def _run_in_thread(coro): - executor = _CORO_THREAD_EXECUTOR - - # This is a truly blocking operation, which will block the caller's event - # loop. However, this also prevents most thread safety issues as the - # calling code will not run concurrently with the coroutine. We also don't - # have a choice anyway. - try: - future = executor.submit(_coro_thread_f, coro) - except RuntimeError as e: - if _check_executor_alive(executor): - raise e - else: - raise RuntimeError('Devlib relies on nested asyncio implementation requiring threads. These threads are not available while shutting down the interpreter.') - else: - return future.result() - - _PATCHED_LOOP_LOCK = threading.Lock() _PATCHED_LOOP = WeakSet() - def _install_task_factory(loop): """ Install a task factory on the given event ``loop`` so that top-level @@ -574,61 +538,224 @@ def _install_task_factory(loop): _PATCHED_LOOP.add(loop) -def run(coro): +def _set_current_context(ctx): """ - Similar to :func:`asyncio.run` but can be called while an event loop is - running if a coroutine higher in the callstack has been wrapped using + Get all the variable from the passed ``ctx`` and set them in the current + context. + """ + for var, val in ctx.items(): + var.set(val) + + +class _CoroRunner(abc.ABC): + """ + ABC for an object that can execute multiple coroutines in a given + environment. + + This allows running coroutines for which it might be an assumption, such as + the awaitables yielded by an async generator that are all attached to a + single event loop. + """ + @abc.abstractmethod + def _run(self, coro): + pass + + def run(self, coro): + # Ensure we have a fresh coroutine. inspect.getcoroutinestate() does not + # work on all objects that asyncio creates on some version of Python, such + # as iterable_coroutine + assert not (inspect.iscoroutine(coro) and coro.cr_running) + return self._run(coro) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + pass + + +class _ThreadCoroRunner(_CoroRunner): + """ + Run the coroutines on a thread picked from a + :class:`concurrent.futures.ThreadPoolExecutor`. + + Critically, this allows running multiple coroutines out of the same thread, + which will be reserved until the runner ``__exit__`` method is called. + """ + def __init__(self, future, jobq, resq): + self._future = future + self._jobq = jobq + self._resq = resq + + @staticmethod + def _thread_f(jobq, resq): + def handle_jobs(runner): + while True: + job = jobq.get() + if job is None: + return + else: + ctx, coro = job + try: + value = ctx.run(runner.run, coro) + except BaseException as e: + value = None + excep = e + else: + excep = None + + resq.put((ctx, excep, value)) + + with _LoopCoroRunner(None) as runner: + handle_jobs(runner) + + @classmethod + def from_executor(cls, executor): + jobq = queue.SimpleQueue() + resq = queue.SimpleQueue() + + try: + future = executor.submit(cls._thread_f, jobq, resq) + except RuntimeError as e: + if _check_executor_alive(executor): + raise e + else: + raise RuntimeError('Devlib relies on nested asyncio implementation requiring threads. These threads are not available while shutting down the interpreter.') + + return cls( + jobq=jobq, + resq=resq, + future=future, + ) + + def _run(self, coro): + ctx = contextvars.copy_context() + self._jobq.put((ctx, coro)) + ctx, excep, value = self._resq.get() + + _set_current_context(ctx) + + if excep is None: + return value + else: + raise excep + + def __exit__(self, *args, **kwargs): + self._jobq.put(None) + self._future.result() + + +class _LoopCoroRunner(_CoroRunner): + """ + Run a coroutine on the given event loop. + + The passed event loop is assumed to not be running. If ``None`` is passed, + a new event loop will be created in ``__enter__`` and closed in + ``__exit__``. + """ + def __init__(self, loop): + self.loop = loop + self._owned = False + + def _run(self, coro): + loop = self.loop + + # Back-propagate the contextvars that could have been modified by the + # coroutine. This could be handled by asyncio.Runner().run(..., + # context=...) or loop.create_task(..., context=...) but these APIs are + # only available since Python 3.11 + ctx = None + async def capture_ctx(): + nonlocal ctx + try: + return await _allow_nested_run(coro) + finally: + ctx = contextvars.copy_context() + + try: + return loop.run_until_complete(capture_ctx()) + finally: + _set_current_context(ctx) + + def __enter__(self): + loop = self.loop + if loop is None: + owned = True + loop = asyncio.new_event_loop() + else: + owned = False + + asyncio.set_event_loop(loop) + + self.loop = loop + self._owned = owned + return self + + def __exit__(self, *args, **kwargs): + if self._owned: + asyncio.set_event_loop(None) + _close_loop(self.loop) + + +class _GenletCoroRunner(_CoroRunner): + """ + Run a coroutine assuming one of the parent coroutines was wrapped with :func:`allow_nested_run`. """ - is_loop_owned, loop, go = _run(coro) - try: - return go() - finally: - if is_loop_owned: - _close_loop(loop) + def __init__(self, g): + self._g = g + def _run(self, coro): + return self._g.consume_coro(coro, None) -def _run(coro): - # Ensure we have a fresh coroutine. inspect.getcoroutinestate() does not - # work on all objects that asyncio creates on some version of Python, such - # as iterable_coroutine - assert not (inspect.iscoroutine(coro) and coro.cr_running) +def _get_runner(): + executor = _CORO_THREAD_EXECUTOR + g = _Genlet.get_enclosing() try: loop = asyncio.get_running_loop() except RuntimeError: - loop = asyncio.new_event_loop() - def go(): - # Once the coroutine is wrapped, we will be able to yield across - # blocking function boundaries thanks to _Genlet - asyncio.set_event_loop(loop) - _coro = _allow_nested_run(coro) - return loop.run_until_complete(_coro) - return (True, loop, go) + loop = None + + # We have an coroutine wrapped with allow_nested_run() higher in the + # callstack, that we will be able to use as a conduit to yield the + # futures. + if g is not None: + return _GenletCoroRunner(g) + # No event loop setup, so we can just make our own + elif loop is None: + return _LoopCoroRunner(None) + # There is an event loop setup, but it is not currently running so we + # can just re-use it. + # + # TODO: for now, this path is dead since asyncio.get_running_loop() will + # always raise a RuntimeError if the loop is not running, even if + # asyncio.set_event_loop() was used. + elif not loop.is_running(): + return _LoopCoroRunner(loop) + # There is an event loop currently running in our thread, so we cannot + # just create another event loop and install it since asyncio forbids + # that. The only choice is doing this in a separate thread that we + # fully control. else: - def go(): - return _run_in_loop(loop, coro) - return (False, loop, go) - - -def _run_in_loop(loop, coro): - if loop.is_running(): - g = _Genlet.get_enclosing() - if g is None: - # If we are not running under a wrapped coroutine, we don't - # have a choice and we need to run in a separate event loop. We - # cannot just create another event loop and install it, as - # asyncio forbids that, so the only choice is doing this in a - # separate thread that we fully control. - return _run_in_thread(coro) - else: - # This requires that we have an coroutine wrapped with - # allow_nested_run() higher in the callstack, that we will be - # able to use as a conduit to yield the futures. - return g.consume_coro(coro, None) - else: - coro = _allow_nested_run(coro) - return loop.run_until_complete(coro) + return _ThreadCoroRunner.from_executor(executor) + + +def run(coro): + """ + Similar to :func:`asyncio.run` but can be called while an event loop is + running if a coroutine higher in the callstack has been wrapped using + :func:`allow_nested_run`. + + Note that context variables from :mod:`contextvars` will be available in + the coroutine, and unlike with :func:`asyncio.run`, any update to them will + be reflected in the context of the caller. This allows context variable + updates to cross an arbitrary number of run layers, as if all those layers + were just part of the same coroutine. + """ + runner = _get_runner() + with runner as runner: + return runner.run(coro) def asyncf(f): @@ -685,8 +812,37 @@ def asyncf(f): class _AsyncPolymorphicCMState: def __init__(self): self.nesting = 0 - self.loop = None - self.is_loop_owned = False + self.runner = None + + def _update_nesting(self, n): + x = self.nesting + assert x >= 0 + x = x + n + self.nesting = x + return bool(x) + + def _get_runner(self): + runner = self.runner + if runner is None: + assert not self.nesting + runner = _get_runner() + runner.__enter__() + self.runner = runner + return runner + + def _cleanup_runner(self, force=False): + def cleanup(): + self.runner = None + if runner is not None: + runner.__exit__(None, None, None) + + runner = self.runner + if force: + cleanup() + else: + assert runner is not None + if not self._update_nesting(0): + cleanup() class _AsyncPolymorphicCM: @@ -708,22 +864,10 @@ class _AsyncPolymorphicCM: return state def _delete_state(self): - state = self._get_state() try: - loop = state.loop - is_loop_owned = state.is_loop_owned - if loop is not None and is_loop_owned: - _close_loop(loop) - finally: del self._state.x - - def _update_nesting(self, n): - state = self._get_state() - x = state.nesting - assert x >= 0 - x = x + n - state.nesting = x - return bool(x) + except AttributeError: + pass def __aenter__(self, *args, **kwargs): return self.cm.__aenter__(*args, **kwargs) @@ -731,49 +875,39 @@ class _AsyncPolymorphicCM: def __aexit__(self, *args, **kwargs): return self.cm.__aexit__(*args, **kwargs) + @staticmethod + def _exit(state): + state._update_nesting(-1) + state._cleanup_runner() + def __enter__(self, *args, **kwargs): - coro = self.cm.__aenter__(*args, **kwargs) - is_loop_owned, loop, go = _run(coro) - self._set_loop(is_loop_owned, loop) + state = self._get_state() + runner = state._get_runner() # Increase the nesting count _before_ we start running the # coroutine, in case it is a recursive context manager - self._update_nesting(1) + state._update_nesting(1) try: - return go() + coro = self.cm.__aenter__(*args, **kwargs) + return runner.run(coro) except BaseException: - self._close_loop() + self._exit(state) raise def __exit__(self, *args, **kwargs): coro = self.cm.__aexit__(*args, **kwargs) - state = self._get_state() - loop = state.loop - try: - return _run_in_loop(loop, coro) - finally: - nesting = self._update_nesting(-1) - self._close_loop() - def _set_loop(self, is_loop_owned, loop): - nesting = self._update_nesting(0) state = self._get_state() - if nesting: - assert not is_loop_owned - assert loop is state.loop - else: - assert state.loop is None - state.loop = loop - state.is_loop_owned = is_loop_owned + runner = state._get_runner() - def _close_loop(self, check_nesting=True): - nesting = self._update_nesting(0) - if (not check_nesting) or (not nesting): - self._delete_state() + try: + return runner.run(coro) + finally: + self._exit(state) def __del__(self): - self._close_loop(check_nesting=False) + self._get_state()._cleanup_runner(force=True) def asynccontextmanager(f): diff --git a/tests/test_asyn.py b/tests/test_asyn.py index 2ae3757d4..8fe9a2212 100644 --- a/tests/test_asyn.py +++ b/tests/test_asyn.py @@ -17,6 +17,7 @@ import sys import asyncio from functools import partial +import contextvars from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager @@ -70,6 +71,30 @@ def _do_test_run(top_run): top_run(test_run_basic()) + async def test_run_basic_contextvars_get(): + var = contextvars.ContextVar('var') + var.set(42) + + async def f(): + return var.get() + + assert var.get() == 42 + assert run(f()) == 42 + + top_run(test_run_basic_contextvars_get()) + + async def test_run_basic_contextvars_set(): + var = contextvars.ContextVar('var') + + async def f(): + var.set(43) + + var.set(42) + assert var.get() == 42 + run(f()) + assert var.get() == 43 + + top_run(test_run_basic_contextvars_set()) async def test_run_basic_raise(): -- GitLab