Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[http server] Batch requests #292

Merged
merged 20 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::runtime::Runtime as TokioRuntime;

mod helpers;

criterion_group!(benches, http_requests, websocket_requests, jsonrpsee_types_v2);
criterion_group!(benches, http_requests, batched_http_requests, websocket_requests, jsonrpsee_types_v2);
criterion_main!(benches);

fn v2_serialize<'a>(req: JsonRpcCallSer<'a>) -> String {
Expand Down Expand Up @@ -47,6 +47,13 @@ pub fn http_requests(crit: &mut Criterion) {
run_concurrent_round_trip(&rt, crit, client.clone(), "http_concurrent_round_trip");
}

pub fn batched_http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip_with_batch(&rt, crit, client.clone(), "batched_http_round_trip", 10);
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
Expand All @@ -66,6 +73,23 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Clie
});
}

fn run_round_trip_with_batch(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<impl Client>,
name: &str,
batch_size: usize,
) {
let batch = vec![("say_hello", JsonRpcParams::NoParams); batch_size];
crit.bench_function(name, |b| {
b.iter(|| {
rt.block_on(async {
black_box(client.batch_request::<String>(batch.clone()).await.unwrap());
})
})
});
}

fn run_concurrent_round_trip<C: 'static + Client + Send + Sync>(
rt: &TokioRuntime,
crit: &mut Criterion,
Expand Down
108 changes: 86 additions & 22 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ use hyper::{
use jsonrpsee_types::error::{Error, GenericTransportError, RpcError};
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams};
use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::send_error};
use jsonrpsee_utils::{
hyper_helpers::read_response_to_body,
server::{send_error, RpcSender},
};
use serde::Serialize;
use serde_json::value::RawValue;
use socket2::{Domain, Socket, Type};
use std::{
cmp,
net::{SocketAddr, TcpListener},
sync::Arc,
};
Expand Down Expand Up @@ -153,6 +158,30 @@ impl Server {
Ok::<_, HyperError>(service_fn(move |request| {
let methods = methods.clone();
let access_control = access_control.clone();

// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
// the params from the request. The result of the computation is sent back over the `tx` channel and
// the result(s) are collected into a `String` and sent back over the wire.
let execute =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to make this a standalone function? the start fn is already fairly long, i think it would be good to move some code out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, and I agree, and I tried but couldn't make it work; I'd have to look up the method from the methods hash map in start() and pass it along so it didn't really shorten up the code much.

Lately I've started to shift my views on when it's right to refactor for briefness. The rule of thumb is still "terse is good", but for some cases I've started to think it's alright to keep the code long when it's "the main loop", whatever that means for each specific case, i.e. the most important thing that a given program does. Splitting things up too much can force the reader to jump around more than is ideal and even if it's long it is sometimes more readable to keep it all in one place. I know this is hand wavey (and for this case here I wanted to do exactly what you suggest), but yeah, just wanted to share those thoughts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mainly because of the closure that is passed to hyper but yeah this changes made the code really quite hard to read, it wasn't great before either.

Maybe we split it the response handling to helper functions, basically you have to read the bytes here to take ownership over it which used to later to build a message to send to the background task.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I found this more readable but it's still quite long ^^

(maybe because I wrote it lol)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K, I tried another approach, moving execute to be a method on Server so I can access self.root (i.e. the function pointers we call to run the request), but it doesn't work because self is moved into the closure when calling make_service_fn().
On the client side it's a bit easier because we don't have to call anything. Or I'm too limited to see how to do it! Pointers welcome!

Copy link
Contributor

@insipx insipx May 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could just add another parameter, &Methods (we'd have to import methods from jsonrpsee_util) to the execute function so execute becomes:

fn execute(methods: &Methods, id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>) {
	if let Some(method) = methods.get(method_name) {
		let params = RpcParams::new(params.map(|params| params.get()));
		// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
		if let Err(err) = (method)(id, params, &tx, 0) {
			log::error!("execution of method call {} failed: {:?}, request id={:?}", method_name, err, id);
		}
	} else {
		send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into());
	}
}

and then call it like execute(&methods, id, &tx, &method_name, params);

I guess the tradeoff is having a long function signature for execute, though.

I think I agree with your analysis of main_loops though. Often it can be hard to make them shorter and would result in more confusing code than just keeping it all together. In this case I think execute fn is self-explanatory enough in that it just executes the right function for the rpc call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@insipx I could have sworn I tried that; your version works too. I can go either way here, @niklasad1 thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW the performance of @insipx's version is identical.

move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| {
if let Some(method) = methods.get(method_name) {
let params = RpcParams::new(params.map(|params| params.get()));
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
if let Err(err) = (method)(id, params, &tx, 0) {
log::error!(
"execution of method call {} failed: {:?}, request id={:?}",
method_name,
err,
id
);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@niklasad1 What does the connected client see in this case, i.e. when the method they're calling fails? Shouldn't we be sending back the error here in addition to logging it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that what you're doing in #295 perhaps?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, #295 should fix that but it doesn't log it FWIW

} else {
send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into());
}
};

// Run some validation on the http request, then read the body and try to deserialize it into one of
// two cases: a single RPC request or a batch of RPC requests.
async move {
if let Err(e) = access_control_is_valid(&access_control, &request) {
return Ok::<_, HyperError>(e);
Expand All @@ -175,31 +204,48 @@ impl Server {

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded();
// Is this a single request or a batch (or error)?
let mut single = true;

match serde_json::from_slice::<JsonRpcRequest>(&body) {
Ok(req) => {
log::debug!("recv: {:?}", req);
let params = RpcParams::new(req.params.map(|params| params.get()));
if let Some(method) = methods.get(&*req.method) {
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
if let Err(err) = (method)(req.id, params, &tx, 0) {
log::error!("method_call: {} failed: {:?}", req.method, err);
}
} else {
send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
// For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
// used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged
// enum here and have to try each case individually: first the single request case, then the
// batch case and lastly the error. For the worst case – unparseable input – we make three calls
// to [`serde_json::from_slice`] which is pretty annoying.
// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296).
if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) =
serde_json::from_slice::<JsonRpcRequest>(&body)
{
execute(id, &tx, &method_name, params);
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
if !batch.is_empty() {
single = false;
for JsonRpcRequest { id, method: method_name, params, .. } in batch {
execute(id, &tx, &method_name, params);
}
} else {
send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into());
}
Err(_e) => {
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&body) {
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
Err(_) => (None, JsonRpcErrorCode::ParseError),
};
send_error(id, &tx, code.into());
}
} else {
log::error!(
"[service_fn], Cannot parse request body={:?}",
String::from_utf8_lossy(&body[..cmp::min(body.len(), 1024)])
);
let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&body) {
Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
Err(_) => (None, JsonRpcErrorCode::ParseError),
};
send_error(id, &tx, code.into());
}
// Closes the receiving half of a channel without dropping it. This prevents any further
// messages from being sent on the channel.
rx.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this is performed to mark to that we are done sending any further messages?!
Such that we the receiver will receive eventually Ok(None) when all messages has been sent?

I think it deserves a comment because it was not straightforward to me at least

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes exactly, I read this in the docs:

Closes the receiving half of a channel without dropping it.
This prevents any further messages from being sent on the channel while still enabling the receiver to drain messages that are buffered.

I don't know if it's necessary to close the channel in this case but thought maybe it's a "best practice" to do so and if we ever decide to execute batch requests on separate tasks (and threads) with deadlines it's good to make sure the channel can't be written to. I'll add a comment.

Copy link
Member

@niklasad1 niklasad1 Apr 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, I think so because otherwise you have to drop the sender to close the channel. Because we have control of over both the sender and receiver (the sender is just borrowed by the call closure)

let response = if single {
rx.next().await.expect("Sender is still alive managed by us above; qed")
} else {
collect_batch_responses(rx).await
};

let response = rx.next().await.expect("Sender is still alive managed by us above; qed");
log::debug!("send: {:?}", response);
log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
Ok::<_, HyperError>(response::ok_response(response))
}
}))
Expand All @@ -211,6 +257,24 @@ impl Server {
}
}

// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately wrapped in
// `[`/`]`.
async fn collect_batch_responses(rx: mpsc::UnboundedReceiver<String>) -> String {
let mut buf = String::with_capacity(2048);
buf.push('[');
let mut buf = rx
.fold(buf, |mut acc, response| async {
acc = [acc, response].concat();
acc.push(',');
acc
})
.await;
// Remove trailing comma
buf.pop();
buf.push(']');
buf
}

// Checks to that access control of the received request is the same as configured.
fn access_control_is_valid(
access_control: &AccessControl,
Expand Down
87 changes: 87 additions & 0 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ async fn single_method_call_works() {
}
}

#[tokio::test]
async fn invalid_single_method_call() {
let _ = env_logger::try_init();
let addr = server().await;
let uri = to_http_uri(addr);

let req = r#"{"jsonrpc":"2.0","method":1, "params": "bar"}"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, invalid_request(Id::Null));
}

#[tokio::test]
async fn single_method_call_with_params() {
let addr = server().await;
Expand All @@ -50,6 +62,81 @@ async fn single_method_call_with_params() {
assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1)));
}

#[tokio::test]
async fn valid_batched_method_calls() {
let _ = env_logger::try_init();

let addr = server().await;
let uri = to_http_uri(addr);

let req = r#"[
{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1},
{"jsonrpc":"2.0","method":"add", "params":[3, 4],"id":2},
{"jsonrpc":"2.0","method":"say_hello","id":3},
{"jsonrpc":"2.0","method":"add", "params":[5, 6],"id":4}
]"#;
let response = http_request(req.into(), uri).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(
response.body,
r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"#
);
}

#[tokio::test]
async fn batched_notifications() {
let _ = env_logger::try_init();

let addr = server().await;
let uri = to_http_uri(addr);

let req = r#"[
{"jsonrpc": "2.0", "method": "notif", "params": [1,2,4]},
{"jsonrpc": "2.0", "method": "notif", "params": [7]}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this probably because Id is regarded a None not sure, so a typed Id could fix this

]"#;
let response = http_request(req.into(), uri).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
// Note: this is *not* according to spec. Response should be the empty string, `""`.
assert_eq!(response.body, r#"[{"jsonrpc":"2.0","result":"","id":null},{"jsonrpc":"2.0","result":"","id":null}]"#);
}

#[tokio::test]
async fn invalid_batched_method_calls() {
let _ = env_logger::try_init();

let addr = server().await;
let uri = to_http_uri(addr);

// batch with no requests
let req = r#"[]"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, invalid_request(Id::Null));

// batch with invalid request
let req = r#"[123]"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
// Note: according to the spec the `id` should be `null` here, not 123.
assert_eq!(response.body, invalid_request(Id::Num(123)));

// batch with invalid request
let req = r#"[1, 2, 3]"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
// Note: according to the spec this should return an array of three `Invalid Request`s
assert_eq!(response.body, parse_error(Id::Null));

// invalid JSON in batch
let req = r#"[
{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
{"jsonrpc": "2.0", "method"
]"#;
let response = http_request(req.into(), uri.clone()).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, parse_error(Id::Null));
}

#[tokio::test]
async fn should_return_method_not_found() {
let addr = server().await;
Expand Down
2 changes: 1 addition & 1 deletion types/src/v2/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl<'a> RpcParams<'a> {
/// If your type implement `Into<JsonValue>` call that favor of `serde_json::to:value` to
/// construct the parameters. Because `serde_json::to_value` serializes the type which
/// allocates whereas `Into<JsonValue>` doesn't in most cases.
#[derive(Serialize, Debug)]
#[derive(Serialize, Debug, Clone)]
#[serde(untagged)]
pub enum JsonRpcParams<'a> {
/// No params.
Expand Down