diff --git a/quic-client/src/lib.rs b/quic-client/src/lib.rs index 6339c5080d9b17..6bd9726cbfb3ea 100644 --- a/quic-client/src/lib.rs +++ b/quic-client/src/lib.rs @@ -133,21 +133,21 @@ impl QuicConfig { } fn compute_max_parallel_streams(&self) -> usize { - let (client_type, stake, total_stake) = + let (client_type, total_stake) = self.maybe_client_pubkey - .map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| { + .map_or((ConnectionPeerType::Unstaked, 0), |pubkey| { self.maybe_staked_nodes.as_ref().map_or( - (ConnectionPeerType::Unstaked, 0, 0), + (ConnectionPeerType::Unstaked, 0), |stakes| { let rstakes = stakes.read().unwrap(); rstakes.get_node_stake(&pubkey).map_or( - (ConnectionPeerType::Unstaked, 0, rstakes.total_stake()), - |stake| (ConnectionPeerType::Staked, stake, rstakes.total_stake()), + (ConnectionPeerType::Unstaked, rstakes.total_stake()), + |stake| (ConnectionPeerType::Staked(stake), rstakes.total_stake()), ) }, ) }); - compute_max_allowed_uni_streams(client_type, stake, total_stake) + compute_max_allowed_uni_streams(client_type, total_stake) } pub fn update_client_certificate( diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index f0b272917da49d..89c3deb2e61ea7 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -83,6 +83,18 @@ struct PacketAccumulator { pub chunks: Vec, } +#[derive(Copy, Clone, Debug)] +pub enum ConnectionPeerType { + Unstaked, + Staked(u64), +} + +impl ConnectionPeerType { + fn is_staked(&self) -> bool { + matches!(self, ConnectionPeerType::Staked(_)) + } +} + #[allow(clippy::too_many_arguments)] pub fn spawn_server( name: &'static str, @@ -142,12 +154,11 @@ async fn run_server( const WAIT_FOR_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1); debug!("spawn quic server"); let mut last_datapoint = Instant::now(); - let unstaked_connection_table: Arc> = Arc::new(Mutex::new( - ConnectionTable::new(ConnectionPeerType::Unstaked), - )); + let unstaked_connection_table: Arc> = + Arc::new(Mutex::new(ConnectionTable::new())); let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(stats.clone())); let staked_connection_table: Arc> = - Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked))); + Arc::new(Mutex::new(ConnectionTable::new())); let (sender, receiver) = async_unbounded(); tokio::spawn(packet_batch_sender( packet_sender, @@ -227,40 +238,30 @@ fn get_connection_stake( )) } -pub fn compute_max_allowed_uni_streams( - peer_type: ConnectionPeerType, - peer_stake: u64, - total_stake: u64, -) -> usize { - // Treat stake = 0 as unstaked - if peer_stake == 0 { - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - } else { - match peer_type { - ConnectionPeerType::Staked => { - // No checked math for f64 type. So let's explicitly check for 0 here - if total_stake == 0 || peer_stake > total_stake { - warn!( - "Invalid stake values: peer_stake: {:?}, total_stake: {:?}", - peer_stake, total_stake, - ); - - QUIC_MIN_STAKED_CONCURRENT_STREAMS - } else { - let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - - QUIC_MIN_STAKED_CONCURRENT_STREAMS) - as f64; - - (((peer_stake as f64 / total_stake as f64) * delta) as usize - + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .clamp( - QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MAX_STAKED_CONCURRENT_STREAMS, - ) - } +pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { + match peer_type { + ConnectionPeerType::Staked(peer_stake) => { + // No checked math for f64 type. So let's explicitly check for 0 here + if total_stake == 0 || peer_stake > total_stake { + warn!( + "Invalid stake values: peer_stake: {:?}, total_stake: {:?}", + peer_stake, total_stake, + ); + + QUIC_MIN_STAKED_CONCURRENT_STREAMS + } else { + let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS + - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; + + (((peer_stake as f64 / total_stake as f64) * delta) as usize + + QUIC_MIN_STAKED_CONCURRENT_STREAMS) + .clamp( + QUIC_MIN_STAKED_CONCURRENT_STREAMS, + QUIC_MAX_STAKED_CONCURRENT_STREAMS, + ) } - _ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, } + ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, } } @@ -278,7 +279,7 @@ struct NewConnectionHandlerParams { // we're sticking with an async channel packet_sender: AsyncSender, remote_pubkey: Option, - stake: u64, + peer_type: ConnectionPeerType, total_stake: u64, max_connections_per_peer: usize, stats: Arc, @@ -295,7 +296,7 @@ impl NewConnectionHandlerParams { NewConnectionHandlerParams { packet_sender, remote_pubkey: None, - stake: 0, + peer_type: ConnectionPeerType::Unstaked, total_stake: 0, max_connections_per_peer, stats, @@ -314,18 +315,13 @@ fn handle_and_cache_new_connection( stream_load_ema: Arc, ) -> Result<(), ConnectionHandlerError> { if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( - connection_table_l.peer_type, - params.stake, + params.peer_type, params.total_stake, ) as u64) { connection.set_max_concurrent_uni_streams(max_uni_streams); - let receive_window = compute_recieve_window( - params.max_stake, - params.min_stake, - connection_table_l.peer_type, - params.stake, - ); + let receive_window = + compute_recieve_window(params.max_stake, params.min_stake, params.peer_type); if let Ok(receive_window) = receive_window { connection.set_receive_window(receive_window); @@ -334,9 +330,8 @@ fn handle_and_cache_new_connection( let remote_addr = connection.remote_address(); debug!( - "Peer type: {:?}, stake {}, total stake {}, max streams {} receive_window {:?} from peer {}", - connection_table_l.peer_type, - params.stake, + "Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}", + params.peer_type, params.total_stake, max_uni_streams.into_inner(), receive_window, @@ -348,12 +343,11 @@ fn handle_and_cache_new_connection( ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), remote_addr.port(), Some(connection.clone()), - params.stake, + params.peer_type, timing::timestamp(), params.max_connections_per_peer, ) { - let peer_type = connection_table_l.peer_type; drop(connection_table_l); tokio::spawn(handle_connection( connection, @@ -362,7 +356,6 @@ fn handle_and_cache_new_connection( connection_table, stream_exit, params.clone(), - peer_type, wait_for_chunk_timeout, stream_load_ema, stream_counter, @@ -448,13 +441,12 @@ fn compute_recieve_window( max_stake: u64, min_stake: u64, peer_type: ConnectionPeerType, - peer_stake: u64, ) -> Result { match peer_type { ConnectionPeerType::Unstaked => { VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) } - ConnectionPeerType::Staked => { + ConnectionPeerType::Staked(peer_stake) => { let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) @@ -490,10 +482,15 @@ async fn setup_connection( stats.clone(), ), |(pubkey, stake, total_stake, max_stake, min_stake)| { + let peer_type = if stake > 0 { + ConnectionPeerType::Staked(stake) + } else { + ConnectionPeerType::Unstaked + }; NewConnectionHandlerParams { packet_sender, remote_pubkey: Some(pubkey), - stake, + peer_type, total_stake, max_connections_per_peer, stats: stats.clone(), @@ -503,31 +500,54 @@ async fn setup_connection( }, ); - if params.stake > 0 { - let mut connection_table_l = staked_connection_table.lock().unwrap(); - if connection_table_l.total_size >= max_staked_connections { - let num_pruned = - connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake); - stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); - } + match params.peer_type { + ConnectionPeerType::Staked(stake) => { + let mut connection_table_l = staked_connection_table.lock().unwrap(); + if connection_table_l.total_size >= max_staked_connections { + let num_pruned = + connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake); + stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + } - if connection_table_l.total_size < max_staked_connections { - if let Ok(()) = handle_and_cache_new_connection( - new_connection, - connection_table_l, - staked_connection_table.clone(), - ¶ms, - wait_for_chunk_timeout, - stream_load_ema.clone(), - ) { - stats - .connection_added_from_staked_peer - .fetch_add(1, Ordering::Relaxed); + if connection_table_l.total_size < max_staked_connections { + if let Ok(()) = handle_and_cache_new_connection( + new_connection, + connection_table_l, + staked_connection_table.clone(), + ¶ms, + wait_for_chunk_timeout, + stream_load_ema.clone(), + ) { + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); + } + } else { + // If we couldn't prune a connection in the staked connection table, let's + // put this connection in the unstaked connection table. If needed, prune a + // connection from the unstaked connection table. + if let Ok(()) = prune_unstaked_connections_and_add_new_connection( + new_connection, + unstaked_connection_table.clone(), + max_unstaked_connections, + ¶ms, + wait_for_chunk_timeout, + stream_load_ema.clone(), + ) { + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); + } else { + stats + .connection_add_failed_on_pruning + .fetch_add(1, Ordering::Relaxed); + stats + .connection_add_failed_staked_node + .fetch_add(1, Ordering::Relaxed); + } } - } else { - // If we couldn't prune a connection in the staked connection table, let's - // put this connection in the unstaked connection table. If needed, prune a - // connection from the unstaked connection table. + } + ConnectionPeerType::Unstaked => { if let Ok(()) = prune_unstaked_connections_and_add_new_connection( new_connection, unstaked_connection_table.clone(), @@ -537,32 +557,14 @@ async fn setup_connection( stream_load_ema.clone(), ) { stats - .connection_added_from_staked_peer + .connection_added_from_unstaked_peer .fetch_add(1, Ordering::Relaxed); } else { stats - .connection_add_failed_on_pruning - .fetch_add(1, Ordering::Relaxed); - stats - .connection_add_failed_staked_node + .connection_add_failed_unstaked_node .fetch_add(1, Ordering::Relaxed); } } - } else if let Ok(()) = prune_unstaked_connections_and_add_new_connection( - new_connection, - unstaked_connection_table.clone(), - max_unstaked_connections, - ¶ms, - wait_for_chunk_timeout, - stream_load_ema.clone(), - ) { - stats - .connection_added_from_unstaked_peer - .fetch_add(1, Ordering::Relaxed); - } else { - stats - .connection_add_failed_unstaked_node - .fetch_add(1, Ordering::Relaxed); } } Err(e) => { @@ -696,7 +698,6 @@ async fn packet_batch_sender( } } -#[allow(clippy::too_many_arguments)] async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -704,7 +705,6 @@ async fn handle_connection( connection_table: Arc>, stream_exit: Arc, params: NewConnectionHandlerParams, - peer_type: ConnectionPeerType, wait_for_chunk_timeout: Duration, stream_load_ema: Arc, stream_counter: Arc, @@ -720,22 +720,20 @@ async fn handle_connection( stats.total_connections.fetch_add(1, Ordering::Relaxed); let mut max_streams_per_throttling_interval = stream_throttle::max_streams_for_connection_in_throttling_duration( - peer_type, - params.stake, + params.peer_type, params.total_stake, stream_load_ema.clone(), ); - let staked_stream = matches!(peer_type, ConnectionPeerType::Staked) && params.stake > 0; while !stream_exit.load(Ordering::Relaxed) { if let Ok(stream) = tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await { match stream { Ok(mut stream) => { - if staked_stream { + if let ConnectionPeerType::Staked(peer_stake) = params.peer_type { max_streams_per_throttling_interval = stream_load_ema .available_load_capacity_in_throttling_duration( - params.stake, + peer_stake, params.total_stake, ); } @@ -748,7 +746,7 @@ async fn handle_connection( let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); continue; } - if staked_stream { + if params.peer_type.is_staked() { stream_load_ema.increment_load(); } stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); @@ -782,7 +780,7 @@ async fn handle_connection( &remote_addr, &packet_sender, stats.clone(), - peer_type, + params.peer_type, ) .await { @@ -799,7 +797,7 @@ async fn handle_connection( } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); - if staked_stream { + if params.peer_type.is_staked() { stream_load_ema.update_ema_if_needed(); } }); @@ -884,17 +882,14 @@ async fn handle_chunk( accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk); } - match peer_type { - ConnectionPeerType::Staked => { - stats - .total_staked_chunks_received - .fetch_add(1, Ordering::Relaxed); - } - ConnectionPeerType::Unstaked => { - stats - .total_unstaked_chunks_received - .fetch_add(1, Ordering::Relaxed); - } + if peer_type.is_staked() { + stats + .total_staked_chunks_received + .fetch_add(1, Ordering::Relaxed); + } else { + stats + .total_unstaked_chunks_received + .fetch_add(1, Ordering::Relaxed); } } else { // done receiving chunks @@ -943,7 +938,7 @@ async fn handle_chunk( #[derive(Debug)] struct ConnectionEntry { exit: Arc, - stake: u64, + peer_type: ConnectionPeerType, last_update: Arc, port: u16, connection: Option, @@ -953,7 +948,7 @@ struct ConnectionEntry { impl ConnectionEntry { fn new( exit: Arc, - stake: u64, + peer_type: ConnectionPeerType, last_update: Arc, port: u16, connection: Option, @@ -961,7 +956,7 @@ impl ConnectionEntry { ) -> Self { Self { exit, - stake, + peer_type, last_update, port, connection, @@ -972,6 +967,13 @@ impl ConnectionEntry { fn last_update(&self) -> u64 { self.last_update.load(Ordering::Relaxed) } + + fn stake(&self) -> u64 { + match self.peer_type { + ConnectionPeerType::Unstaked => 0, + ConnectionPeerType::Staked(stake) => stake, + } + } } impl Drop for ConnectionEntry { @@ -986,12 +988,6 @@ impl Drop for ConnectionEntry { } } -#[derive(Copy, Clone, Debug)] -pub enum ConnectionPeerType { - Unstaked, - Staked, -} - #[derive(Copy, Clone, Eq, Hash, PartialEq)] enum ConnectionTableKey { IP(IpAddr), @@ -1010,17 +1006,15 @@ impl ConnectionTableKey { struct ConnectionTable { table: IndexMap>, total_size: usize, - peer_type: ConnectionPeerType, } // Prune the connection which has the oldest update // Return number pruned impl ConnectionTable { - fn new(peer_type: ConnectionPeerType) -> Self { + fn new() -> Self { Self { table: IndexMap::default(), total_size: 0, - peer_type, } } @@ -1055,7 +1049,7 @@ impl ConnectionTable { }) .map(|index| { let connection = self.table[index].first(); - let stake = connection.map(|connection| connection.stake); + let stake = connection.map(|connection| connection.stake()); (index, stake) }) .take(sample_size) @@ -1073,7 +1067,7 @@ impl ConnectionTable { key: ConnectionTableKey, port: u16, connection: Option, - stake: u64, + peer_type: ConnectionPeerType, last_update: u64, max_connections_per_peer: usize, ) -> Option<( @@ -1090,7 +1084,7 @@ impl ConnectionTable { if has_connection_capacity { let exit = Arc::new(AtomicBool::new(false)); let last_update = Arc::new(AtomicU64::new(last_update)); - let stream_counter = if stake > 0 { + let stream_counter = if peer_type.is_staked() { connection_entry .first() .map(|entry| entry.stream_counter.clone()) @@ -1102,7 +1096,7 @@ impl ConnectionTable { }; connection_entry.push(ConnectionEntry::new( exit.clone(), - stake, + peer_type, last_update.clone(), port, connection, @@ -1738,7 +1732,7 @@ pub mod test { fn test_prune_table_with_ip() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); let mut num_entries = 5; let max_connections_per_peer = 10; let sockets: Vec<_> = (0..num_entries) @@ -1750,7 +1744,7 @@ pub mod test { ConnectionTableKey::IP(socket.ip()), socket.port(), None, - 0, + ConnectionPeerType::Unstaked, i as u64, max_connections_per_peer, ) @@ -1762,7 +1756,7 @@ pub mod test { ConnectionTableKey::IP(sockets[0].ip()), sockets[0].port(), None, - 0, + ConnectionPeerType::Unstaked, 5, max_connections_per_peer, ) @@ -1787,7 +1781,7 @@ pub mod test { #[test] fn test_prune_table_with_unique_pubkeys() { solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); // We should be able to add more entries than max_connections_per_peer, since each entry is // from a different peer pubkey. @@ -1801,7 +1795,7 @@ pub mod test { ConnectionTableKey::Pubkey(*pubkey), 0, None, - 0, + ConnectionPeerType::Unstaked, i as u64, max_connections_per_peer, ) @@ -1822,7 +1816,7 @@ pub mod test { #[test] fn test_prune_table_with_non_unique_pubkeys() { solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); let max_connections_per_peer = 10; let pubkey = Pubkey::new_unique(); @@ -1832,7 +1826,7 @@ pub mod test { ConnectionTableKey::Pubkey(pubkey), 0, None, - 0, + ConnectionPeerType::Unstaked, i as u64, max_connections_per_peer, ) @@ -1846,7 +1840,7 @@ pub mod test { ConnectionTableKey::Pubkey(pubkey), 0, None, - 0, + ConnectionPeerType::Unstaked, 10, max_connections_per_peer, ) @@ -1860,7 +1854,7 @@ pub mod test { ConnectionTableKey::Pubkey(pubkey2), 0, None, - 0, + ConnectionPeerType::Unstaked, 10, max_connections_per_peer, ) @@ -1882,7 +1876,7 @@ pub mod test { fn test_prune_table_random() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); let num_entries = 5; let max_connections_per_peer = 10; let sockets: Vec<_> = (0..num_entries) @@ -1894,7 +1888,7 @@ pub mod test { ConnectionTableKey::IP(socket.ip()), socket.port(), None, - (i + 1) as u64, + ConnectionPeerType::Staked((i + 1) as u64), i as u64, max_connections_per_peer, ) @@ -1919,7 +1913,7 @@ pub mod test { fn test_remove_connections() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(ConnectionPeerType::Staked); + let mut table = ConnectionTable::new(); let num_ips = 5; let max_connections_per_peer = 10; let mut sockets: Vec<_> = (0..num_ips) @@ -1931,7 +1925,7 @@ pub mod test { ConnectionTableKey::IP(socket.ip()), socket.port(), None, - 0, + ConnectionPeerType::Unstaked, (i * 2) as u64, max_connections_per_peer, ) @@ -1942,7 +1936,7 @@ pub mod test { ConnectionTableKey::IP(socket.ip()), socket.port(), None, - 0, + ConnectionPeerType::Unstaked, (i * 2 + 1) as u64, max_connections_per_peer, ) @@ -1956,7 +1950,7 @@ pub mod test { ConnectionTableKey::IP(single_connection_addr.ip()), single_connection_addr.port(), None, - 0, + ConnectionPeerType::Unstaked, (num_ips * 2) as u64, max_connections_per_peer, ) @@ -1978,46 +1972,26 @@ pub mod test { fn test_max_allowed_uni_streams() { assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 0), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10, 0), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 0), + compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, 0), + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0), QUIC_MIN_STAKED_CONCURRENT_STREAMS ); let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, 10000), + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000), QUIC_MAX_STAKED_CONCURRENT_STREAMS, ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, 10000), + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000), ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS) .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1000, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 10000), + compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000), QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); } diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 596aa8882f2e0c..f7da3a17dcf178 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -156,24 +156,27 @@ impl StakedStreamLoadEMA { } pub(crate) fn max_streams_for_connection_in_throttling_duration( - connection_type: ConnectionPeerType, - stake: u64, + peer_type: ConnectionPeerType, total_stake: u64, ema_load: Arc, ) -> u64 { - if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 { - let max_num_connections = u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| { - error!( - "Failed to convert maximum number of unstaked connections {} to u64.", - MAX_UNSTAKED_CONNECTIONS - ); - 500 - }); - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) - .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) - .saturating_div(max_num_connections) - } else { - ema_load.available_load_capacity_in_throttling_duration(stake, total_stake) + match peer_type { + ConnectionPeerType::Unstaked => { + let max_num_connections = + u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| { + error!( + "Failed to convert maximum number of unstaked connections {} to u64.", + MAX_UNSTAKED_CONNECTIONS + ); + 500 + }); + Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) + .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) + .saturating_div(max_num_connections) + } + ConnectionPeerType::Staked(stake) => { + ema_load.available_load_capacity_in_throttling_duration(stake, total_stake) + } } } @@ -213,13 +216,10 @@ pub mod test { use { super::*, crate::{ - nonblocking::{ - quic::ConnectionPeerType, - stream_throttle::{ - max_streams_for_connection_in_throttling_duration, - MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION, - STREAM_LOAD_EMA_INTERVAL_MS, - }, + nonblocking::stream_throttle::{ + max_streams_for_connection_in_throttling_duration, + MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION, + STREAM_LOAD_EMA_INTERVAL_MS, }, quic::StreamStats, }, @@ -236,30 +236,6 @@ pub mod test { assert_eq!( max_streams_for_connection_in_throttling_duration( ConnectionPeerType::Unstaked, - 0, - 10000, - load_ema.clone(), - ), - 10 - ); - - // 25K packets per ms * 20% / 500 max unstaked connections - assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Unstaked, - 10, - 10000, - load_ema.clone(), - ), - 10 - ); - - // If stake is 0, same limits as unstaked connections will apply. - // 25K packets per ms * 20% / 500 max unstaked connections - assert_eq!( - max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 0, 10000, load_ema.clone(), ), @@ -283,8 +259,7 @@ pub mod test { // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 15 / 10K = 30 assert_eq!( max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 15, + ConnectionPeerType::Staked(15), 10000, load_ema.clone(), ), @@ -295,8 +270,7 @@ pub mod test { // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 1K / 10K = 2K assert_eq!( max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 1000, + ConnectionPeerType::Staked(1000), 10000, load_ema.clone(), ), @@ -308,8 +282,7 @@ pub mod test { // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 15 / 10K = 120 assert_eq!( max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 15, + ConnectionPeerType::Staked(15), 10000, load_ema.clone(), ), @@ -320,8 +293,7 @@ pub mod test { // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 1K / 10K = 8000 assert_eq!( max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 1000, + ConnectionPeerType::Staked(1000), 10000, load_ema.clone(), ), @@ -334,8 +306,7 @@ pub mod test { // function = ((10K * 10K) / 25% of 10K) * stake / total_stake assert_eq!( max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 15, + ConnectionPeerType::Staked(15), 10000, load_ema.clone(), ), @@ -345,8 +316,7 @@ pub mod test { // function = ((10K * 10K) / 25% of 10K) * stake / total_stake assert_eq!( max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 1000, + ConnectionPeerType::Staked(1000), 10000, load_ema.clone(), ), @@ -357,8 +327,7 @@ pub mod test { // MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION of streams. assert_eq!( max_streams_for_connection_in_throttling_duration( - ConnectionPeerType::Staked, - 1, + ConnectionPeerType::Staked(1), 40000, load_ema.clone(), ),