-
Notifications
You must be signed in to change notification settings - Fork 144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for wait_readable() and wait_writable() on ProactorEventLoop #831
Merged
Merged
Changes from 37 commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
a7335d9
Use ThreadSelectorEventLoop on Windows with ProactorEventLoop
davidbrochart d67a150
Removed AddThreadSelectorEventLoop
davidbrochart c1dd759
Skip test on Windows/Trio
davidbrochart 76d23fa
Add back loop close and test on Windows/Trio
davidbrochart 8b18582
Review
davidbrochart 40c7347
Merge branch 'master' into selector-thread
agronholm 1051fcd
Fix
davidbrochart 01ccdf2
Removed unneeded anyio_backend_name
davidbrochart c381617
Added closing parenthesis
davidbrochart e21f323
Updated changelog
davidbrochart a379ccf
Use HasFileno from typeshed
davidbrochart 872329a
Merge branch 'master' into selector-thread
davidbrochart 6980062
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 19536de
Merge branch 'master' into selector-thread
davidbrochart 3ea42bf
Close selector thread when root task is done
davidbrochart 90b8eea
Merge branch 'master' into selector-thread
agronholm f68df73
Alternate implementation of the selector thread
agronholm 0a94755
Fixed AttributeError -> KeyError in get_selector()
agronholm 8fce306
Merge branch 'selector-thread' into selector-thread-alternate
agronholm 1d685e4
Fixed AssertionError at exit
agronholm 17140f3
Use FileDescriptorLike also in the test module
agronholm 3fc63a4
Fixed timeout errors
agronholm 84029d2
Fixed mypy errors
agronholm 84f05ce
Fixed linting error
agronholm 56615c1
Updated the changelog and the docs on wait_readable/wait_writable
agronholm 65662c8
Refactored implementation to use a global selector for all event loops
agronholm d9ac670
Fixed test failures
agronholm 24fdf30
Added explicit thread name
agronholm 6826696
Really set the global selector
agronholm d793aa5
Remove fd from selector on exception
agronholm 24b4a9b
Fixed events never getting removed from _(read|write)_events
agronholm 7274d34
Addressed some review comments
agronholm e138761
Drain the buffer from the notify receive socket whenever it's flagged…
agronholm 748270b
Moved remove_(reader|writer) to the else block
agronholm 7c332d6
Don't skip the wait_socket tests on ProactorEventLoop
agronholm e19938c
Loop until all data is read
agronholm dce49fc
Added an implementation note
agronholm 43d1dd4
Fixed wording of new paragraph
agronholm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
import socket | ||
import threading | ||
from collections.abc import Callable | ||
from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector | ||
from typing import TYPE_CHECKING, Any | ||
|
||
if TYPE_CHECKING: | ||
from _typeshed import FileDescriptorLike | ||
|
||
_selector_lock = threading.Lock() | ||
_selector: Selector | None = None | ||
|
||
|
||
class Selector: | ||
def __init__(self) -> None: | ||
self._thread = threading.Thread(target=self.run, name="AnyIO socket selector") | ||
self._selector = DefaultSelector() | ||
self._send, self._receive = socket.socketpair() | ||
self._send.setblocking(False) | ||
self._receive.setblocking(False) | ||
self._selector.register(self._receive, EVENT_READ) | ||
self._closed = False | ||
|
||
def start(self) -> None: | ||
self._thread.start() | ||
threading._register_atexit(self._stop) # type: ignore[attr-defined] | ||
|
||
def _stop(self) -> None: | ||
global _selector | ||
self._closed = True | ||
self._notify_self() | ||
self._send.close() | ||
self._thread.join() | ||
self._selector.unregister(self._receive) | ||
self._receive.close() | ||
self._selector.close() | ||
_selector = None | ||
assert ( | ||
not self._selector.get_map() | ||
), "selector still has registered file descriptors after shutdown" | ||
|
||
def _notify_self(self) -> None: | ||
try: | ||
self._send.send(b"\x00") | ||
except BlockingIOError: | ||
pass | ||
|
||
def add_reader(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: | ||
loop = asyncio.get_running_loop() | ||
try: | ||
key = self._selector.get_key(fd) | ||
except KeyError: | ||
self._selector.register(fd, EVENT_READ, {EVENT_READ: (loop, callback)}) | ||
else: | ||
if EVENT_READ in key.data: | ||
raise ValueError( | ||
"this file descriptor is already registered for reading" | ||
) | ||
|
||
key.data[EVENT_READ] = loop, callback | ||
self._selector.modify(fd, key.events | EVENT_READ, key.data) | ||
|
||
self._notify_self() | ||
|
||
def add_writer(self, fd: FileDescriptorLike, callback: Callable[[], Any]) -> None: | ||
loop = asyncio.get_running_loop() | ||
try: | ||
key = self._selector.get_key(fd) | ||
except KeyError: | ||
self._selector.register(fd, EVENT_WRITE, {EVENT_WRITE: (loop, callback)}) | ||
else: | ||
if EVENT_WRITE in key.data: | ||
raise ValueError( | ||
"this file descriptor is already registered for writing" | ||
) | ||
|
||
key.data[EVENT_WRITE] = loop, callback | ||
self._selector.modify(fd, key.events | EVENT_WRITE, key.data) | ||
|
||
self._notify_self() | ||
|
||
def remove_reader(self, fd: FileDescriptorLike) -> bool: | ||
try: | ||
key = self._selector.get_key(fd) | ||
except KeyError: | ||
return False | ||
|
||
if new_events := key.events ^ EVENT_READ: | ||
del key.data[EVENT_READ] | ||
self._selector.modify(fd, new_events, key.data) | ||
else: | ||
self._selector.unregister(fd) | ||
|
||
return True | ||
|
||
def remove_writer(self, fd: FileDescriptorLike) -> bool: | ||
try: | ||
key = self._selector.get_key(fd) | ||
except KeyError: | ||
return False | ||
|
||
if new_events := key.events ^ EVENT_WRITE: | ||
del key.data[EVENT_WRITE] | ||
self._selector.modify(fd, new_events, key.data) | ||
else: | ||
self._selector.unregister(fd) | ||
|
||
return True | ||
|
||
def run(self) -> None: | ||
while not self._closed: | ||
for key, events in self._selector.select(): | ||
if key.fileobj is self._receive: | ||
try: | ||
while self._receive.recv(4096): | ||
pass | ||
except BlockingIOError: | ||
pass | ||
|
||
continue | ||
|
||
if events & EVENT_READ: | ||
loop, callback = key.data[EVENT_READ] | ||
self.remove_reader(key.fd) | ||
try: | ||
loop.call_soon_threadsafe(callback) | ||
except RuntimeError: | ||
pass # the loop was already closed | ||
|
||
if events & EVENT_WRITE: | ||
loop, callback = key.data[EVENT_WRITE] | ||
self.remove_writer(key.fd) | ||
try: | ||
loop.call_soon_threadsafe(callback) | ||
except RuntimeError: | ||
pass # the loop was already closed | ||
|
||
|
||
def get_selector() -> Selector: | ||
global _selector | ||
|
||
with _selector_lock: | ||
if _selector is None: | ||
_selector = Selector() | ||
_selector.start() | ||
|
||
return _selector |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -32,9 +32,9 @@ | |||||
from ._tasks import create_task_group, move_on_after | ||||||
|
||||||
if TYPE_CHECKING: | ||||||
from _typeshed import HasFileno | ||||||
from _typeshed import FileDescriptorLike | ||||||
else: | ||||||
HasFileno = object | ||||||
FileDescriptorLike = object | ||||||
|
||||||
if sys.version_info < (3, 11): | ||||||
from exceptiongroup import ExceptionGroup | ||||||
|
@@ -609,9 +609,6 @@ def wait_socket_readable(sock: socket.socket) -> Awaitable[None]: | |||||
|
||||||
Wait until the given socket has data to be read. | ||||||
|
||||||
This does **NOT** work on Windows when using the asyncio backend with a proactor | ||||||
event loop (default on py3.8+). | ||||||
|
||||||
.. warning:: Only use this on raw sockets that have not been wrapped by any higher | ||||||
level constructs like socket streams! | ||||||
|
||||||
|
@@ -649,7 +646,7 @@ def wait_socket_writable(sock: socket.socket) -> Awaitable[None]: | |||||
return get_async_backend().wait_writable(sock.fileno()) | ||||||
|
||||||
|
||||||
def wait_readable(obj: HasFileno | int) -> Awaitable[None]: | ||||||
def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]: | ||||||
""" | ||||||
Wait until the given object has data to be read. | ||||||
|
||||||
|
@@ -663,10 +660,12 @@ def wait_readable(obj: HasFileno | int) -> Awaitable[None]: | |||||
descriptors aren't supported, and neither are handles that refer to anything besides | ||||||
a ``SOCKET``. | ||||||
|
||||||
This does **NOT** work on Windows when using the asyncio backend with a proactor | ||||||
event loop (default on py3.8+). | ||||||
On backends where this functionality is not natively provided (asyncio | ||||||
``ProactorEventLoop`` on Windows) Additionally, on asyncio, this functionality is | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Woops. I pushed a better wording just now. |
||||||
provided using a separate selector thread which is set to shut down when the | ||||||
interpreter shuts down. | ||||||
|
||||||
.. warning:: Only use this on raw sockets that have not been wrapped by any higher | ||||||
.. warning:: Don't use this on raw sockets that have been wrapped by any higher | ||||||
level constructs like socket streams! | ||||||
|
||||||
:param obj: an object with a ``.fileno()`` method or an integer handle | ||||||
|
@@ -679,25 +678,22 @@ def wait_readable(obj: HasFileno | int) -> Awaitable[None]: | |||||
return get_async_backend().wait_readable(obj) | ||||||
|
||||||
|
||||||
def wait_writable(obj: HasFileno | int) -> Awaitable[None]: | ||||||
def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]: | ||||||
""" | ||||||
Wait until the given object can be written to. | ||||||
|
||||||
This does **NOT** work on Windows when using the asyncio backend with a proactor | ||||||
graingert marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
event loop (default on py3.8+). | ||||||
|
||||||
.. seealso:: See the documentation of :func:`wait_readable` for the definition of | ||||||
``obj``. | ||||||
|
||||||
.. warning:: Only use this on raw sockets that have not been wrapped by any higher | ||||||
level constructs like socket streams! | ||||||
|
||||||
:param obj: an object with a ``.fileno()`` method or an integer handle | ||||||
:raises ~anyio.ClosedResourceError: if the object was closed while waiting for the | ||||||
object to become writable | ||||||
:raises ~anyio.BusyResourceError: if another task is already waiting for the object | ||||||
to become writable | ||||||
|
||||||
.. seealso:: See the documentation of :func:`wait_readable` for the definition of | ||||||
``obj`` and notes on backend compatibility. | ||||||
|
||||||
.. warning:: Don't use this on raw sockets that have been wrapped by any higher | ||||||
level constructs like socket streams! | ||||||
|
||||||
""" | ||||||
return get_async_backend().wait_writable(obj) | ||||||
|
||||||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think both sockets should be non-blocking, and you should ignore BlockingIOError on the send side
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.