Make concurrent dialing more aggressive #3656

Closed · wants to merge 2 commits
7 changes: 5 additions & 2 deletions substrate/client/network/src/service.rs
@@ -87,7 +87,7 @@ use std::{
     collections::{HashMap, HashSet},
     fs, iter,
     marker::PhantomData,
-    num::NonZeroUsize,
+    num::{NonZeroU8, NonZeroUsize},
     pin::Pin,
     str,
     sync::{
@@ -472,7 +472,10 @@ where
         // NOTE: 24 is somewhat arbitrary and should be tuned in the future if necessary.
         // See <https://github.com/paritytech/substrate/pull/6080>
         .per_connection_event_buffer_size(24)
-        .max_negotiating_inbound_streams(2048);
+        .max_negotiating_inbound_streams(2048)
+        // Increase the default dial concurrency factor from 8 to 16 to help with cases where
+        // the DHT has plenty of stale peer addresses.
+        .dial_concurrency_factor(NonZeroU8::new(16).expect("0 < 16 < 256; qed"));
Contributor:
dq: I think this is fine, but do you think there is a chance we might run into issues with the total number of file descriptors opened for the concurrent requests? 🤔

@dmitry-markin (Contributor Author) — Mar 12, 2024:
Yes, this is theoretically possible, especially for nodes with a high out-peer count. All my systems show `ulimit -n` as 1024, which means a maximum of 125 simultaneous connections to peers with bloated DHT records before the change, and about 60 after it.

I don't think the practical probability of hitting that many peers at once with bloated DHT records is high, but it's something to keep in mind.
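
As a sanity check, the budget described above can be worked out directly. A back-of-the-envelope sketch; the number of file descriptors reserved for things other than dialing is an assumption for illustration, not something measured in this PR:

```rust
/// Each in-flight dial attempt holds one socket (one fd), and a peer with a
/// bloated DHT record can have up to `dial_concurrency` attempts open at once.
fn max_peers_dialed_at_once(fd_limit: u64, reserved_fds: u64, dial_concurrency: u64) -> u64 {
    (fd_limit - reserved_fds) / dial_concurrency
}

fn main() {
    let fd_limit = 1024; // typical default `ulimit -n` on Linux
    let reserved_fds = 24; // assumed fds held for files, listeners, etc.
    // Close to the figures quoted above: ~125 peers before the change...
    println!("factor 8:  {}", max_peers_dialed_at_once(fd_limit, reserved_fds, 8)); // 125
    // ...and ~60 after it.
    println!("factor 16: {}", max_peers_dialed_at_once(fd_limit, reserved_fds, 16)); // 62
}
```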


     (builder.build(), bandwidth)
 };
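
For context on what the factor changes: the dial concurrency factor bounds how many addresses of a single peer libp2p dials in parallel. A simplified sketch (not libp2p code) of how that interacts with a bloated DHT record — assuming a hypothetical peer with 40 mostly stale addresses, and that every attempt times out after the same duration, so the dialer's sliding window behaves like sequential waves of `factor` attempts:

```rust
/// Number of sequential dial "waves" needed to try every known address of a
/// peer when up to `factor` addresses are dialed in parallel.
fn dial_waves(known_addresses: usize, factor: usize) -> usize {
    known_addresses.div_ceil(factor)
}

fn main() {
    let stale_addresses = 40; // hypothetical bloated DHT record
    // With the old factor of 8 the dialer needs 5 waves; with 16, only 3.
    assert_eq!(dial_waves(stale_addresses, 8), 5);
    assert_eq!(dial_waves(stale_addresses, 16), 3);
    // Combined with the 20 s TCP dial timeout added below, the worst case for
    // this peer drops from 5 waves x ~130 s (kernel default) to 3 x 20 s.
    println!("worst case before: ~{} s", 5 * 130);
    println!("worst case after:  ~{} s", 3 * 20);
}
```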
21 changes: 15 additions & 6 deletions substrate/client/network/src/transport.rs
@@ -22,7 +22,7 @@ use either::Either;
 use libp2p::{
     core::{
         muxing::StreamMuxerBox,
-        transport::{Boxed, OptionalTransport},
+        transport::{timeout::TransportTimeout, Boxed, OptionalTransport},
         upgrade,
     },
     dns, identity, noise, tcp, websocket, PeerId, Transport, TransportExt,
@@ -31,6 +31,9 @@ use std::{sync::Arc, time::Duration};

 pub use libp2p::bandwidth::BandwidthSinks;
 
+/// Timeout after which a TCP connection attempt is considered failed.
+const TCP_DIAL_TIMEOUT: Duration = Duration::from_secs(20);
Contributor:
dq: The default timeout is on the order of minutes?

@dmitry-markin (Contributor Author):

On my Linux box it's 2 min 10 sec.
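
That figure matches what the kernel's SYN retransmission schedule predicts. A hedged sketch, assuming the Linux defaults of `net.ipv4.tcp_syn_retries = 6` and a 1 s initial retransmission timeout that doubles on every retry:

```rust
/// Total time an unanswered `connect()` waits before the kernel gives up:
/// the initial SYN plus `retries` retransmissions with exponential backoff.
fn syn_connect_timeout_secs(initial_rto_secs: u32, retries: u32) -> u32 {
    (0..=retries).map(|attempt| initial_rto_secs << attempt).sum()
}

fn main() {
    // 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 s, i.e. just over two minutes;
    // timer slack pushes the observed value to roughly the 2 min 10 s above.
    println!("{} s", syn_connect_timeout_secs(1, 6));
}
```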


 /// Builds the transport that serves as a common ground for all connections.
 ///
 /// If `memory_only` is true, then only communication within the same process is allowed. Only
@@ -56,7 +59,8 @@ pub fn build_transport(
     let transport = if !memory_only {
         // Main transport: DNS(TCP)
         let tcp_config = tcp::Config::new().nodelay(true);
-        let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone());
+        let tcp_trans =
+            TransportTimeout::new(tcp::tokio::Transport::new(tcp_config.clone()), TCP_DIAL_TIMEOUT);
         let dns_init = dns::TokioDnsConfig::system(tcp_trans);
 
         Either::Left(if let Ok(dns) = dns_init {
@@ -65,15 +69,20 @@
             // Main transport can't be used for `/wss` addresses because WSS transport needs
             // unresolved addresses (BUT WSS transport itself needs an instance of DNS transport to
             // resolve and dial addresses).
-            let tcp_trans = tcp::tokio::Transport::new(tcp_config);
+            let tcp_trans =
+                TransportTimeout::new(tcp::tokio::Transport::new(tcp_config), TCP_DIAL_TIMEOUT);
             let dns_for_wss = dns::TokioDnsConfig::system(tcp_trans)
                 .expect("same system_conf & resolver to work");
             Either::Left(websocket::WsConfig::new(dns_for_wss).or_transport(dns))
         } else {
             // In case DNS can't be constructed, fall back to TCP + WS (WSS won't work)
-            let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone());
-            let desktop_trans = websocket::WsConfig::new(tcp_trans)
-                .or_transport(tcp::tokio::Transport::new(tcp_config));
+            let tcp_trans = TransportTimeout::new(
+                tcp::tokio::Transport::new(tcp_config.clone()),
+                TCP_DIAL_TIMEOUT,
+            );
+            let tcp_trans_for_ws =
+                TransportTimeout::new(tcp::tokio::Transport::new(tcp_config), TCP_DIAL_TIMEOUT);
+            let desktop_trans = websocket::WsConfig::new(tcp_trans_for_ws).or_transport(tcp_trans);
             Either::Right(desktop_trans)
         })
     } else {
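
For reference, the wrapper used throughout this diff is a stock libp2p combinator. A minimal sketch of its use in isolation, assuming the libp2p version this PR builds against with the `tcp` and `tokio` features enabled:

```rust
use libp2p::core::transport::timeout::TransportTimeout;
use libp2p::tcp;
use std::time::Duration;

const TCP_DIAL_TIMEOUT: Duration = Duration::from_secs(20);

fn main() {
    let tcp_config = tcp::Config::new().nodelay(true);
    // `TransportTimeout::new` caps connection setup in both directions; every
    // dial through `timed` now fails after at most 20 s instead of waiting out
    // the kernel's ~127 s connect timeout.
    let timed = TransportTimeout::new(tcp::tokio::Transport::new(tcp_config), TCP_DIAL_TIMEOUT);
    let _ = timed;
}
```

If only dials needed capping, the libp2p versions I've seen also offer `TransportTimeout::with_outgoing_timeout`, which applies the limit to outbound attempts alone; the PR uses `new`, so inbound connection setup is bounded as well.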