Skip to content

Commit

Permalink
update pruner
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed May 10, 2024
1 parent 7c78137 commit 9d5d81d
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/prune/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ reth-provider.workspace = true
reth-interfaces.workspace = true
reth-tokio-util.workspace = true
reth-config.workspace = true
reth-network-api.workspace = true

# metrics
reth-metrics.workspace = true
Expand Down
9 changes: 9 additions & 0 deletions crates/prune/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::PrunerEvent;
use reth_db::DatabaseError;
use reth_interfaces::RethError;
use reth_network_api::NetworkError;
use reth_primitives::PruneSegmentError;
use reth_provider::ProviderError;
use thiserror::Error;
use tokio::sync::broadcast::error::SendError;

#[derive(Error, Debug)]
pub enum PrunerError {
Expand Down Expand Up @@ -34,3 +37,9 @@ impl From<PrunerError> for RethError {
}
}
}

impl From<SendError<PrunerEvent>> for PrunerError {
fn from(_: SendError<PrunerEvent>) -> Self {
Self::Interface(RethError::Network(NetworkError::ChannelClosed))
}
}
8 changes: 4 additions & 4 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
time::{Duration, Instant},
};
use tokio::sync::watch;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::wrappers::BroadcastStream;
use tracing::debug;

/// Result of [Pruner::run] execution.
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<DB: Database> Pruner<DB> {
}

/// Listen for events on the pruner.
pub fn events(&mut self) -> UnboundedReceiverStream<PrunerEvent> {
pub fn events(&mut self) -> BroadcastStream<PrunerEvent> {
self.listeners.new_listener()
}

Expand All @@ -100,7 +100,7 @@ impl<DB: Database> Pruner<DB> {
return Ok(PruneProgress::Finished)
}

self.listeners.notify(PrunerEvent::Started { tip_block_number });
self.listeners.notify(PrunerEvent::Started { tip_block_number })?;

debug!(target: "pruner", %tip_block_number, "Pruner started");
let start = Instant::now();
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<DB: Database> Pruner<DB> {
"{message}",
);

self.listeners.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
self.listeners.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats })?;

Ok(progress)
}
Expand Down

0 comments on commit 9d5d81d

Please sign in to comment.