Skip to content

Commit

Permalink
feat: make TransactionsManager Future impl generic over NetworkPrimit…
Browse files Browse the repository at this point in the history
…ives (#13115)
  • Loading branch information
Rjected authored Dec 3, 2024
1 parent e9484b2 commit 601e8b9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 35 deletions.
14 changes: 2 additions & 12 deletions crates/net/eth-wire-types/src/primitives.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Abstraction over primitive types in network messages.
use alloy_rlp::{Decodable, Encodable};
use reth_primitives_traits::{Block, BlockHeader};
use reth_primitives_traits::{Block, BlockHeader, SignedTransaction};
use std::fmt::Debug;

/// Abstraction over primitive types which might appear in network messages. See
Expand Down Expand Up @@ -62,17 +62,7 @@ pub trait NetworkPrimitives:
+ 'static;

/// The transaction type which peers return in `PooledTransactions` messages.
type PooledTransaction: TryFrom<Self::BroadcastedTransaction>
+ Encodable
+ Decodable
+ Send
+ Sync
+ Unpin
+ Clone
+ Debug
+ PartialEq
+ Eq
+ 'static;
type PooledTransaction: SignedTransaction + TryFrom<Self::BroadcastedTransaction> + 'static;

/// The transaction type which peers return in `GetReceipts` messages.
type Receipt: Encodable
Expand Down
39 changes: 21 additions & 18 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_primitives::PooledTransactionsElement;
use reth_primitives_traits::SignedTransaction;
use schnellru::ByLength;
#[cfg(debug_assertions)]
use smallvec::{smallvec, SmallVec};
Expand Down Expand Up @@ -895,16 +896,14 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
approx_capacity_get_pooled_transactions_req_eth66()
}
}
}

impl TransactionFetcher {
/// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
/// [`FetchEvent`], which will then be streamed by
/// [`TransactionsManager`](super::TransactionsManager).
pub fn on_resolved_get_pooled_transactions_request_fut(
&mut self,
response: GetPooledTxResponse,
) -> FetchEvent {
response: GetPooledTxResponse<N::PooledTransaction>,
) -> FetchEvent<N::PooledTransaction> {
// update peer activity, requests for buffered hashes can only be made to idle
// fallback peers
let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
Expand Down Expand Up @@ -1026,8 +1025,8 @@ impl TransactionFetcher {
}
}

impl Stream for TransactionFetcher {
type Item = FetchEvent;
impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
type Item = FetchEvent<N::PooledTransaction>;

/// Advances all inflight requests and returns the next event.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -1176,18 +1175,18 @@ impl<T> Future for GetPooledTxRequestFut<T> {

/// Wrapper of unverified [`PooledTransactions`].
#[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions {
txns: PooledTransactions,
pub struct UnverifiedPooledTransactions<T> {
txns: PooledTransactions<T>,
}

/// [`PooledTransactions`] that have been successfully verified.
#[derive(Debug, Constructor, Deref)]
pub struct VerifiedPooledTransactions {
txns: PooledTransactions,
pub struct VerifiedPooledTransactions<T> {
txns: PooledTransactions<T>,
}

impl DedupPayload for VerifiedPooledTransactions {
type Value = PooledTransactionsElement;
impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
type Value = T;

fn is_empty(&self) -> bool {
self.txns.is_empty()
Expand All @@ -1199,26 +1198,30 @@ impl DedupPayload for VerifiedPooledTransactions {

fn dedup(self) -> PartiallyValidData<Self::Value> {
PartiallyValidData::from_raw_data(
self.txns.into_iter().map(|tx| (*tx.hash(), tx)).collect(),
self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
None,
)
}
}

trait VerifyPooledTransactionsResponse {
type Transaction: SignedTransaction;

fn verify(
self,
requested_hashes: &RequestTxHashes,
peer_id: &PeerId,
) -> (VerificationOutcome, VerifiedPooledTransactions);
) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
}

impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions {
impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
type Transaction = T;

fn verify(
self,
requested_hashes: &RequestTxHashes,
_peer_id: &PeerId,
) -> (VerificationOutcome, VerifiedPooledTransactions) {
) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
let mut verification_outcome = VerificationOutcome::Ok;

let Self { mut txns } = self;
Expand All @@ -1229,11 +1232,11 @@ impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions {
let mut tx_hashes_not_requested_count = 0;

txns.0.retain(|tx| {
if !requested_hashes.contains(tx.hash()) {
if !requested_hashes.contains(tx.tx_hash()) {
verification_outcome = VerificationOutcome::ReportPeer;

#[cfg(debug_assertions)]
tx_hashes_not_requested.push(*tx.hash());
tx_hashes_not_requested.push(*tx.tx_hash());
#[cfg(not(debug_assertions))]
{
tx_hashes_not_requested_count += 1;
Expand Down
15 changes: 10 additions & 5 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ use reth_network_p2p::{
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use reth_primitives::{
transaction::SignedTransactionIntoRecoveredExt, PooledTransactionsElement, RecoveredTx,
TransactionSigned,
transaction::SignedTransactionIntoRecoveredExt, RecoveredTx, TransactionSigned,
};
use reth_primitives_traits::{SignedTransaction, TxType};
use reth_tokio_util::EventStream;
Expand Down Expand Up @@ -1307,11 +1306,17 @@ where
//
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<Pool> Future for TransactionsManager<Pool>
impl<Pool, N> Future for TransactionsManager<Pool, N>
where
Pool: TransactionPool + Unpin + 'static,
Pool::Transaction:
PoolTransaction<Consensus = TransactionSigned, Pooled: Into<PooledTransactionsElement>>,
N: NetworkPrimitives<
BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction,
>,
Pool::Transaction: PoolTransaction<
Consensus = N::BroadcastedTransaction,
Pooled: Into<N::PooledTransaction> + From<RecoveredTx<N::PooledTransaction>>,
>,
{
type Output = ();

Expand Down

0 comments on commit 601e8b9

Please sign in to comment.