From 763a031b0a35612a7b9181ac6f13d3bb87c46dee Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Mar 2022 11:30:17 +0200 Subject: [PATCH 1/7] refactor: make `pipe_from_stream` take stream of result The rationale for this is that it is more flexible for use cases when `Stream>`. Take for example `tokio_stream::Broadcast` then one would have to something like: ```rust let stream = BroadcastStream::new(rx).take_while(|r| future::ready(r.is_ok())).filter_map(|r| future::ready(r.ok())); ``` Of course it's a bit awkward to return `Result` when the underlying stream can't fail but I think that's fair trade-off here. --- core/src/server/rpc_module.rs | 15 ++++++++++----- tests/tests/integration_tests.rs | 5 +++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 7f5f62875e..08cf406206 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -764,25 +764,27 @@ impl SubscriptionSink { /// when items gets produced by the stream. /// /// Returns `Ok(())` if the stream or connection was terminated. - /// Returns `Err(_)` if one of the items couldn't be serialized. + /// Returns `Err(_)` if the underlying stream return an error or if an item from the stream could not be serialized. /// /// # Examples /// /// ```no_run /// /// use jsonrpsee_core::server::rpc_module::RpcModule; + /// use anyhow::anyhow; /// /// let mut m = RpcModule::new(()); /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { - /// let stream = futures_util::stream::iter(vec![1_u32, 2, 3]); + /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Ok(3), Err(Box::new(anyhow!("error on the stream"))]); /// tokio::spawn(sink.pipe_from_stream(stream)); /// Ok(()) /// }); /// ``` - pub async fn pipe_from_stream(mut self, mut stream: S) -> Result<(), Error> + pub async fn pipe_from_stream(mut self, mut stream: S) -> Result<(), Error> where - S: Stream + Unpin, + S: Stream> + Unpin, T: Serialize, + E: std::error::Error, { if let Some(close_notify) = self.close_notify.clone() { let mut stream_item = stream.next(); @@ -791,7 +793,7 @@ impl SubscriptionSink { loop { match futures_util::future::select(stream_item, closed_fut).await { // The app sent us a value to send back to the subscribers - Either::Left((Some(result), next_closed_fut)) => { + Either::Left((Some(Ok(result)), next_closed_fut)) => { match self.send(&result) { Ok(_) => (), Err(Error::SubscriptionClosed(close_reason)) => { @@ -805,6 +807,9 @@ impl SubscriptionSink { stream_item = stream.next(); closed_fut = next_closed_fut; } + Either::Left((Some(Err(e)), _)) => { + break Err(Error::Custom(e.to_string())); + } // Stream terminated. Either::Left((None, _)) => break Ok(()), // The subscriber went away without telling us. diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 3eec2ba10b..1c8e4373bf 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -399,7 +399,7 @@ async fn ws_server_cancels_stream_after_reset_conn() { module .register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| { // create stream that doesn't produce items. - let stream = futures::stream::empty::(); + let stream = futures::stream::empty::().map(|i| Ok::<_, Error>(i)); tokio::spawn(async move { sink.pipe_from_stream(stream).await.unwrap(); let send_back = Arc::make_mut(&mut tx); @@ -437,7 +437,8 @@ async fn ws_server_subscribe_with_stream() { .register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, sink, _| { tokio::spawn(async move { let interval = interval(Duration::from_millis(50)); - let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c); + let stream = + IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| Ok::<_, Error>(c)); sink.pipe_from_stream(stream).await.unwrap(); }); From d0a5b0f735827844f7c2c8a4e5f08cbd13ab24ff Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Mar 2022 15:58:01 +0200 Subject: [PATCH 2/7] Update core/src/server/rpc_module.rs Co-authored-by: Tarik Gul <47201679+TarikGul@users.noreply.github.com> --- core/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 08cf406206..085d810eaf 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -764,7 +764,7 @@ impl SubscriptionSink { /// when items gets produced by the stream. /// /// Returns `Ok(())` if the stream or connection was terminated. - /// Returns `Err(_)` if the underlying stream return an error or if an item from the stream could not be serialized. + /// Returns `Err(_)` if the underlying stream returns an error or if an item from the stream could not be serialized. /// /// # Examples /// From 51dd154b06d81176f6b5e2c996f2b13015fd78b8 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Mar 2022 17:37:43 +0200 Subject: [PATCH 3/7] pipe_from_stream: make E: Display instead of StdError --- core/src/server/rpc_module.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 08cf406206..8ab16e134b 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -775,7 +775,7 @@ impl SubscriptionSink { /// /// let mut m = RpcModule::new(()); /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { - /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Ok(3), Err(Box::new(anyhow!("error on the stream"))]); + /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Ok(3), Err("error on the stream")]); /// tokio::spawn(sink.pipe_from_stream(stream)); /// Ok(()) /// }); @@ -784,7 +784,7 @@ impl SubscriptionSink { where S: Stream> + Unpin, T: Serialize, - E: std::error::Error, + E: std::fmt::Display, { if let Some(close_notify) = self.close_notify.clone() { let mut stream_item = stream.next(); @@ -808,7 +808,7 @@ impl SubscriptionSink { closed_fut = next_closed_fut; } Either::Left((Some(Err(e)), _)) => { - break Err(Error::Custom(e.to_string())); + break Err(Error::SubscriptionClosed(SubscriptionClosedReason::Server(e.to_string()).into())); } // Stream terminated. Either::Left((None, _)) => break Ok(()), From 79cd97d9c2fb7c52598933e04fdc84731168ff67 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Mar 2022 19:29:01 +0200 Subject: [PATCH 4/7] add a test --- core/src/error.rs | 2 +- core/src/server/rpc_module.rs | 10 ++++++--- proc-macros/src/lib.rs | 2 +- tests/tests/integration_tests.rs | 38 ++++++++++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/core/src/error.rs b/core/src/error.rs index eab468ecd9..4459dfba46 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -179,7 +179,7 @@ impl SubscriptionClosed { /// A type to represent when a subscription gets closed /// by either the server or client side. -#[derive(Deserialize, Serialize, Debug, PartialEq)] +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] pub enum SubscriptionClosedReason { /// The subscription was closed by calling the unsubscribe method. Unsubscribed, diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 7cc7426301..d10bd633ae 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -764,7 +764,7 @@ impl SubscriptionSink { /// when items gets produced by the stream. /// /// Returns `Ok(())` if the stream or connection was terminated. - /// Returns `Err(_)` if the underlying stream returns an error or if an item from the stream could not be serialized. + /// Returns `Err(_)` immediately if the underlying stream returns an error or if an item from the stream could not be serialized. /// /// # Examples /// @@ -775,7 +775,9 @@ impl SubscriptionSink { /// /// let mut m = RpcModule::new(()); /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { - /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Ok(3), Err("error on the stream")]); + /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]); + /// // This will return send `[Ok(1_u32), Ok(2_u32)]` to the subscriber + /// // because after the `Err(_)` the stream is terminated. /// tokio::spawn(sink.pipe_from_stream(stream)); /// Ok(()) /// }); @@ -808,7 +810,9 @@ impl SubscriptionSink { closed_fut = next_closed_fut; } Either::Left((Some(Err(e)), _)) => { - break Err(Error::SubscriptionClosed(SubscriptionClosedReason::Server(e.to_string()).into())); + let close_reason = SubscriptionClosedReason::Server(e.to_string()).into(); + self.close(&close_reason); + break Err(Error::SubscriptionClosed(close_reason)); } // Stream terminated. Either::Left((None, _)) => break Ok(()), diff --git a/proc-macros/src/lib.rs b/proc-macros/src/lib.rs index 2a6d277975..5aec7cf16e 100644 --- a/proc-macros/src/lib.rs +++ b/proc-macros/src/lib.rs @@ -288,7 +288,7 @@ pub(crate) mod visitor; /// // as subscription responses. /// fn sub_override_notif_method(&self, mut sink: SubscriptionSink) -> RpcResult<()> { /// tokio::spawn(async move { -/// let stream = futures_util::stream::iter(["one", "two", "three"]); +/// let stream = futures_util::stream::iter([Ok::<_, &str>("one"), Ok("two"), Ok("three")]); /// sink.pipe_from_stream(stream).await; /// }); /// diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 1c8e4373bf..2eaf5d90ed 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -423,6 +423,44 @@ async fn ws_server_cancels_stream_after_reset_conn() { assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); } +#[tokio::test] +async fn ws_server_cancels_sub_stream_after_err() { + use jsonrpsee::{ws_server::WsServerBuilder, RpcModule}; + + let err: &'static str = "error on the stream"; + let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap(); + let server_url = format!("ws://{}", server.local_addr().unwrap()); + + let mut module = RpcModule::new(()); + + module + .register_subscription( + "subscribe_with_err_on_stream", + "n", + "unsubscribe_with_err_on_stream", + move |_, sink, _| { + // create stream that produce an error which will cancel the subscription. + let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]); + tokio::spawn(async move { + let _ = sink.pipe_from_stream(stream).await; + }); + Ok(()) + }, + ) + .unwrap(); + + server.start(module).unwrap(); + + let client = WsClientBuilder::default().build(&server_url).await.unwrap(); + let mut sub: Subscription = + client.subscribe("subscribe_with_err_on_stream", None, "unsubscribe_with_err_on_stream").await.unwrap(); + + assert_eq!(sub.next().await.unwrap().unwrap(), 1); + let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string())); + // The server closed down the subscription with the underlying error from the stream. + assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp)); +} + #[tokio::test] async fn ws_server_subscribe_with_stream() { use futures::StreamExt; From 72bfee09315ff363163c04ec84f1be166c3a28eb Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 1 Apr 2022 10:45:57 +0200 Subject: [PATCH 5/7] add `pipe_from_try_stream` API to support `TryStream` --- core/src/server/rpc_module.rs | 48 +++++++++++++++++++++++++------- tests/tests/integration_tests.rs | 8 +++--- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index d10bd633ae..df916c5c3b 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -38,7 +38,7 @@ use crate::traits::{IdProvider, ToRpcParams}; use futures_channel::{mpsc, oneshot}; use futures_util::future::Either; use futures_util::pin_mut; -use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; +use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt}; use jsonrpsee_types::error::{ErrorCode, CALL_EXECUTION_FAILED_CODE}; use jsonrpsee_types::{ Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse, @@ -762,6 +762,7 @@ impl SubscriptionSink { /// Consumes the `SubscriptionSink` and reads data from the `stream` and sends back data on the subscription /// when items gets produced by the stream. + /// The underlying stream must produce `Result values, see [`futures_util::TryStream`] for further information. /// /// Returns `Ok(())` if the stream or connection was terminated. /// Returns `Err(_)` immediately if the underlying stream returns an error or if an item from the stream could not be serialized. @@ -776,26 +777,26 @@ impl SubscriptionSink { /// let mut m = RpcModule::new(()); /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]); - /// // This will return send `[Ok(1_u32), Ok(2_u32)]` to the subscriber + /// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber /// // because after the `Err(_)` the stream is terminated. - /// tokio::spawn(sink.pipe_from_stream(stream)); + /// tokio::spawn(sink.pipe_from_try_stream(stream)); /// Ok(()) /// }); /// ``` - pub async fn pipe_from_stream(mut self, mut stream: S) -> Result<(), Error> + pub async fn pipe_from_try_stream(mut self, mut stream: S) -> Result<(), Error> where - S: Stream> + Unpin, + S: TryStream + Unpin, T: Serialize, E: std::fmt::Display, { if let Some(close_notify) = self.close_notify.clone() { - let mut stream_item = stream.next(); + let mut stream_item = stream.try_next(); let closed_fut = close_notify.notified(); pin_mut!(closed_fut); loop { match futures_util::future::select(stream_item, closed_fut).await { // The app sent us a value to send back to the subscribers - Either::Left((Some(Ok(result)), next_closed_fut)) => { + Either::Left((Ok(Some(result)), next_closed_fut)) => { match self.send(&result) { Ok(_) => (), Err(Error::SubscriptionClosed(close_reason)) => { @@ -806,16 +807,16 @@ impl SubscriptionSink { break Err(err); } }; - stream_item = stream.next(); + stream_item = stream.try_next(); closed_fut = next_closed_fut; } - Either::Left((Some(Err(e)), _)) => { + Either::Left((Err(e), _)) => { let close_reason = SubscriptionClosedReason::Server(e.to_string()).into(); self.close(&close_reason); break Err(Error::SubscriptionClosed(close_reason)); } // Stream terminated. - Either::Left((None, _)) => break Ok(()), + Either::Left((Ok(None), _)) => break Ok(()), // The subscriber went away without telling us. Either::Right(((), _)) => { self.close(&SubscriptionClosed::new(SubscriptionClosedReason::ConnectionReset)); @@ -829,6 +830,33 @@ impl SubscriptionSink { } } + /// Similar to [`SubscriptionSink::pipe_from_try_stream`] but it doesn't require the stream return `Result`. + /// + /// Warning: it's possible to pass in a stream that returns `Result` if `Result: Serialize` is satisfied + /// but it won't cancel the stream when an error occurs. If you want the stream to be canceled when an + /// error occurs use [`SubscriptionSink::pipe_from_try_stream`] instead. + /// + /// # Examples + /// + /// ```no_run + /// + /// use jsonrpsee_core::server::rpc_module::RpcModule; + /// + /// let mut m = RpcModule::new(()); + /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| { + /// let stream = futures_util::stream::iter(vec![1, 2, 3]); + /// tokio::spawn(sink.pipe_from_stream(stream)); + /// Ok(()) + /// }); + /// ``` + pub async fn pipe_from_stream(self, stream: S) -> Result<(), Error> + where + S: Stream + Unpin, + T: Serialize, + { + self.pipe_from_try_stream::<_, _, Error>(stream.map(|item| Ok(item))).await + } + /// Returns whether this channel is closed without needing a context. pub fn is_closed(&self) -> bool { self.inner.is_closed() || self.close_notify.is_none() diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 2eaf5d90ed..1fe6ed084c 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -399,7 +399,7 @@ async fn ws_server_cancels_stream_after_reset_conn() { module .register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| { // create stream that doesn't produce items. - let stream = futures::stream::empty::().map(|i| Ok::<_, Error>(i)); + let stream = futures::stream::empty::(); tokio::spawn(async move { sink.pipe_from_stream(stream).await.unwrap(); let send_back = Arc::make_mut(&mut tx); @@ -442,7 +442,7 @@ async fn ws_server_cancels_sub_stream_after_err() { // create stream that produce an error which will cancel the subscription. let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]); tokio::spawn(async move { - let _ = sink.pipe_from_stream(stream).await; + let _ = sink.pipe_from_try_stream(stream).await; }); Ok(()) }, @@ -459,6 +459,7 @@ async fn ws_server_cancels_sub_stream_after_err() { let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string())); // The server closed down the subscription with the underlying error from the stream. assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp)); + sub.next().await.unwrap(); } #[tokio::test] @@ -475,8 +476,7 @@ async fn ws_server_subscribe_with_stream() { .register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, sink, _| { tokio::spawn(async move { let interval = interval(Duration::from_millis(50)); - let stream = - IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| Ok::<_, Error>(c)); + let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c); sink.pipe_from_stream(stream).await.unwrap(); }); From f354d86823a164d484d04bfaab127e92a3c0a446 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 1 Apr 2022 12:10:31 +0200 Subject: [PATCH 6/7] Update tests/tests/integration_tests.rs --- tests/tests/integration_tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 1fe6ed084c..c59be25a6f 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -459,7 +459,6 @@ async fn ws_server_cancels_sub_stream_after_err() { let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string())); // The server closed down the subscription with the underlying error from the stream. assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp)); - sub.next().await.unwrap(); } #[tokio::test] From e6922870d0acedeabd06f76f874149cce81bba52 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 1 Apr 2022 12:11:34 +0200 Subject: [PATCH 7/7] Update proc-macros/src/lib.rs --- proc-macros/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proc-macros/src/lib.rs b/proc-macros/src/lib.rs index 5aec7cf16e..2a6d277975 100644 --- a/proc-macros/src/lib.rs +++ b/proc-macros/src/lib.rs @@ -288,7 +288,7 @@ pub(crate) mod visitor; /// // as subscription responses. /// fn sub_override_notif_method(&self, mut sink: SubscriptionSink) -> RpcResult<()> { /// tokio::spawn(async move { -/// let stream = futures_util::stream::iter([Ok::<_, &str>("one"), Ok("two"), Ok("three")]); +/// let stream = futures_util::stream::iter(["one", "two", "three"]); /// sink.pipe_from_stream(stream).await; /// }); ///