From fb8bd77df34fe462cc59efd63761037d9cd94cfd Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 9 Oct 2024 14:32:16 +0200 Subject: [PATCH] fix(net): add concurrency param from config to `TransactionFetcherInfo` (#11600) Co-authored-by: Matthias Seitz --- .../net/network/src/transactions/fetcher.rs | 79 +++++++++++-------- 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index cf5c09045f28..9276219d593b 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -130,20 +130,27 @@ impl TransactionFetcher { /// Sets up transaction fetcher with config pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self { - let mut tx_fetcher = Self::default(); + let TransactionFetcherConfig { + max_inflight_requests, + max_capacity_cache_txns_pending_fetch, + .. + } = *config; - tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response = - config.soft_limit_byte_size_pooled_transactions_response; - tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request = - config.soft_limit_byte_size_pooled_transactions_response_on_pack_request; - tx_fetcher - .metrics - .capacity_inflight_requests - .increment(tx_fetcher.info.max_inflight_requests as u64); - tx_fetcher.info.max_capacity_cache_txns_pending_fetch = - config.max_capacity_cache_txns_pending_fetch; + let info = config.clone().into(); - tx_fetcher + let metrics = TransactionFetcherMetrics::default(); + metrics.capacity_inflight_requests.increment(max_inflight_requests as u64); + + Self { + active_peers: LruMap::new(max_inflight_requests), + hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch), + hashes_fetch_inflight_and_pending_fetch: LruMap::new( + max_inflight_requests + max_capacity_cache_txns_pending_fetch, + ), + info, + metrics, + ..Default::default() + } } /// Removes the specified hashes from inflight tracking. @@ -178,7 +185,7 @@ impl TransactionFetcher { /// Returns `true` if peer is idle with respect to `self.inflight_requests`. pub fn is_idle(&self, peer_id: &PeerId) -> bool { let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true }; - if *inflight_count < DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER { + if *inflight_count < self.info.max_inflight_requests_per_peer { return true } false @@ -653,12 +660,12 @@ impl TransactionFetcher { return Some(new_announced_hashes) }; - if *inflight_count >= DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER { + if *inflight_count >= self.info.max_inflight_requests_per_peer { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), hashes=?*new_announced_hashes, %conn_eth_version, - max_concurrent_tx_reqs_per_peer=DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER, + max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer, "limit for concurrent `GetPooledTransactions` requests per peer reached" ); return Some(new_announced_hashes) @@ -1288,10 +1295,12 @@ pub enum VerificationOutcome { } /// Tracks stats about the [`TransactionFetcher`]. -#[derive(Debug)] +#[derive(Debug, Constructor)] pub struct TransactionFetcherInfo { /// Max inflight [`GetPooledTransactions`] requests. pub max_inflight_requests: usize, + /// Max inflight [`GetPooledTransactions`] requests per peer. + pub max_inflight_requests_per_peer: u8, /// Soft limit for the byte size of the expected [`PooledTransactions`] response, upon packing /// a [`GetPooledTransactions`] request with hashes (by default less than 2 MiB worth of /// transactions is requested). @@ -1305,27 +1314,11 @@ pub struct TransactionFetcherInfo { pub max_capacity_cache_txns_pending_fetch: u32, } -impl TransactionFetcherInfo { - /// Creates a new max - pub const fn new( - max_inflight_requests: usize, - soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize, - soft_limit_byte_size_pooled_transactions_response: usize, - max_capacity_cache_txns_pending_fetch: u32, - ) -> Self { - Self { - max_inflight_requests, - soft_limit_byte_size_pooled_transactions_response_on_pack_request, - soft_limit_byte_size_pooled_transactions_response, - max_capacity_cache_txns_pending_fetch, - } - } -} - impl Default for TransactionFetcherInfo { fn default() -> Self { Self::new( DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize, + DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER, DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, @@ -1333,6 +1326,26 @@ impl Default for TransactionFetcherInfo { } } +impl From for TransactionFetcherInfo { + fn from(config: TransactionFetcherConfig) -> Self { + let TransactionFetcherConfig { + max_inflight_requests, + max_inflight_requests_per_peer, + soft_limit_byte_size_pooled_transactions_response, + soft_limit_byte_size_pooled_transactions_response_on_pack_request, + max_capacity_cache_txns_pending_fetch, + } = config; + + Self::new( + max_inflight_requests as usize, + max_inflight_requests_per_peer, + soft_limit_byte_size_pooled_transactions_response_on_pack_request, + soft_limit_byte_size_pooled_transactions_response, + max_capacity_cache_txns_pending_fetch, + ) + } +} + #[derive(Debug, Default)] struct TxFetcherSearchDurations { find_idle_peer: Duration,