diff --git a/tower-batch/src/semaphore.rs b/tower-batch/src/semaphore.rs index cdab56d0784..220fcc72fee 100644 --- a/tower-batch/src/semaphore.rs +++ b/tower-batch/src/semaphore.rs @@ -1,6 +1,11 @@ -// Copied from tower/src/semaphore.rs -// When/if tower-batch is upstreamed, delete this file -// and use the common tower semaphore implementation +// Copied from tower/src/semaphore.rs, commit: +// d4d1c67 hedge: use auto-resizing histograms (#484) +// +// When we upgrade to tower 0.4, we can use tokio's PollSemaphore, like tower's: +// ccfaffc buffer, limit: use `tokio-util`'s `PollSemaphore` (#556) + +// Ignore lints on this copied code +#![allow(dead_code)] pub(crate) use self::sync::OwnedSemaphorePermit as Permit; use futures_core::ready; @@ -9,7 +14,7 @@ use std::{ future::Future, mem, pin::Pin, - sync::Arc, + sync::{Arc, Weak}, task::{Context, Poll}, }; use tokio::sync; @@ -20,6 +25,12 @@ pub(crate) struct Semaphore { state: State, } +#[derive(Debug)] +pub(crate) struct Close { + semaphore: Weak, + permits: usize, +} + enum State { Waiting(Pin + Send + Sync + 'static>>), Ready(Permit), @@ -27,6 +38,19 @@ enum State { } impl Semaphore { + pub(crate) fn new_with_close(permits: usize) -> (Self, Close) { + let semaphore = Arc::new(sync::Semaphore::new(permits)); + let close = Close { + semaphore: Arc::downgrade(&semaphore), + permits, + }; + let semaphore = Self { + semaphore, + state: State::Empty, + }; + (semaphore, close) + } + pub(crate) fn new(permits: usize) -> Self { Self { semaphore: Arc::new(sync::Semaphore::new(permits)), @@ -76,3 +100,23 @@ impl fmt::Debug for State { } } } + +impl Close { + /// Close the semaphore, waking any remaining tasks currently awaiting a permit. + pub(crate) fn close(self) { + // The maximum number of permits that a `tokio::sync::Semaphore` + // can hold is usize::MAX >> 3. If we attempt to add more than that + // number of permits, the semaphore will panic. + // XXX(eliza): another shift is kinda janky but if we add (usize::MAX + // > 3 - initial permits) the semaphore impl panics (I think due to a + // bug in tokio?). + // TODO(eliza): Tokio should _really_ just expose `Semaphore::close` + // publicly so we don't have to do this nonsense... + const MAX: usize = std::usize::MAX >> 4; + if let Some(semaphore) = self.semaphore.upgrade() { + // If we added `MAX - available_permits`, any tasks that are + // currently holding permits could drop them, overflowing the max. + semaphore.add_permits(MAX - self.permits); + } + } +}