Skip to content

Commit

Permalink
middleware: Modify the response for proxies
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Aug 22, 2022
1 parent cf7cf40 commit 1006c82
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
1 change: 1 addition & 0 deletions http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json = { version = "1.0", features = ["raw_value"] }
serde = "1"
tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] }
tower = "0.4.13"
pin-project-lite = "0.2.9"

[dev-dependencies]
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
Expand Down
81 changes: 78 additions & 3 deletions http-server/src/middlewares/proxy_request.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
//! Middleware that proxies requests at a specified URI to internal
//! RPC method calls.
use crate::response;
use futures_util::ready;
use hyper::body::HttpBody;
use hyper::header::{ACCEPT, CONTENT_TYPE};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response};
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};

Expand Down Expand Up @@ -63,11 +69,12 @@ impl<S> ProxyRequest<S> {

impl<S> Service<Request<Body>> for ProxyRequest<S>
where
S: Service<Request<Body>, Response = Response<Body>, Error = hyper::Error>,
S: Service<Request<Body>, Response = Response<Body>>,
<S as Service<Request<Body>>>::Error: From<hyper::Error>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
type Future = ResponseFuture<S::Future>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand All @@ -94,6 +101,74 @@ where
req = req.map(|_| body);
}

self.inner.call(req)
// Depending on `modify` adjust the response.
ResponseFuture::PollFuture { future: self.inner.call(req), modify }
}
}

pin_project! {
/// Response future for [`ProxyRequest`].
#[project = ResponseFutureState]
#[allow(missing_docs)]
pub enum ResponseFuture<F> {
/// Poll the response out of the future.
PollFuture {
#[pin]
future: F,
modify: bool,
},
/// Poll the [`hyper::Body`] response and modify it.
PollBodyData {
body: Body,
body_bytes: Vec<u8>,
},
}
}

impl<F, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<Response<Body>, E>>,
E: From<hyper::Error>,
{
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// The purpose of this loop is to optimise the transition from
// `PollFuture` -> `PollBodyData` state, that would otherwise
// require a `cx.wake().wake_by_ref and return Poll::Pending`.
loop {
match self.as_mut().project() {
ResponseFutureState::PollFuture { future, modify } => {
let res: Response<Body> = ready!(future.poll(cx)?);

// Nothing to modify: return the response as is.
if !*modify {
return Poll::Ready(Ok(res));
}

let inner = ResponseFuture::PollBodyData { body: res.into_body(), body_bytes: Vec::new() };
self.set(inner);
}
ResponseFutureState::PollBodyData { body, body_bytes } => {
while let Some(chunk) = ready!(Pin::new(&mut *body).poll_data(cx)?) {
body_bytes.extend_from_slice(chunk.as_ref());
}

#[derive(serde::Deserialize, Debug)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
}

let response = if let Ok(payload) = serde_json::from_slice::<RpcPayload>(&body_bytes) {
response::ok_response(payload.result.to_string())
} else {
response::internal_error()
};

return Poll::Ready(Ok(response));
}
}
}
}
}

0 comments on commit 1006c82

Please sign in to comment.