Skip to content

Commit

Permalink
fix: limit the amount of pending-accept reset streams
Browse files Browse the repository at this point in the history
Streams that have been received by the peer, but not accepted by the
user, can also receive a RST_STREAM. This is a legitimate pattern: one
could send a request and then shortly after, realize it is not needed,
sending a CANCEL.

However, since those streams are now "closed", they don't count towards
the max concurrent streams. So, they will sit in the accept queue, using
memory.

In most cases, the user is calling `accept` in a loop, and they can
accept requests that have been reset fast enough that this isn't an
issue in practice.

But if the peer is able to flood the network faster than the server
accept loop can run (simply accepting, not processing requests; that
tends to happen in a separate task), the memory could grow.

So, this introduces a maximum count for streams in the pending-accept
but remotely-reset state. If the maximum is reached, a GOAWAY frame with
the error code of ENHANCE_YOUR_CALM is sent, and the connection marks
itself as errored.

ref CVE-2023-26964
ref GHSA-f8vr-r385-rh5r

Closes hyperium/hyper#2877
  • Loading branch information
seanmonstar committed Apr 13, 2023
1 parent 8088ca6 commit 5bc8e72
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 12 deletions.
8 changes: 8 additions & 0 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};

const DEFAULT_MAX_REMOTE_RESET_STREAMS: usize = 20;

/// An H2 connection
#[derive(Debug)]
pub(crate) struct Connection<T, P, B: Buf = Bytes>
Expand Down Expand Up @@ -118,6 +120,7 @@ where
.unwrap_or(false),
local_reset_duration: config.reset_stream_duration,
local_reset_max: config.reset_stream_max,
remote_reset_max: DEFAULT_MAX_REMOTE_RESET_STREAMS,
remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
remote_max_initiated: config
.settings
Expand Down Expand Up @@ -172,6 +175,11 @@ where
self.inner.streams.max_recv_streams()
}

#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
self.inner.streams.num_wired_streams()
}

/// Returns `Ready` when the connection is ready to receive a frame.
///
/// Returns `Error` as this may raise errors that are caused by delayed
Expand Down
51 changes: 43 additions & 8 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ pub(super) struct Counts {
num_recv_streams: usize,

/// Maximum number of pending locally reset streams
max_reset_streams: usize,
max_local_reset_streams: usize,

/// Current number of pending locally reset streams
num_reset_streams: usize,
num_local_reset_streams: usize,

/// Max number of "pending accept" streams that were remotely reset
max_remote_reset_streams: usize,

/// Current number of "pending accept" streams that were remotely reset
num_remote_reset_streams: usize,
}

impl Counts {
Expand All @@ -36,8 +42,10 @@ impl Counts {
num_send_streams: 0,
max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
num_recv_streams: 0,
max_reset_streams: config.local_reset_max,
num_reset_streams: 0,
max_local_reset_streams: config.local_reset_max,
num_local_reset_streams: 0,
max_remote_reset_streams: config.remote_reset_max,
num_remote_reset_streams: 0,
}
}

Expand Down Expand Up @@ -90,7 +98,7 @@ impl Counts {

/// Returns true if the number of pending reset streams can be incremented.
pub fn can_inc_num_reset_streams(&self) -> bool {
self.max_reset_streams > self.num_reset_streams
self.max_local_reset_streams > self.num_local_reset_streams
}

/// Increments the number of pending reset streams.
Expand All @@ -101,7 +109,34 @@ impl Counts {
pub fn inc_num_reset_streams(&mut self) {
assert!(self.can_inc_num_reset_streams());

self.num_reset_streams += 1;
self.num_local_reset_streams += 1;
}

pub(crate) fn max_remote_reset_streams(&self) -> usize {
self.max_remote_reset_streams
}

/// Returns true if the number of pending REMOTE reset streams can be
/// incremented.
pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
self.max_remote_reset_streams > self.num_remote_reset_streams
}

/// Increments the number of pending REMOTE reset streams.
///
/// # Panics
///
/// Panics on failure as this should have been validated before hand.
pub(crate) fn inc_num_remote_reset_streams(&mut self) {
assert!(self.can_inc_num_remote_reset_streams());

self.num_remote_reset_streams += 1;
}

pub(crate) fn dec_num_remote_reset_streams(&mut self) {
assert!(self.num_remote_reset_streams > 0);

self.num_remote_reset_streams -= 1;
}

pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
Expand Down Expand Up @@ -194,8 +229,8 @@ impl Counts {
}

fn dec_num_reset_streams(&mut self) {
assert!(self.num_reset_streams > 0);
self.num_reset_streams -= 1;
assert!(self.num_local_reset_streams > 0);
self.num_local_reset_streams -= 1;
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub struct Config {
/// Maximum number of locally reset streams to keep at a time
pub local_reset_max: usize,

/// Maximum number of remotely reset "pending accept" streams to keep at a
/// time. Going over this number results in a connection error.
pub remote_reset_max: usize,

/// Initial window size of remote initiated streams
pub remote_init_window_sz: WindowSize,

Expand Down
30 changes: 28 additions & 2 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,12 +741,39 @@ impl Recv {
}

/// Handle remote sending an explicit RST_STREAM.
pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
pub fn recv_reset(
&mut self,
frame: frame::Reset,
stream: &mut Stream,
counts: &mut Counts,
) -> Result<(), Error> {
// Reseting a stream that the user hasn't accepted is possible,
// but should be done with care. These streams will continue
// to take up memory in the accept queue, but will no longer be
// counted as "concurrent" streams.
//
// So, we have a separate limit for these.
//
// See https://github.com/hyperium/hyper/issues/2877
if stream.is_pending_accept {
if counts.can_inc_num_remote_reset_streams() {
counts.inc_num_remote_reset_streams();
} else {
tracing::warn!(
"recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
counts.max_remote_reset_streams(),
);
return Err(Error::library_go_away(Reason::ENHANCE_YOUR_CALM));
}
}

// Notify the stream
stream.state.recv_reset(frame, stream.is_pending_send);

stream.notify_send();
stream.notify_recv();

Ok(())
}

/// Handle a connection-level error
Expand Down Expand Up @@ -1033,7 +1060,6 @@ impl Recv {
cx: &Context,
stream: &mut Stream,
) -> Poll<Option<Result<Bytes, proto::Error>>> {
// TODO: Return error when the stream is reset
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
Some(event) => {
Expand Down
7 changes: 7 additions & 0 deletions src/proto/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,13 @@ impl State {
}
}

pub fn is_remote_reset(&self) -> bool {
match self.inner {
Closed(Cause::Error(ref e)) => e.is_local(),
_ => false,
}
}

/// Returns true if the stream is already reset.
pub fn is_reset(&self) -> bool {
match self.inner {
Expand Down
8 changes: 7 additions & 1 deletion src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ where
// TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
// the lock, so it can't.
me.refs += 1;

// Pending-accepted remotely-reset streams are counted.
if stream.state.is_remote_reset() {
me.counts.dec_num_remote_reset_streams();
}

StreamRef {
opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
send_buffer: self.send_buffer.clone(),
Expand Down Expand Up @@ -601,7 +607,7 @@ impl Inner {
let actions = &mut self.actions;

self.counts.transition(stream, |counts, stream| {
actions.recv.recv_reset(frame, stream);
actions.recv.recv_reset(frame, stream, counts)?;
actions.send.handle_error(send_buffer, stream, counts);
assert!(stream.state.is_closed());
Ok(())
Expand Down
7 changes: 7 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,13 @@ where
pub fn max_concurrent_recv_streams(&self) -> usize {
self.connection.max_recv_streams()
}

// Could disappear at anytime.
#[doc(hidden)]
#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
self.connection.num_wired_streams()
}
}

#[cfg(feature = "stream")]
Expand Down
4 changes: 4 additions & 0 deletions tests/h2-support/src/frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ impl Mock<frame::GoAway> {
self.reason(frame::Reason::FRAME_SIZE_ERROR)
}

pub fn calm(self) -> Self {
self.reason(frame::Reason::ENHANCE_YOUR_CALM)
}

pub fn no_error(self) -> Self {
self.reason(frame::Reason::NO_ERROR)
}
Expand Down
41 changes: 40 additions & 1 deletion tests/h2-tests/tests/stream_states.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![deny(warnings)]

use futures::future::{join, join3, lazy, try_join};
use futures::future::{join, join3, lazy, poll_fn, try_join};
use futures::{FutureExt, StreamExt, TryStreamExt};
use h2_support::prelude::*;
use h2_support::util::yield_once;
Expand Down Expand Up @@ -194,6 +194,45 @@ async fn closed_streams_are_released() {
join(srv, h2).await;
}

#[tokio::test]
async fn reset_streams_dont_grow_memory_continuously() {
//h2_support::trace_init!();
let (io, mut client) = mock::new();

const N: u32 = 50;

let client = async move {
let settings = client.assert_server_handshake().await;
assert_default_settings!(settings);
for n in (1..(N * 2)).step_by(2) {
client
.send_frame(frames::headers(n).request("GET", "https://a.b/").eos())
.await;
client.send_frame(frames::reset(n).protocol_error()).await;
}
tokio::time::timeout(
std::time::Duration::from_secs(1),
client.recv_frame(frames::go_away(41).calm()),
)
.await
.expect("client goaway");
};

let srv = async move {
let mut srv = server::Builder::new()
.handshake::<_, Bytes>(io)
.await
.expect("handshake");

poll_fn(|cx| srv.poll_closed(cx))
.await
.expect_err("server should error");
// specifically, not 50;
assert_eq!(21, srv.num_wired_streams());
};
join(srv, client).await;
}

#[tokio::test]
async fn errors_if_recv_frame_exceeds_max_frame_size() {
h2_support::trace_init!();
Expand Down

0 comments on commit 5bc8e72

Please sign in to comment.