From 682d98bb9e354d49e9952374367fa752c5b9bc54 Mon Sep 17 00:00:00 2001 From: magine Date: Tue, 13 Aug 2024 08:08:49 +0800 Subject: [PATCH] config: add a keep_alive_timeout configuration option --- src/config.rs | 18 ++++++++++++++-- src/lib.rs | 16 +++++++------- src/protocol/libp2p/kademlia/mod.rs | 7 +++++-- src/protocol/notification/tests/mod.rs | 7 +++++-- src/protocol/request_response/tests.rs | 7 +++++-- src/protocol/transport_service.rs | 29 ++++++++++++++------------ src/transport/manager/mod.rs | 20 ++++++++++-------- src/transport/mod.rs | 3 +++ 8 files changed, 69 insertions(+), 38 deletions(-) diff --git a/src/config.rs b/src/config.rs index 34d67ca0..c2956021 100644 --- a/src/config.rs +++ b/src/config.rs @@ -30,7 +30,7 @@ use crate::{ }, transport::{ manager::limits::ConnectionLimitsConfig, tcp::config::Config as TcpConfig, - MAX_PARALLEL_DIALS, + KEEP_ALIVE_TIMEOUT, MAX_PARALLEL_DIALS, }, types::protocol::ProtocolName, PeerId, @@ -45,7 +45,7 @@ use crate::transport::websocket::config::Config as WebSocketConfig; use multiaddr::Multiaddr; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; /// Connection role. #[derive(Debug, Copy, Clone)] @@ -121,6 +121,9 @@ pub struct ConfigBuilder { /// Connection limits config. connection_limits: ConnectionLimitsConfig, + + /// Close the connection if no substreams are open within this time frame. + keep_alive_timeout: Duration, } impl Default for ConfigBuilder { @@ -153,6 +156,7 @@ impl ConfigBuilder { request_response_protocols: HashMap::new(), known_addresses: Vec::new(), connection_limits: ConnectionLimitsConfig::default(), + keep_alive_timeout: KEEP_ALIVE_TIMEOUT, } } @@ -268,6 +272,12 @@ impl ConfigBuilder { self } + /// Set keep alive timeout for connections. + pub fn with_keep_alive_timeout(mut self, timeout: Duration) -> Self { + self.keep_alive_timeout = timeout; + self + } + /// Build [`Litep2pConfig`]. pub fn build(mut self) -> Litep2pConfig { let keypair = match self.keypair { @@ -296,6 +306,7 @@ impl ConfigBuilder { request_response_protocols: self.request_response_protocols, known_addresses: self.known_addresses, connection_limits: self.connection_limits, + keep_alive_timeout: self.keep_alive_timeout, } } } @@ -355,4 +366,7 @@ pub struct Litep2pConfig { /// Connection limits config. pub(crate) connection_limits: ConnectionLimitsConfig, + + /// Close the connection if no substreams are open within this time frame. + pub(crate) keep_alive_timeout: Duration, } diff --git a/src/lib.rs b/src/lib.rs index 1523bcb4..6222c241 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,7 @@ use multihash::Multihash; use transport::Endpoint; use types::ConnectionId; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::Arc}; pub use bandwidth::BandwidthSink; pub use error::Error; @@ -171,7 +171,7 @@ impl Litep2p { protocol, config.fallback_names.clone(), config.codec, - Duration::from_secs(5), + litep2p_config.keep_alive_timeout, ); let executor = Arc::clone(&litep2p_config.executor); litep2p_config.executor.run(Box::pin(async move { @@ -191,7 +191,7 @@ impl Litep2p { protocol, config.fallback_names.clone(), config.codec, - config.timeout, + litep2p_config.keep_alive_timeout, ); litep2p_config.executor.run(Box::pin(async move { RequestResponseProtocol::new(service, config).run().await @@ -206,7 +206,7 @@ impl Litep2p { protocol_name, Vec::new(), protocol.codec(), - Duration::from_secs(5), + litep2p_config.keep_alive_timeout, ); litep2p_config.executor.run(Box::pin(async move { let _ = protocol.run(service).await; @@ -225,7 +225,7 @@ impl Litep2p { ping_config.protocol.clone(), Vec::new(), ping_config.codec, - Duration::from_secs(5), + litep2p_config.keep_alive_timeout, ); litep2p_config.executor.run(Box::pin(async move { Ping::new(service, ping_config).run().await @@ -248,7 +248,7 @@ impl Litep2p { main_protocol.clone(), fallback_names, kademlia_config.codec, - Duration::from_secs(5), + litep2p_config.keep_alive_timeout, ); litep2p_config.executor.run(Box::pin(async move { let _ = Kademlia::new(service, kademlia_config).run().await; @@ -269,7 +269,7 @@ impl Litep2p { identify_config.protocol.clone(), Vec::new(), identify_config.codec, - Duration::from_secs(5), + litep2p_config.keep_alive_timeout, ); identify_config.public = Some(litep2p_config.keypair.public().into()); @@ -289,7 +289,7 @@ impl Litep2p { bitswap_config.protocol.clone(), Vec::new(), bitswap_config.codec, - Duration::from_secs(5), + litep2p_config.keep_alive_timeout, ); litep2p_config.executor.run(Box::pin(async move { Bitswap::new(service, bitswap_config).run().await diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index f379f132..9dc2c347 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -874,7 +874,10 @@ mod tests { use crate::{ codec::ProtocolCodec, crypto::ed25519::Keypair, - transport::manager::{limits::ConnectionLimitsConfig, TransportManager}, + transport::{ + manager::{limits::ConnectionLimitsConfig, TransportManager}, + KEEP_ALIVE_TIMEOUT, + }, types::protocol::ProtocolName, BandwidthSink, }; @@ -902,7 +905,7 @@ mod tests { Vec::new(), Default::default(), handle, - std::time::Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); let (event_tx, event_rx) = channel(64); let (_cmd_tx, cmd_rx) = channel(64); diff --git a/src/protocol/notification/tests/mod.rs b/src/protocol/notification/tests/mod.rs index 67aa9a50..4aa48aa4 100644 --- a/src/protocol/notification/tests/mod.rs +++ b/src/protocol/notification/tests/mod.rs @@ -29,7 +29,10 @@ use crate::{ }, InnerTransportEvent, ProtocolCommand, TransportService, }, - transport::manager::{limits::ConnectionLimitsConfig, TransportManager}, + transport::{ + manager::{limits::ConnectionLimitsConfig, TransportManager}, + KEEP_ALIVE_TIMEOUT, + }, types::protocol::ProtocolName, BandwidthSink, PeerId, }; @@ -63,7 +66,7 @@ fn make_notification_protocol() -> ( Vec::new(), std::sync::Arc::new(Default::default()), handle, - std::time::Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); let (config, handle) = NotificationConfig::new( ProtocolName::from("/notif/1"), diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index e17e760b..9cb842f6 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -29,7 +29,10 @@ use crate::{ InnerTransportEvent, TransportService, }, substream::Substream, - transport::manager::{limits::ConnectionLimitsConfig, TransportManager}, + transport::{ + manager::{limits::ConnectionLimitsConfig, TransportManager}, + KEEP_ALIVE_TIMEOUT, + }, types::{RequestId, SubstreamId}, BandwidthSink, Error, PeerId, ProtocolName, }; @@ -61,7 +64,7 @@ fn protocol() -> ( Vec::new(), std::sync::Arc::new(Default::default()), handle, - std::time::Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); let (config, handle) = ConfigBuilder::new(ProtocolName::from("/req/1")).with_max_size(1024).build(); diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index b774bb1d..a5a87ffc 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -123,10 +123,10 @@ pub struct TransportService { next_substream_id: Arc, /// Close the connection if no substreams are open within this time frame. - keep_alive: Duration, + keep_alive_timeout: Duration, /// Pending keep-alive timeouts. - keep_alive_timeouts: FuturesUnordered>, + pending_keep_alive_timeouts: FuturesUnordered>, } impl TransportService { @@ -150,8 +150,8 @@ impl TransportService { transport_handle, next_substream_id, connections: HashMap::new(), - keep_alive, - keep_alive_timeouts: FuturesUnordered::new(), + keep_alive_timeout: keep_alive, + pending_keep_alive_timeouts: FuturesUnordered::new(), }, tx, ) @@ -173,7 +173,7 @@ impl TransportService { ?connection_id, "connection established", ); - let keep_alive = self.keep_alive; + let keep_alive = self.keep_alive_timeout; match self.connections.get_mut(&peer) { Some(context) => match context.secondary { @@ -188,7 +188,7 @@ impl TransportService { None } None => { - self.keep_alive_timeouts.push(Box::pin(async move { + self.pending_keep_alive_timeouts.push(Box::pin(async move { tokio::time::sleep(keep_alive).await; (peer, connection_id) })); @@ -199,7 +199,7 @@ impl TransportService { }, None => { self.connections.insert(peer, ConnectionContext::new(handle)); - self.keep_alive_timeouts.push(Box::pin(async move { + self.pending_keep_alive_timeouts.push(Box::pin(async move { tokio::time::sleep(keep_alive).await; (peer, connection_id) })); @@ -393,7 +393,7 @@ impl Stream for TransportService { } while let Poll::Ready(Some((peer, connection_id))) = - self.keep_alive_timeouts.poll_next_unpin(cx) + self.pending_keep_alive_timeouts.poll_next_unpin(cx) { if let Some(context) = self.connections.get_mut(&peer) { tracing::trace!( @@ -416,7 +416,10 @@ mod tests { use super::*; use crate::{ protocol::TransportService, - transport::manager::{handle::InnerTransportManagerCommand, TransportManagerHandle}, + transport::{ + manager::{handle::InnerTransportManagerCommand, TransportManagerHandle}, + KEEP_ALIVE_TIMEOUT, + }, }; use futures::StreamExt; use parking_lot::RwLock; @@ -445,7 +448,7 @@ mod tests { Vec::new(), Arc::new(AtomicUsize::new(0usize)), handle, - std::time::Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); (service, sender, cmd_rx) @@ -787,7 +790,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!(service.keep_alive_timeouts.len(), 1); + assert_eq!(service.pending_keep_alive_timeouts.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -822,7 +825,7 @@ mod tests { // doesn't exist anymore // // the peer is removed because there is no connection to them - assert_eq!(service.keep_alive_timeouts.len(), 1); + assert_eq!(service.pending_keep_alive_timeouts.len(), 1); assert!(service.connections.get(&peer).is_none()); // register new primary connection but verify that there are now two pending keep-alive @@ -850,7 +853,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!(service.keep_alive_timeouts.len(), 2); + assert_eq!(service.pending_keep_alive_timeouts.len(), 2); match service.connections.get(&peer) { Some(context) => { assert_eq!( diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index bf0c8b73..33cc8f5b 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -323,7 +323,7 @@ impl TransportManager { protocol: ProtocolName, fallback_names: Vec, codec: ProtocolCodec, - keep_alive: Duration, + keep_alive_timeout: Duration, ) -> TransportService { assert!(!self.protocol_names.contains(&protocol)); @@ -339,7 +339,7 @@ impl TransportManager { fallback_names.clone(), self.next_substream_id.clone(), self.transport_manager_handle.clone(), - keep_alive, + keep_alive_timeout, ); self.protocols.insert( @@ -1759,7 +1759,9 @@ mod tests { use super::*; use crate::{ - crypto::ed25519::Keypair, executor::DefaultExecutor, transport::dummy::DummyTransport, + crypto::ed25519::Keypair, + executor::DefaultExecutor, + transport::{dummy::DummyTransport, KEEP_ALIVE_TIMEOUT}, }; use std::{ net::{Ipv4Addr, Ipv6Addr}, @@ -1796,13 +1798,13 @@ mod tests { ProtocolName::from("/notif/1"), Vec::new(), ProtocolCodec::UnsignedVarint(None), - Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); manager.register_protocol( ProtocolName::from("/notif/1"), Vec::new(), ProtocolCodec::UnsignedVarint(None), - Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); } @@ -1823,7 +1825,7 @@ mod tests { ProtocolName::from("/notif/1"), Vec::new(), ProtocolCodec::UnsignedVarint(None), - Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); manager.register_protocol( ProtocolName::from("/notif/2"), @@ -1832,7 +1834,7 @@ mod tests { ProtocolName::from("/notif/1"), ], ProtocolCodec::UnsignedVarint(None), - Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); } @@ -1856,7 +1858,7 @@ mod tests { ProtocolName::from("/notif/1"), ], ProtocolCodec::UnsignedVarint(None), - Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); manager.register_protocol( ProtocolName::from("/notif/2"), @@ -1865,7 +1867,7 @@ mod tests { ProtocolName::from("/notif/1/new"), ], ProtocolCodec::UnsignedVarint(None), - Duration::from_secs(5), + KEEP_ALIVE_TIMEOUT, ); } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 0746b9e7..792508cc 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -47,6 +47,9 @@ pub(crate) const CONNECTION_OPEN_TIMEOUT: Duration = Duration::from_secs(10); /// Timeout for opening a substream. pub(crate) const SUBSTREAM_OPEN_TIMEOUT: Duration = Duration::from_secs(5); +/// Timeout for connection waiting new substreams. +pub(crate) const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(5); + /// Maximum number of parallel dial attempts. pub(crate) const MAX_PARALLEL_DIALS: usize = 8;