diff --git a/crates/net/eth-wire-types/src/primitives.rs b/crates/net/eth-wire-types/src/primitives.rs index 1b0c16c0622d..78083e9e0928 100644 --- a/crates/net/eth-wire-types/src/primitives.rs +++ b/crates/net/eth-wire-types/src/primitives.rs @@ -1,7 +1,7 @@ //! Abstraction over primitive types in network messages. use alloy_rlp::{Decodable, Encodable}; -use reth_primitives_traits::{Block, BlockHeader}; +use reth_primitives_traits::{Block, BlockHeader, SignedTransaction}; use std::fmt::Debug; /// Abstraction over primitive types which might appear in network messages. See @@ -62,17 +62,7 @@ pub trait NetworkPrimitives: + 'static; /// The transaction type which peers return in `PooledTransactions` messages. - type PooledTransaction: TryFrom - + Encodable - + Decodable - + Send - + Sync - + Unpin - + Clone - + Debug - + PartialEq - + Eq - + 'static; + type PooledTransaction: SignedTransaction + TryFrom + 'static; /// The transaction type which peers return in `GetReceipts` messages. type Receipt: Encodable diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 180a619fff9e..025ae36ea142 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -50,6 +50,7 @@ use reth_network_api::PeerRequest; use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_peers::PeerId; use reth_primitives::PooledTransactionsElement; +use reth_primitives_traits::SignedTransaction; use schnellru::ByLength; #[cfg(debug_assertions)] use smallvec::{smallvec, SmallVec}; @@ -895,16 +896,14 @@ impl TransactionFetcher { approx_capacity_get_pooled_transactions_req_eth66() } } -} -impl TransactionFetcher { /// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a /// [`FetchEvent`], which will then be streamed by /// [`TransactionsManager`](super::TransactionsManager). pub fn on_resolved_get_pooled_transactions_request_fut( &mut self, - response: GetPooledTxResponse, - ) -> FetchEvent { + response: GetPooledTxResponse, + ) -> FetchEvent { // update peer activity, requests for buffered hashes can only be made to idle // fallback peers let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response; @@ -1026,8 +1025,8 @@ impl TransactionFetcher { } } -impl Stream for TransactionFetcher { - type Item = FetchEvent; +impl Stream for TransactionFetcher { + type Item = FetchEvent; /// Advances all inflight requests and returns the next event. fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -1176,18 +1175,18 @@ impl Future for GetPooledTxRequestFut { /// Wrapper of unverified [`PooledTransactions`]. #[derive(Debug, Constructor, Deref)] -pub struct UnverifiedPooledTransactions { - txns: PooledTransactions, +pub struct UnverifiedPooledTransactions { + txns: PooledTransactions, } /// [`PooledTransactions`] that have been successfully verified. #[derive(Debug, Constructor, Deref)] -pub struct VerifiedPooledTransactions { - txns: PooledTransactions, +pub struct VerifiedPooledTransactions { + txns: PooledTransactions, } -impl DedupPayload for VerifiedPooledTransactions { - type Value = PooledTransactionsElement; +impl DedupPayload for VerifiedPooledTransactions { + type Value = T; fn is_empty(&self) -> bool { self.txns.is_empty() @@ -1199,26 +1198,30 @@ impl DedupPayload for VerifiedPooledTransactions { fn dedup(self) -> PartiallyValidData { PartiallyValidData::from_raw_data( - self.txns.into_iter().map(|tx| (*tx.hash(), tx)).collect(), + self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(), None, ) } } trait VerifyPooledTransactionsResponse { + type Transaction: SignedTransaction; + fn verify( self, requested_hashes: &RequestTxHashes, peer_id: &PeerId, - ) -> (VerificationOutcome, VerifiedPooledTransactions); + ) -> (VerificationOutcome, VerifiedPooledTransactions); } -impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions { +impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions { + type Transaction = T; + fn verify( self, requested_hashes: &RequestTxHashes, _peer_id: &PeerId, - ) -> (VerificationOutcome, VerifiedPooledTransactions) { + ) -> (VerificationOutcome, VerifiedPooledTransactions) { let mut verification_outcome = VerificationOutcome::Ok; let Self { mut txns } = self; @@ -1229,11 +1232,11 @@ impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions { let mut tx_hashes_not_requested_count = 0; txns.0.retain(|tx| { - if !requested_hashes.contains(tx.hash()) { + if !requested_hashes.contains(tx.tx_hash()) { verification_outcome = VerificationOutcome::ReportPeer; #[cfg(debug_assertions)] - tx_hashes_not_requested.push(*tx.hash()); + tx_hashes_not_requested.push(*tx.tx_hash()); #[cfg(not(debug_assertions))] { tx_hashes_not_requested_count += 1; diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 4a7167a8064f..a1097dacf550 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -49,8 +49,7 @@ use reth_network_p2p::{ use reth_network_peers::PeerId; use reth_network_types::ReputationChangeKind; use reth_primitives::{ - transaction::SignedTransactionIntoRecoveredExt, PooledTransactionsElement, RecoveredTx, - TransactionSigned, + transaction::SignedTransactionIntoRecoveredExt, RecoveredTx, TransactionSigned, }; use reth_primitives_traits::{SignedTransaction, TxType}; use reth_tokio_util::EventStream; @@ -1307,11 +1306,17 @@ where // // spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and // `NetworkConfig::start_network`(reth_network::NetworkConfig) -impl Future for TransactionsManager +impl Future for TransactionsManager where Pool: TransactionPool + Unpin + 'static, - Pool::Transaction: - PoolTransaction>, + N: NetworkPrimitives< + BroadcastedTransaction: SignedTransaction, + PooledTransaction: SignedTransaction, + >, + Pool::Transaction: PoolTransaction< + Consensus = N::BroadcastedTransaction, + Pooled: Into + From>, + >, { type Output = ();