Skip to content

Commit

Permalink
when receiving a GOAWAY, allow earlier streams to still process
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Oct 4, 2017
1 parent c4ca8f7 commit 252e306
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 9 deletions.
5 changes: 1 addition & 4 deletions src/frame/go_away.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ impl GoAway {
}
}

#[cfg(feature = "unstable")]
pub fn last_stream_id(&self) -> StreamId {
self.last_stream_id
}
Expand All @@ -27,9 +26,7 @@ impl GoAway {

pub fn load(payload: &[u8]) -> Result<GoAway, Error> {
if payload.len() < 8 {
// Invalid payload len
// TODO: Handle error
unimplemented!();
return Err(Error::BadFrameSize);
}

let (last_stream_id, _) = StreamId::parse(&payload[..4]);
Expand Down
13 changes: 8 additions & 5 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,14 @@ where
trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
},
Some(GoAway(_)) => {
// TODO: handle the last_processed_id. Also, should this be
// handled as an error?
// let _ = RecvError::Proto(frame.reason());
return Ok(().into());
Some(GoAway(frame)) => {
trace!("recv GOAWAY; frame={:?}", frame);
// TODO: should adjust State to RecvGoAway
// This new state should prevent starting new streams,
// but should allow continuing to process current streams
// until they are all EOS. Once they are, State should
// transition to GoAway.
self.streams.recv_goaway(&frame);
},
Some(Ping(frame)) => {
trace!("recv PING; frame={:?}", frame);
Expand Down
27 changes: 27 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,33 @@ where
last_processed_id
}

pub fn recv_goaway(&mut self, frame: &frame::GoAway) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let actions = &mut me.actions;
let counts = &mut me.counts;

let last_stream_id = frame.last_stream_id();
let reason = frame.reason();
let err = reason.into();

me.store
.for_each(|stream| {
if stream.id > last_stream_id {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(&err, &mut *stream);
Ok::<_, ()>(())
})
} else {
Ok::<_, ()>(())
}
})
.unwrap();

actions.conn_error = Some(err);
}

pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), RecvError> {
let id = frame.stream_id();
let mut me = self.inner.lock().unwrap();
Expand Down
67 changes: 67 additions & 0 deletions tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,73 @@ fn configure_max_frame_size() {
let _ = h2.join(srv).wait().expect("wait");
}

#[test]
fn recv_goaway_finishes_processed_streams() {
let _ = ::env_logger::init();
let (io, srv) = mock::new();

let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://example.com/")
.eos(),
)
.recv_frame(
frames::headers(3)
.request("GET", "https://example.com/")
.eos(),
)
.send_frame(frames::go_away(1))
.send_frame(frames::headers(1).response(200))
.send_frame(frames::data(1, vec![0; 16_384]).eos())
.close();

let h2 = Client::handshake(io)
.expect("handshake")
.and_then(|(mut client, h2)| {
let request = Request::builder()
.method(Method::GET)
.uri("https://example.com/")
.body(())
.unwrap();

let req1 = client.send_request(request, true)
.unwrap()
.expect("response")
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1;
body.concat2().expect("body")
})
.and_then(|buf| {
assert_eq!(buf.len(), 16_384);
Ok(())
});


// this request will trigger a goaway
let request = Request::builder()
.method(Method::GET)
.uri("https://example.com/")
.body(())
.unwrap();
let req2 = client.send_request(request, true)
.unwrap()
.then(|res| {
let err = res.unwrap_err();
assert_eq!(err.to_string(), "protocol error: not a result of an error");
Ok::<(), ()>(())
});

h2.expect("client").join3(req1, req2)
});


h2.join(srv).wait().expect("wait");
}

/*
#[test]
fn send_data_after_headers_eos() {
Expand Down

0 comments on commit 252e306

Please sign in to comment.