From a7335d94e9c7406c01c84f5daed7e62c3c1454ca Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sat, 9 Nov 2024 15:28:46 +0100 Subject: [PATCH 01/33] Use ThreadSelectorEventLoop on Windows with ProactorEventLoop --- docs/versionhistory.rst | 2 + src/anyio/_backends/_asyncio.py | 44 +++ src/anyio/_backends/_selector_thread.py | 341 ++++++++++++++++++++++++ src/anyio/_core/_sockets.py | 3 - tests/test_sockets.py | 23 ++ 5 files changed, 410 insertions(+), 3 deletions(-) create mode 100644 src/anyio/_backends/_selector_thread.py diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 761be04f..718299de 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -7,6 +7,8 @@ This library adheres to `Semantic Versioning 2.0 `_. - Fixed a misleading ``ValueError`` in the context of DNS failures (`#815 `_; PR by @graingert) +- Ported ``ThreadSelectorEventLoop`` from Tornado to allow + ``anyio.wait_socket_readable(sock)`` to work on Windows with a ``ProactorEventLoop``. **4.6.2** diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 0a69e7ac..19d10031 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -99,6 +99,44 @@ from ..lowlevel import RunVar from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +# registry of asyncio loop : selector thread +_selectors: WeakKeyDictionary = WeakKeyDictionary() + + +def _get_selector_windows( + asyncio_loop: AbstractEventLoop, +) -> AbstractEventLoop: + """Get selector-compatible loop. + + Sets ``add_reader`` family of methods on the asyncio loop. + + Workaround Windows proactor removal of *reader methods. + """ + + if asyncio_loop in _selectors: + return _selectors[asyncio_loop] + + from ._selector_thread import AddThreadSelectorEventLoop + + selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop( # type: ignore[abstract] + asyncio_loop + ) + + # patch loop.close to also close the selector thread + loop_close = asyncio_loop.close + + def _close_selector_and_loop() -> None: + # restore original before calling selector.close, + # which in turn calls eventloop.close! + asyncio_loop.close = loop_close # type: ignore[method-assign] + _selectors.pop(asyncio_loop, None) + selector_loop.close() + + asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign] + + return selector_loop + + if sys.version_info >= (3, 10): from typing import ParamSpec else: @@ -2683,6 +2721,12 @@ async def wait_socket_readable(cls, sock: socket.socket) -> None: raise BusyResourceError("reading from") from None loop = get_running_loop() + if ( + sys.platform == "win32" + and asyncio.get_event_loop_policy().__class__.__name__ + == "WindowsProactorEventLoopPolicy" + ): + loop = _get_selector_windows(loop) event = read_events[sock] = asyncio.Event() loop.add_reader(sock, event.set) try: diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py new file mode 100644 index 00000000..0d814306 --- /dev/null +++ b/src/anyio/_backends/_selector_thread.py @@ -0,0 +1,341 @@ +"""Ensure asyncio selector methods (add_reader, etc.) are available. +Running select in a thread and defining these methods on the running event loop. +Originally in tornado.platform.asyncio. +Redistributed under license Apache-2.0 +""" + +from __future__ import annotations + +import asyncio +import atexit +import errno +import functools +import select +import socket +import threading +import typing +from typing import ( + Any, + Callable, + Union, +) + +if typing.TYPE_CHECKING: + from typing_extensions import Protocol + + class _HasFileno(Protocol): + def fileno(self) -> int: + pass + + _FileDescriptorLike = Union[int, _HasFileno] + + +# Collection of selector thread event loops to shut down on exit. +_selector_loops: set[SelectorThread] = set() + + +def _atexit_callback() -> None: + for loop in _selector_loops: + with loop._select_cond: + loop._closing_selector = True + loop._select_cond.notify() + try: + loop._waker_w.send(b"a") + except BlockingIOError: + pass + # If we don't join our (daemon) thread here, we may get a deadlock + # during interpreter shutdown. I don't really understand why. This + # deadlock happens every time in CI (both travis and appveyor) but + # I've never been able to reproduce locally. + assert loop._thread is not None + loop._thread.join() + _selector_loops.clear() + + +atexit.register(_atexit_callback) + + +# SelectorThread from tornado 6.4.0 + + +class SelectorThread: + """Define ``add_reader`` methods to be called in a background select thread. + + Instances of this class start a second thread to run a selector. + This thread is completely hidden from the user; + all callbacks are run on the wrapped event loop's thread. + + Typically used via ``AddThreadSelectorEventLoop``, + but can be attached to a running asyncio loop. + """ + + _closed = False + + def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: + self._real_loop = real_loop + + self._select_cond = threading.Condition() + self._select_args: ( + tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None + ) = None + self._closing_selector = False + self._thread: threading.Thread | None = None + self._thread_manager_handle = self._thread_manager() + + async def thread_manager_anext() -> None: + # the anext builtin wasn't added until 3.10. We just need to iterate + # this generator one step. + await self._thread_manager_handle.__anext__() + + # When the loop starts, start the thread. Not too soon because we can't + # clean up if we get to this point but the event loop is closed without + # starting. + self._real_loop.call_soon( + lambda: self._real_loop.create_task(thread_manager_anext()) + ) + + self._readers: dict[_FileDescriptorLike, Callable] = {} + self._writers: dict[_FileDescriptorLike, Callable] = {} + + # Writing to _waker_w will wake up the selector thread, which + # watches for _waker_r to be readable. + self._waker_r, self._waker_w = socket.socketpair() + self._waker_r.setblocking(False) + self._waker_w.setblocking(False) + _selector_loops.add(self) + self.add_reader(self._waker_r, self._consume_waker) + + def close(self) -> None: + if self._closed: + return + with self._select_cond: + self._closing_selector = True + self._select_cond.notify() + self._wake_selector() + if self._thread is not None: + self._thread.join() + _selector_loops.discard(self) + self.remove_reader(self._waker_r) + self._waker_r.close() + self._waker_w.close() + self._closed = True + + async def _thread_manager(self) -> typing.AsyncGenerator[None, None]: + # Create a thread to run the select system call. We manage this thread + # manually so we can trigger a clean shutdown from an atexit hook. Note + # that due to the order of operations at shutdown, only daemon threads + # can be shut down in this way (non-daemon threads would require the + # introduction of a new hook: https://bugs.python.org/issue41962) + self._thread = threading.Thread( + name="Tornado selector", + daemon=True, + target=self._run_select, + ) + self._thread.start() + self._start_select() + try: + # The presense of this yield statement means that this coroutine + # is actually an asynchronous generator, which has a special + # shutdown protocol. We wait at this yield point until the + # event loop's shutdown_asyncgens method is called, at which point + # we will get a GeneratorExit exception and can shut down the + # selector thread. + yield + except GeneratorExit: + self.close() + raise + + def _wake_selector(self) -> None: + if self._closed: + return + try: + self._waker_w.send(b"a") + except BlockingIOError: + pass + + def _consume_waker(self) -> None: + try: + self._waker_r.recv(1024) + except BlockingIOError: + pass + + def _start_select(self) -> None: + # Capture reader and writer sets here in the event loop + # thread to avoid any problems with concurrent + # modification while the select loop uses them. + with self._select_cond: + assert self._select_args is None + self._select_args = (list(self._readers.keys()), list(self._writers.keys())) + self._select_cond.notify() + + def _run_select(self) -> None: + while True: + with self._select_cond: + while self._select_args is None and not self._closing_selector: + self._select_cond.wait() + if self._closing_selector: + return + assert self._select_args is not None + to_read, to_write = self._select_args + self._select_args = None + + # We use the simpler interface of the select module instead of + # the more stateful interface in the selectors module because + # this class is only intended for use on windows, where + # select.select is the only option. The selector interface + # does not have well-documented thread-safety semantics that + # we can rely on so ensuring proper synchronization would be + # tricky. + try: + # On windows, selecting on a socket for write will not + # return the socket when there is an error (but selecting + # for reads works). Also select for errors when selecting + # for writes, and merge the results. + # + # This pattern is also used in + # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317 + rs, ws, xs = select.select(to_read, to_write, to_write) + ws = ws + xs + except OSError as e: + # After remove_reader or remove_writer is called, the file + # descriptor may subsequently be closed on the event loop + # thread. It's possible that this select thread hasn't + # gotten into the select system call by the time that + # happens in which case (at least on macOS), select may + # raise a "bad file descriptor" error. If we get that + # error, check and see if we're also being woken up by + # polling the waker alone. If we are, just return to the + # event loop and we'll get the updated set of file + # descriptors on the next iteration. Otherwise, raise the + # original error. + if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF): + rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0) + if rs: + ws = [] + else: + raise + else: + raise + + try: + self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws) + except RuntimeError: + # "Event loop is closed". Swallow the exception for + # consistency with PollIOLoop (and logical consistency + # with the fact that we can't guarantee that an + # add_callback that completes without error will + # eventually execute). + pass + except AttributeError: + # ProactorEventLoop may raise this instead of RuntimeError + # if call_soon_threadsafe races with a call to close(). + # Swallow it too for consistency. + pass + + def _handle_select( + self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike] + ) -> None: + for r in rs: + self._handle_event(r, self._readers) + for w in ws: + self._handle_event(w, self._writers) + self._start_select() + + def _handle_event( + self, + fd: _FileDescriptorLike, + cb_map: dict[_FileDescriptorLike, Callable], + ) -> None: + try: + callback = cb_map[fd] + except KeyError: + return + callback() + + def add_reader( + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + self._readers[fd] = functools.partial(callback, *args) + self._wake_selector() + + def add_writer( + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + self._writers[fd] = functools.partial(callback, *args) + self._wake_selector() + + def remove_reader(self, fd: _FileDescriptorLike) -> bool: + try: + del self._readers[fd] + except KeyError: + return False + self._wake_selector() + return True + + def remove_writer(self, fd: _FileDescriptorLike) -> bool: + try: + del self._writers[fd] + except KeyError: + return False + self._wake_selector() + return True + + +# AddThreadSelectorEventLoop: unmodified from tornado 6.4.0 +class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): + """Wrap an event loop to add implementations of the ``add_reader`` method family. + + Instances of this class start a second thread to run a selector. + This thread is completely hidden from the user; all callbacks are + run on the wrapped event loop's thread. + + This class is used automatically by Tornado; applications should not need + to refer to it directly. + + It is safe to wrap any event loop with this class, although it only makes sense + for event loops that do not implement the ``add_reader`` family of methods + themselves (i.e. ``WindowsProactorEventLoop``) + + Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop. + """ + + # This class is a __getattribute__-based proxy. All attributes other than those + # in this set are proxied through to the underlying loop. + MY_ATTRIBUTES = { + "_real_loop", + "_selector", + "add_reader", + "add_writer", + "close", + "remove_reader", + "remove_writer", + } + + def __getattribute__(self, name: str) -> Any: + if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES: + return super().__getattribute__(name) + return getattr(self._real_loop, name) + + def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: + self._real_loop = real_loop + self._selector = SelectorThread(real_loop) + + def close(self) -> None: + self._selector.close() + self._real_loop.close() + + def add_reader( # type: ignore[override] + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + return self._selector.add_reader(fd, callback, *args) + + def add_writer( # type: ignore[override] + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + return self._selector.add_writer(fd, callback, *args) + + def remove_reader(self, fd: _FileDescriptorLike) -> bool: + return self._selector.remove_reader(fd) + + def remove_writer(self, fd: _FileDescriptorLike) -> bool: + return self._selector.remove_writer(fd) diff --git a/src/anyio/_core/_sockets.py b/src/anyio/_core/_sockets.py index fbfe2585..fa32ff91 100644 --- a/src/anyio/_core/_sockets.py +++ b/src/anyio/_core/_sockets.py @@ -595,9 +595,6 @@ def wait_socket_readable(sock: socket.socket) -> Awaitable[None]: """ Wait until the given socket has data to be read. - This does **NOT** work on Windows when using the asyncio backend with a proactor - event loop (default on py3.8+). - .. warning:: Only use this on raw sockets that have not been wrapped by any higher level constructs like socket streams! diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 0920f6ef..1be01e7b 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -46,6 +46,7 @@ getnameinfo, move_on_after, wait_all_tasks_blocked, + wait_socket_readable, ) from anyio.abc import ( IPSockAddrType, @@ -1849,3 +1850,25 @@ async def test_connect_tcp_getaddrinfo_context() -> None: pass assert exc_info.value.__context__ is None + + +async def test_wait_socket_readable() -> None: + def client(port: int) -> None: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect(("127.0.0.1", port)) + sock.sendall(b"Hello, world") + + with move_on_after(0.1): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.listen() + thread = Thread(target=client, args=(port,), daemon=True) + thread.start() + conn, addr = sock.accept() + with conn: + await wait_socket_readable(conn) + socket_readable = True + + assert socket_readable + thread.join() From d67a150add6330836927839e1fbfccdfca62ee6d Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 11 Nov 2024 14:53:20 +0100 Subject: [PATCH 02/33] Removed AddThreadSelectorEventLoop --- src/anyio/_backends/_asyncio.py | 50 ++++------------- src/anyio/_backends/_selector_thread.py | 72 ++++++++----------------- 2 files changed, 32 insertions(+), 90 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 19d10031..a7f74412 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -98,44 +98,7 @@ from ..abc._eventloop import StrOrBytesPath from ..lowlevel import RunVar from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream - -# registry of asyncio loop : selector thread -_selectors: WeakKeyDictionary = WeakKeyDictionary() - - -def _get_selector_windows( - asyncio_loop: AbstractEventLoop, -) -> AbstractEventLoop: - """Get selector-compatible loop. - - Sets ``add_reader`` family of methods on the asyncio loop. - - Workaround Windows proactor removal of *reader methods. - """ - - if asyncio_loop in _selectors: - return _selectors[asyncio_loop] - - from ._selector_thread import AddThreadSelectorEventLoop - - selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop( # type: ignore[abstract] - asyncio_loop - ) - - # patch loop.close to also close the selector thread - loop_close = asyncio_loop.close - - def _close_selector_and_loop() -> None: - # restore original before calling selector.close, - # which in turn calls eventloop.close! - asyncio_loop.close = loop_close # type: ignore[method-assign] - _selectors.pop(asyncio_loop, None) - selector_loop.close() - - asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign] - - return selector_loop - +from ._selector_thread import _get_selector_windows if sys.version_info >= (3, 10): from typing import ParamSpec @@ -2726,14 +2689,19 @@ async def wait_socket_readable(cls, sock: socket.socket) -> None: and asyncio.get_event_loop_policy().__class__.__name__ == "WindowsProactorEventLoopPolicy" ): - loop = _get_selector_windows(loop) + selector = _get_selector_windows(loop) + add_reader = selector.add_reader + remove_reader = selector.remove_reader + else: + add_reader = loop.add_reader + remove_reader = loop.remove_reader event = read_events[sock] = asyncio.Event() - loop.add_reader(sock, event.set) + add_reader(sock, event.set) try: await event.wait() finally: if read_events.pop(sock, None) is not None: - loop.remove_reader(sock) + remove_reader(sock) readable = True else: readable = False diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py index 0d814306..7bc2c7dd 100644 --- a/src/anyio/_backends/_selector_thread.py +++ b/src/anyio/_backends/_selector_thread.py @@ -19,6 +19,7 @@ Callable, Union, ) +from weakref import WeakKeyDictionary if typing.TYPE_CHECKING: from typing_extensions import Protocol @@ -30,6 +31,9 @@ def fileno(self) -> int: _FileDescriptorLike = Union[int, _HasFileno] +# registry of asyncio loop : selector thread +_selectors: WeakKeyDictionary = WeakKeyDictionary() + # Collection of selector thread event loops to shut down on exit. _selector_loops: set[SelectorThread] = set() @@ -281,61 +285,31 @@ def remove_writer(self, fd: _FileDescriptorLike) -> bool: return True -# AddThreadSelectorEventLoop: unmodified from tornado 6.4.0 -class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): - """Wrap an event loop to add implementations of the ``add_reader`` method family. - - Instances of this class start a second thread to run a selector. - This thread is completely hidden from the user; all callbacks are - run on the wrapped event loop's thread. - - This class is used automatically by Tornado; applications should not need - to refer to it directly. +def _get_selector_windows( + asyncio_loop: asyncio.AbstractEventLoop, +) -> SelectorThread: + """Get selector-compatible loop. - It is safe to wrap any event loop with this class, although it only makes sense - for event loops that do not implement the ``add_reader`` family of methods - themselves (i.e. ``WindowsProactorEventLoop``) + Sets ``add_reader`` family of methods on the asyncio loop. - Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop. + Workaround Windows proactor removal of *reader methods. """ - # This class is a __getattribute__-based proxy. All attributes other than those - # in this set are proxied through to the underlying loop. - MY_ATTRIBUTES = { - "_real_loop", - "_selector", - "add_reader", - "add_writer", - "close", - "remove_reader", - "remove_writer", - } - - def __getattribute__(self, name: str) -> Any: - if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES: - return super().__getattribute__(name) - return getattr(self._real_loop, name) - - def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: - self._real_loop = real_loop - self._selector = SelectorThread(real_loop) + if asyncio_loop in _selectors: + return _selectors[asyncio_loop] - def close(self) -> None: - self._selector.close() - self._real_loop.close() + selector_thread = _selectors[asyncio_loop] = SelectorThread(asyncio_loop) - def add_reader( # type: ignore[override] - self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any - ) -> None: - return self._selector.add_reader(fd, callback, *args) + # patch loop.close to also close the selector thread + loop_close = asyncio_loop.close - def add_writer( # type: ignore[override] - self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any - ) -> None: - return self._selector.add_writer(fd, callback, *args) + def _close_selector_and_loop() -> None: + # restore original before calling selector.close, + # which in turn calls eventloop.close! + asyncio_loop.close = loop_close # type: ignore[method-assign] + _selectors.pop(asyncio_loop, None) + selector_thread.close() - def remove_reader(self, fd: _FileDescriptorLike) -> bool: - return self._selector.remove_reader(fd) + asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign] - def remove_writer(self, fd: _FileDescriptorLike) -> bool: - return self._selector.remove_writer(fd) + return selector_thread From c1dd75905896d8042178b4774ec47606249d2710 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 11 Nov 2024 18:03:00 +0100 Subject: [PATCH 03/33] Skip test on Windows/Trio --- tests/test_sockets.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 1be01e7b..971b1f2a 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -1852,7 +1852,10 @@ async def test_connect_tcp_getaddrinfo_context() -> None: assert exc_info.value.__context__ is None -async def test_wait_socket_readable() -> None: +async def test_wait_socket_readable(anyio_backend_name: str) -> None: + if anyio_backend_name == "trio" and platform.system() == "Windows": + pytest.skip("Internal error in Trio") + def client(port: int) -> None: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect(("127.0.0.1", port)) From 76d23fa041501d863d0e8d413e52bbe4110dce91 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 12 Nov 2024 09:37:51 +0100 Subject: [PATCH 04/33] Add back loop close and test on Windows/Trio --- src/anyio/_backends/_selector_thread.py | 1 + tests/test_sockets.py | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py index 7bc2c7dd..10635fe1 100644 --- a/src/anyio/_backends/_selector_thread.py +++ b/src/anyio/_backends/_selector_thread.py @@ -309,6 +309,7 @@ def _close_selector_and_loop() -> None: asyncio_loop.close = loop_close # type: ignore[method-assign] _selectors.pop(asyncio_loop, None) selector_thread.close() + asyncio_loop.close() asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign] diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 971b1f2a..0e0f794e 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -1853,9 +1853,6 @@ async def test_connect_tcp_getaddrinfo_context() -> None: async def test_wait_socket_readable(anyio_backend_name: str) -> None: - if anyio_backend_name == "trio" and platform.system() == "Windows": - pytest.skip("Internal error in Trio") - def client(port: int) -> None: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect(("127.0.0.1", port)) From 8b18582bfc526207b84468927287c480921fdd8f Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 12 Nov 2024 09:55:57 +0100 Subject: [PATCH 05/33] Review --- src/anyio/_backends/_asyncio.py | 34 +++++++++++++------- src/anyio/_backends/_selector_thread.py | 42 +++++-------------------- tests/test_sockets.py | 21 ++++++------- 3 files changed, 39 insertions(+), 58 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index a7f74412..0a6d5d6e 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -98,7 +98,6 @@ from ..abc._eventloop import StrOrBytesPath from ..lowlevel import RunVar from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream -from ._selector_thread import _get_selector_windows if sys.version_info >= (3, 10): from typing import ParamSpec @@ -2684,19 +2683,19 @@ async def wait_socket_readable(cls, sock: socket.socket) -> None: raise BusyResourceError("reading from") from None loop = get_running_loop() - if ( - sys.platform == "win32" - and asyncio.get_event_loop_policy().__class__.__name__ - == "WindowsProactorEventLoopPolicy" - ): + add_reader = loop.add_reader + event = read_events[sock] = asyncio.Event() + try: + add_reader(sock, event.set) + except NotImplementedError: + # Proactor on Windows does not yet implement add/remove reader + from ._selector_thread import _get_selector_windows + selector = _get_selector_windows(loop) - add_reader = selector.add_reader + selector.add_reader(sock, event.set) remove_reader = selector.remove_reader else: - add_reader = loop.add_reader remove_reader = loop.remove_reader - event = read_events[sock] = asyncio.Event() - add_reader(sock, event.set) try: await event.wait() finally: @@ -2722,13 +2721,24 @@ async def wait_socket_writable(cls, sock: socket.socket) -> None: raise BusyResourceError("writing to") from None loop = get_running_loop() + add_writer = loop.add_writer event = write_events[sock] = asyncio.Event() - loop.add_writer(sock.fileno(), event.set) + try: + add_writer(sock.fileno(), event.set) + except NotImplementedError: + # Proactor on Windows does not yet implement add/remove writer + from ._selector_thread import _get_selector_windows + + selector = _get_selector_windows(loop) + selector.add_writer(sock, event.set) + remove_writer = selector.remove_writer + else: + remove_writer = loop.remove_writer try: await event.wait() finally: if write_events.pop(sock, None) is not None: - loop.remove_writer(sock) + remove_writer(sock) writable = True else: writable = False diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py index 10635fe1..cfa39ca4 100644 --- a/src/anyio/_backends/_selector_thread.py +++ b/src/anyio/_backends/_selector_thread.py @@ -7,7 +7,6 @@ from __future__ import annotations import asyncio -import atexit import errno import functools import select @@ -21,6 +20,8 @@ ) from weakref import WeakKeyDictionary +from ._asyncio import find_root_task + if typing.TYPE_CHECKING: from typing_extensions import Protocol @@ -38,7 +39,7 @@ def fileno(self) -> int: _selector_loops: set[SelectorThread] = set() -def _atexit_callback() -> None: +def _at_loop_close_callback(future: asyncio.Future) -> None: for loop in _selector_loops: with loop._select_cond: loop._closing_selector = True @@ -56,12 +57,7 @@ def _atexit_callback() -> None: _selector_loops.clear() -atexit.register(_atexit_callback) - - # SelectorThread from tornado 6.4.0 - - class SelectorThread: """Define ``add_reader`` methods to be called in a background select thread. @@ -84,19 +80,6 @@ def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: ) = None self._closing_selector = False self._thread: threading.Thread | None = None - self._thread_manager_handle = self._thread_manager() - - async def thread_manager_anext() -> None: - # the anext builtin wasn't added until 3.10. We just need to iterate - # this generator one step. - await self._thread_manager_handle.__anext__() - - # When the loop starts, start the thread. Not too soon because we can't - # clean up if we get to this point but the event loop is closed without - # starting. - self._real_loop.call_soon( - lambda: self._real_loop.create_task(thread_manager_anext()) - ) self._readers: dict[_FileDescriptorLike, Callable] = {} self._writers: dict[_FileDescriptorLike, Callable] = {} @@ -108,6 +91,7 @@ async def thread_manager_anext() -> None: self._waker_w.setblocking(False) _selector_loops.add(self) self.add_reader(self._waker_r, self._consume_waker) + self._thread_manager() def close(self) -> None: if self._closed: @@ -124,30 +108,19 @@ def close(self) -> None: self._waker_w.close() self._closed = True - async def _thread_manager(self) -> typing.AsyncGenerator[None, None]: + def _thread_manager(self) -> None: # Create a thread to run the select system call. We manage this thread - # manually so we can trigger a clean shutdown from an atexit hook. Note + # manually so we can trigger a clean shutdown at loop teardown. Note # that due to the order of operations at shutdown, only daemon threads # can be shut down in this way (non-daemon threads would require the # introduction of a new hook: https://bugs.python.org/issue41962) self._thread = threading.Thread( - name="Tornado selector", + name="AnyIO selector", daemon=True, target=self._run_select, ) self._thread.start() self._start_select() - try: - # The presense of this yield statement means that this coroutine - # is actually an asynchronous generator, which has a special - # shutdown protocol. We wait at this yield point until the - # event loop's shutdown_asyncgens method is called, at which point - # we will get a GeneratorExit exception and can shut down the - # selector thread. - yield - except GeneratorExit: - self.close() - raise def _wake_selector(self) -> None: if self._closed: @@ -298,6 +271,7 @@ def _get_selector_windows( if asyncio_loop in _selectors: return _selectors[asyncio_loop] + find_root_task().add_done_callback(_at_loop_close_callback) selector_thread = _selectors[asyncio_loop] = SelectorThread(asyncio_loop) # patch loop.close to also close the selector thread diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 0e0f794e..cbe6200a 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -1858,17 +1858,14 @@ def client(port: int) -> None: sock.connect(("127.0.0.1", port)) sock.sendall(b"Hello, world") - with move_on_after(0.1): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.bind(("127.0.0.1", 0)) - port = sock.getsockname()[1] - sock.listen() - thread = Thread(target=client, args=(port,), daemon=True) - thread.start() - conn, addr = sock.accept() + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.listen() + thread = Thread(target=client, args=(port,)) + thread.start() + thread.join() + conn, addr = sock.accept() + with fail_after(5): with conn: await wait_socket_readable(conn) - socket_readable = True - - assert socket_readable - thread.join() From 1051fcd91e5de728dfebd2007f7a6ecda06aaa7d Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 17 Nov 2024 10:02:57 +0100 Subject: [PATCH 06/33] Fix --- src/anyio/_backends/_asyncio.py | 20 ++++++++++---------- tests/test_sockets.py | 7 ------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 2a0aa33c..e2715ce7 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -2690,15 +2690,15 @@ async def wait_readable(cls, obj: HasFileno | int) -> None: raise BusyResourceError("reading from") from None loop = get_running_loop() - event = read_events[sock] = asyncio.Event() + event = read_events[obj] = asyncio.Event() try: - loop.add_reader(sock, event.set) + loop.add_reader(obj, event.set) except NotImplementedError: # Proactor on Windows does not yet implement add/remove reader from ._selector_thread import _get_selector_windows selector = _get_selector_windows(loop) - selector.add_reader(sock, event.set) + selector.add_reader(obj, event.set) remove_reader = selector.remove_reader else: remove_reader = loop.remove_reader @@ -2706,8 +2706,8 @@ async def wait_readable(cls, obj: HasFileno | int) -> None: try: await event.wait() finally: - if read_events.pop(sock, None) is not None: - remove_reader(sock) + if read_events.pop(obj, None) is not None: + remove_reader(obj) readable = True else: readable = False @@ -2731,15 +2731,15 @@ async def wait_writable(cls, obj: HasFileno | int) -> None: raise BusyResourceError("writing to") from None loop = get_running_loop() - event = write_events[sock] = asyncio.Event() + event = write_events[obj] = asyncio.Event() try: - loop.add_writer(sock.fileno(), event.set) + loop.add_writer(obj, event.set) except NotImplementedError: # Proactor on Windows does not yet implement add/remove writer from ._selector_thread import _get_selector_windows selector = _get_selector_windows(loop) - selector.add_writer(sock, event.set) + selector.add_writer(obj, event.set) remove_writer = selector.remove_writer else: remove_writer = loop.remove_writer @@ -2747,8 +2747,8 @@ async def wait_writable(cls, obj: HasFileno | int) -> None: try: await event.wait() finally: - if write_events.pop(sock, None) is not None: - remove_writer(sock) + if write_events.pop(obj, None) is not None: + remove_writer(obj) writable = True else: writable = False diff --git a/tests/test_sockets.py b/tests/test_sockets.py index a1189bb8..8fbd886a 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -1861,13 +1861,6 @@ async def test_connect_tcp_getaddrinfo_context() -> None: async def test_wait_socket( anyio_backend_name: str, event: str, socket_type: str ) -> None: - if anyio_backend_name == "asyncio" and platform.system() == "Windows": - import asyncio - - policy = asyncio.get_event_loop_policy() - if policy.__class__.__name__ == "WindowsProactorEventLoopPolicy": - pytest.skip("Does not work on asyncio/Windows/ProactorEventLoop") - wait = wait_readable if event == "readable" else wait_writable with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock: From 01ccdf27adcced6c57b520ff906c211f8bd5fade Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 17 Nov 2024 10:05:32 +0100 Subject: [PATCH 07/33] Removed unneeded anyio_backend_name --- tests/test_sockets.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 8fbd886a..b6108850 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -1858,9 +1858,7 @@ async def test_connect_tcp_getaddrinfo_context() -> None: @pytest.mark.parametrize("socket_type", ["socket", "fd"]) @pytest.mark.parametrize("event", ["readable", "writable"]) -async def test_wait_socket( - anyio_backend_name: str, event: str, socket_type: str -) -> None: +async def test_wait_socket(event: str, socket_type: str) -> None: wait = wait_readable if event == "readable" else wait_writable with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock: From c3816172b5752b73a470d234fea0f78314f88189 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 17 Nov 2024 10:08:29 +0100 Subject: [PATCH 08/33] Added closing parenthesis --- docs/versionhistory.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index f78cbc60..ddac0153 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -8,7 +8,7 @@ This library adheres to `Semantic Versioning 2.0 `_. - Added the ``wait_readable()`` and ``wait_writable()`` functions which will accept an object with a ``.fileno()`` method or an integer handle, and deprecated their now obsolete versions (``wait_socket_readable()`` and - ``wait_socket_writable()`` (PR by @davidbrochart) + ``wait_socket_writable()``) (PR by @davidbrochart) - Fixed a misleading ``ValueError`` in the context of DNS failures (`#815 `_; PR by @graingert) - Ported ``ThreadSelectorEventLoop`` from Tornado to allow From e21f3233ac59a69bfb0b8fcdeec1e114d2f68508 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 17 Nov 2024 10:11:26 +0100 Subject: [PATCH 09/33] Updated changelog --- docs/versionhistory.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index ddac0153..abcc7977 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -12,7 +12,8 @@ This library adheres to `Semantic Versioning 2.0 `_. - Fixed a misleading ``ValueError`` in the context of DNS failures (`#815 `_; PR by @graingert) - Ported ``ThreadSelectorEventLoop`` from Tornado to allow - ``anyio.wait_socket_readable(sock)`` to work on Windows with a ``ProactorEventLoop``. + ``anyio.wait_readable()`` and ``anyio.wait_writable()`` to work on Windows with a + ``ProactorEventLoop``. **4.6.2** From a379ccf5f491cb35a5c84b50fccfb27d88f8eefa Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 18 Nov 2024 10:55:46 +0100 Subject: [PATCH 10/33] Use HasFileno from typeshed --- src/anyio/_backends/_selector_thread.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py index cfa39ca4..1aff007e 100644 --- a/src/anyio/_backends/_selector_thread.py +++ b/src/anyio/_backends/_selector_thread.py @@ -14,6 +14,7 @@ import threading import typing from typing import ( + TYPE_CHECKING, Any, Callable, Union, @@ -22,14 +23,10 @@ from ._asyncio import find_root_task -if typing.TYPE_CHECKING: - from typing_extensions import Protocol +if TYPE_CHECKING: + from _typeshed import HasFileno - class _HasFileno(Protocol): - def fileno(self) -> int: - pass - - _FileDescriptorLike = Union[int, _HasFileno] + _FileDescriptorLike = HasFileno | int # registry of asyncio loop : selector thread From 69800620adb78da8daccfee08a41949c67a4311a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:01:36 +0000 Subject: [PATCH 11/33] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/anyio/_backends/_selector_thread.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py index 1aff007e..1d1bbafc 100644 --- a/src/anyio/_backends/_selector_thread.py +++ b/src/anyio/_backends/_selector_thread.py @@ -12,12 +12,10 @@ import select import socket import threading -import typing from typing import ( TYPE_CHECKING, Any, Callable, - Union, ) from weakref import WeakKeyDictionary From 3ea42bf6888797ebc4638a8421fe25cdcfc8c971 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 25 Nov 2024 11:40:53 +0100 Subject: [PATCH 12/33] Close selector thread when root task is done --- src/anyio/_backends/_selector_thread.py | 76 ++++++++----------------- 1 file changed, 24 insertions(+), 52 deletions(-) diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py index 1d1bbafc..943c8755 100644 --- a/src/anyio/_backends/_selector_thread.py +++ b/src/anyio/_backends/_selector_thread.py @@ -6,12 +6,12 @@ from __future__ import annotations -import asyncio import errno -import functools -import select -import socket -import threading +from asyncio import AbstractEventLoop, Future +from functools import partial +from select import select +from socket import socketpair +from threading import Condition, Thread from typing import ( TYPE_CHECKING, Any, @@ -27,29 +27,16 @@ _FileDescriptorLike = HasFileno | int -# registry of asyncio loop : selector thread +# Registry of asyncio loop : selector thread _selectors: WeakKeyDictionary = WeakKeyDictionary() -# Collection of selector thread event loops to shut down on exit. -_selector_loops: set[SelectorThread] = set() +# Collection of selector threads to shut down on exit +_selector_threads: set[SelectorThread] = set() -def _at_loop_close_callback(future: asyncio.Future) -> None: - for loop in _selector_loops: - with loop._select_cond: - loop._closing_selector = True - loop._select_cond.notify() - try: - loop._waker_w.send(b"a") - except BlockingIOError: - pass - # If we don't join our (daemon) thread here, we may get a deadlock - # during interpreter shutdown. I don't really understand why. This - # deadlock happens every time in CI (both travis and appveyor) but - # I've never been able to reproduce locally. - assert loop._thread is not None - loop._thread.join() - _selector_loops.clear() +def _loop_close_callback(asyncio_loop: AbstractEventLoop, future: Future) -> None: + selector_thread = _selectors.pop(asyncio_loop) + selector_thread.close() # SelectorThread from tornado 6.4.0 @@ -66,25 +53,25 @@ class SelectorThread: _closed = False - def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: + def __init__(self, real_loop: AbstractEventLoop) -> None: self._real_loop = real_loop - self._select_cond = threading.Condition() + self._select_cond = Condition() self._select_args: ( tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None ) = None self._closing_selector = False - self._thread: threading.Thread | None = None + self._thread: Thread | None = None self._readers: dict[_FileDescriptorLike, Callable] = {} self._writers: dict[_FileDescriptorLike, Callable] = {} # Writing to _waker_w will wake up the selector thread, which # watches for _waker_r to be readable. - self._waker_r, self._waker_w = socket.socketpair() + self._waker_r, self._waker_w = socketpair() self._waker_r.setblocking(False) self._waker_w.setblocking(False) - _selector_loops.add(self) + _selector_threads.add(self) self.add_reader(self._waker_r, self._consume_waker) self._thread_manager() @@ -97,7 +84,7 @@ def close(self) -> None: self._wake_selector() if self._thread is not None: self._thread.join() - _selector_loops.discard(self) + _selector_threads.discard(self) self.remove_reader(self._waker_r) self._waker_r.close() self._waker_w.close() @@ -109,7 +96,7 @@ def _thread_manager(self) -> None: # that due to the order of operations at shutdown, only daemon threads # can be shut down in this way (non-daemon threads would require the # introduction of a new hook: https://bugs.python.org/issue41962) - self._thread = threading.Thread( + self._thread = Thread( name="AnyIO selector", daemon=True, target=self._run_select, @@ -166,7 +153,7 @@ def _run_select(self) -> None: # # This pattern is also used in # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317 - rs, ws, xs = select.select(to_read, to_write, to_write) + rs, ws, xs = select(to_read, to_write, to_write) ws = ws + xs except OSError as e: # After remove_reader or remove_writer is called, the file @@ -181,7 +168,7 @@ def _run_select(self) -> None: # descriptors on the next iteration. Otherwise, raise the # original error. if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF): - rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0) + rs, _, _ = select([self._waker_r.fileno()], [], [], 0) if rs: ws = [] else: @@ -227,13 +214,13 @@ def _handle_event( def add_reader( self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any ) -> None: - self._readers[fd] = functools.partial(callback, *args) + self._readers[fd] = partial(callback, *args) self._wake_selector() def add_writer( self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any ) -> None: - self._writers[fd] = functools.partial(callback, *args) + self._writers[fd] = partial(callback, *args) self._wake_selector() def remove_reader(self, fd: _FileDescriptorLike) -> bool: @@ -254,7 +241,7 @@ def remove_writer(self, fd: _FileDescriptorLike) -> bool: def _get_selector_windows( - asyncio_loop: asyncio.AbstractEventLoop, + asyncio_loop: AbstractEventLoop, ) -> SelectorThread: """Get selector-compatible loop. @@ -262,24 +249,9 @@ def _get_selector_windows( Workaround Windows proactor removal of *reader methods. """ - if asyncio_loop in _selectors: return _selectors[asyncio_loop] - find_root_task().add_done_callback(_at_loop_close_callback) + find_root_task().add_done_callback(partial(_loop_close_callback, asyncio_loop)) selector_thread = _selectors[asyncio_loop] = SelectorThread(asyncio_loop) - - # patch loop.close to also close the selector thread - loop_close = asyncio_loop.close - - def _close_selector_and_loop() -> None: - # restore original before calling selector.close, - # which in turn calls eventloop.close! - asyncio_loop.close = loop_close # type: ignore[method-assign] - _selectors.pop(asyncio_loop, None) - selector_thread.close() - asyncio_loop.close() - - asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign] - return selector_thread From f68df735642e26024b782869afe6d8485e1c4fca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 13:54:04 +0200 Subject: [PATCH 13/33] Alternate implementation of the selector thread --- src/anyio/_backends/_asyncio.py | 36 ++- src/anyio/_backends/_selector_thread.py | 315 -------------------- src/anyio/_core/_asyncio_selector_thread.py | 116 +++++++ 3 files changed, 137 insertions(+), 330 deletions(-) delete mode 100644 src/anyio/_backends/_selector_thread.py create mode 100644 src/anyio/_core/_asyncio_selector_thread.py diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index a7f74412..2934f885 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -98,7 +98,6 @@ from ..abc._eventloop import StrOrBytesPath from ..lowlevel import RunVar from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream -from ._selector_thread import _get_selector_windows if sys.version_info >= (3, 10): from typing import ParamSpec @@ -2684,19 +2683,17 @@ async def wait_socket_readable(cls, sock: socket.socket) -> None: raise BusyResourceError("reading from") from None loop = get_running_loop() - if ( - sys.platform == "win32" - and asyncio.get_event_loop_policy().__class__.__name__ - == "WindowsProactorEventLoopPolicy" - ): - selector = _get_selector_windows(loop) - add_reader = selector.add_reader - remove_reader = selector.remove_reader - else: - add_reader = loop.add_reader - remove_reader = loop.remove_reader event = read_events[sock] = asyncio.Event() - add_reader(sock, event.set) + try: + loop.add_reader(sock, event.set) + remove_reader = loop.remove_reader + except NotImplementedError: + from anyio._core._asyncio_selector_thread import get_selector + + selector = get_selector(loop) + selector.add_reader(sock, event.set) + remove_reader = selector.remove_reader + try: await event.wait() finally: @@ -2723,12 +2720,21 @@ async def wait_socket_writable(cls, sock: socket.socket) -> None: loop = get_running_loop() event = write_events[sock] = asyncio.Event() - loop.add_writer(sock.fileno(), event.set) + try: + loop.add_writer(sock, event.set) + remove_writer = loop.remove_writer + except NotImplementedError: + from anyio._core._asyncio_selector_thread import get_selector + + selector = get_selector(loop) + selector.add_writer(sock, event.set) + remove_writer = selector.remove_writer + try: await event.wait() finally: if write_events.pop(sock, None) is not None: - loop.remove_writer(sock) + remove_writer(sock) writable = True else: writable = False diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py deleted file mode 100644 index 7bc2c7dd..00000000 --- a/src/anyio/_backends/_selector_thread.py +++ /dev/null @@ -1,315 +0,0 @@ -"""Ensure asyncio selector methods (add_reader, etc.) are available. -Running select in a thread and defining these methods on the running event loop. -Originally in tornado.platform.asyncio. -Redistributed under license Apache-2.0 -""" - -from __future__ import annotations - -import asyncio -import atexit -import errno -import functools -import select -import socket -import threading -import typing -from typing import ( - Any, - Callable, - Union, -) -from weakref import WeakKeyDictionary - -if typing.TYPE_CHECKING: - from typing_extensions import Protocol - - class _HasFileno(Protocol): - def fileno(self) -> int: - pass - - _FileDescriptorLike = Union[int, _HasFileno] - - -# registry of asyncio loop : selector thread -_selectors: WeakKeyDictionary = WeakKeyDictionary() - -# Collection of selector thread event loops to shut down on exit. -_selector_loops: set[SelectorThread] = set() - - -def _atexit_callback() -> None: - for loop in _selector_loops: - with loop._select_cond: - loop._closing_selector = True - loop._select_cond.notify() - try: - loop._waker_w.send(b"a") - except BlockingIOError: - pass - # If we don't join our (daemon) thread here, we may get a deadlock - # during interpreter shutdown. I don't really understand why. This - # deadlock happens every time in CI (both travis and appveyor) but - # I've never been able to reproduce locally. - assert loop._thread is not None - loop._thread.join() - _selector_loops.clear() - - -atexit.register(_atexit_callback) - - -# SelectorThread from tornado 6.4.0 - - -class SelectorThread: - """Define ``add_reader`` methods to be called in a background select thread. - - Instances of this class start a second thread to run a selector. - This thread is completely hidden from the user; - all callbacks are run on the wrapped event loop's thread. - - Typically used via ``AddThreadSelectorEventLoop``, - but can be attached to a running asyncio loop. - """ - - _closed = False - - def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: - self._real_loop = real_loop - - self._select_cond = threading.Condition() - self._select_args: ( - tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None - ) = None - self._closing_selector = False - self._thread: threading.Thread | None = None - self._thread_manager_handle = self._thread_manager() - - async def thread_manager_anext() -> None: - # the anext builtin wasn't added until 3.10. We just need to iterate - # this generator one step. - await self._thread_manager_handle.__anext__() - - # When the loop starts, start the thread. Not too soon because we can't - # clean up if we get to this point but the event loop is closed without - # starting. - self._real_loop.call_soon( - lambda: self._real_loop.create_task(thread_manager_anext()) - ) - - self._readers: dict[_FileDescriptorLike, Callable] = {} - self._writers: dict[_FileDescriptorLike, Callable] = {} - - # Writing to _waker_w will wake up the selector thread, which - # watches for _waker_r to be readable. - self._waker_r, self._waker_w = socket.socketpair() - self._waker_r.setblocking(False) - self._waker_w.setblocking(False) - _selector_loops.add(self) - self.add_reader(self._waker_r, self._consume_waker) - - def close(self) -> None: - if self._closed: - return - with self._select_cond: - self._closing_selector = True - self._select_cond.notify() - self._wake_selector() - if self._thread is not None: - self._thread.join() - _selector_loops.discard(self) - self.remove_reader(self._waker_r) - self._waker_r.close() - self._waker_w.close() - self._closed = True - - async def _thread_manager(self) -> typing.AsyncGenerator[None, None]: - # Create a thread to run the select system call. We manage this thread - # manually so we can trigger a clean shutdown from an atexit hook. Note - # that due to the order of operations at shutdown, only daemon threads - # can be shut down in this way (non-daemon threads would require the - # introduction of a new hook: https://bugs.python.org/issue41962) - self._thread = threading.Thread( - name="Tornado selector", - daemon=True, - target=self._run_select, - ) - self._thread.start() - self._start_select() - try: - # The presense of this yield statement means that this coroutine - # is actually an asynchronous generator, which has a special - # shutdown protocol. We wait at this yield point until the - # event loop's shutdown_asyncgens method is called, at which point - # we will get a GeneratorExit exception and can shut down the - # selector thread. - yield - except GeneratorExit: - self.close() - raise - - def _wake_selector(self) -> None: - if self._closed: - return - try: - self._waker_w.send(b"a") - except BlockingIOError: - pass - - def _consume_waker(self) -> None: - try: - self._waker_r.recv(1024) - except BlockingIOError: - pass - - def _start_select(self) -> None: - # Capture reader and writer sets here in the event loop - # thread to avoid any problems with concurrent - # modification while the select loop uses them. - with self._select_cond: - assert self._select_args is None - self._select_args = (list(self._readers.keys()), list(self._writers.keys())) - self._select_cond.notify() - - def _run_select(self) -> None: - while True: - with self._select_cond: - while self._select_args is None and not self._closing_selector: - self._select_cond.wait() - if self._closing_selector: - return - assert self._select_args is not None - to_read, to_write = self._select_args - self._select_args = None - - # We use the simpler interface of the select module instead of - # the more stateful interface in the selectors module because - # this class is only intended for use on windows, where - # select.select is the only option. The selector interface - # does not have well-documented thread-safety semantics that - # we can rely on so ensuring proper synchronization would be - # tricky. - try: - # On windows, selecting on a socket for write will not - # return the socket when there is an error (but selecting - # for reads works). Also select for errors when selecting - # for writes, and merge the results. - # - # This pattern is also used in - # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317 - rs, ws, xs = select.select(to_read, to_write, to_write) - ws = ws + xs - except OSError as e: - # After remove_reader or remove_writer is called, the file - # descriptor may subsequently be closed on the event loop - # thread. It's possible that this select thread hasn't - # gotten into the select system call by the time that - # happens in which case (at least on macOS), select may - # raise a "bad file descriptor" error. If we get that - # error, check and see if we're also being woken up by - # polling the waker alone. If we are, just return to the - # event loop and we'll get the updated set of file - # descriptors on the next iteration. Otherwise, raise the - # original error. - if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF): - rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0) - if rs: - ws = [] - else: - raise - else: - raise - - try: - self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws) - except RuntimeError: - # "Event loop is closed". Swallow the exception for - # consistency with PollIOLoop (and logical consistency - # with the fact that we can't guarantee that an - # add_callback that completes without error will - # eventually execute). - pass - except AttributeError: - # ProactorEventLoop may raise this instead of RuntimeError - # if call_soon_threadsafe races with a call to close(). - # Swallow it too for consistency. - pass - - def _handle_select( - self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike] - ) -> None: - for r in rs: - self._handle_event(r, self._readers) - for w in ws: - self._handle_event(w, self._writers) - self._start_select() - - def _handle_event( - self, - fd: _FileDescriptorLike, - cb_map: dict[_FileDescriptorLike, Callable], - ) -> None: - try: - callback = cb_map[fd] - except KeyError: - return - callback() - - def add_reader( - self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any - ) -> None: - self._readers[fd] = functools.partial(callback, *args) - self._wake_selector() - - def add_writer( - self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any - ) -> None: - self._writers[fd] = functools.partial(callback, *args) - self._wake_selector() - - def remove_reader(self, fd: _FileDescriptorLike) -> bool: - try: - del self._readers[fd] - except KeyError: - return False - self._wake_selector() - return True - - def remove_writer(self, fd: _FileDescriptorLike) -> bool: - try: - del self._writers[fd] - except KeyError: - return False - self._wake_selector() - return True - - -def _get_selector_windows( - asyncio_loop: asyncio.AbstractEventLoop, -) -> SelectorThread: - """Get selector-compatible loop. - - Sets ``add_reader`` family of methods on the asyncio loop. - - Workaround Windows proactor removal of *reader methods. - """ - - if asyncio_loop in _selectors: - return _selectors[asyncio_loop] - - selector_thread = _selectors[asyncio_loop] = SelectorThread(asyncio_loop) - - # patch loop.close to also close the selector thread - loop_close = asyncio_loop.close - - def _close_selector_and_loop() -> None: - # restore original before calling selector.close, - # which in turn calls eventloop.close! - asyncio_loop.close = loop_close # type: ignore[method-assign] - _selectors.pop(asyncio_loop, None) - selector_thread.close() - - asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign] - - return selector_thread diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py new file mode 100644 index 00000000..bec20e5a --- /dev/null +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +import asyncio +import socket +import threading +from collections.abc import Callable +from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector +from typing import TYPE_CHECKING, Any +from weakref import WeakKeyDictionary + +if TYPE_CHECKING: + from _typeshed import FileDescriptorLike + +_selectors: WeakKeyDictionary[asyncio.AbstractEventLoop, Selector] = WeakKeyDictionary() + + +class Selector: + def __init__(self, loop: asyncio.AbstractEventLoop): + self._loop = loop + self._thread = threading.Thread(target=self.run) + self._selector = DefaultSelector() + self._send, self._receive = socket.socketpair() + self._notify_key = self._selector.register(self._receive, EVENT_READ) + + def start(self) -> None: + from anyio._backends._asyncio import find_root_task + + find_root_task().add_done_callback(lambda task: self._stop()) + self._thread.start() + + def _stop(self) -> None: + self._send.send(b"\x00") + self._send.close() + self._thread.join() + del _selectors[self._loop] + assert ( + not self._selector.get_map() + ), "selector still has registered file descriptors after shutdown" + + def add_reader(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: + try: + key = self._selector.get_key(fd) + except KeyError: + self._selector.register(fd, EVENT_READ, {EVENT_READ: callback}) + else: + if EVENT_READ in key.data: + raise ValueError( + "this file descriptor is already registered for reading" + ) + + key.data[EVENT_READ] = callback + self._selector.modify(fd, key.events | EVENT_READ, key.data) + + def add_writer(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: + try: + key = self._selector.get_key(fd) + except KeyError: + self._selector.register(fd, EVENT_WRITE, {EVENT_WRITE: callback}) + else: + if EVENT_WRITE in key.data: + raise ValueError( + "this file descriptor is already registered for writing" + ) + + key.data[EVENT_WRITE] = callback + self._selector.modify(fd, key.events | EVENT_WRITE, key.data) + + def remove_reader(self, fd: FileDescriptorLike) -> bool: + try: + key = self._selector.get_key(fd) + except KeyError: + return False + + if new_events := key.events ^ EVENT_READ: + del key.data[EVENT_READ] + self._selector.modify(fd, new_events) + else: + self._selector.unregister(fd) + + return True + + def remove_writer(self, fd: FileDescriptorLike) -> bool: + try: + key = self._selector.get_key(fd) + except KeyError: + return False + + if new_events := key.events ^ EVENT_WRITE: + del key.data[EVENT_WRITE] + self._selector.modify(fd, new_events) + else: + self._selector.unregister(fd) + + return True + + def run(self) -> None: + while True: + for key, events in self._selector.select(): + if key is self._notify_key: + self._receive.close() + return + + if events & EVENT_READ and (callback := key.data.get(EVENT_READ)): + self._loop.call_soon_threadsafe(callback) + + if events & EVENT_WRITE and (callback := key.data.get(EVENT_WRITE)): + self._loop.call_soon_threadsafe(callback) + + +def get_selector(loop: asyncio.AbstractEventLoop) -> Selector: + try: + return _selectors[loop] + except AttributeError: + _selectors[loop] = selector = Selector(asyncio.get_running_loop()) + selector.start() + return selector From 0a94755642a019c3cfccb4b1a213d0d20a37930c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 14:39:15 +0200 Subject: [PATCH 14/33] Fixed AttributeError -> KeyError in get_selector() --- src/anyio/_core/_asyncio_selector_thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index bec20e5a..d56c89c8 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -110,7 +110,7 @@ def run(self) -> None: def get_selector(loop: asyncio.AbstractEventLoop) -> Selector: try: return _selectors[loop] - except AttributeError: + except KeyError: _selectors[loop] = selector = Selector(asyncio.get_running_loop()) selector.start() return selector From 1d685e45bf7d3ce665fc71245ff280677c3f43d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 15:12:35 +0200 Subject: [PATCH 15/33] Fixed AssertionError at exit --- src/anyio/_core/_asyncio_selector_thread.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index d56c89c8..d51a7c01 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -97,6 +97,7 @@ def run(self) -> None: while True: for key, events in self._selector.select(): if key is self._notify_key: + self._selector.unregister(key.fd) self._receive.close() return From 17140f382406d1900c81050402ab3be67b9a26ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 15:31:31 +0200 Subject: [PATCH 16/33] Use FileDescriptorLike also in the test module --- tests/test_sockets.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 3d61a88f..987e3539 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -65,7 +65,7 @@ from exceptiongroup import ExceptionGroup if TYPE_CHECKING: - from _typeshed import HasFileno + from _typeshed import FileDescriptorLike AnyIPAddressFamily = Literal[ AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6 @@ -1871,7 +1871,9 @@ async def test_wait_socket(event: str, socket_type: str) -> None: conn, addr = server_sock.accept() with conn: - sock_or_fd: HasFileno | int = conn.fileno() if socket_type == "fd" else conn + sock_or_fd: FileDescriptorLike = ( + conn.fileno() if socket_type == "fd" else conn + ) with fail_after(10): await wait(sock_or_fd) assert conn.recv(1024) == b"Hello, world" From 3fc63a4ed115ef209af6271e2e899fa00833a243 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 15:49:22 +0200 Subject: [PATCH 17/33] Fixed timeout errors --- src/anyio/_core/_asyncio_selector_thread.py | 18 ++++++++++++++---- tests/test_sockets.py | 2 +- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index d51a7c01..bc08a015 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -21,6 +21,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop): self._selector = DefaultSelector() self._send, self._receive = socket.socketpair() self._notify_key = self._selector.register(self._receive, EVENT_READ) + self._closed = False def start(self) -> None: from anyio._backends._asyncio import find_root_task @@ -29,6 +30,7 @@ def start(self) -> None: self._thread.start() def _stop(self) -> None: + self._closed = True self._send.send(b"\x00") self._send.close() self._thread.join() @@ -51,6 +53,8 @@ def add_reader(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> Non key.data[EVENT_READ] = callback self._selector.modify(fd, key.events | EVENT_READ, key.data) + self._send.send(b"\x00") + def add_writer(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: try: key = self._selector.get_key(fd) @@ -64,6 +68,8 @@ def add_writer(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> Non key.data[EVENT_WRITE] = callback self._selector.modify(fd, key.events | EVENT_WRITE, key.data) + + self._send.send(b"\x00") def remove_reader(self, fd: FileDescriptorLike) -> bool: try: @@ -94,19 +100,23 @@ def remove_writer(self, fd: FileDescriptorLike) -> bool: return True def run(self) -> None: - while True: + while not self._closed: for key, events in self._selector.select(): if key is self._notify_key: - self._selector.unregister(key.fd) - self._receive.close() - return + key.fileobj.recv(10240) + continue if events & EVENT_READ and (callback := key.data.get(EVENT_READ)): + self.remove_reader(key.fd) self._loop.call_soon_threadsafe(callback) if events & EVENT_WRITE and (callback := key.data.get(EVENT_WRITE)): + self.remove_writer(key.fd) self._loop.call_soon_threadsafe(callback) + self._selector.unregister(self._receive) + self._receive.close() + def get_selector(loop: asyncio.AbstractEventLoop) -> Selector: try: diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 987e3539..be7317a6 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -1874,7 +1874,7 @@ async def test_wait_socket(event: str, socket_type: str) -> None: sock_or_fd: FileDescriptorLike = ( conn.fileno() if socket_type == "fd" else conn ) - with fail_after(10): + with fail_after(3): await wait(sock_or_fd) assert conn.recv(1024) == b"Hello, world" From 84029d2cc4652771de5efa13d7b27beee0563561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 15:52:11 +0200 Subject: [PATCH 18/33] Fixed mypy errors --- src/anyio/_core/_asyncio_selector_thread.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index bc08a015..479db492 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -20,7 +20,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop): self._thread = threading.Thread(target=self.run) self._selector = DefaultSelector() self._send, self._receive = socket.socketpair() - self._notify_key = self._selector.register(self._receive, EVENT_READ) + self._selector.register(self._receive, EVENT_READ) self._closed = False def start(self) -> None: @@ -102,8 +102,8 @@ def remove_writer(self, fd: FileDescriptorLike) -> bool: def run(self) -> None: while not self._closed: for key, events in self._selector.select(): - if key is self._notify_key: - key.fileobj.recv(10240) + if key.fileobj is self._receive: + self._receive.recv(10240) continue if events & EVENT_READ and (callback := key.data.get(EVENT_READ)): From 84f05cecd6c4c4358bb5497c521ab6f070437596 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 15:52:43 +0200 Subject: [PATCH 19/33] Fixed linting error --- src/anyio/_core/_asyncio_selector_thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index 479db492..77403543 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -68,7 +68,7 @@ def add_writer(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> Non key.data[EVENT_WRITE] = callback self._selector.modify(fd, key.events | EVENT_WRITE, key.data) - + self._send.send(b"\x00") def remove_reader(self, fd: FileDescriptorLike) -> bool: From 56615c1a807e1a00e9f30faf200b7b2aef664aa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 17:02:01 +0200 Subject: [PATCH 20/33] Updated the changelog and the docs on wait_readable/wait_writable --- docs/versionhistory.rst | 5 ++--- src/anyio/_core/_sockets.py | 20 +++++++------------- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 9f18235a..c9eb2b44 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -13,12 +13,11 @@ This library adheres to `Semantic Versioning 2.0 `_. an object with a ``.fileno()`` method or an integer handle, and deprecated their now obsolete versions (``wait_socket_readable()`` and ``wait_socket_writable()``) (PR by @davidbrochart) +- Added support for ``wait_readable()`` and ``wait_writable()`` on ``ProactorEventLoop`` + (used on asyncio + Windows by default) - Fixed the return type annotations of ``readinto()`` and ``readinto1()`` methods in the ``anyio.AsyncFile`` class (`#825 `_) -- Ported ``ThreadSelectorEventLoop`` from Tornado to allow - ``anyio.wait_readable()`` and ``anyio.wait_writable()`` to work on Windows with a - ``ProactorEventLoop``. **4.6.2** diff --git a/src/anyio/_core/_sockets.py b/src/anyio/_core/_sockets.py index cbdd5af5..5615172a 100644 --- a/src/anyio/_core/_sockets.py +++ b/src/anyio/_core/_sockets.py @@ -660,10 +660,7 @@ def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]: descriptors aren't supported, and neither are handles that refer to anything besides a ``SOCKET``. - This does **NOT** work on Windows when using the asyncio backend with a proactor - event loop (default on py3.8+). - - .. warning:: Only use this on raw sockets that have not been wrapped by any higher + .. warning:: Don't use this on raw sockets that have been wrapped by any higher level constructs like socket streams! :param obj: an object with a ``.fileno()`` method or an integer handle @@ -680,21 +677,18 @@ def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]: """ Wait until the given object can be written to. - This does **NOT** work on Windows when using the asyncio backend with a proactor - event loop (default on py3.8+). - - .. seealso:: See the documentation of :func:`wait_readable` for the definition of - ``obj``. - - .. warning:: Only use this on raw sockets that have not been wrapped by any higher - level constructs like socket streams! - :param obj: an object with a ``.fileno()`` method or an integer handle :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the object to become writable :raises ~anyio.BusyResourceError: if another task is already waiting for the object to become writable + .. seealso:: See the documentation of :func:`wait_readable` for the definition of + ``obj``. + + .. warning:: Don't use this on raw sockets that have been wrapped by any higher + level constructs like socket streams! + """ return get_async_backend().wait_writable(obj) From 65662c8ddb579d75a205838b00fe624e1763568b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 18:40:19 +0200 Subject: [PATCH 21/33] Refactored implementation to use a global selector for all event loops --- src/anyio/_backends/_asyncio.py | 16 ++++--- src/anyio/_core/_asyncio_selector_thread.py | 52 ++++++++++++--------- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index f385b019..53e6415a 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -2752,21 +2752,23 @@ async def wait_readable(cls, obj: FileDescriptorLike) -> None: loop = get_running_loop() event = read_events[obj] = asyncio.Event() + remove_reader: Callable[[FileDescriptorLike], bool] | None = None try: loop.add_reader(obj, event.set) remove_reader = loop.remove_reader except NotImplementedError: from anyio._core._asyncio_selector_thread import get_selector - selector = get_selector(loop) + selector = get_selector() selector.add_reader(obj, event.set) - remove_reader = selector.remove_reader try: await event.wait() finally: if read_events.pop(obj, None) is not None: - remove_reader(obj) + if remove_reader is not None: + remove_reader(obj) + readable = True else: readable = False @@ -2791,21 +2793,23 @@ async def wait_writable(cls, obj: FileDescriptorLike) -> None: loop = get_running_loop() event = write_events[obj] = asyncio.Event() + remove_writer: Callable[[FileDescriptorLike], bool] | None = None try: loop.add_writer(obj, event.set) remove_writer = loop.remove_writer except NotImplementedError: from anyio._core._asyncio_selector_thread import get_selector - selector = get_selector(loop) + selector = get_selector() selector.add_writer(obj, event.set) - remove_writer = selector.remove_writer try: await event.wait() finally: if write_events.pop(obj, None) is not None: - remove_writer(obj) + if remove_writer is not None: + remove_writer(obj) + writable = True else: writable = False diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index 77403543..9f540f4d 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -6,17 +6,16 @@ from collections.abc import Callable from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector from typing import TYPE_CHECKING, Any -from weakref import WeakKeyDictionary if TYPE_CHECKING: from _typeshed import FileDescriptorLike -_selectors: WeakKeyDictionary[asyncio.AbstractEventLoop, Selector] = WeakKeyDictionary() +_selector_lock = threading.Lock() +_selector: Selector | None = None class Selector: - def __init__(self, loop: asyncio.AbstractEventLoop): - self._loop = loop + def __init__(self) -> None: self._thread = threading.Thread(target=self.run) self._selector = DefaultSelector() self._send, self._receive = socket.socketpair() @@ -24,49 +23,50 @@ def __init__(self, loop: asyncio.AbstractEventLoop): self._closed = False def start(self) -> None: - from anyio._backends._asyncio import find_root_task - - find_root_task().add_done_callback(lambda task: self._stop()) self._thread.start() + threading._register_atexit(self._stop) # type: ignore[attr-defined] def _stop(self) -> None: + global _selector self._closed = True self._send.send(b"\x00") self._send.close() self._thread.join() - del _selectors[self._loop] + _selector = None assert ( not self._selector.get_map() ), "selector still has registered file descriptors after shutdown" def add_reader(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: + loop = asyncio.get_running_loop() try: key = self._selector.get_key(fd) except KeyError: - self._selector.register(fd, EVENT_READ, {EVENT_READ: callback}) + self._selector.register(fd, EVENT_READ, {EVENT_READ: (loop, callback)}) else: if EVENT_READ in key.data: raise ValueError( "this file descriptor is already registered for reading" ) - key.data[EVENT_READ] = callback + key.data[EVENT_READ] = loop, callback self._selector.modify(fd, key.events | EVENT_READ, key.data) self._send.send(b"\x00") def add_writer(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: + loop = asyncio.get_running_loop() try: key = self._selector.get_key(fd) except KeyError: - self._selector.register(fd, EVENT_WRITE, {EVENT_WRITE: callback}) + self._selector.register(fd, EVENT_WRITE, {EVENT_WRITE: (loop, callback)}) else: if EVENT_WRITE in key.data: raise ValueError( "this file descriptor is already registered for writing" ) - key.data[EVENT_WRITE] = callback + key.data[EVENT_WRITE] = loop, callback self._selector.modify(fd, key.events | EVENT_WRITE, key.data) self._send.send(b"\x00") @@ -106,22 +106,30 @@ def run(self) -> None: self._receive.recv(10240) continue - if events & EVENT_READ and (callback := key.data.get(EVENT_READ)): + if events & EVENT_READ: + loop, callback = key.data self.remove_reader(key.fd) - self._loop.call_soon_threadsafe(callback) + try: + loop.call_soon_threadsafe(callback) + except RuntimeError: + pass # the loop was already closed - if events & EVENT_WRITE and (callback := key.data.get(EVENT_WRITE)): + if events & EVENT_WRITE: + loop, callback = key.data self.remove_writer(key.fd) - self._loop.call_soon_threadsafe(callback) + try: + loop.call_soon_threadsafe(callback) + except RuntimeError: + pass # the loop was already closed self._selector.unregister(self._receive) self._receive.close() -def get_selector(loop: asyncio.AbstractEventLoop) -> Selector: - try: - return _selectors[loop] - except KeyError: - _selectors[loop] = selector = Selector(asyncio.get_running_loop()) - selector.start() +def get_selector() -> Selector: + with _selector_lock: + if _selector is None: + selector = Selector() + selector.start() + return selector From d9ac6704b1d73b2f4cbdff986dd494425b6e7c5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 18:51:13 +0200 Subject: [PATCH 22/33] Fixed test failures --- src/anyio/_core/_asyncio_selector_thread.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index 9f540f4d..fd367f72 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -107,7 +107,7 @@ def run(self) -> None: continue if events & EVENT_READ: - loop, callback = key.data + loop, callback = key.data[EVENT_READ] self.remove_reader(key.fd) try: loop.call_soon_threadsafe(callback) @@ -115,7 +115,7 @@ def run(self) -> None: pass # the loop was already closed if events & EVENT_WRITE: - loop, callback = key.data + loop, callback = key.data[EVENT_WRITE] self.remove_writer(key.fd) try: loop.call_soon_threadsafe(callback) From 24fdf30ce776879d5665bfe41e9ffc1775432940 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 18:51:53 +0200 Subject: [PATCH 23/33] Added explicit thread name --- src/anyio/_core/_asyncio_selector_thread.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index fd367f72..3195e97a 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -16,7 +16,7 @@ class Selector: def __init__(self) -> None: - self._thread = threading.Thread(target=self.run) + self._thread = threading.Thread(target=self.run, name="AnyIO socket selector") self._selector = DefaultSelector() self._send, self._receive = socket.socketpair() self._selector.register(self._receive, EVENT_READ) @@ -79,7 +79,7 @@ def remove_reader(self, fd: FileDescriptorLike) -> bool: if new_events := key.events ^ EVENT_READ: del key.data[EVENT_READ] - self._selector.modify(fd, new_events) + self._selector.modify(fd, new_events, key.data) else: self._selector.unregister(fd) @@ -93,7 +93,7 @@ def remove_writer(self, fd: FileDescriptorLike) -> bool: if new_events := key.events ^ EVENT_WRITE: del key.data[EVENT_WRITE] - self._selector.modify(fd, new_events) + self._selector.modify(fd, new_events, key.data) else: self._selector.unregister(fd) From 68266962d773979ed85b5c7cead2ab2a6e234a5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 18:53:14 +0200 Subject: [PATCH 24/33] Really set the global selector --- src/anyio/_core/_asyncio_selector_thread.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index 3195e97a..9ac9bd0c 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -127,9 +127,11 @@ def run(self) -> None: def get_selector() -> Selector: + global _selector + with _selector_lock: if _selector is None: - selector = Selector() - selector.start() + _selector = Selector() + _selector.start() - return selector + return _selector From d793aa541f31fb2bd7de85a7940c65b483587ad1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 1 Dec 2024 20:43:09 +0200 Subject: [PATCH 25/33] Remove fd from selector on exception Also removed some dead code. --- src/anyio/_backends/_asyncio.py | 30 ++++++------------------------ 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 53e6415a..fe0f170f 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -2748,11 +2748,10 @@ async def wait_readable(cls, obj: FileDescriptorLike) -> None: obj = obj.fileno() if read_events.get(obj): - raise BusyResourceError("reading from") from None + raise BusyResourceError("reading from") loop = get_running_loop() event = read_events[obj] = asyncio.Event() - remove_reader: Callable[[FileDescriptorLike], bool] | None = None try: loop.add_reader(obj, event.set) remove_reader = loop.remove_reader @@ -2761,20 +2760,12 @@ async def wait_readable(cls, obj: FileDescriptorLike) -> None: selector = get_selector() selector.add_reader(obj, event.set) + remove_reader = selector.remove_reader try: await event.wait() finally: - if read_events.pop(obj, None) is not None: - if remove_reader is not None: - remove_reader(obj) - - readable = True - else: - readable = False - - if not readable: - raise ClosedResourceError + remove_reader(obj) @classmethod async def wait_writable(cls, obj: FileDescriptorLike) -> None: @@ -2789,11 +2780,10 @@ async def wait_writable(cls, obj: FileDescriptorLike) -> None: obj = obj.fileno() if write_events.get(obj): - raise BusyResourceError("writing to") from None + raise BusyResourceError("writing to") loop = get_running_loop() event = write_events[obj] = asyncio.Event() - remove_writer: Callable[[FileDescriptorLike], bool] | None = None try: loop.add_writer(obj, event.set) remove_writer = loop.remove_writer @@ -2802,20 +2792,12 @@ async def wait_writable(cls, obj: FileDescriptorLike) -> None: selector = get_selector() selector.add_writer(obj, event.set) + remove_writer = selector.remove_writer try: await event.wait() finally: - if write_events.pop(obj, None) is not None: - if remove_writer is not None: - remove_writer(obj) - - writable = True - else: - writable = False - - if not writable: - raise ClosedResourceError + remove_writer(obj) @classmethod def current_default_thread_limiter(cls) -> CapacityLimiter: From 24b4a9bf6d3cb6e48efcbb7d89eacdd2ca75e29d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 2 Dec 2024 10:52:16 +0200 Subject: [PATCH 26/33] Fixed events never getting removed from _(read|write)_events --- src/anyio/_backends/_asyncio.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index fe0f170f..45fa0b79 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -2751,7 +2751,7 @@ async def wait_readable(cls, obj: FileDescriptorLike) -> None: raise BusyResourceError("reading from") loop = get_running_loop() - event = read_events[obj] = asyncio.Event() + event = asyncio.Event() try: loop.add_reader(obj, event.set) remove_reader = loop.remove_reader @@ -2762,10 +2762,12 @@ async def wait_readable(cls, obj: FileDescriptorLike) -> None: selector.add_reader(obj, event.set) remove_reader = selector.remove_reader + read_events[obj] = event try: await event.wait() finally: remove_reader(obj) + del read_events[obj] @classmethod async def wait_writable(cls, obj: FileDescriptorLike) -> None: @@ -2783,7 +2785,7 @@ async def wait_writable(cls, obj: FileDescriptorLike) -> None: raise BusyResourceError("writing to") loop = get_running_loop() - event = write_events[obj] = asyncio.Event() + event = asyncio.Event() try: loop.add_writer(obj, event.set) remove_writer = loop.remove_writer @@ -2794,9 +2796,11 @@ async def wait_writable(cls, obj: FileDescriptorLike) -> None: selector.add_writer(obj, event.set) remove_writer = selector.remove_writer + write_events[obj] = event try: await event.wait() finally: + del write_events[obj] remove_writer(obj) @classmethod From 7274d34fa34e4611beae8a44b79406381e25821e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 3 Dec 2024 00:22:28 +0200 Subject: [PATCH 27/33] Addressed some review comments --- src/anyio/_core/_asyncio_selector_thread.py | 22 ++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index 9ac9bd0c..3f81aa90 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -19,6 +19,8 @@ def __init__(self) -> None: self._thread = threading.Thread(target=self.run, name="AnyIO socket selector") self._selector = DefaultSelector() self._send, self._receive = socket.socketpair() + self._send.setblocking(False) + self._receive.setblocking(False) self._selector.register(self._receive, EVENT_READ) self._closed = False @@ -29,14 +31,23 @@ def start(self) -> None: def _stop(self) -> None: global _selector self._closed = True - self._send.send(b"\x00") + self._notify_self() self._send.close() self._thread.join() + self._selector.unregister(self._receive) + self._receive.close() + self._selector.close() _selector = None assert ( not self._selector.get_map() ), "selector still has registered file descriptors after shutdown" + def _notify_self(self) -> None: + try: + self._send.send(b"\x00") + except BlockingIOError: + pass + def add_reader(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: loop = asyncio.get_running_loop() try: @@ -52,7 +63,7 @@ def add_reader(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> Non key.data[EVENT_READ] = loop, callback self._selector.modify(fd, key.events | EVENT_READ, key.data) - self._send.send(b"\x00") + self._notify_self() def add_writer(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: loop = asyncio.get_running_loop() @@ -69,7 +80,7 @@ def add_writer(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> Non key.data[EVENT_WRITE] = loop, callback self._selector.modify(fd, key.events | EVENT_WRITE, key.data) - self._send.send(b"\x00") + self._notify_self() def remove_reader(self, fd: FileDescriptorLike) -> bool: try: @@ -103,7 +114,7 @@ def run(self) -> None: while not self._closed: for key, events in self._selector.select(): if key.fileobj is self._receive: - self._receive.recv(10240) + self._receive.recv(4096) continue if events & EVENT_READ: @@ -122,9 +133,6 @@ def run(self) -> None: except RuntimeError: pass # the loop was already closed - self._selector.unregister(self._receive) - self._receive.close() - def get_selector() -> Selector: global _selector From e13876141a1e6ef7d6e1b4212863806b020fbff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 3 Dec 2024 00:24:27 +0200 Subject: [PATCH 28/33] Drain the buffer from the notify receive socket whenever it's flagged as readable --- src/anyio/_core/_asyncio_selector_thread.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index 3f81aa90..d777b05f 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -114,7 +114,12 @@ def run(self) -> None: while not self._closed: for key, events in self._selector.select(): if key.fileobj is self._receive: - self._receive.recv(4096) + while True: + try: + self._receive.recv(4096) + except BlockingIOError: + break + continue if events & EVENT_READ: From 748270b6d266e0cbdd3425ee8354f59bc4493128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 3 Dec 2024 00:26:49 +0200 Subject: [PATCH 29/33] Moved remove_(reader|writer) to the else block --- src/anyio/_backends/_asyncio.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 45fa0b79..c1fd0d1e 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -2754,13 +2754,14 @@ async def wait_readable(cls, obj: FileDescriptorLike) -> None: event = asyncio.Event() try: loop.add_reader(obj, event.set) - remove_reader = loop.remove_reader except NotImplementedError: from anyio._core._asyncio_selector_thread import get_selector selector = get_selector() selector.add_reader(obj, event.set) remove_reader = selector.remove_reader + else: + remove_reader = loop.remove_reader read_events[obj] = event try: @@ -2788,13 +2789,14 @@ async def wait_writable(cls, obj: FileDescriptorLike) -> None: event = asyncio.Event() try: loop.add_writer(obj, event.set) - remove_writer = loop.remove_writer except NotImplementedError: from anyio._core._asyncio_selector_thread import get_selector selector = get_selector() selector.add_writer(obj, event.set) remove_writer = selector.remove_writer + else: + remove_writer = loop.remove_writer write_events[obj] = event try: From 7c332d69f737921c9d5d4b34a46682372689ee59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 3 Dec 2024 00:30:53 +0200 Subject: [PATCH 30/33] Don't skip the wait_socket tests on ProactorEventLoop --- tests/test_sockets.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/test_sockets.py b/tests/test_sockets.py index be7317a6..b5143df0 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -1880,13 +1880,6 @@ async def test_wait_socket(event: str, socket_type: str) -> None: async def test_deprecated_wait_socket(anyio_backend_name: str) -> None: - if anyio_backend_name == "asyncio" and platform.system() == "Windows": - import asyncio - - policy = asyncio.get_event_loop_policy() - if policy.__class__.__name__ == "WindowsProactorEventLoopPolicy": - pytest.skip("Does not work on asyncio/Windows/ProactorEventLoop") - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: with pytest.warns( DeprecationWarning, From e19938ccbf8a3f180e4298190b2db53c5dcca5c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 3 Dec 2024 13:23:24 +0200 Subject: [PATCH 31/33] Loop until all data is read --- src/anyio/_core/_asyncio_selector_thread.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/anyio/_core/_asyncio_selector_thread.py b/src/anyio/_core/_asyncio_selector_thread.py index d777b05f..d98c3040 100644 --- a/src/anyio/_core/_asyncio_selector_thread.py +++ b/src/anyio/_core/_asyncio_selector_thread.py @@ -114,11 +114,11 @@ def run(self) -> None: while not self._closed: for key, events in self._selector.select(): if key.fileobj is self._receive: - while True: - try: - self._receive.recv(4096) - except BlockingIOError: - break + try: + while self._receive.recv(4096): + pass + except BlockingIOError: + pass continue From dce49fc1cd28f85d3715fe6b91266c569efb34c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 3 Dec 2024 13:37:46 +0200 Subject: [PATCH 32/33] Added an implementation note --- src/anyio/_core/_sockets.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/anyio/_core/_sockets.py b/src/anyio/_core/_sockets.py index 5615172a..c4161c16 100644 --- a/src/anyio/_core/_sockets.py +++ b/src/anyio/_core/_sockets.py @@ -660,6 +660,11 @@ def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]: descriptors aren't supported, and neither are handles that refer to anything besides a ``SOCKET``. + On backends where this functionality is not natively provided (asyncio + ``ProactorEventLoop`` on Windows) Additionally, on asyncio, this functionality is + provided using a separate selector thread which is set to shut down when the + interpreter shuts down. + .. warning:: Don't use this on raw sockets that have been wrapped by any higher level constructs like socket streams! @@ -684,7 +689,7 @@ def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]: to become writable .. seealso:: See the documentation of :func:`wait_readable` for the definition of - ``obj``. + ``obj`` and notes on backend compatibility. .. warning:: Don't use this on raw sockets that have been wrapped by any higher level constructs like socket streams! From 43d1dd4009a9222181c01c18e2da558920089062 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 3 Dec 2024 13:53:45 +0200 Subject: [PATCH 33/33] Fixed wording of new paragraph --- src/anyio/_core/_sockets.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/anyio/_core/_sockets.py b/src/anyio/_core/_sockets.py index c4161c16..a822d060 100644 --- a/src/anyio/_core/_sockets.py +++ b/src/anyio/_core/_sockets.py @@ -661,9 +661,8 @@ def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]: a ``SOCKET``. On backends where this functionality is not natively provided (asyncio - ``ProactorEventLoop`` on Windows) Additionally, on asyncio, this functionality is - provided using a separate selector thread which is set to shut down when the - interpreter shuts down. + ``ProactorEventLoop`` on Windows), it is provided using a separate selector thread + which is set to shut down when the interpreter shuts down. .. warning:: Don't use this on raw sockets that have been wrapped by any higher level constructs like socket streams!