From b0f9469b204ae1963b0552f041ca856dac1bb698 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 2 Jun 2022 15:05:05 +0300 Subject: [PATCH 1/2] ws-server: Submit ping regardless of WS messages Signed-off-by: Alexandru Vasile --- ws-server/src/server.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 961a86f828..75065bdb91 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -319,21 +319,20 @@ async fn background_task( tokio::spawn(async move { // Received messages from the WebSocket. let mut rx_item = rx.next(); + let mut submit_ping = Box::pin(tokio::time::sleep(ping_interval)); while !stop_server2.shutdown_requested() { - let submit_ping = tokio::time::sleep(ping_interval); - tokio::pin!(submit_ping); - // Ensure select is cancel-safe by fetching and storing the `rx_item` that did not finish yet. // Note: Although, this is cancel-safe already, avoid using `select!` macro for future proofing. match futures_util::future::select(rx_item, submit_ping).await { - Either::Left((Some(response), _)) => { + Either::Left((Some(response), next_ping)) => { // If websocket message send fail then terminate the connection. if let Err(err) = send_ws_message(&mut sender, response).await { tracing::warn!("WS send error: {}; terminate connection", err); break; } rx_item = rx.next(); + submit_ping = next_ping; } // Nothing else to receive. Either::Left((None, _)) => break, @@ -345,6 +344,7 @@ async fn background_task( break; } rx_item = next_rx; + submit_ping = Box::pin(tokio::time::sleep(ping_interval)); } } } From aee8697ea565274d5fa626b081b126bca6d70c20 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 3 Jun 2022 09:54:13 +0200 Subject: [PATCH 2/2] use tokio_stream::IntervalStream for less boxing --- ws-server/Cargo.toml | 1 + ws-server/src/server.rs | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index 29368c5b94..091801f7d5 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -19,6 +19,7 @@ serde_json = { version = "1", features = ["raw_value"] } soketto = "0.7.1" tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros", "time"] } tokio-util = { version = "0.7", features = ["compat"] } +tokio-stream = "0.1.7" [dev-dependencies] anyhow = "1" diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 75065bdb91..ba935b6735 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -51,6 +51,7 @@ use soketto::data::ByteSlice125; use soketto::handshake::{server::Response, Server as SokettoServer}; use soketto::Sender; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; +use tokio_stream::wrappers::IntervalStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; /// Default maximum connections allowed. @@ -319,20 +320,24 @@ async fn background_task( tokio::spawn(async move { // Received messages from the WebSocket. let mut rx_item = rx.next(); - let mut submit_ping = Box::pin(tokio::time::sleep(ping_interval)); + + // Interval to send out continuously `pings`. + let ping_interval = IntervalStream::new(tokio::time::interval(ping_interval)); + tokio::pin!(ping_interval); + let mut next_ping = ping_interval.next(); while !stop_server2.shutdown_requested() { // Ensure select is cancel-safe by fetching and storing the `rx_item` that did not finish yet. // Note: Although, this is cancel-safe already, avoid using `select!` macro for future proofing. - match futures_util::future::select(rx_item, submit_ping).await { - Either::Left((Some(response), next_ping)) => { + match futures_util::future::select(rx_item, next_ping).await { + Either::Left((Some(response), ping)) => { // If websocket message send fail then terminate the connection. if let Err(err) = send_ws_message(&mut sender, response).await { tracing::warn!("WS send error: {}; terminate connection", err); break; } rx_item = rx.next(); - submit_ping = next_ping; + next_ping = ping; } // Nothing else to receive. Either::Left((None, _)) => break, @@ -344,7 +349,7 @@ async fn background_task( break; } rx_item = next_rx; - submit_ping = Box::pin(tokio::time::sleep(ping_interval)); + next_ping = ping_interval.next(); } } }