From af344ed1cdfe8cac7912ee370dffb07c5d989a3b Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 11 Jan 2024 18:13:09 -0800 Subject: [PATCH 1/4] Fix determination of staked QUIC connections --- quic-client/src/lib.rs | 29 ++-- streamer/src/nonblocking/quic.rs | 173 +++++++------------- streamer/src/nonblocking/stream_throttle.rs | 93 ++--------- 3 files changed, 83 insertions(+), 212 deletions(-) diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index 6339c5080d9b17..9bd08fa01ad796 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -28,8 +28,7 @@ use { signature::{Keypair, Signer}, }, solana_streamer::{ - nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType}, - streamer::StakedNodes, + nonblocking::quic::compute_max_allowed_uni_streams, streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, std::{ @@ -133,21 +132,17 @@ impl QuicConfig { } fn compute_max_parallel_streams(&self) -> usize { - let (client_type, stake, total_stake) = - self.maybe_client_pubkey - .map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| { - self.maybe_staked_nodes.as_ref().map_or( - (ConnectionPeerType::Unstaked, 0, 0), - |stakes| { - let rstakes = stakes.read().unwrap(); - rstakes.get_node_stake(&pubkey).map_or( - (ConnectionPeerType::Unstaked, 0, rstakes.total_stake()), - |stake| (ConnectionPeerType::Staked, stake, rstakes.total_stake()), - ) - }, - ) - }); - compute_max_allowed_uni_streams(client_type, stake, total_stake) + let (stake, total_stake) = self.maybe_client_pubkey.map_or((0, 0), |pubkey| { + self.maybe_staked_nodes.as_ref().map_or((0, 0), |stakes| { + let rstakes = stakes.read().unwrap(); + rstakes + .get_node_stake(&pubkey) + .map_or((0, rstakes.total_stake()), |stake| { + (stake, rstakes.total_stake()) + }) + }) + }); + compute_max_allowed_uni_streams(stake, total_stake) } pub fn update_client_certificate( diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index f0b272917da49d..7ac5803c04f8cd 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -142,12 +142,11 @@ async fn run_server( const WAIT_FOR_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1); debug!("spawn quic server"); let mut last_datapoint = Instant::now(); - let unstaked_connection_table: Arc> = Arc::new(Mutex::new( - ConnectionTable::new(ConnectionPeerType::Unstaked), - )); + let unstaked_connection_table: Arc> = + Arc::new(Mutex::new(ConnectionTable::new())); let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(stats.clone())); let staked_connection_table: Arc> = - Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked))); + Arc::new(Mutex::new(ConnectionTable::new())); let (sender, receiver) = async_unbounded(); tokio::spawn(packet_batch_sender( packet_sender, @@ -227,39 +226,29 @@ fn get_connection_stake( )) } -pub fn compute_max_allowed_uni_streams( - peer_type: ConnectionPeerType, - peer_stake: u64, - total_stake: u64, -) -> usize { +pub fn compute_max_allowed_uni_streams(peer_stake: u64, total_stake: u64) -> usize { // Treat stake = 0 as unstaked if peer_stake == 0 { QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS } else { - match peer_type { - ConnectionPeerType::Staked => { - // No checked math for f64 type. So let's explicitly check for 0 here - if total_stake == 0 || peer_stake > total_stake { - warn!( - "Invalid stake values: peer_stake: {:?}, total_stake: {:?}", - peer_stake, total_stake, - ); - - QUIC_MIN_STAKED_CONCURRENT_STREAMS - } else { - let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - - QUIC_MIN_STAKED_CONCURRENT_STREAMS) - as f64; - - (((peer_stake as f64 / total_stake as f64) * delta) as usize - + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .clamp( - QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MAX_STAKED_CONCURRENT_STREAMS, - ) - } - } - _ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, + // No checked math for f64 type. So let's explicitly check for 0 here + if total_stake == 0 || peer_stake > total_stake { + warn!( + "Invalid stake values: peer_stake: {:?}, total_stake: {:?}", + peer_stake, total_stake, + ); + + QUIC_MIN_STAKED_CONCURRENT_STREAMS + } else { + let delta = + (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; + + (((peer_stake as f64 / total_stake as f64) * delta) as usize + + QUIC_MIN_STAKED_CONCURRENT_STREAMS) + .clamp( + QUIC_MIN_STAKED_CONCURRENT_STREAMS, + QUIC_MAX_STAKED_CONCURRENT_STREAMS, + ) } } } @@ -313,19 +302,12 @@ fn handle_and_cache_new_connection( wait_for_chunk_timeout: Duration, stream_load_ema: Arc, ) -> Result<(), ConnectionHandlerError> { - if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( - connection_table_l.peer_type, - params.stake, - params.total_stake, - ) as u64) + if let Ok(max_uni_streams) = + VarInt::from_u64(compute_max_allowed_uni_streams(params.stake, params.total_stake) as u64) { connection.set_max_concurrent_uni_streams(max_uni_streams); - let receive_window = compute_recieve_window( - params.max_stake, - params.min_stake, - connection_table_l.peer_type, - params.stake, - ); + let receive_window = + compute_recieve_window(params.max_stake, params.min_stake, params.stake); if let Ok(receive_window) = receive_window { connection.set_receive_window(receive_window); @@ -334,8 +316,7 @@ fn handle_and_cache_new_connection( let remote_addr = connection.remote_address(); debug!( - "Peer type: {:?}, stake {}, total stake {}, max streams {} receive_window {:?} from peer {}", - connection_table_l.peer_type, + "Peer stake {}, total stake {}, max streams {} receive_window {:?} from peer {}", params.stake, params.total_stake, max_uni_streams.into_inner(), @@ -353,7 +334,6 @@ fn handle_and_cache_new_connection( params.max_connections_per_peer, ) { - let peer_type = connection_table_l.peer_type; drop(connection_table_l); tokio::spawn(handle_connection( connection, @@ -362,7 +342,6 @@ fn handle_and_cache_new_connection( connection_table, stream_exit, params.clone(), - peer_type, wait_for_chunk_timeout, stream_load_ema, stream_counter, @@ -447,18 +426,13 @@ fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, fn compute_recieve_window( max_stake: u64, min_stake: u64, - peer_type: ConnectionPeerType, peer_stake: u64, ) -> Result { - match peer_type { - ConnectionPeerType::Unstaked => { - VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) - } - ConnectionPeerType::Staked => { - let ratio = - compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); - VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) - } + if peer_stake == 0 { + VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) + } else { + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); + VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) } } @@ -696,7 +670,6 @@ async fn packet_batch_sender( } } -#[allow(clippy::too_many_arguments)] async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -704,7 +677,6 @@ async fn handle_connection( connection_table: Arc>, stream_exit: Arc, params: NewConnectionHandlerParams, - peer_type: ConnectionPeerType, wait_for_chunk_timeout: Duration, stream_load_ema: Arc, stream_counter: Arc, @@ -720,19 +692,17 @@ async fn handle_connection( stats.total_connections.fetch_add(1, Ordering::Relaxed); let mut max_streams_per_throttling_interval = stream_throttle::max_streams_for_connection_in_throttling_duration( - peer_type, params.stake, params.total_stake, stream_load_ema.clone(), ); - let staked_stream = matches!(peer_type, ConnectionPeerType::Staked) && params.stake > 0; while !stream_exit.load(Ordering::Relaxed) { if let Ok(stream) = tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await { match stream { Ok(mut stream) => { - if staked_stream { + if params.stake > 0 { max_streams_per_throttling_interval = stream_load_ema .available_load_capacity_in_throttling_duration( params.stake, @@ -748,7 +718,7 @@ async fn handle_connection( let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); continue; } - if staked_stream { + if params.stake > 0 { stream_load_ema.increment_load(); } stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); @@ -782,7 +752,7 @@ async fn handle_connection( &remote_addr, &packet_sender, stats.clone(), - peer_type, + params.stake, ) .await { @@ -799,7 +769,7 @@ async fn handle_connection( } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); - if staked_stream { + if params.stake > 0 { stream_load_ema.update_ema_if_needed(); } }); @@ -836,7 +806,7 @@ async fn handle_chunk( remote_addr: &SocketAddr, packet_sender: &AsyncSender, stats: Arc, - peer_type: ConnectionPeerType, + peer_stake: u64, ) -> bool { match chunk { Ok(maybe_chunk) => { @@ -884,17 +854,14 @@ async fn handle_chunk( accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk); } - match peer_type { - ConnectionPeerType::Staked => { - stats - .total_staked_chunks_received - .fetch_add(1, Ordering::Relaxed); - } - ConnectionPeerType::Unstaked => { - stats - .total_unstaked_chunks_received - .fetch_add(1, Ordering::Relaxed); - } + if peer_stake > 0 { + stats + .total_staked_chunks_received + .fetch_add(1, Ordering::Relaxed); + } else { + stats + .total_unstaked_chunks_received + .fetch_add(1, Ordering::Relaxed); } } else { // done receiving chunks @@ -986,12 +953,6 @@ impl Drop for ConnectionEntry { } } -#[derive(Copy, Clone, Debug)] -pub enum ConnectionPeerType { - Unstaked, - Staked, -} - #[derive(Copy, Clone, Eq, Hash, PartialEq)] enum ConnectionTableKey { IP(IpAddr), @@ -1010,17 +971,15 @@ impl ConnectionTableKey { struct ConnectionTable { table: IndexMap>, total_size: usize, - peer_type: ConnectionPeerType, } // Prune the connection which has the oldest update // Return number pruned impl ConnectionTable { - fn new(peer_type: ConnectionPeerType) -> Self { + fn new() -> Self { Self { table: IndexMap::default(), total_size: 0, - peer_type, } } @@ -1738,7 +1697,7 @@ pub mod test { fn test_prune_table_with_ip() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); let mut num_entries = 5; let max_connections_per_peer = 10; let sockets: Vec<_> = (0..num_entries) @@ -1787,7 +1746,7 @@ pub mod test { #[test] fn test_prune_table_with_unique_pubkeys() { solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); // We should be able to add more entries than max_connections_per_peer, since each entry is // from a different peer pubkey. @@ -1822,7 +1781,7 @@ pub mod test { #[test] fn test_prune_table_with_non_unique_pubkeys() { solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); let max_connections_per_peer = 10; let pubkey = Pubkey::new_unique(); @@ -1882,7 +1841,7 @@ pub mod test { fn test_prune_table_random() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); let num_entries = 5; let max_connections_per_peer = 10; let sockets: Vec<_> = (0..num_entries) @@ -1919,7 +1878,7 @@ pub mod test { fn test_remove_connections() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); let num_ips = 5; let max_connections_per_peer = 10; let mut sockets: Vec<_> = (0..num_ips) @@ -1978,46 +1937,26 @@ pub mod test { fn test_max_allowed_uni_streams() { assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 0), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10, 0), + compute_max_allowed_uni_streams(0, 0), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 0), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, 0), + compute_max_allowed_uni_streams(10, 0), QUIC_MIN_STAKED_CONCURRENT_STREAMS ); let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, 10000), + compute_max_allowed_uni_streams(1000, 10000), QUIC_MAX_STAKED_CONCURRENT_STREAMS, ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, 10000), + compute_max_allowed_uni_streams(100, 10000), ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS) .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1000, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 10000), + compute_max_allowed_uni_streams(0, 10000), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); } diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 596aa8882f2e0c..67f98915145b06 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -1,8 +1,5 @@ use { - crate::{ - nonblocking::quic::ConnectionPeerType, - quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS}, - }, + crate::quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS}, percentage::Percentage, std::{ cmp, @@ -156,12 +153,11 @@ impl StakedStreamLoadEMA { } pub(crate) fn max_streams_for_connection_in_throttling_duration( - connection_type: ConnectionPeerType, stake: u64, total_stake: u64, ema_load: Arc, ) -> u64 { - if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 { + if stake == 0 { let max_num_connections = u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| { error!( "Failed to convert maximum number of unstaked connections {} to u64.", @@ -213,13 +209,10 @@ pub mod test { use { super::*, crate::{ - nonblocking::{ - quic::ConnectionPeerType, - stream_throttle::{ - max_streams_for_connection_in_throttling_duration, - MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION, - STREAM_LOAD_EMA_INTERVAL_MS, - }, + nonblocking::stream_throttle::{ + max_streams_for_connection_in_throttling_duration, + MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION, + STREAM_LOAD_EMA_INTERVAL_MS, }, quic::StreamStats, }, @@ -234,35 +227,14 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Unstaked, - 0, - 10000, - load_ema.clone(), - ), - 10 - ); - - // 25K packets per ms * 20% / 500 max unstaked connections - assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Unstaked, - 10, - 10000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(0, 10000, load_ema.clone(),), 10 ); // If stake is 0, same limits as unstaked connections will apply. // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 0, - 10000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(0, 10000, load_ema.clone(),), 10 ); } @@ -282,24 +254,14 @@ pub mod test { // ema_load = 10K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 15 / 10K = 30 assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 15, - 10000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(15, 10000, load_ema.clone(),), 30 ); // ema_load = 10K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 1K / 10K = 2K assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 1000, - 10000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(1000, 10000, load_ema.clone(),), 2000 ); @@ -307,24 +269,14 @@ pub mod test { // ema_load = 2.5K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 15 / 10K = 120 assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 15, - 10000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(15, 10000, load_ema.clone(),), 120 ); // ema_load = 2.5K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 1K / 10K = 8000 assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 1000, - 10000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(1000, 10000, load_ema.clone(),), 8000 ); @@ -333,35 +285,20 @@ pub mod test { load_ema.current_load_ema.store(2000, Ordering::Relaxed); // function = ((10K * 10K) / 25% of 10K) * stake / total_stake assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 15, - 10000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(15, 10000, load_ema.clone(),), 120 ); // function = ((10K * 10K) / 25% of 10K) * stake / total_stake assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 1000, - 10000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(1000, 10000, load_ema.clone(),), 8000 ); // At 1/40000 stake weight, and minimum load, it should still allow // MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION of streams. assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 1, - 40000, - load_ema.clone(), - ), + max_streams_for_connection_in_throttling_duration(1, 40000, load_ema.clone(),), MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION ); } From c09468840ef3fbbac93ee51d4f369a352cee52fc Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Fri, 12 Jan 2024 14:08:31 -0800 Subject: [PATCH 2/4] address review comments --- quic-client/src/lib.rs | 29 ++- streamer/src/nonblocking/quic.rs | 250 +++++++++++--------- streamer/src/nonblocking/stream_throttle.rs | 92 ++++--- 3 files changed, 219 insertions(+), 152 deletions(-) diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index 9bd08fa01ad796..6bd9726cbfb3ea 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -28,7 +28,8 @@ use { signature::{Keypair, Signer}, }, solana_streamer::{ - nonblocking::quic::compute_max_allowed_uni_streams, streamer::StakedNodes, + nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType}, + streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, std::{ @@ -132,17 +133,21 @@ impl QuicConfig { } fn compute_max_parallel_streams(&self) -> usize { - let (stake, total_stake) = self.maybe_client_pubkey.map_or((0, 0), |pubkey| { - self.maybe_staked_nodes.as_ref().map_or((0, 0), |stakes| { - let rstakes = stakes.read().unwrap(); - rstakes - .get_node_stake(&pubkey) - .map_or((0, rstakes.total_stake()), |stake| { - (stake, rstakes.total_stake()) - }) - }) - }); - compute_max_allowed_uni_streams(stake, total_stake) + let (client_type, total_stake) = + self.maybe_client_pubkey + .map_or((ConnectionPeerType::Unstaked, 0), |pubkey| { + self.maybe_staked_nodes.as_ref().map_or( + (ConnectionPeerType::Unstaked, 0), + |stakes| { + let rstakes = stakes.read().unwrap(); + rstakes.get_node_stake(&pubkey).map_or( + (ConnectionPeerType::Unstaked, rstakes.total_stake()), + |stake| (ConnectionPeerType::Staked(stake), rstakes.total_stake()), + ) + }, + ) + }); + compute_max_allowed_uni_streams(client_type, total_stake) } pub fn update_client_certificate( diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 7ac5803c04f8cd..5b8bdbe526661b 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -83,6 +83,18 @@ struct PacketAccumulator { pub chunks: Vec, } +#[derive(Copy, Clone, Debug)] +pub enum ConnectionPeerType { + Unstaked, + Staked(u64), +} + +impl ConnectionPeerType { + fn staked(&self) -> bool { + matches!(self, ConnectionPeerType::Staked(_)) + } +} + #[allow(clippy::too_many_arguments)] pub fn spawn_server( name: &'static str, @@ -226,30 +238,30 @@ fn get_connection_stake( )) } -pub fn compute_max_allowed_uni_streams(peer_stake: u64, total_stake: u64) -> usize { - // Treat stake = 0 as unstaked - if peer_stake == 0 { - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - } else { - // No checked math for f64 type. So let's explicitly check for 0 here - if total_stake == 0 || peer_stake > total_stake { - warn!( - "Invalid stake values: peer_stake: {:?}, total_stake: {:?}", - peer_stake, total_stake, - ); +pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { + match peer_type { + ConnectionPeerType::Staked(peer_stake) => { + // No checked math for f64 type. So let's explicitly check for 0 here + if total_stake == 0 || peer_stake > total_stake { + warn!( + "Invalid stake values: peer_stake: {:?}, total_stake: {:?}", + peer_stake, total_stake, + ); - QUIC_MIN_STAKED_CONCURRENT_STREAMS - } else { - let delta = - (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; - - (((peer_stake as f64 / total_stake as f64) * delta) as usize - + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .clamp( - QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MAX_STAKED_CONCURRENT_STREAMS, - ) + QUIC_MIN_STAKED_CONCURRENT_STREAMS + } else { + let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS + - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; + + (((peer_stake as f64 / total_stake as f64) * delta) as usize + + QUIC_MIN_STAKED_CONCURRENT_STREAMS) + .clamp( + QUIC_MIN_STAKED_CONCURRENT_STREAMS, + QUIC_MAX_STAKED_CONCURRENT_STREAMS, + ) + } } + ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, } } @@ -267,7 +279,7 @@ struct NewConnectionHandlerParams { // we're sticking with an async channel packet_sender: AsyncSender, remote_pubkey: Option, - stake: u64, + peer_type: ConnectionPeerType, total_stake: u64, max_connections_per_peer: usize, stats: Arc, @@ -284,7 +296,7 @@ impl NewConnectionHandlerParams { NewConnectionHandlerParams { packet_sender, remote_pubkey: None, - stake: 0, + peer_type: ConnectionPeerType::Unstaked, total_stake: 0, max_connections_per_peer, stats, @@ -302,12 +314,14 @@ fn handle_and_cache_new_connection( wait_for_chunk_timeout: Duration, stream_load_ema: Arc, ) -> Result<(), ConnectionHandlerError> { - if let Ok(max_uni_streams) = - VarInt::from_u64(compute_max_allowed_uni_streams(params.stake, params.total_stake) as u64) + if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( + params.peer_type, + params.total_stake, + ) as u64) { connection.set_max_concurrent_uni_streams(max_uni_streams); let receive_window = - compute_recieve_window(params.max_stake, params.min_stake, params.stake); + compute_recieve_window(params.max_stake, params.min_stake, params.peer_type); if let Ok(receive_window) = receive_window { connection.set_receive_window(receive_window); @@ -316,8 +330,8 @@ fn handle_and_cache_new_connection( let remote_addr = connection.remote_address(); debug!( - "Peer stake {}, total stake {}, max streams {} receive_window {:?} from peer {}", - params.stake, + "Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}", + params.peer_type, params.total_stake, max_uni_streams.into_inner(), receive_window, @@ -329,7 +343,7 @@ fn handle_and_cache_new_connection( ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), remote_addr.port(), Some(connection.clone()), - params.stake, + params.peer_type, timing::timestamp(), params.max_connections_per_peer, ) @@ -426,13 +440,17 @@ fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, fn compute_recieve_window( max_stake: u64, min_stake: u64, - peer_stake: u64, + peer_type: ConnectionPeerType, ) -> Result { - if peer_stake == 0 { - VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) - } else { - let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); - VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) + match peer_type { + ConnectionPeerType::Unstaked => { + VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) + } + ConnectionPeerType::Staked(peer_stake) => { + let ratio = + compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); + VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) + } } } @@ -467,7 +485,7 @@ async fn setup_connection( NewConnectionHandlerParams { packet_sender, remote_pubkey: Some(pubkey), - stake, + peer_type: ConnectionPeerType::Staked(stake), total_stake, max_connections_per_peer, stats: stats.clone(), @@ -477,31 +495,54 @@ async fn setup_connection( }, ); - if params.stake > 0 { - let mut connection_table_l = staked_connection_table.lock().unwrap(); - if connection_table_l.total_size >= max_staked_connections { - let num_pruned = - connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake); - stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); - } + match params.peer_type { + ConnectionPeerType::Staked(stake) => { + let mut connection_table_l = staked_connection_table.lock().unwrap(); + if connection_table_l.total_size >= max_staked_connections { + let num_pruned = + connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake); + stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + } - if connection_table_l.total_size < max_staked_connections { - if let Ok(()) = handle_and_cache_new_connection( - new_connection, - connection_table_l, - staked_connection_table.clone(), - ¶ms, - wait_for_chunk_timeout, - stream_load_ema.clone(), - ) { - stats - .connection_added_from_staked_peer - .fetch_add(1, Ordering::Relaxed); + if connection_table_l.total_size < max_staked_connections { + if let Ok(()) = handle_and_cache_new_connection( + new_connection, + connection_table_l, + staked_connection_table.clone(), + ¶ms, + wait_for_chunk_timeout, + stream_load_ema.clone(), + ) { + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); + } + } else { + // If we couldn't prune a connection in the staked connection table, let's + // put this connection in the unstaked connection table. If needed, prune a + // connection from the unstaked connection table. + if let Ok(()) = prune_unstaked_connections_and_add_new_connection( + new_connection, + unstaked_connection_table.clone(), + max_unstaked_connections, + ¶ms, + wait_for_chunk_timeout, + stream_load_ema.clone(), + ) { + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); + } else { + stats + .connection_add_failed_on_pruning + .fetch_add(1, Ordering::Relaxed); + stats + .connection_add_failed_staked_node + .fetch_add(1, Ordering::Relaxed); + } } - } else { - // If we couldn't prune a connection in the staked connection table, let's - // put this connection in the unstaked connection table. If needed, prune a - // connection from the unstaked connection table. + } + ConnectionPeerType::Unstaked => { if let Ok(()) = prune_unstaked_connections_and_add_new_connection( new_connection, unstaked_connection_table.clone(), @@ -511,32 +552,14 @@ async fn setup_connection( stream_load_ema.clone(), ) { stats - .connection_added_from_staked_peer + .connection_added_from_unstaked_peer .fetch_add(1, Ordering::Relaxed); } else { stats - .connection_add_failed_on_pruning - .fetch_add(1, Ordering::Relaxed); - stats - .connection_add_failed_staked_node + .connection_add_failed_unstaked_node .fetch_add(1, Ordering::Relaxed); } } - } else if let Ok(()) = prune_unstaked_connections_and_add_new_connection( - new_connection, - unstaked_connection_table.clone(), - max_unstaked_connections, - ¶ms, - wait_for_chunk_timeout, - stream_load_ema.clone(), - ) { - stats - .connection_added_from_unstaked_peer - .fetch_add(1, Ordering::Relaxed); - } else { - stats - .connection_add_failed_unstaked_node - .fetch_add(1, Ordering::Relaxed); } } Err(e) => { @@ -692,7 +715,7 @@ async fn handle_connection( stats.total_connections.fetch_add(1, Ordering::Relaxed); let mut max_streams_per_throttling_interval = stream_throttle::max_streams_for_connection_in_throttling_duration( - params.stake, + params.peer_type, params.total_stake, stream_load_ema.clone(), ); @@ -702,10 +725,10 @@ async fn handle_connection( { match stream { Ok(mut stream) => { - if params.stake > 0 { + if let ConnectionPeerType::Staked(peer_stake) = params.peer_type { max_streams_per_throttling_interval = stream_load_ema .available_load_capacity_in_throttling_duration( - params.stake, + peer_stake, params.total_stake, ); } @@ -718,7 +741,7 @@ async fn handle_connection( let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); continue; } - if params.stake > 0 { + if params.peer_type.staked() { stream_load_ema.increment_load(); } stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); @@ -752,7 +775,7 @@ async fn handle_connection( &remote_addr, &packet_sender, stats.clone(), - params.stake, + params.peer_type, ) .await { @@ -769,7 +792,7 @@ async fn handle_connection( } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); - if params.stake > 0 { + if params.peer_type.staked() { stream_load_ema.update_ema_if_needed(); } }); @@ -806,7 +829,7 @@ async fn handle_chunk( remote_addr: &SocketAddr, packet_sender: &AsyncSender, stats: Arc, - peer_stake: u64, + peer_type: ConnectionPeerType, ) -> bool { match chunk { Ok(maybe_chunk) => { @@ -854,7 +877,7 @@ async fn handle_chunk( accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk); } - if peer_stake > 0 { + if peer_type.staked() { stats .total_staked_chunks_received .fetch_add(1, Ordering::Relaxed); @@ -910,7 +933,7 @@ async fn handle_chunk( #[derive(Debug)] struct ConnectionEntry { exit: Arc, - stake: u64, + peer_type: ConnectionPeerType, last_update: Arc, port: u16, connection: Option, @@ -920,7 +943,7 @@ struct ConnectionEntry { impl ConnectionEntry { fn new( exit: Arc, - stake: u64, + peer_type: ConnectionPeerType, last_update: Arc, port: u16, connection: Option, @@ -928,7 +951,7 @@ impl ConnectionEntry { ) -> Self { Self { exit, - stake, + peer_type, last_update, port, connection, @@ -939,6 +962,13 @@ impl ConnectionEntry { fn last_update(&self) -> u64 { self.last_update.load(Ordering::Relaxed) } + + fn stake(&self) -> u64 { + match self.peer_type { + ConnectionPeerType::Unstaked => 0, + ConnectionPeerType::Staked(stake) => stake, + } + } } impl Drop for ConnectionEntry { @@ -1014,7 +1044,7 @@ impl ConnectionTable { }) .map(|index| { let connection = self.table[index].first(); - let stake = connection.map(|connection| connection.stake); + let stake = connection.map(|connection| connection.stake()); (index, stake) }) .take(sample_size) @@ -1032,7 +1062,7 @@ impl ConnectionTable { key: ConnectionTableKey, port: u16, connection: Option, - stake: u64, + peer_type: ConnectionPeerType, last_update: u64, max_connections_per_peer: usize, ) -> Option<( @@ -1049,7 +1079,7 @@ impl ConnectionTable { if has_connection_capacity { let exit = Arc::new(AtomicBool::new(false)); let last_update = Arc::new(AtomicU64::new(last_update)); - let stream_counter = if stake > 0 { + let stream_counter = if peer_type.staked() { connection_entry .first() .map(|entry| entry.stream_counter.clone()) @@ -1061,7 +1091,7 @@ impl ConnectionTable { }; connection_entry.push(ConnectionEntry::new( exit.clone(), - stake, + peer_type, last_update.clone(), port, connection, @@ -1709,7 +1739,7 @@ pub mod test { ConnectionTableKey::IP(socket.ip()), socket.port(), None, - 0, + ConnectionPeerType::Unstaked, i as u64, max_connections_per_peer, ) @@ -1721,7 +1751,7 @@ pub mod test { ConnectionTableKey::IP(sockets[0].ip()), sockets[0].port(), None, - 0, + ConnectionPeerType::Unstaked, 5, max_connections_per_peer, ) @@ -1760,7 +1790,7 @@ pub mod test { ConnectionTableKey::Pubkey(*pubkey), 0, None, - 0, + ConnectionPeerType::Unstaked, i as u64, max_connections_per_peer, ) @@ -1791,7 +1821,7 @@ pub mod test { ConnectionTableKey::Pubkey(pubkey), 0, None, - 0, + ConnectionPeerType::Unstaked, i as u64, max_connections_per_peer, ) @@ -1805,7 +1835,7 @@ pub mod test { ConnectionTableKey::Pubkey(pubkey), 0, None, - 0, + ConnectionPeerType::Unstaked, 10, max_connections_per_peer, ) @@ -1819,7 +1849,7 @@ pub mod test { ConnectionTableKey::Pubkey(pubkey2), 0, None, - 0, + ConnectionPeerType::Unstaked, 10, max_connections_per_peer, ) @@ -1853,7 +1883,7 @@ pub mod test { ConnectionTableKey::IP(socket.ip()), socket.port(), None, - (i + 1) as u64, + ConnectionPeerType::Staked((i + 1) as u64), i as u64, max_connections_per_peer, ) @@ -1890,7 +1920,7 @@ pub mod test { ConnectionTableKey::IP(socket.ip()), socket.port(), None, - 0, + ConnectionPeerType::Unstaked, (i * 2) as u64, max_connections_per_peer, ) @@ -1901,7 +1931,7 @@ pub mod test { ConnectionTableKey::IP(socket.ip()), socket.port(), None, - 0, + ConnectionPeerType::Unstaked, (i * 2 + 1) as u64, max_connections_per_peer, ) @@ -1915,7 +1945,7 @@ pub mod test { ConnectionTableKey::IP(single_connection_addr.ip()), single_connection_addr.port(), None, - 0, + ConnectionPeerType::Unstaked, (num_ips * 2) as u64, max_connections_per_peer, ) @@ -1937,26 +1967,26 @@ pub mod test { fn test_max_allowed_uni_streams() { assert_eq!( - compute_max_allowed_uni_streams(0, 0), + compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); assert_eq!( - compute_max_allowed_uni_streams(10, 0), + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0), QUIC_MIN_STAKED_CONCURRENT_STREAMS ); let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; assert_eq!( - compute_max_allowed_uni_streams(1000, 10000), + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000), QUIC_MAX_STAKED_CONCURRENT_STREAMS, ); assert_eq!( - compute_max_allowed_uni_streams(100, 10000), + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000), ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS) .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) ); assert_eq!( - compute_max_allowed_uni_streams(0, 10000), + compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); } diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 67f98915145b06..f7da3a17dcf178 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -1,5 +1,8 @@ use { - crate::quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS}, + crate::{ + nonblocking::quic::ConnectionPeerType, + quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS}, + }, percentage::Percentage, std::{ cmp, @@ -153,23 +156,27 @@ impl StakedStreamLoadEMA { } pub(crate) fn max_streams_for_connection_in_throttling_duration( - stake: u64, + peer_type: ConnectionPeerType, total_stake: u64, ema_load: Arc, ) -> u64 { - if stake == 0 { - let max_num_connections = u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| { - error!( - "Failed to convert maximum number of unstaked connections {} to u64.", - MAX_UNSTAKED_CONNECTIONS - ); - 500 - }); - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) - .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) - .saturating_div(max_num_connections) - } else { - ema_load.available_load_capacity_in_throttling_duration(stake, total_stake) + match peer_type { + ConnectionPeerType::Unstaked => { + let max_num_connections = + u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| { + error!( + "Failed to convert maximum number of unstaked connections {} to u64.", + MAX_UNSTAKED_CONNECTIONS + ); + 500 + }); + Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) + .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) + .saturating_div(max_num_connections) + } + ConnectionPeerType::Staked(stake) => { + ema_load.available_load_capacity_in_throttling_duration(stake, total_stake) + } } } @@ -227,14 +234,11 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( - max_streams_for_connection_in_throttling_duration(0, 10000, load_ema.clone(),), - 10 - ); - - // If stake is 0, same limits as unstaked connections will apply. - // 25K packets per ms * 20% / 500 max unstaked connections - assert_eq!( - max_streams_for_connection_in_throttling_duration(0, 10000, load_ema.clone(),), + max_streams_for_connection_in_throttling_duration( + ConnectionPeerType::Unstaked, + 10000, + load_ema.clone(), + ), 10 ); } @@ -254,14 +258,22 @@ pub mod test { // ema_load = 10K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 15 / 10K = 30 assert_eq!( - max_streams_for_connection_in_throttling_duration(15, 10000, load_ema.clone(),), + max_streams_for_connection_in_throttling_duration( + ConnectionPeerType::Staked(15), + 10000, + load_ema.clone(), + ), 30 ); // ema_load = 10K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 1K / 10K = 2K assert_eq!( - max_streams_for_connection_in_throttling_duration(1000, 10000, load_ema.clone(),), + max_streams_for_connection_in_throttling_duration( + ConnectionPeerType::Staked(1000), + 10000, + load_ema.clone(), + ), 2000 ); @@ -269,14 +281,22 @@ pub mod test { // ema_load = 2.5K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 15 / 10K = 120 assert_eq!( - max_streams_for_connection_in_throttling_duration(15, 10000, load_ema.clone(),), + max_streams_for_connection_in_throttling_duration( + ConnectionPeerType::Staked(15), + 10000, + load_ema.clone(), + ), 120 ); // ema_load = 2.5K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 1K / 10K = 8000 assert_eq!( - max_streams_for_connection_in_throttling_duration(1000, 10000, load_ema.clone(),), + max_streams_for_connection_in_throttling_duration( + ConnectionPeerType::Staked(1000), + 10000, + load_ema.clone(), + ), 8000 ); @@ -285,20 +305,32 @@ pub mod test { load_ema.current_load_ema.store(2000, Ordering::Relaxed); // function = ((10K * 10K) / 25% of 10K) * stake / total_stake assert_eq!( - max_streams_for_connection_in_throttling_duration(15, 10000, load_ema.clone(),), + max_streams_for_connection_in_throttling_duration( + ConnectionPeerType::Staked(15), + 10000, + load_ema.clone(), + ), 120 ); // function = ((10K * 10K) / 25% of 10K) * stake / total_stake assert_eq!( - max_streams_for_connection_in_throttling_duration(1000, 10000, load_ema.clone(),), + max_streams_for_connection_in_throttling_duration( + ConnectionPeerType::Staked(1000), + 10000, + load_ema.clone(), + ), 8000 ); // At 1/40000 stake weight, and minimum load, it should still allow // MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION of streams. assert_eq!( - max_streams_for_connection_in_throttling_duration(1, 40000, load_ema.clone(),), + max_streams_for_connection_in_throttling_duration( + ConnectionPeerType::Staked(1), + 40000, + load_ema.clone(), + ), MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION ); } From 51a18a93cada914fb62ee753d736db130fcb0a0d Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Fri, 12 Jan 2024 17:08:23 -0800 Subject: [PATCH 3/4] review comments --- streamer/src/nonblocking/quic.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 5b8bdbe526661b..f42e5e85e19a50 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -90,7 +90,7 @@ pub enum ConnectionPeerType { } impl ConnectionPeerType { - fn staked(&self) -> bool { + fn is_staked(&self) -> bool { matches!(self, ConnectionPeerType::Staked(_)) } } @@ -741,7 +741,7 @@ async fn handle_connection( let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); continue; } - if params.peer_type.staked() { + if params.peer_type.is_staked() { stream_load_ema.increment_load(); } stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); @@ -792,7 +792,7 @@ async fn handle_connection( } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); - if params.peer_type.staked() { + if params.peer_type.is_staked() { stream_load_ema.update_ema_if_needed(); } }); @@ -877,7 +877,7 @@ async fn handle_chunk( accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk); } - if peer_type.staked() { + if peer_type.is_staked() { stats .total_staked_chunks_received .fetch_add(1, Ordering::Relaxed); @@ -1079,7 +1079,7 @@ impl ConnectionTable { if has_connection_capacity { let exit = Arc::new(AtomicBool::new(false)); let last_update = Arc::new(AtomicU64::new(last_update)); - let stream_counter = if peer_type.staked() { + let stream_counter = if peer_type.is_staked() { connection_entry .first() .map(|entry| entry.stream_counter.clone()) From 8fb5a01f137e0e92f839ee8d9b5ef5d945268620 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Sat, 13 Jan 2024 08:32:03 -0800 Subject: [PATCH 4/4] treat connections with zero stake as unstaked --- streamer/src/nonblocking/quic.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index f42e5e85e19a50..89c3deb2e61ea7 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -482,10 +482,15 @@ async fn setup_connection( stats.clone(), ), |(pubkey, stake, total_stake, max_stake, min_stake)| { + let peer_type = if stake > 0 { + ConnectionPeerType::Staked(stake) + } else { + ConnectionPeerType::Unstaked + }; NewConnectionHandlerParams { packet_sender, remote_pubkey: Some(pubkey), - peer_type: ConnectionPeerType::Staked(stake), + peer_type, total_stake, max_connections_per_peer, stats: stats.clone(),