Remove WaitMap dependency #1183

Merged
merged 8 commits into from
Jan 30, 2024

Conversation

charliermarsh
Member

Summary

This is an attempt to #1163 by removing the WaitMap and gaining more granular control over the values that we hold over await boundaries.

@charliermarsh charliermarsh requested a review from konstin January 30, 2024 05:02
@charliermarsh
Member Author

@konstin - I haven't done any benchmarking, but this does consistently not-deadlock for me.

@konstin
Member

konstin commented Jan 30, 2024

Current dependencies on/for this PR:

This stack of pull requests is managed by Graphite.

@konstin
Member

konstin commented Jan 30, 2024

(Sorry for pushing this branch, I misread the updated Graphite docs.)

Member

@konstin konstin left a comment

This removes the deadlock! Benchmarks in the upstack PR are looking good. My main concern is the deadlock warning in the DashMap::entry API.

Value::Waiting(notify) => {
let notify = notify.clone();
drop(entry);
notify.notified().await;
Member

Can we assert that on drop, nobody is waiting anymore?

Member

Can you say more?

Member

If there is a task waiting on some request but there is no task providing it (because we're done, we're dropping the once map), this sounds like a bug.

Member

Oh! I see now. Yes. On Drop of the OnceMap.

I agree it might be a bug... but I could also see that maybe it isn't? Maybe the caller has finished what they needed and didn't need to wait for everything to finish. And it is perhaps hard to distinguish between "there is no task providing it" and "there is a task providing it, but it hasn't done so yet." I defer to y'all here because I don't have enough context on how this is used. But I absolutely agree that if you can assert that any Waiting values are a bug, then it might be nice to assert it on Drop. (Although note that a panic during Drop is an instant abort.)
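As a rough illustration of the check discussed in this thread, the sketch below reports unfinished keys when the map is dropped instead of panicking. It uses only the standard library. The simplified `OnceMap` (with `Option<u32>` standing in for the PR's `Value::Waiting` / `Value::Filled`) and the `waiting_keys` helper are hypothetical, not the code from this PR.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the PR's `OnceMap`: `None` plays the role of
/// `Value::Waiting`, `Some(v)` the role of `Value::Filled(v)`.
pub struct OnceMap {
    items: HashMap<String, Option<u32>>,
}

impl OnceMap {
    pub fn new() -> Self {
        Self { items: HashMap::new() }
    }

    /// Marks `key` as started. Returns `true` only for the first caller.
    pub fn register(&mut self, key: &str) -> bool {
        if self.items.contains_key(key) {
            return false;
        }
        self.items.insert(key.to_owned(), None);
        true
    }

    /// Stores the finished value for `key`.
    pub fn done(&mut self, key: &str, value: u32) {
        self.items.insert(key.to_owned(), Some(value));
    }

    /// Hypothetical helper: keys that were registered but never finished.
    pub fn waiting_keys(&self) -> Vec<&str> {
        let mut keys: Vec<&str> = self
            .items
            .iter()
            .filter(|(_, value)| value.is_none())
            .map(|(key, _)| key.as_str())
            .collect();
        keys.sort_unstable();
        keys
    }
}

impl Drop for OnceMap {
    fn drop(&mut self) {
        let waiting = self.waiting_keys();
        // Report rather than panic: a panic while already unwinding aborts.
        if !waiting.is_empty() && !std::thread::panicking() {
            eprintln!("OnceMap dropped with unfinished keys: {waiting:?}");
        }
    }
}
```

Whether a leftover waiting entry is actually a bug depends on the caller, as noted above, so a log line (or a debug-only assertion) may be a safer default than a hard assert.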

@@ -406,6 +405,9 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
if self.index.distributions.register_owned(dist.package_id()) {
priorities.add(dist.name().clone());
request_sink.unbounded_send(Request::Dist(dist))?;

// Yield, to allow subscribers to continue.
Member

I'd add here that we specifically need this because the channel is sync

let mut lock = self.started.lock().unwrap();
if lock.contains(key) {
return false;
let entry = self.items.entry(key.to_owned());
Member

The docs for entry say:

Locking behaviour: May deadlock if called when holding any sort of reference into the map.

This assumes we never actually call this function from two threads at the same time. This is the same call that WaitMap previously deadlocked in.

If we know that the map will stay on the main thread anyway, we can use Cell<FxHashMap<K, V>> instead of DashMap. If we want to share the index between threads (I don't know whether we do; it depends on how we want to structure our async code), we'll need something like RwLock<FxHashMap<K, V>>. DashMap is a Box<[RwLock<HashMap<K, V, S>>]> internally (https://docs.rs/dashmap/latest/src/dashmap/lib.rs.html#88-92) and our lock times are minimal, so I expect no perf difference. DashMap is missing a get_or_insert() -> bool, an "atomic" compare-and-swap option, that both we and WaitMap would need to be correct.
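A minimal sketch of the `RwLock`-based alternative mentioned above, assuming a plain `std::collections::HashMap` in place of `FxHashMap` and a `u32` payload. The `Registry` type is hypothetical. The point is only that checking for the key and inserting it happen inside one write-lock critical section, so exactly one caller sees `true`.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical stand-in for the index: `None` means "started, not finished".
pub struct Registry {
    items: RwLock<HashMap<String, Option<u32>>>,
}

impl Registry {
    pub fn new() -> Self {
        Self { items: RwLock::new(HashMap::new()) }
    }

    /// Returns `true` only for the first caller that registers `key`.
    /// The lookup and the insert share a single write lock, so two
    /// threads cannot both observe the key as vacant.
    pub fn register(&self, key: &str) -> bool {
        match self.items.write().unwrap().entry(key.to_owned()) {
            Entry::Occupied(_) => false,
            Entry::Vacant(slot) => {
                slot.insert(None);
                true
            }
        }
    }
}
```

This trades DashMap's sharded locks for one global lock, which is only reasonable if, as the comment assumes, lock hold times stay minimal.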

Member

I don't think the warning is about calling entry from two different threads simultaneously. That's normal and should be fine, otherwise it'd be a pretty poor concurrent hashmap. The warning is, AIUI, about calling entry (or get) while you have a reference to a previous call in hand in the same thread. When multiple threads are calling it, one will (hopefully) eventually make progress, drop the reference and unblock the other thread. But if you do let entry = self.items.entry(foo); and then self.items.get(foo) while entry is still alive, then that get call seems likely to block waiting for entry to drop. Which, of course, will never happen because the thread is blocked on get.
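The same-thread hazard described here can be observed without hanging by using a single `std::sync::RwLock` as a stand-in for one DashMap shard. This is an analogy under that assumption, not DashMap's actual code, and `reentrancy_demo` is a hypothetical name. `try_read` is used in place of a blocking read so the example terminates.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Returns `(blocked_while_held, readable_after_drop)`.
pub fn reentrancy_demo() -> (bool, bool) {
    // One lock guarding one map, standing in for a single shard.
    let shard: RwLock<HashMap<String, u32>> = RwLock::new(HashMap::new());

    // Analogous to `let entry = self.items.entry(foo);`:
    // an exclusive guard into the shard is now alive.
    let mut guard = shard.write().unwrap();
    guard.insert("foo".to_string(), 1);

    // Analogous to `self.items.get(foo)` on the same thread. A blocking
    // `read()` could never return here, because the only thread that can
    // release the lock is the one that would be blocked.
    let blocked_while_held = shard.try_read().is_err();

    // Once the first guard is gone, the second access succeeds.
    drop(guard);
    let readable_after_drop = shard.try_read().is_ok();

    (blocked_while_held, readable_after_drop)
}
```

Calls from different threads do not have this problem: the other thread simply waits until the guard is dropped.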

Member

Thanks for clarifying, this makes much more sense! I was indeed assuming that dashmap was behaving rather poorly, but this makes more sense and works for us if we don't have any await points while we hold the entry.
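A thread-based sketch of "don't hold the entry while waiting", using only the standard library. It assumes a `Condvar` in place of tokio's `Notify` and blocking threads in place of async tasks, so it is an analogy rather than the PR's implementation. The `Slot` type and the `u32` payload are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};

/// One pending or finished value; `None` means "still waiting".
struct Slot {
    value: Mutex<Option<u32>>,
    filled: Condvar,
}

pub struct OnceMap {
    items: Mutex<HashMap<String, Arc<Slot>>>,
}

impl OnceMap {
    pub fn new() -> Self {
        Self { items: Mutex::new(HashMap::new()) }
    }

    /// Returns `true` only for the first caller that registers `key`.
    pub fn register(&self, key: &str) -> bool {
        let mut items = self.items.lock().unwrap();
        if items.contains_key(key) {
            return false;
        }
        let slot = Slot { value: Mutex::new(None), filled: Condvar::new() };
        items.insert(key.to_owned(), Arc::new(slot));
        true
    }

    /// Fills the slot for `key` and wakes every waiter.
    pub fn done(&self, key: &str, value: u32) {
        // The map guard is a temporary and is released at the end of this statement.
        let slot = self.items.lock().unwrap().get(key).cloned();
        if let Some(slot) = slot {
            *slot.value.lock().unwrap() = Some(value);
            slot.filled.notify_all();
        }
    }

    /// Blocks until `key` is filled; `None` if it was never registered.
    pub fn wait(&self, key: &str) -> Option<u32> {
        // Clone the `Arc` and release the map lock *before* blocking,
        // mirroring `notify.clone(); drop(entry);` in the diff.
        let slot = self.items.lock().unwrap().get(key).cloned()?;
        let mut value = slot.value.lock().unwrap();
        while value.is_none() {
            value = slot.filled.wait(value).unwrap();
        }
        *value
    }
}
```

Because the waiter only holds its own slot's lock while blocked, other keys can still be registered and filled in the meantime.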

Member

Thinking about this longer, was the problem with waitmap maybe merely that there was some yielding while holding an entry?

Member Author

@konstin - I spent a while investigating that, and trying to make changes to the resolver to solve it, but I ultimately couldn't figure out where it might be.

Member

I also briefly looked at waitmap's implementation and nothing jumped out at me immediately.

self.wait_map.insert(key, value);
if let Some(Value::Waiting(notify)) = self.items.insert(key, Value::Filled(Arc::new(value)))
{
notify.notify_waiters();
Member

Should we also yield here? Notifying is surprisingly sync, so the subscribers won't get notified until the task that called done next yields. (This would make the function async, but I think it's correct that all operations on the OnceMap should be async.)

Member

Notifying is surprisingly sync, so the subscribers won't get notified until the task that called done next yields.

Hmmm. Are you sure? We're using the multi-threaded runtime for tokio right? If so, AIUI, other waiters could be notified and acting on it before notify.notify_waiters() even finishes.

Member

@konstin konstin Jan 30, 2024

Yep, that's correct, but our receivers are all on the same thread atm, as far as I can see.

Member

Is there a reason the waiters need to wake up sooner?

Member

In #1163 I got a speedup from 30ms to 20ms for warm cache jupyter by inserting one tokio::task::yield_now().await, so now I'm motivated to avoid these kinds of bottlenecks.
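The ordering argument in this thread can be modelled with a toy single-threaded scheduler. This is a deliberately simplified model, not tokio: it assumes all tasks share one thread (as stated above for the receivers), and the `Action` enum and `run` function are hypothetical. Each inner `Vec` is a stretch of producer work with no yield point; the waiter can only run between stretches.

```rust
/// One thing the producer task does.
pub enum Action {
    /// Mark the waiter as runnable (like `notify_waiters()`).
    Notify,
    /// Some unrelated work the producer keeps doing.
    Work(&'static str),
}

/// Runs the producer one uninterrupted stretch at a time and returns the
/// order of events. The end of each stretch models a yield point.
pub fn run(producer_steps: &[Vec<Action>]) -> Vec<String> {
    let mut log = Vec::new();
    let mut notified = false;
    let mut waiter_done = false;

    for step in producer_steps {
        for action in step {
            match action {
                Action::Notify => {
                    notified = true;
                    log.push("notify".to_string());
                }
                Action::Work(name) => log.push(format!("producer: {name}")),
            }
        }
        // Yield point: only now can a notified waiter actually run.
        if notified && !waiter_done {
            log.push("waiter: woke".to_string());
            waiter_done = true;
        }
    }
    log
}
```

With `vec![Notify, Work("a"), Work("b")]` as a single stretch, the waiter runs last. Splitting it into `vec![Notify]` followed by `vec![Work("a"), Work("b")]` (a yield right after notifying) lets the waiter run before the remaining producer work. On a multi-threaded runtime with waiters on other threads, this model does not apply.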

Member

@BurntSushi BurntSushi left a comment

This is great. I like the use of Arc to simplify things here.

@charliermarsh
Member Author

Thank you both so much for the close read, I needed it!

dashmap::mapref::entry::Entry::Occupied(_) => false,
dashmap::mapref::entry::Entry::Vacant(entry) => {
entry.insert(Value::Waiting(Arc::new(Notify::new())));
true
Member

@zanieb zanieb Jan 30, 2024

This could return some Handle object that we could use to enforce the invariant, right? e.g. done would consume a handle which would have a reference to the key instead and we'd be able to check if the handle was not used on drop.

If this method returns true, you need to start a job and call [OnceMap::done] eventually
or other tasks will hang.

Not sure how problematic that is.
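One possible shape for the handle idea, sketched with the standard library only. Everything here is hypothetical (the `Handle` type, the `abandoned` counter, the `u32` payload) and it is not the API that was merged. `register` hands out a handle instead of `true`, `done` consumes it, and a handle dropped without being passed to `done` is counted rather than panicking.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

pub struct OnceMap {
    // `None` means "started, not finished".
    items: Mutex<HashMap<String, Option<u32>>>,
    // Number of handles dropped without a matching `done` call.
    abandoned: Arc<AtomicUsize>,
}

/// Proof that the holder is responsible for eventually calling `done`.
#[must_use = "pass the handle to `OnceMap::done`, or waiters may hang"]
pub struct Handle {
    key: String,
    completed: bool,
    abandoned: Arc<AtomicUsize>,
}

impl OnceMap {
    pub fn new() -> Self {
        Self {
            items: Mutex::new(HashMap::new()),
            abandoned: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Returns a handle for the first caller, `None` for everyone else.
    pub fn register(&self, key: &str) -> Option<Handle> {
        let mut items = self.items.lock().unwrap();
        if items.contains_key(key) {
            return None;
        }
        items.insert(key.to_owned(), None);
        Some(Handle {
            key: key.to_owned(),
            completed: false,
            abandoned: Arc::clone(&self.abandoned),
        })
    }

    /// Consumes the handle, so `done` can only be called once per key.
    pub fn done(&self, mut handle: Handle, value: u32) {
        self.items.lock().unwrap().insert(handle.key.clone(), Some(value));
        handle.completed = true;
    }

    pub fn get(&self, key: &str) -> Option<u32> {
        self.items.lock().unwrap().get(key).copied().flatten()
    }

    pub fn abandoned(&self) -> usize {
        self.abandoned.load(Ordering::SeqCst)
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Record the broken invariant instead of panicking inside `drop`.
        if !self.completed {
            self.abandoned.fetch_add(1, Ordering::SeqCst);
        }
    }
}
```

The cost is the one mentioned in the reply below: the handle has to be threaded from the `register` call site to wherever the job finishes.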

Member Author

I agree that would be a much nicer API. Will consider it in a future PR, the dataflow might be tedious for now.

@charliermarsh charliermarsh merged commit 3f5e730 into main Jan 30, 2024
5 checks passed
@charliermarsh charliermarsh deleted the charlie/wait branch January 30, 2024 20:25
4 participants