Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support AnyIO #2045

Closed
wants to merge 1 commit into from
Closed

Support AnyIO #2045

wants to merge 1 commit into from

Conversation

davidbrochart
Copy link

Closes #1827.

@davidbrochart davidbrochart marked this pull request as draft October 29, 2024 09:19
@minrk
Copy link
Member

minrk commented Oct 29, 2024

Awesome, thanks! Let me know when you think it's ready for review or if you have any questions. The edge-triggered FD stuff is very fiddly.

@davidbrochart davidbrochart marked this pull request as ready for review October 31, 2024 08:19
@davidbrochart
Copy link
Author

It's still a WIP, but it would be nice to know what you think about it @minrk.
The API in sugar/socket.py should probably not be async, I'm not sure exactly where to have the sync/async split.

@minrk
Copy link
Member

minrk commented Oct 31, 2024

Yeah, sugar/socket.py, which is the base blocking API, should probably not need any changes at all, and definitely shouldn't introduce any async code.

It may be that the coroutine/wait_readable approach is different enough in design that it would be better to start zmq/anyio.py from scratch and only take inspiration from the current asyncio implementation (or copy/edit), rather than updating in-place. That would also be the easiest way to ensure backward compatibility, which is important. But if it seems easier to update in-place, that's a possibility, too. It's hard for me to say since I don't know what it's going to look like.

The hardest part, I think, will be correctly handling the edge-triggered FD with multiple awaiters, especially since it is hard to ensure we provoke this situation thoroughly in tests. Any event that consumes socket events must necessarily wake all tasks that are waiting on the FD or they can hang. We'll need to be thorough about the implications of a socket being used in multiple task groups, because all waiting tasks must wake. If that's a problem, we'll need to make sure to have locks to prevent the situations that won't work from happening.

Needing to wake tasks includes accessing the socket.EVENTS attribute, sending a message, and receiving a message. The FD being readable does not mean the socket is readable, it means there have been events on the socket, and socket.EVENTS must be checked. The socket not being readable does not mean there are not messages waiting, it means that events have not changed. so e.g. with multiple messages ready to recv, the FD will wake once, and all messages must be received before it is appropriate to call wait_readable again.

Plus we'll need a wait_readable implementation that works on proactor on Windows before it can be used. We have the add_reader version of this via tornado, which will need to be adapted to wait_readable. The principle is the same, so it can be built on top, I think.

assert recvd == [b"hi", b"there"]


@mark.skipif(not hasattr(zmq, "SNDTIMEO"), reason="requires SNDTIMEO")
async def test_send_timeout(socket):
s = socket(zmq.PUSH)
s.sndtimeo = 100
with pytest.raises(zmq.Again):
with pytest.raises(ExceptionGroup) as excinfo:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully this isn't required when it's wrapped up. A single coroutine should raise a single error, right? This would be a major breaking change and a significant degradation of the API.

@minrk
Copy link
Member

minrk commented Oct 31, 2024

Here's a very simple anyio example implementing async recv using only the blocking pyzmq API. It doesn't handle the edge-triggered nuances or timeouts, but it gets the very basics.

if key == EVENTS:
self._schedule_remaining_events(result)
# if key == EVENTS:
# self._schedule_remaining_events(result)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the things we will need to keep some version of. This is part of handling the edge-triggered FD: all tasks waiting on wait_readable must wake if EVENTS is accessed from anywhere.

@davidbrochart
Copy link
Author

Yes I think you're right that it's safer to have a separate implementation just for AnyIO, for backwards-compatibility reasons. I'm not sure anything I've done will remain then, since I was hoping to replace the asyncio layer with AnyIO, but now we can start from scratch as you did in your example.
This also raises the question of whether to create a different package, or have the AnyIO support here in pyzmq?

@minrk
Copy link
Member

minrk commented Oct 31, 2024

I think either way is fine for the separate package or here. A separate package can also make sense as a prototyping pathway to release often and avoid needing to wait for pyzmq releases. It really shouldn't need changes in pyzmq, though I do occasionally have to do some cooperation to get all the serialization methods to work well with the async subclasses without too much duplication.

For example, outside pyzmq it would be totally appropriate to only support low-level send/recv_multipart, not all the serialization extensions. That doesn't fit in the pyzmq socket subclass approach where all methods need to work, and use a "has a" rather than "is a" relationship, which type systems like a lot better since subclass != subtype.

We can always merge upstream here if/when it seems like 'the way to go' for async pyzmq, or not. We did that with gevent_zeromq and pyzmq_static before. Or if we like maintaining the separate package, gently deprecate zmq.asyncio in favor of the new package.

@minrk
Copy link
Member

minrk commented Oct 31, 2024

(btw, what you have can still remain, and you can copy the changed files to zmq/anyio.py and then feel free to update without concerns around breaking APIs). But if you'd rather start the whole design from scratch, that's also fine by me. Getting all the waits/timeouts/cleanup right was hard, but maybe (hopefully!) it will be easier with an anyio approach. Timeouts and cleanup are usually a lot easier with task groups and coroutines than futures! And if we had normal level-triggered FDs, this whole thing would be pretty straightforward.

Maybe it should even be a function-based approach (async_send / async_recv) using blocking Socket objects instead of defining async Socket classes (this might make the bookkeeping of waking multiple waiters hard, I'm not sure). I'm absolutely not set on using the same approach I used for zmq.asyncio for anyio. I'm happy to pursue whatever you think would be suit anyio compatibility best.

@davidbrochart
Copy link
Author

Sounds good, I'll create a new package then.

@davidbrochart
Copy link
Author

One simple way would be to run pyzmq's blocking APIs in a thread, but I guess that would lead to very bad performances.

@davidbrochart
Copy link
Author

I have started https://github.com/davidbrochart/zmq-anyio so I'm closing this for now. Thanks for the feedback @minrk !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FEAT: support AnyIO
2 participants