From a89de219c97e470f77f9833dc170ed1344f84a21 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 6 Sep 2024 15:24:23 +0100 Subject: [PATCH] feat(exex): subscribe to notifications explicitly (#10573) --- Cargo.lock | 1 + book/developers/exex/hello-world.md | 5 +- book/developers/exex/remote.md | 10 +- book/developers/exex/tracking-state.md | 6 +- book/installation/source.md | 6 +- crates/exex/exex/Cargo.toml | 4 +- crates/exex/exex/src/context.rs | 12 +- crates/exex/exex/src/manager.rs | 191 +++++++++++++++++++++---- crates/exex/test-utils/src/lib.rs | 5 +- crates/exex/types/Cargo.toml | 1 + crates/exex/types/src/head.rs | 9 ++ crates/exex/types/src/lib.rs | 2 + crates/node/builder/src/launch/exex.rs | 2 +- 13 files changed, 209 insertions(+), 45 deletions(-) create mode 100644 crates/exex/types/src/head.rs diff --git a/Cargo.lock b/Cargo.lock index 4b8570d9b74f..c4857d0fd2bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7308,6 +7308,7 @@ name = "reth-exex-types" version = "1.0.6" dependencies = [ "alloy-primitives", + "reth-primitives", "reth-provider", "serde", ] diff --git a/book/developers/exex/hello-world.md b/book/developers/exex/hello-world.md index b15a203cddbb..3c90e5a693d0 100644 --- a/book/developers/exex/hello-world.md +++ b/book/developers/exex/hello-world.md @@ -24,7 +24,9 @@ reth = { git = "https://github.com/paradigmxyz/reth.git" } # Reth reth-exex = { git = "https://github.com/paradigmxyz/reth.git" } # Execution Extensions reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git" } # Ethereum Node implementation reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } # Logging + eyre = "0.6" # Easy error handling +futures-util = "0.3" # Stream utilities for consuming notifications ``` ### Default Reth node @@ -101,13 +103,14 @@ If you try running a node with an ExEx that exits, the node will exit as well. Now, let's extend our simplest ExEx and start actually listening to new notifications, log them, and send events back to the main node ```rust,norun,noplayground,ignore +use futures_util::StreamExt; use reth::api::FullNodeComponents; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; async fn my_exex(mut ctx: ExExContext) -> eyre::Result<()> { - while let Some(notification) = ctx.notifications.recv().await { + while let Some(notification) = ctx.notifications.next().await { match ¬ification { ExExNotification::ChainCommitted { new } => { info!(committed_chain = ?new.range(), "Received commit"); diff --git a/book/developers/exex/remote.md b/book/developers/exex/remote.md index 2e7e7dad10b0..2db5074e1df7 100644 --- a/book/developers/exex/remote.md +++ b/book/developers/exex/remote.md @@ -268,13 +268,15 @@ Don't forget to emit `ExExEvent::FinishedHeight` ```rust,norun,noplayground,ignore // ... + +use futures_util::StreamExt; use reth_exex::{ExExContext, ExExEvent}; async fn remote_exex( mut ctx: ExExContext, notifications: Arc>, ) -> eyre::Result<()> { - while let Some(notification) = ctx.notifications.recv().await { + while let Some(notification) = ctx.notifications.next().await { if let Some(committed_chain) = notification.committed_chain() { ctx.events .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; @@ -332,6 +334,9 @@ fn main() -> eyre::Result<()> { Click to expand ```rust,norun,noplayground,ignore +use std::sync::Arc; + +use futures_util::StreamExt; use remote_exex::proto::{ self, remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, @@ -340,7 +345,6 @@ use reth::api::FullNodeComponents; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; -use std::sync::Arc; use tokio::sync::{broadcast, mpsc}; use tokio_stream::wrappers::ReceiverStream; use tonic::{transport::Server, Request, Response, Status}; @@ -381,7 +385,7 @@ async fn remote_exex( mut ctx: ExExContext, notifications: Arc>, ) -> eyre::Result<()> { - while let Some(notification) = ctx.notifications.recv().await { + while let Some(notification) = ctx.notifications.next().await { if let Some(committed_chain) = notification.committed_chain() { ctx.events .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; diff --git a/book/developers/exex/tracking-state.md b/book/developers/exex/tracking-state.md index 5fe8b1c9ef83..4d3bbd0a35ae 100644 --- a/book/developers/exex/tracking-state.md +++ b/book/developers/exex/tracking-state.md @@ -25,6 +25,7 @@ use std::{ task::{ready, Context, Poll}, }; +use futures_util::StreamExt; use reth::api::FullNodeComponents; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_ethereum::EthereumNode; @@ -40,7 +41,7 @@ impl Future for MyExEx { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) { + while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) { match ¬ification { ExExNotification::ChainCommitted { new } => { info!(committed_chain = ?new.range(), "Received commit"); @@ -101,6 +102,7 @@ use std::{ task::{ready, Context, Poll}, }; +use futures_util::StreamExt; use reth::{api::FullNodeComponents, primitives::BlockNumber}; use reth_exex::{ExExContext, ExExEvent}; use reth_node_ethereum::EthereumNode; @@ -130,7 +132,7 @@ impl Future for MyExEx { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) { + while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) { if let Some(reverted_chain) = notification.reverted_chain() { this.transactions = this.transactions.saturating_sub( reverted_chain diff --git a/book/installation/source.md b/book/installation/source.md index 14ae22e0fda4..d9642c4bc48d 100644 --- a/book/installation/source.md +++ b/book/installation/source.md @@ -8,7 +8,7 @@ You can build Reth on Linux, macOS, Windows, and Windows WSL2. ## Dependencies -First, **install Rust** using [rustup](https://rustup.rs/): +First, **install Rust** using [rustup](https://rustup.rs/): ```bash curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh @@ -32,7 +32,7 @@ operating system: These are needed to build bindings for Reth's database. -The Minimum Supported Rust Version (MSRV) of this project is 1.81.0. If you already have a version of Rust installed, +The Minimum Supported Rust Version (MSRV) of this project is 1.80.0. If you already have a version of Rust installed, you can check your version by running `rustc --version`. To update your version of Rust, run `rustup update`. ## Build Reth @@ -147,7 +147,7 @@ _(Thanks to Sigma Prime for this section from [their Lighthouse book](https://li ### Bus error (WSL2) -In WSL 2 on Windows, the default virtual disk size is set to 1TB. +In WSL 2 on Windows, the default virtual disk size is set to 1TB. You must increase the allocated disk size for your WSL2 instance before syncing reth. diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 1ad906e6f010..f3decd337417 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -20,8 +20,8 @@ reth-metrics.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true reth-payload-builder.workspace = true -reth-primitives-traits.workspace = true reth-primitives = { workspace = true, features = ["secp256k1"] } +reth-primitives-traits.workspace = true reth-provider.workspace = true reth-prune-types.workspace = true reth-revm.workspace = true @@ -45,9 +45,9 @@ reth-db-api.workspace = true reth-db-common.workspace = true reth-evm-ethereum.workspace = true reth-node-api.workspace = true +reth-primitives-traits = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } reth-testing-utils.workspace = true -reth-primitives-traits = { workspace = true, features = ["test-utils"] } secp256k1.workspace = true diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index 90fbe0d568a4..b4743732563b 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -4,9 +4,9 @@ use reth_node_api::{FullNodeComponents, NodeTypesWithEngine}; use reth_node_core::node_config::NodeConfig; use reth_primitives::Head; use reth_tasks::TaskExecutor; -use tokio::sync::mpsc::{Receiver, UnboundedSender}; +use tokio::sync::mpsc::UnboundedSender; -use crate::{ExExEvent, ExExNotification}; +use crate::{ExExEvent, ExExNotifications}; /// Captures the context that an `ExEx` has access to. pub struct ExExContext { @@ -24,13 +24,13 @@ pub struct ExExContext { /// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what /// blocks to receive notifications for. pub events: UnboundedSender, - /// Channel to receive [`ExExNotification`]s. + /// Channel to receive [`ExExNotification`](crate::ExExNotification)s. /// /// # Important /// - /// Once a an [`ExExNotification`] is sent over the channel, it is considered delivered by the - /// node. - pub notifications: Receiver, + /// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is + /// considered delivered by the node. + pub notifications: ExExNotifications, /// node components pub components: Node, diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 0f222e0ecaa9..48a77d786275 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,10 +1,13 @@ use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; +use futures::Stream; use metrics::Gauge; +use reth_exex_types::ExExHead; use reth_metrics::{metrics::Counter, Metrics}; use reth_primitives::BlockNumber; use reth_tracing::tracing::debug; use std::{ collections::VecDeque, + fmt::Debug, future::{poll_fn, Future}, pin::Pin, sync::{ @@ -40,14 +43,12 @@ pub struct ExExHandle { id: String, /// Metrics for an `ExEx`. metrics: ExExMetrics, - /// Channel to send [`ExExNotification`]s to the `ExEx`. sender: PollSender, /// Channel to receive [`ExExEvent`]s from the `ExEx`. receiver: UnboundedReceiver, /// The ID of the next notification to send to this `ExEx`. next_notification_id: usize, - /// The finished block number of the `ExEx`. /// /// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event. @@ -59,9 +60,13 @@ impl ExExHandle { /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. - pub fn new(id: String) -> (Self, UnboundedSender, Receiver) { + pub fn new( + id: String, + components: Node, + ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); + let notifications = ExExNotifications { components, notifications: notification_rx }; ( Self { @@ -73,7 +78,7 @@ impl ExExHandle { finished_height: None, }, event_tx, - notification_rx, + notifications, ) } @@ -139,6 +144,133 @@ impl ExExHandle { } } +/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. +pub struct ExExNotifications { + components: Node, + notifications: Receiver, +} + +impl Debug for ExExNotifications { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExExNotifications") + .field("components", &"...") + .field("notifications", &self.notifications) + .finish() + } +} + +impl ExExNotifications { + /// Creates a new instance of [`ExExNotifications`]. + pub const fn new(components: Node, notifications: Receiver) -> Self { + Self { components, notifications } + } + + /// Receives the next value for this receiver. + /// + /// This method returns `None` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. This indicates that no + /// further values can ever be received from this `Receiver`. The channel is + /// closed when all senders have been dropped, or when [`Receiver::close`] is called. + /// + /// # Cancel safety + /// + /// This method is cancel safe. If `recv` is used as the event in a + /// [`tokio::select!`] statement and some other branch + /// completes first, it is guaranteed that no messages were received on this + /// channel. + /// + /// For full documentation, see [`Receiver::recv`]. + #[deprecated(note = "use `ExExNotifications::next` and its `Stream` implementation instead")] + pub async fn recv(&mut self) -> Option { + self.notifications.recv().await + } + + /// Polls to receive the next message on this channel. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a + /// spurious failure happens. + /// * `Poll::Ready(Some(message))` if a message is available. + /// * `Poll::Ready(None)` if the channel has been closed and all messages sent before it was + /// closed have been received. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. Note that on multiple calls to + /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` + /// passed to the most recent call is scheduled to receive a wakeup. + /// + /// If this method returns `Poll::Pending` due to a spurious failure, then + /// the `Waker` will be notified when the situation causing the spurious + /// failure has been resolved. Note that receiving such a wakeup does not + /// guarantee that the next call will succeed — it could fail with another + /// spurious failure. + /// + /// For full documentation, see [`Receiver::poll_recv`]. + #[deprecated( + note = "use `ExExNotifications::poll_next` and its `Stream` implementation instead" + )] + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.notifications.poll_recv(cx) + } + + // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`] + /// Subscribe to notifications with the given head. + /// + /// Notifications will be sent starting from the head, not inclusive. For example, if + /// `head.number == 10`, then the first notification will be with `block.number == 11`. + #[allow(dead_code)] + fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + ExExNotificationsWithHead { + components: self.components, + notifications: self.notifications, + head, + } + } +} + +impl Stream for ExExNotifications { + type Item = ExExNotification; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().notifications.poll_recv(cx) + } +} + +/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are +/// committed or reverted after the given head. +#[derive(Debug)] +pub struct ExExNotificationsWithHead { + #[allow(dead_code)] + components: Node, + notifications: Receiver, + head: ExExHead, +} + +impl Stream for ExExNotificationsWithHead { + type Item = ExExNotification; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + // TODO(alexey): backfill according to the head + loop { + let Some(notification) = ready!(this.notifications.poll_recv(cx)) else { + return Poll::Ready(None) + }; + + if notification + .committed_chain() + .or_else(|| notification.reverted_chain()) + .map_or(false, |chain| chain.first().number > this.head.block.number) + { + return Poll::Ready(Some(notification)) + } + } + } +} + /// Metrics for the `ExEx` manager. #[derive(Metrics)] #[metrics(scope = "exex_manager")] @@ -473,13 +605,14 @@ impl Clone for ExExManagerHandle { #[cfg(test)] mod tests { use super::*; + use futures::StreamExt; use reth_primitives::{SealedBlockWithSenders, B256}; use reth_provider::Chain; #[tokio::test] async fn test_delivers_events() { let (mut exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string()); + ExExHandle::new("test_exex".to_string(), ()); // Send an event and check that it's delivered correctly event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -489,7 +622,7 @@ mod tests { #[tokio::test] async fn test_has_exexs() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); assert!(!ExExManager::new(vec![], 0).handle.has_exexs()); @@ -498,7 +631,7 @@ mod tests { #[tokio::test] async fn test_has_capacity() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); assert!(!ExExManager::new(vec![], 0).handle.has_capacity()); @@ -507,7 +640,7 @@ mod tests { #[test] fn test_push_notification() { - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string()); + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ()); // Create a mock ExExManager and add the exex_handle to it let mut exex_manager = ExExManager::new(vec![exex_handle], 10); @@ -552,7 +685,7 @@ mod tests { #[test] fn test_update_capacity() { - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string()); + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ()); // Create a mock ExExManager and add the exex_handle to it let max_capacity = 5; @@ -587,7 +720,7 @@ mod tests { #[tokio::test] async fn test_updates_block_height() { let (exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string()); + ExExHandle::new("test_exex".to_string(), ()); // Check initial block height assert!(exex_handle.finished_height.is_none()); @@ -624,8 +757,8 @@ mod tests { #[tokio::test] async fn test_updates_block_height_lower() { // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string()); - let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string()); + let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ()); + let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ()); // Send events to update the block heights of the two handles, with the second being lower event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -655,8 +788,8 @@ mod tests { #[tokio::test] async fn test_updates_block_height_greater() { // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string()); - let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string()); + let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ()); + let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ()); // Assert that the initial block height is `None` for the first `ExExHandle`. assert!(exex_handle1.finished_height.is_none()); @@ -692,7 +825,7 @@ mod tests { #[tokio::test] async fn test_exex_manager_capacity() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); // Create an ExExManager with a small max capacity let max_capacity = 2; @@ -730,7 +863,7 @@ mod tests { #[tokio::test] async fn exex_handle_new() { - let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -759,7 +892,7 @@ mod tests { // Send a notification and ensure it's received correctly match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notification_rx.recv().await.unwrap(); + let received_notification = notifications.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending => panic!("Notification send is pending"), @@ -772,7 +905,7 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { - let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(15); @@ -790,9 +923,17 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification)) { Poll::Ready(Ok(())) => { - // The notification should be skipped, so nothing should be sent. - // Check that the receiver channel is indeed empty - assert!(notification_rx.try_recv().is_err(), "Receiver channel should be empty"); + poll_fn(|cx| { + // The notification should be skipped, so nothing should be sent. + // Check that the receiver channel is indeed empty + assert_eq!( + notifications.poll_next_unpin(cx), + Poll::Pending, + "Receiver channel should be empty" + ); + Poll::Ready(()) + }) + .await; } Poll::Pending | Poll::Ready(Err(_)) => { panic!("Notification should not be pending or fail"); @@ -805,7 +946,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { - let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -821,7 +962,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notification_rx.recv().await.unwrap(); + let received_notification = notifications.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { @@ -835,7 +976,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { - let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; @@ -848,7 +989,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notification_rx.recv().await.unwrap(); + let received_notification = notifications.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index bfa18a4549e8..dd4907649c3a 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -19,7 +19,7 @@ use reth_db::{ use reth_db_common::init::init_genesis; use reth_evm::test_utils::MockExecutorProvider; use reth_execution_types::Chain; -use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_node_api::{ FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, @@ -296,13 +296,14 @@ pub async fn test_exex_context_with_chain_spec( let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); + let notifications = ExExNotifications::new(components.clone(), notifications_rx); let ctx = ExExContext { head, config: NodeConfig::test(), reth_config: reth_config::Config::default(), events: events_tx, - notifications: notifications_rx, + notifications, components, }; diff --git a/crates/exex/types/Cargo.toml b/crates/exex/types/Cargo.toml index a70bcc1dd43c..75cd498cd173 100644 --- a/crates/exex/types/Cargo.toml +++ b/crates/exex/types/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # reth +reth-primitives.workspace = true reth-provider.workspace = true # reth diff --git a/crates/exex/types/src/head.rs b/crates/exex/types/src/head.rs new file mode 100644 index 000000000000..1552c2380fdb --- /dev/null +++ b/crates/exex/types/src/head.rs @@ -0,0 +1,9 @@ +use reth_primitives::BlockNumHash; + +#[allow(clippy::doc_markdown)] +/// A head of the ExEx. It determines the highest block committed to the internal ExEx state. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ExExHead { + /// The head block. + pub block: BlockNumHash, +} diff --git a/crates/exex/types/src/lib.rs b/crates/exex/types/src/lib.rs index 3c0ca731f216..8e71fbc619b3 100644 --- a/crates/exex/types/src/lib.rs +++ b/crates/exex/types/src/lib.rs @@ -9,7 +9,9 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] mod finished_height; +mod head; mod notification; pub use finished_height::FinishedExExHeight; +pub use head::ExExHead; pub use notification::ExExNotification; diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 86bb14ecf551..798ddea2d829 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -48,7 +48,7 @@ impl ExExLauncher { for (id, exex) in extensions { // create a new exex handle - let (handle, events, notifications) = ExExHandle::new(id.clone()); + let (handle, events, notifications) = ExExHandle::new(id.clone(), components.clone()); exex_handles.push(handle); // create the launch context for the exex