Skip to content

Commit

Permalink
Verify tx response data against request (#6439)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Oliver Nordbjerg <onbjerg@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 27, 2024
1 parent a47c62c commit 0007c9a
Show file tree
Hide file tree
Showing 14 changed files with 633 additions and 446 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

280 changes: 148 additions & 132 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};

use derive_more::{Constructor, Deref, DerefMut, IntoIterator};
use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
use reth_codecs::derive_arbitrary;
use reth_primitives::{
Block, Bytes, PooledTransactionsElement, TransactionSigned, TxHash, B256, U128,
Expand Down Expand Up @@ -441,28 +441,27 @@ impl Decodable for NewPooledTransactionHashes68 {
}
}

/// Interface for handling mempool message data. Used in various filters in pipelines in
/// `TransactionsManager` and in queries to `TransactionPool`.
pub trait HandleMempoolData {
/// The announcement contains no entries.
/// Validation pass that checks for unique transaction hashes.
pub trait DedupPayload {
/// Value type in [`PartiallyValidData`] map.
type Value;

/// The payload contains no entries.
fn is_empty(&self) -> bool;

/// Returns the number of entries.
fn len(&self) -> usize;

/// Retain only entries for which the hash in the entry satisfies a given predicate, return
/// the rest.
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self;
/// Consumes self, returning an iterator over hashes in payload.
fn dedup(self) -> PartiallyValidData<Self::Value>;
}

/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned.
pub trait HandleVersionedMempoolData {
/// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
/// [`Eth68`](EthVersion::Eth68).
fn msg_version(&self) -> EthVersion;
}
/// Value in [`PartiallyValidData`] map obtained from an announcement.
pub type Eth68TxMetadata = Option<(u8, usize)>;

impl DedupPayload for NewPooledTransactionHashes {
type Value = Eth68TxMetadata;

impl HandleMempoolData for NewPooledTransactionHashes {
fn is_empty(&self) -> bool {
self.is_empty()
}
Expand All @@ -471,21 +470,17 @@ impl HandleMempoolData for NewPooledTransactionHashes {
self.len()
}

fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self {
fn dedup(self) -> PartiallyValidData<Self::Value> {
match self {
NewPooledTransactionHashes::Eth66(msg) => Self::Eth66(msg.retain_by_hash(f)),
NewPooledTransactionHashes::Eth68(msg) => Self::Eth68(msg.retain_by_hash(f)),
NewPooledTransactionHashes::Eth66(msg) => msg.dedup(),
NewPooledTransactionHashes::Eth68(msg) => msg.dedup(),
}
}
}

impl HandleVersionedMempoolData for NewPooledTransactionHashes {
fn msg_version(&self) -> EthVersion {
self.version()
}
}
impl DedupPayload for NewPooledTransactionHashes68 {
type Value = Eth68TxMetadata;

impl HandleMempoolData for NewPooledTransactionHashes68 {
fn is_empty(&self) -> bool {
self.hashes.is_empty()
}
Expand All @@ -494,38 +489,24 @@ impl HandleMempoolData for NewPooledTransactionHashes68 {
self.hashes.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, hash) in self.hashes.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}
fn dedup(self) -> PartiallyValidData<Self::Value> {
let Self { hashes, mut sizes, mut types } = self;

let mut deduped_data = HashMap::with_capacity(hashes.len());

let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());
let mut removed_types = Vec::with_capacity(indices_to_remove.len());
let mut removed_sizes = Vec::with_capacity(indices_to_remove.len());

for index in indices_to_remove.into_iter().rev() {
let hash = self.hashes.remove(index);
removed_hashes.push(hash);
let ty = self.types.remove(index);
removed_types.push(ty);
let size = self.sizes.remove(index);
removed_sizes.push(size);
for hash in hashes.into_iter().rev() {
if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
deduped_data.insert(hash, Some((ty, size)));
}
}

Self { hashes: removed_hashes, types: removed_types, sizes: removed_sizes }
PartiallyValidData::from_raw_data_eth68(deduped_data)
}
}

impl HandleVersionedMempoolData for NewPooledTransactionHashes68 {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth68
}
}
impl DedupPayload for NewPooledTransactionHashes66 {
type Value = Eth68TxMetadata;

impl HandleMempoolData for NewPooledTransactionHashes66 {
fn is_empty(&self) -> bool {
self.0.is_empty()
}
Expand All @@ -534,100 +515,163 @@ impl HandleMempoolData for NewPooledTransactionHashes66 {
self.0.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, hash) in self.0.iter().enumerate() {
if !f(hash) {
indices_to_remove.push(i);
}
}
fn dedup(self) -> PartiallyValidData<Self::Value> {
let Self(hashes) = self;

let mut removed_hashes = Vec::with_capacity(indices_to_remove.len());
let mut deduped_data = HashMap::with_capacity(hashes.len());

for index in indices_to_remove.into_iter().rev() {
let hash = self.0.remove(index);
removed_hashes.push(hash);
let noop_value: Eth68TxMetadata = None;

for hash in hashes.into_iter().rev() {
deduped_data.insert(hash, noop_value);
}

Self(removed_hashes)
PartiallyValidData::from_raw_data_eth66(deduped_data)
}
}

impl HandleVersionedMempoolData for NewPooledTransactionHashes66 {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth66
/// Interface for handling mempool message data. Used in various filters in pipelines in
/// `TransactionsManager` and in queries to `TransactionPool`.
pub trait HandleMempoolData {
/// The announcement contains no entries.
fn is_empty(&self) -> bool;

/// Returns the number of entries.
fn len(&self) -> usize;

/// Retain only entries for which the hash in the entry satisfies a given predicate.
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool);
}

/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned.
pub trait HandleVersionedMempoolData {
/// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
/// [`Eth68`](EthVersion::Eth68).
fn msg_version(&self) -> EthVersion;
}

impl HandleMempoolData for Vec<PooledTransactionsElement> {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn len(&self) -> usize {
self.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
self.retain(|tx| f(tx.hash()))
}
}

/// Announcement data that has been validated according to the configured network. For an eth68
/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66
/// announcement, values of the map are `None`.
#[derive(Debug, Deref, DerefMut, IntoIterator, Constructor)]
pub struct ValidAnnouncementData {
macro_rules! handle_mempool_data_map_impl {
($data_ty:ty, $(<$generic:ident>)?) => {
impl$(<$generic>)? HandleMempoolData for $data_ty {
fn is_empty(&self) -> bool {
self.data.is_empty()
}

fn len(&self) -> usize {
self.data.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
self.data.retain(|hash, _| f(hash));
}
}
};
}

/// Data that has passed an initial validation pass that is not specific to any mempool message
/// type.
#[derive(Debug, Deref, DerefMut, IntoIterator)]
pub struct PartiallyValidData<V> {
#[deref]
#[deref_mut]
#[into_iterator]
data: HashMap<TxHash, Option<(u8, usize)>>,
version: EthVersion,
data: HashMap<TxHash, V>,
version: Option<EthVersion>,
}

impl ValidAnnouncementData {
/// Returns a new [`ValidAnnouncementData`] wrapper around validated
/// [`Eth68`](EthVersion::Eth68) announcement data.
pub fn new_eth68(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self::new(data, EthVersion::Eth68)
handle_mempool_data_map_impl!(PartiallyValidData<V>, <V>);

impl<V> PartiallyValidData<V> {
/// Wraps raw data.
pub fn from_raw_data(data: HashMap<TxHash, V>, version: Option<EthVersion>) -> Self {
Self { data, version }
}

/// Returns a new [`ValidAnnouncementData`] wrapper around validated
/// [`Eth68`](EthVersion::Eth68) announcement data.
pub fn new_eth66(data: HashMap<TxHash, Option<(u8, usize)>>) -> Self {
Self::new(data, EthVersion::Eth66)
/// Wraps raw data with version [`EthVersion::Eth68`].
pub fn from_raw_data_eth68(data: HashMap<TxHash, V>) -> Self {
Self::from_raw_data(data, Some(EthVersion::Eth68))
}

/// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth68`](EthVersion::Eth68)
/// Wraps raw data with version [`EthVersion::Eth66`].
pub fn from_raw_data_eth66(data: HashMap<TxHash, V>) -> Self {
Self::from_raw_data(data, Some(EthVersion::Eth66))
}

/// Returns a new [`PartiallyValidData`] with empty data from an [`Eth68`](EthVersion::Eth68)
/// announcement.
pub fn empty_eth68() -> Self {
Self::new_eth68(HashMap::new())
Self::from_raw_data_eth68(HashMap::new())
}

/// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth66`](EthVersion::Eth66)
/// Returns a new [`PartiallyValidData`] with empty data from an [`Eth66`](EthVersion::Eth66)
/// announcement.
pub fn empty_eth66() -> Self {
Self::new_eth66(HashMap::new())
Self::from_raw_data_eth66(HashMap::new())
}

/// Returns the version of the message this data was received in if different versions of the
/// message exists, either [`Eth66`](EthVersion::Eth66) or [`Eth68`](EthVersion::Eth68).
pub fn msg_version(&self) -> Option<EthVersion> {
self.version
}

/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, Option<(u8, usize)>> {
pub fn into_data(self) -> HashMap<TxHash, V> {
self.data
}
}

/// Partially validated data from an announcement or a
/// [`PooledTransactions`](crate::PooledTransactions) response.
#[derive(Debug, Deref, DerefMut, IntoIterator, From)]
#[from(PartiallyValidData<Eth68TxMetadata>)]
pub struct ValidAnnouncementData {
#[deref]
#[deref_mut]
#[into_iterator]
data: HashMap<TxHash, Eth68TxMetadata>,
version: EthVersion,
}

handle_mempool_data_map_impl!(ValidAnnouncementData,);

impl ValidAnnouncementData {
/// Destructs returning only the valid hashes and the announcement message version. Caution! If
/// this is [`Eth68`](EthVersion::Eth68)announcement data, the metadata must be cached
/// before call.
/// this is [`Eth68`](EthVersion::Eth68) announcement data, this drops the metadata.
pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) {
let hashes = self.data.into_keys().collect::<Vec<_>>();

(RequestTxHashes::new(hashes), self.version)
}
}

impl HandleMempoolData for ValidAnnouncementData {
fn is_empty(&self) -> bool {
self.data.is_empty()
}

fn len(&self) -> usize {
self.data.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let data = std::mem::take(&mut self.data);
/// Conversion from [`PartiallyValidData`] from an announcement. Note! [`PartiallyValidData`]
/// from an announcement, should have some [`EthVersion`]. Panics if [`PartiallyValidData`] has
/// version set to `None`.
pub fn from_partially_valid_data(data: PartiallyValidData<Eth68TxMetadata>) -> Self {
let PartiallyValidData { data, version } = data;

let (keep, rest) = data.into_iter().partition(|(hash, _)| f(hash));
let version = version.expect("should have eth version for conversion");

self.data = keep;
Self { data, version }
}

ValidAnnouncementData::new(rest, self.version)
/// Destructs returning the validated data.
pub fn into_data(self) -> HashMap<TxHash, Eth68TxMetadata> {
self.data
}
}

Expand Down Expand Up @@ -656,8 +700,8 @@ impl RequestTxHashes {
}
}

impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes {
fn from_iter<I: IntoIterator<Item = (TxHash, Option<(u8, usize)>)>>(iter: I) -> Self {
impl FromIterator<(TxHash, Eth68TxMetadata)> for RequestTxHashes {
fn from_iter<I: IntoIterator<Item = (TxHash, Eth68TxMetadata)>>(iter: I) -> Self {
let mut hashes = Vec::with_capacity(32);

for (hash, _) in iter {
Expand All @@ -670,34 +714,6 @@ impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes {
}
}

impl HandleMempoolData for Vec<PooledTransactionsElement> {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn len(&self) -> usize {
self.len()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, tx) in self.iter().enumerate() {
if !f(tx.hash()) {
indices_to_remove.push(i);
}
}

let mut removed_txns = Vec::with_capacity(indices_to_remove.len());

for index in indices_to_remove.into_iter().rev() {
let hash = self.remove(index);
removed_txns.push(hash);
}

removed_txns
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 0007c9a

Please sign in to comment.