Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

middleware: Implement proxy URI paths to RPC methods #859

Merged
merged 12 commits into from
Aug 24, 2022
Merged
101 changes: 101 additions & 0 deletions examples/examples/http_proxy_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! This example utilizes the `ProxyRequest` layer for redirecting
//! `GET /path` requests to internal RPC methods.
//!
//! The RPC server registers a method named `system_health` which
//! returns `serde_json::Value`. Redirect any `GET /health`
//! requests to the internal method, and return only the method's
//! response in the body (ie, without any jsonRPC 2.0 overhead).
//!
//! # Note
//!
//! This functionality is useful for services which would
//! like to query a certain `URI` path for statistics.

use hyper::{Body, Client, Request};
use std::net::SocketAddr;
use std::time::Duration;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::middleware::proxy_request::ProxyGetRequestLayer;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let (addr, _handler) = run_server().await?;
let url = format!("http://{}", addr);

// Use RPC client to get the response of `say_hello` method.
let client = HttpClientBuilder::default().build(&url)?;
let response: String = client.request("say_hello", None).await?;
println!("[main]: response: {:?}", response);

// Use hyper client to manually submit a `GET /health` request.
let http_client = Client::new();
let uri = format!("http://{}/health", addr);

let req = Request::builder().method("GET").uri(&uri).body(Body::empty())?;
println!("[main]: Submit proxy request: {:?}", req);
let res = http_client.request(req).await?;
println!("[main]: Received proxy response: {:?}", res);

// Interpret the response as String.
let bytes = hyper::body::to_bytes(res.into_body()).await.unwrap();
let out = String::from_utf8(bytes.to_vec()).unwrap();
println!("[main]: Interpret proxy response: {:?}", out);
assert_eq!(out.as_str(), "{\"health\":true}");

Ok(())
}

async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
// Custom tower service to handle the RPC requests
let service_builder = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health"))
.timeout(Duration::from_secs(2));

let server =
HttpServerBuilder::new().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let addr = server.local_addr()?;

let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
module.register_method("system_health", |_, _| Ok(serde_json::json!({ "health": true }))).unwrap();

let handler = server.start(module)?;

Ok((addr, handler))
}
1 change: 1 addition & 0 deletions http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json = { version = "1.0", features = ["raw_value"] }
serde = "1"
tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] }
tower = "0.4.13"
pin-project-lite = "0.2.9"
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
Expand Down
3 changes: 3 additions & 0 deletions http-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ mod server;
/// Common builders for RPC responses.
pub mod response;

/// Common tower middleware exposed for RPC interaction.
pub mod middleware;

pub use jsonrpsee_core::server::access_control::{AccessControl, AccessControlBuilder};
pub use jsonrpsee_core::server::rpc_module::RpcModule;
pub use jsonrpsee_types as types;
Expand Down
4 changes: 4 additions & 0 deletions http-server/src/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Various middleware implementations for RPC specific purposes.

/// Proxy `GET /path` to internal RPC methods.
pub mod proxy_request;
140 changes: 140 additions & 0 deletions http-server/src/middleware/proxy_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//! Middleware that proxies requests at a specified URI to internal
lexnv marked this conversation as resolved.
Show resolved Hide resolved
//! RPC method calls.

use crate::response;
use hyper::header::{ACCEPT, CONTENT_TYPE};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response, Uri};
use jsonrpsee_types::{Id, RequestSer};
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::{Layer, Service};

/// Layer that applies [`ProxyGetRequest`] which proxies the `GET /path` requests to
/// specific RPC method calls and that strips the response.
///
/// See [`ProxyGetRequest`] for more details.
#[derive(Debug, Clone)]
pub struct ProxyGetRequestLayer {
path: String,
method: String,
}

impl ProxyGetRequestLayer {
/// Creates a new [`ProxyGetRequestLayer`].
///
/// See [`ProxyGetRequest`] for more details.
pub fn new(path: impl Into<String>, method: impl Into<String>) -> Self {
Self { path: path.into(), method: method.into() }
}
}
impl<S> Layer<S> for ProxyGetRequestLayer {
type Service = ProxyGetRequest<S>;

fn layer(&self, inner: S) -> Self::Service {
ProxyGetRequest::new(inner, &self.path, &self.method)
}
}

/// Proxy `GET /path` requests to the specified RPC method calls.
///
/// # Request
///
/// The `GET /path` requests are modified into valid `POST` requests for
/// calling the RPC method. This middleware adds appropriate headers to the
/// request, and completely modifies the request `BODY`.
///
/// # Response
///
/// The response of the RPC method is stripped down to contain only the method's
/// response, removing any RPC 2.0 spec logic regarding the response' body.
#[derive(Debug, Clone)]
pub struct ProxyGetRequest<S> {
inner: S,
path: Arc<str>,
method: Arc<str>,
}

impl<S> ProxyGetRequest<S> {
/// Creates a new [`ProxyGetRequest`].
///
/// The request `GET /path` is redirected to the provided method.
pub fn new(inner: S, path: &str, method: &str) -> Self {
Self { inner, path: Arc::from(path), method: Arc::from(method) }
}
}

impl<S> Service<Request<Body>> for ProxyGetRequest<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S::Response: 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = Box<dyn Error + Send + Sync + 'static>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let modify = self.path.as_ref() == req.uri() && req.method() == Method::GET;

// Proxy the request to the appropriate method call.
if modify {
// RPC methods are accessed with `POST`.
*req.method_mut() = Method::POST;
// Precautionary remove the URI.
*req.uri_mut() = Uri::from_static("/");

// Requests must have the following headers:
req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
req.headers_mut().insert(ACCEPT, HeaderValue::from_static("application/json"));

// Adjust the body to reflect the method call.
let body = Body::from(
serde_json::to_string(&RequestSer::new(&Id::Number(0), &self.method, None))
.expect("Valid request; qed"),
);
req = req.map(|_| body);
}

// Call the inner service and get a future that resolves to the response.
let fut = self.inner.call(req);

// Adjust the response if needed.
let res_fut = async move {
let res = fut.await.map_err(|err| err.into())?;

// Nothing to modify: return the response as is.
if !modify {
return Ok(res);
}

let body = res.into_body();
let bytes = hyper::body::to_bytes(body).await?;

#[derive(serde::Deserialize, Debug)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
}

let response = if let Ok(payload) = serde_json::from_slice::<RpcPayload>(&bytes) {
response::ok_response(payload.result.to_string())
} else {
response::internal_error()
};

Ok(response)
};

Box::pin(res_fut)
}
}
Loading