Skip to content

Commit

Permalink
concurrency-limit: Drop permit on readiness (#751)
Browse files Browse the repository at this point in the history
The concurrency-limit module holds permits until the response future is
dropped. This is fine in theory, but we have reports of clients hitting
the concurrency limit unexpectedly.

This change ensures that the permit is released immediately as soon as
the inner future becomes ready, regardless of how long the response
future is held. This also adds trace logging when the permit is
released.
  • Loading branch information
olix0r authored Nov 23, 2020
1 parent f0da619 commit 42a7435
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions linkerd/concurrency-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
#![deny(warnings, rust_2018_idioms)]

use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{fmt, mem};
use std::{
fmt,
future::Future,
mem,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tower::Service;
use tracing::trace;
Expand Down Expand Up @@ -41,8 +44,8 @@ enum State {
pub struct ResponseFuture<T> {
#[pin]
inner: T,
// We only keep this around so that it is dropped when the future completes.
_permit: OwnedSemaphorePermit,
// The permit is held until the future becomes ready.
permit: Option<OwnedSemaphorePermit>,
}

impl From<Arc<Semaphore>> for Layer {
Expand Down Expand Up @@ -74,21 +77,6 @@ impl<T> ConcurrencyLimit<T> {
state: State::Empty,
}
}

/// Get a reference to the inner service
pub fn get_ref(&self) -> &T {
&self.inner
}

/// Get a mutable reference to the inner service
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}

/// Consume `self`, returning the inner service
pub fn into_inner(self) -> T {
self.inner
}
}

impl<S, Request> Service<Request> for ConcurrencyLimit<S>
Expand Down Expand Up @@ -119,17 +107,17 @@ where

fn call(&mut self, request: Request) -> Self::Future {
// Make sure a permit has been acquired
let _permit = match mem::replace(&mut self.state, State::Empty) {
let permit = match mem::replace(&mut self.state, State::Empty) {
// Take the permit.
State::Ready(permit) => permit,
State::Ready(permit) => Some(permit),
// whoopsie!
_ => panic!("max requests in-flight; poll_ready must be called first"),
};

// Call the inner service
let inner = self.inner.call(request);

ResponseFuture { inner, _permit }
ResponseFuture { inner, permit }
}
}

Expand All @@ -153,7 +141,15 @@ where
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
let this = self.project();
let res = futures::ready!(this.inner.poll(cx));
let released = this.permit.take().is_some();
debug_assert!(
released,
"Permit must be released when the future completes"
);
trace!("permit released");
Poll::Ready(res)
}
}

Expand Down

0 comments on commit 42a7435

Please sign in to comment.