Skip to content

Commit

Permalink
fix wait for head logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Sep 11, 2024
1 parent 8ff9b76 commit e3b7761
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 22 deletions.
1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
msrv = "1.81"
too-large-for-stack = 128
doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]
58 changes: 41 additions & 17 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight, StreamBackfillJob,
};
use eyre::OptionExt;
use futures::{Stream, StreamExt};
use metrics::Gauge;
use reth_chainspec::Head;
Expand Down Expand Up @@ -254,8 +255,11 @@ pub struct ExExNotificationsWithHead<Node: FullNodeComponents> {
components: Node,
notifications: Receiver<ExExNotification>,
head: ExExHead,
/// The backfill job to run before consuming any notifications.
backfill_job: Option<StreamBackfillJob<Node::Executor, Node::Provider, Chain>>,
compare_head: bool,
/// Whether to wait for the node head to be at the same height as the ExEx head, and then
/// call the [`Self::heal`].
wait_for_head: bool,
}

impl<Node: FullNodeComponents> ExExNotificationsWithHead<Node> {
Expand All @@ -271,7 +275,7 @@ impl<Node: FullNodeComponents> ExExNotificationsWithHead<Node> {
notifications,
head: exex_head,
backfill_job: None,
compare_head: false,
wait_for_head: false,
};

notifications.heal(node_head)?;
Expand Down Expand Up @@ -337,7 +341,7 @@ impl<Node: FullNodeComponents> ExExNotificationsWithHead<Node> {

// TODO(alexey): wait until the node head is at the same height as the ExEx head
// and then repeat the process above
self.compare_head = true;
self.wait_for_head = true;
}
};

Expand Down Expand Up @@ -367,34 +371,54 @@ impl<Node: FullNodeComponents + Unpin> Stream for ExExNotificationsWithHead<Node
return Poll::Ready(None)
};

let chain = notification.committed_chain().or_else(|| notification.reverted_chain());

if this.compare_head {
if let Some(block) =
chain.as_ref().and_then(|chain| chain.blocks().get(&this.head.block.number))
{
let chain_and_tip = notification
.committed_chain()
.map(|chain| (chain.clone(), chain.tip().number))
.or_else(|| {
notification
.reverted_chain()
.map(|chain| (chain.clone(), chain.first().number - 1))
});

if this.wait_for_head {
// If we are waiting for the node head to be at the same height as the ExEx head,
// then we need to check if the ExEx is on the canonical chain. To do this, we need
// to get the block at the ExEx head's height from new chain, and compare its hash
// to the ExEx head's hash.
if let Some((block, tip)) = chain_and_tip.as_ref().and_then(|(chain, tip)| {
chain.blocks().get(&this.head.block.number).zip(Some(tip))
}) {
if block.hash() == this.head.block.hash {
// ExEx is on the canonical chain
this.compare_head = false;
// ExEx is on the canonical chain, proceed with the notification
this.wait_for_head = false;
} else {
// ExEx is not on the canonical chain, heal
let tip = this
.components
.provider()
.sealed_header(*tip)?
.ok_or_eyre("node head not found")?;
let total_difficulty = this
.components
.provider()
.header_td_by_number(block.number)?
.header_td_by_number(tip.number)?
.unwrap_or(U256::MAX);
this.heal(Head::new(
block.number,
block.hash(),
block.difficulty,
tip.number,
tip.hash(),
tip.difficulty,
total_difficulty,
block.timestamp,
tip.timestamp,
))?;
}
}
}

if chain.map_or(false, |chain| chain.first().number > this.head.block.number) {
if notification
.committed_chain()
.or_else(|| notification.reverted_chain())
.map_or(false, |chain| chain.first().number > this.head.block.number)
{
return Poll::Ready(Some(Ok(notification)))
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/exex/types/src/head.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use reth_primitives::BlockNumHash;

#[allow(clippy::doc_markdown)]
/// A head of the ExEx. It determines the highest block committed to the internal ExEx state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExExHead {
Expand Down
3 changes: 1 addition & 2 deletions crates/node/core/src/args/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ pub struct NetworkArgs {
#[command(flatten)]
pub discovery: DiscoveryArgs,

#[allow(clippy::doc_markdown)]
/// Comma separated enode URLs of trusted peers for P2P connections.
///
/// --trusted-peers enode://abcd@192.168.0.1:30303
/// --trusted-peers <enode://abcd@192.168.0.1:30303>
#[arg(long, value_delimiter = ',')]
pub trusted_peers: Vec<TrustedPeer>,

Expand Down
2 changes: 0 additions & 2 deletions crates/prune/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ impl<DB: Database> Pruner<DB, ()> {
///
/// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
/// to prune.
#[allow(clippy::doc_markdown)]
pub fn run(
&mut self,
provider: &DatabaseProviderRW<DB>,
Expand All @@ -321,7 +320,6 @@ impl<N: ProviderNodeTypes> Pruner<N::DB, ProviderFactory<N>> {
///
/// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
/// to prune.
#[allow(clippy::doc_markdown)]
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
let provider = self.provider_factory.provider_rw()?;
let result = self.run_with_provider(&provider, tip_block_number);
Expand Down

0 comments on commit e3b7761

Please sign in to comment.