From 0cde0724539962b80bc2eea390afbbfea501becd Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 17 Sep 2024 13:32:41 +0100 Subject: [PATCH] feat(exex): backfill on subscription with head (#10787) --- clippy.toml | 1 + crates/exex/exex/Cargo.toml | 2 +- crates/exex/exex/src/context.rs | 9 +- crates/exex/exex/src/manager.rs | 533 ++++++++++++++++++++++--- crates/exex/test-utils/src/lib.rs | 7 +- crates/exex/types/src/head.rs | 3 +- crates/node/builder/src/launch/exex.rs | 7 +- crates/prune/prune/src/pruner.rs | 1 - 8 files changed, 507 insertions(+), 56 deletions(-) diff --git a/clippy.toml b/clippy.toml index 8105f3a75343..b498158094f9 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,2 +1,3 @@ msrv = "1.81" too-large-for-stack = 128 +doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"] diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index f3decd337417..4e082c4573e8 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] ## reth +reth-chainspec.workspace = true reth-config.workspace = true reth-evm.workspace = true reth-exex-types.workspace = true @@ -40,7 +41,6 @@ metrics.workspace = true [dev-dependencies] reth-blockchain-tree.workspace = true -reth-chainspec.workspace = true reth-db-api.workspace = true reth-db-common.workspace = true reth-evm-ethereum.workspace = true diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index b4743732563b..ec3c880148aa 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -30,13 +30,18 @@ pub struct ExExContext { /// /// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is /// considered delivered by the node. - pub notifications: ExExNotifications, + pub notifications: ExExNotifications, /// node components pub components: Node, } -impl Debug for ExExContext { +impl Debug for ExExContext +where + Node: FullNodeComponents, + Node::Provider: Debug, + Node::Executor: Debug, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExExContext") .field("head", &self.head) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 48a77d786275..5e0e5b215cd6 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,9 +1,15 @@ -use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; -use futures::Stream; +use crate::{ + BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight, StreamBackfillJob, +}; +use eyre::OptionExt; +use futures::{Stream, StreamExt}; use metrics::Gauge; +use reth_chainspec::Head; +use reth_evm::execute::BlockExecutorProvider; use reth_exex_types::ExExHead; use reth_metrics::{metrics::Counter, Metrics}; -use reth_primitives::BlockNumber; +use reth_primitives::{BlockNumber, U256}; +use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory}; use reth_tracing::tracing::debug; use std::{ collections::VecDeque, @@ -60,13 +66,16 @@ impl ExExHandle { /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. - pub fn new( + pub fn new( id: String, - components: Node, - ) -> (Self, UnboundedSender, ExExNotifications) { + node_head: Head, + provider: P, + executor: E, + ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let notifications = ExExNotifications { components, notifications: notification_rx }; + let notifications = + ExExNotifications { node_head, provider, executor, notifications: notification_rx }; ( Self { @@ -145,24 +154,32 @@ impl ExExHandle { } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. -pub struct ExExNotifications { - components: Node, +pub struct ExExNotifications { + node_head: Head, + provider: P, + executor: E, notifications: Receiver, } -impl Debug for ExExNotifications { +impl Debug for ExExNotifications { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExExNotifications") - .field("components", &"...") + .field("provider", &self.provider) + .field("executor", &self.executor) .field("notifications", &self.notifications) .finish() } } -impl ExExNotifications { +impl ExExNotifications { /// Creates a new instance of [`ExExNotifications`]. - pub const fn new(components: Node, notifications: Receiver) -> Self { - Self { components, notifications } + pub const fn new( + node_head: Head, + provider: P, + executor: E, + notifications: Receiver, + ) -> Self { + Self { node_head, provider, executor, notifications } } /// Receives the next value for this receiver. @@ -214,23 +231,29 @@ impl ExExNotifications { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.notifications.poll_recv(cx) } +} - // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`] +impl ExExNotifications +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ /// Subscribe to notifications with the given head. /// /// Notifications will be sent starting from the head, not inclusive. For example, if /// `head.number == 10`, then the first notification will be with `block.number == 11`. - #[allow(dead_code)] - fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { - ExExNotificationsWithHead { - components: self.components, - notifications: self.notifications, + pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + ExExNotificationsWithHead::new( + self.node_head, + self.provider, + self.executor, + self.notifications, head, - } + ) } } -impl Stream for ExExNotifications { +impl Stream for ExExNotifications { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -241,31 +264,212 @@ impl Stream for ExExNotifications { /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are /// committed or reverted after the given head. #[derive(Debug)] -pub struct ExExNotificationsWithHead { - #[allow(dead_code)] - components: Node, +pub struct ExExNotificationsWithHead { + node_head: Head, + provider: P, + executor: E, notifications: Receiver, - head: ExExHead, + exex_head: ExExHead, + pending_sync: bool, + /// The backfill job to run before consuming any notifications. + backfill_job: Option>, + /// Whether we're currently waiting for the node head to catch up to the same height as the + /// ExEx head. + node_head_catchup_in_progress: bool, } -impl Stream for ExExNotificationsWithHead { - type Item = ExExNotification; +impl ExExNotificationsWithHead +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ + /// Creates a new [`ExExNotificationsWithHead`]. + pub const fn new( + node_head: Head, + provider: P, + executor: E, + notifications: Receiver, + exex_head: ExExHead, + ) -> Self { + Self { + node_head, + provider, + executor, + notifications, + exex_head, + pending_sync: true, + backfill_job: None, + node_head_catchup_in_progress: false, + } + } + + /// Compares the node head against the ExEx head, and synchronizes them in case of a mismatch. + /// + /// Possible situations are: + /// - ExEx is behind the node head (`node_head.number < exex_head.number`). + /// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). + /// Backfill from the node database. + /// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database). + /// Unwind the ExEx to the first block matching between the ExEx and the node, and then + /// bacfkill from the node database. + /// - ExEx is at the same block number (`node_head.number == exex_head.number`). + /// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). Nothing + /// to do. + /// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database). + /// Unwind the ExEx to the first block matching between the ExEx and the node, and then + /// backfill from the node database. + /// - ExEx is ahead of the node head (`node_head.number > exex_head.number`). Wait until the + /// node head catches up to the ExEx head, and then repeat the synchronization process. + fn synchronize(&mut self) -> eyre::Result<()> { + debug!(target: "exex::manager", "Synchronizing ExEx head"); + + let backfill_job_factory = + BackfillJobFactory::new(self.executor.clone(), self.provider.clone()); + match self.exex_head.block.number.cmp(&self.node_head.number) { + std::cmp::Ordering::Less => { + // ExEx is behind the node head + + if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? { + // ExEx is on the canonical chain + debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain"); + + if exex_header.number != self.exex_head.block.number { + eyre::bail!("ExEx head number does not match the hash") + } + + // ExEx is on the canonical chain, start backfill + let backfill = backfill_job_factory + .backfill(self.exex_head.block.number + 1..=self.node_head.number) + .into_stream(); + self.backfill_job = Some(backfill); + } else { + debug!(target: "exex::manager", "ExEx is behind the node head and not on the canonical chain"); + // ExEx is not on the canonical chain, first unwind it and then backfill + + // TODO(alexey): unwind and backfill + self.backfill_job = None; + } + } + #[allow(clippy::branches_sharing_code)] + std::cmp::Ordering::Equal => { + // ExEx is at the same block height as the node head + + if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? { + // ExEx is on the canonical chain + debug!(target: "exex::manager", "ExEx is at the same block height as the node head and on the canonical chain"); + + if exex_header.number != self.exex_head.block.number { + eyre::bail!("ExEx head number does not match the hash") + } + + // ExEx is on the canonical chain and the same as the node head, no need to + // backfill + self.backfill_job = None; + } else { + // ExEx is not on the canonical chain, first unwind it and then backfill + debug!(target: "exex::manager", "ExEx is at the same block height as the node head but not on the canonical chain"); + + // TODO(alexey): unwind and backfill + self.backfill_job = None; + } + } + std::cmp::Ordering::Greater => { + debug!(target: "exex::manager", "ExEx is ahead of the node head"); + + // ExEx is ahead of the node head + + // TODO(alexey): wait until the node head is at the same height as the ExEx head + // and then repeat the process above + self.node_head_catchup_in_progress = true; + } + }; + + Ok(()) + } +} + +impl Stream for ExExNotificationsWithHead +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ + type Item = eyre::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - // TODO(alexey): backfill according to the head + if this.pending_sync { + this.synchronize()?; + this.pending_sync = false; + } + + if let Some(backfill_job) = &mut this.backfill_job { + if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)) { + return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted { + new: Arc::new(chain?), + }))) + } + + // Backfill job is done, remove it + this.backfill_job = None; + } + loop { let Some(notification) = ready!(this.notifications.poll_recv(cx)) else { return Poll::Ready(None) }; + // 1. Either committed or reverted chain from the notification. + // 2. Block number of the tip of the canonical chain: + // - For committed chain, it's the tip block number. + // - For reverted chain, it's the block number preceding the first block in the chain. + let (chain, tip) = notification + .committed_chain() + .map(|chain| (chain.clone(), chain.tip().number)) + .or_else(|| { + notification + .reverted_chain() + .map(|chain| (chain.clone(), chain.first().number - 1)) + }) + .unzip(); + + if this.node_head_catchup_in_progress { + // If we are waiting for the node head to catch up to the same height as the ExEx + // head, then we need to check if the ExEx is on the canonical chain. + + // Query the chain from the new notification for the ExEx head block number. + let exex_head_block = chain + .as_ref() + .and_then(|chain| chain.blocks().get(&this.exex_head.block.number)); + + // Compare the hash of the block from the new notification to the ExEx head + // hash. + if let Some((block, tip)) = exex_head_block.zip(tip) { + if block.hash() == this.exex_head.block.hash { + // ExEx is on the canonical chain, proceed with the notification + this.node_head_catchup_in_progress = false; + } else { + // ExEx is not on the canonical chain, synchronize + let tip = + this.provider.sealed_header(tip)?.ok_or_eyre("node head not found")?; + this.node_head = Head::new( + tip.number, + tip.hash(), + tip.difficulty, + U256::MAX, + tip.timestamp, + ); + this.synchronize()?; + } + } + } + if notification .committed_chain() .or_else(|| notification.reverted_chain()) - .map_or(false, |chain| chain.first().number > this.head.block.number) + .map_or(false, |chain| chain.first().number > this.exex_head.block.number) { - return Poll::Ready(Some(notification)) + return Poll::Ready(Some(Ok(notification))) } } } @@ -606,13 +810,19 @@ impl Clone for ExExManagerHandle { mod tests { use super::*; use futures::StreamExt; - use reth_primitives::{SealedBlockWithSenders, B256}; - use reth_provider::Chain; + use reth_db_common::init::init_genesis; + use reth_evm_ethereum::execute::EthExecutorProvider; + use reth_primitives::{Block, BlockNumHash, Header, SealedBlockWithSenders, B256}; + use reth_provider::{ + providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader, + BlockWriter, Chain, + }; + use reth_testing_utils::generators::{self, random_block, BlockParams}; #[tokio::test] async fn test_delivers_events() { let (mut exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string(), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); // Send an event and check that it's delivered correctly event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -622,7 +832,8 @@ mod tests { #[tokio::test] async fn test_has_exexs() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); + let (exex_handle_1, _, _) = + ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); assert!(!ExExManager::new(vec![], 0).handle.has_exexs()); @@ -631,7 +842,8 @@ mod tests { #[tokio::test] async fn test_has_capacity() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); + let (exex_handle_1, _, _) = + ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); assert!(!ExExManager::new(vec![], 0).handle.has_capacity()); @@ -640,7 +852,7 @@ mod tests { #[test] fn test_push_notification() { - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ()); + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); // Create a mock ExExManager and add the exex_handle to it let mut exex_manager = ExExManager::new(vec![exex_handle], 10); @@ -685,7 +897,7 @@ mod tests { #[test] fn test_update_capacity() { - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ()); + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); // Create a mock ExExManager and add the exex_handle to it let max_capacity = 5; @@ -720,7 +932,7 @@ mod tests { #[tokio::test] async fn test_updates_block_height() { let (exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string(), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); // Check initial block height assert!(exex_handle.finished_height.is_none()); @@ -757,8 +969,10 @@ mod tests { #[tokio::test] async fn test_updates_block_height_lower() { // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ()); - let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ()); + let (exex_handle1, event_tx1, _) = + ExExHandle::new("test_exex1".to_string(), Head::default(), (), ()); + let (exex_handle2, event_tx2, _) = + ExExHandle::new("test_exex2".to_string(), Head::default(), (), ()); // Send events to update the block heights of the two handles, with the second being lower event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -788,8 +1002,10 @@ mod tests { #[tokio::test] async fn test_updates_block_height_greater() { // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ()); - let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ()); + let (exex_handle1, event_tx1, _) = + ExExHandle::new("test_exex1".to_string(), Head::default(), (), ()); + let (exex_handle2, event_tx2, _) = + ExExHandle::new("test_exex2".to_string(), Head::default(), (), ()); // Assert that the initial block height is `None` for the first `ExExHandle`. assert!(exex_handle1.finished_height.is_none()); @@ -825,7 +1041,8 @@ mod tests { #[tokio::test] async fn test_exex_manager_capacity() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); + let (exex_handle_1, _, _) = + ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); // Create an ExExManager with a small max capacity let max_capacity = 2; @@ -863,7 +1080,8 @@ mod tests { #[tokio::test] async fn exex_handle_new() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); + let (mut exex_handle, _, mut notifications) = + ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -905,7 +1123,8 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); + let (mut exex_handle, _, mut notifications) = + ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(15); @@ -946,7 +1165,8 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); + let (mut exex_handle, _, mut notifications) = + ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -976,7 +1196,8 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); + let (mut exex_handle, _, mut notifications) = + ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; @@ -1000,4 +1221,220 @@ mod tests { // Ensure the notification ID was incremented assert_eq!(exex_handle.next_notification_id, 23); } + + #[tokio::test] + async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> { + let mut rng = generators::rng(); + + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory)?; + let genesis_block = provider_factory + .block(genesis_hash.into())? + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + + let provider = BlockchainProvider2::new(provider_factory.clone())?; + + let node_head_block = random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, + ); + let provider_rw = provider_factory.provider_rw()?; + provider_rw.insert_block( + node_head_block.clone().seal_with_senders().ok_or_eyre("failed to recover senders")?, + )?; + provider_rw.commit()?; + + let node_head = Head { + number: node_head_block.number, + hash: node_head_block.hash(), + ..Default::default() + }; + let exex_head = + ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } }; + + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![random_block( + &mut rng, + node_head.number + 1, + BlockParams { parent: Some(node_head.hash), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }; + + let (notifications_tx, notifications_rx) = mpsc::channel(1); + + notifications_tx.send(notification.clone()).await?; + + let mut notifications = ExExNotifications::new( + node_head, + provider, + EthExecutorProvider::mainnet(), + notifications_rx, + ) + .with_head(exex_head); + + // First notification is the backfill of missing blocks from the canonical chain + assert_eq!( + notifications.next().await.transpose()?, + Some(ExExNotification::ChainCommitted { + new: Arc::new( + BackfillJobFactory::new( + notifications.executor.clone(), + notifications.provider.clone() + ) + .backfill(1..=1) + .next() + .ok_or_eyre("failed to backfill")?? + ) + }) + ); + + // Second notification is the actual notification that we sent before + assert_eq!(notifications.next().await.transpose()?, Some(notification)); + + Ok(()) + } + + #[ignore] + #[tokio::test] + async fn exex_notifications_behind_head_non_canonical() -> eyre::Result<()> { + Ok(()) + } + + #[tokio::test] + async fn exex_notifications_same_head_canonical() -> eyre::Result<()> { + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory)?; + let genesis_block = provider_factory + .block(genesis_hash.into())? + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + + let provider = BlockchainProvider2::new(provider_factory)?; + + let node_head = + Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() }; + let exex_head = + ExExHead { block: BlockNumHash { number: node_head.number, hash: node_head.hash } }; + + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![Block { + header: Header { + parent_hash: node_head.hash, + number: node_head.number + 1, + ..Default::default() + }, + ..Default::default() + } + .seal_slow() + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }; + + let (notifications_tx, notifications_rx) = mpsc::channel(1); + + notifications_tx.send(notification.clone()).await?; + + let mut notifications = ExExNotifications::new( + node_head, + provider, + EthExecutorProvider::mainnet(), + notifications_rx, + ) + .with_head(exex_head); + + let new_notification = notifications.next().await.transpose()?; + assert_eq!(new_notification, Some(notification)); + + Ok(()) + } + + #[ignore] + #[tokio::test] + async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> { + Ok(()) + } + + #[tokio::test] + async fn test_notifications_ahead_of_head() -> eyre::Result<()> { + let mut rng = generators::rng(); + + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory)?; + let genesis_block = provider_factory + .block(genesis_hash.into())? + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + + let provider = BlockchainProvider2::new(provider_factory)?; + + let exex_head_block = random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, + ); + + let node_head = + Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() }; + let exex_head = ExExHead { + block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() }, + }; + + let (notifications_tx, notifications_rx) = mpsc::channel(1); + + notifications_tx + .send(ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![exex_head_block + .clone() + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }) + .await?; + + let mut notifications = ExExNotifications::new( + node_head, + provider, + EthExecutorProvider::mainnet(), + notifications_rx, + ) + .with_head(exex_head); + + // First notification is skipped because the node is catching up with the ExEx + let new_notification = poll_fn(|cx| Poll::Ready(notifications.poll_next_unpin(cx))).await; + assert!(new_notification.is_pending()); + + // Imitate the node catching up with the ExEx by sending a notification for the missing + // block + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![random_block( + &mut rng, + exex_head_block.number + 1, + BlockParams { parent: Some(exex_head_block.hash()), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }; + notifications_tx.send(notification.clone()).await?; + + // Second notification is received because the node caught up with the ExEx + assert_eq!(notifications.next().await.transpose()?, Some(notification)); + + Ok(()) + } } diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 1d9d18039215..ab1958cc86c7 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -296,7 +296,12 @@ pub async fn test_exex_context_with_chain_spec( let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); - let notifications = ExExNotifications::new(components.clone(), notifications_rx); + let notifications = ExExNotifications::new( + head, + components.provider.clone(), + components.components.executor.clone(), + notifications_rx, + ); let ctx = ExExContext { head, diff --git a/crates/exex/types/src/head.rs b/crates/exex/types/src/head.rs index 1552c2380fdb..3e67b1eca592 100644 --- a/crates/exex/types/src/head.rs +++ b/crates/exex/types/src/head.rs @@ -1,8 +1,7 @@ use reth_primitives::BlockNumHash; -#[allow(clippy::doc_markdown)] /// A head of the ExEx. It determines the highest block committed to the internal ExEx state. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ExExHead { /// The head block. pub block: BlockNumHash, diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 798ddea2d829..c3f842e5dffd 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -48,7 +48,12 @@ impl ExExLauncher { for (id, exex) in extensions { // create a new exex handle - let (handle, events, notifications) = ExExHandle::new(id.clone(), components.clone()); + let (handle, events, notifications) = ExExHandle::new( + id.clone(), + head, + components.provider().clone(), + components.block_executor().clone(), + ); exex_handles.push(handle); // create the launch context for the exex diff --git a/crates/prune/prune/src/pruner.rs b/crates/prune/prune/src/pruner.rs index 0fb388b1e20c..d21560cae607 100644 --- a/crates/prune/prune/src/pruner.rs +++ b/crates/prune/prune/src/pruner.rs @@ -320,7 +320,6 @@ where /// /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data /// to prune. - #[allow(clippy::doc_markdown)] pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult { let provider = self.provider_factory.database_provider_rw()?; let result = self.run_with_provider(&provider, tip_block_number);