Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeouts for all requests #406

Merged
merged 13 commits into from
Jul 8, 2021
9 changes: 6 additions & 3 deletions http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ documentation = "https://docs.rs/jsonrpsee-http-client"

[dependencies]
async-trait = "0.1"
futures = { version = "0.3.14", default-features = false, features = ["std"] }
hyper13-rustls = { package = "hyper-rustls", version = "0.21", optional = true }
hyper14-rustls = { package = "hyper-rustls", version = "0.22", optional = true }
hyper14 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "tcp"], optional = true }
Expand All @@ -20,15 +21,17 @@ jsonrpsee-utils = { path = "../utils", version = "0.2.0", optional = true }
log = "0.4"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
tokioV1 = { package = "tokio", version = "1", features = ["time"], optional = true }
tokioV02 = { package = "tokio", version = "0.2", features = ["time"], optional = true }
thiserror = "1.0"
url = "2.2"
fnv = "1"

[features]
default = ["tokio1"]
tokio1 = ["hyper14", "hyper14-rustls", "jsonrpsee-utils/hyper_14"]
tokio02 = ["hyper13", "hyper13-rustls", "jsonrpsee-utils/hyper_13"]
tokio1 = ["hyper14", "hyper14-rustls", "jsonrpsee-utils/hyper_14", "tokioV1" ]
tokio02 = ["hyper13", "hyper13-rustls", "jsonrpsee-utils/hyper_13", "tokioV02" ]

[dev-dependencies]
jsonrpsee-test-utils = { path = "../test-utils" }
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros"] }
tokioV1 = { package = "tokio", version = "1", features = ["net", "rt-multi-thread", "macros"] }
47 changes: 31 additions & 16 deletions http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use async_trait::async_trait;
use fnv::FnvHashMap;
use serde::de::DeserializeOwned;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Http Client Builder.
#[derive(Debug)]
pub struct HttpClientBuilder {
max_request_body_size: u32,
request_timeout: Duration,
}

impl HttpClientBuilder {
Expand All @@ -25,17 +27,23 @@ impl HttpClientBuilder {
self
}

/// Set request timeout (default is 60 seconds).
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}

/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport =
HttpTransportClient::new(target, self.max_request_body_size).map_err(|e| Error::Transport(Box::new(e)))?;
Ok(HttpClient { transport, request_id: AtomicU64::new(0) })
Ok(HttpClient { transport, request_id: AtomicU64::new(0), request_timeout: self.request_timeout })
}
}

impl Default for HttpClientBuilder {
fn default() -> Self {
Self { max_request_body_size: TEN_MB_SIZE_BYTES }
Self { max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Duration::from_secs(60) }
}
}

Expand All @@ -46,16 +54,20 @@ pub struct HttpClient {
transport: HttpTransportClient,
/// Request ID that wraps around when overflowing.
request_id: AtomicU64,
/// Request timeout. Defaults to 60sec.
request_timeout: Duration,
}

#[async_trait]
impl Client for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: JsonRpcParams<'a>) -> Result<(), Error> {
let notif = JsonRpcNotificationSer::new(method, params);
self.transport
.send(serde_json::to_string(&notif).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))
let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(Box::new(e))),
}
}

/// Perform a request towards the server.
Expand All @@ -67,11 +79,12 @@ impl Client for HttpClient {
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
let request = JsonRpcCallSer::new(Id::Number(id), method, params);

let body = self
.transport
.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))?;
let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let body = match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(Box::new(e))),
};

let response: JsonRpcResponse<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
Expand Down Expand Up @@ -106,11 +119,13 @@ impl Client for HttpClient {
request_set.insert(id, pos);
}

let body = self
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))?;
let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);

let body = match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(Box::new(e))),
};

let rps: Vec<JsonRpcResponse<_>> = match serde_json::from_slice(&body) {
Ok(response) => response,
Expand Down
13 changes: 13 additions & 0 deletions http-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ extern crate hyper13_rustls as hyper_rustls;
mod client;
mod transport;

#[cfg(all(feature = "tokio1", not(feature = "tokio02")))]
mod tokio {
pub(crate) use tokioV1::time::timeout;
#[cfg(test)]
pub(crate) use tokioV1::{runtime, test};
}

#[cfg(all(feature = "tokio02", not(feature = "tokio1")))]
mod tokio {
pub(crate) use tokioV02::time::timeout;
pub(crate) use tokioV02::time::Elapsed;
}

#[cfg(test)]
mod tests;

Expand Down
2 changes: 1 addition & 1 deletion http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::v2::{
error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject},
params::JsonRpcParams,
};
use crate::{traits::Client, Error, HttpClientBuilder, JsonValue};
use crate::{tokio, traits::Client, Error, HttpClientBuilder, JsonValue};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;
Expand Down
1 change: 1 addition & 0 deletions http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ where
#[cfg(test)]
mod tests {
use super::{Error, HttpTransportClient};
use crate::tokio;

#[test]
fn invalid_http_url_rejected() {
Expand Down
12 changes: 6 additions & 6 deletions ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ documentation = "https://docs.rs/jsonrpsee-ws-client"

[dependencies]
# Tokio v1 deps
tokioV1 = { package="tokio", version = "1", features = ["net", "time", "rt-multi-thread"], optional=true }
tokioV1-rustls = { package="tokio-rustls", version = "0.22", optional=true }
tokioV1-util = { package="tokio-util", version = "0.6", features = ["compat"], optional=true }
tokioV1 = { package="tokio", version = "1", features = ["net", "time", "rt-multi-thread", "macros"], optional = true }
tokioV1-rustls = { package="tokio-rustls", version = "0.22", optional = true }
tokioV1-util = { package="tokio-util", version = "0.6", features = ["compat"], optional = true }

# Tokio v0.2 deps
tokioV02 = { package="tokio", version = "0.2", features = ["net", "time", "rt-threaded", "sync"], optional=true }
tokioV02-rustls = { package="tokio-rustls", version = "0.15", optional=true }
tokioV02-util = { package="tokio-util", version = "0.3", features = ["compat"], optional=true }
tokioV02 = { package="tokio", version = "0.2", features = ["net", "time", "rt-threaded", "sync", "macros"], optional = true }
tokioV02-rustls = { package="tokio-rustls", version = "0.15", optional = true }
tokioV02-util = { package="tokio-util", version = "0.3", features = ["compat"], optional = true }

async-trait = "0.1"
fnv = "1"
Expand Down
54 changes: 29 additions & 25 deletions ws-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ use crate::v2::params::{Id, JsonRpcParams};
use crate::v2::request::{JsonRpcCallSer, JsonRpcNotification, JsonRpcNotificationSer};
use crate::v2::response::JsonRpcResponse;
use crate::TEN_MB_SIZE_BYTES;
use crate::{
helpers::call_with_timeout, manager::RequestManager, BatchMessage, Error, FrontToBack, RegisterNotificationMessage,
RequestMessage, Subscription, SubscriptionMessage,
};
use crate::{
helpers::{
build_unsubscribe_message, process_batch_response, process_error_response, process_notification,
process_single_response, process_subscription_response, stop_subscription,
},
transport::CertificateStore,
};
use crate::{
manager::RequestManager, BatchMessage, Error, FrontToBack, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionMessage,
};
use async_trait::async_trait;
use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -102,8 +102,8 @@ pub struct WsClient {
/// If the background thread terminates the error is sent to this channel.
// NOTE(niklasad1): This is a Mutex to circumvent that the async fns takes immutable references.
error: Mutex<ErrorFromBack>,
/// Request timeout
request_timeout: Option<Duration>,
/// Request timeout. Defaults to 60sec.
request_timeout: Duration,
/// Request ID manager.
id_guard: RequestIdGuard,
}
Expand Down Expand Up @@ -174,7 +174,7 @@ impl RequestIdGuard {
pub struct WsClientBuilder<'a> {
certificate_store: CertificateStore,
max_request_body_size: u32,
request_timeout: Option<Duration>,
request_timeout: Duration,
connection_timeout: Duration,
origin_header: Option<Cow<'a, str>>,
max_concurrent_requests: usize,
Expand All @@ -186,7 +186,7 @@ impl<'a> Default for WsClientBuilder<'a> {
Self {
certificate_store: CertificateStore::Native,
max_request_body_size: TEN_MB_SIZE_BYTES,
request_timeout: None,
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
origin_header: None,
max_concurrent_requests: 256,
Expand All @@ -208,9 +208,9 @@ impl<'a> WsClientBuilder<'a> {
self
}

/// Set request timeout.
/// Set request timeout (default is 60 seconds).
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = Some(timeout);
self.request_timeout = timeout;
self
}

Expand Down Expand Up @@ -313,7 +313,17 @@ impl Client for WsClient {
Error::ParseError(e)
})?;
log::trace!("[frontend]: send notification: {:?}", raw);
let res = self.to_back.clone().send(FrontToBack::Notification(raw)).await;

let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));

let timeout = crate::tokio::sleep(self.request_timeout);

let res = crate::tokio::select! {
x = fut => x,
_ = timeout => return Err(Error::RequestTimeout)
};

self.id_guard.reclaim_request_id();
match res {
Ok(()) => Ok(()),
Expand Down Expand Up @@ -344,19 +354,10 @@ impl Client for WsClient {
return Err(self.read_error_from_backend().await);
}

let send_back_rx_out = if let Some(duration) = self.request_timeout {
let timeout = crate::tokio::sleep(duration);
futures::pin_mut!(send_back_rx, timeout);
match future::select(send_back_rx, timeout).await {
future::Either::Left((send_back_rx_out, _)) => send_back_rx_out,
future::Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
}
} else {
send_back_rx.await
};
let res = call_with_timeout(self.request_timeout, send_back_rx).await;

self.id_guard.reclaim_request_id();
let json_value = match send_back_rx_out {
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
Expand Down Expand Up @@ -393,7 +394,8 @@ impl Client for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = send_back_rx.await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;

self.id_guard.reclaim_request_id();
let json_values = match res {
Ok(Ok(v)) => v,
Expand Down Expand Up @@ -453,7 +455,8 @@ impl SubscriptionClient for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = send_back_rx.await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;

self.id_guard.reclaim_request_id();
let (notifs_rx, id) = match res {
Ok(Ok(val)) => val,
Expand Down Expand Up @@ -484,7 +487,8 @@ impl SubscriptionClient for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = send_back_rx.await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;

let (notifs_rx, method) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Expand Down
15 changes: 14 additions & 1 deletion ws-client/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::manager::{RequestManager, RequestStatus};
use crate::transport::Sender as WsSender;
use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use jsonrpsee_types::v2::params::{Id, JsonRpcParams, JsonRpcSubscriptionParams, SubscriptionId};
use jsonrpsee_types::v2::request::{JsonRpcCallSer, JsonRpcNotification};
use jsonrpsee_types::v2::response::JsonRpcResponse;
use jsonrpsee_types::{v2::error::JsonRpcError, Error, RequestMessage};
use serde_json::Value as JsonValue;
use std::time::Duration;

/// Attempts to process a batch response.
///
Expand Down Expand Up @@ -188,3 +189,15 @@ pub fn process_error_response(manager: &mut RequestManager, err: JsonRpcError) -
_ => Err(Error::InvalidRequestId),
}
}

/// Wait for a stream to complete within the given timeout.
pub async fn call_with_timeout<T>(
timeout: Duration,
rx: oneshot::Receiver<Result<T, Error>>,
) -> Result<Result<T, Error>, oneshot::Canceled> {
let timeout = crate::tokio::sleep(timeout);
crate::tokio::select! {
res = rx => res,
_ = timeout => Ok(Err(Error::RequestTimeout))
}
}
4 changes: 4 additions & 0 deletions ws-client/src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ mod tokio_impl {
pub(crate) use tokioV1_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

pub(crate) use tokioV1::time::sleep;

pub(crate) use tokioV1::select;
}

// Note that we check for `not(feature = "tokio1")` here, but not above.
Expand All @@ -50,4 +52,6 @@ mod tokio_impl {

// In 0.2 `tokio::time::sleep` had different name.
pub(crate) use tokioV02::time::delay_for as sleep;

pub(crate) use tokioV02::select;
}