diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 19d10031..a7f74412 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -98,44 +98,7 @@ from ..abc._eventloop import StrOrBytesPath from ..lowlevel import RunVar from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream - -# registry of asyncio loop : selector thread -_selectors: WeakKeyDictionary = WeakKeyDictionary() - - -def _get_selector_windows( - asyncio_loop: AbstractEventLoop, -) -> AbstractEventLoop: - """Get selector-compatible loop. - - Sets ``add_reader`` family of methods on the asyncio loop. - - Workaround Windows proactor removal of *reader methods. - """ - - if asyncio_loop in _selectors: - return _selectors[asyncio_loop] - - from ._selector_thread import AddThreadSelectorEventLoop - - selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop( # type: ignore[abstract] - asyncio_loop - ) - - # patch loop.close to also close the selector thread - loop_close = asyncio_loop.close - - def _close_selector_and_loop() -> None: - # restore original before calling selector.close, - # which in turn calls eventloop.close! - asyncio_loop.close = loop_close # type: ignore[method-assign] - _selectors.pop(asyncio_loop, None) - selector_loop.close() - - asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign] - - return selector_loop - +from ._selector_thread import _get_selector_windows if sys.version_info >= (3, 10): from typing import ParamSpec @@ -2726,14 +2689,19 @@ async def wait_socket_readable(cls, sock: socket.socket) -> None: and asyncio.get_event_loop_policy().__class__.__name__ == "WindowsProactorEventLoopPolicy" ): - loop = _get_selector_windows(loop) + selector = _get_selector_windows(loop) + add_reader = selector.add_reader + remove_reader = selector.remove_reader + else: + add_reader = loop.add_reader + remove_reader = loop.remove_reader event = read_events[sock] = asyncio.Event() - loop.add_reader(sock, event.set) + add_reader(sock, event.set) try: await event.wait() finally: if read_events.pop(sock, None) is not None: - loop.remove_reader(sock) + remove_reader(sock) readable = True else: readable = False diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py index 0d814306..7bc2c7dd 100644 --- a/src/anyio/_backends/_selector_thread.py +++ b/src/anyio/_backends/_selector_thread.py @@ -19,6 +19,7 @@ Callable, Union, ) +from weakref import WeakKeyDictionary if typing.TYPE_CHECKING: from typing_extensions import Protocol @@ -30,6 +31,9 @@ def fileno(self) -> int: _FileDescriptorLike = Union[int, _HasFileno] +# registry of asyncio loop : selector thread +_selectors: WeakKeyDictionary = WeakKeyDictionary() + # Collection of selector thread event loops to shut down on exit. _selector_loops: set[SelectorThread] = set() @@ -281,61 +285,31 @@ def remove_writer(self, fd: _FileDescriptorLike) -> bool: return True -# AddThreadSelectorEventLoop: unmodified from tornado 6.4.0 -class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): - """Wrap an event loop to add implementations of the ``add_reader`` method family. - - Instances of this class start a second thread to run a selector. - This thread is completely hidden from the user; all callbacks are - run on the wrapped event loop's thread. - - This class is used automatically by Tornado; applications should not need - to refer to it directly. +def _get_selector_windows( + asyncio_loop: asyncio.AbstractEventLoop, +) -> SelectorThread: + """Get selector-compatible loop. - It is safe to wrap any event loop with this class, although it only makes sense - for event loops that do not implement the ``add_reader`` family of methods - themselves (i.e. ``WindowsProactorEventLoop``) + Sets ``add_reader`` family of methods on the asyncio loop. - Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop. + Workaround Windows proactor removal of *reader methods. """ - # This class is a __getattribute__-based proxy. All attributes other than those - # in this set are proxied through to the underlying loop. - MY_ATTRIBUTES = { - "_real_loop", - "_selector", - "add_reader", - "add_writer", - "close", - "remove_reader", - "remove_writer", - } - - def __getattribute__(self, name: str) -> Any: - if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES: - return super().__getattribute__(name) - return getattr(self._real_loop, name) - - def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: - self._real_loop = real_loop - self._selector = SelectorThread(real_loop) + if asyncio_loop in _selectors: + return _selectors[asyncio_loop] - def close(self) -> None: - self._selector.close() - self._real_loop.close() + selector_thread = _selectors[asyncio_loop] = SelectorThread(asyncio_loop) - def add_reader( # type: ignore[override] - self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any - ) -> None: - return self._selector.add_reader(fd, callback, *args) + # patch loop.close to also close the selector thread + loop_close = asyncio_loop.close - def add_writer( # type: ignore[override] - self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any - ) -> None: - return self._selector.add_writer(fd, callback, *args) + def _close_selector_and_loop() -> None: + # restore original before calling selector.close, + # which in turn calls eventloop.close! + asyncio_loop.close = loop_close # type: ignore[method-assign] + _selectors.pop(asyncio_loop, None) + selector_thread.close() - def remove_reader(self, fd: _FileDescriptorLike) -> bool: - return self._selector.remove_reader(fd) + asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign] - def remove_writer(self, fd: _FileDescriptorLike) -> bool: - return self._selector.remove_writer(fd) + return selector_thread