Skip to content

Commit

Permalink
add info logs, deduplicate copy to static files part
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Jun 26, 2024
1 parent df5e8ab commit 509acc9
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 9 additions & 24 deletions bin/reth/src/commands/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
use crate::commands::common::{AccessRights, Environment, EnvironmentArgs};
use clap::Parser;
use reth_provider::StageCheckpointReader;
use reth_prune::PrunerBuilder;
use reth_stages::StageId;
use reth_static_file::{HighestStaticFiles, StaticFileProducer};
use reth_static_file::StaticFileProducer;
use tracing::info;

/// Prunes according to the configuration without any limits
#[derive(Debug, Parser)]
Expand All @@ -20,37 +19,23 @@ impl PruneCommand {
let Environment { config, provider_factory, .. } = self.env.init(AccessRights::RW)?;
let prune_config = config.prune.unwrap_or_default();

// Copy data from database to static files
info!(target: "reth::cli", "Copying data from database to static files...");
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), prune_config.segments.clone());
let static_file_producer = static_file_producer.lock();
let lowest_static_file_height = static_file_producer.lock().copy_to_static_files()?.min();
info!(target: "reth::cli", ?lowest_static_file_height, "Copied data from database to static files");

// Copies data from database to static files
let lowest_static_file_height = {
let provider = provider_factory.provider()?;
let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
.into_iter()
.map(|stage| {
provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number))
})
.collect::<Result<Vec<_>, _>>()?;

let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: stages_checkpoints[0],
receipts: stages_checkpoints[1],
transactions: stages_checkpoints[2],
})?;
static_file_producer.run(targets)?;
stages_checkpoints.into_iter().min().expect("exists")
};

// Deletes data which has been copied to static files.
// Delete data which has been copied to static files.
if let Some(prune_tip) = lowest_static_file_height {
info!(target: "reth::cli", ?prune_tip, ?prune_config, "Pruning data from database...");
// Run the pruner according to the configuration, and don't enforce any limits on it
let mut pruner = PrunerBuilder::new(prune_config)
.prune_delete_limit(usize::MAX)
.build(provider_factory);

pruner.run(prune_tip)?;
info!(target: "reth::cli", "Pruned data from database");
}

Ok(())
Expand Down
22 changes: 2 additions & 20 deletions crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use reth_provider::{
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
use reth_static_file_types::HighestStaticFiles;
use reth_tokio_util::{EventSender, EventStream};
use std::pin::Pin;
use tokio::sync::watch;
Expand Down Expand Up @@ -248,26 +247,9 @@ where
/// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
/// lock is occupied.
pub fn move_to_static_files(&self) -> RethResult<()> {
let static_file_producer = self.static_file_producer.lock();

// Copies data from database to static files
let lowest_static_file_height = {
let provider = self.provider_factory.provider()?;
let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
.into_iter()
.map(|stage| {
provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number))
})
.collect::<Result<Vec<_>, _>>()?;

let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: stages_checkpoints[0],
receipts: stages_checkpoints[1],
transactions: stages_checkpoints[2],
})?;
static_file_producer.run(targets)?;
stages_checkpoints.into_iter().min().expect("exists")
};
let lowest_static_file_height =
self.static_file_producer.lock().copy_to_static_files()?.min();

// Deletes data which has been copied to static files.
if let Some(prune_tip) = lowest_static_file_height {
Expand Down
1 change: 1 addition & 0 deletions crates/static-file/static-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ reth-nippy-jar.workspace = true
reth-tokio-util.workspace = true
reth-prune-types.workspace = true
reth-static-file-types.workspace = true
reth-stages-types.workspace = true

alloy-primitives.workspace = true

Expand Down
28 changes: 27 additions & 1 deletion crates/static-file/static-file/src/static_file_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ use alloy_primitives::BlockNumber;
use parking_lot::Mutex;
use rayon::prelude::*;
use reth_db_api::database::Database;
use reth_provider::{providers::StaticFileWriter, ProviderFactory, StaticFileProviderFactory};
use reth_provider::{
providers::StaticFileWriter, ProviderFactory, StageCheckpointReader as _,
StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_stages_types::StageId;
use reth_static_file_types::HighestStaticFiles;
use reth_storage_errors::provider::ProviderResult;
use reth_tokio_util::{EventSender, EventStream};
Expand Down Expand Up @@ -167,6 +171,28 @@ impl<DB: Database> StaticFileProducerInner<DB> {
Ok(targets)
}

/// Copies data from database to static files according to
/// [stage checkpoints](reth_stages_types::StageCheckpoint).
///
/// Returns the highest block numbers for all static file segments.
pub fn copy_to_static_files(&self) -> ProviderResult<HighestStaticFiles> {
let provider = self.provider_factory.provider()?;
let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
.into_iter()
.map(|stage| provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)))
.collect::<Result<Vec<_>, _>>()?;

let highest_static_files = HighestStaticFiles {
headers: stages_checkpoints[0],
receipts: stages_checkpoints[1],
transactions: stages_checkpoints[2],
};
let targets = self.get_static_file_targets(highest_static_files)?;
self.run(targets)?;

Ok(highest_static_files)
}

/// Returns a static file targets at the provided finalized block numbers per segment.
/// The target is determined by the check against highest `static_files` using
/// [`reth_provider::providers::StaticFileProvider::get_highest_static_files`].
Expand Down
5 changes: 5 additions & 0 deletions crates/static-file/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ impl HighestStaticFiles {
}
}

/// Returns the minimum block of all segments.
pub fn min(&self) -> Option<u64> {
[self.headers, self.transactions, self.receipts].iter().filter_map(|&option| option).min()
}

/// Returns the maximum block of all segments.
pub fn max(&self) -> Option<u64> {
[self.headers, self.transactions, self.receipts].iter().filter_map(|&option| option).max()
Expand Down

0 comments on commit 509acc9

Please sign in to comment.