Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(http server): handle post and option HTTP requests properly. #637

Merged
merged 19 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions http-server/src/access_control/cors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{fmt, ops};

use crate::access_control::hosts::{Host, Port};
use crate::access_control::matcher::{Matcher, Pattern};
use jsonrpsee_core::Cow;
use lazy_static::lazy_static;
use unicase::Ascii;

Expand Down Expand Up @@ -169,6 +170,16 @@ pub enum AccessControlAllowHeaders {
Any,
}

impl AccessControlAllowHeaders {
/// Return an appropriate value for the CORS header "Access-Control-Allow-Headers".
pub fn to_cors_header_value(&self) -> Cow<'_, str> {
match self {
AccessControlAllowHeaders::Any => "*".into(),
AccessControlAllowHeaders::Only(headers) => headers.join(", ").into(),
}
}
}

/// CORS response headers
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AllowCors<T> {
Expand Down
5 changes: 5 additions & 0 deletions http-server/src/access_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ impl AccessControl {
});
header == cors::AllowCors::Invalid && !self.continue_on_invalid_cors
}

/// Return the allowed headers we've set
pub(crate) fn allowed_headers(&self) -> &AccessControlAllowHeaders {
&self.allowed_headers
}
}

impl Default for AccessControl {
Expand Down
9 changes: 9 additions & 0 deletions http-server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,12 @@ fn from_template<S: Into<hyper::Body>>(
pub fn ok_response(body: String) -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::OK, body, JSON)
}

/// Create a response for unsupported content type.
pub fn unsupported_content_type() -> hyper::Response<hyper::Body> {
from_template(
hyper::StatusCode::UNSUPPORTED_MEDIA_TYPE,
"Supplied content type is not allowed. Content-Type: application/json is required\n".to_owned(),
TEXT,
)
}
260 changes: 148 additions & 112 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ use std::net::{SocketAddr, TcpListener, ToSocketAddrs};
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::response::{internal_error, malformed};
use crate::{response, AccessControl};
use futures_channel::mpsc;
use futures_util::{future::join_all, stream::StreamExt, FutureExt};
use hyper::server::{conn::AddrIncoming, Builder as HyperBuilder};
use hyper::service::{make_service_fn, service_fn};
use hyper::Error as HyperError;
use hyper::{Error as HyperError, Method};
use jsonrpsee_core::error::{Error, GenericTransportError};
use jsonrpsee_core::http_helpers::read_body;
use jsonrpsee_core::http_helpers::{self, read_body};
use jsonrpsee_core::id_providers::NoopIdProvider;
use jsonrpsee_core::middleware::Middleware;
use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink};
Expand Down Expand Up @@ -302,116 +303,47 @@ impl<M: Middleware> Server<M> {
// two cases: a single RPC request or a batch of RPC requests.
async move {
if let Err(e) = access_control_is_valid(&access_control, &request) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this became hard to review, however validate origins and headers before checking the method kind.

return Ok::<_, HyperError>(e);
return Ok(e);
}

if let Err(e) = content_type_is_valid(&request) {
return Ok::<_, HyperError>(e);
}

let (parts, body) = request.into_parts();

let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
Ok(r) => r,
Err(GenericTransportError::TooLarge) => return Ok::<_, HyperError>(response::too_large()),
Err(GenericTransportError::Malformed) => return Ok::<_, HyperError>(response::malformed()),
Err(GenericTransportError::Inner(e)) => {
tracing::error!("Internal error reading request body: {}", e);
return Ok::<_, HyperError>(response::internal_error());
}
};

let request_start = middleware.on_request();

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded::<String>();
let sink = MethodSink::new_with_limit(tx, max_request_body_size);

type Notif<'a> = Notification<'a, Option<&'a RawValue>>;

// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<Request>(&body) {
middleware.on_call(req.method.as_ref());

// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
match methods.execute_with_resources(&sink, req, 0, &resources, &NoopIdProvider) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
}
Ok((name, MethodResult::Async(fut))) => {
let success = fut.await;

middleware.on_result(name, success, request_start);
}
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
}
}
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
sink.send_error(id, code.into());
// Only `POST` and `OPTIONS` methods are allowed.
match *request.method() {
Method::POST if content_type_is_json(&request) => {
process_validated_request(
request,
middleware,
methods,
resources,
max_request_body_size,
)
.await
}

// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
let middleware = &middleware;

join_all(batch.into_iter().filter_map(
move |req| match methods.execute_with_resources(
&sink,
req,
0,
&resources,
&NoopIdProvider,
) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
None
}
Ok((name, MethodResult::Async(fut))) => Some(async move {
let success = fut.await;
middleware.on_result(name, success, request_start);
}),
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
None
}
},
))
.await;
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
sink.send_error(Id::Null, ErrorCode::InvalidRequest.into());
// Handle CORS preflight request. We've done our access check
// above so we just need to tell the browser that the request is OK.
Method::OPTIONS => {
let origin = match http_helpers::read_header_value(request.headers(), "origin") {
Some(origin) => origin,
None => return Ok(malformed()),
};
let allowed_headers = access_control.allowed_headers().to_cors_header_value();
let allowed_header_bytes = allowed_headers.as_bytes();

let res = hyper::Response::builder()
.header("access-control-allow-origin", origin)
.header("access-control-allow-methods", "POST")
.header("access-control-allow-headers", allowed_header_bytes)
.body(hyper::Body::empty())
.unwrap_or_else(|e| {
tracing::error!("Error forming preflight response: {}", e);
internal_error()
});

Ok(res)
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
let (id, code) = prepare_error(&body);
sink.send_error(id, code.into());
// Error scenarios:
Method::POST => Ok(response::unsupported_content_type()),
_ => Ok(response::method_not_allowed()),
}

// Closes the receiving half of a channel without dropping it. This prevents any further
// messages from being sent on the channel.
rx.close();
let response = if is_single {
rx.next().await.expect("Sender is still alive managed by us above; qed")
} else {
collect_batch_response(rx).await
};
tracing::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
middleware.on_response(request_start);
Ok::<_, HyperError>(response::ok_response(response))
}
}))
}
Expand Down Expand Up @@ -449,11 +381,8 @@ fn access_control_is_valid(
}

/// Checks that content type of received request is valid for JSON-RPC.
fn content_type_is_valid(request: &hyper::Request<hyper::Body>) -> Result<(), hyper::Response<hyper::Body>> {
match *request.method() {
hyper::Method::POST if is_json(request.headers().get("content-type")) => Ok(()),
_ => Err(response::method_not_allowed()),
}
fn content_type_is_json(request: &hyper::Request<hyper::Body>) -> bool {
is_json(request.headers().get("content-type"))
}

/// Returns true if the `content_type` header indicates a valid JSON message.
Expand All @@ -469,3 +398,110 @@ fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
_ => false,
}
}

/// Process a verified request, it implies a POST request with content type JSON.
async fn process_validated_request(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to a separate function to decrease the indentation level and become more readable...

request: hyper::Request<hyper::Body>,
middleware: impl Middleware,
methods: Methods,
resources: Resources,
max_request_body_size: u32,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let (parts, body) = request.into_parts();

let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
Ok(r) => r,
Err(GenericTransportError::TooLarge) => return Ok(response::too_large()),
Err(GenericTransportError::Malformed) => return Ok(response::malformed()),
Err(GenericTransportError::Inner(e)) => {
tracing::error!("Internal error reading request body: {}", e);
return Ok(response::internal_error());
}
};

let request_start = middleware.on_request();

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded::<String>();
let sink = MethodSink::new_with_limit(tx, max_request_body_size);

type Notif<'a> = Notification<'a, Option<&'a RawValue>>;

// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<Request>(&body) {
middleware.on_call(req.method.as_ref());

// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
match methods.execute_with_resources(&sink, req, 0, &resources, &NoopIdProvider) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
}
Ok((name, MethodResult::Async(fut))) => {
let success = fut.await;

middleware.on_result(name, success, request_start);
}
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
}
}
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
sink.send_error(id, code.into());
}

// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
let middleware = &middleware;

join_all(batch.into_iter().filter_map(move |req| {
match methods.execute_with_resources(&sink, req, 0, &resources, &NoopIdProvider) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
None
}
Ok((name, MethodResult::Async(fut))) => Some(async move {
let success = fut.await;
middleware.on_result(name, success, request_start);
}),
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
None
}
}
}))
.await;
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
sink.send_error(Id::Null, ErrorCode::InvalidRequest.into());
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok(response::ok_response("".into()));
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
let (id, code) = prepare_error(&body);
sink.send_error(id, code.into());
}

// Closes the receiving half of a channel without dropping it. This prevents any further
// messages from being sent on the channel.
rx.close();
let response = if is_single {
rx.next().await.expect("Sender is still alive managed by us above; qed")
} else {
collect_batch_response(rx).await
};
tracing::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
middleware.on_response(request_start);
Ok(response::ok_response(response))
}
1 change: 1 addition & 0 deletions tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ tokio = { version = "1.8", features = ["full"] }
tracing = "0.1"
serde = "1"
serde_json = "1"
hyper = { version = "0.14", features = ["http1", "client"] }
8 changes: 6 additions & 2 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::net::SocketAddr;
use std::time::Duration;

use jsonrpsee::core::Error;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use jsonrpsee::http_server::{AccessControl, HttpServerBuilder, HttpServerHandle};
use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};
use jsonrpsee::RpcModule;

Expand Down Expand Up @@ -117,7 +117,11 @@ pub async fn websocket_server() -> SocketAddr {
}

pub async fn http_server() -> (SocketAddr, HttpServerHandle) {
let server = HttpServerBuilder::default().build("127.0.0.1:0").unwrap();
http_server_with_access_control(AccessControl::default()).await
}

pub async fn http_server_with_access_control(acl: AccessControl) -> (SocketAddr, HttpServerHandle) {
let server = HttpServerBuilder::default().set_access_control(acl).build("127.0.0.1:0").unwrap();
let mut module = RpcModule::new(());
let addr = server.local_addr().unwrap();
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();
Expand Down
Loading