Skip to content

Commit

Permalink
fix(gc): refresh current_tipset together with current_epoch after sleeping
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 committed Jan 9, 2025
1 parent 52742ac commit eb1d322
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 26 deletions.
15 changes: 7 additions & 8 deletions src/db/gc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ impl<DB: Blockstore + SettingsStore + GarbageCollectable<CidHashSet> + Sync + Se
// NOTE: One concern here is that this is going to consume a lot of CPU.
async fn filter(&mut self, tipset: Arc<Tipset>, depth: ChainEpochDelta) -> anyhow::Result<()> {
// NOTE: We want to keep all the block headers from genesis to heaviest tipset epoch.
let mut stream = stream_graph(self.db.clone(), (*tipset).clone().chain(&self.db), depth);

let mut stream = stream_graph(self.db.clone(), tipset.chain_arc(&self.db), depth);
while let Some(block) = stream.next().await {
let block = block?;
self.marked.remove(&block.cid);
Expand Down Expand Up @@ -189,9 +188,8 @@ impl<DB: Blockstore + SettingsStore + GarbageCollectable<CidHashSet> + Sync + Se
// next step.
async fn gc_workflow(&mut self, interval: Duration) -> anyhow::Result<()> {
let depth = self.depth;
let tipset = (self.get_heaviest_tipset)();

let mut current_epoch = tipset.epoch();
let mut current_tipset = (self.get_heaviest_tipset)();
let mut current_epoch = current_tipset.epoch();
let last_gc_run = self.fetch_last_gc_run()?;
// Don't run the GC if there aren't enough state-roots yet or if we're too close to the last
// GC run. Sleep and yield to the main loop in order to refresh the heaviest tipset value.
Expand All @@ -205,8 +203,9 @@ impl<DB: Blockstore + SettingsStore + GarbageCollectable<CidHashSet> + Sync + Se
// Make sure we don't run the GC too often.
time::sleep(interval).await;

// Refresh `current_epoch` after sleeping.
current_epoch = (self.get_heaviest_tipset)().epoch();
// Refresh `current_tipset` and `current_epoch` after sleeping.
current_tipset = (self.get_heaviest_tipset)();
current_epoch = current_tipset.epoch();

info!("populate keys for GC");
self.populate()?;
Expand All @@ -222,7 +221,7 @@ impl<DB: Blockstore + SettingsStore + GarbageCollectable<CidHashSet> + Sync + Se
}

info!("filter keys for GC");
self.filter(tipset, depth).await?;
self.filter(current_tipset, depth).await?;

info!("GC sweep");
let deleted = self.sweep()?;
Expand Down
44 changes: 26 additions & 18 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use futures::Stream;
use fvm_ipld_blockstore::Blockstore;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use std::borrow::Borrow;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -139,11 +140,11 @@ impl<DB, T> ChainStream<DB, T> {
/// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets,
/// in-depth. This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth`
/// is the number of `[`Tipset`]` that needs inspection.
pub fn stream_chain<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
db: DB,
tipset_iter: T,
tipset_iter: ITER,
stateroot_limit: ChainEpoch,
) -> ChainStream<DB, T> {
) -> ChainStream<DB, ITER> {
ChainStream {
tipset_iter,
db,
Expand All @@ -156,11 +157,11 @@ pub fn stream_chain<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(

// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links
// are ignored.
pub fn stream_graph<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
pub fn stream_graph<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
db: DB,
tipset_iter: T,
tipset_iter: ITER,
stateroot_limit: ChainEpoch,
) -> ChainStream<DB, T> {
) -> ChainStream<DB, ITER> {
ChainStream {
tipset_iter,
db,
Expand All @@ -171,7 +172,9 @@ pub fn stream_graph<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
}
}

impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<DB, T> {
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
for ChainStream<DB, ITER>
{
type Item = anyhow::Result<CarBlock>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -233,7 +236,7 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<
// enclosing loop is processing the queue. Once the desired depth has been reached -
// yield the block without walking the graph it represents.
if let Some(tipset) = this.tipset_iter.next() {
for block in tipset.into_block_headers().into_iter() {
for block in tipset.borrow().block_headers() {
if this.seen.insert(*block.cid()) {
// Make sure we always yield a block otherwise.
this.dfs.push_back(Emit(*block.cid()));
Expand Down Expand Up @@ -318,17 +321,18 @@ impl<DB, T> UnorderedChainStream<DB, T> {
#[allow(dead_code)]
pub fn unordered_stream_chain<
DB: Blockstore + Sync + Send + 'static,
T: Iterator<Item = Tipset> + Unpin + Send + 'static,
T: Borrow<Tipset>,
ITER: Iterator<Item = T> + Unpin + Send + 'static,
>(
db: Arc<DB>,
tipset_iter: T,
tipset_iter: ITER,
stateroot_limit: ChainEpoch,
) -> UnorderedChainStream<DB, T> {
) -> UnorderedChainStream<DB, ITER> {
let (sender, receiver) = flume::bounded(BLOCK_CHANNEL_LIMIT);
let (extract_sender, extract_receiver) = flume::unbounded();
let fail_on_dead_links = true;
let seen = Arc::new(Mutex::new(CidHashSet::default()));
let handle = UnorderedChainStream::<DB, T>::start_workers(
let handle = UnorderedChainStream::<DB, ITER>::start_workers(
db.clone(),
sender.clone(),
extract_receiver,
Expand All @@ -353,17 +357,18 @@ pub fn unordered_stream_chain<
// are ignored.
pub fn unordered_stream_graph<
DB: Blockstore + Sync + Send + 'static,
T: Iterator<Item = Tipset> + Unpin + Send + 'static,
T: Borrow<Tipset>,
ITER: Iterator<Item = T> + Unpin + Send + 'static,
>(
db: Arc<DB>,
tipset_iter: T,
tipset_iter: ITER,
stateroot_limit: ChainEpoch,
) -> UnorderedChainStream<DB, T> {
) -> UnorderedChainStream<DB, ITER> {
let (sender, receiver) = flume::bounded(2048);
let (extract_sender, extract_receiver) = flume::unbounded();
let fail_on_dead_links = false;
let seen = Arc::new(Mutex::new(CidHashSet::default()));
let handle = UnorderedChainStream::<DB, T>::start_workers(
let handle = UnorderedChainStream::<DB, ITER>::start_workers(
db.clone(),
sender.clone(),
extract_receiver,
Expand All @@ -384,8 +389,11 @@ pub fn unordered_stream_graph<
}
}

impl<DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Unpin>
UnorderedChainStream<DB, T>
impl<
DB: Blockstore + Send + Sync + 'static,
T: Borrow<Tipset>,
ITER: Iterator<Item = T> + Unpin,
> UnorderedChainStream<DB, ITER>
{
fn start_workers(
db: Arc<DB>,
Expand Down

0 comments on commit eb1d322

Please sign in to comment.