Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ForkChoiceNotifications for BlockchainProvider and NoopProvider #10231

Merged
merged 5 commits into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/chain-state/src/chain_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ impl ChainInfoTracker {
let _ = h.replace(header);
});
}

/// Subscribe to the finalized block.
pub fn subscribe_to_finalized_block(&self) -> watch::Receiver<Option<SealedHeader>> {
self.inner.finalized_block.subscribe()
}

/// Subscribe to the safe block.
pub fn subscribe_to_safe_block(&self) -> watch::Receiver<Option<SealedHeader>> {
self.inner.safe_block.subscribe()
}
}

/// Container type for all chain info fields
Expand Down
28 changes: 14 additions & 14 deletions crates/chain-state/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,20 @@ impl CanonStateNotification {

/// Wrapper around a broadcast receiver that receives fork choice notifications.
#[derive(Debug, Deref, DerefMut)]
pub struct ForkChoiceNotifications(broadcast::Receiver<SealedHeader>);
pub struct ForkChoiceNotifications(pub watch::Receiver<Option<SealedHeader>>);

/// A trait that allows to register to fork choice related events
/// and get notified when a new fork choice is available.
pub trait ForkChoiceSubscriptions: Send + Sync {
/// Get notified when a new head of the chain is selected.
fn subscribe_to_fork_choice(&self) -> ForkChoiceNotifications;
/// Get notified when a new safe block of the chain is selected.
fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications;

/// Convenience method to get a stream of the new head of the chain.
/// Get notified when a new finalized block of the chain is selected.
fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications;

/// Convenience method to get a stream of the new safe blocks of the chain.
fn fork_choice_stream(&self) -> ForkChoiceStream {
ForkChoiceStream { st: BroadcastStream::new(self.subscribe_to_fork_choice().0) }
ForkChoiceStream { st: WatchStream::new(self.subscribe_to_safe_block().0) }
}
}

Expand All @@ -159,22 +162,19 @@ pub trait ForkChoiceSubscriptions: Send + Sync {
#[pin_project::pin_project]
pub struct ForkChoiceStream {
#[pin]
st: BroadcastStream<SealedHeader>,
st: WatchStream<Option<SealedHeader>>,
}

impl Stream for ForkChoiceStream {
type Item = SealedHeader;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Ok(notification)) => Poll::Ready(Some(notification)),
Some(Err(err)) => {
debug!(%err, "finalized header notification stream lagging behind");
continue
}
None => Poll::Ready(None),
};
match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Some(notification)) => return Poll::Ready(Some(notification)),
Some(None) => continue,
None => return Poll::Ready(None),
}
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_blockchain_tree_api::{
BlockValidationKind, BlockchainTreeEngine, BlockchainTreeViewer, CanonicalOutcome,
InsertPayloadOk,
};
use reth_chain_state::ChainInfoTracker;
use reth_chain_state::{ChainInfoTracker, ForkChoiceNotifications, ForkChoiceSubscriptions};
use reth_chainspec::{ChainInfo, ChainSpec};
use reth_db_api::{
database::Database,
Expand Down Expand Up @@ -940,6 +940,21 @@ where
}
}

impl<DB> ForkChoiceSubscriptions for BlockchainProvider<DB>
where
DB: Send + Sync,
{
fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications {
let receiver = self.chain_info.subscribe_to_safe_block();
ForkChoiceNotifications(receiver)
}

fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications {
let receiver = self.chain_info.subscribe_to_finalized_block();
ForkChoiceNotifications(receiver)
}
}

impl<DB> ChangeSetReader for BlockchainProvider<DB>
where
DB: Database,
Expand Down
19 changes: 17 additions & 2 deletions crates/storage/provider/src/test_utils/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::{
sync::Arc,
};

use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions};
use reth_chain_state::{
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications,
ForkChoiceSubscriptions,
};
use reth_chainspec::{ChainInfo, ChainSpec, MAINNET};
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_errors::ProviderError;
Expand All @@ -21,7 +24,7 @@ use reth_storage_api::StateProofProvider;
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState};
use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, watch};

use crate::{
providers::StaticFileProvider,
Expand Down Expand Up @@ -532,3 +535,15 @@ impl CanonStateSubscriptions for NoopProvider {
broadcast::channel(1).1
}
}

impl ForkChoiceSubscriptions for NoopProvider {
fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications {
let (_, rx) = watch::channel(None);
ForkChoiceNotifications(rx)
}

fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications {
let (_, rx) = watch::channel(None);
ForkChoiceNotifications(rx)
}
}
Loading