Async/await pattern #318

Closed
yshui opened this issue Mar 29, 2020 · 34 comments · Fixed by #790
Labels
enhancement New feature or request P5 Priority Nice-to-have stable API Things that might require API changes wontfix This will not be worked on

Comments

@yshui

yshui commented Mar 29, 2020

Instead of using the cookie+reply pattern used by libxcb, Rust's async/await pattern seems to be a much more ergonomic way of doing asynchronous communication with the X server.

Also, the libxcb pattern allows the user to control when requests are sent and when buffers are drained by calling xcb_request_check and the *_reply functions, whereas the async/await pattern would allow us more freedom in that regard.

@psychon
Owner

psychon commented Mar 30, 2020

I'm not quite sure what you mean by "more freedoms" in your second paragraph.

For the first paragraph: Correct me if I'm wrong, but this would either require an extra thread that does all the reading (and that should also do all the writing, but that's not possible in the current design) or would require writing our own executor.

If I understand the Future trait correctly, something needs to save the provided Waker and wake it when the needed reply comes in. The rest would be simple. So, it might be possible to have cookies implement Future (behind a feature gate).

Any ideas on how to do that exactly?
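One possible shape for this, as a rough sketch rather than anything x11rb provides: the future and the code that reads from the connection share a slot that holds the reply and the saved Waker. ReplySlot, ReplyFuture, deliver_reply and CountingWaker are made-up names for illustration; only std is used, and the reading side is assumed to call deliver_reply once the matching reply has arrived.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Shared between the future and whoever reads replies off the wire.
#[derive(Default)]
struct ReplySlot {
    reply: Option<Vec<u8>>,
    waker: Option<Waker>,
}

/// Hypothetical future-shaped cookie; not an x11rb type.
struct ReplyFuture {
    slot: Arc<Mutex<ReplySlot>>,
}

impl Future for ReplyFuture {
    type Output = Vec<u8>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let mut slot = self.slot.lock().unwrap();
        match slot.reply.take() {
            Some(reply) => Poll::Ready(reply),
            None => {
                // Remember the most recent waker so the reader can wake us.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Called by the reading side once the reply for this request has arrived.
fn deliver_reply(slot: &Arc<Mutex<ReplySlot>>, reply: Vec<u8>) {
    let waker = {
        let mut slot = slot.lock().unwrap();
        slot.reply = Some(reply);
        slot.waker.take()
    };
    // Wake after releasing the lock so the woken task can poll immediately.
    if let Some(waker) = waker {
        waker.wake();
    }
}

/// Waker that only counts wake-ups, so the future can be polled by hand.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

A real executor would supply its own Waker; CountingWaker only exists so the sketch can be exercised without a runtime. Who calls deliver_reply (an extra thread, a spawned task, or the poll itself) is exactly the open question in this thread.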

Also, what is your use case? Do you suggest it based on "it would be cool" or do you have something in mind that is only possible in an ugly way currently and would be much nicer with async/await?

@yshui
Author

yshui commented Mar 30, 2020

but this would either require an extra thread that does all the reading (and that should also do all the writing, but that's not possible in the current design) or would require writing our own executor.

From a user's perspective, probably yes. But it doesn't have to be, as for example, tokio has a single threaded executor. From the library's perspective, all we need to do is implement the Future trait properly, and we don't have to worry about the threaded-ness.

If I understand the Future trait correctly, ...

While we could do it this way and manage Wakers ourselves, it would be easier to use an existing async I/O crate, like tokio or async-std.

As for "freedom", consider this example:

// has to flush and wait for reply on each request
conn.create_window(COPY_DEPTH_FROM_PARENT, win_id, screen.root, 0, 0, 100, 100, 0, WindowClass::InputOutput,
                   0, &CreateWindowAux::new().background_pixel(screen.white_pixel))?.check();
conn.map_window(win_id)?.check();

compared to

// minimal code change, but now this can happen concurrently with other requests
conn.create_window(COPY_DEPTH_FROM_PARENT, win_id, screen.root, 0, 0, 100, 100, 0, WindowClass::InputOutput,
                   0, &CreateWindowAux::new().background_pixel(screen.white_pixel))?.await?;
conn.map_window(win_id)?.await?;

Also, what is your use case?

Mostly, because this is cool. But I also think the libxcb model is harder to reason about.

For example, if I don't explicitly check, I would get errors as events, which is hard to correlate back to the original request; if I do check, then I introduce waits in the code. And it's difficult to express "I want to ignore the errors". Async/await lets me do all of these naturally.

Also, there are problems like:

let cookie1 = conn.create_window(COPY_DEPTH_FROM_PARENT, win_id, screen.root, 0, 0, 100, 100, 0, WindowClass::InputOutput,
                       0, &CreateWindowAux::new().background_pixel(screen.white_pixel))?;
conn.map_window(win_id)?.check();
cookie1.check(); // <- can i still do this?

I know this is a synthetic example, but the point is that, it seems to me, the cookies are quite leaky abstractions, and are very "un-cool".

@yshui
Author

yshui commented Mar 30, 2020

Also, there is the argument that if we are using Rust, we should make the best use of the tools Rust provides.

@psychon psychon added enhancement New feature or request stable API Things that might require API changes labels Mar 30, 2020
@psychon
Owner

psychon commented Mar 30, 2020

Hm... doing this properly would require also making the actual sending of requests async (via some AsyncWrite trait). So, basically "everything" would need to be duplicated, I think.
And I guess using an AsyncWrite from multiple futures concurrently is a Bad Idea (tm), so doing this properly would require an async mutex and going async-std...

@yshui
Author

yshui commented Mar 30, 2020 via email

@psychon
Owner

psychon commented Mar 31, 2020

Well... after thinking a bit more about it, I am not quite sure how an X11 connection can be shared between futures without much copying and with async request sending.

Let's say future A wants to send a big request to the X11 server. The kernel accepts half of the data, so the rest needs to be resubmitted later. The "no copying" requirement means that now all other futures are blocked on this one finishing sending its request (which is fine - in x11rb, something similar happens with threads instead of futures). However, now it is possible that future A is just dropped instead of polled again. Due to the "no copying" rule, we now have a big problem. (And this situation cannot even be detected - future A could be leaked instead of dropped).

The only ways around this that I see are:

  • request sending copies the whole request to a single buffer (possibly only after it was partially sent, but this is still ugly)
  • request sending takes ownership of all the data that is being sent (IMO results in an ugly API and just moves the "copy everything" problem to the caller)
  • Connections cannot be shared (not between futures and not between threads, i.e. sending a request requires &mut and even then it is still possible to get access to a connection after a request was "half sent")
  • Uhm... that's it actually.
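
The first option in the list above (copy whatever the kernel did not accept into an owned buffer) could look roughly like this. It is only a sketch under assumptions: send_or_buffer and flush_backlog are hypothetical helpers, std::io::Write stands in for a non-blocking socket, and file descriptor passing is ignored entirely.

```rust
use std::io::{self, Write};

/// Try to write `request` once. Whatever the writer does not accept is
/// copied into `backlog`, so the caller's borrow of `request` ends here
/// even if the request was only partially sent.
fn send_or_buffer<W: Write>(
    writer: &mut W,
    backlog: &mut Vec<u8>,
    request: &[u8],
) -> io::Result<()> {
    if !backlog.is_empty() {
        // Earlier bytes must go out first to keep the stream in order.
        backlog.extend_from_slice(request);
        return Ok(());
    }
    let written = match writer.write(request) {
        Ok(n) => n,
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => 0,
        Err(e) => return Err(e),
    };
    backlog.extend_from_slice(&request[written..]);
    Ok(())
}

/// Push out as much of the backlog as the writer accepts.
/// Returns Ok(true) once the backlog is empty.
fn flush_backlog<W: Write>(writer: &mut W, backlog: &mut Vec<u8>) -> io::Result<bool> {
    while !backlog.is_empty() {
        match writer.write(&backlog[..]) {
            Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
            Ok(n) => {
                backlog.drain(..n);
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(false),
            Err(e) => return Err(e),
        }
    }
    Ok(true)
}
```

Because the unsent tail is owned by the connection, dropping or leaking the future that started the request no longer leaves a half-sent request behind. The price is the copy that the list above calls ugly.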

For your code example above: Well, you just replaced reply() with await, so there is not much of a difference ergonomic-wise.

Also:

let cookie1 = conn.create_window(COPY_DEPTH_FROM_PARENT, win_id, screen.root, 0, 0, 100, 100, 0, WindowClass::InputOutput,
                       0, &CreateWindowAux::new().background_pixel(screen.white_pixel))?;
conn.map_window(win_id)?.check();
cookie1.check(); // <- can i still do this?

Yes you can. Everything else would be quite a bad API (because it allows simple mis-use). In fact, you should do it this way to save a round-trip to the X11 server (both requests are sent together instead of one after the other).

Actually... going the "Future way" means that people will most likely await eagerly, which can lead to extra round-trips to the X11 server.
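
The round-trip argument can be illustrated with a toy model. MockConn below is not x11rb API; it is a deliberately simplified stand-in that assumes requests are buffered until someone needs an answer, and that waiting for one request also settles every request sent before it (the server handles requests in order).

```rust
/// Toy stand-in for an X11 connection that only counts round-trips.
#[derive(Default)]
struct MockConn {
    next_seqno: u64,
    last_settled: u64,
    round_trips: u32,
}

impl MockConn {
    /// Queue a request and hand back its sequence number as a "cookie".
    fn send_request(&mut self) -> u64 {
        self.next_seqno += 1;
        self.next_seqno
    }

    /// Wait for the outcome of `cookie`. A round-trip is only needed if
    /// the request was not already settled by an earlier wait.
    fn check(&mut self, cookie: u64) {
        if cookie > self.last_settled {
            // Everything queued so far goes out and is answered in order.
            self.last_settled = self.next_seqno;
            self.round_trips += 1;
        }
    }
}

/// Check each request right after sending it (like awaiting eagerly).
fn eager() -> u32 {
    let mut conn = MockConn::default();
    let create = conn.send_request();
    conn.check(create);
    let map = conn.send_request();
    conn.check(map);
    conn.round_trips
}

/// Send both requests first, then check them.
fn pipelined() -> u32 {
    let mut conn = MockConn::default();
    let create = conn.send_request();
    let map = conn.send_request();
    conn.check(map); // both requests go out together
    conn.check(create); // outcome is already known, no extra round-trip
    conn.round_trips
}
```

In this model eager() needs two round-trips and pipelined() needs one, which is the saving described above.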

For example, if I don't explicitly check, I would get errors as events, which is hard to correlate back to the original request;

My plan with #314 is that the error would at least tell you which request failed (as a string). That way, you only have to search for calls to foo(), which already narrows down the search a lot.

I guess I could also add some debugging tips to doc/. Well, "some". My only tip is to add logging of Cookie.sequence_number() in your code. That number will match the sequence number of the error you get (for ergonomics: this could be done in a macro, something like log_request!(conn.map_window(win));).

@psychon
Owner

psychon commented Apr 3, 2020

Random idea: Add a function on cookies that gets a callback that is called when the reply is available. That's basically what is needed to implement Future for replies. No idea what this really would be useful for, so I'm not going to add it, but it's, well, a random idea.

fn call_when_reply(self, cb: impl FnOnce(ReplyType) + 'static)

(The name of the function is intentionally bad and needs improvement)
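
A minimal sketch of what such a callback mechanism might involve, assuming replies are raw bytes keyed by sequence number. CallbackRegistry and dispatch_reply are hypothetical names; in this sketch the callback is registered on a registry rather than on the cookie itself, purely to keep the example self-contained.

```rust
use std::collections::HashMap;

type ReplyCallback = Box<dyn FnOnce(Vec<u8>)>;

/// Callbacks waiting for a reply, keyed by request sequence number.
#[derive(Default)]
struct CallbackRegistry {
    pending: HashMap<u64, ReplyCallback>,
}

impl CallbackRegistry {
    /// Register `cb` to run once the reply to request `seqno` is available.
    fn call_when_reply(&mut self, seqno: u64, cb: impl FnOnce(Vec<u8>) + 'static) {
        self.pending.insert(seqno, Box::new(cb));
    }

    /// Called by the reading side for every reply that comes in.
    /// Returns true if a callback was waiting for this sequence number.
    fn dispatch_reply(&mut self, seqno: u64, reply: Vec<u8>) -> bool {
        match self.pending.remove(&seqno) {
            Some(cb) => {
                cb(reply);
                true
            }
            None => false,
        }
    }
}
```

Implementing Future on top of this would amount to registering a callback that stores the reply and wakes a saved Waker.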

@dalcde
Contributor

dalcde commented Apr 3, 2020 via email

@yshui
Author

yshui commented Apr 4, 2020

request sending takes ownership of all the data that is being sent

I don't think this is a bad option. the ownership can be returned when the future resolves.

For your code example above: Well, you just replaced reply() with await, so there is not much of a difference ergonomic-wise.

Because they are doing different things. If you want to replicate the .await behavior with the current model, it is going to be very non-ergonomic.

Yes you can. Everything else would be quite a bad API

Ok, I wasn't aware of that. How is it implemented? Does the library keep the reply indefinitely?

@psychon
Owner

psychon commented Apr 4, 2020

I don't think this is a bad option. the ownership can be returned when the future resolves.

Replacing &[u8] with Vec<u8> is quite limiting. E.g. it loses the ability to send parts of an array and everything must be copied into a Vec, even if it comes from something else.

Does the library keep the reply indefinitely?

Yup, and so does libxcb. Although with libxcb it is a lot easier to get a memory leak this way since Rust has Drop.

@psychon
Owner

psychon commented Apr 4, 2020

How is it implemented? Does the library keep the reply indefinitely?

Well, some more information about this: Every request has a sequence number. The first one that the client sends (the connection setup) is request 0, the next request has seqno 1 etc. A cookie is (in both XCB and x11rb) basically a wrapper around the seqno of the request.

The X11 protocol itself uses 16 bit sequence numbers, but since the server handles requests in-order, one can reconstruct a larger sequence number from incoming "stuff" (the client just has to make sure to send a request with a reply at least every (1 << 16) - 1 requests; for this purpose GetInputFocus requests are inserted).
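
To make the reconstruction concrete, here is a small sketch of how a full sequence number might be recovered from the 16 bits on the wire. widen_sequence is an illustrative name, not an x11rb or libxcb function, and it relies on the two assumptions stated above: packets arrive in order, and fewer than 1 << 16 requests lie between two received packets.

```rust
/// Reconstruct a 64-bit sequence number from the 16-bit value on the wire.
/// `last_seen` is the full sequence number of the most recently received
/// packet; the result is never smaller than `last_seen`.
fn widen_sequence(last_seen: u64, wire: u16) -> u64 {
    // Start with the high bits of the last known sequence number.
    let mut full = (last_seen & !0xffff) | u64::from(wire);
    if full < last_seen {
        // The low 16 bits wrapped around since the last packet.
        full += 0x1_0000;
    }
    full
}
```

For example, with last_seen = 0xffff a wire value of 0x0001 is widened to 0x1_0001, while a wire value equal to the low bits of last_seen (say, an event that follows a reply to the same request) maps back to last_seen itself.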

XCB and x11rb use 64 bit sequence numbers (well, XCB originally used int at least in its public API, but for Xlib a uint64_t API was added). It is essentially impossible to overflow a u64 here. Just for _ in 0..=u64::max_value() {} would already take forever (if the optimiser does not optimise it away).

Thus, every request has a unique number that identifies it and getting a reply is basically a lookup in a HashMap<u64, Vec<u8>> (but x11rb::rust_connection actually uses a VecDeque instead... why? Well, this could actually be faster...).

@psychon
Owner

psychon commented May 6, 2020

(Very) loosely related: https://github.com/Diggsey/posts/tree/master/async-mutexes
(An async mutex protects something "inner". In x11rb, we have basically a mutex in that only one "actor" (to avoid the words "thread" and "future") can send a request at the same time.)

@psychon
Owner

psychon commented Jan 7, 2021

If we want to go down this route (currently I don't), I think it would be best to split up the crate. Something like x11rb-protocol contains all the structs, parsing and serialising code. Basically: Everything that deals with bytes, but no I/O. Around this inner crate, one could then build multiple "I/O stuff" libraries, like x11rb for blocking I/O with the std types and x11rb-async for non-blocking I/O with AsyncRead/Write. There would also have to be some generated code in these leaf crates for ergonomic request sending, but that'd still be a lot less generated code than in x11rb-protocol.

A bit of context: I have been thinking about how to integrate X11 stuff with calloop, but haven't found a good solution. x11rb-protocol would make x11rb-calloop (hopefully) simple. See Smithay/smithay#254 for some context.

@psychon
Owner

psychon commented Jan 8, 2021

I hacked together an ugly hack on top of Smithay/smithay#254 that provides callback-based request handling. Consider it a proof of concept.

There is an on_reply() function on Cookies to provide a callback for when the reply arrives. Everything else is ugly. And dealing with callbacks means we end up in callback hell...

If we add something like SerializeFd on request structs, it could be made less ugly. Right now there is no public API to serialise a request into bytes and the RequestConnection trait assumes synchronous/blocking I/O.

The best thing would be to split up the crate, I guess. Something like x11rb-protocol contains all the parse/serialise stuff. Other crates use that and provide synchronous, asynchronous or callback-based I/O based on that.

I still don't want to go down this route properly. A proof of concept is enough for now. The proof of concept deals with the problems with lots of unimplemented!().

@psychon psychon added P5 Priority Nice-to-have wontfix This will not be worked on labels Apr 10, 2021
@yshui
Author

yshui commented Jan 28, 2022

So it's been ~2 years and I've gotten more familiar with Rust async, and I was looking into this again, reading the code trying to figure out the least intrusive way of adding async.

I came across a question that's tangentially related, but I don't want to open an issue just to ask that, so I will ask it here.

The function RustConnection::read_packet_and_enqueue returns without putting packets into the queue in 2 cases:

  1. when there is nothing to read
  2. when it can't lock packet_reader

It doesn't differentiate these 2 cases, which can cause a problem for multithreaded programs when:

  1. thread 1 poll()s the fd, poll returns and it calls poll_for_event.
  2. thread 2 enters read_packet_and_enqueue before thread 1 can acquire packet_reader.
  3. thread 1 returns without any event because locking packet_reader failed.
  4. thread 2 finishes reading and enqueuing, releases packet_reader
  5. thread 1 poll()s again, thinking poll_for_event returned nothing because there was nothing to read.

At this point thread 1 is blocked even though there are packets in the queue.
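
One way to make the two cases distinguishable, sketched with std types only. try_read_and_enqueue, Reader and ReadOutcome are invented for this illustration and are not the RustConnection internals; a VecDeque of packets stands in for the socket.

```rust
use std::collections::VecDeque;
use std::sync::{Mutex, TryLockError};

/// What a non-blocking read attempt did.
#[derive(Debug, PartialEq, Eq)]
enum ReadOutcome {
    /// This many packets were moved into the queue.
    Enqueued(usize),
    /// The reader was free, but there was nothing to read.
    NothingToRead,
    /// Somebody else holds the reader; the queue may change at any moment.
    ReaderBusy,
}

/// Hypothetical stand-in for the packet reader.
struct Reader {
    incoming: VecDeque<Vec<u8>>,
}

fn try_read_and_enqueue(
    reader: &Mutex<Reader>,
    queue: &Mutex<VecDeque<Vec<u8>>>,
) -> ReadOutcome {
    let mut reader = match reader.try_lock() {
        Ok(guard) => guard,
        Err(TryLockError::WouldBlock) => return ReadOutcome::ReaderBusy,
        // A poisoned lock still gives access to the data in this sketch.
        Err(TryLockError::Poisoned(e)) => e.into_inner(),
    };
    let packets: Vec<Vec<u8>> = reader.incoming.drain(..).collect();
    if packets.is_empty() {
        return ReadOutcome::NothingToRead;
    }
    let count = packets.len();
    queue.lock().unwrap().extend(packets);
    ReadOutcome::Enqueued(count)
}
```

With a distinct ReaderBusy outcome, a caller would know that it must not go back to poll() without first re-checking the queue or arranging to be notified when the lock is released.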

I didn't read through the whole codebase, so maybe I am missing something and this isn't a problem. Curious to know what you think about this.

@notgull
Collaborator

notgull commented Jan 28, 2022

All of the connections involved implement AsRawFd, would it not be easier to just wrap it up in an AsyncFd (or async_io::Async depending on your flavor)? Then implement the current methods as ones that wait on readable() or writable() instead of polling?

I think it'd be relatively easy for the pure Rust connection, since the current methods would just replace poll() with awaiting on the aforementioned futures. For the XCB connection, I know xcb_poll_for_event() exists for checking to see if an event has arrived, I'm not sure if there's a good way to poll for a reply.

@psychon
Owner

psychon commented Jan 28, 2022

@yshui Who exactly is calling poll() in your example? Is this some code outside of x11rb? If so, then that code is buggy. Only single-threaded applications can directly poll() the FD. Everything else is inherently racy. See https://docs.rs/x11rb/latest/x11rb/event_loop_integration/index.html (I didn't re-read this page; I just know that I wrote it once upon a time).

If you try to use poll() to integrate an X11 connection with your main loop, there is always the possibility that something like poll for event returned None, so you call poll(). But between these two calls, another thread can read an event from the FD that was just received.

(All of this applies to libxcb as well, I think)

If I misunderstood you: Sorry. Please clarify where this poll() call is.

@notgull Yup, I guess that would more-or-less work. But writable() is a problem since futures can be dropped at any "wait point" (I forgot how these are officially called). With the sync-version, one can be sure that the code will eventually continue running.

Actually, async reading is the easy part, I think. In fact, I guess I would just spawn another task that does nothing else than reading from the connection and putting packets into some kind of queue. This would also work in the sync case: Just spawn a thread. I just didn't like the idea of spawning threads behind people's back, so lots of complicated code was necessary.

Anyway, the hard part with async is writing: X11 requests can be quite large (xdpyinfo | grep 'maximum request size' says my X11 server accepts up to 16 MiB in a single request). At this point, one easily runs into lifetime problems. And the API user can drop futures "half way through" (see bread-graphics/breadx#20). And all of this even before one tries to share the X11 connection between multiple users.

I'm not sure if there's a good way to poll for a reply.

It's called xcb_poll_for_reply() :-)

@yshui
Author

yshui commented Jan 28, 2022

@psychon Thanks for the answer! You understood it correctly. You are right that multithreaded use of the connection like this is racy. My example probably doesn't make much sense, but I came up with it because I was thinking in a more async context.

Each async task behaves somewhat like an individual thread, and they might each need to call poll_for_event. They will return Pending and go to sleep if poll_for_event returns nothing, which is equivalent to a thread calling poll(). If they go to sleep not because of the fd being empty, but because they couldn't acquire packet_reader, then they will have the problem I described.

The solution to this would be making the packet_reader lock async aware (e.g. tokio::sync::Mutex), so the async task could be woken up when the lock is released. The downside is that this makes adding async support much more intrusive 😢


As for the buffer problem, personally I think copying the buf isn't all that bad (the max size is 16M, but realistically most of the requests should be small, unless you are sending a 1000x1000 bitmap over the wire). But even if you don't want to do that, I can settle for sending the request synchronously. Receiving the reply/events asynchronously is what I really want.

@psychon
Owner

psychon commented Jan 28, 2022

If they go to sleep not because of the fd being empty, but because they couldn't acquire packet_reader, then they will have the problem I described.

Yeah, true. Which is why wait_for_event() calls this function with BlockingMode::Blocking. In this case, the code will be woken up once whoever is holding the lock is done:

Ok(self.reader_condition.wait(inner).unwrap())

Something similar would be needed for "the async world".

I guess the best approach would be to start from "almost zero" with a new connection (the existing RequestConnection trait cannot be used anyway) and freely copy from the existing code. I would start a new task on the runtime that does all the reading and any packets are just sent to some async channels. All events go to one channel and replies need to have their corresponding channel in some kind of map or queue. Hopefully, this extra task/job simplifies things a lot.
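
The routing part of that idea can be sketched without committing to a runtime. The example below uses std::sync::mpsc channels as a stand-in for async channels; Packet and Dispatcher are hypothetical types, and a real reader would first have to classify raw bytes and widen the 16-bit sequence number.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified, already-classified packet.
enum Packet {
    Event(Vec<u8>),
    Reply { seqno: u64, data: Vec<u8> },
}

/// State owned by the single task (or thread) that does all the reading.
struct Dispatcher {
    events: Sender<Vec<u8>>,
    waiting: HashMap<u64, Sender<Vec<u8>>>,
}

impl Dispatcher {
    /// Returns the dispatcher and the receiving end of the event channel.
    fn new() -> (Self, Receiver<Vec<u8>>) {
        let (events, event_rx) = channel();
        let dispatcher = Dispatcher {
            events,
            waiting: HashMap::new(),
        };
        (dispatcher, event_rx)
    }

    /// Register interest in the reply to request `seqno`.
    fn expect_reply(&mut self, seqno: u64) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        self.waiting.insert(seqno, tx);
        rx
    }

    /// Route one packet that was read from the connection.
    fn dispatch(&mut self, packet: Packet) {
        match packet {
            Packet::Event(data) => {
                // A send error means the receiver is gone; drop the event.
                let _ = self.events.send(data);
            }
            Packet::Reply { seqno, data } => {
                // Replies nobody registered for are discarded.
                if let Some(tx) = self.waiting.remove(&seqno) {
                    let _ = tx.send(data);
                }
            }
        }
    }
}
```

In an async version, each per-reply channel would presumably become a oneshot-style channel and the event channel an unbounded async channel, but the map-plus-channels structure would stay the same.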

Hm.... we already have the Request trait. I guess adding something like fn serialize(self) -> BufWithFds<PiecewiseBuf<'static>> could go a long way into separating x11rb into some "x11 packets"-layer and the actual (blocking) connection.... or so...

@notgull
Collaborator

notgull commented Jan 28, 2022

@psychon I don't think this is really a problem. At an await point, yes, a future can be dropped. The solution, then, is to "poison" the display to prevent further communication. libxcb does something similar on error.
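
A rough sketch of the poisoning idea, using a drop guard. ConnState and WriteGuard are made-up names, and the actual writing is left out; the only point is that abandoning a write before it finished marks the connection as unusable.

```rust
use std::cell::Cell;

/// Hypothetical connection state. `poisoned` means the byte stream may
/// contain a half-written request and must not be used any more.
#[derive(Default)]
struct ConnState {
    poisoned: Cell<bool>,
}

/// Guard held for the duration of one request write.
struct WriteGuard<'a> {
    conn: &'a ConnState,
    finished: bool,
}

impl<'a> WriteGuard<'a> {
    /// Start a write; refuses to touch a poisoned connection.
    fn begin(conn: &'a ConnState) -> Result<Self, &'static str> {
        if conn.poisoned.get() {
            return Err("connection poisoned by an abandoned write");
        }
        Ok(WriteGuard {
            conn,
            finished: false,
        })
    }

    /// Call once every byte of the request has been handed to the socket.
    fn finish(mut self) {
        self.finished = true;
    }
}

impl Drop for WriteGuard<'_> {
    fn drop(&mut self) {
        // Dropped before `finish`: the request may be half-sent.
        if !self.finished {
            self.conn.poisoned.set(true);
        }
    }
}
```

A future that writes a request would hold such a guard across its await points. Note that this only catches dropped futures: a leaked future never runs Drop, which is the undetectable case mentioned earlier in this thread.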

Although, I have to wonder if it's possible to recover from, if buffering is an option?

@psychon
Owner

psychon commented Jan 28, 2022

Here is a proof of concept for my channel idea: https://gist.github.com/psychon/0b8b59b30d3253254b37e6267dbee471
It uses:

[dependencies]
tokio = { version = "1", features = ["full"] }
x11rb = "0.9"

All of this is basically untested. But this provides an API for sending requests, receiving replies, and receiving events. All of these only need a shared reference. I tested this against x11rb's xtrace-example.

Proper error handling is left as an exercise for the reader. Doing something sane when the connection is dropped (stopping the reading and writing tasks) is left as an exercise for the reader.

I wonder why the serialize functions in x11rb are not pub. Even better would be to have those in some trait. Perhaps ExtensionRequest and CoreRequest? With those, one could turn something like this hack into a proper "thing". One could even separate the generated code in x11rb into its own crate and have two crates x11rb-sync and x11rb-async based on that.

One could of course also easily use @notgull 's poisoning idea instead of the channels for writing. For reading, I feel like the channels and the extra tasks simplify things a lot.

(Oh, hey, @notgull also just handled the "shut everything down on drop" problem: When the write-end of the connection lives in the connection, it is also dropped. This shutdown()s the X11 connection, thus the X11 server would also close the connection and the reading thread gets an EOF.)

@psychon
Owner

psychon commented Jan 30, 2022

So it's been ~2 years and I've gotten more familiar with Rust async

@yshui So, what's your position on tokio vs async-std vs anything else? Any preferences? I looked at this briefly, wanted to do everything perfectly and abstract over all possible runtimes, but... well, that's not really possible, I guess. I feel like I am using the wrong approach when I type the following:

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum Runtime {
    Tokio,
    AsyncStd,
}

@psychon
Owner

psychon commented Jan 30, 2022

@notgull Basically the same question to you. I took a look at breadx and it has async and tokio-support features. However, I didn't really figure out what this actually does. Could you enlighten me? Is this using some "bridge" between the two or is there native support for either? Thanks!

@notgull
Collaborator

notgull commented Jan 30, 2022

The way I did it was that, if you ran with async, it would implement the traits necessary (like AsyncConnection) on top of the async-std primitives and implement NameConnection (the breadx equivalent of DefaultStream) in terms of those primitives. However, if tokio-support is enabled, the traits are also implemented for Tokio primitives, and NameConnection uses Tokio primitives instead.

This isn’t an ideal solution. What I’m working towards now is doing away with AsyncConnection altogether and implementing AsyncDisplay on top of async_io::Async<impl Display> or tokio::AsyncFd<impl Display>, which would hopefully make the runtimes a bit less integrated.

@yshui
Author

yshui commented Jan 30, 2022

@psychon

what's your position on tokio vs async-std vs anything else?

So it's possible to write this in a runtime-agnostic way. The standard library provides a Waker mechanism which is used by the runtimes. In theory we can run a thread that poll()s the X file descriptor and wakes up the corresponding Wakers, and that should work with any runtime.

@notgull
Collaborator

notgull commented Jan 30, 2022

In theory we can run a thread that poll()s the X file descriptor and wakes up the corresponding Wakers

So, a runtime? The main advantage of runtimes is that they allow for cooperative scheduling, and that advantage is lost in rolling your own like this.

@yshui
Author

yshui commented Jan 30, 2022

@notgull this is more like rolling our own reactor (although this term is no longer used).

Since we are only managing a single connection I don't think this is so bad. The async tasks are still scheduled by the runtime, it's just that we need to manage the connection ourselves.

@notgull
Collaborator

notgull commented Jan 30, 2022

@yshui My main concern is, is the advantage of runtime independence worth the overhead that would be incurred by rolling our own thread? Not only because of the thread overhead, but on Linux (which is basically the main platform X11 runs on) most runtimes use epoll, which has significant advantages over poll especially when more than one I/O device is involved.

@yshui
Author

yshui commented Jan 30, 2022

@notgull most async runtimes use thread anyway (well, i guess some let you choose, but most of the times you will be using threads).

I don't think having a polling thread would be that much overhead anyways. And we only have a single fd to worry about so I think poll will be fine. Unless the user wants to make a whole bunch of connections to the X server, which would be a weird way to interact with the X server, this shouldn't cause any concern.

@notgull
Collaborator

notgull commented Jan 30, 2022

most async runtimes use thread anyway

Threads, technically. But on top of that, there’s usually several other async runtime components.

And we only have a single fd to worry about so I think poll will be fine.

We only have one fd, but what about other packages? Keep in mind X11 is used as a graphical front end for programs that are intended to do other things as well, like write to files or communicate with servers. Having one thread that polls all of those other fds and another just for X11 seems a little silly.

I don't think having a polling thread would be that much overhead anyways.

It has at least twice as much overhead, since you're now calling poll in addition to epoll.

In addition, I’m sure maintaining an entire runtime/reactor just for async X11 is out of the scope for this crate anyways.

@yshui
Author

yshui commented Jan 30, 2022

In addition, I’m sure maintaining an entire runtime/reactor just for async X11 is out of the scope for this crate anyways.

It would be just a thread that calls poll(), reads the data and feeds it into the rest of the async support code which we have to write either way. I bet it would be less than 100 loc.

Having one thread that polls all of those other fds and another just for X11 seems a little silly.

the alternative is to:

  1. force the user to use the runtime we choose.
  2. or force the user to run multiple runtimes at the same time (the one x11rb uses + the one they use for the rest of the program).

So, trade-offs.

twice as much overhead

I don't see how that would double the overhead.


Well, I guess the best solution is decoupling the I/O from the rest of the protocol. This way the user could plug our fd into whatever runtime and drive x11rb from there. But just having an extra thread seems so much simpler.

@psychon
Owner

psychon commented Jan 30, 2022

My PoC hack above does not only use tokio for I/O, but also uses channels and mutexes from tokio. Without these, I do not really see how requests sending could work with &self instead of &mut self.

Reimplementing these should be relatively easy to do, but also feels like "not invented here"-syndrome. Reimplementing the wheel cannot be the best answer to this ecosystem split.

I also thought "just use a trait and let users plug in an implementation", but, well, async fn in traits... :-(

(Hm, this could work with hand-written futures... Is this a better idea than writing a new runtime? And Pin as receiver type was even stabilised back in 1.33: rust-lang/rust#56805 and rust-lang/rust#55786 )
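
For illustration, a trait with a poll-style method plus a hand-written future on top of it might look like the sketch below. PollWrite, WriteAll, Chunked and NoopWake are all invented here; the trait only loosely mirrors the shape of the AsyncWrite traits in the ecosystem and is not taken from x11rb or breadx.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake};

/// Hypothetical runtime-agnostic transport: a poll method instead of `async fn`.
trait PollWrite {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;
}

/// Hand-written future that calls `poll_write` until `buf` is fully written.
struct WriteAll<'a, W: PollWrite + Unpin> {
    writer: &'a mut W,
    buf: &'a [u8],
}

impl<W: PollWrite + Unpin> Future for WriteAll<'_, W> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        while !this.buf.is_empty() {
            match Pin::new(&mut *this.writer).poll_write(cx, this.buf) {
                Poll::Ready(Ok(0)) => {
                    return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
                }
                // Remember progress so the next poll resumes after it.
                Poll::Ready(Ok(n)) => this.buf = &this.buf[n..],
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            }
        }
        Poll::Ready(Ok(()))
    }
}

/// Test transport that accepts at most `max` bytes per call.
struct Chunked {
    out: Vec<u8>,
    max: usize,
}

impl PollWrite for Chunked {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let n = buf.len().min(self.max);
        self.out.extend_from_slice(&buf[..n]);
        Poll::Ready(Ok(n))
    }
}

/// Waker that does nothing, for polling by hand without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
```

The trait itself contains no async fn, so users could implement it for whatever runtime's socket type they have. Note that WriteAll borrows the request bytes, so the dropped-future problem discussed earlier in the thread applies to it unchanged.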

@yshui
Author

yshui commented Jan 30, 2022

I think discussing it like this is kinda moot because there could totally be things I didn't think of that make my approach 10x harder.

I wish I could put in time to make a PoC but right now I am working on something else.

Edit: @psychon yeah I forgot about the mutexes.

async fn in traits... :-(

There's async-trait :-) It's not "real" async fn in traits but it does the wrapping and stuff for you.

@notgull
Collaborator

notgull commented Jan 30, 2022

Hm, this could work with hand-written futures

This is actually what I do at the moment for breadx, for reference, and I plan to continue doing it into future versions.

4 participants