From fe6e052f11739a14288a21e7dbd13333a2262e64 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 6 Dec 2023 18:20:02 +0100 Subject: [PATCH 01/13] refactor(client): unify ws ping/pong api --- client/ws-client/src/lib.rs | 23 ++-- core/Cargo.toml | 6 + core/src/client/async_client/mod.rs | 191 +++++++++++++++++++++------- server/src/tests/ws.rs | 2 +- 4 files changed, 168 insertions(+), 54 deletions(-) diff --git a/client/ws-client/src/lib.rs b/client/ws-client/src/lib.rs index c354ff7992..e46b043538 100644 --- a/client/ws-client/src/lib.rs +++ b/client/ws-client/src/lib.rs @@ -39,6 +39,7 @@ mod tests; pub use http::{HeaderMap, HeaderValue}; +pub use jsonrpsee_core::client::async_client::PingConfig; pub use jsonrpsee_core::client::Client as WsClient; pub use jsonrpsee_types as types; @@ -82,7 +83,7 @@ pub struct WsClientBuilder { max_response_size: u32, request_timeout: Duration, connection_timeout: Duration, - ping_interval: Option, + ping_config: Option, headers: http::HeaderMap, max_concurrent_requests: usize, max_buffer_capacity_per_subscription: usize, @@ -99,7 +100,7 @@ impl Default for WsClientBuilder { max_response_size: TEN_MB_SIZE_BYTES, request_timeout: Duration::from_secs(60), connection_timeout: Duration::from_secs(10), - ping_interval: None, + ping_config: None, headers: HeaderMap::new(), max_concurrent_requests: 256, max_buffer_capacity_per_subscription: 1024, @@ -170,9 +171,15 @@ impl WsClientBuilder { self } - /// See documentation [`ClientBuilder::ping_interval`] (disabled by default). - pub fn ping_interval(mut self, interval: Duration) -> Self { - self.ping_interval = Some(interval); + /// See documentation [`ClientBuilder::enable_ws_ping`] (disabled by default). + pub fn enable_ws_ping(mut self, cfg: PingConfig) -> Self { + self.ping_config = Some(cfg); + self + } + + /// See documentation [`ClientBuilder::disable_ws_ping`] + pub fn disable_ws_ping(mut self) -> Self { + self.ping_config = None; self } @@ -227,7 +234,7 @@ impl WsClientBuilder { let Self { max_concurrent_requests, request_timeout, - ping_interval, + ping_config, max_buffer_capacity_per_subscription, id_kind, max_log_length, @@ -241,8 +248,8 @@ impl WsClientBuilder { .id_format(id_kind) .set_max_logging_length(max_log_length); - if let Some(interval) = ping_interval { - client = client.ping_interval(interval); + if let Some(cfg) = ping_config { + client = client.enable_ws_ping(cfg); } client.build_with_tokio(sender, receiver) diff --git a/core/Cargo.toml b/core/Cargo.toml index edb3a5aa3e..8b4b743410 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -33,6 +33,8 @@ parking_lot = { version = "0.12", optional = true } tokio = { version = "1.16", optional = true } wasm-bindgen-futures = { version = "0.4.19", optional = true } futures-timer = { version = "3", optional = true } +tokio-stream = { version = "0.1", optional = true } +pin-project = { version = "1", optional = true } [features] default = [] @@ -57,6 +59,8 @@ async-client = [ "tokio/rt", "tokio/time", "futures-timer", + "tokio-stream", + "pin-project", ] async-wasm-client = [ "async-lock", @@ -67,6 +71,8 @@ async-wasm-client = [ "futures-timer/wasm-bindgen", "tokio/macros", "tokio/time", + "tokio-stream", + "pin-project", ] [dev-dependencies] diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 769ed99db5..2930a770d7 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -16,12 +16,16 @@ use crate::JsonRawValue; use std::borrow::Cow as StdCow; use core::time::Duration; +use std::num::NonZeroUsize; +use std::time::Instant; use helpers::{ build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification, process_single_response, process_subscription_response, stop_subscription, }; use jsonrpsee_types::{InvalidRequestId, ResponseSuccess, TwoPointZero}; use manager::RequestManager; +use pin_project::pin_project; +use tokio::time::{interval, Interval, interval_at}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, Waker}; @@ -42,6 +46,67 @@ use super::{generate_batch_id_range, FrontToBack, IdKind, RequestIdManager}; const LOG_TARGET: &str = "jsonrpsee-client"; +/// Configuration for WebSocket ping/pong mechanism and it may be used to disconnect +/// an inactive connection. +/// +/// jsonrpsee doesn't associate the ping/pong frames just that if +/// pong frame isn't received within `inactive_limit` then it's regarded +/// as missed. +/// +/// Such that the `inactive_limit` should be configured to longer than a single +/// WebSocket ping takes or it might be missed and may end up +/// terminating the connection. +#[derive(Debug, Copy, Clone)] +pub struct PingConfig { + /// Period which the server pings the connected client. + pub(crate) ping_interval: Duration, + /// Max allowed time for a connection to stay idle. + pub(crate) inactive_limit: Duration, + /// Max failures. + pub(crate) max_failures: NonZeroUsize, +} + +impl Default for PingConfig { + fn default() -> Self { + Self { + ping_interval: Duration::from_secs(30), + max_failures: NonZeroUsize::new(1).expect("1 > 0; qed"), + inactive_limit: Duration::from_secs(40), + } + } +} + +impl PingConfig { + /// Create a new PingConfig. + pub fn new() -> Self { + Self::default() + } + + /// Configure the interval when the WebSocket pings are sent out. + pub fn ping_interval(mut self, ping_interval: Duration) -> Self { + self.ping_interval = ping_interval; + self + } + + /// Configure how long to wait for the WebSocket pong. + /// When this limit is expired it's regarded as the client is unresponsive. + /// + /// You may configure how many times the client is allowed to be "inactive" by + /// [`PingConfig::max_failures`]. + pub fn inactive_limit(mut self, inactivity_limit: Duration) -> Self { + self.inactive_limit = inactivity_limit; + self + } + + /// Configure how many times the remote peer is allowed be + /// inactive until the connection is closed. + pub fn max_failures(mut self, max: NonZeroUsize) -> Self { + self.max_failures = max; + self + } +} + + #[derive(Debug, Default, Clone)] pub(crate) struct ThreadSafeRequestManager(Arc>); @@ -118,7 +183,7 @@ pub struct ClientBuilder { max_buffer_capacity_per_subscription: usize, id_kind: IdKind, max_log_length: u32, - ping_interval: Option, + ping_config: Option, } impl Default for ClientBuilder { @@ -129,7 +194,7 @@ impl Default for ClientBuilder { max_buffer_capacity_per_subscription: 1024, id_kind: IdKind::Number, max_log_length: 4096, - ping_interval: None, + ping_config: None, } } } @@ -182,20 +247,21 @@ impl ClientBuilder { self } - /// Set the interval at which pings frames are submitted (disabled by default). - /// - /// Periodically submitting pings at a defined interval has mainly two benefits: - /// - Directly, it acts as a "keep-alive" alternative in the WebSocket world. - /// - Indirectly by inspecting debug logs, it ensures that the endpoint is still responding to messages. + /// Enable WebSocket ping/pong on the client. + /// + /// This only works if the transport supports WebSocket pings. /// - /// The underlying implementation does not make any assumptions about at which intervals pongs are received. + /// Default: pings are disabled. + pub fn enable_ws_ping(mut self, cfg: PingConfig) -> Self { + self.ping_config = Some(cfg); + self + } + + /// Disable WebSocket ping/pong on the server. /// - /// Note: The interval duration is restarted when - /// - a frontend command is submitted - /// - a reply is received from the backend - /// - the interval duration expires - pub fn ping_interval(mut self, interval: Duration) -> Self { - self.ping_interval = Some(interval); + /// Default: pings are disabled. + pub fn disable_ws_ping(mut self) -> Self { + self.ping_config = None; self } @@ -214,7 +280,7 @@ impl ClientBuilder { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_to_front, err_from_back) = oneshot::channel::(); let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription; - let ping_interval = self.ping_interval; + let ping_config = self.ping_config; let (client_dropped_tx, client_dropped_rx) = oneshot::channel(); let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1); let manager = ThreadSafeRequestManager::new(); @@ -225,7 +291,7 @@ impl ClientBuilder { close_tx: send_receive_task_sync_tx.clone(), manager: manager.clone(), max_buffer_capacity_per_subscription, - ping_interval, + ping_config, })); tokio::spawn(read_task(ReadTaskParams { @@ -234,6 +300,7 @@ impl ClientBuilder { to_send_task: to_back.clone(), manager, max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription, + ping_config, })); tokio::spawn(wait_for_shutdown(send_receive_task_sync_rx, client_dropped_rx, err_to_front)); @@ -794,7 +861,7 @@ struct SendTaskParams { close_tx: mpsc::Sender>, manager: ThreadSafeRequestManager, max_buffer_capacity_per_subscription: usize, - ping_interval: Option, + ping_config: Option, } async fn send_task(params: SendTaskParams) @@ -807,15 +874,20 @@ where close_tx, manager, max_buffer_capacity_per_subscription, - ping_interval, + ping_config, } = params; + let mut ping_interval = match ping_config { + None => IntervalStream::pending(), + // NOTE: we are emitted a tick here immediately to sync + // with how the receive task work because it starts measuring the pong + // when it starts up. + Some(p) => IntervalStream::new(interval(p.ping_interval)), + }; + // This is safe because `tokio::time::Interval`, `tokio::mpsc::Sender` and `tokio::mpsc::Receiver` // are cancel-safe. - let res = if let Some(ping_interval) = ping_interval { - let mut ping = tokio::time::interval_at(tokio::time::Instant::now() + ping_interval, ping_interval); - - loop { + let res = loop { tokio::select! { biased; _ = close_tx.closed() => break Ok(()), @@ -831,34 +903,14 @@ where break Err(Error::Transport(e.into())); } } - _ = ping.tick() => { + _ = ping_interval.next() => { if let Err(err) = sender.send_ping().await { tracing::error!(target: LOG_TARGET, "Could not send ping frame: {err}"); break Err(Error::Custom("Could not send ping frame".into())); } } } - } - } else { - loop { - tokio::select! { - biased; - _ = close_tx.closed() => break Ok(()), - maybe_msg = from_frontend.recv() => { - let Some(msg) = maybe_msg else { - break Ok(()); - }; - - if let Err(e) = - handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await - { - tracing::error!(target: LOG_TARGET, "Could not send message: {e}"); - break Err(Error::Transport(e.into())); - } - } - } - } - }; + }; from_frontend.close(); let _ = sender.close().await; @@ -871,13 +923,21 @@ struct ReadTaskParams { to_send_task: mpsc::Sender, manager: ThreadSafeRequestManager, max_buffer_capacity_per_subscription: usize, + ping_config: Option, } async fn read_task(params: ReadTaskParams) where R: TransportReceiverT, { - let ReadTaskParams { receiver, close_tx, to_send_task, manager, max_buffer_capacity_per_subscription } = params; + let ReadTaskParams { receiver, close_tx, to_send_task, manager, max_buffer_capacity_per_subscription, ping_config } = params; + + let mut last_active = Instant::now(); + let mut inactivity_check = match ping_config { + Some(p) => IntervalStream::new(interval_at(tokio::time::Instant::now() + p.ping_interval, p.ping_interval)), + None => IntervalStream::pending(), + }; + let mut missed = 0; let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async { let res = receiver.receive().await; @@ -903,6 +963,7 @@ where _ = pending_unsubscribes.next() => (), // New message received. maybe_msg = backend_event.next() => { + last_active = Instant::now(); let Some(msg) = maybe_msg else { break Ok(()) }; match handle_backend_messages::(Some(msg), &manager, max_buffer_capacity_per_subscription) { @@ -915,7 +976,17 @@ where } Ok(None) => (), } + } + _ = inactivity_check.next() => { + if let Some(p) = ping_config { + if last_active.elapsed() > p.inactive_limit { + missed += 1; + if missed >= p.max_failures.get() { + break Ok(()); + } + } + } } } }; @@ -970,3 +1041,33 @@ impl Stream for MaybePendingFutures { self.futs.poll_next_unpin(cx) } } + + +#[pin_project] +pub(crate) struct IntervalStream(#[pin] Option); + +impl IntervalStream { + /// Creates a stream which never returns any elements. + pub(crate) fn pending() -> Self { + Self(None) + } + + /// Creates a stream which produces elements with interval of `period`. + pub(crate) fn new(interval: Interval) -> Self { + Self(Some(tokio_stream::wrappers::IntervalStream::new(interval))) + } +} + +impl Stream for IntervalStream { + type Item = tokio::time::Instant; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(mut stream) = self.project().0.as_pin_mut() { + stream.poll_next_unpin(cx) + } else { + // NOTE: this will not be woken up again and it's by design + // to be a pending stream that never returns. + Poll::Pending + } + } +} diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 3fbafed690..da151f6a7e 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -881,7 +881,7 @@ async fn server_with_infinite_call( ) -> (crate::ServerHandle, std::net::SocketAddr) { let server = ServerBuilder::default() // Make sure that the ping_interval doesn't force the connection to be closed - .enable_ws_ping(crate::server::PingConfig::new().max_failures(NonZeroUsize::MAX).ping_interval(timeout)) + .enable_ws_ping(crate::PingConfig::new().max_failures(NonZeroUsize::MAX).ping_interval(timeout)) .build("127.0.0.1:0") .with_default_timeout() .await From b0d0394cb12e8081d0e5d3e40d8ff26efce42a28 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 6 Dec 2023 18:34:33 +0100 Subject: [PATCH 02/13] fix nits --- core/src/client/async_client/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 2930a770d7..6ca278c847 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -58,7 +58,7 @@ const LOG_TARGET: &str = "jsonrpsee-client"; /// terminating the connection. #[derive(Debug, Copy, Clone)] pub struct PingConfig { - /// Period which the server pings the connected client. + /// Interval that the pings are sent. pub(crate) ping_interval: Duration, /// Max allowed time for a connection to stay idle. pub(crate) inactive_limit: Duration, @@ -89,16 +89,16 @@ impl PingConfig { } /// Configure how long to wait for the WebSocket pong. - /// When this limit is expired it's regarded as the client is unresponsive. + /// When this limit is expired it's regarded as inresponsive. /// - /// You may configure how many times the client is allowed to be "inactive" by - /// [`PingConfig::max_failures`]. + /// You may configure how many times the connection is allowed to + /// be inactive by [`PingConfig::max_failures`]. pub fn inactive_limit(mut self, inactivity_limit: Duration) -> Self { self.inactive_limit = inactivity_limit; self } - /// Configure how many times the remote peer is allowed be + /// Configure how many times the connection is allowed be /// inactive until the connection is closed. pub fn max_failures(mut self, max: NonZeroUsize) -> Self { self.max_failures = max; @@ -983,7 +983,7 @@ where missed += 1; if missed >= p.max_failures.get() { - break Ok(()); + break Err(Error::Custom("WebSocket ping/pong inactive".into())); } } } From 87d44de5dce9e923d13552e46b48a3ff01e63d2f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 6 Dec 2023 18:52:31 +0100 Subject: [PATCH 03/13] fix build --- core/src/client/async_client/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 6ca278c847..964c21be13 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -326,7 +326,7 @@ impl ClientBuilder { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_to_front, err_from_back) = oneshot::channel::(); let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription; - let ping_interval = self.ping_interval; + let ping_config = self.ping_config; let (client_dropped_tx, client_dropped_rx) = oneshot::channel(); let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1); let manager = ThreadSafeRequestManager::new(); @@ -337,7 +337,7 @@ impl ClientBuilder { close_tx: send_receive_task_sync_tx.clone(), manager: manager.clone(), max_buffer_capacity_per_subscription, - ping_interval, + ping_config, })); wasm_bindgen_futures::spawn_local(read_task(ReadTaskParams { @@ -346,6 +346,7 @@ impl ClientBuilder { to_send_task: to_back.clone(), manager, max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription, + ping_config, })); wasm_bindgen_futures::spawn_local(wait_for_shutdown( From ede8ff87a6539d494e61a3f4e2e51dcd72b795ba Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 7 Dec 2023 11:02:49 +0100 Subject: [PATCH 04/13] fix wasm build --- core/Cargo.toml | 3 +++ core/src/client/async_client/mod.rs | 5 ++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 8b4b743410..bccebab18b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -35,6 +35,7 @@ wasm-bindgen-futures = { version = "0.4.19", optional = true } futures-timer = { version = "3", optional = true } tokio-stream = { version = "0.1", optional = true } pin-project = { version = "1", optional = true } +instant = { version = "0.1", optional = true } [features] default = [] @@ -61,6 +62,7 @@ async-client = [ "futures-timer", "tokio-stream", "pin-project", + "instant", ] async-wasm-client = [ "async-lock", @@ -73,6 +75,7 @@ async-wasm-client = [ "tokio/time", "tokio-stream", "pin-project", + "instant/wasm-bindgen", ] [dev-dependencies] diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 964c21be13..413a69ee63 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -17,7 +17,6 @@ use std::borrow::Cow as StdCow; use core::time::Duration; use std::num::NonZeroUsize; -use std::time::Instant; use helpers::{ build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification, process_single_response, process_subscription_response, stop_subscription, @@ -933,7 +932,7 @@ where { let ReadTaskParams { receiver, close_tx, to_send_task, manager, max_buffer_capacity_per_subscription, ping_config } = params; - let mut last_active = Instant::now(); + let mut last_active = instant::Instant::now(); let mut inactivity_check = match ping_config { Some(p) => IntervalStream::new(interval_at(tokio::time::Instant::now() + p.ping_interval, p.ping_interval)), None => IntervalStream::pending(), @@ -964,7 +963,7 @@ where _ = pending_unsubscribes.next() => (), // New message received. maybe_msg = backend_event.next() => { - last_active = Instant::now(); + last_active = instant::Instant::now(); let Some(msg) = maybe_msg else { break Ok(()) }; match handle_backend_messages::(Some(msg), &manager, max_buffer_capacity_per_subscription) { From 5aa1ff8c7e31c6aa1f4679b335e7793d8565c07e Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 7 Dec 2023 18:04:52 +0100 Subject: [PATCH 05/13] refactor again --- core/Cargo.toml | 3 - core/src/client/async_client/mod.rs | 159 +++++++++++++++------------- 2 files changed, 84 insertions(+), 78 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index bccebab18b..4ef59c6124 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -62,7 +62,6 @@ async-client = [ "futures-timer", "tokio-stream", "pin-project", - "instant", ] async-wasm-client = [ "async-lock", @@ -73,9 +72,7 @@ async-wasm-client = [ "futures-timer/wasm-bindgen", "tokio/macros", "tokio/time", - "tokio-stream", "pin-project", - "instant/wasm-bindgen", ] [dev-dependencies] diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 413a69ee63..99f7f55718 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -1,7 +1,34 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + //! Abstract async client. mod helpers; mod manager; +mod utils; use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse}; use crate::client::{ @@ -16,15 +43,12 @@ use crate::JsonRawValue; use std::borrow::Cow as StdCow; use core::time::Duration; -use std::num::NonZeroUsize; use helpers::{ build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification, process_single_response, process_subscription_response, stop_subscription, }; use jsonrpsee_types::{InvalidRequestId, ResponseSuccess, TwoPointZero}; use manager::RequestManager; -use pin_project::pin_project; -use tokio::time::{interval, Interval, interval_at}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, Waker}; @@ -41,6 +65,8 @@ use serde::de::DeserializeOwned; use tokio::sync::{mpsc, oneshot}; use tracing::instrument; +use self::utils::{IntervalStream, InactivityCheck}; + use super::{generate_batch_id_range, FrontToBack, IdKind, RequestIdManager}; const LOG_TARGET: &str = "jsonrpsee-client"; @@ -62,14 +88,14 @@ pub struct PingConfig { /// Max allowed time for a connection to stay idle. pub(crate) inactive_limit: Duration, /// Max failures. - pub(crate) max_failures: NonZeroUsize, + pub(crate) max_failures: usize, } impl Default for PingConfig { fn default() -> Self { Self { ping_interval: Duration::from_secs(30), - max_failures: NonZeroUsize::new(1).expect("1 > 0; qed"), + max_failures: 1, inactive_limit: Duration::from_secs(40), } } @@ -99,7 +125,10 @@ impl PingConfig { /// Configure how many times the connection is allowed be /// inactive until the connection is closed. - pub fn max_failures(mut self, max: NonZeroUsize) -> Self { + /// + /// Panic: if max == 0. + pub fn max_failures(mut self, max: usize) -> Self { + assert!(max > 0); self.max_failures = max; self } @@ -279,18 +308,35 @@ impl ClientBuilder { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_to_front, err_from_back) = oneshot::channel::(); let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription; - let ping_config = self.ping_config; let (client_dropped_tx, client_dropped_rx) = oneshot::channel(); let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1); let manager = ThreadSafeRequestManager::new(); + let (ping_interval, inactivity_stream, inactivity_check) = match self.ping_config { + None => (IntervalStream::pending(), IntervalStream::pending(), InactivityCheck::Disabled), + Some(p) => { + // NOTE: This emitts a tick immediately to sync how the `inactive_interval` works + // because it starts measuring when the client start-ups. + let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(p.ping_interval))); + + let inactive_interval = { + let start = tokio::time::Instant::now() + p.inactive_limit; + IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(start, p.inactive_limit))) + }; + + let inactivity_check = InactivityCheck::new(p.inactive_limit, p.max_failures); + + (ping_interval, inactive_interval, inactivity_check) + } + }; + tokio::spawn(send_task(SendTaskParams { sender, from_frontend: from_front, close_tx: send_receive_task_sync_tx.clone(), manager: manager.clone(), max_buffer_capacity_per_subscription, - ping_config, + ping_interval, })); tokio::spawn(read_task(ReadTaskParams { @@ -299,7 +345,8 @@ impl ClientBuilder { to_send_task: to_back.clone(), manager, max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription, - ping_config, + inactivity_check, + inactivity_stream, })); tokio::spawn(wait_for_shutdown(send_receive_task_sync_rx, client_dropped_rx, err_to_front)); @@ -322,21 +369,28 @@ impl ClientBuilder { S: TransportSenderT, R: TransportReceiverT, { + use futures_util::stream::Pending; + + pub(crate) type PendingIntervalStream = IntervalStream>; + let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_to_front, err_from_back) = oneshot::channel::(); let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription; - let ping_config = self.ping_config; let (client_dropped_tx, client_dropped_rx) = oneshot::channel(); let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1); let manager = ThreadSafeRequestManager::new(); + let ping_interval = PendingIntervalStream::pending(); + let inactivity_stream = PendingIntervalStream::pending(); + let inactivity_check = InactivityCheck::Disabled; + wasm_bindgen_futures::spawn_local(send_task(SendTaskParams { sender, from_frontend: from_front, close_tx: send_receive_task_sync_tx.clone(), manager: manager.clone(), max_buffer_capacity_per_subscription, - ping_config, + ping_interval, })); wasm_bindgen_futures::spawn_local(read_task(ReadTaskParams { @@ -345,7 +399,8 @@ impl ClientBuilder { to_send_task: to_back.clone(), manager, max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription, - ping_config, + inactivity_check, + inactivity_stream, })); wasm_bindgen_futures::spawn_local(wait_for_shutdown( @@ -855,18 +910,19 @@ fn unparse_error(raw: &[u8]) -> Error { Error::Custom(format!("Unparseable message: {json_str}")) } -struct SendTaskParams { - sender: S, +struct SendTaskParams { + sender: T, from_frontend: mpsc::Receiver, close_tx: mpsc::Sender>, manager: ThreadSafeRequestManager, max_buffer_capacity_per_subscription: usize, - ping_config: Option, + ping_interval: IntervalStream, } -async fn send_task(params: SendTaskParams) +async fn send_task(params: SendTaskParams) where - S: TransportSenderT, + T: TransportSenderT, + S: Stream + Unpin, { let SendTaskParams { mut sender, @@ -874,17 +930,9 @@ where close_tx, manager, max_buffer_capacity_per_subscription, - ping_config, + mut ping_interval, } = params; - let mut ping_interval = match ping_config { - None => IntervalStream::pending(), - // NOTE: we are emitted a tick here immediately to sync - // with how the receive task work because it starts measuring the pong - // when it starts up. - Some(p) => IntervalStream::new(interval(p.ping_interval)), - }; - // This is safe because `tokio::time::Interval`, `tokio::mpsc::Sender` and `tokio::mpsc::Receiver` // are cancel-safe. let res = loop { @@ -917,27 +965,22 @@ where let _ = close_tx.send(res).await; } -struct ReadTaskParams { +struct ReadTaskParams { receiver: R, close_tx: mpsc::Sender>, to_send_task: mpsc::Sender, manager: ThreadSafeRequestManager, max_buffer_capacity_per_subscription: usize, - ping_config: Option, + inactivity_check: InactivityCheck, + inactivity_stream: IntervalStream, } -async fn read_task(params: ReadTaskParams) +async fn read_task(params: ReadTaskParams) where R: TransportReceiverT, + S: Stream + Unpin, { - let ReadTaskParams { receiver, close_tx, to_send_task, manager, max_buffer_capacity_per_subscription, ping_config } = params; - - let mut last_active = instant::Instant::now(); - let mut inactivity_check = match ping_config { - Some(p) => IntervalStream::new(interval_at(tokio::time::Instant::now() + p.ping_interval, p.ping_interval)), - None => IntervalStream::pending(), - }; - let mut missed = 0; + let ReadTaskParams { receiver, close_tx, to_send_task, manager, max_buffer_capacity_per_subscription, mut inactivity_check, mut inactivity_stream } = params; let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async { let res = receiver.receive().await; @@ -963,7 +1006,7 @@ where _ = pending_unsubscribes.next() => (), // New message received. maybe_msg = backend_event.next() => { - last_active = instant::Instant::now(); + inactivity_check.mark_as_active(); let Some(msg) = maybe_msg else { break Ok(()) }; match handle_backend_messages::(Some(msg), &manager, max_buffer_capacity_per_subscription) { @@ -977,15 +1020,9 @@ where Ok(None) => (), } } - _ = inactivity_check.next() => { - if let Some(p) = ping_config { - if last_active.elapsed() > p.inactive_limit { - missed += 1; - - if missed >= p.max_failures.get() { - break Err(Error::Custom("WebSocket ping/pong inactive".into())); - } - } + _ = inactivity_stream.next() => { + if inactivity_check.is_inactive() { + break Ok(()); } } } @@ -1043,31 +1080,3 @@ impl Stream for MaybePendingFutures { } -#[pin_project] -pub(crate) struct IntervalStream(#[pin] Option); - -impl IntervalStream { - /// Creates a stream which never returns any elements. - pub(crate) fn pending() -> Self { - Self(None) - } - - /// Creates a stream which produces elements with interval of `period`. - pub(crate) fn new(interval: Interval) -> Self { - Self(Some(tokio_stream::wrappers::IntervalStream::new(interval))) - } -} - -impl Stream for IntervalStream { - type Item = tokio::time::Instant; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(mut stream) = self.project().0.as_pin_mut() { - stream.poll_next_unpin(cx) - } else { - // NOTE: this will not be woken up again and it's by design - // to be a pending stream that never returns. - Poll::Pending - } - } -} From f2f299e78d2e97c4fdc86181a6dcfb5adfd35008 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 7 Dec 2023 18:09:18 +0100 Subject: [PATCH 06/13] Update core/Cargo.toml --- core/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 4ef59c6124..7b136f3cab 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -35,7 +35,6 @@ wasm-bindgen-futures = { version = "0.4.19", optional = true } futures-timer = { version = "3", optional = true } tokio-stream = { version = "0.1", optional = true } pin-project = { version = "1", optional = true } -instant = { version = "0.1", optional = true } [features] default = [] From 6dff8cdd9552455fbd5a981b64d79e247914bcbe Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 7 Dec 2023 18:09:53 +0100 Subject: [PATCH 07/13] Update core/src/client/async_client/mod.rs --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 99f7f55718..f414774d05 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -315,7 +315,7 @@ impl ClientBuilder { let (ping_interval, inactivity_stream, inactivity_check) = match self.ping_config { None => (IntervalStream::pending(), IntervalStream::pending(), InactivityCheck::Disabled), Some(p) => { - // NOTE: This emitts a tick immediately to sync how the `inactive_interval` works + // NOTE: This emits a tick immediately to sync how the `inactive_interval` works // because it starts measuring when the client start-ups. let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(p.ping_interval))); From 266ac4ada8587130bf608e9a42eaa2465ca7df1f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 7 Dec 2023 18:10:23 +0100 Subject: [PATCH 08/13] Update core/src/client/async_client/mod.rs --- core/src/client/async_client/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index f414774d05..d3eac62ad8 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -1079,4 +1079,3 @@ impl Stream for MaybePendingFutures { } } - From f94c43fec14f5715b1df2039ac4fbe1b725c7af3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 7 Dec 2023 18:18:09 +0100 Subject: [PATCH 09/13] fix build again --- core/src/client/async_client/mod.rs | 45 ++------- core/src/client/async_client/utils.rs | 134 ++++++++++++++++++++++++++ server/src/server.rs | 16 +-- server/src/tests/ws.rs | 3 +- server/src/transport/ws.rs | 2 +- 5 files changed, 150 insertions(+), 50 deletions(-) create mode 100644 core/src/client/async_client/utils.rs diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index d3eac62ad8..c8a206415f 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -31,6 +31,7 @@ mod manager; mod utils; use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse}; +use crate::client::async_client::utils::MaybePendingFutures; use crate::client::{ BatchMessage, BatchResponse, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT, Error @@ -49,16 +50,14 @@ use helpers::{ }; use jsonrpsee_types::{InvalidRequestId, ResponseSuccess, TwoPointZero}; use manager::RequestManager; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll, Waker}; use async_lock::RwLock as AsyncRwLock; use async_trait::async_trait; use futures_timer::Delay; use futures_util::future::{self, Either}; -use futures_util::stream::{FuturesUnordered, StreamExt}; -use futures_util::{Future, Stream}; +use futures_util::stream::StreamExt; +use futures_util::Stream; use jsonrpsee_types::response::{ResponsePayload, SubscriptionError}; use jsonrpsee_types::{Notification, NotificationSer, RequestSer, Response, SubscriptionResponse}; use serde::de::DeserializeOwned; @@ -126,7 +125,9 @@ impl PingConfig { /// Configure how many times the connection is allowed be /// inactive until the connection is closed. /// - /// Panic: if max == 0. + /// # Panics + /// + /// This method panics if `max` == 0. pub fn max_failures(mut self, max: usize) -> Self { assert!(max > 0); self.max_failures = max; @@ -1045,37 +1046,3 @@ async fn wait_for_shutdown( let _ = err_to_front.send(err); } } - -/// A wrapper around `FuturesUnordered` that doesn't return `None` when it's empty. -struct MaybePendingFutures { - futs: FuturesUnordered, - waker: Option, -} - -impl MaybePendingFutures { - fn new() -> Self { - Self { futs: FuturesUnordered::new(), waker: None } - } - - fn push(&mut self, fut: Fut) { - self.futs.push(fut); - - if let Some(w) = self.waker.take() { - w.wake(); - } - } -} - -impl Stream for MaybePendingFutures { - type Item = Fut::Output; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.futs.is_empty() { - self.waker = Some(cx.waker().clone()); - return Poll::Pending; - } - - self.futs.poll_next_unpin(cx) - } -} - diff --git a/core/src/client/async_client/utils.rs b/core/src/client/async_client/utils.rs new file mode 100644 index 0000000000..1c1e3fa762 --- /dev/null +++ b/core/src/client/async_client/utils.rs @@ -0,0 +1,134 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::pin::Pin; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; + +use futures_util::stream::FuturesUnordered; +use futures_util::{Stream, StreamExt, Future}; +use pin_project::pin_project; + +#[pin_project] +pub(crate) struct IntervalStream(#[pin] Option); + +impl IntervalStream { + /// Creates a stream which never returns any elements. + pub(crate) fn pending() -> Self { + Self(None) + } + + /// Creates a stream which produces elements with interval of `period`. + #[cfg(feature = "async-client")] + pub(crate) fn new(s: S) -> Self { + Self(Some(s)) + } +} + +impl Stream for IntervalStream { + type Item = (); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(mut stream) = self.project().0.as_pin_mut() { + match stream.poll_next_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(_)) => Poll::Ready(Some(())), + Poll::Ready(None) => Poll::Ready(None), + } + } else { + // NOTE: this will not be woken up again and it's by design + // to be a pending stream that never returns. + Poll::Pending + } + } +} + +#[allow(unused)] +pub(crate) enum InactivityCheck { + Disabled, + Enabled { inactive_dur: Duration, last_active: std::time::Instant, count: usize, max_count: usize } +} + +impl InactivityCheck { + #[cfg(feature = "async-client")] + pub(crate) fn new(_inactive_dur: Duration, _max_count: usize) -> Self { + Self::Enabled { inactive_dur: _inactive_dur, last_active: std::time::Instant::now(), count: 0, max_count: _max_count } + } + + pub(crate) fn is_inactive(&mut self) -> bool { + match self { + Self::Disabled => false, + Self::Enabled { inactive_dur, last_active, count, max_count, .. } => { + if last_active.elapsed() >= *inactive_dur { + *count += 1; + } + + count >= max_count + } + } + } + + pub(crate) fn mark_as_active(&mut self) { + if let Self::Enabled { last_active, .. } = self { + *last_active = std::time::Instant::now(); + } + } +} + + + +/// A wrapper around `FuturesUnordered` that doesn't return `None` when it's empty. +pub(crate) struct MaybePendingFutures { + futs: FuturesUnordered, + waker: Option, +} + +impl MaybePendingFutures { + pub(crate) fn new() -> Self { + Self { futs: FuturesUnordered::new(), waker: None } + } + + pub(crate) fn push(&mut self, fut: Fut) { + self.futs.push(fut); + + if let Some(w) = self.waker.take() { + w.wake(); + } + } +} + +impl Stream for MaybePendingFutures { + type Item = Fut::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.futs.is_empty() { + self.waker = Some(cx.waker().clone()); + return Poll::Pending; + } + + self.futs.poll_next_unpin(cx) + } +} \ No newline at end of file diff --git a/server/src/server.rs b/server/src/server.rs index 9cdf57ac84..262a53f18c 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -27,7 +27,6 @@ use std::error::Error as StdError; use std::future::Future; use std::net::{SocketAddr, TcpListener as StdTcpListener}; -use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::atomic::AtomicU32; use std::sync::Arc; @@ -294,16 +293,12 @@ pub struct PingConfig { /// Max allowed time for a connection to stay idle. pub(crate) inactive_limit: Duration, /// Max failures. - pub(crate) max_failures: NonZeroUsize, + pub(crate) max_failures: usize, } impl Default for PingConfig { fn default() -> Self { - Self { - ping_interval: Duration::from_secs(30), - max_failures: NonZeroUsize::new(1).expect("1 > 0; qed"), - inactive_limit: Duration::from_secs(40), - } + Self { ping_interval: Duration::from_secs(30), max_failures: 1, inactive_limit: Duration::from_secs(40) } } } @@ -331,7 +326,12 @@ impl PingConfig { /// Configure how many times the remote peer is allowed be /// inactive until the connection is closed. - pub fn max_failures(mut self, max: NonZeroUsize) -> Self { + /// + /// # Panics + /// + /// This method panics if `max` == 0. + pub fn max_failures(mut self, max: usize) -> Self { + assert!(max > 0); self.max_failures = max; self } diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index da151f6a7e..00b10f2011 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -24,7 +24,6 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::num::NonZeroUsize; use std::time::Duration; use crate::tests::helpers::{deser_call, init_logger, server_with_context}; @@ -881,7 +880,7 @@ async fn server_with_infinite_call( ) -> (crate::ServerHandle, std::net::SocketAddr) { let server = ServerBuilder::default() // Make sure that the ping_interval doesn't force the connection to be closed - .enable_ws_ping(crate::PingConfig::new().max_failures(NonZeroUsize::MAX).ping_interval(timeout)) + .enable_ws_ping(crate::PingConfig::new().max_failures(usize::MAX).ping_interval(timeout)) .build("127.0.0.1:0") .with_default_timeout() .await diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 09fd6d1a54..2480529847 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -282,7 +282,7 @@ where if last_active.elapsed() > p.inactive_limit { missed += 1; - if missed >= p.max_failures.get() { + if missed >= p.max_failures { break Receive::ConnectionClosed; } } From 4a5e3293a4ee06dfc5d8371365bc559331bbc7f3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 8 Dec 2023 16:15:27 +0100 Subject: [PATCH 10/13] Update core/src/client/async_client/mod.rs --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index c8a206415f..a3f43e8fe3 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -372,7 +372,7 @@ impl ClientBuilder { { use futures_util::stream::Pending; - pub(crate) type PendingIntervalStream = IntervalStream>; + type PendingIntervalStream = IntervalStream>; let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_to_front, err_from_back) = oneshot::channel::(); From e406f1c82a5f65a199c240e8e30d98f768c1ddb2 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 8 Dec 2023 16:59:06 +0100 Subject: [PATCH 11/13] fix grumbles --- core/src/client/async_client/mod.rs | 4 +++- server/src/server.rs | 11 +++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index c8a206415f..8bc3c3734f 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -74,12 +74,14 @@ const LOG_TARGET: &str = "jsonrpsee-client"; /// an inactive connection. /// /// jsonrpsee doesn't associate the ping/pong frames just that if -/// pong frame isn't received within `inactive_limit` then it's regarded +/// a pong frame isn't received within the `inactive_limit` then it's regarded /// as missed. /// /// Such that the `inactive_limit` should be configured to longer than a single /// WebSocket ping takes or it might be missed and may end up /// terminating the connection. +/// +/// Default: ping_interval: 30 seconds, max failures: 1 and inactive limit: 40 seconds. #[derive(Debug, Copy, Clone)] pub struct PingConfig { /// Interval that the pings are sent. diff --git a/server/src/server.rs b/server/src/server.rs index 262a53f18c..83ba2495d9 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -273,19 +273,18 @@ impl ConnectionState { } } -/// Configuration for WebSocket ping/pong mechanism and it may be used to disconnect inactive -/// clients. -/// -/// It's possible to configure how often pings are sent out and how long the server will -/// wait until a client is determined as "inactive". +/// Configuration for WebSocket ping/pong mechanism and it may be used to disconnect +/// an inactive connection. /// /// jsonrpsee doesn't associate the ping/pong frames just that if -/// pong frame isn't received within `inactive_limit` then it's regarded +/// a pong frame isn't received within the `inactive_limit` then it's regarded /// as missed. /// /// Such that the `inactive_limit` should be configured to longer than a single /// WebSocket ping takes or it might be missed and may end up /// terminating the connection. +/// +/// Default: ping_interval: 30 seconds, max failures: 1 and inactive limit: 40 seconds. #[derive(Debug, Copy, Clone)] pub struct PingConfig { /// Period which the server pings the connected client. From aeb35466f7ce107205fb3a9c69043dec4c43759f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 8 Dec 2023 17:08:59 +0100 Subject: [PATCH 12/13] fix fmt nit --- core/src/client/async_client/utils.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/client/async_client/utils.rs b/core/src/client/async_client/utils.rs index 1c1e3fa762..fff6d66b08 100644 --- a/core/src/client/async_client/utils.rs +++ b/core/src/client/async_client/utils.rs @@ -44,7 +44,7 @@ impl IntervalStream { /// Creates a stream which produces elements with interval of `period`. #[cfg(feature = "async-client")] pub(crate) fn new(s: S) -> Self { - Self(Some(s)) + Self(Some(s)) } } @@ -131,4 +131,4 @@ impl Stream for MaybePendingFutures { self.futs.poll_next_unpin(cx) } -} \ No newline at end of file +} From a0dddd7d90a554250e4287af37c8e5adccba49c8 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 8 Dec 2023 17:23:10 +0100 Subject: [PATCH 13/13] emit error when ping/pong terminates conn --- core/src/client/async_client/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 6ef50e14e0..c50a3c0858 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -950,14 +950,14 @@ where if let Err(e) = handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await { - tracing::error!(target: LOG_TARGET, "Could not send message: {e}"); + tracing::error!(target: LOG_TARGET, "ws send failed: {e}"); break Err(Error::Transport(e.into())); } } _ = ping_interval.next() => { if let Err(err) = sender.send_ping().await { - tracing::error!(target: LOG_TARGET, "Could not send ping frame: {err}"); - break Err(Error::Custom("Could not send ping frame".into())); + tracing::error!(target: LOG_TARGET, "Send ws ping failed: {err}"); + break Err(Error::Transport(err.into())); } } } @@ -1025,7 +1025,7 @@ where } _ = inactivity_stream.next() => { if inactivity_check.is_inactive() { - break Ok(()); + break Err(Error::Transport(anyhow::anyhow!("WebSocket ping/pong inactive"))); } } }