Skip to content

Commit

Permalink
fix(net): add concurrency param from config to `TransactionFetcherInf…
Browse files Browse the repository at this point in the history
…o` (paradigmxyz#11600)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
emhane and mattsse authored Oct 9, 2024
1 parent f5d6844 commit fb8bd77
Showing 1 changed file with 46 additions and 33 deletions.
79 changes: 46 additions & 33 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,27 @@ impl TransactionFetcher {

/// Sets up transaction fetcher with config
pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
let mut tx_fetcher = Self::default();
let TransactionFetcherConfig {
max_inflight_requests,
max_capacity_cache_txns_pending_fetch,
..
} = *config;

tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response =
config.soft_limit_byte_size_pooled_transactions_response;
tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request =
config.soft_limit_byte_size_pooled_transactions_response_on_pack_request;
tx_fetcher
.metrics
.capacity_inflight_requests
.increment(tx_fetcher.info.max_inflight_requests as u64);
tx_fetcher.info.max_capacity_cache_txns_pending_fetch =
config.max_capacity_cache_txns_pending_fetch;
let info = config.clone().into();

tx_fetcher
let metrics = TransactionFetcherMetrics::default();
metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);

Self {
active_peers: LruMap::new(max_inflight_requests),
hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
hashes_fetch_inflight_and_pending_fetch: LruMap::new(
max_inflight_requests + max_capacity_cache_txns_pending_fetch,
),
info,
metrics,
..Default::default()
}
}

/// Removes the specified hashes from inflight tracking.
Expand Down Expand Up @@ -178,7 +185,7 @@ impl TransactionFetcher {
/// Returns `true` if peer is idle with respect to `self.inflight_requests`.
pub fn is_idle(&self, peer_id: &PeerId) -> bool {
let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
if *inflight_count < DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER {
if *inflight_count < self.info.max_inflight_requests_per_peer {
return true
}
false
Expand Down Expand Up @@ -653,12 +660,12 @@ impl TransactionFetcher {
return Some(new_announced_hashes)
};

if *inflight_count >= DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER {
if *inflight_count >= self.info.max_inflight_requests_per_peer {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?*new_announced_hashes,
%conn_eth_version,
max_concurrent_tx_reqs_per_peer=DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
"limit for concurrent `GetPooledTransactions` requests per peer reached"
);
return Some(new_announced_hashes)
Expand Down Expand Up @@ -1288,10 +1295,12 @@ pub enum VerificationOutcome {
}

/// Tracks stats about the [`TransactionFetcher`].
#[derive(Debug)]
#[derive(Debug, Constructor)]
pub struct TransactionFetcherInfo {
/// Max inflight [`GetPooledTransactions`] requests.
pub max_inflight_requests: usize,
/// Max inflight [`GetPooledTransactions`] requests per peer.
pub max_inflight_requests_per_peer: u8,
/// Soft limit for the byte size of the expected [`PooledTransactions`] response, upon packing
/// a [`GetPooledTransactions`] request with hashes (by default less than 2 MiB worth of
/// transactions is requested).
Expand All @@ -1305,34 +1314,38 @@ pub struct TransactionFetcherInfo {
pub max_capacity_cache_txns_pending_fetch: u32,
}

impl TransactionFetcherInfo {
/// Creates a new max
pub const fn new(
max_inflight_requests: usize,
soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
soft_limit_byte_size_pooled_transactions_response: usize,
max_capacity_cache_txns_pending_fetch: u32,
) -> Self {
Self {
max_inflight_requests,
soft_limit_byte_size_pooled_transactions_response_on_pack_request,
soft_limit_byte_size_pooled_transactions_response,
max_capacity_cache_txns_pending_fetch,
}
}
}

impl Default for TransactionFetcherInfo {
fn default() -> Self {
Self::new(
DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
)
}
}

impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
fn from(config: TransactionFetcherConfig) -> Self {
let TransactionFetcherConfig {
max_inflight_requests,
max_inflight_requests_per_peer,
soft_limit_byte_size_pooled_transactions_response,
soft_limit_byte_size_pooled_transactions_response_on_pack_request,
max_capacity_cache_txns_pending_fetch,
} = config;

Self::new(
max_inflight_requests as usize,
max_inflight_requests_per_peer,
soft_limit_byte_size_pooled_transactions_response_on_pack_request,
soft_limit_byte_size_pooled_transactions_response,
max_capacity_cache_txns_pending_fetch,
)
}
}

#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
find_idle_peer: Duration,
Expand Down

0 comments on commit fb8bd77

Please sign in to comment.