From 526ccfca577db149f1af08ab22224bd2820f0e04 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 17 Feb 2021 11:33:37 +1000 Subject: [PATCH 1/3] Wake waiting tower-batch tasks on drop When other tower-batch tasks drop, wake any tasks that are waiting for a semaphore permit. Otherwise, tower-batch can hang. We currently pin tower in our workspace to: d4d1c67 hedge: use auto-resizing histograms (tower-rs/tower#484) Copy tower/src/semaphore.rs from that commit, to pick up tower-rs/tower#480. --- tower-batch/src/semaphore.rs | 51 ++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/tower-batch/src/semaphore.rs b/tower-batch/src/semaphore.rs index cdab56d0784..574c398aee5 100644 --- a/tower-batch/src/semaphore.rs +++ b/tower-batch/src/semaphore.rs @@ -1,6 +1,8 @@ -// Copied from tower/src/semaphore.rs -// When/if tower-batch is upstreamed, delete this file -// and use the common tower semaphore implementation +// Copied from tower/src/semaphore.rs, commit: +// d4d1c67 hedge: use auto-resizing histograms (#484) +// +// When we upgrade to tower 0.4, we can use tokio's PollSemaphore, like tower's: +// ccfaffc buffer, limit: use `tokio-util`'s `PollSemaphore` (#556) pub(crate) use self::sync::OwnedSemaphorePermit as Permit; use futures_core::ready; @@ -9,7 +11,7 @@ use std::{ future::Future, mem, pin::Pin, - sync::Arc, + sync::{Arc, Weak}, task::{Context, Poll}, }; use tokio::sync; @@ -20,13 +22,32 @@ pub(crate) struct Semaphore { state: State, } +#[derive(Debug)] +pub(crate) struct Close { + semaphore: Weak, + permits: usize, +} + enum State { - Waiting(Pin + Send + Sync + 'static>>), + Waiting(Pin + Send + 'static>>), Ready(Permit), Empty, } impl Semaphore { + pub(crate) fn new_with_close(permits: usize) -> (Self, Close) { + let semaphore = Arc::new(sync::Semaphore::new(permits)); + let close = Close { + semaphore: Arc::downgrade(&semaphore), + permits, + }; + let semaphore = Self { + semaphore, + state: State::Empty, + }; + (semaphore, close) + } + pub(crate) fn new(permits: usize) -> Self { Self { semaphore: Arc::new(sync::Semaphore::new(permits)), @@ -76,3 +97,23 @@ impl fmt::Debug for State { } } } + +impl Close { + /// Close the semaphore, waking any remaining tasks currently awaiting a permit. + pub(crate) fn close(self) { + // The maximum number of permits that a `tokio::sync::Semaphore` + // can hold is usize::MAX >> 3. If we attempt to add more than that + // number of permits, the semaphore will panic. + // XXX(eliza): another shift is kinda janky but if we add (usize::MAX + // > 3 - initial permits) the semaphore impl panics (I think due to a + // bug in tokio?). + // TODO(eliza): Tokio should _really_ just expose `Semaphore::close` + // publicly so we don't have to do this nonsense... + const MAX: usize = std::usize::MAX >> 4; + if let Some(semaphore) = self.semaphore.upgrade() { + // If we added `MAX - available_permits`, any tasks that are + // currently holding permits could drop them, overflowing the max. + semaphore.add_permits(MAX - self.permits); + } + } +} From 36e174e4bf920c5c658c048b2ede73d7f9e4655a Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 17 Feb 2021 11:39:19 +1000 Subject: [PATCH 2/3] Ignore clippy lints on copied code --- tower-batch/src/semaphore.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tower-batch/src/semaphore.rs b/tower-batch/src/semaphore.rs index 574c398aee5..c9f0adadafe 100644 --- a/tower-batch/src/semaphore.rs +++ b/tower-batch/src/semaphore.rs @@ -4,6 +4,9 @@ // When we upgrade to tower 0.4, we can use tokio's PollSemaphore, like tower's: // ccfaffc buffer, limit: use `tokio-util`'s `PollSemaphore` (#556) +// Ignore lints on this copied code +#![allow(dead_code)] + pub(crate) use self::sync::OwnedSemaphorePermit as Permit; use futures_core::ready; use std::{ From f670265e3bb82205afac3e805d9aac2f1fdf2514 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 17 Feb 2021 11:57:51 +1000 Subject: [PATCH 3/3] Add a missing Sync bound --- tower-batch/src/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tower-batch/src/semaphore.rs b/tower-batch/src/semaphore.rs index c9f0adadafe..220fcc72fee 100644 --- a/tower-batch/src/semaphore.rs +++ b/tower-batch/src/semaphore.rs @@ -32,7 +32,7 @@ pub(crate) struct Close { } enum State { - Waiting(Pin + Send + 'static>>), + Waiting(Pin + Send + Sync + 'static>>), Ready(Permit), Empty, }