diff --git a/Cargo.lock b/Cargo.lock index 9f53800906ff..3c0483ce495d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6298,6 +6298,7 @@ dependencies = [ "serde", "serde_json", "serial_test", + "smallvec", "tempfile", "thiserror", "tokio", diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index a43cd900b7da..5c856c43ba69 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -5,7 +5,7 @@ use alloy_rlp::{ Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper, }; -use derive_more::{Constructor, Deref, DerefMut, IntoIterator}; +use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator}; use reth_codecs::derive_arbitrary; use reth_primitives::{ Block, Bytes, PooledTransactionsElement, TransactionSigned, TxHash, B256, U128, @@ -441,28 +441,27 @@ impl Decodable for NewPooledTransactionHashes68 { } } -/// Interface for handling mempool message data. Used in various filters in pipelines in -/// `TransactionsManager` and in queries to `TransactionPool`. -pub trait HandleMempoolData { - /// The announcement contains no entries. +/// Validation pass that checks for unique transaction hashes. +pub trait DedupPayload { + /// Value type in [`PartiallyValidData`] map. + type Value; + + /// The payload contains no entries. fn is_empty(&self) -> bool; /// Returns the number of entries. fn len(&self) -> usize; - /// Retain only entries for which the hash in the entry satisfies a given predicate, return - /// the rest. - fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self; + /// Consumes self, returning an iterator over hashes in payload. + fn dedup(self) -> PartiallyValidData; } -/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned. -pub trait HandleVersionedMempoolData { - /// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or - /// [`Eth68`](EthVersion::Eth68). - fn msg_version(&self) -> EthVersion; -} +/// Value in [`PartiallyValidData`] map obtained from an announcement. +pub type Eth68TxMetadata = Option<(u8, usize)>; + +impl DedupPayload for NewPooledTransactionHashes { + type Value = Eth68TxMetadata; -impl HandleMempoolData for NewPooledTransactionHashes { fn is_empty(&self) -> bool { self.is_empty() } @@ -471,21 +470,17 @@ impl HandleMempoolData for NewPooledTransactionHashes { self.len() } - fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self { + fn dedup(self) -> PartiallyValidData { match self { - NewPooledTransactionHashes::Eth66(msg) => Self::Eth66(msg.retain_by_hash(f)), - NewPooledTransactionHashes::Eth68(msg) => Self::Eth68(msg.retain_by_hash(f)), + NewPooledTransactionHashes::Eth66(msg) => msg.dedup(), + NewPooledTransactionHashes::Eth68(msg) => msg.dedup(), } } } -impl HandleVersionedMempoolData for NewPooledTransactionHashes { - fn msg_version(&self) -> EthVersion { - self.version() - } -} +impl DedupPayload for NewPooledTransactionHashes68 { + type Value = Eth68TxMetadata; -impl HandleMempoolData for NewPooledTransactionHashes68 { fn is_empty(&self) -> bool { self.hashes.is_empty() } @@ -494,38 +489,24 @@ impl HandleMempoolData for NewPooledTransactionHashes68 { self.hashes.len() } - fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { - let mut indices_to_remove = vec![]; - for (i, hash) in self.hashes.iter().enumerate() { - if !f(hash) { - indices_to_remove.push(i); - } - } + fn dedup(self) -> PartiallyValidData { + let Self { hashes, mut sizes, mut types } = self; + + let mut deduped_data = HashMap::with_capacity(hashes.len()); - let mut removed_hashes = Vec::with_capacity(indices_to_remove.len()); - let mut removed_types = Vec::with_capacity(indices_to_remove.len()); - let mut removed_sizes = Vec::with_capacity(indices_to_remove.len()); - - for index in indices_to_remove.into_iter().rev() { - let hash = self.hashes.remove(index); - removed_hashes.push(hash); - let ty = self.types.remove(index); - removed_types.push(ty); - let size = self.sizes.remove(index); - removed_sizes.push(size); + for hash in hashes.into_iter().rev() { + if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) { + deduped_data.insert(hash, Some((ty, size))); + } } - Self { hashes: removed_hashes, types: removed_types, sizes: removed_sizes } + PartiallyValidData::from_raw_data_eth68(deduped_data) } } -impl HandleVersionedMempoolData for NewPooledTransactionHashes68 { - fn msg_version(&self) -> EthVersion { - EthVersion::Eth68 - } -} +impl DedupPayload for NewPooledTransactionHashes66 { + type Value = Eth68TxMetadata; -impl HandleMempoolData for NewPooledTransactionHashes66 { fn is_empty(&self) -> bool { self.0.is_empty() } @@ -534,100 +515,163 @@ impl HandleMempoolData for NewPooledTransactionHashes66 { self.0.len() } - fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { - let mut indices_to_remove = vec![]; - for (i, hash) in self.0.iter().enumerate() { - if !f(hash) { - indices_to_remove.push(i); - } - } + fn dedup(self) -> PartiallyValidData { + let Self(hashes) = self; - let mut removed_hashes = Vec::with_capacity(indices_to_remove.len()); + let mut deduped_data = HashMap::with_capacity(hashes.len()); - for index in indices_to_remove.into_iter().rev() { - let hash = self.0.remove(index); - removed_hashes.push(hash); + let noop_value: Eth68TxMetadata = None; + + for hash in hashes.into_iter().rev() { + deduped_data.insert(hash, noop_value); } - Self(removed_hashes) + PartiallyValidData::from_raw_data_eth66(deduped_data) } } -impl HandleVersionedMempoolData for NewPooledTransactionHashes66 { - fn msg_version(&self) -> EthVersion { - EthVersion::Eth66 +/// Interface for handling mempool message data. Used in various filters in pipelines in +/// `TransactionsManager` and in queries to `TransactionPool`. +pub trait HandleMempoolData { + /// The announcement contains no entries. + fn is_empty(&self) -> bool; + + /// Returns the number of entries. + fn len(&self) -> usize; + + /// Retain only entries for which the hash in the entry satisfies a given predicate. + fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool); +} + +/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned. +pub trait HandleVersionedMempoolData { + /// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or + /// [`Eth68`](EthVersion::Eth68). + fn msg_version(&self) -> EthVersion; +} + +impl HandleMempoolData for Vec { + fn is_empty(&self) -> bool { + self.is_empty() + } + + fn len(&self) -> usize { + self.len() + } + + fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) { + self.retain(|tx| f(tx.hash())) } } -/// Announcement data that has been validated according to the configured network. For an eth68 -/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66 -/// announcement, values of the map are `None`. -#[derive(Debug, Deref, DerefMut, IntoIterator, Constructor)] -pub struct ValidAnnouncementData { +macro_rules! handle_mempool_data_map_impl { + ($data_ty:ty, $(<$generic:ident>)?) => { + impl$(<$generic>)? HandleMempoolData for $data_ty { + fn is_empty(&self) -> bool { + self.data.is_empty() + } + + fn len(&self) -> usize { + self.data.len() + } + + fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) { + self.data.retain(|hash, _| f(hash)); + } + } + }; +} + +/// Data that has passed an initial validation pass that is not specific to any mempool message +/// type. +#[derive(Debug, Deref, DerefMut, IntoIterator)] +pub struct PartiallyValidData { #[deref] #[deref_mut] #[into_iterator] - data: HashMap>, - version: EthVersion, + data: HashMap, + version: Option, } -impl ValidAnnouncementData { - /// Returns a new [`ValidAnnouncementData`] wrapper around validated - /// [`Eth68`](EthVersion::Eth68) announcement data. - pub fn new_eth68(data: HashMap>) -> Self { - Self::new(data, EthVersion::Eth68) +handle_mempool_data_map_impl!(PartiallyValidData, ); + +impl PartiallyValidData { + /// Wraps raw data. + pub fn from_raw_data(data: HashMap, version: Option) -> Self { + Self { data, version } } - /// Returns a new [`ValidAnnouncementData`] wrapper around validated - /// [`Eth68`](EthVersion::Eth68) announcement data. - pub fn new_eth66(data: HashMap>) -> Self { - Self::new(data, EthVersion::Eth66) + /// Wraps raw data with version [`EthVersion::Eth68`]. + pub fn from_raw_data_eth68(data: HashMap) -> Self { + Self::from_raw_data(data, Some(EthVersion::Eth68)) } - /// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth68`](EthVersion::Eth68) + /// Wraps raw data with version [`EthVersion::Eth66`]. + pub fn from_raw_data_eth66(data: HashMap) -> Self { + Self::from_raw_data(data, Some(EthVersion::Eth66)) + } + + /// Returns a new [`PartiallyValidData`] with empty data from an [`Eth68`](EthVersion::Eth68) /// announcement. pub fn empty_eth68() -> Self { - Self::new_eth68(HashMap::new()) + Self::from_raw_data_eth68(HashMap::new()) } - /// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth66`](EthVersion::Eth66) + /// Returns a new [`PartiallyValidData`] with empty data from an [`Eth66`](EthVersion::Eth66) /// announcement. pub fn empty_eth66() -> Self { - Self::new_eth66(HashMap::new()) + Self::from_raw_data_eth66(HashMap::new()) + } + + /// Returns the version of the message this data was received in if different versions of the + /// message exists, either [`Eth66`](EthVersion::Eth66) or [`Eth68`](EthVersion::Eth68). + pub fn msg_version(&self) -> Option { + self.version } /// Destructs returning the validated data. - pub fn into_data(self) -> HashMap> { + pub fn into_data(self) -> HashMap { self.data } +} +/// Partially validated data from an announcement or a +/// [`PooledTransactions`](crate::PooledTransactions) response. +#[derive(Debug, Deref, DerefMut, IntoIterator, From)] +#[from(PartiallyValidData)] +pub struct ValidAnnouncementData { + #[deref] + #[deref_mut] + #[into_iterator] + data: HashMap, + version: EthVersion, +} + +handle_mempool_data_map_impl!(ValidAnnouncementData,); + +impl ValidAnnouncementData { /// Destructs returning only the valid hashes and the announcement message version. Caution! If - /// this is [`Eth68`](EthVersion::Eth68)announcement data, the metadata must be cached - /// before call. + /// this is [`Eth68`](EthVersion::Eth68) announcement data, this drops the metadata. pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) { let hashes = self.data.into_keys().collect::>(); (RequestTxHashes::new(hashes), self.version) } -} - -impl HandleMempoolData for ValidAnnouncementData { - fn is_empty(&self) -> bool { - self.data.is_empty() - } - fn len(&self) -> usize { - self.data.len() - } - - fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { - let data = std::mem::take(&mut self.data); + /// Conversion from [`PartiallyValidData`] from an announcement. Note! [`PartiallyValidData`] + /// from an announcement, should have some [`EthVersion`]. Panics if [`PartiallyValidData`] has + /// version set to `None`. + pub fn from_partially_valid_data(data: PartiallyValidData) -> Self { + let PartiallyValidData { data, version } = data; - let (keep, rest) = data.into_iter().partition(|(hash, _)| f(hash)); + let version = version.expect("should have eth version for conversion"); - self.data = keep; + Self { data, version } + } - ValidAnnouncementData::new(rest, self.version) + /// Destructs returning the validated data. + pub fn into_data(self) -> HashMap { + self.data } } @@ -656,8 +700,8 @@ impl RequestTxHashes { } } -impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes { - fn from_iter)>>(iter: I) -> Self { +impl FromIterator<(TxHash, Eth68TxMetadata)> for RequestTxHashes { + fn from_iter>(iter: I) -> Self { let mut hashes = Vec::with_capacity(32); for (hash, _) in iter { @@ -670,34 +714,6 @@ impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes { } } -impl HandleMempoolData for Vec { - fn is_empty(&self) -> bool { - self.is_empty() - } - - fn len(&self) -> usize { - self.len() - } - - fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { - let mut indices_to_remove = vec![]; - for (i, tx) in self.iter().enumerate() { - if !f(tx.hash()) { - indices_to_remove.push(i); - } - } - - let mut removed_txns = Vec::with_capacity(indices_to_remove.len()); - - for index in indices_to_remove.into_iter().rev() { - let hash = self.remove(index); - removed_txns.push(hash); - } - - removed_txns - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/net/eth-wire/src/types/transactions.rs b/crates/net/eth-wire/src/types/transactions.rs index fb6573dac33d..6288be35cda4 100644 --- a/crates/net/eth-wire/src/types/transactions.rs +++ b/crates/net/eth-wire/src/types/transactions.rs @@ -1,7 +1,7 @@ //! Implements the `GetPooledTransactions` and `PooledTransactions` message types. use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper}; -use derive_more::Deref; +use derive_more::{Constructor, Deref, IntoIterator}; use reth_codecs::derive_arbitrary; use reth_primitives::{PooledTransactionsElement, TransactionSigned, B256}; @@ -34,7 +34,18 @@ where /// corresponds to a requested hash. Hashes may need to be re-requested if the bodies are not /// included in the response. // #[derive_arbitrary(rlp, 10)] -#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default, Deref)] +#[derive( + Clone, + Debug, + PartialEq, + Eq, + RlpEncodableWrapper, + RlpDecodableWrapper, + Default, + IntoIterator, + Deref, + Constructor, +)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct PooledTransactions( /// The transaction bodies, each of which should correspond to a requested hash. @@ -54,6 +65,12 @@ impl From> for PooledTransactions { } } +impl FromIterator for PooledTransactions { + fn from_iter>(iter: I) -> Self { + PooledTransactions(iter.into_iter().collect()) + } +} + #[cfg(test)] mod tests { use crate::{message::RequestPair, GetPooledTransactions, PooledTransactions}; diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index cacd2e663a52..10cc4b354d5c 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -68,6 +68,7 @@ derive_more.workspace = true schnellru.workspace = true itertools.workspace = true tempfile = { workspace = true, optional = true } +smallvec.workspace = true [dev-dependencies] # reth diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index de9ec7e67035..fd1411e90844 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -148,6 +148,6 @@ pub use session::{ PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId, SessionLimits, SessionManager, SessionsConfig, }; -pub use transactions::{AnnouncementFilter, FilterAnnouncement, ValidateTx68}; +pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68}; pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols}; diff --git a/crates/net/network/src/peers/mod.rs b/crates/net/network/src/peers/mod.rs index 8d0a86941d56..4fd7486be70a 100644 --- a/crates/net/network/src/peers/mod.rs +++ b/crates/net/network/src/peers/mod.rs @@ -9,10 +9,10 @@ pub use reputation::ReputationChangeWeights; pub use reth_network_api::PeerKind; /// Maximum number of available slots for outbound sessions. -pub(crate) const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100; +pub const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100; /// Maximum number of available slots for inbound sessions. -pub(crate) const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30; +pub const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30; /// Maximum number of available slots concurrent outgoing dials. -pub(crate) const DEFAULT_MAX_COUNT_CONCURRENT_DIALS: usize = 10; +pub const DEFAULT_MAX_COUNT_CONCURRENT_DIALS: usize = 10; diff --git a/crates/net/network/src/transactions/constants.rs b/crates/net/network/src/transactions/constants.rs index 47895ad8f353..71ee9f77358c 100644 --- a/crates/net/network/src/transactions/constants.rs +++ b/crates/net/network/src/transactions/constants.rs @@ -33,6 +33,7 @@ pub const SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST: usize = 25 /// . pub const SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE: usize = 2 * 1024 * 1024; +/// Constants used by [`TransactionsManager`](super::TransactionsManager). pub mod tx_manager { use super::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; @@ -54,6 +55,7 @@ pub mod tx_manager { pub const DEFAULT_CAPACITY_CACHE_BAD_IMPORTS: usize = 100 * 1024; } +/// Constants used by [`TransactionFetcher`](super::TransactionFetcher). pub mod tx_fetcher { use crate::{ peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND}, @@ -100,7 +102,7 @@ pub mod tx_fetcher { /// once from each individual peer. /// /// Default is 1 peer. - const DEFAULT_MARGINAL_COUNT_FALLBACK_PEERS: u8 = 1; + pub const DEFAULT_MARGINAL_COUNT_FALLBACK_PEERS: u8 = 1; /* ==================== CONCURRENCY ==================== */ diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index f58c4a0fbab6..81f7880c7ce9 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -1,31 +1,37 @@ use crate::{ cache::{LruCache, LruMap}, message::PeerRequest, + transactions::{validation, PartiallyFilterMessage}, }; - -use derive_more::Constructor; +use derive_more::{Constructor, Deref}; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; + use pin_project::pin_project; use reth_eth_wire::{ - EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, - RequestTxHashes, ValidAnnouncementData, + DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, + PartiallyValidData, RequestTxHashes, ValidAnnouncementData, }; use reth_interfaces::p2p::error::{RequestError, RequestResult}; +use reth_metrics::common::mpsc::{ + metered_unbounded_channel, UnboundedMeteredReceiver, UnboundedMeteredSender, +}; use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; use schnellru::{ByLength, Unlimited}; +use smallvec::{smallvec, SmallVec}; use std::{ collections::HashMap, num::NonZeroUsize, pin::Pin, - task::{Context, Poll}, + task::{ready, Context, Poll}, }; use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; use tracing::{debug, trace}; +use validation::FilterOutcome; use super::{ config::TransactionFetcherConfig, constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST}, - AnnouncementFilter, Peer, PooledTransactions, + MessageFilter, PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, }; @@ -35,27 +41,33 @@ use super::{ /// new requests on announced hashes. #[derive(Debug)] #[pin_project] -pub(crate) struct TransactionFetcher { +pub struct TransactionFetcher { /// All peers with to which a [`GetPooledTransactions`] request is inflight. - pub(super) active_peers: LruMap, + pub active_peers: LruMap, /// All currently active [`GetPooledTransactions`] requests. /// /// The set of hashes encompassed by these requests are a subset of all hashes in the fetcher. /// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to /// be fetched. #[pin] - pub(super) inflight_requests: FuturesUnordered, + pub inflight_requests: FuturesUnordered, /// Hashes that are awaiting an idle fallback peer so they can be fetched. /// /// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for /// which a [`GetPooledTransactions`] request is inflight. - pub(super) hashes_pending_fetch: LruCache, + pub hashes_pending_fetch: LruCache, /// Tracks all hashes in the transaction fetcher. pub(super) hashes_fetch_inflight_and_pending_fetch: LruMap, - /// Filter for valid eth68 announcements. - pub(super) filter_valid_hashes: AnnouncementFilter, + /// Filter for valid announcement and response data. + pub(super) filter_valid_message: MessageFilter, /// Info on capacity of the transaction fetcher. - pub(super) info: TransactionFetcherInfo, + pub info: TransactionFetcherInfo, + /// [`FetchEvent`]s as a result of advancing inflight requests. This is an intermediary ยจ + /// storage, before [`TransactionsManager`](super::TransactionsManager) streams them. + #[pin] + pub fetch_events_head: UnboundedMeteredReceiver, + /// Handle for queueing [`FetchEvent`]s as a result of advancing inflight requests. + pub fetch_events_tail: UnboundedMeteredSender, } // === impl TransactionFetcher === @@ -100,7 +112,7 @@ impl TransactionFetcher { } /// Returns `true` if peer is idle with respect to `self.inflight_requests`. - pub(super) fn is_idle(&self, peer_id: &PeerId) -> bool { + pub fn is_idle(&self, peer_id: &PeerId) -> bool { let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true }; if *inflight_count < DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER { return true @@ -109,7 +121,7 @@ impl TransactionFetcher { } /// Returns any idle peer for the given hash. - pub(super) fn get_idle_peer_for( + pub fn get_idle_peer_for( &self, hash: TxHash, is_session_active: impl Fn(&PeerId) -> bool, @@ -131,7 +143,7 @@ impl TransactionFetcher { /// /// Loops through the hashes pending fetch in lru order until one is found with an idle /// fallback peer, or the budget passed as parameter is depleted, whatever happens first. - pub(super) fn find_any_idle_fallback_peer_for_any_pending_hash( + pub fn find_any_idle_fallback_peer_for_any_pending_hash( &mut self, hashes_to_request: &mut RequestTxHashes, is_session_active: impl Fn(&PeerId) -> bool, @@ -169,7 +181,7 @@ impl TransactionFetcher { /// a [`RequestTxHashes`] buffer as parameter for filling with hashes to request. /// /// Returns left over hashes. - pub(super) fn pack_request( + pub fn pack_request( &mut self, hashes_to_request: &mut RequestTxHashes, hashes_from_announcement: ValidAnnouncementData, @@ -190,7 +202,7 @@ impl TransactionFetcher { /// Loops through hashes passed as parameter and checks if a hash fits in the expected /// response. If no, it's added to surplus hashes. If yes, it's added to hashes to the request /// and expected response size is accumulated. - pub(super) fn pack_request_eth68( + pub fn pack_request_eth68( &mut self, hashes_to_request: &mut RequestTxHashes, hashes_from_announcement: impl HandleMempoolData @@ -258,7 +270,7 @@ impl TransactionFetcher { /// hashes to request. /// /// Returns left over hashes. - pub(super) fn pack_request_eth66( + pub fn pack_request_eth66( &mut self, hashes_to_request: &mut RequestTxHashes, hashes_from_announcement: ValidAnnouncementData, @@ -280,7 +292,7 @@ impl TransactionFetcher { } /// Tries to buffer hashes for retry. - pub(super) fn buffer_hashes_for_retry( + pub fn try_buffer_hashes_for_retry( &mut self, mut hashes: RequestTxHashes, peer_failed_to_serve: &PeerId, @@ -301,9 +313,9 @@ impl TransactionFetcher { /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be /// passed as `fallback_peer` parameter! For re-buffering hashes on failed request, use - /// [`TransactionFetcher::buffer_hashes_for_retry`]. Hashes that have been re-requested + /// [`TransactionFetcher::try_buffer_hashes_for_retry`]. Hashes that have been re-requested /// [`DEFAULT_MAX_RETRIES`], are dropped. - pub(super) fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option) { + pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option) { let mut max_retried_and_evicted_hashes = vec![]; for hash in hashes.into_iter() { @@ -349,15 +361,14 @@ impl TransactionFetcher { /// /// Finds the first buffered hash with a fallback peer that is idle, if any. Fills the rest of /// the request by checking the transactions seen by the peer against the buffer. - pub(super) fn on_fetch_pending_hashes( + pub fn on_fetch_pending_hashes( &mut self, - peers: &HashMap, + peers: &HashMap, has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, metrics_increment_egress_peer_channel_full: impl FnOnce(), ) { let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info); let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req); - let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id); // budget to look for an idle peer before giving up @@ -420,7 +431,7 @@ impl TransactionFetcher { /// Filters out hashes that have been seen before. For hashes that have already been seen, the /// peer is added as fallback peer. - pub(super) fn filter_unseen_and_pending_hashes( + pub fn filter_unseen_and_pending_hashes( &mut self, new_announced_hashes: &mut ValidAnnouncementData, is_tx_bad_import: impl Fn(&TxHash) -> bool, @@ -504,7 +515,7 @@ impl TransactionFetcher { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), hash=%hash, - msg_version=%msg_version, + msg_version=?msg_version, client_version=%client_version, "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash" ); @@ -518,7 +529,7 @@ impl TransactionFetcher { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), previously_unseen_hashes_count=previously_unseen_hashes_count, - msg_version=%msg_version, + msg_version=?msg_version, client_version=%client_version, "received previously unseen hashes in announcement from peer" ); @@ -526,7 +537,7 @@ impl TransactionFetcher { #[cfg(debug_assertions)] trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - msg_version=%msg_version, + msg_version=?msg_version, client_version=%client_version, previously_unseen_hashes_len=?previously_unseen_hashes.len(), previously_unseen_hashes=?previously_unseen_hashes, @@ -541,10 +552,10 @@ impl TransactionFetcher { /// This filters all announced hashes that are already in flight, and requests the missing, /// while marking the given peer as an alternative peer for the hashes that are already in /// flight. - pub(super) fn request_transactions_from_peer( + pub fn request_transactions_from_peer( &mut self, new_announced_hashes: RequestTxHashes, - peer: &Peer, + peer: &PeerMetadata, metrics_increment_egress_peer_channel_full: impl FnOnce(), ) -> Option { let peer_id: PeerId = peer.request_tx.peer_id; @@ -648,7 +659,7 @@ impl TransactionFetcher { /// 3. Accumulate expected total response size. /// 4. Check if acc size and hashes count is at limit, if so stop looping. /// 5. Remove hashes to request from cache of hashes pending fetch. - pub(super) fn fill_request_from_hashes_pending_fetch( + pub fn fill_request_from_hashes_pending_fetch( &mut self, hashes_to_request: &mut RequestTxHashes, seen_hashes: &LruCache, @@ -716,7 +727,7 @@ impl TransactionFetcher { /// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns /// `false` if [`TransactionFetcher`] is operating close to full capacity. - pub(super) fn has_capacity_for_fetching_pending_hashes(&self) -> bool { + pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool { let info = &self.info; self.has_capacity(info.max_inflight_requests) @@ -732,7 +743,7 @@ impl TransactionFetcher { /// Returns `Some(limit)` if [`TransactionFetcher`] and the /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full /// capacity. Returns `None`, unlimited, if they are not that busy. - pub(super) fn search_breadth_budget_find_idle_fallback_peer( + pub fn search_breadth_budget_find_idle_fallback_peer( &self, has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, ) -> Option { @@ -771,7 +782,7 @@ impl TransactionFetcher { /// Returns `Some(limit)` if [`TransactionFetcher`] and the /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full /// capacity. Returns `None`, unlimited, if they are not that busy. - pub(super) fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer( + pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer( &self, has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, ) -> Option { @@ -816,22 +827,19 @@ impl TransactionFetcher { approx_capacity_get_pooled_transactions_req_eth66() } } -} - -impl Stream for TransactionFetcher { - type Item = FetchEvent; - - /// Advances all inflight requests and returns the next event. - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.as_mut().project(); - let res = this.inflight_requests.poll_next_unpin(cx); - if let Poll::Ready(Some(response)) = res { - // update peer activity, requests for buffered hashes can only be made to idle - // fallback peers - let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response; + /// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a + /// [`FetchEvent`], which will then be streamed by + /// [`TransactionsManager`](super::TransactionsManager). + pub fn on_resolved_get_pooled_transactions_request_fut( + &mut self, + response: GetPooledTxResponse, + ) -> FetchEvent { + // update peer activity, requests for buffered hashes can only be made to idle + // fallback peers + let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response; - debug_assert!( + debug_assert!( self.active_peers.get(&peer_id).is_some(), "`%peer_id` has been removed from `@active_peers` before inflight request(s) resolved, broken invariant `@active_peers` and `@inflight_requests`, `%peer_id`: {}, @@ -839,46 +847,91 @@ impl Stream for TransactionFetcher { peer_id, self ); - self.decrement_inflight_request_count_for(&peer_id); - - return match result { - Ok(Ok(transactions)) => { - // clear received hashes - let mut fetched = Vec::with_capacity(transactions.len()); - requested_hashes.retain(|requested_hash| { - if transactions.hashes().any(|hash| hash == requested_hash) { - // hash is now known, stop tracking - fetched.push(*requested_hash); - return false - } - true - }); - fetched.shrink_to_fit(); + self.decrement_inflight_request_count_for(&peer_id); - self.remove_hashes_from_transaction_fetcher(fetched); + match result { + Ok(Ok(transactions)) => { + let payload = UnverifiedPooledTransactions::new(transactions); - // buffer left over hashes - self.buffer_hashes_for_retry(requested_hashes, &peer_id); + let unverified_len = payload.len(); + let (verification_outcome, verified_payload) = + payload.verify(&requested_hashes, &peer_id); - Poll::Ready(Some(FetchEvent::TransactionsFetched { - peer_id, - transactions: transactions.0, - })) - } - Ok(Err(req_err)) => { - self.buffer_hashes_for_retry(requested_hashes, &peer_id); - Poll::Ready(Some(FetchEvent::FetchError { peer_id, error: req_err })) + if let VerificationOutcome::ReportPeer = verification_outcome { + // todo: report peer for sending hashes that weren't requested + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + unverified_len=unverified_len, + verified_payload_len=verified_payload.len(), + "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions" + ); } - Err(_) => { - self.buffer_hashes_for_retry(requested_hashes, &peer_id); - // request channel closed/dropped - Poll::Ready(Some(FetchEvent::FetchError { - peer_id, - error: RequestError::ChannelClosed, - })) + + let unvalidated_payload_len = verified_payload.len(); + + // todo: report peer for sending invalid response + // + + let (validation_outcome, valid_payload) = + self.filter_valid_message.partially_filter_valid_entries(verified_payload); + + if let FilterOutcome::ReportPeer = validation_outcome { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + unvalidated_payload_len=unvalidated_payload_len, + valid_payload_len=valid_payload.len(), + "received invalid `PooledTransactions` response from peer, filtered out invalid entries" + ); } + + // clear received hashes + let mut fetched = Vec::with_capacity(valid_payload.len()); + requested_hashes.retain(|requested_hash| { + if valid_payload.contains_key(requested_hash) { + // hash is now known, stop tracking + fetched.push(*requested_hash); + return false + } + true + }); + fetched.shrink_to_fit(); + self.remove_hashes_from_transaction_fetcher(fetched); + + // buffer left over hashes + self.try_buffer_hashes_for_retry(requested_hashes, &peer_id); + + let transactions = + valid_payload.into_data().into_values().collect::(); + + FetchEvent::TransactionsFetched { peer_id, transactions } + } + Ok(Err(req_err)) => { + self.try_buffer_hashes_for_retry(requested_hashes, &peer_id); + FetchEvent::FetchError { peer_id, error: req_err } + } + Err(_) => { + self.try_buffer_hashes_for_retry(requested_hashes, &peer_id); + // request channel closed/dropped + FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed } } } + } +} + +impl Stream for TransactionFetcher { + type Item = FetchEvent; + + /// Advances all inflight requests and returns the next event. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // `FuturesUnordered` doesn't close when `None` is returned. so just return pending. + // + if self.inflight_requests.is_empty() { + return Poll::Pending + } + + if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) { + return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp))) + } Poll::Pending } @@ -886,6 +939,8 @@ impl Stream for TransactionFetcher { impl Default for TransactionFetcher { fn default() -> Self { + let (fetch_events_tail, fetch_events_head) = metered_unbounded_channel("net::tx"); + Self { active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS), inflight_requests: Default::default(), @@ -894,15 +949,17 @@ impl Default for TransactionFetcher { .expect("buffered cache limit should be non-zero"), ), hashes_fetch_inflight_and_pending_fetch: LruMap::new_unlimited(), - filter_valid_hashes: Default::default(), + filter_valid_message: Default::default(), info: TransactionFetcherInfo::default(), + fetch_events_head, + fetch_events_tail, } } } /// Metadata of a transaction hash that is yet to be fetched. #[derive(Debug, Constructor)] -pub(super) struct TxFetchMetadata { +pub struct TxFetchMetadata { /// The number of times a request attempt has been made for the hash. retries: u8, /// Peers that have announced the hash, but to which a request attempt has not yet been made. @@ -915,10 +972,15 @@ pub(super) struct TxFetchMetadata { } impl TxFetchMetadata { + /// Returns a mutable reference to the fallback peers cache for this transaction hash. pub fn fallback_peers_mut(&mut self) -> &mut LruCache { &mut self.fallback_peers } + /// Returns the size of the transaction, if its hash has been received in any + /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement. If the transaction hash has only + /// been seen in [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcements so far, this will + /// return `None`. pub fn tx_encoded_len(&self) -> Option { self.tx_encoded_length } @@ -926,13 +988,13 @@ impl TxFetchMetadata { /// Represents possible events from fetching transactions. #[derive(Debug)] -pub(crate) enum FetchEvent { +pub enum FetchEvent { /// Triggered when transactions are successfully fetched. TransactionsFetched { /// The ID of the peer from which transactions were fetched. peer_id: PeerId, /// The transactions that were fetched, if available. - transactions: Vec, + transactions: PooledTransactions, }, /// Triggered when there is an error in fetching transactions. FetchError { @@ -943,15 +1005,19 @@ pub(crate) enum FetchEvent { }, } -/// An inflight request for [`PooledTransactions`] from a peer -pub(super) struct GetPooledTxRequest { +/// An inflight request for [`PooledTransactions`] from a peer. +#[derive(Debug)] +pub struct GetPooledTxRequest { peer_id: PeerId, /// Transaction hashes that were requested, for cleanup purposes requested_hashes: RequestTxHashes, response: oneshot::Receiver>, } -pub(super) struct GetPooledTxResponse { +/// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a +/// [`GetPooledTxResponse`]. +#[derive(Debug)] +pub struct GetPooledTxResponse { peer_id: PeerId, /// Transaction hashes that were requested, for cleanup purposes, since peer may only return a /// subset of requested hashes. @@ -959,9 +1025,12 @@ pub(super) struct GetPooledTxResponse { result: Result, RecvError>, } +/// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's +/// session. #[must_use = "futures do nothing unless polled"] #[pin_project::pin_project] -pub(super) struct GetPooledTxRequestFut { +#[derive(Debug)] +pub struct GetPooledTxRequestFut { #[pin] inner: Option, } @@ -996,11 +1065,100 @@ impl Future for GetPooledTxRequestFut { } } +/// Wrapper of unverified [`PooledTransactions`]. +#[derive(Debug, Constructor, Deref)] +pub struct UnverifiedPooledTransactions { + txns: PooledTransactions, +} + +/// [`PooledTransactions`] that have been successfully verified. +#[derive(Debug, Constructor)] +pub struct VerifiedPooledTransactions { + txns: PooledTransactions, +} + +impl DedupPayload for VerifiedPooledTransactions { + type Value = PooledTransactionsElement; + + fn is_empty(&self) -> bool { + self.txns.is_empty() + } + + fn len(&self) -> usize { + self.txns.len() + } + + fn dedup(self) -> PartiallyValidData { + let Self { txns } = self; + let unique_fetched = txns + .into_iter() + .map(|tx| (*tx.hash(), tx)) + .collect::>(); + + PartiallyValidData::from_raw_data(unique_fetched, None) + } +} + +trait VerifyPooledTransactionsResponse { + fn verify( + self, + requested_hashes: &[TxHash], + peer_id: &PeerId, + ) -> (VerificationOutcome, VerifiedPooledTransactions); +} + +impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions { + fn verify( + self, + requested_hashes: &[TxHash], + peer_id: &PeerId, + ) -> (VerificationOutcome, VerifiedPooledTransactions) { + let mut verification_outcome = VerificationOutcome::Ok; + + let Self { mut txns } = self; + + #[cfg(debug_assertions)] + let mut tx_hashes_not_requested: SmallVec<[TxHash; 16]> = smallvec!(); + + txns.0.retain(|tx| { + if !requested_hashes.contains(tx.hash()) { + verification_outcome = VerificationOutcome::ReportPeer; + + #[cfg(debug_assertions)] + tx_hashes_not_requested.push(*tx.hash()); + + return false + } + true + }); + + #[cfg(debug_assertions)] + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + tx_hashes_not_requested=?tx_hashes_not_requested, + "transactions in `PooledTransactions` response from peer were not requested" + ); + + (verification_outcome, VerifiedPooledTransactions::new(txns)) + } +} + +/// Outcome from verifying a [`PooledTransactions`] response. Signals to caller whether to penalize +/// the sender of the response or not. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum VerificationOutcome { + /// Peer behaves appropriately. + Ok, + /// A penalty should be flagged for the peer. Peer sent a response with unacceptably + /// invalid entries. + ReportPeer, +} + /// Tracks stats about the [`TransactionFetcher`]. #[derive(Debug)] pub struct TransactionFetcherInfo { /// Currently active outgoing [`GetPooledTransactions`] requests. - pub(super) max_inflight_requests: usize, + pub max_inflight_requests: usize, /// Soft limit for the byte size of the expected /// [`PooledTransactions`] response on packing a /// [`GetPooledTransactions`] request with hashes. @@ -1008,10 +1166,11 @@ pub struct TransactionFetcherInfo { /// Soft limit for the byte size of a [`PooledTransactions`] /// response on assembling a [`GetPooledTransactions`] /// request. Spec'd at 2 MiB. - pub(super) soft_limit_byte_size_pooled_transactions_response: usize, + pub soft_limit_byte_size_pooled_transactions_response: usize, } impl TransactionFetcherInfo { + /// Creates a new max pub fn new( max_inflight_transaction_requests: usize, soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize, @@ -1058,22 +1217,8 @@ mod test { self.0.len() } - fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { - let mut indices_to_remove = vec![]; - for (i, (hash, _)) in self.0.iter().enumerate() { - if !f(hash) { - indices_to_remove.push(i); - } - } - - let mut removed_hashes = Vec::with_capacity(indices_to_remove.len()); - - for index in indices_to_remove.into_iter().rev() { - let entry = self.0.remove(index); - removed_hashes.push(entry); - } - - TestValidAnnouncementData(removed_hashes) + fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) { + self.0.retain(|(hash, _)| f(hash)) } } @@ -1111,6 +1256,7 @@ mod test { let expected_surplus_hashes = [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]]; let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3); + let valid_announcement_data = TestValidAnnouncementData( eth68_hashes .into_iter() diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 97bcb1777166..4b0bc01558a9 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -69,10 +69,13 @@ use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; use tracing::{debug, trace}; -mod config; -mod constants; -mod fetcher; -mod validation; +/// Aggregation on configurable parameters for [`TransactionsManager`]. +pub mod config; +/// Default and spec'd bounds. +pub mod constants; +/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`]. +pub mod fetcher; +pub mod validation; pub use config::{TransactionFetcherConfig, TransactionsManagerConfig}; use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; @@ -239,7 +242,7 @@ pub struct TransactionsManager { /// Bad imports. bad_imports: LruCache, /// All the connected peers. - peers: HashMap, + peers: HashMap, /// Send half for the command channel. /// /// This is kept so that a new [TransactionsHandle] can be created at any time. @@ -630,7 +633,7 @@ where fn on_new_pooled_transaction_hashes( &mut self, peer_id: PeerId, - mut msg: NewPooledTransactionHashes, + msg: NewPooledTransactionHashes, ) { // If the node is initially syncing, ignore transactions if self.network.is_initially_syncing() { @@ -679,58 +682,66 @@ where self.report_already_seen(peer_id); } - // 1. filter out known hashes + // 1. filter out spam + let (validation_outcome, mut partially_valid_msg) = + self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg); + + if let FilterOutcome::ReportPeer = validation_outcome { + self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); + } + + // 2. filter out known hashes // - // known txns have already been successfully fetched. + // known txns have already been successfully fetched or received over gossip. // // most hashes will be filtered out here since this the mempool protocol is a gossip // protocol, healthy peers will send many of the same hashes. // - let already_known_by_pool = self.pool.retain_unknown(&mut msg); - if let Some(intersection) = already_known_by_pool { - self.metrics.occurrences_hashes_already_in_pool.increment(intersection.len() as u64); + let hashes_count_pre_pool_filter = partially_valid_msg.len(); + self.pool.retain_unknown(&mut partially_valid_msg); + if hashes_count_pre_pool_filter > partially_valid_msg.len() { + let already_known_hashes_count = + hashes_count_pre_pool_filter - partially_valid_msg.len(); + self.metrics + .occurrences_hashes_already_in_pool + .increment(already_known_hashes_count as u64); } - if msg.is_empty() { + if partially_valid_msg.is_empty() { // nothing to request return } - // 2. filter out invalid entries + // 3. filter out invalid entries (spam) // // validates messages with respect to the given network, e.g. allowed tx types // - let mut valid_announcement_data = match msg { - NewPooledTransactionHashes::Eth68(eth68_msg) => { - // validate eth68 announcement data - let (outcome, valid_data) = - self.transaction_fetcher.filter_valid_hashes.filter_valid_entries_68(eth68_msg); - - if let FilterOutcome::ReportPeer = outcome { - self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); - } - - valid_data - } - NewPooledTransactionHashes::Eth66(eth66_msg) => { - // validate eth66 announcement data - let (outcome, valid_data) = - self.transaction_fetcher.filter_valid_hashes.filter_valid_entries_66(eth66_msg); - - if let FilterOutcome::ReportPeer = outcome { - self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); - } - - valid_data - } + let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg + .msg_version() + .expect("partially valid announcement should have version") + .is_eth68() + { + // validate eth68 announcement data + self.transaction_fetcher + .filter_valid_message + .filter_valid_entries_68(partially_valid_msg) + } else { + // validate eth66 announcement data + self.transaction_fetcher + .filter_valid_message + .filter_valid_entries_66(partially_valid_msg) }; + if let FilterOutcome::ReportPeer = validation_outcome { + self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); + } + if valid_announcement_data.is_empty() { // no valid announcement data return } - // 3. filter out already seen unknown hashes + // 4. filter out already seen unknown hashes // // seen hashes are already in the tx fetcher, pending fetch. // @@ -849,7 +860,7 @@ where .into_iter() .map(PooledTransactionsElement::try_from_broadcast) .filter_map(Result::ok) - .collect::>(); + .collect::(); // mark the transactions as received self.transaction_fetcher.remove_hashes_from_transaction_fetcher( @@ -923,7 +934,7 @@ where peer_id, client_version, messages, version, .. } => { // Insert a new peer into the peerset. - let peer = Peer::new(messages, version, client_version); + let peer = PeerMetadata::new(messages, version, client_version); let peer = match self.peers.entry(peer_id) { Entry::Occupied(mut entry) => { entry.insert(peer); @@ -964,7 +975,7 @@ where fn import_transactions( &mut self, peer_id: PeerId, - mut transactions: Vec, + transactions: PooledTransactions, source: TransactionSource, ) { // If the node is pipeline syncing, ignore transactions @@ -975,6 +986,8 @@ where return } + let mut transactions = transactions.0; + let Some(peer) = self.peers.get_mut(&peer_id) else { return }; // track that the peer knows these transaction, but only if this is a new broadcast. @@ -988,12 +1001,14 @@ where } } - // 1. filter out already imported txns - let already_known_by_pool = self.pool.retain_unknown(&mut transactions); - if let Some(intersection) = already_known_by_pool { + // 1. filter out txns already inserted into pool + let txns_count_pre_pool_filter = transactions.len(); + self.pool.retain_unknown(&mut transactions); + if txns_count_pre_pool_filter > transactions.len() { + let already_known_txns_count = txns_count_pre_pool_filter - transactions.len(); self.metrics .occurrences_transactions_already_in_pool - .increment(intersection.len() as u64); + .increment(already_known_txns_count as u64); } // tracks the quality of the given transactions @@ -1485,13 +1500,13 @@ impl TransactionSource { } } -/// Tracks a single peer +/// Tracks a single peer in the context of [`TransactionsManager`]. #[derive(Debug)] -struct Peer { +pub struct PeerMetadata { /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in /// the sense that transactions are preemptively marked as seen by peer when they are sent to /// the peer. - seen_transactions: LruCache, + seen_transactions: LruCache, /// A communication channel directly to the peer's session task. request_tx: PeerRequestSender, /// negotiated version of the session. @@ -1500,7 +1515,8 @@ struct Peer { client_version: Arc, } -impl Peer { +impl PeerMetadata { + /// Returns a new instance of [`PeerMetadata`]. fn new(request_tx: PeerRequestSender, version: EthVersion, client_version: Arc) -> Self { Self { seen_transactions: LruCache::new( @@ -1570,14 +1586,15 @@ pub enum NetworkTransactionEvent { /// Tracks stats about the [`TransactionsManager`]. #[derive(Debug)] -struct PendingPoolImportsInfo { - /// Number of transactions that are currently being imported into pool. +pub struct PendingPoolImportsInfo { + /// Number of transactions about to be inserted into the pool. pending_pool_imports: Arc, /// Max number of transactions allowed to be imported concurrently. max_pending_pool_imports: usize, } impl PendingPoolImportsInfo { + /// Returns a new [`PendingPoolImportsInfo`]. pub fn new(max_pending_pool_imports: usize) -> Self { Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports } } @@ -1642,11 +1659,15 @@ mod tests { pub(super) fn new_mock_session( peer_id: PeerId, version: EthVersion, - ) -> (Peer, mpsc::Receiver) { + ) -> (PeerMetadata, mpsc::Receiver) { let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1); ( - Peer::new(PeerRequestSender::new(peer_id, to_mock_session_tx), version, Arc::from("")), + PeerMetadata::new( + PeerRequestSender::new(peer_id, to_mock_session_tx), + version, + Arc::from(""), + ), to_mock_session_rx, ) } diff --git a/crates/net/network/src/transactions/validation.rs b/crates/net/network/src/transactions/validation.rs index 807f74731089..585bb3b73e0f 100644 --- a/crates/net/network/src/transactions/validation.rs +++ b/crates/net/network/src/transactions/validation.rs @@ -1,12 +1,12 @@ -//! Validation of [`NewPooledTransactionHashes66`] and [`NewPooledTransactionHashes68`] +//! Validation of [`NewPooledTransactionHashes66`](reth_eth_wire::NewPooledTransactionHashes66) +//! and [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) //! announcements. Validation and filtering of announcements is network dependent. -use std::{collections::HashMap, mem}; +use std::{fmt, mem}; use derive_more::{Deref, DerefMut, Display}; -use itertools::izip; use reth_eth_wire::{ - NewPooledTransactionHashes66, NewPooledTransactionHashes68, ValidAnnouncementData, + DedupPayload, Eth68TxMetadata, HandleMempoolData, PartiallyValidData, ValidAnnouncementData, MAX_MESSAGE_SIZE, }; use reth_primitives::{Signature, TxHash, TxType}; @@ -15,12 +15,14 @@ use tracing::{debug, trace}; /// The size of a decoded signature in bytes. pub const SIGNATURE_DECODED_SIZE_BYTES: usize = mem::size_of::(); -/// Interface for validating a `(ty, size, hash)` tuple from a [`NewPooledTransactionHashes68`]. +/// Interface for validating a `(ty, size, hash)` tuple from a +/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68).. pub trait ValidateTx68 { - /// Validates a [`NewPooledTransactionHashes68`] entry. Returns [`ValidationOutcome`] which - /// signals to the caller wether to fetch the transaction or wether to drop it, and wether the - /// sender of the announcement should be penalized. - fn should_fetch(&self, ty: u8, hash: TxHash, size: usize) -> ValidationOutcome; + /// Validates a [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) + /// entry. Returns [`ValidationOutcome`] which signals to the caller wether to fetch the + /// transaction or wether to drop it, and wether the sender of the announcement should be + /// penalized. + fn should_fetch(&self, ty: u8, hash: &TxHash, size: usize) -> ValidationOutcome; /// Returns the reasonable maximum encoded transaction length configured for this network, if /// any. This property is not spec'ed out but can be inferred by looking how much data can be @@ -42,9 +44,9 @@ pub trait ValidateTx68 { fn strict_min_encoded_tx_length(&self, ty: TxType) -> Option; } -/// Outcomes from validating a `(ty, hash, size)` entry from a [`NewPooledTransactionHashes68`]. -/// Signals to the caller how to deal with an announcement entry and the peer who sent the -/// announcement. +/// Outcomes from validating a `(ty, hash, size)` entry from a +/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68). Signals to the +/// caller how to deal with an announcement entry and the peer who sent the announcement. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ValidationOutcome { /// Tells the caller to keep the entry in the announcement for fetch. @@ -56,30 +58,68 @@ pub enum ValidationOutcome { ReportPeer, } -/// Filters valid entries in [`NewPooledTransactionHashes68`] and [`NewPooledTransactionHashes66`] -/// in place, and flags misbehaving peers. +/// Generic filter for announcements and responses. Checks for empty message and unique hashes/ +/// transactions in message. +pub trait PartiallyFilterMessage { + /// Removes duplicate entries from a mempool message. Returns [`FilterOutcome::ReportPeer`] if + /// the caller should penalize the peer, otherwise [`FilterOutcome::Ok`]. + fn partially_filter_valid_entries( + &self, + msg: impl DedupPayload + fmt::Debug, + ) -> (FilterOutcome, PartiallyValidData) { + // 1. checks if the announcement is empty + if msg.is_empty() { + debug!(target: "net::tx", + msg=?msg, + "empty payload" + ); + return (FilterOutcome::ReportPeer, PartiallyValidData::empty_eth66()) + } + + // 2. checks if announcement is spam packed with duplicate hashes + let original_len = msg.len(); + let partially_valid_data = msg.dedup(); + + ( + if partially_valid_data.len() != original_len { + FilterOutcome::ReportPeer + } else { + FilterOutcome::Ok + }, + partially_valid_data, + ) + } +} + +/// Filters valid entries in +/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) and +/// [`NewPooledTransactionHashes66`](reth_eth_wire::NewPooledTransactionHashes66) in place, and +/// flags misbehaving peers. pub trait FilterAnnouncement { - /// Removes invalid entries from a [`NewPooledTransactionHashes68`] announcement. Returns - /// [`FilterOutcome::ReportPeer`] if the caller should penalize the peer, otherwise + /// Removes invalid entries from a + /// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) announcement. + /// Returns [`FilterOutcome::ReportPeer`] if the caller should penalize the peer, otherwise /// [`FilterOutcome::Ok`]. fn filter_valid_entries_68( &self, - msg: NewPooledTransactionHashes68, + msg: PartiallyValidData, ) -> (FilterOutcome, ValidAnnouncementData) where Self: ValidateTx68; - /// Removes invalid entries from a [`NewPooledTransactionHashes66`] announcement. Returns - /// [`FilterOutcome::ReportPeer`] if the caller should penalize the peer, otherwise + /// Removes invalid entries from a + /// [`NewPooledTransactionHashes66`](reth_eth_wire::NewPooledTransactionHashes66) announcement. + /// Returns [`FilterOutcome::ReportPeer`] if the caller should penalize the peer, otherwise /// [`FilterOutcome::Ok`]. fn filter_valid_entries_66( &self, - msg: NewPooledTransactionHashes66, + msg: PartiallyValidData, ) -> (FilterOutcome, ValidAnnouncementData); } -/// Outcome from filtering [`NewPooledTransactionHashes68`]. Signals to caller whether to penalize -/// the sender of the announcement or not. +/// Outcome from filtering +/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68). Signals to caller +/// whether to penalize the sender of the announcement or not. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FilterOutcome { /// Peer behaves appropriately. @@ -90,18 +130,20 @@ pub enum FilterOutcome { } /// Wrapper for types that implement [`FilterAnnouncement`]. The definition of a valid -/// announcement is network dependent. For example, different networks support different [`TxType`] -/// s, and different [`TxType`]s have different transaction size constraints. Defaults to -/// [`EthAnnouncementFilter`]. +/// announcement is network dependent. For example, different networks support different +/// [`TxType`]s, and different [`TxType`]s have different transaction size constraints. Defaults to +/// [`EthMessageFilter`]. #[derive(Debug, Default, Deref, DerefMut)] -pub struct AnnouncementFilter(N); +pub struct MessageFilter(N); /// Filter for announcements containing EIP [`TxType`]s. #[derive(Debug, Display, Default)] -pub struct EthAnnouncementFilter; +pub struct EthMessageFilter; + +impl PartiallyFilterMessage for EthMessageFilter {} -impl ValidateTx68 for EthAnnouncementFilter { - fn should_fetch(&self, ty: u8, hash: TxHash, size: usize) -> ValidationOutcome { +impl ValidateTx68 for EthMessageFilter { + fn should_fetch(&self, ty: u8, hash: &TxHash, size: usize) -> ValidationOutcome { // // 1. checks if tx type is valid value for this network // @@ -204,140 +246,74 @@ impl ValidateTx68 for EthAnnouncementFilter { } } -impl FilterAnnouncement for EthAnnouncementFilter { +impl FilterAnnouncement for EthMessageFilter { fn filter_valid_entries_68( &self, - msg: NewPooledTransactionHashes68, + mut msg: PartiallyValidData, ) -> (FilterOutcome, ValidAnnouncementData) where Self: ValidateTx68, { trace!(target: "net::tx::validation", - types=?msg.types, - sizes=?msg.sizes, - hashes=?msg.hashes, + msg=?*msg, network=%Self, "validating eth68 announcement data.." ); - let NewPooledTransactionHashes68 { mut hashes, mut types, mut sizes } = msg; - - debug_assert!( - hashes.len() == types.len() && hashes.len() == sizes.len(), "`%hashes`, `%types` and `%sizes` should all be the same length, decoding of `NewPooledTransactionHashes68` should handle this, -`%hashes`: {hashes:?}, -`%types`: {types:?}, -`%sizes: {sizes:?}`" - ); - - // - // 1. checks if the announcement is empty - // - if hashes.is_empty() { - debug!(target: "net::tx", - network=%Self, - "empty eth68 announcement" - ); - return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth68()) - } - let mut should_report_peer = false; - let mut indices_to_remove = vec![]; - // - // 2. checks if eth68 announcement metadata is valid + // checks if eth68 announcement metadata is valid // // transactions that are filtered out here, may not be spam, rather from benevolent peers // that are unknowingly sending announcements with invalid data. // - for (i, (&ty, &hash, &size)) in izip!(&types, &hashes, &sizes).enumerate() { - match self.should_fetch(ty, hash, size) { - ValidationOutcome::Fetch => (), - ValidationOutcome::Ignore => indices_to_remove.push(i), + msg.retain(|hash, metadata| { + debug_assert!( + metadata.is_some(), + "metadata should exist for `%hash` in eth68 announcement passed to `%filter_valid_entries_68`, +`%hash`: {hash}" + ); + + let Some((ty, size)) = metadata else { + return false + }; + + match self.should_fetch(*ty, hash, *size) { + ValidationOutcome::Fetch => true, + ValidationOutcome::Ignore => false, ValidationOutcome::ReportPeer => { - indices_to_remove.push(i); should_report_peer = true; + false } } - } - - for index in indices_to_remove.into_iter().rev() { - hashes.remove(index); - types.remove(index); - sizes.remove(index); - } - - // - // 3. checks if announcement is spam packed with duplicate hashes - // - let original_len = hashes.len(); - - let mut deduped_data = HashMap::with_capacity(hashes.len()); - for hash in hashes.into_iter().rev() { - if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) { - deduped_data.insert(hash, Some((ty, size))); - } - } - deduped_data.shrink_to_fit(); - - if deduped_data.len() != original_len { - should_report_peer = true - } + }); ( if should_report_peer { FilterOutcome::ReportPeer } else { FilterOutcome::Ok }, - ValidAnnouncementData::new_eth68(deduped_data), + ValidAnnouncementData::from_partially_valid_data(msg), ) } fn filter_valid_entries_66( &self, - msg: NewPooledTransactionHashes66, + partially_valid_data: PartiallyValidData>, ) -> (FilterOutcome, ValidAnnouncementData) { trace!(target: "net::tx::validation", - hashes=?msg.0, + hashes=?*partially_valid_data, network=%Self, "validating eth66 announcement data.." ); - let NewPooledTransactionHashes66(hashes) = msg; - - // - // 1. checks if the announcement is empty - // - if hashes.is_empty() { - debug!(target: "net::tx", - network=%Self, - "empty eth66 announcement" - ); - return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth66()) - } - - // - // 2. checks if announcement is spam packed with duplicate hashes - // - let original_len = hashes.len(); - - let mut deduped_data = HashMap::with_capacity(hashes.len()); - for hash in hashes.into_iter().rev() { - deduped_data.insert(hash, None); - } - deduped_data.shrink_to_fit(); - - ( - if deduped_data.len() != original_len { - FilterOutcome::ReportPeer - } else { - FilterOutcome::Ok - }, - ValidAnnouncementData::new_eth66(deduped_data), - ) + (FilterOutcome::Ok, ValidAnnouncementData::from_partially_valid_data(partially_valid_data)) } } #[cfg(test)] mod test { use super::*; + + use reth_eth_wire::{NewPooledTransactionHashes66, NewPooledTransactionHashes68}; use reth_primitives::B256; - use std::str::FromStr; + use std::{collections::HashMap, str::FromStr}; #[test] fn eth68_empty_announcement() { @@ -347,9 +323,9 @@ mod test { let announcement = NewPooledTransactionHashes68 { types, sizes, hashes }; - let filter = EthAnnouncementFilter; + let filter = EthMessageFilter; - let (outcome, _data) = filter.filter_valid_entries_68(announcement); + let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement); assert_eq!(outcome, FilterOutcome::ReportPeer); } @@ -374,16 +350,20 @@ mod test { hashes: hashes.clone(), }; - let filter = EthAnnouncementFilter; + let filter = EthMessageFilter; + + let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement); + + assert_eq!(outcome, FilterOutcome::Ok); - let (outcome, data) = filter.filter_valid_entries_68(announcement); + let (outcome, valid_data) = filter.filter_valid_entries_68(partially_valid_data); assert_eq!(outcome, FilterOutcome::ReportPeer); let mut expected_data = HashMap::new(); expected_data.insert(hashes[1], Some((types[1], sizes[1]))); - assert_eq!(expected_data, data.into_data()) + assert_eq!(expected_data, valid_data.into_data()) } #[test] @@ -410,16 +390,20 @@ mod test { hashes: hashes.clone(), }; - let filter = EthAnnouncementFilter; + let filter = EthMessageFilter; + + let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement); + + assert_eq!(outcome, FilterOutcome::Ok); - let (outcome, data) = filter.filter_valid_entries_68(announcement); + let (outcome, valid_data) = filter.filter_valid_entries_68(partially_valid_data); assert_eq!(outcome, FilterOutcome::Ok); let mut expected_data = HashMap::new(); expected_data.insert(hashes[2], Some((types[2], sizes[2]))); - assert_eq!(expected_data, data.into_data()) + assert_eq!(expected_data, valid_data.into_data()) } #[test] @@ -449,9 +433,9 @@ mod test { hashes: hashes.clone(), }; - let filter = EthAnnouncementFilter; + let filter = EthMessageFilter; - let (outcome, data) = filter.filter_valid_entries_68(announcement); + let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement); assert_eq!(outcome, FilterOutcome::ReportPeer); @@ -459,7 +443,7 @@ mod test { expected_data.insert(hashes[3], Some((types[3], sizes[3]))); expected_data.insert(hashes[0], Some((types[0], sizes[0]))); - assert_eq!(expected_data, data.into_data()) + assert_eq!(expected_data, partially_valid_data.into_data()) } #[test] @@ -468,9 +452,9 @@ mod test { let announcement = NewPooledTransactionHashes66(hashes); - let filter: AnnouncementFilter = AnnouncementFilter::default(); + let filter: MessageFilter = MessageFilter::default(); - let (outcome, _data) = filter.filter_valid_entries_66(announcement); + let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement); assert_eq!(outcome, FilterOutcome::ReportPeer); } @@ -493,9 +477,9 @@ mod test { let announcement = NewPooledTransactionHashes66(hashes.clone()); - let filter: AnnouncementFilter = AnnouncementFilter::default(); + let filter: MessageFilter = MessageFilter::default(); - let (outcome, data) = filter.filter_valid_entries_66(announcement); + let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement); assert_eq!(outcome, FilterOutcome::ReportPeer); @@ -503,12 +487,12 @@ mod test { expected_data.insert(hashes[1], None); expected_data.insert(hashes[0], None); - assert_eq!(expected_data, data.into_data()) + assert_eq!(expected_data, partially_valid_data.into_data()) } #[test] fn test_derive_more_display_for_zst() { - let filter = EthAnnouncementFilter; - assert_eq!("EthAnnouncementFilter", &filter.to_string()); + let filter = EthMessageFilter; + assert_eq!("EthMessageFilter", &filter.to_string()); } } diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index b15bac5d72a9..81380423207d 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -459,7 +459,7 @@ where self.pool.remove_transactions(hashes) } - fn retain_unknown(&self, announcement: &mut A) -> Option + fn retain_unknown(&self, announcement: &mut A) where A: HandleMempoolData, { diff --git a/crates/transaction-pool/src/noop.rs b/crates/transaction-pool/src/noop.rs index fdf037b76581..7fb57415c9c4 100644 --- a/crates/transaction-pool/src/noop.rs +++ b/crates/transaction-pool/src/noop.rs @@ -180,11 +180,10 @@ impl TransactionPool for NoopTransactionPool { vec![] } - fn retain_unknown(&self, _announcement: &mut A) -> Option + fn retain_unknown(&self, _announcement: &mut A) where A: HandleMempoolData, { - None } fn get(&self, _tx_hash: &TxHash) -> Option>> { diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index da4b524b7540..90e0c6a3c1e0 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -685,15 +685,15 @@ where } /// Removes and returns all transactions that are present in the pool. - pub(crate) fn retain_unknown(&self, announcement: &mut A) -> Option + pub(crate) fn retain_unknown(&self, announcement: &mut A) where A: HandleMempoolData, { if announcement.is_empty() { - return None + return } let pool = self.get_pool_data(); - Some(announcement.retain_by_hash(|tx| !pool.contains(tx))) + announcement.retain_by_hash(|tx| !pool.contains(tx)) } /// Returns the transaction by hash. diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 53ffa668b31c..f4c26f6fa990 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -301,7 +301,7 @@ pub trait TransactionPool: Send + Sync + Clone { /// the pool. Returns hashes already known to the pool. /// /// Consumer: P2P - fn retain_unknown(&self, announcement: &mut A) -> Option + fn retain_unknown(&self, announcement: &mut A) where A: HandleMempoolData;