Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc module: refactor calls/subs without a server #591

Merged
merged 13 commits into from
Dec 10, 2021
43 changes: 18 additions & 25 deletions tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use jsonrpsee::{
ws_server::WsServerBuilder,
};

use serde_json::value::RawValue;
use serde_json::{value::RawValue, json};

mod rpc_impl {
use jsonrpsee::{
Expand Down Expand Up @@ -231,57 +231,52 @@ async fn proc_macros_generic_ws_client_api() {
async fn macro_param_parsing() {
let module = RpcServerImpl.into_rpc();

let params = RawValue::from_string(r#"[42, "Hello"]"#.into()).ok();
let result = module.call("foo_params", params).await.unwrap();
let res: String = module.call("foo_params", [json!(42_u64), json!("Hello")]).await.unwrap();

assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Called with: 42, Hello","id":0}"#);
assert_eq!(&res, "Called with: 42, Hello");
}

#[tokio::test]
async fn macro_optional_param_parsing() {
let module = RpcServerImpl.into_rpc();

// Optional param omitted at tail
let params = RawValue::from_string(r#"[42, 70]"#.into()).ok();
let result = module.call("foo_optional_params", params).await.unwrap();

assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Called with: 42, Some(70), None","id":0}"#);
let res: String = module.call("foo_optional_params", [42_u64, 70]).await.unwrap();
assert_eq!(&res, "Called with: 42, Some(70), None");

// Optional param using `null`
let params = RawValue::from_string(r#"[42, null, 70]"#.into()).ok();
let result = module.call("foo_optional_params", params).await.unwrap();
let res: String = module.call("foo_optional_params", [json!(42_u64), json!(null), json!(70_u64)]).await.unwrap();

assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Called with: 42, None, Some(70)","id":0}"#);
assert_eq!(&res, "Called with: 42, None, Some(70)");

// Named params using a map
let params = RawValue::from_string(r#"{"a": 22, "c": 50}"#.into()).ok();
let result = module.call("foo_optional_params", params).await.unwrap();
let params = RawValue::from_string(r#"{"a": 22, "c": 50}"#.into()).unwrap();
let (result, _, _) = module.raw_call("foo_optional_params", params).await;
assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Called with: 22, None, Some(50)","id":0}"#);
}

#[tokio::test]
async fn macro_lifetimes_parsing() {
let module = RpcServerImpl.into_rpc();

let params = RawValue::from_string(r#"["foo", "bar", "baz", "qux"]"#.into()).ok();
let result = module.call("foo_lifetimes", params).await.unwrap();
let res: String = module.call("foo_lifetimes", ["foo", "bar", "baz", "qux"]).await.unwrap();

assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Called with: foo, bar, baz, Some(\"qux\")","id":0}"#);
assert_eq!(&res, "Called with: foo, bar, baz, Some(\"qux\")");
}

#[tokio::test]
async fn macro_zero_copy_cow() {
let module = RpcServerImpl.into_rpc();

let params = RawValue::from_string(r#"["foo", "bar"]"#.into()).ok();
let result = module.call("foo_zero_copy_cow", params).await.unwrap();
let params = RawValue::from_string(r#"["foo", "bar"]"#.into()).unwrap();
let (result, _, _) = module.raw_call("foo_zero_copy_cow", params).await;

// std::borrow::Cow<str> always deserialized to owned variant here
assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, true","id":0}"#);

// serde_json will have to allocate a new string to replace `\t` with byte 0x09 (tab)
let params = RawValue::from_string(r#"["\tfoo", "\tbar"]"#.into()).ok();
let result = module.call("foo_zero_copy_cow", params).await.unwrap();
let params = RawValue::from_string(r#"["\tfoo", "\tbar"]"#.into()).unwrap();
let (result, _, _) = module.raw_call("foo_zero_copy_cow", params).await;

assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, false","id":0}"#);
}
Expand All @@ -291,19 +286,17 @@ async fn macro_zero_copy_cow() {
#[tokio::test]
async fn multiple_blocking_calls_overlap() {
use std::time::{Duration, Instant};
use jsonrpsee::types::EmptyParams;

let module = RpcServerImpl.into_rpc();

let params = RawValue::from_string("[]".into()).ok();

let futures = std::iter::repeat_with(|| module.call("foo_blocking_call", params.clone())).take(4);
let futures = std::iter::repeat_with(|| module.call::<_, u64>("foo_blocking_call", EmptyParams::new())).take(4);
let now = Instant::now();
let results = futures::future::join_all(futures).await;
let elapsed = now.elapsed();

for result in results {
let result = serde_json::from_str::<serde_json::Value>(&result.unwrap()).unwrap();
assert_eq!(result["result"], 42);
assert_eq!(result.unwrap(), 42);
}

// Each request takes 50ms, added 10ms margin for scheduling
Expand Down
107 changes: 44 additions & 63 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub type AsyncMethod<'a> =
pub type ConnectionId = usize;
/// Subscription ID.
pub type SubscriptionId = u64;
/// Raw RPC response.
pub type RawRpcResponse = (String, mpsc::UnboundedReceiver<String>, mpsc::UnboundedSender<String>);


type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, oneshot::Receiver<()>)>>>;

Expand Down Expand Up @@ -349,34 +352,24 @@ impl Methods {
/// Helper to call a method on the `RPC module` without having to spin up a server.
///
/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
pub async fn call_with<Params: ToRpcParams>(&self, method: &str, params: Params) -> Option<String> {
let params = params.to_rpc_params().ok();
self.call(method, params).await
}

/// Helper alternative to `execute`, useful for writing unit tests without having to spin
/// a server up.
pub async fn call(&self, method: &str, params: Option<Box<RawValue>>) -> Option<String> {
let req = Request::new(method.into(), params.as_deref(), Id::Number(0));

let (tx, mut rx) = mpsc::unbounded();
let sink = MethodSink::new(tx);

if let MethodResult::Async(fut) = self.execute(&sink, req, 0) {
fut.await;
pub async fn call<Params: ToRpcParams, T: DeserializeOwned>(&self, method: &str, params: Params) -> Result<T, String> {
let params = params.to_rpc_params().map_err(|e| e.to_string())?;
let (resp, _, _) = self.raw_call(method, params).await;
if let Ok(res) = serde_json::from_str::<Response<T>>(&resp) {
return Ok(res.result);
}

rx.next().await
Err(resp)
}

/// Perform a "in memory JSON-RPC method call" and receive further subscriptions.
/// This is useful if you want to support both `method calls` and `subscriptions`
/// in the same API.
///
///
/// There are better variants than this method if you only want
/// method calls or only subscriptions.
///
/// See [`Methods::test_subscription`] and [`Methods::call_with`] for
/// See [`Methods::test_subscription`] and [`Methods::call`] for
/// for further documentation.
///
/// Returns a response to the actual method call and a stream to process
Expand All @@ -388,7 +381,8 @@ impl Methods {
/// use jsonrpsee::RpcModule;
/// use jsonrpsee::types::{
/// EmptyParams,
/// v2::{Response, SubscriptionResponse}
/// v2::{Response, SubscriptionResponse},
/// traits::ToRpcParams,
/// };
/// use futures_util::StreamExt;
///
Expand All @@ -397,48 +391,37 @@ impl Methods {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
/// let (resp, mut stream) = module.call_and_subscribe("hi", EmptyParams::new()).await.unwrap();
/// let (resp, mut stream, _) = module.raw_call("hi", EmptyParams::new().to_rpc_params().unwrap()).await.unwrap();
/// assert!(serde_json::from_str::<Response<u64>>(&resp).is_ok());
/// let raw_sub_resp = stream.next().await.unwrap();
/// let sub_resp: SubscriptionResponse<String> = serde_json::from_str(&raw_sub_resp).unwrap();
/// assert_eq!(&sub_resp.params.result, "one answer");
/// }
/// ```
pub async fn call_and_subscribe<Params: ToRpcParams>(
&self,
method: &str,
params: Params,
) -> Option<(String, mpsc::UnboundedReceiver<String>)> {
pub async fn raw_call(&self, method: &str, params: Box<RawValue>) -> RawRpcResponse {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let req = Request::new(method.into(),Some(&params), Id::Number(0));

let (tx, mut rx) = mpsc::unbounded();
let params = params.to_rpc_params().ok();
let req = Request::new(method.into(), params.as_deref(), Id::Number(0));
let sink = MethodSink::new(tx);
let sink = MethodSink::new(tx.clone());

if let MethodResult::Async(fut) = self.execute(&sink, req, 0) {
fut.await;
}

rx.next().await.map(|r| (r, rx))
let resp = rx.next().await.expect("tx and rx still alive; qed");
(resp, rx, tx)
}

/// Test helper that sets up a subscription using the given `method`. Returns a tuple of the
/// [`SubscriptionId`] and a channel on which subscription JSON payloads can be received.
pub async fn test_subscription(&self, method: &str, params: impl ToRpcParams) -> TestSubscription {
pub async fn test_subscription(&self, sub_method: &str, params: impl ToRpcParams) -> TestSubscription {
let params = params.to_rpc_params().expect("valid JSON-RPC params");
tracing::trace!("[Methods::test_subscription] Calling subscription method: {:?}, params: {:?}", method, params);
let req = Request::new(method.into(), Some(&params), Id::Number(0));

let (tx, mut rx) = mpsc::unbounded();
let sink = MethodSink::new(tx.clone());

if let MethodResult::Async(fut) = self.execute(&sink, req, 0) {
fut.await;
}
let response = rx.next().await.expect("Could not establish subscription.");
tracing::trace!("[Methods::test_subscription] Calling subscription method: {:?}, params: {:?}", sub_method, params);
let (response, rx, tx) = self.raw_call(sub_method, params).await;
let subscription_response = serde_json::from_str::<Response<SubscriptionId>>(&response)
.unwrap_or_else(|_| panic!("Could not deserialize subscription response {:?}", response));
let sub_id = subscription_response.result;
TestSubscription { tx, rx, sub_id }
TestSubscription { sub_id, rx, tx }
}

/// Returns an `Iterator` with all the method names registered on this server.
Expand Down Expand Up @@ -811,7 +794,7 @@ pub struct TestSubscription {
}

impl TestSubscription {
/// Close the subscription channel.
/// Close the subscription channel by doing a unsubscribe call.
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
pub fn close(&mut self) {
self.tx.close_channel();
}
Expand Down Expand Up @@ -844,7 +827,7 @@ impl Drop for TestSubscription {
#[cfg(test)]
mod tests {
use super::*;
use jsonrpsee_types::v2;
use jsonrpsee_types::{v2, EmptyParams};
use serde::Deserialize;
use std::collections::HashMap;

Expand Down Expand Up @@ -889,8 +872,8 @@ mod tests {
let mut module = RpcModule::new(());
module.register_method("boo", |_: Params, _| Ok(String::from("boo!"))).unwrap();

let result = module.call("boo", None).await.unwrap();
assert_eq!(result, r#"{"jsonrpc":"2.0","result":"boo!","id":0}"#);
let res: String = module.call("boo", EmptyParams::new()).await.unwrap();
assert_eq!(&res, "boo!");

// Call sync method with params
module
Expand All @@ -899,11 +882,12 @@ mod tests {
Ok(n * 2)
})
.unwrap();
let result = module.call_with("foo", [3]).await.unwrap();
assert_eq!(result, r#"{"jsonrpc":"2.0","result":6,"id":0}"#);
let res: u64 = module.call("foo", [3_u64]).await.unwrap();
assert_eq!(res, 6);

// Call sync method with bad param
let result = module.call_with("foo", (false,)).await.unwrap();
let params = (false,).to_rpc_params().unwrap();
let (result, _, _) = module.raw_call("foo", params).await;
assert_eq!(
result,
r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"invalid type: boolean `false`, expected u16 at line 1 column 6"},"id":0}"#
Expand All @@ -923,8 +907,8 @@ mod tests {
async move { Ok(ctx.roo(ns)) }
})
.unwrap();
let result = module.call_with("roo", vec![12, 13]).await.unwrap();
assert_eq!(result, r#"{"jsonrpc":"2.0","result":25,"id":0}"#);
let res: u64 = module.call("roo", [12, 13]).await.unwrap();
assert_eq!(res, 25);
}

#[tokio::test]
Expand Down Expand Up @@ -977,27 +961,24 @@ mod tests {
let module = CoolServerImpl.into_rpc();

// Call sync method with no params
let result = module.call("rebel_without_cause", None).await.unwrap();
assert_eq!(result, r#"{"jsonrpc":"2.0","result":false,"id":0}"#);

// Call sync method with no params, alternative way.
let result = module.call_with::<[u8; 0]>("rebel_without_cause", []).await.unwrap();
assert_eq!(result, r#"{"jsonrpc":"2.0","result":false,"id":0}"#);
let res: bool = module.call("rebel_without_cause", EmptyParams::new()).await.unwrap();
assert_eq!(res, false);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

// Call sync method with params
let result = module.call_with("rebel", (Gun { shoots: true }, HashMap::<u8, u8>::default())).await.unwrap();
assert_eq!(result, r#"{"jsonrpc":"2.0","result":"0 Gun { shoots: true }","id":0}"#);
let res: String = module.call("rebel", (Gun { shoots: true }, HashMap::<u8, u8>::default())).await.unwrap();
assert_eq!(&res, "0 Gun { shoots: true }");

// Call sync method with bad params
let result = module.call_with("rebel", (Gun { shoots: true }, false)).await.unwrap();
let params = (Gun { shoots: true }, false).to_rpc_params().unwrap();
let (result, _, _) = module.raw_call("rebel", params).await;
assert_eq!(
result,
r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"invalid type: boolean `false`, expected a map at line 1 column 5"},"id":0}"#
);

// Call async method with params and context
let result = module.call_with("revolution", (Beverage { ice: true }, vec![1, 2, 3])).await.unwrap();
assert_eq!(result, r#"{"jsonrpc":"2.0","result":"drink: Beverage { ice: true }, phases: [1, 2, 3]","id":0}"#);
let result: String = module.call("revolution", (Beverage { ice: true }, vec![1, 2, 3])).await.unwrap();
assert_eq!(&result, "drink: Beverage { ice: true }, phases: [1, 2, 3]");
}

#[tokio::test]
Expand All @@ -1021,7 +1002,7 @@ mod tests {
})
.unwrap();

let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await;
let mut my_sub: TestSubscription = module.test_subscription("my_sub", EmptyParams::new()).await;
for i in (0..=2).rev() {
let (val, id) = my_sub.next::<char>().await.unwrap();
assert_eq!(val, std::char::from_digit(i, 10).unwrap());
Expand Down Expand Up @@ -1049,7 +1030,7 @@ mod tests {
})
.unwrap();

let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await;
let mut my_sub: TestSubscription = module.test_subscription("my_sub", EmptyParams::new()).await;
let (val, id) = my_sub.next::<String>().await.unwrap();
assert_eq!(&val, "lo");
assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
Expand Down