Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ExExEvent::FinishedHeight takes BlockNumHash instead #11278

Merged
merged 8 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion book/developers/exex/hello-world.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async fn my_exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::

if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
}

Expand Down
2 changes: 1 addition & 1 deletion book/developers/exex/how-it-works.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ event to signify what blocks have been processed. This event is used by Reth to

An ExEx will only receive notifications for block numbers greater than the block in the most recently emitted `FinishedHeight` event.

To clarify: if an ExEx emits `ExExEvent::FinishedHeight(0)` it will receive notifications for any `block_number > 0`.
To clarify: if an ExEx emits `ExExEvent::FinishedHeight` for `block #0` it will receive notifications for any `block_number > 0`.
4 changes: 2 additions & 2 deletions book/developers/exex/remote.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ async fn remote_exex<Node: FullNodeComponents>(
while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}

info!("Notification sent to the gRPC server");
Expand Down Expand Up @@ -388,7 +388,7 @@ async fn remote_exex<Node: FullNodeComponents>(
while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}

info!(?notification, "Notification sent to the gRPC server");
Expand Down
4 changes: 2 additions & 2 deletions book/developers/exex/tracking-state.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {
if let Some(committed_chain) = notification.committed_chain() {
this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
}

Expand Down Expand Up @@ -152,7 +152,7 @@ impl<Node: FullNodeComponents> Future for MyExEx<Node> {

this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}

if let Some(first_block) = this.first_block {
Expand Down
4 changes: 2 additions & 2 deletions crates/exex/exex/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use alloy_primitives::BlockNumber;
use reth_primitives::BlockNumHash;

/// Events emitted by an `ExEx`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -9,5 +9,5 @@ pub enum ExExEvent {
/// meaning that Reth is allowed to prune them.
///
/// On reorgs, it's possible for the height to go down.
FinishedHeight(BlockNumber),
FinishedHeight(BlockNumHash),
}
43 changes: 25 additions & 18 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::{
wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
};
use alloy_primitives::BlockNumber;
use futures::StreamExt;
use metrics::Gauge;
use reth_chain_state::ForkChoiceStream;
use reth_chainspec::Head;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives::SealedHeader;
use reth_primitives::{BlockNumHash, SealedHeader};
use reth_tracing::tracing::debug;
use std::{
collections::VecDeque,
Expand Down Expand Up @@ -53,10 +52,10 @@ pub struct ExExHandle {
receiver: UnboundedReceiver<ExExEvent>,
/// The ID of the next notification to send to this `ExEx`.
next_notification_id: usize,
/// The finished block number of the `ExEx`.
/// The finished block of the `ExEx`.
///
/// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event.
finished_height: Option<BlockNumber>,
finished_height: Option<BlockNumHash>,
}

impl ExExHandle {
Expand Down Expand Up @@ -105,11 +104,11 @@ impl ExExHandle {
// Skip the chain commit notification if the finished height of the ExEx is
// higher than or equal to the tip of the new notification.
// I.e., the ExEx has already processed the notification.
if finished_height >= new.tip().number {
if finished_height.number >= new.tip().number {
debug!(
exex_id = %self.id,
%notification_id,
%finished_height,
?finished_height,
new_tip = %new.tip().number,
"Skipping notification"
);
Expand Down Expand Up @@ -377,7 +376,7 @@ impl Future for ExExManager {

// update watch channel block number
let finished_height = self.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
exex.finished_height.map_or(Err(()), |height| Ok(height.min(curr)))
exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
});
if let Ok(finished_height) = finished_height {
let _ = self.finished_height.send(FinishedExExHeight::Height(finished_height));
Expand Down Expand Up @@ -532,9 +531,10 @@ mod tests {
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

// Send an event and check that it's delivered correctly
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
event_tx.send(event).unwrap();
let received_event = exex_handle.receiver.recv().await.unwrap();
assert_eq!(received_event, ExExEvent::FinishedHeight(42));
assert_eq!(received_event, event);
}

#[tokio::test]
Expand Down Expand Up @@ -672,7 +672,8 @@ mod tests {
assert!(exex_handle.finished_height.is_none());

// Update the block height via an event
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
let block = BlockNumHash::new(42, B256::random());
event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();

// Create a mock ExExManager and add the exex_handle to it
let exex_manager = ExExManager::new(
Expand All @@ -690,7 +691,7 @@ mod tests {

// Check that the block height was updated
let updated_exex_handle = &pinned_manager.exex_handles[0];
assert_eq!(updated_exex_handle.finished_height, Some(42));
assert_eq!(updated_exex_handle.finished_height, Some(block));

// Get the receiver for the finished height
let mut receiver = pinned_manager.handle.finished_height();
Expand All @@ -716,9 +717,12 @@ mod tests {
let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());

let block1 = BlockNumHash::new(42, B256::random());
let block2 = BlockNumHash::new(10, B256::random());

// Send events to update the block heights of the two handles, with the second being lower
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
event_tx2.send(ExExEvent::FinishedHeight(10)).unwrap();
event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();

let exex_manager = ExExManager::new(
vec![exex_handle1, exex_handle2],
Expand Down Expand Up @@ -760,9 +764,12 @@ mod tests {
// Assert that the initial block height is `None` for the first `ExExHandle`.
assert!(exex_handle1.finished_height.is_none());

let block1 = BlockNumHash::new(42, B256::random());
let block2 = BlockNumHash::new(100, B256::random());

// Send events to update the block heights of the two handles, with the second being higher.
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
event_tx2.send(ExExEvent::FinishedHeight(100)).unwrap();
event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();

let exex_manager = ExExManager::new(
vec![exex_handle1, exex_handle2],
Expand Down Expand Up @@ -896,7 +903,7 @@ mod tests {
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

// Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(15);
exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));

let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
Expand Down Expand Up @@ -947,7 +954,7 @@ mod tests {

// Even if the finished height is higher than the tip of the new chain, the reorg
// notification should be received
exex_handle.finished_height = Some(u64::MAX);
exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

Expand Down Expand Up @@ -978,7 +985,7 @@ mod tests {

// Even if the finished height is higher than the tip of the new chain, the reorg
// notification should be received
exex_handle.finished_height = Some(u64::MAX);
exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

Expand Down
4 changes: 2 additions & 2 deletions crates/exex/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use reth_node_ethereum::{
EthEngineTypes, EthEvmConfig,
};
use reth_payload_builder::noop::NoopPayloadBuilderService;
use reth_primitives::{Head, SealedBlockWithSenders};
use reth_primitives::{BlockNumHash, Head, SealedBlockWithSenders};
use reth_provider::{
providers::{BlockchainProvider, StaticFileProvider},
BlockReader, ProviderFactory,
Expand Down Expand Up @@ -223,7 +223,7 @@ impl TestExExHandle {
/// Asserts that the Execution Extension emitted a `FinishedHeight` event with the correct
/// height.
#[track_caller]
pub fn assert_event_finished_height(&mut self, height: u64) -> eyre::Result<()> {
pub fn assert_event_finished_height(&mut self, height: BlockNumHash) -> eyre::Result<()> {
let event = self.events_rx.try_recv()?;
assert_eq!(event, ExExEvent::FinishedHeight(height));
Ok(())
Expand Down
Loading