refactor(async client): refactor background task #1145
Conversation
Split send and receive to separate tasks to support multiplexing reads/writes
Code looks good to me overall, and the separate tasks look pretty clean!

If the `ThreadSafeRequestManager` didn't need to lock over await points, then maybe a standard mutex would be a better choice (a safe way to do this would be to make the mutex an integral part of `RequestManager`, so that each function only locks internally for as long as it needs and it's impossible to hold a lock over any await points or whatever).

I'd be interested to know whether this has any impact on benchmarks in either direction :)
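For readers following along, here is a minimal sketch of that suggestion, with made-up field and method names rather than jsonrpsee's actual `RequestManager` API: the mutex is private to the type, every method locks only for the duration of a synchronous operation, and no guard can be held across an `.await`.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Illustrative only: the real RequestManager has a different shape.
struct RequestManager {
    // The lock is an internal detail; callers never see a guard.
    requests: Mutex<HashMap<u64, String>>,
}

impl RequestManager {
    fn insert(&self, id: u64, method: String) {
        // The guard lives only for this synchronous call, so it can
        // never be held across an await point by the caller.
        self.requests.lock().unwrap().insert(id, method);
    }

    fn complete(&self, id: u64) -> Option<String> {
        self.requests.lock().unwrap().remove(&id)
    }
}
```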
core/src/client/async_client/mod.rs
Outdated

            }
        }
        _ = close_tx.closed() => {
            break Ok(())
Could some tracing debug logs be useful on the break branches, to know the reason for the `send_task` exit?
I added `Client dropped` logs if either the send or receive task returns `Ok(())`, ok?

These channels should be closed if the send/receive tasks are closed; if something terminates with an error, that must occur before this send fails. However, if there's a bug somewhere, that could be tricky to detect by looking at the logs.
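As an illustration of the kind of logging being discussed, here is a sketch assuming tokio channels and the tracing crate; it is not the code as merged, and the channel types and messages are simplified. Each break branch records why the task is exiting:

```rust
use tokio::sync::mpsc;

// Sketch only: channel types and payloads are simplified.
async fn send_task(
    close_tx: mpsc::Sender<()>,
    mut from_frontend: mpsc::Receiver<String>,
) -> Result<(), String> {
    loop {
        tokio::select! {
            // Resolves once every receiver of `close_tx` has been dropped,
            // i.e. the client itself is gone.
            _ = close_tx.closed() => {
                tracing::debug!("send_task exiting: client dropped");
                break Ok(());
            }
            maybe_msg = from_frontend.recv() => match maybe_msg {
                Some(msg) => {
                    // Forward `msg` to the transport here.
                    let _ = msg;
                }
                None => {
                    tracing::debug!("send_task exiting: frontend channel closed");
                    break Ok(());
                }
            },
        }
    }
}
```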
This reverts commit 49edaee.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    if self.futs.is_empty() {
        return Poll::Pending;
At this point the waker is not stored anywhere, so if any other futures are added they will be ignored. I think that this is ok though, because you just call `next()` on this in a loop, so it'll get polled at least once per loop anyway.
I tried using a plain `FuturesUnordered`, but then the select loop gets busy: when the set is empty it returns `Poll::Ready(None)` and steals CPU cycles from more important stuff. That's the reason why I added this wrapper...
Yeah, and if it's empty and futures are added, those will be read on the next iteration.
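To make this concrete, here is a rough sketch, close in spirit to the snippet above but not the PR's exact code, of a wrapper that reports `Poll::Pending` instead of `Poll::Ready(None)` while the inner `FuturesUnordered` is empty, which is what keeps the select loop from spinning:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::stream::{FuturesUnordered, Stream, StreamExt};

// Rough sketch: when the set is empty, report Pending rather than Ready(None),
// so an empty stream does not keep waking the surrounding select loop.
struct MaybePendingFutures<F> {
    futs: FuturesUnordered<F>,
}

impl<F> MaybePendingFutures<F> {
    fn new() -> Self {
        Self { futs: FuturesUnordered::new() }
    }

    fn push(&mut self, fut: F) {
        self.futs.push(fut);
    }
}

impl<F: std::future::Future> Stream for MaybePendingFutures<F> {
    type Item = F::Output;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.futs.is_empty() {
            // Caveat discussed below: nothing stores cx.waker() here, so pushing
            // a new future will not, by itself, wake the task that saw Pending.
            return Poll::Pending;
        }
        self.futs.poll_next_unpin(cx)
    }
}
```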
Ah I see, so when it's empty it'll spam `Poll::Ready(None)` whenever polled; yeah, that sucks. Your version looks good to me offhand in the current code, though it's a little scary that if used wrongly it'll just `.await` forever, so a small footgun to be wary of for future us! Perhaps it's impossible to misuse given it needs `&mut self`, but I haven't convinced myself 100% :D
I believe this should be fine for now, because we use the read_task to drive this:

    read_task
        select (server_receiver, closed, pending_unsub)
            server_receiver -> if Ok(Some(m)) = handle_backend_messages
                pending_unsub.push( to_send_task.send(m) )

One case where this might be problematic is when:
- the server_receiver and closed futures are not ready
- pending_unsub is empty and thus returns Poll::Pending, and I believe it will never be woken up in the current loop iteration because nothing will wake the future's cx.waker again
- then pending_unsub has some elements added to it

But since pending_unsub only has elements added as a result of server_receiver being triggered first, it should be fine here as long as we don't save the pending_unsub future that returned Pending, and instead call next directly in the loop: future::select(rx_or_closed, pending_unsubscribes.next()).

Let me know if my reasoning is right 🙏
Indeed, there might be dragons if we change something in the read_task (maybe a comment saying not to save the pending_unsub.next() future is enough here).

We may be able to handle this by:
- if futs.is_empty(), saving the cx
- when adding a new future, if we have Some(cx), waking the waker
- preserving the same pending_unsub.next() between iterations, similar to let rx_item = backend_event.next();

Not quite sure if it's worth going that way.
yeah, you are correct.
Did what you suggested; it didn't require much code, but I had to clone the Waker because Context is !Send.
Save the `Waker` and not the `Context`; that's sendable :)
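Putting the exchange together, here is a sketch of the waker-saving variant (illustrative, not necessarily identical to the final code): store a cloned `Waker` when polled while empty, and wake it when a new future is pushed, so the pushing side guarantees the stream gets polled again.

```rust
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

use futures_util::stream::{FuturesUnordered, Stream, StreamExt};

// Illustrative sketch: unlike Context, a cloned Waker is Send, so it can be
// stored in the struct and woken from wherever futures are pushed.
struct MaybePendingFutures<F> {
    futs: FuturesUnordered<F>,
    waker: Option<Waker>,
}

impl<F> MaybePendingFutures<F> {
    fn new() -> Self {
        Self { futs: FuturesUnordered::new(), waker: None }
    }

    fn push(&mut self, fut: F) {
        self.futs.push(fut);
        // If a task went to sleep after seeing the empty set, wake it so the
        // freshly pushed future is actually polled.
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

impl<F: std::future::Future> Stream for MaybePendingFutures<F> {
    type Item = F::Output;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.futs.is_empty() {
            // Remember whom to wake once something is pushed.
            self.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        self.futs.poll_next_unpin(cx)
    }
}
```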
Ah, that's what you did; I misread the earlier comment and GitHub was showing me old stuff, whoops!

There's a lot to work through here, but offhand it looks good to me now; no more unbounded queues from what I could see, so backpressure looks like it'll work for both sending and reading, and the two queues shouldn't block progress from each other now, so nice one! A couple of nits above, but I'm happy to see this merge, and if it does well enough on burn-ins then it's an improvement from where we were regardless :)
});

let closed = close_tx.closed();
let pending_unsubscribes = MaybePendingFutures::new();
I do wonder, is this only used for unsubscribes? Wouldn't the `receiver: R` also provide pings and plain messages?
No, this is really an edge case where a subscription fails:
i) a subscription call is answered but the client dropped the subscription (we want to unsubscribe then);
ii) the subscription couldn't keep up with the server.

Thus, this is really just a matter of unsubscribing so that no further messages are received for that particular subscription...
This looks good to me! I believe the backpressure would work fine, nice job here!
Split send and receive to separate tasks to support multiplexing reads and writes
Summary of what was wrong before:
Benches