Skip to content

Commit

Permalink
chore: smol touchups
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Oct 27, 2024
1 parent e69390e commit 7f297b8
Showing 1 changed file with 75 additions and 80 deletions.
155 changes: 75 additions & 80 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,29 +324,6 @@ where
}
}

/// Represents the different modes of transaction propagation.
///
/// This enum is used to determine how transactions are propagated to peers in the network.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum PropgateKind {
/// Default propagation mode.
///
/// Transactions are only sent to peers that haven't seen them yet.
Basic,
/// Forced propagation mode.
///
/// Transactions are sent to all peers regardless of whether they have been sent or received
/// before.
Forced,
}

impl PropgateKind {
/// Returns `true` if the propagation kind is `Forced`.
pub const fn is_forced(self) -> bool {
matches!(self, Self::Forced)
}
}

impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
Expand Down Expand Up @@ -439,7 +416,7 @@ where
fn propagate_all(&mut self, hashes: Vec<TxHash>) {
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
PropgateKind::Basic,
PropagationMode::Basic,
);

// notify pool so events get fired
Expand All @@ -455,7 +432,7 @@ where
fn propagate_transactions(
&mut self,
to_propagate: Vec<PropagateTransaction>,
propagate_kind: PropgateKind,
propagation_mode: PropagationMode,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
if self.network.tx_gossip_disabled() {
Expand All @@ -474,17 +451,16 @@ where
PropagateTransactionsBuilder::full(peer.version)
};

// Iterate through the transactions to propagate and fill the hashes and full
// transaction lists, before deciding whether or not to send full transactions to the
// peer.
for tx in &to_propagate {
// Only proceed if the transaction is not in the peer's list of seen transactions
if propagate_kind.is_forced() {
// add transaction to the list of hashes to propagate
builder.push(tx);
} else if !peer.seen_transactions.contains(&tx.hash()) {
// Include the transaction if the peer hasn't seen it
builder.push(tx);
if propagation_mode.is_forced() {
builder.extend(to_propagate.iter());
} else {
// Iterate through the transactions to propagate and fill the hashes and full
// transaction lists, before deciding whether or not to send full transactions to
// the peer.
for tx in &to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
builder.push(tx);
}
}
}

Expand All @@ -504,13 +480,8 @@ where

for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
}

// Update cache if respecting cache
if !propagate_kind.is_forced() {
for hash in new_pooled_hashes.iter_hashes().copied() {
peer.seen_transactions.insert(hash);
}
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}

trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
Expand All @@ -521,17 +492,10 @@ where

// send full transactions, if any
if let Some(new_full_transactions) = full {
// Record propagated full transactions
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(*peer_id));
}

// Update cache if respecting cache
if !propagate_kind.is_forced() {
for tx in &new_full_transactions {
// Mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}

trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
Expand All @@ -554,7 +518,7 @@ where
&mut self,
txs: Vec<TxHash>,
peer_id: PeerId,
propagate_kind: PropgateKind,
propagation_mode: PropagationMode,
) -> Option<PropagatedTransactions> {
trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");

Expand All @@ -566,14 +530,17 @@ where

let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new);

// Iterate through the transactions to propagate and fill the hashes and full transaction
for tx in to_propagate {
if propagate_kind.is_forced() {
// Always include the transaction, regardless of cache
full_transactions.push(&tx);
} else if !peer.seen_transactions.contains(&tx.hash()) {
// Only include if the peer hasn't seen the transaction
full_transactions.push(&tx);
if propagation_mode.is_forced() {
// skip cache check if forced
full_transactions.extend(to_propagate);
} else {
// Iterate through the transactions to propagate and fill the hashes and full
// transaction
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
// Only include if the peer hasn't seen the transaction
full_transactions.push(&tx);
}
}
}

Expand All @@ -588,13 +555,8 @@ where
if let Some(new_pooled_hashes) = pooled {
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
}

// mark transaction as seen by peer
if !propagate_kind.is_forced() {
for hash in new_pooled_hashes.iter_hashes().copied() {
peer.seen_transactions.insert(hash);
}
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}

// send hashes of transactions
Expand All @@ -605,13 +567,8 @@ where
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id));
}

// mark transaction as seen by peer
if !propagate_kind.is_forced() {
for tx in &new_full_transactions {
peer.seen_transactions.insert(tx.hash());
}
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}

// send full transactions
Expand All @@ -631,7 +588,7 @@ where
&mut self,
hashes: Vec<TxHash>,
peer_id: PeerId,
propagate_kind: PropgateKind,
propagate_kind: PropagationMode,
) {
trace!(target: "net::tx", "Start propagating transactions as hashes");

Expand Down Expand Up @@ -953,15 +910,15 @@ where
self.on_new_pending_transactions(vec![hash])
}
TransactionsCommand::PropagateHashesTo(hashes, peer) => {
self.propagate_hashes_to(hashes, peer, PropgateKind::Forced)
self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
}
TransactionsCommand::GetActivePeers(tx) => {
let peers = self.peers.keys().copied().collect::<HashSet<_>>();
tx.send(peers).ok();
}
TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
if let Some(propagated) =
self.propagate_full_transactions_to_peer(txs, peer, PropgateKind::Forced)
self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
{
self.pool.on_propagated(propagated);
}
Expand Down Expand Up @@ -1470,6 +1427,29 @@ where
}
}

/// Represents the different modes of transaction propagation.
///
/// This enum is used to determine how transactions are propagated to peers in the network.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
/// Default propagation mode.
///
/// Transactions are only sent to peers that haven't seen them yet.
Basic,
/// Forced propagation mode.
///
/// Transactions are sent to all peers regardless of whether they have been sent or received
/// before.
Forced,
}

impl PropagationMode {
/// Returns `true` if the propagation kind is `Forced`.
const fn is_forced(self) -> bool {
matches!(self, Self::Forced)
}
}

/// A transaction that's about to be propagated to multiple peers.
#[derive(Debug, Clone)]
struct PropagateTransaction {
Expand Down Expand Up @@ -1516,6 +1496,13 @@ impl PropagateTransactionsBuilder {
Self::Full(FullTransactionsBuilder::new(version))
}

/// Appends all transactions
fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction>) {
for tx in txs {
self.push(tx);
}
}

/// Appends a transaction to the list.
fn push(&mut self, transaction: &PropagateTransaction) {
match self {
Expand Down Expand Up @@ -1577,6 +1564,13 @@ impl FullTransactionsBuilder {
}
}

/// Appends all transactions.
fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction>) {
for tx in txs {
self.push(&tx)
}
}

/// Append a transaction to the list of full transaction if the total message bytes size doesn't
/// exceed the soft maximum target byte size. The limit is soft, meaning if one single
/// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
Expand Down Expand Up @@ -2463,7 +2457,8 @@ mod tests {
let eip4844_tx = Arc::new(factory.create_eip4844());
propagate.push(PropagateTransaction::new(eip4844_tx.clone()));

let propagated = tx_manager.propagate_transactions(propagate.clone(), PropgateKind::Basic);
let propagated =
tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
assert_eq!(propagated.0.len(), 2);
let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
assert_eq!(prop_txs.len(), 1);
Expand All @@ -2479,7 +2474,7 @@ mod tests {
peer.seen_transactions.contains(eip4844_tx.transaction.hash());

// propagate again
let propagated = tx_manager.propagate_transactions(propagate, PropgateKind::Basic);
let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
assert!(propagated.0.is_empty());
}
}

0 comments on commit 7f297b8

Please sign in to comment.