Skip to content

Commit

Permalink
chore: remove HeaderSyncMode::Continuous & debug.continuous (#8714)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
  • Loading branch information
3 people authored Jun 12, 2024
1 parent fcd28f6 commit 0de932d
Show file tree
Hide file tree
Showing 21 changed files with 66 additions and 165 deletions.
11 changes: 6 additions & 5 deletions bin/reth/src/commands/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ use reth_node_core::{
},
dirs::{ChainPath, DataDirPath},
};
use reth_primitives::ChainSpec;
use reth_provider::{
providers::StaticFileProvider, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory,
};
use reth_primitives::{ChainSpec, B256};
use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory};
use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
use reth_static_file::StaticFileProducer;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch;
use tracing::{debug, info, warn};

/// Struct to hold config and datadir paths
Expand Down Expand Up @@ -127,11 +126,13 @@ impl EnvironmentArgs {

info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");

let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

// Builds and executes an unwind-only pipeline
let mut pipeline = Pipeline::builder()
.add_stages(DefaultStages::new(
factory.clone(),
HeaderSyncMode::Continuous,
tip_rx,
Arc::new(EthBeaconConsensus::new(self.chain.clone())),
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
Expand Down
5 changes: 2 additions & 3 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use reth_network_api::NetworkInfo;
use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersClient};
use reth_primitives::{BlockHashOrNumber, BlockNumber, B256};
use reth_provider::{
BlockExecutionWriter, ChainSpecProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader,
BlockExecutionWriter, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
};
use reth_prune_types::PruneModes;
use reth_stages::{
Expand Down Expand Up @@ -86,13 +86,12 @@ impl Command {
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = block_executor!(provider_factory.chain_spec());

let header_mode = HeaderSyncMode::Tip(tip_rx);
let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
.add_stages(
DefaultStages::new(
provider_factory.clone(),
header_mode,
tip_rx,
Arc::clone(&consensus),
header_downloader,
body_downloader,
Expand Down
1 change: 0 additions & 1 deletion bin/reth/src/commands/debug_cmd/replay_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ impl Command {
Box::new(ctx.task_executor.clone()),
Box::new(network),
None,
false,
payload_builder,
None,
u64::MAX,
Expand Down
6 changes: 3 additions & 3 deletions bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use reth_network_p2p::{
use reth_node_events::node::NodeEvent;
use reth_primitives::B256;
use reth_provider::{
BlockNumReader, ChainSpecProvider, HeaderProvider, HeaderSyncMode, ProviderError,
ProviderFactory, StageCheckpointReader,
BlockNumReader, ChainSpecProvider, HeaderProvider, ProviderError, ProviderFactory,
StageCheckpointReader,
};
use reth_prune_types::PruneModes;
use reth_stages::{prelude::*, Pipeline, StageId, StageSet};
Expand Down Expand Up @@ -208,7 +208,7 @@ where
.add_stages(
DefaultStages::new(
provider_factory.clone(),
HeaderSyncMode::Tip(tip_rx),
tip_rx,
consensus.clone(),
header_downloader,
body_downloader,
Expand Down
5 changes: 2 additions & 3 deletions bin/reth/src/commands/stage/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_node_core::args::NetworkArgs;
use reth_primitives::{BlockHashOrNumber, BlockNumber, B256};
use reth_provider::{
BlockExecutionWriter, BlockNumReader, ChainSpecProvider, FinalizedBlockReader,
FinalizedBlockWriter, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory,
FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_stages::{
Expand Down Expand Up @@ -105,13 +105,12 @@ impl Command {
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = block_executor!(provider_factory.chain_spec());

let header_mode = HeaderSyncMode::Tip(tip_rx);
let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
.add_stages(
DefaultStages::new(
provider_factory.clone(),
header_mode,
tip_rx,
Arc::clone(&consensus),
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
Expand Down
5 changes: 0 additions & 5 deletions book/cli/reth/node.md
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,6 @@ Builder:
[default: 3]

Debug:
--debug.continuous
Prompt the downloader to download blocks one at a time.

NOTE: This is for testing purposes only.

--debug.terminate
Flag indicating whether the node should be terminated after the pipeline sync

Expand Down
9 changes: 0 additions & 9 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ where
task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>,
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle<EngineT>,
target: Option<B256>,
pipeline_run_threshold: u64,
Expand All @@ -254,7 +253,6 @@ where
task_spawner,
sync_state_updater,
max_block,
run_pipeline_continuously,
payload_builder,
target,
pipeline_run_threshold,
Expand Down Expand Up @@ -285,7 +283,6 @@ where
task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>,
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle<EngineT>,
target: Option<B256>,
pipeline_run_threshold: u64,
Expand All @@ -299,7 +296,6 @@ where
pipeline,
client,
task_spawner.clone(),
run_pipeline_continuously,
max_block,
blockchain.chain_spec(),
event_sender.clone(),
Expand Down Expand Up @@ -1448,11 +1444,6 @@ where
return Ok(())
}

// update the canon chain if continuous is enabled
if self.sync.run_pipeline_continuously() {
self.set_canonical_head(ctrl.block_number().unwrap_or_default())?;
}

let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
Some(current_state) => current_state,
None => {
Expand Down
23 changes: 3 additions & 20 deletions crates/consensus/beacon/src/engine/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ where
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
/// If enabled, the pipeline will be triggered continuously, as soon as it becomes idle
run_pipeline_continuously: bool,
/// Max block after which the consensus engine would terminate the sync. Used for debugging
/// purposes.
max_block: Option<BlockNumber>,
Expand All @@ -73,7 +71,6 @@ where
pipeline: Pipeline<DB>,
client: Client,
pipeline_task_spawner: Box<dyn TaskSpawner>,
run_pipeline_continuously: bool,
max_block: Option<BlockNumber>,
chain_spec: Arc<ChainSpec>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
Expand All @@ -89,7 +86,6 @@ where
inflight_full_block_requests: Vec::new(),
inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(),
run_pipeline_continuously,
event_sender,
max_block,
metrics: EngineSyncMetrics::default(),
Expand Down Expand Up @@ -122,11 +118,6 @@ where
self.update_block_download_metrics();
}

/// Returns whether or not the sync controller is set to run the pipeline continuously.
pub(crate) const fn run_pipeline_continuously(&self) -> bool {
self.run_pipeline_continuously
}

/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
#[allow(dead_code)]
pub(crate) const fn is_pipeline_sync_pending(&self) -> bool {
Expand Down Expand Up @@ -271,20 +262,14 @@ where
fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent> {
match &mut self.pipeline_state {
PipelineState::Idle(pipeline) => {
let target = self.pending_pipeline_target.take();

if target.is_none() && !self.run_pipeline_continuously {
// nothing to sync
return None
}

let target = self.pending_pipeline_target.take()?;
let (tx, rx) = oneshot::channel();

let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(target).await;
let result = pipeline.run_as_fut(Some(target)).await;
let _ = tx.send(result);
}),
);
Expand All @@ -294,7 +279,7 @@ where
// outdated (included in the range the pipeline is syncing anyway)
self.clear_block_download_requests();

Some(EngineSyncEvent::PipelineStarted(target))
Some(EngineSyncEvent::PipelineStarted(Some(target)))
}
PipelineState::Running(_) => None,
}
Expand Down Expand Up @@ -550,8 +535,6 @@ mod tests {
pipeline,
client,
Box::<TokioTaskExecutor>::default(),
// run_pipeline_continuously: false here until we want to test this
false,
self.max_block,
chain_spec,
Default::default(),
Expand Down
5 changes: 2 additions & 3 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{BlockNumber, ChainSpec, B256};
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
ExecutionOutcome, HeaderSyncMode,
ExecutionOutcome,
};
use reth_prune::Pruner;
use reth_prune_types::PruneModes;
Expand Down Expand Up @@ -371,7 +371,7 @@ where

Pipeline::builder().add_stages(DefaultStages::new(
provider_factory.clone(),
HeaderSyncMode::Tip(tip_rx.clone()),
tip_rx.clone(),
Arc::clone(&consensus),
header_downloader,
body_downloader,
Expand Down Expand Up @@ -418,7 +418,6 @@ where
Box::<TokioTaskExecutor>::default(),
Box::<NoopSyncStateUpdater>::default(),
None,
false,
payload_builder,
None,
self.base_config.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN),
Expand Down
8 changes: 1 addition & 7 deletions crates/node-core/src/args/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,14 @@ use std::path::PathBuf;
#[derive(Debug, Clone, Args, PartialEq, Eq, Default)]
#[command(next_help_heading = "Debug")]
pub struct DebugArgs {
/// Prompt the downloader to download blocks one at a time.
///
/// NOTE: This is for testing purposes only.
#[arg(long = "debug.continuous", help_heading = "Debug", conflicts_with = "tip")]
pub continuous: bool,

/// Flag indicating whether the node should be terminated after the pipeline sync.
#[arg(long = "debug.terminate", help_heading = "Debug")]
pub terminate: bool,

/// Set the chain tip manually for testing purposes.
///
/// NOTE: This is a temporary flag
#[arg(long = "debug.tip", help_heading = "Debug", conflicts_with = "continuous")]
#[arg(long = "debug.tip", help_heading = "Debug")]
pub tip: Option<B256>,

/// Runs the sync only up to the specified block.
Expand Down
21 changes: 0 additions & 21 deletions crates/node-core/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,27 +239,6 @@ impl NodeConfig {
self
}

/// Returns the initial pipeline target, based on whether or not the node is running in
/// `debug.tip` mode, `debug.continuous` mode, or neither.
///
/// If running in `debug.tip` mode, the configured tip is returned.
/// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned.
/// Otherwise, `None` is returned. This is what the node will do by default.
pub fn initial_pipeline_target(&self, genesis_hash: B256) -> Option<B256> {
if let Some(tip) = self.debug.tip {
// Set the provided tip as the initial pipeline target.
debug!(target: "reth::cli", %tip, "Tip manually set");
Some(tip)
} else if self.debug.continuous {
// Set genesis as the initial pipeline target.
// This will allow the downloader to start
debug!(target: "reth::cli", "Continuous sync mode enabled");
Some(genesis_hash)
} else {
None
}
}

/// Returns pruning configuration.
pub fn prune_config(&self) -> Option<PruneConfig> {
self.pruning.prune_config(&self.chain)
Expand Down
20 changes: 5 additions & 15 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use reth_node_core::{
node_config::NodeConfig,
};
use reth_primitives::{BlockNumber, Chain, ChainSpec, Head, B256};
use reth_provider::{
providers::StaticFileProvider, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory,
};
use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory};
use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
Expand All @@ -27,7 +25,7 @@ use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use std::{sync::Arc, thread::available_parallelism};
use tokio::sync::{mpsc::Receiver, oneshot};
use tokio::sync::{mpsc::Receiver, oneshot, watch};

/// Reusable setup for launching a node.
///
Expand Down Expand Up @@ -316,16 +314,6 @@ impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
.timeout(PrunerBuilder::DEFAULT_TIMEOUT)
}

/// Returns the initial pipeline target, based on whether or not the node is running in
/// `debug.tip` mode, `debug.continuous` mode, or neither.
///
/// If running in `debug.tip` mode, the configured tip is returned.
/// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned.
/// Otherwise, `None` is returned. This is what the node will do by default.
pub fn initial_pipeline_target(&self) -> Option<B256> {
self.node_config().initial_pipeline_target(self.genesis_hash())
}

/// Loads the JWT secret for the engine API
pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
let default_jwt_path = self.data_dir().jwt();
Expand Down Expand Up @@ -377,11 +365,13 @@ where

info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");

let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

// Builds an unwind-only pipeline
let pipeline = Pipeline::builder()
.add_stages(DefaultStages::new(
factory.clone(),
HeaderSyncMode::Continuous,
tip_rx,
Arc::new(EthBeaconConsensus::new(self.chain_spec())),
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
Expand Down
Loading

0 comments on commit 0de932d

Please sign in to comment.