From 76f321bb800a6b54aec59c270ef33b52d957a096 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 9 Sep 2024 15:57:21 +0100 Subject: [PATCH 01/16] feat(exex): backfill on subscription with head --- crates/exex/exex/Cargo.toml | 2 +- crates/exex/exex/src/manager.rs | 213 ++++++++++++++++++++----- crates/exex/test-utils/src/lib.rs | 2 +- crates/exex/types/src/head.rs | 2 +- crates/node/builder/src/launch/exex.rs | 3 +- 5 files changed, 180 insertions(+), 42 deletions(-) diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index f3decd337417..4e082c4573e8 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] ## reth +reth-chainspec.workspace = true reth-config.workspace = true reth-evm.workspace = true reth-exex-types.workspace = true @@ -40,7 +41,6 @@ metrics.workspace = true [dev-dependencies] reth-blockchain-tree.workspace = true -reth-chainspec.workspace = true reth-db-api.workspace = true reth-db-common.workspace = true reth-evm-ethereum.workspace = true diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 48a77d786275..4fc1ebea58f2 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,9 +1,14 @@ -use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; -use futures::Stream; +use crate::{ + BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight, StreamBackfillJob, +}; +use futures::{Stream, StreamExt}; use metrics::Gauge; +use reth_chainspec::Head; use reth_exex_types::ExExHead; use reth_metrics::{metrics::Counter, Metrics}; -use reth_primitives::BlockNumber; +use reth_node_api::FullNodeComponents; +use reth_primitives::{BlockNumber, U256}; +use reth_provider::{Chain, HeaderProvider}; use reth_tracing::tracing::debug; use std::{ collections::VecDeque, @@ -62,11 +67,13 @@ impl ExExHandle { /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. pub fn new( id: String, + node_head: Head, components: Node, ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let notifications = ExExNotifications { components, notifications: notification_rx }; + let notifications = + ExExNotifications { node_head, components, notifications: notification_rx }; ( Self { @@ -146,6 +153,7 @@ impl ExExHandle { /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. pub struct ExExNotifications { + node_head: Head, components: Node, notifications: Receiver, } @@ -159,10 +167,14 @@ impl Debug for ExExNotifications { } } -impl ExExNotifications { +impl ExExNotifications { /// Creates a new instance of [`ExExNotifications`]. - pub const fn new(components: Node, notifications: Receiver) -> Self { - Self { components, notifications } + pub const fn new( + node_head: Head, + components: Node, + notifications: Receiver, + ) -> Self { + Self { node_head, components, notifications } } /// Receives the next value for this receiver. @@ -221,12 +233,8 @@ impl ExExNotifications { /// Notifications will be sent starting from the head, not inclusive. For example, if /// `head.number == 10`, then the first notification will be with `block.number == 11`. #[allow(dead_code)] - fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { - ExExNotificationsWithHead { - components: self.components, - notifications: self.notifications, - head, - } + fn with_head(self, head: ExExHead) -> eyre::Result> { + ExExNotificationsWithHead::new(self.node_head, self.components, self.notifications, head) } } @@ -241,31 +249,152 @@ impl Stream for ExExNotifications { /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are /// committed or reverted after the given head. #[derive(Debug)] -pub struct ExExNotificationsWithHead { +pub struct ExExNotificationsWithHead { #[allow(dead_code)] components: Node, notifications: Receiver, head: ExExHead, + backfill_job: Option>, + compare_head: bool, } -impl Stream for ExExNotificationsWithHead { - type Item = ExExNotification; +impl ExExNotificationsWithHead { + /// Creates a new [`ExExNotificationsWithHead`]. + pub fn new( + node_head: Head, + components: Node, + notifications: Receiver, + exex_head: ExExHead, + ) -> eyre::Result { + let mut notifications = Self { + components, + notifications, + head: exex_head, + backfill_job: None, + compare_head: false, + }; + + notifications.heal(node_head)?; + + Ok(notifications) + } + + fn heal(&mut self, node_head: Head) -> eyre::Result<()> { + let backfill_job_factory = BackfillJobFactory::new( + self.components.block_executor().clone(), + self.components.provider().clone(), + ); + match self.head.block.number.cmp(&node_head.number) { + std::cmp::Ordering::Less => { + // ExEx is behind the node head + + if let Some(exex_header) = + self.components.provider().header(&self.head.block.hash)? + { + // ExEx is on the canonical chain + + if exex_header.number != self.head.block.number { + eyre::bail!("ExEx head number does not match the hash") + } + + // ExEx is on the canonical chain, start backfill + let backfill = backfill_job_factory + .backfill(self.head.block.number + 1..=node_head.number) + .into_stream(); + self.backfill_job = Some(backfill); + } else { + // ExEx is not on the canonical chain, first unwind it and then backfill + + // TODO(alexey): unwind and backfill + self.backfill_job = None; + } + } + std::cmp::Ordering::Equal => { + // ExEx is at the same block height as the node head + + if let Some(exex_header) = + self.components.provider().header(&self.head.block.hash)? + { + // ExEx is on the canonical chain + + if exex_header.number != self.head.block.number { + eyre::bail!("ExEx head number does not match the hash") + } + + // ExEx is on the canonical chain and the same as the node head, no need to + // backfill + self.backfill_job = None; + } else { + // ExEx is not on the canonical chain, first unwind it and then backfill + + // TODO(alexey): unwind and backfill + self.backfill_job = None; + } + } + std::cmp::Ordering::Greater => { + // ExEx is ahead of the node head + + // TODO(alexey): wait until the node head is at the same height as the ExEx head + // and then repeat the process above + self.compare_head = true; + } + }; + + Ok(()) + } +} + +impl Stream for ExExNotificationsWithHead { + type Item = eyre::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - // TODO(alexey): backfill according to the head + if let Some(backfill_job) = &mut this.backfill_job { + if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)) { + return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted { + new: Arc::new(chain?), + }))) + } + + // Backfill job is done, remove it + this.backfill_job = None; + } + loop { let Some(notification) = ready!(this.notifications.poll_recv(cx)) else { return Poll::Ready(None) }; - if notification - .committed_chain() - .or_else(|| notification.reverted_chain()) - .map_or(false, |chain| chain.first().number > this.head.block.number) - { - return Poll::Ready(Some(notification)) + let chain = notification.committed_chain().or_else(|| notification.reverted_chain()); + + if this.compare_head { + if let Some(block) = + chain.as_ref().and_then(|chain| chain.blocks().get(&this.head.block.number)) + { + if block.hash() == this.head.block.hash { + // ExEx is on the canonical chain + this.compare_head = false; + } else { + // ExEx is not on the canonical chain, heal + let total_difficulty = this + .components + .provider() + .header_td_by_number(block.number)? + .unwrap_or(U256::MAX); + this.heal(Head::new( + block.number, + block.hash(), + block.difficulty, + total_difficulty, + block.timestamp, + ))?; + } + } + } + + if chain.map_or(false, |chain| chain.first().number > this.head.block.number) { + return Poll::Ready(Some(Ok(notification))) } } } @@ -612,7 +741,7 @@ mod tests { #[tokio::test] async fn test_delivers_events() { let (mut exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string(), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), ()); // Send an event and check that it's delivered correctly event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -622,7 +751,7 @@ mod tests { #[tokio::test] async fn test_has_exexs() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), ()); assert!(!ExExManager::new(vec![], 0).handle.has_exexs()); @@ -631,7 +760,7 @@ mod tests { #[tokio::test] async fn test_has_capacity() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), ()); assert!(!ExExManager::new(vec![], 0).handle.has_capacity()); @@ -640,7 +769,7 @@ mod tests { #[test] fn test_push_notification() { - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ()); + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), ()); // Create a mock ExExManager and add the exex_handle to it let mut exex_manager = ExExManager::new(vec![exex_handle], 10); @@ -685,7 +814,7 @@ mod tests { #[test] fn test_update_capacity() { - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ()); + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), ()); // Create a mock ExExManager and add the exex_handle to it let max_capacity = 5; @@ -720,7 +849,7 @@ mod tests { #[tokio::test] async fn test_updates_block_height() { let (exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string(), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), ()); // Check initial block height assert!(exex_handle.finished_height.is_none()); @@ -757,8 +886,10 @@ mod tests { #[tokio::test] async fn test_updates_block_height_lower() { // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ()); - let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ()); + let (exex_handle1, event_tx1, _) = + ExExHandle::new("test_exex1".to_string(), Head::default(), ()); + let (exex_handle2, event_tx2, _) = + ExExHandle::new("test_exex2".to_string(), Head::default(), ()); // Send events to update the block heights of the two handles, with the second being lower event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -788,8 +919,10 @@ mod tests { #[tokio::test] async fn test_updates_block_height_greater() { // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ()); - let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ()); + let (exex_handle1, event_tx1, _) = + ExExHandle::new("test_exex1".to_string(), Head::default(), ()); + let (exex_handle2, event_tx2, _) = + ExExHandle::new("test_exex2".to_string(), Head::default(), ()); // Assert that the initial block height is `None` for the first `ExExHandle`. assert!(exex_handle1.finished_height.is_none()); @@ -825,7 +958,7 @@ mod tests { #[tokio::test] async fn test_exex_manager_capacity() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), ()); // Create an ExExManager with a small max capacity let max_capacity = 2; @@ -863,7 +996,8 @@ mod tests { #[tokio::test] async fn exex_handle_new() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); + let (mut exex_handle, _, mut notifications) = + ExExHandle::new("test_exex".to_string(), Head::default(), ()); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -905,7 +1039,8 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); + let (mut exex_handle, _, mut notifications) = + ExExHandle::new("test_exex".to_string(), Head::default(), ()); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(15); @@ -946,7 +1081,8 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); + let (mut exex_handle, _, mut notifications) = + ExExHandle::new("test_exex".to_string(), Head::default(), ()); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -976,7 +1112,8 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); + let (mut exex_handle, _, mut notifications) = + ExExHandle::new("test_exex".to_string(), Head::default(), ()); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index dd4907649c3a..90bfd2fa258e 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -296,7 +296,7 @@ pub async fn test_exex_context_with_chain_spec( let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); - let notifications = ExExNotifications::new(components.clone(), notifications_rx); + let notifications = ExExNotifications::new(head, components.clone(), notifications_rx); let ctx = ExExContext { head, diff --git a/crates/exex/types/src/head.rs b/crates/exex/types/src/head.rs index 1552c2380fdb..eb5a989833fa 100644 --- a/crates/exex/types/src/head.rs +++ b/crates/exex/types/src/head.rs @@ -2,7 +2,7 @@ use reth_primitives::BlockNumHash; #[allow(clippy::doc_markdown)] /// A head of the ExEx. It determines the highest block committed to the internal ExEx state. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ExExHead { /// The head block. pub block: BlockNumHash, diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 798ddea2d829..94c34b566ff2 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -48,7 +48,8 @@ impl ExExLauncher { for (id, exex) in extensions { // create a new exex handle - let (handle, events, notifications) = ExExHandle::new(id.clone(), components.clone()); + let (handle, events, notifications) = + ExExHandle::new(id.clone(), head, components.clone()); exex_handles.push(handle); // create the launch context for the exex From 8ff9b7659c99885c6693674b5378d3085cbb697d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 9 Sep 2024 16:28:01 +0100 Subject: [PATCH 02/16] silence clippy --- crates/exex/exex/src/manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 4fc1ebea58f2..c2a26cf873f4 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -309,6 +309,7 @@ impl ExExNotificationsWithHead { self.backfill_job = None; } } + #[allow(clippy::branches_sharing_code)] std::cmp::Ordering::Equal => { // ExEx is at the same block height as the node head From c48a07cf8efff30651a065f0c847c9b9a6dfe410 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 11 Sep 2024 10:52:15 +0100 Subject: [PATCH 03/16] fix wait for head logic --- clippy.toml | 1 + crates/exex/exex/src/manager.rs | 58 ++++++++++++++++++++++---------- crates/exex/types/src/head.rs | 1 - crates/prune/prune/src/pruner.rs | 2 -- 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/clippy.toml b/clippy.toml index 8105f3a75343..b498158094f9 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,2 +1,3 @@ msrv = "1.81" too-large-for-stack = 128 +doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"] diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index c2a26cf873f4..3ef9b9d56e30 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,6 +1,7 @@ use crate::{ BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight, StreamBackfillJob, }; +use eyre::OptionExt; use futures::{Stream, StreamExt}; use metrics::Gauge; use reth_chainspec::Head; @@ -254,8 +255,11 @@ pub struct ExExNotificationsWithHead { components: Node, notifications: Receiver, head: ExExHead, + /// The backfill job to run before consuming any notifications. backfill_job: Option>, - compare_head: bool, + /// Whether to wait for the node head to be at the same height as the ExEx head, and then + /// call the [`Self::heal`]. + wait_for_head: bool, } impl ExExNotificationsWithHead { @@ -271,7 +275,7 @@ impl ExExNotificationsWithHead { notifications, head: exex_head, backfill_job: None, - compare_head: false, + wait_for_head: false, }; notifications.heal(node_head)?; @@ -337,7 +341,7 @@ impl ExExNotificationsWithHead { // TODO(alexey): wait until the node head is at the same height as the ExEx head // and then repeat the process above - self.compare_head = true; + self.wait_for_head = true; } }; @@ -367,34 +371,54 @@ impl Stream for ExExNotificationsWithHead this.head.block.number) { + if notification + .committed_chain() + .or_else(|| notification.reverted_chain()) + .map_or(false, |chain| chain.first().number > this.head.block.number) + { return Poll::Ready(Some(Ok(notification))) } } diff --git a/crates/exex/types/src/head.rs b/crates/exex/types/src/head.rs index eb5a989833fa..3e67b1eca592 100644 --- a/crates/exex/types/src/head.rs +++ b/crates/exex/types/src/head.rs @@ -1,6 +1,5 @@ use reth_primitives::BlockNumHash; -#[allow(clippy::doc_markdown)] /// A head of the ExEx. It determines the highest block committed to the internal ExEx state. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ExExHead { diff --git a/crates/prune/prune/src/pruner.rs b/crates/prune/prune/src/pruner.rs index f47705211c7a..4507fee20e03 100644 --- a/crates/prune/prune/src/pruner.rs +++ b/crates/prune/prune/src/pruner.rs @@ -305,7 +305,6 @@ impl Pruner { /// /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data /// to prune. - #[allow(clippy::doc_markdown)] pub fn run( &mut self, provider: &DatabaseProviderRW, @@ -321,7 +320,6 @@ impl Pruner> { /// /// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data /// to prune. - #[allow(clippy::doc_markdown)] pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult { let provider = self.provider_factory.provider_rw()?; let result = self.run_with_provider(&provider, tip_block_number); From 86b56c7acbc404b0d6d6f501b5f7289a8ebd1b22 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 16 Sep 2024 13:06:58 +0100 Subject: [PATCH 04/16] address some reviews --- crates/exex/exex/src/manager.rs | 66 +++++++++++++++++---------------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 3ef9b9d56e30..601b1639c182 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -168,7 +168,7 @@ impl Debug for ExExNotifications { } } -impl ExExNotifications { +impl ExExNotifications { /// Creates a new instance of [`ExExNotifications`]. pub const fn new( node_head: Head, @@ -227,14 +227,16 @@ impl ExExNotifications { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.notifications.poll_recv(cx) } +} +impl ExExNotifications { // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`] /// Subscribe to notifications with the given head. /// /// Notifications will be sent starting from the head, not inclusive. For example, if /// `head.number == 10`, then the first notification will be with `block.number == 11`. #[allow(dead_code)] - fn with_head(self, head: ExExHead) -> eyre::Result> { + fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { ExExNotificationsWithHead::new(self.node_head, self.components, self.notifications, head) } } @@ -254,33 +256,33 @@ pub struct ExExNotificationsWithHead { #[allow(dead_code)] components: Node, notifications: Receiver, - head: ExExHead, + node_head: Head, + exex_head: ExExHead, + pending_heal: bool, /// The backfill job to run before consuming any notifications. backfill_job: Option>, /// Whether to wait for the node head to be at the same height as the ExEx head, and then /// call the [`Self::heal`]. - wait_for_head: bool, + pending_node_head_catchup: bool, } impl ExExNotificationsWithHead { /// Creates a new [`ExExNotificationsWithHead`]. - pub fn new( + pub const fn new( node_head: Head, components: Node, notifications: Receiver, exex_head: ExExHead, - ) -> eyre::Result { - let mut notifications = Self { + ) -> Self { + Self { components, notifications, - head: exex_head, + node_head, + exex_head, + pending_heal: true, backfill_job: None, - wait_for_head: false, - }; - - notifications.heal(node_head)?; - - Ok(notifications) + pending_node_head_catchup: false, + } } fn heal(&mut self, node_head: Head) -> eyre::Result<()> { @@ -288,22 +290,22 @@ impl ExExNotificationsWithHead { self.components.block_executor().clone(), self.components.provider().clone(), ); - match self.head.block.number.cmp(&node_head.number) { + match self.exex_head.block.number.cmp(&node_head.number) { std::cmp::Ordering::Less => { // ExEx is behind the node head if let Some(exex_header) = - self.components.provider().header(&self.head.block.hash)? + self.components.provider().header(&self.exex_head.block.hash)? { // ExEx is on the canonical chain - if exex_header.number != self.head.block.number { + if exex_header.number != self.exex_head.block.number { eyre::bail!("ExEx head number does not match the hash") } // ExEx is on the canonical chain, start backfill let backfill = backfill_job_factory - .backfill(self.head.block.number + 1..=node_head.number) + .backfill(self.exex_head.block.number + 1..=node_head.number) .into_stream(); self.backfill_job = Some(backfill); } else { @@ -318,11 +320,11 @@ impl ExExNotificationsWithHead { // ExEx is at the same block height as the node head if let Some(exex_header) = - self.components.provider().header(&self.head.block.hash)? + self.components.provider().header(&self.exex_head.block.hash)? { // ExEx is on the canonical chain - if exex_header.number != self.head.block.number { + if exex_header.number != self.exex_head.block.number { eyre::bail!("ExEx head number does not match the hash") } @@ -341,7 +343,7 @@ impl ExExNotificationsWithHead { // TODO(alexey): wait until the node head is at the same height as the ExEx head // and then repeat the process above - self.wait_for_head = true; + self.pending_node_head_catchup = true; } }; @@ -355,6 +357,11 @@ impl Stream for ExExNotificationsWithHead, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); + if this.pending_heal { + this.heal(this.node_head)?; + this.pending_heal = false; + } + if let Some(backfill_job) = &mut this.backfill_job { if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)) { return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted { @@ -380,17 +387,17 @@ impl Stream for ExExNotificationsWithHead Stream for ExExNotificationsWithHead Stream for ExExNotificationsWithHead this.head.block.number) + .map_or(false, |chain| chain.first().number > this.exex_head.block.number) { return Poll::Ready(Some(Ok(notification))) } From 492613ca3a5b3ff4df3fed7350e229614c9783ac Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 16 Sep 2024 13:26:07 +0100 Subject: [PATCH 05/16] minor renames --- crates/exex/exex/src/manager.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 601b1639c182..0336c38b632e 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -254,16 +254,16 @@ impl Stream for ExExNotifications { #[derive(Debug)] pub struct ExExNotificationsWithHead { #[allow(dead_code)] + node_head: Head, components: Node, notifications: Receiver, - node_head: Head, exex_head: ExExHead, pending_heal: bool, /// The backfill job to run before consuming any notifications. backfill_job: Option>, - /// Whether to wait for the node head to be at the same height as the ExEx head, and then - /// call the [`Self::heal`]. - pending_node_head_catchup: bool, + /// Whether we're currently waiting for the node head to catch up to the same height as the + /// ExEx head. + node_head_catchup_in_progress: bool, } impl ExExNotificationsWithHead { @@ -275,13 +275,13 @@ impl ExExNotificationsWithHead { exex_head: ExExHead, ) -> Self { Self { + node_head, components, notifications, - node_head, exex_head, pending_heal: true, backfill_job: None, - pending_node_head_catchup: false, + node_head_catchup_in_progress: false, } } @@ -343,7 +343,7 @@ impl ExExNotificationsWithHead { // TODO(alexey): wait until the node head is at the same height as the ExEx head // and then repeat the process above - self.pending_node_head_catchup = true; + self.node_head_catchup_in_progress = true; } }; @@ -387,7 +387,7 @@ impl Stream for ExExNotificationsWithHead Stream for ExExNotificationsWithHead Date: Mon, 16 Sep 2024 13:39:19 +0100 Subject: [PATCH 06/16] simplify the canonical check logic --- crates/exex/exex/src/manager.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 0336c38b632e..4caafcc81ba3 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -378,23 +378,32 @@ impl Stream for ExExNotificationsWithHead Stream for ExExNotificationsWithHead Date: Mon, 16 Sep 2024 13:51:12 +0100 Subject: [PATCH 07/16] describe sync process in more detail --- crates/exex/exex/src/manager.rs | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 4caafcc81ba3..819794342a7f 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -258,7 +258,7 @@ pub struct ExExNotificationsWithHead { components: Node, notifications: Receiver, exex_head: ExExHead, - pending_heal: bool, + pending_sync: bool, /// The backfill job to run before consuming any notifications. backfill_job: Option>, /// Whether we're currently waiting for the node head to catch up to the same height as the @@ -279,13 +279,30 @@ impl ExExNotificationsWithHead { components, notifications, exex_head, - pending_heal: true, + pending_sync: true, backfill_job: None, node_head_catchup_in_progress: false, } } - fn heal(&mut self, node_head: Head) -> eyre::Result<()> { + /// Compares the node head against the ExEx head, and synchronizes them in case of a mismatch. + /// + /// Possible situations are: + /// - ExEx is behind the node head (`node_head.number < exex_head.number`). + /// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). + /// Backfill from the node database. + /// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database). + /// Unwind the ExEx to the first block matching between the ExEx and the node, and then + /// bacfkill from the node database. + /// - ExEx is at the same block number (`node_head.number == exex_head.number`). + /// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). Nothing + /// to do. + /// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database). + /// Unwind the ExEx to the first block matching between the ExEx and the node, and then + /// backfill from the node database. + /// - ExEx is ahead of the node head (`node_head.number > exex_head.number`). Wait until the + /// node head catches up to the ExEx head, and then repeat the synchronization process. + fn synchronize(&mut self, node_head: Head) -> eyre::Result<()> { let backfill_job_factory = BackfillJobFactory::new( self.components.block_executor().clone(), self.components.provider().clone(), @@ -357,9 +374,9 @@ impl Stream for ExExNotificationsWithHead, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - if this.pending_heal { - this.heal(this.node_head)?; - this.pending_heal = false; + if this.pending_sync { + this.synchronize(this.node_head)?; + this.pending_sync = false; } if let Some(backfill_job) = &mut this.backfill_job { @@ -408,13 +425,13 @@ impl Stream for ExExNotificationsWithHead Date: Mon, 16 Sep 2024 14:31:13 +0100 Subject: [PATCH 08/16] update head when node caught up --- crates/exex/exex/src/manager.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 819794342a7f..7f9645414be3 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -253,7 +253,6 @@ impl Stream for ExExNotifications { /// committed or reverted after the given head. #[derive(Debug)] pub struct ExExNotificationsWithHead { - #[allow(dead_code)] node_head: Head, components: Node, notifications: Receiver, @@ -302,12 +301,12 @@ impl ExExNotificationsWithHead { /// backfill from the node database. /// - ExEx is ahead of the node head (`node_head.number > exex_head.number`). Wait until the /// node head catches up to the ExEx head, and then repeat the synchronization process. - fn synchronize(&mut self, node_head: Head) -> eyre::Result<()> { + fn synchronize(&mut self) -> eyre::Result<()> { let backfill_job_factory = BackfillJobFactory::new( self.components.block_executor().clone(), self.components.provider().clone(), ); - match self.exex_head.block.number.cmp(&node_head.number) { + match self.exex_head.block.number.cmp(&self.node_head.number) { std::cmp::Ordering::Less => { // ExEx is behind the node head @@ -322,7 +321,7 @@ impl ExExNotificationsWithHead { // ExEx is on the canonical chain, start backfill let backfill = backfill_job_factory - .backfill(self.exex_head.block.number + 1..=node_head.number) + .backfill(self.exex_head.block.number + 1..=self.node_head.number) .into_stream(); self.backfill_job = Some(backfill); } else { @@ -375,7 +374,7 @@ impl Stream for ExExNotificationsWithHead Stream for ExExNotificationsWithHead Date: Mon, 16 Sep 2024 17:38:05 +0100 Subject: [PATCH 09/16] wip tests --- Cargo.lock | 1 + crates/exex/exex/Cargo.toml | 1 + crates/exex/exex/src/manager.rs | 120 ++++++++++++++++++++++++++++-- crates/exex/test-utils/src/lib.rs | 52 ++++++++----- 4 files changed, 152 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b4ccbd7e37a..bd126cd3ac05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7292,6 +7292,7 @@ dependencies = [ "reth-db-common", "reth-evm", "reth-evm-ethereum", + "reth-exex-test-utils", "reth-exex-types", "reth-metrics", "reth-node-api", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 4e082c4573e8..53a460a45fba 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -44,6 +44,7 @@ reth-blockchain-tree.workspace = true reth-db-api.workspace = true reth-db-common.workspace = true reth-evm-ethereum.workspace = true +reth-exex-test-utils.workspace = true reth-node-api.workspace = true reth-primitives-traits = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 7f9645414be3..08c35d183b13 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -230,13 +230,11 @@ impl ExExNotifications { } impl ExExNotifications { - // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`] /// Subscribe to notifications with the given head. /// /// Notifications will be sent starting from the head, not inclusive. For example, if /// `head.number == 10`, then the first notification will be with `block.number == 11`. - #[allow(dead_code)] - fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { ExExNotificationsWithHead::new(self.node_head, self.components, self.notifications, head) } } @@ -302,6 +300,8 @@ impl ExExNotificationsWithHead { /// - ExEx is ahead of the node head (`node_head.number > exex_head.number`). Wait until the /// node head catches up to the ExEx head, and then repeat the synchronization process. fn synchronize(&mut self) -> eyre::Result<()> { + debug!(target: "exex::manager", "Synchronizing ExEx head"); + let backfill_job_factory = BackfillJobFactory::new( self.components.block_executor().clone(), self.components.provider().clone(), @@ -314,6 +314,7 @@ impl ExExNotificationsWithHead { self.components.provider().header(&self.exex_head.block.hash)? { // ExEx is on the canonical chain + debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain"); if exex_header.number != self.exex_head.block.number { eyre::bail!("ExEx head number does not match the hash") @@ -325,6 +326,7 @@ impl ExExNotificationsWithHead { .into_stream(); self.backfill_job = Some(backfill); } else { + debug!(target: "exex::manager", "ExEx is behind the node head and not on the canonical chain"); // ExEx is not on the canonical chain, first unwind it and then backfill // TODO(alexey): unwind and backfill @@ -339,6 +341,7 @@ impl ExExNotificationsWithHead { self.components.provider().header(&self.exex_head.block.hash)? { // ExEx is on the canonical chain + debug!(target: "exex::manager", "ExEx is at the same block height as the node head and on the canonical chain"); if exex_header.number != self.exex_head.block.number { eyre::bail!("ExEx head number does not match the hash") @@ -349,12 +352,15 @@ impl ExExNotificationsWithHead { self.backfill_job = None; } else { // ExEx is not on the canonical chain, first unwind it and then backfill + debug!(target: "exex::manager", "ExEx is at the same block height as the node head but not on the canonical chain"); // TODO(alexey): unwind and backfill self.backfill_job = None; } } std::cmp::Ordering::Greater => { + debug!(target: "exex::manager", "ExEx is ahead of the node head"); + // ExEx is ahead of the node head // TODO(alexey): wait until the node head is at the same height as the ExEx head @@ -788,8 +794,11 @@ impl Clone for ExExManagerHandle { mod tests { use super::*; use futures::StreamExt; - use reth_primitives::{SealedBlockWithSenders, B256}; - use reth_provider::Chain; + use reth_chainspec::MAINNET; + use reth_exex_test_utils::test_exex_context_components; + use reth_primitives::{Block, BlockNumHash, Header, SealedBlockWithSenders, B256}; + use reth_provider::{BlockReader, BlockWriter, Chain}; + use reth_testing_utils::generators::{self, random_block, BlockParams}; #[tokio::test] async fn test_delivers_events() { @@ -1190,4 +1199,105 @@ mod tests { // Ensure the notification ID was incremented assert_eq!(exex_handle.next_notification_id, 23); } + + #[tokio::test] + async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> { + let mut rng = generators::rng(); + + let (genesis_hash, provider_factory, _, components) = + test_exex_context_components(MAINNET.clone()).await?; + + let genesis_block = provider_factory + .block(genesis_hash.into())? + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + + let node_head_block = random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), ..Default::default() }, + ); + let provider_rw = provider_factory.provider_rw()?; + provider_rw.insert_block( + node_head_block.clone().seal_with_senders().ok_or_eyre("failed to recover senders")?, + )?; + provider_rw.commit()?; + + let node_head = Head { + number: node_head_block.number, + hash: node_head_block.hash(), + ..Default::default() + }; + let exex_head = + ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } }; + + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![random_block( + &mut rng, + node_head.number + 1, + BlockParams { parent: Some(node_head.hash), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + Default::default(), + )), + }; + + let (notifications_tx, notifications_rx) = mpsc::channel(1); + + notifications_tx.send(notification.clone()).await?; + + let mut notifications = + ExExNotifications::new(node_head, components, notifications_rx).with_head(exex_head); + let new_notification = notifications.next().await.transpose()?; + + assert_eq!(new_notification, Some(notification)); + + Ok(()) + } + + #[tokio::test] + async fn exex_notifications_same_head_canonical() -> eyre::Result<()> { + let (genesis_hash, provider_factory, _, components) = + test_exex_context_components(MAINNET.clone()).await?; + let genesis_block = provider_factory + .block(genesis_hash.into())? + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + + let node_head = + Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() }; + let exex_head = + ExExHead { block: BlockNumHash { number: node_head.number, hash: node_head.hash } }; + + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![Block { + header: Header { + parent_hash: node_head.hash, + number: node_head.number + 1, + ..Default::default() + }, + ..Default::default() + } + .seal_slow() + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + Default::default(), + )), + }; + + let (notifications_tx, notifications_rx) = mpsc::channel(1); + + notifications_tx.send(notification.clone()).await?; + + let mut notifications = + ExExNotifications::new(node_head, components, notifications_rx).with_head(exex_head); + let new_notification = notifications.next().await.transpose()?; + + assert_eq!(new_notification, Some(notification)); + + Ok(()) + } } diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 21ee87c85b07..0828b218be92 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -9,6 +9,7 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] use futures_util::FutureExt; +use rand::Rng; use reth_blockchain_tree::noop::NoopBlockchainTree; use reth_chainspec::{ChainSpec, MAINNET}; use reth_consensus::test_utils::TestConsensus; @@ -22,7 +23,8 @@ use reth_execution_types::Chain; use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_node_api::{ - FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, + FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter, + NodeTypesWithEngine, }; use reth_node_builder::{ components::{ @@ -37,7 +39,7 @@ use reth_node_ethereum::{ EthEngineTypes, EthEvmConfig, }; use reth_payload_builder::noop::NoopPayloadBuilderService; -use reth_primitives::{Head, SealedBlockWithSenders}; +use reth_primitives::{Head, SealedBlockWithSenders, B256}; use reth_provider::{ providers::{BlockchainProvider, StaticFileProvider}, BlockReader, ProviderFactory, @@ -223,20 +225,14 @@ impl TestExExHandle { } } -/// Creates a new [`ExExContext`]. -/// -/// This is a convenience function that does the following: -/// 1. Sets up an [`ExExContext`] with all dependencies. -/// 2. Inserts the genesis block of the provided (chain spec)[`ChainSpec`] into the storage. -/// 3. Creates a channel for receiving events from the Execution Extension. -/// 4. Creates a channel for sending notifications to the Execution Extension. -/// -/// # Warning -/// The genesis block is not sent to the notifications channel. The caller is responsible for -/// doing this. -pub async fn test_exex_context_with_chain_spec( +pub async fn test_exex_context_components( chain_spec: Arc, -) -> eyre::Result<(ExExContext, TestExExHandle)> { +) -> eyre::Result<( + B256, + ProviderFactory< as FullNodeTypes>::Types>, + TaskManager, + Adapter, +)> { let transaction_pool = testing_pool(); let evm_config = EthEvmConfig::new(chain_spec.clone()); let executor = MockExecutorProvider::default(); @@ -254,8 +250,11 @@ pub async fn test_exex_context_with_chain_spec( let provider = BlockchainProvider::new(provider_factory.clone(), Arc::new(NoopBlockchainTree::default()))?; + let mut rng = &mut rand::thread_rng(); let network_manager = NetworkManager::new( - NetworkConfigBuilder::new(SecretKey::new(&mut rand::thread_rng())) + NetworkConfigBuilder::new(SecretKey::new(&mut rng)) + .listener_port(rng.gen()) + .discovery_port(rng.gen()) .build(provider_factory.clone()), ) .await?; @@ -266,7 +265,7 @@ pub async fn test_exex_context_with_chain_spec( let tasks = TaskManager::current(); let task_executor = tasks.executor(); - let components = NodeAdapter::, _>, _> { + let components = Adapter { components: Components { transaction_pool, evm_config, @@ -279,6 +278,25 @@ pub async fn test_exex_context_with_chain_spec( provider, }; + Ok((genesis_hash, provider_factory, tasks, components)) +} + +/// Creates a new [`ExExContext`]. +/// +/// This is a convenience function that does the following: +/// 1. Sets up an [`ExExContext`] with all dependencies. +/// 2. Inserts the genesis block of the provided (chain spec)[`ChainSpec`] into the storage. +/// 3. Creates a channel for receiving events from the Execution Extension. +/// 4. Creates a channel for sending notifications to the Execution Extension. +/// +/// # Warning +/// The genesis block is not sent to the notifications channel. The caller is responsible for +/// doing this. +pub async fn test_exex_context_with_chain_spec( + chain_spec: Arc, +) -> eyre::Result<(ExExContext, TestExExHandle)> { + let (genesis_hash, provider_factory, tasks, components) = + test_exex_context_components(chain_spec).await?; let genesis = provider_factory .block_by_hash(genesis_hash)? .ok_or_else(|| eyre::eyre!("genesis block not found"))? From 2343685c0409becdc1497418aed49e2a91717a92 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 16 Sep 2024 17:50:23 +0100 Subject: [PATCH 10/16] accept decomposed components --- crates/exex/exex/src/context.rs | 9 +- crates/exex/exex/src/manager.rs | 146 +++++++++++++++---------- crates/exex/test-utils/src/lib.rs | 7 +- crates/node/builder/src/launch/exex.rs | 8 +- 4 files changed, 107 insertions(+), 63 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index b4743732563b..ec3c880148aa 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -30,13 +30,18 @@ pub struct ExExContext { /// /// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is /// considered delivered by the node. - pub notifications: ExExNotifications, + pub notifications: ExExNotifications, /// node components pub components: Node, } -impl Debug for ExExContext { +impl Debug for ExExContext +where + Node: FullNodeComponents, + Node::Provider: Debug, + Node::Executor: Debug, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExExContext") .field("head", &self.head) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 08c35d183b13..a860cf8a2c0c 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -5,11 +5,11 @@ use eyre::OptionExt; use futures::{Stream, StreamExt}; use metrics::Gauge; use reth_chainspec::Head; +use reth_evm::execute::BlockExecutorProvider; use reth_exex_types::ExExHead; use reth_metrics::{metrics::Counter, Metrics}; -use reth_node_api::FullNodeComponents; use reth_primitives::{BlockNumber, U256}; -use reth_provider::{Chain, HeaderProvider}; +use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory}; use reth_tracing::tracing::debug; use std::{ collections::VecDeque, @@ -66,15 +66,16 @@ impl ExExHandle { /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. - pub fn new( + pub fn new( id: String, node_head: Head, - components: Node, - ) -> (Self, UnboundedSender, ExExNotifications) { + provider: P, + executor: E, + ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); let notifications = - ExExNotifications { node_head, components, notifications: notification_rx }; + ExExNotifications { node_head, provider, executor, notifications: notification_rx }; ( Self { @@ -153,29 +154,32 @@ impl ExExHandle { } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. -pub struct ExExNotifications { +pub struct ExExNotifications { node_head: Head, - components: Node, + provider: P, + executor: E, notifications: Receiver, } -impl Debug for ExExNotifications { +impl Debug for ExExNotifications { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExExNotifications") - .field("components", &"...") + .field("provider", &self.provider) + .field("executor", &self.executor) .field("notifications", &self.notifications) .finish() } } -impl ExExNotifications { +impl ExExNotifications { /// Creates a new instance of [`ExExNotifications`]. pub const fn new( node_head: Head, - components: Node, + provider: P, + executor: E, notifications: Receiver, ) -> Self { - Self { node_head, components, notifications } + Self { node_head, provider, executor, notifications } } /// Receives the next value for this receiver. @@ -229,17 +233,27 @@ impl ExExNotifications { } } -impl ExExNotifications { +impl ExExNotifications +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ /// Subscribe to notifications with the given head. /// /// Notifications will be sent starting from the head, not inclusive. For example, if /// `head.number == 10`, then the first notification will be with `block.number == 11`. - pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { - ExExNotificationsWithHead::new(self.node_head, self.components, self.notifications, head) + pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + ExExNotificationsWithHead::new( + self.node_head, + self.provider, + self.executor, + self.notifications, + head, + ) } } -impl Stream for ExExNotifications { +impl Stream for ExExNotifications { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -250,30 +264,37 @@ impl Stream for ExExNotifications { /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are /// committed or reverted after the given head. #[derive(Debug)] -pub struct ExExNotificationsWithHead { +pub struct ExExNotificationsWithHead { node_head: Head, - components: Node, + provider: P, + executor: E, notifications: Receiver, exex_head: ExExHead, pending_sync: bool, /// The backfill job to run before consuming any notifications. - backfill_job: Option>, + backfill_job: Option>, /// Whether we're currently waiting for the node head to catch up to the same height as the /// ExEx head. node_head_catchup_in_progress: bool, } -impl ExExNotificationsWithHead { +impl ExExNotificationsWithHead +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ /// Creates a new [`ExExNotificationsWithHead`]. pub const fn new( node_head: Head, - components: Node, + provider: P, + executor: E, notifications: Receiver, exex_head: ExExHead, ) -> Self { Self { node_head, - components, + provider, + executor, notifications, exex_head, pending_sync: true, @@ -302,17 +323,13 @@ impl ExExNotificationsWithHead { fn synchronize(&mut self) -> eyre::Result<()> { debug!(target: "exex::manager", "Synchronizing ExEx head"); - let backfill_job_factory = BackfillJobFactory::new( - self.components.block_executor().clone(), - self.components.provider().clone(), - ); + let backfill_job_factory = + BackfillJobFactory::new(self.executor.clone(), self.provider.clone()); match self.exex_head.block.number.cmp(&self.node_head.number) { std::cmp::Ordering::Less => { // ExEx is behind the node head - if let Some(exex_header) = - self.components.provider().header(&self.exex_head.block.hash)? - { + if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? { // ExEx is on the canonical chain debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain"); @@ -337,9 +354,7 @@ impl ExExNotificationsWithHead { std::cmp::Ordering::Equal => { // ExEx is at the same block height as the node head - if let Some(exex_header) = - self.components.provider().header(&self.exex_head.block.hash)? - { + if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? { // ExEx is on the canonical chain debug!(target: "exex::manager", "ExEx is at the same block height as the node head and on the canonical chain"); @@ -373,7 +388,11 @@ impl ExExNotificationsWithHead { } } -impl Stream for ExExNotificationsWithHead { +impl Stream for ExExNotificationsWithHead +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ type Item = eyre::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -431,11 +450,8 @@ impl Stream for ExExNotificationsWithHead ExExLauncher { for (id, exex) in extensions { // create a new exex handle - let (handle, events, notifications) = - ExExHandle::new(id.clone(), head, components.clone()); + let (handle, events, notifications) = ExExHandle::new( + id.clone(), + head, + components.provider().clone(), + components.block_executor().clone(), + ); exex_handles.push(handle); // create the launch context for the exex From 9189c20c6afeb670390f23e85ed0a5c6c2787b4e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 16 Sep 2024 18:11:21 +0100 Subject: [PATCH 11/16] fix test --- crates/exex/exex/src/manager.rs | 26 +++++++++++++++++++++----- crates/exex/test-utils/src/lib.rs | 3 +-- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index a860cf8a2c0c..a434d41887c3 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1234,7 +1234,7 @@ mod tests { let node_head_block = random_block( &mut rng, genesis_block.number + 1, - BlockParams { parent: Some(genesis_hash), ..Default::default() }, + BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, ); let provider_rw = provider_factory.provider_rw()?; provider_rw.insert_block( @@ -1260,7 +1260,7 @@ mod tests { .seal_with_senders() .ok_or_eyre("failed to recover senders")?], Default::default(), - Default::default(), + None, )), }; @@ -1275,9 +1275,25 @@ mod tests { notifications_rx, ) .with_head(exex_head); - let new_notification = notifications.next().await.transpose()?; - assert_eq!(new_notification, Some(notification)); + // First notification is the backfill of missing blocks from the canonical chain + assert_eq!( + notifications.next().await.transpose()?, + Some(ExExNotification::ChainCommitted { + new: Arc::new( + BackfillJobFactory::new( + notifications.executor.clone(), + notifications.provider.clone() + ) + .backfill(1..=1) + .next() + .ok_or_eyre("failed to backfill")?? + ) + }) + ); + + // Second notification is the actual notification that we sent before + assert_eq!(notifications.next().await.transpose()?, Some(notification)); Ok(()) } @@ -1309,7 +1325,7 @@ mod tests { .seal_with_senders() .ok_or_eyre("failed to recover senders")?], Default::default(), - Default::default(), + None, )), }; diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 75d925797d60..bedf57950ceb 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -23,8 +23,7 @@ use reth_execution_types::Chain; use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_node_api::{ - FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter, - NodeTypesWithEngine, + FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, }; use reth_node_builder::{ components::{ From 371252203ef741f5a221ae0dddd0c0b69bf17e76 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 16 Sep 2024 18:24:27 +0100 Subject: [PATCH 12/16] ahead of node --- crates/exex/exex/src/manager.rs | 71 +++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index a434d41887c3..781013f4bf3d 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1346,4 +1346,75 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_notifications_ahead_of_head() -> eyre::Result<()> { + let mut rng = generators::rng(); + + let (genesis_hash, provider_factory, _, components) = + test_exex_context_components(MAINNET.clone()).await?; + + let genesis_block = provider_factory + .block(genesis_hash.into())? + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + + let exex_head_block = random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, + ); + + let node_head = + Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() }; + let exex_head = ExExHead { + block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() }, + }; + + let (notifications_tx, notifications_rx) = mpsc::channel(1); + + notifications_tx + .send(ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![exex_head_block + .clone() + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }) + .await?; + + let mut notifications = ExExNotifications::new( + node_head, + components.provider, + EthExecutorProvider::mainnet(), + notifications_rx, + ) + .with_head(exex_head); + + // First notification is skipped because the node is catching up with the ExEx + let new_notification = poll_fn(|cx| Poll::Ready(notifications.poll_next_unpin(cx))).await; + assert!(new_notification.is_pending()); + + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![random_block( + &mut rng, + exex_head_block.number + 1, + BlockParams { parent: Some(exex_head_block.hash()), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }; + notifications_tx.send(notification.clone()).await?; + + // Second notification is received because the node caught up with the ExEx + assert_eq!(notifications.next().await.transpose()?, Some(notification)); + + Ok(()) + } } From 98ceba59daf0d11040ddf83260ad298e5b5d7e9b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 17 Sep 2024 11:00:44 +0100 Subject: [PATCH 13/16] revert changes to test utils --- Cargo.lock | 1 - crates/exex/exex/Cargo.toml | 1 - crates/exex/exex/src/manager.rs | 34 ++++++++++++--------- crates/exex/test-utils/src/lib.rs | 49 ++++++++++--------------------- 4 files changed, 36 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd126cd3ac05..5b4ccbd7e37a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7292,7 +7292,6 @@ dependencies = [ "reth-db-common", "reth-evm", "reth-evm-ethereum", - "reth-exex-test-utils", "reth-exex-types", "reth-metrics", "reth-node-api", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 53a460a45fba..4e082c4573e8 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -44,7 +44,6 @@ reth-blockchain-tree.workspace = true reth-db-api.workspace = true reth-db-common.workspace = true reth-evm-ethereum.workspace = true -reth-exex-test-utils.workspace = true reth-node-api.workspace = true reth-primitives-traits = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 781013f4bf3d..bf5e7c86819b 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -810,11 +810,13 @@ impl Clone for ExExManagerHandle { mod tests { use super::*; use futures::StreamExt; - use reth_chainspec::MAINNET; + use reth_db_common::init::init_genesis; use reth_evm_ethereum::execute::EthExecutorProvider; - use reth_exex_test_utils::test_exex_context_components; use reth_primitives::{Block, BlockNumHash, Header, SealedBlockWithSenders, B256}; - use reth_provider::{BlockReader, BlockWriter, Chain}; + use reth_provider::{ + providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader, + BlockWriter, Chain, + }; use reth_testing_utils::generators::{self, random_block, BlockParams}; #[tokio::test] @@ -1224,13 +1226,14 @@ mod tests { async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> { let mut rng = generators::rng(); - let (genesis_hash, provider_factory, _, components) = - test_exex_context_components(MAINNET.clone()).await?; - + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory)?; let genesis_block = provider_factory .block(genesis_hash.into())? .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + let provider = BlockchainProvider2::new(provider_factory.clone())?; + let node_head_block = random_block( &mut rng, genesis_block.number + 1, @@ -1270,7 +1273,7 @@ mod tests { let mut notifications = ExExNotifications::new( node_head, - components.provider, + provider, EthExecutorProvider::mainnet(), notifications_rx, ) @@ -1300,12 +1303,14 @@ mod tests { #[tokio::test] async fn exex_notifications_same_head_canonical() -> eyre::Result<()> { - let (genesis_hash, provider_factory, _, components) = - test_exex_context_components(MAINNET.clone()).await?; + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory)?; let genesis_block = provider_factory .block(genesis_hash.into())? .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + let provider = BlockchainProvider2::new(provider_factory)?; + let node_head = Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() }; let exex_head = @@ -1335,7 +1340,7 @@ mod tests { let mut notifications = ExExNotifications::new( node_head, - components.provider, + provider, EthExecutorProvider::mainnet(), notifications_rx, ) @@ -1351,13 +1356,14 @@ mod tests { async fn test_notifications_ahead_of_head() -> eyre::Result<()> { let mut rng = generators::rng(); - let (genesis_hash, provider_factory, _, components) = - test_exex_context_components(MAINNET.clone()).await?; - + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory)?; let genesis_block = provider_factory .block(genesis_hash.into())? .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + let provider = BlockchainProvider2::new(provider_factory)?; + let exex_head_block = random_block( &mut rng, genesis_block.number + 1, @@ -1387,7 +1393,7 @@ mod tests { let mut notifications = ExExNotifications::new( node_head, - components.provider, + provider, EthExecutorProvider::mainnet(), notifications_rx, ) diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index bedf57950ceb..ab1958cc86c7 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -9,7 +9,6 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] use futures_util::FutureExt; -use rand::Rng; use reth_blockchain_tree::noop::NoopBlockchainTree; use reth_chainspec::{ChainSpec, MAINNET}; use reth_consensus::test_utils::TestConsensus; @@ -38,7 +37,7 @@ use reth_node_ethereum::{ EthEngineTypes, EthEvmConfig, }; use reth_payload_builder::noop::NoopPayloadBuilderService; -use reth_primitives::{Head, SealedBlockWithSenders, B256}; +use reth_primitives::{Head, SealedBlockWithSenders}; use reth_provider::{ providers::{BlockchainProvider, StaticFileProvider}, BlockReader, ProviderFactory, @@ -224,14 +223,20 @@ impl TestExExHandle { } } -pub async fn test_exex_context_components( +/// Creates a new [`ExExContext`]. +/// +/// This is a convenience function that does the following: +/// 1. Sets up an [`ExExContext`] with all dependencies. +/// 2. Inserts the genesis block of the provided (chain spec)[`ChainSpec`] into the storage. +/// 3. Creates a channel for receiving events from the Execution Extension. +/// 4. Creates a channel for sending notifications to the Execution Extension. +/// +/// # Warning +/// The genesis block is not sent to the notifications channel. The caller is responsible for +/// doing this. +pub async fn test_exex_context_with_chain_spec( chain_spec: Arc, -) -> eyre::Result<( - B256, - ProviderFactory< as FullNodeTypes>::Types>, - TaskManager, - Adapter, -)> { +) -> eyre::Result<(ExExContext, TestExExHandle)> { let transaction_pool = testing_pool(); let evm_config = EthEvmConfig::new(chain_spec.clone()); let executor = MockExecutorProvider::default(); @@ -249,11 +254,8 @@ pub async fn test_exex_context_components( let provider = BlockchainProvider::new(provider_factory.clone(), Arc::new(NoopBlockchainTree::default()))?; - let mut rng = &mut rand::thread_rng(); let network_manager = NetworkManager::new( - NetworkConfigBuilder::new(SecretKey::new(&mut rng)) - .listener_port(rng.gen()) - .discovery_port(rng.gen()) + NetworkConfigBuilder::new(SecretKey::new(&mut rand::thread_rng())) .build(provider_factory.clone()), ) .await?; @@ -264,7 +266,7 @@ pub async fn test_exex_context_components( let tasks = TaskManager::current(); let task_executor = tasks.executor(); - let components = Adapter { + let components = NodeAdapter::, _>, _> { components: Components { transaction_pool, evm_config, @@ -277,25 +279,6 @@ pub async fn test_exex_context_components( provider, }; - Ok((genesis_hash, provider_factory, tasks, components)) -} - -/// Creates a new [`ExExContext`]. -/// -/// This is a convenience function that does the following: -/// 1. Sets up an [`ExExContext`] with all dependencies. -/// 2. Inserts the genesis block of the provided (chain spec)[`ChainSpec`] into the storage. -/// 3. Creates a channel for receiving events from the Execution Extension. -/// 4. Creates a channel for sending notifications to the Execution Extension. -/// -/// # Warning -/// The genesis block is not sent to the notifications channel. The caller is responsible for -/// doing this. -pub async fn test_exex_context_with_chain_spec( - chain_spec: Arc, -) -> eyre::Result<(ExExContext, TestExExHandle)> { - let (genesis_hash, provider_factory, tasks, components) = - test_exex_context_components(chain_spec).await?; let genesis = provider_factory .block_by_hash(genesis_hash)? .ok_or_else(|| eyre::eyre!("genesis block not found"))? From 4139d00afef9a187680c976943e99eb54c2dbbab Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 17 Sep 2024 11:09:57 +0100 Subject: [PATCH 14/16] add empty tests for remaining cases --- crates/exex/exex/src/manager.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index bf5e7c86819b..5e0e5b215cd6 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1301,6 +1301,12 @@ mod tests { Ok(()) } + #[ignore] + #[tokio::test] + async fn exex_notifications_behind_head_non_canonical() -> eyre::Result<()> { + Ok(()) + } + #[tokio::test] async fn exex_notifications_same_head_canonical() -> eyre::Result<()> { let provider_factory = create_test_provider_factory(); @@ -1345,13 +1351,19 @@ mod tests { notifications_rx, ) .with_head(exex_head); - let new_notification = notifications.next().await.transpose()?; + let new_notification = notifications.next().await.transpose()?; assert_eq!(new_notification, Some(notification)); Ok(()) } + #[ignore] + #[tokio::test] + async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> { + Ok(()) + } + #[tokio::test] async fn test_notifications_ahead_of_head() -> eyre::Result<()> { let mut rng = generators::rng(); @@ -1403,6 +1415,8 @@ mod tests { let new_notification = poll_fn(|cx| Poll::Ready(notifications.poll_next_unpin(cx))).await; assert!(new_notification.is_pending()); + // Imitate the node catching up with the ExEx by sending a notification for the missing + // block let notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new( vec![random_block( From f2a5705c4ce0a27e94115f7d3ee758b674d244f0 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 17 Sep 2024 11:13:42 +0100 Subject: [PATCH 15/16] make with_head private again --- crates/exex/exex/src/manager.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 5e0e5b215cd6..6cd43fd58566 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -238,11 +238,13 @@ where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, E: BlockExecutorProvider + Clone + Unpin + 'static, { + // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`] /// Subscribe to notifications with the given head. /// /// Notifications will be sent starting from the head, not inclusive. For example, if /// `head.number == 10`, then the first notification will be with `block.number == 11`. - pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + #[allow(dead_code)] + fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { ExExNotificationsWithHead::new( self.node_head, self.provider, From 6398a02a05fdc43d4cdeba1febd1ba057f6200b7 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 17 Sep 2024 11:56:23 +0100 Subject: [PATCH 16/16] Revert "make with_head private again" This reverts commit f2a5705c4ce0a27e94115f7d3ee758b674d244f0. --- crates/exex/exex/src/manager.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 6cd43fd58566..5e0e5b215cd6 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -238,13 +238,11 @@ where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, E: BlockExecutorProvider + Clone + Unpin + 'static, { - // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`] /// Subscribe to notifications with the given head. /// /// Notifications will be sent starting from the head, not inclusive. For example, if /// `head.number == 10`, then the first notification will be with `block.number == 11`. - #[allow(dead_code)] - fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { ExExNotificationsWithHead::new( self.node_head, self.provider,