diff --git a/Cargo.lock b/Cargo.lock index a0b946f3555de..e6e8c4902d04b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8187,6 +8187,7 @@ dependencies = [ "reth-provider", "reth-prune-types", "reth-stages", + "reth-stages-types", "reth-static-file-types", "reth-storage-errors", "reth-testing-utils", diff --git a/bin/reth/src/commands/prune.rs b/bin/reth/src/commands/prune.rs index 2cd63d3970d11..f3b0fcaab9660 100644 --- a/bin/reth/src/commands/prune.rs +++ b/bin/reth/src/commands/prune.rs @@ -2,10 +2,9 @@ use crate::commands::common::{AccessRights, Environment, EnvironmentArgs}; use clap::Parser; -use reth_provider::StageCheckpointReader; use reth_prune::PrunerBuilder; -use reth_stages::StageId; -use reth_static_file::{HighestStaticFiles, StaticFileProducer}; +use reth_static_file::StaticFileProducer; +use tracing::info; /// Prunes according to the configuration without any limits #[derive(Debug, Parser)] @@ -20,37 +19,23 @@ impl PruneCommand { let Environment { config, provider_factory, .. } = self.env.init(AccessRights::RW)?; let prune_config = config.prune.unwrap_or_default(); + // Copy data from database to static files + info!(target: "reth::cli", "Copying data from database to static files..."); let static_file_producer = StaticFileProducer::new(provider_factory.clone(), prune_config.segments.clone()); - let static_file_producer = static_file_producer.lock(); + let lowest_static_file_height = static_file_producer.lock().copy_to_static_files()?.min(); + info!(target: "reth::cli", ?lowest_static_file_height, "Copied data from database to static files"); - // Copies data from database to static files - let lowest_static_file_height = { - let provider = provider_factory.provider()?; - let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies] - .into_iter() - .map(|stage| { - provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)) - }) - .collect::, _>>()?; - - let targets = static_file_producer.get_static_file_targets(HighestStaticFiles { - headers: stages_checkpoints[0], - receipts: stages_checkpoints[1], - transactions: stages_checkpoints[2], - })?; - static_file_producer.run(targets)?; - stages_checkpoints.into_iter().min().expect("exists") - }; - - // Deletes data which has been copied to static files. + // Delete data which has been copied to static files. if let Some(prune_tip) = lowest_static_file_height { + info!(target: "reth::cli", ?prune_tip, ?prune_config, "Pruning data from database..."); // Run the pruner according to the configuration, and don't enforce any limits on it let mut pruner = PrunerBuilder::new(prune_config) .prune_delete_limit(usize::MAX) .build(provider_factory); pruner.run(prune_tip)?; + info!(target: "reth::cli", "Pruned data from database"); } Ok(()) diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 67ef53855b15a..1be468702e432 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -13,7 +13,6 @@ use reth_provider::{ }; use reth_prune::PrunerBuilder; use reth_static_file::StaticFileProducer; -use reth_static_file_types::HighestStaticFiles; use reth_tokio_util::{EventSender, EventStream}; use std::pin::Pin; use tokio::sync::watch; @@ -248,26 +247,9 @@ where /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the /// lock is occupied. pub fn move_to_static_files(&self) -> RethResult<()> { - let static_file_producer = self.static_file_producer.lock(); - // Copies data from database to static files - let lowest_static_file_height = { - let provider = self.provider_factory.provider()?; - let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies] - .into_iter() - .map(|stage| { - provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)) - }) - .collect::, _>>()?; - - let targets = static_file_producer.get_static_file_targets(HighestStaticFiles { - headers: stages_checkpoints[0], - receipts: stages_checkpoints[1], - transactions: stages_checkpoints[2], - })?; - static_file_producer.run(targets)?; - stages_checkpoints.into_iter().min().expect("exists") - }; + let lowest_static_file_height = + self.static_file_producer.lock().copy_to_static_files()?.min(); // Deletes data which has been copied to static files. if let Some(prune_tip) = lowest_static_file_height { diff --git a/crates/static-file/static-file/Cargo.toml b/crates/static-file/static-file/Cargo.toml index 29a601f050d0e..1a1921d58c5c7 100644 --- a/crates/static-file/static-file/Cargo.toml +++ b/crates/static-file/static-file/Cargo.toml @@ -21,6 +21,7 @@ reth-nippy-jar.workspace = true reth-tokio-util.workspace = true reth-prune-types.workspace = true reth-static-file-types.workspace = true +reth-stages-types.workspace = true alloy-primitives.workspace = true diff --git a/crates/static-file/static-file/src/static_file_producer.rs b/crates/static-file/static-file/src/static_file_producer.rs index 44ea3a5c84b4e..72637c3e507bb 100644 --- a/crates/static-file/static-file/src/static_file_producer.rs +++ b/crates/static-file/static-file/src/static_file_producer.rs @@ -5,8 +5,12 @@ use alloy_primitives::BlockNumber; use parking_lot::Mutex; use rayon::prelude::*; use reth_db_api::database::Database; -use reth_provider::{providers::StaticFileWriter, ProviderFactory, StaticFileProviderFactory}; +use reth_provider::{ + providers::StaticFileWriter, ProviderFactory, StageCheckpointReader as _, + StaticFileProviderFactory, +}; use reth_prune_types::PruneModes; +use reth_stages_types::StageId; use reth_static_file_types::HighestStaticFiles; use reth_storage_errors::provider::ProviderResult; use reth_tokio_util::{EventSender, EventStream}; @@ -167,6 +171,28 @@ impl StaticFileProducerInner { Ok(targets) } + /// Copies data from database to static files according to + /// [stage checkpoints](reth_stages_types::StageCheckpoint). + /// + /// Returns the highest block numbers for all static file segments. + pub fn copy_to_static_files(&self) -> ProviderResult { + let provider = self.provider_factory.provider()?; + let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies] + .into_iter() + .map(|stage| provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number))) + .collect::, _>>()?; + + let highest_static_files = HighestStaticFiles { + headers: stages_checkpoints[0], + receipts: stages_checkpoints[1], + transactions: stages_checkpoints[2], + }; + let targets = self.get_static_file_targets(highest_static_files)?; + self.run(targets)?; + + Ok(highest_static_files) + } + /// Returns a static file targets at the provided finalized block numbers per segment. /// The target is determined by the check against highest `static_files` using /// [`reth_provider::providers::StaticFileProvider::get_highest_static_files`]. diff --git a/crates/static-file/types/src/lib.rs b/crates/static-file/types/src/lib.rs index f78d61f6961ba..3b3b42e21fe3d 100644 --- a/crates/static-file/types/src/lib.rs +++ b/crates/static-file/types/src/lib.rs @@ -53,6 +53,11 @@ impl HighestStaticFiles { } } + /// Returns the minimum block of all segments. + pub fn min(&self) -> Option { + [self.headers, self.transactions, self.receipts].iter().filter_map(|&option| option).min() + } + /// Returns the maximum block of all segments. pub fn max(&self) -> Option { [self.headers, self.transactions, self.receipts].iter().filter_map(|&option| option).max()