Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(async client): refactor background task #1145

Merged
merged 24 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use jsonrpsee_core::client::{
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::{ErrorObject, ResponseSuccess, TwoPointZero};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tower::layer::util::Identity;
use tower::{Layer, Service};
Expand Down Expand Up @@ -320,7 +320,7 @@ where
if response.id == id {
Ok(result)
} else {
Err(Error::InvalidRequestId)
Err(InvalidRequestId::NotPendingRequest(response.id.to_string()).into())
}
}

Expand Down Expand Up @@ -363,7 +363,7 @@ where
}

for rp in json_rps {
let id = rp.id.try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
let id = rp.id.try_parse_inner_as_number()?;

let res = match ResponseSuccess::try_from(rp) {
Ok(r) => {
Expand All @@ -385,7 +385,7 @@ where
if let Some(elem) = maybe_elem {
*elem = res;
} else {
return Err(Error::InvalidRequestId);
return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
}
}

Expand Down
4 changes: 2 additions & 2 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn method_call_with_wrong_id_kind() {
let uri = format!("http://{server_addr}");
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let res: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(Error::InvalidRequestId)));
assert!(matches!(res, Err(Error::InvalidRequestId(_))));
}

#[tokio::test]
Expand Down Expand Up @@ -96,7 +96,7 @@ async fn response_with_wrong_id() {
.await
.unwrap()
.unwrap_err();
assert!(matches!(err, Error::InvalidRequestId));
assert!(matches!(err, Error::InvalidRequestId(_)));
}

#[tokio::test]
Expand Down
4 changes: 3 additions & 1 deletion client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn method_call_with_wrong_id_kind() {
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();

let err: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e == "Invalid request ID"));
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e == "request ID=0 is not a pending call"));
}

#[tokio::test]
Expand Down Expand Up @@ -191,6 +191,8 @@ async fn notification_handler_works() {

#[tokio::test]
async fn notification_without_polling_doesnt_make_client_unuseable() {
init_logger();

let server = WebSocketTestServer::with_hardcoded_notification(
"127.0.0.1:0".parse().unwrap(),
server_notification("test", "server originated notification".into()),
Expand Down
8 changes: 5 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ tokio = { version = "1.16", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
futures-timer = { version = "3", optional = true }
globset = { version = "0.4", optional = true }
tokio-stream = { version = "0.1", optional = true }

[features]
default = []
Expand All @@ -55,19 +54,22 @@ client = ["futures-util/sink", "tokio/sync"]
async-client = [
"async-lock",
"client",
"futures-util/alloc",
"rustc-hash",
"tokio/macros",
"tokio/rt",
"tokio-stream",
"tokio/time",
"futures-timer",
]
async-wasm-client = [
"async-lock",
"client",
"futures-util/alloc",
"wasm-bindgen-futures",
"rustc-hash/std",
"futures-timer/wasm-bindgen",
"tokio-stream",
"tokio/macros",
"tokio/time",
]

[dev-dependencies]
Expand Down
80 changes: 40 additions & 40 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use tokio::sync::{mpsc, oneshot};

use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
ErrorObject, Id, Notification, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse,
ErrorObject, Id, InvalidRequestId, Notification, RequestSer, Response, ResponseSuccess, SubscriptionId,
SubscriptionResponse,
};
use serde_json::Value as JsonValue;
use std::ops::Range;
Expand All @@ -63,7 +64,7 @@ pub(crate) fn process_batch_response(
Some(state) => state,
None => {
tracing::warn!("Received unknown batch response");
return Err(Error::InvalidRequestId);
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)).into());
}
};

Expand All @@ -79,7 +80,7 @@ pub(crate) fn process_batch_response(
if let Some(elem) = maybe_elem {
*elem = rp.result;
} else {
return Err(Error::InvalidRequestId);
return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()).into());
}
}

Expand All @@ -95,7 +96,7 @@ pub(crate) fn process_batch_response(
pub(crate) fn process_subscription_response(
manager: &mut RequestManager,
response: SubscriptionResponse<JsonValue>,
) -> Result<(), Option<RequestMessage>> {
) -> Result<(), Option<SubscriptionId<'static>>> {
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
Expand All @@ -110,9 +111,7 @@ pub(crate) fn process_subscription_response(
Ok(()) => Ok(()),
Err(err) => {
tracing::error!("Dropping subscription {:?} error: {:?}", sub_id, err);
let msg = build_unsubscribe_message(manager, request_id, sub_id)
.expect("request ID and subscription ID valid checked above; qed");
Err(Some(msg))
Err(Some(sub_id))
}
},
None => {
Expand All @@ -124,42 +123,42 @@ pub(crate) fn process_subscription_response(

/// Attempts to close a subscription when a [`SubscriptionError`] is received.
///
/// Returns `Ok(())` if the subscription was removed
/// Return `Err(e)` if the subscription was not found.
/// If the notification is not found it's just logged as a warning and the connection
/// will continue.
///
/// It's possible that the user closed down the subscription before the actual close response is received
pub(crate) fn process_subscription_close_response(
manager: &mut RequestManager,
response: SubscriptionError<JsonValue>,
) -> Result<(), Error> {
) {
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => {
manager.remove_subscription(request_id, sub_id).expect("Both request ID and sub ID in RequestManager; qed");
}
None => {
tracing::error!("The server tried to close an invalid subscription: {:?}", sub_id);
return Err(Error::InvalidSubscriptionId);
tracing::debug!("The server tried to close an non-pending subscription: {:?}", sub_id);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}
};

manager.remove_subscription(request_id, sub_id).expect("Both request ID and sub ID in RequestManager; qed");
Ok(())
}
}

/// Attempts to process an incoming notification
///
/// Returns Ok() if the response was successfully handled
/// Returns Err() if there was no handler for the method
pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification<JsonValue>) -> Result<(), Error> {
/// If the notification is not found it's just logged as a warning and the connection
/// will continue.
///
/// It's possible that user close down the subscription before this notification is received.
pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification<JsonValue>) {
match manager.as_notification_handler_mut(notif.method.to_string()) {
Some(send_back_sink) => match send_back_sink.try_send(notif.params) {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Ok(()) => Ok(()),
Ok(()) => (),
Err(err) => {
tracing::error!("Error sending notification, dropping handler for {:?} error: {:?}", notif.method, err);
tracing::warn!("Could not send notification, dropping handler for {:?} error: {:?}", notif.method, err);
let _ = manager.remove_notification_handler(notif.method.into_owned());
Err(Error::Custom(err.to_string()))
}
},
None => {
tracing::error!("Notification: {:?} not a registered method", notif.method);
Err(Error::UnregisteredNotification(notif.method.into_owned()))
tracing::debug!("Notification: {:?} not a registered method", notif.method);
}
}
}
Expand All @@ -179,18 +178,19 @@ pub(crate) fn process_single_response(

match manager.request_status(&response_id) {
RequestStatus::PendingMethodCall => {
let send_back_oneshot = match manager.complete_pending_call(response_id) {
let send_back_oneshot = match manager.complete_pending_call(response_id.clone()) {
Some(Some(send)) => send,
Some(None) => return Ok(None),
None => return Err(Error::InvalidRequestId),
None => return Err(InvalidRequestId::NotPendingRequest(response_id.to_string()).into()),
};

let _ = send_back_oneshot.send(result);
Ok(None)
}
RequestStatus::PendingSubscription => {
let (unsub_id, send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(response_id.clone()).ok_or(Error::InvalidRequestId)?;
let (unsub_id, send_back_oneshot, unsubscribe_method) = manager
.complete_pending_subscription(response_id.clone())
.ok_or(InvalidRequestId::NotPendingRequest(response_id.to_string()))?;

let sub_id = result.map(|r| SubscriptionId::try_from(r).ok());

Expand Down Expand Up @@ -220,23 +220,23 @@ pub(crate) fn process_single_response(
Ok(None)
}
}
RequestStatus::Subscription | RequestStatus::Invalid => Err(Error::InvalidRequestId),

RequestStatus::Subscription | RequestStatus::Invalid => {
Err(InvalidRequestId::NotPendingRequest(response_id.to_string()).into())
}
}
}

/// Sends an unsubscribe to request to server to indicate
/// that the client is not interested in the subscription anymore.
//
// NOTE: we don't count this a concurrent request as it's part of a subscription.
pub(crate) async fn stop_subscription(
sender: &mut impl TransportSenderT,
manager: &mut RequestManager,
pub(crate) async fn stop_subscription<S: TransportSenderT>(
sender: &mut S,
unsub: RequestMessage,
) {
if let Err(e) = sender.send(unsub.raw).await {
tracing::error!("Send unsubscribe request failed: {:?}", e);
let _ = manager.complete_pending_call(unsub.id);
}
) -> Result<(), S::Error> {
sender.send(unsub.raw).await?;
Ok(())
}

/// Builds an unsubscription message.
Expand All @@ -245,7 +245,7 @@ pub(crate) fn build_unsubscribe_message(
sub_req_id: Id<'static>,
sub_id: SubscriptionId<'static>,
) -> Option<RequestMessage> {
let (unsub_req_id, _, unsub, sub_id) = manager.remove_subscription(sub_req_id, sub_id)?;
let (unsub_req_id, _, unsub, sub_id) = manager.unsubscribe(sub_req_id, sub_id)?;

let mut params = ArrayParams::new();
params.insert(sub_id).ok()?;
Expand Down
36 changes: 34 additions & 2 deletions core/src/client/async_client/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub(crate) struct RequestManager {

impl RequestManager {
/// Create a new `RequestManager`.
#[allow(unused)]
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn new() -> Self {
Self::default()
}
Expand Down Expand Up @@ -250,9 +251,9 @@ impl RequestManager {
}
}

/// Tries to remove a subscription.
/// Removes the subscription without waiting for the unsubscribe call.
///
/// Returns `Some` if the subscription was removed otherwise `None`.
/// Returns `Some` if the subscription was removed.
pub(crate) fn remove_subscription(
&mut self,
request_id: RequestId,
Expand All @@ -262,6 +263,7 @@ impl RequestManager {
(Entry::Occupied(request), Entry::Occupied(subscription))
if matches!(request.get(), Kind::Subscription(_)) =>
{
// Mark the request ID as pending unsubscription.
let (_req_id, kind) = request.remove_entry();
let (sub_id, _req_id) = subscription.remove_entry();
if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
Expand All @@ -274,6 +276,33 @@ impl RequestManager {
}
}

/// Initiates an unsubscribe which is not completed until the unsubscribe call
/// has been acknowledged.
///
/// Returns `Some` if the subscription was unsubscribed.
pub(crate) fn unsubscribe(
&mut self,
request_id: RequestId,
subscription_id: SubscriptionId<'static>,
) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId)> {
match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
(Entry::Occupied(mut request), Entry::Occupied(subscription))
if matches!(request.get(), Kind::Subscription(_)) =>
{
// Mark the request ID as "pending unsubscription" which will be resolved once the
// unsubscribe call has been acknowledged.
let kind = std::mem::replace(request.get_mut(), Kind::PendingMethodCall(None));
let (sub_id, _req_id) = subscription.remove_entry();
if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Some((unsub_req_id, send_back, unsub, sub_id))
} else {
unreachable!("Subscription is Subscription checked above; qed");
}
}
_ => None,
}
}

/// Returns the status of a request ID
pub(crate) fn request_status(&mut self, id: &RequestId) -> RequestStatus {
self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind {
Expand Down Expand Up @@ -473,5 +502,8 @@ mod tests {
assert!(manager.complete_pending_subscription(Id::Number(3)).is_none());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(1)).is_none());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(0)).is_some());

assert!(manager.requests.is_empty());
assert!(manager.subscriptions.is_empty());
}
}
Loading