From a3cbf2d7acad6c4a36c075cd8a70c22b11e124c3 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 2 Aug 2017 11:34:34 -0700 Subject: [PATCH] Wire up Trailers frame --- src/frame/headers.rs | 11 ++++++++--- src/lib.rs | 4 +++- src/proto/connection.rs | 36 ++++++++++++++++++++++++++++-------- src/proto/streams/mod.rs | 9 +++++++-- src/proto/streams/recv.rs | 4 ++-- src/proto/streams/send.rs | 4 ++-- 6 files changed, 50 insertions(+), 18 deletions(-) diff --git a/src/frame/headers.rs b/src/frame/headers.rs index ab486f8a7..dbbf7996a 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -1,9 +1,10 @@ use super::StreamId; use hpack; use frame::{self, Frame, Head, Kind, Error}; +use HeaderMap; use http::{request, response, version, uri, Method, StatusCode, Uri}; -use http::header::{self, HeaderMap, HeaderName, HeaderValue}; +use http::header::{self, HeaderName, HeaderValue}; use bytes::{BytesMut, Bytes}; use byteorder::{BigEndian, ByteOrder}; @@ -23,7 +24,7 @@ pub struct Headers { stream_dep: Option, /// The decoded header fields - fields: HeaderMap, + fields: HeaderMap, /// Pseudo headers, these are broken out as they must be sent as part of the /// headers frame. @@ -110,7 +111,7 @@ const ALL: u8 = END_STREAM // ===== impl Headers ===== impl Headers { - pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self { + pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self { Headers { stream_id: stream_id, stream_dep: None, @@ -251,6 +252,10 @@ impl Headers { request } + pub fn into_fields(self) -> HeaderMap { + self.fields + } + pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) -> Option { diff --git a/src/lib.rs b/src/lib.rs index 23bca30bf..3badfb7ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,6 +41,8 @@ pub use proto::Connection; use bytes::Bytes; pub type FrameSize = u32; +// TODO: remove if carllerche/http#90 lands +pub type HeaderMap = http::HeaderMap; /// An H2 connection frame #[derive(Debug)] @@ -57,7 +59,7 @@ pub enum Frame { }, Trailers { id: StreamId, - headers: (), + headers: HeaderMap, }, PushPromise { id: StreamId, diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b92fd4a38..b9f70b2fd 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,4 +1,5 @@ use {ConnectionError, Frame, Peer}; +use HeaderMap; use frame::{self, StreamId}; use client::Client; use server::Server; @@ -108,6 +109,17 @@ impl Connection }) } + pub fn send_trailers(self, + id: StreamId, + headers: HeaderMap) + -> sink::Send + { + self.send(Frame::Trailers { + id, + headers, + }) + } + pub fn start_ping(&mut self, _body: PingPayload) -> StartSend { unimplemented!(); } @@ -167,7 +179,7 @@ impl Connection Some(Headers(frame)) => { trace!("recv HEADERS; frame={:?}", frame); // Update stream state while ensuring that the headers frame - // can be received + // can be received. if let Some(frame) = try!(self.streams.recv_headers(frame)) { let frame = Self::convert_poll_message(frame); return Ok(Some(frame).into()); @@ -224,8 +236,10 @@ impl Connection fn convert_poll_message(frame: frame::Headers) -> Frame { if frame.is_trailers() { - // TODO: return trailers - unimplemented!(); + Frame::Trailers { + id: frame.stream_id(), + headers: frame.into_fields() + } } else { Frame::Headers { id: frame.stream_id(), @@ -328,7 +342,6 @@ impl Sink for Connection frame::Frame::Headers(frame) } - Frame::Data { id, data, end_of_stream } => { let frame = frame::Data::from_buf( id, data.into_buf(), end_of_stream); @@ -337,13 +350,20 @@ impl Sink for Connection frame.into() } - Frame::Reset { id, error } => frame::Reset::new(id, error).into(), - - /* Frame::Trailers { id, headers } => { - unimplemented!(); + let mut frame = frame::Headers::new( + id, + frame::Pseudo::default(), + headers); + + frame.set_end_stream(); + + self.streams.send_headers(&frame)?; + + frame::Frame::Headers(frame) } + /* Frame::PushPromise { id, promise } => { unimplemented!(); } diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 7969a06c4..da53607fe 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -87,7 +87,12 @@ impl Streams

{ }; if frame.is_trailers() { - try!(self.inner.recv.recv_trailers(state, frame.is_end_stream())); + if !frame.is_end_stream() { + // TODO: What error should this return? + unimplemented!(); + } + + try!(self.inner.recv.recv_eos(state)); } else { try!(self.inner.recv.recv_headers(state, frame.is_end_stream())); } @@ -174,7 +179,7 @@ impl Streams

{ }; if frame.is_trailers() { - try!(self.inner.send.send_trailers(state, frame.is_end_stream())); + try!(self.inner.send.send_eos(state)); } else { try!(self.inner.send.send_headers(state, frame.is_end_stream())); } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 710dc837b..83a9aff7b 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -70,10 +70,10 @@ impl Recv

{ state.recv_open(self.init_window_sz, eos) } - pub fn recv_trailers(&mut self, _state: &mut state::Stream, _eos: bool) + pub fn recv_eos(&mut self, state: &mut state::Stream) -> Result<(), ConnectionError> { - unimplemented!(); + state.recv_close() } pub fn recv_data(&mut self, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index bb3523eb3..1084dc8f4 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -72,10 +72,10 @@ impl Send

{ state.send_open(self.init_window_sz, eos) } - pub fn send_trailers(&mut self, _state: &mut state::Stream, _eos: bool) + pub fn send_eos(&mut self, state: &mut state::Stream) -> Result<(), ConnectionError> { - unimplemented!(); + state.send_close() } pub fn send_data(&mut self,