Skip to content

Commit

Permalink
feat(server): allow sending a head with an upgrade
Browse files Browse the repository at this point in the history
This makes it easier to send a "101 Switching Protocols" before upgrading from
HTTP to another protocol.
  • Loading branch information
spinda committed Aug 12, 2017
1 parent 9376644 commit 95d0aff
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 12 deletions.
10 changes: 10 additions & 0 deletions src/http/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ impl fmt::Debug for Response {
}
}

impl fmt::Debug for Response<()> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Response")
.field("status", &self.status)
.field("version", &self.version)
.field("headers", &self.headers)
.finish()
}
}

/// Constructs a response using a received ResponseHead and optional body
#[inline]
#[cfg(not(feature = "raw_status"))]
Expand Down
52 changes: 40 additions & 12 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,9 @@ impl<B> Into<Message<__ProtoResponse, B>> for Response<B> {
}
}

impl<B, P> Into<Message<Result<__ProtoResponse, P>, B>> for UpgradableResponse<P, B> {
impl<B, P> Into<Message<UpgradableResponseHead<P>, B>> for UpgradableResponse<P, B> {
#[inline]
fn into(self) -> Message<Result<__ProtoResponse, P>, B> {
fn into(self) -> Message<UpgradableResponseHead<P>, B> {
match self {
UpgradableResponse::Response(res) => {
let (head, body) = response::split(res);
Expand All @@ -366,7 +366,11 @@ impl<B, P> Into<Message<Result<__ProtoResponse, P>, B>> for UpgradableResponse<P
Message::WithoutBody(Ok(__ProtoResponse(head)))
}
}
UpgradableResponse::Upgrade(upgrade_info) => Message::WithoutBody(Err(upgrade_info)),
UpgradableResponse::Upgrade(upgrade_info, maybe_head) => {
Message::WithoutBody(Err((upgrade_info, maybe_head.map(|head| {
__ProtoResponse(response::split(head).0)
}))))
}
}
}
}
Expand Down Expand Up @@ -408,14 +412,16 @@ type UpgradablePayload<T, B, P> = Result<(__ProtoTransport<T, B>, P), ::Error>;
type UpgradableReceiver<T, B, P> = Receiver<UpgradablePayload<T, B, P>>;
type UpgradableSender<T, B, P> = Sender<UpgradablePayload<T, B, P>>;

type UpgradableResponseHead<P> = Result<__ProtoResponse, (P, Option<__ProtoResponse>)>;

impl<T, B, P> ServerProto<(T, UpgradableSender<T, B, P>)> for UpgradableHttp<P, B>
where T: AsyncRead + AsyncWrite + 'static,
B: AsRef<[u8]> + 'static,
P: 'static,
{
type Request = __ProtoRequest;
type RequestBody = http::Chunk;
type Response = Result<__ProtoResponse, P>;
type Response = UpgradableResponseHead<P>;
type ResponseBody = B;
type Error = ::Error;
type Transport = UpgradableTransport<T, B, P>;
Expand Down Expand Up @@ -521,7 +527,7 @@ impl<T, B, P> Sink for UpgradableTransport<T, B, P>
where T: AsyncRead + AsyncWrite + 'static,
B: AsRef<[u8]> + 'static,
{
type SinkItem = Frame<Result<__ProtoResponse, P>, B, ::Error>;
type SinkItem = Frame<UpgradableResponseHead<P>, B, ::Error>;
type SinkError = io::Error;

fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
Expand All @@ -541,7 +547,28 @@ impl<T, B, P> Sink for UpgradableTransport<T, B, P>
Frame::Message { message, body } => {
match message {
Ok(message) => Frame::Message { message: message, body: body },
Err(upgrade_info) => {
Err((upgrade_info, maybe_head)) => {
if let Some(head) = maybe_head {
let result = self.inner_mut().start_send(Frame::Message {
message: head,
body: false
});
match result.map_err(|err| self.send_error(err))? {
AsyncSink::Ready => (),
AsyncSink::NotReady(item) => {
match item {
Frame::Message { message, body } => {
return Ok(AsyncSink::NotReady(Frame::Message {
message: Err((upgrade_info, Some(message))),
body: body,
}));
}
_ => unreachable!(),
}
}
}
}

let inner = self.take_inner();
let sender = self.take_sender();
let _ = sender.send(Ok((inner, upgrade_info)));
Expand Down Expand Up @@ -691,8 +718,9 @@ impl<T, B, P> Future for BindUpgradableConnection<T, B, P>
pub enum UpgradableResponse<P = (), B = ::Body> {
/// An HTTP response.
Response(Response<B>),
/// A protocol upgrade signal with accompanying information.
Upgrade(P),
/// A protocol upgrade signal with accompanying information and optional
/// "switching protocols" HTTP response head.
Upgrade(P, Option<Response<()>>),
}

impl<P, B> fmt::Debug for UpgradableResponse<P, B>
Expand All @@ -704,8 +732,8 @@ impl<P, B> fmt::Debug for UpgradableResponse<P, B>
UpgradableResponse::Response(ref response) => {
f.debug_tuple("Response").field(response).finish()
}
UpgradableResponse::Upgrade(ref upgrade_info) => {
f.debug_tuple("Upgrade").field(upgrade_info).finish()
UpgradableResponse::Upgrade(ref upgrade_info, ref response) => {
f.debug_tuple("Upgrade").field(upgrade_info).field(response).finish()
}
}
}
Expand Down Expand Up @@ -734,10 +762,10 @@ impl<T, B, P> Service for UpgradableHttpService<T>
B::Item: AsRef<[u8]>,
{
type Request = Message<__ProtoRequest, http::TokioBody>;
type Response = Message<Result<__ProtoResponse, P>, B>;
type Response = Message<UpgradableResponseHead<P>, B>;
type Error = ::Error;
type Future = Map<T::Future,
fn(UpgradableResponse<P, B>) -> Message<Result<__ProtoResponse, P>, B>>;
fn(UpgradableResponse<P, B>) -> Message<UpgradableResponseHead<P>, B>>;

#[inline]
fn call(&self, message: Self::Request) -> Self::Future {
Expand Down

0 comments on commit 95d0aff

Please sign in to comment.