diff --git a/src/db/gc/mod.rs b/src/db/gc/mod.rs
index 9031c8e313e..6ca4c23729c 100644
--- a/src/db/gc/mod.rs
+++ b/src/db/gc/mod.rs
@@ -137,8 +137,7 @@ impl<DB: Blockstore + Sync + Send + 'static, F: Fn() -> Arc<Tipset> + Sync + Se
     // NOTE: One concern here is that this is going to consume a lot of CPU.
     async fn filter(&mut self, tipset: Arc<Tipset>, depth: ChainEpochDelta) -> anyhow::Result<()> {
         // NOTE: We want to keep all the block headers from genesis to heaviest tipset epoch.
-        let mut stream = stream_graph(self.db.clone(), (*tipset).clone().chain(&self.db), depth);
-
+        let mut stream = stream_graph(self.db.clone(), tipset.chain_arc(&self.db), depth);
         while let Some(block) = stream.next().await {
             let block = block?;
             self.marked.remove(&block.cid);
@@ -189,9 +188,8 @@ impl<DB: Blockstore + Sync + Send + 'static, F: Fn() -> Arc<Tipset> + Sync + Se
     // next step.
     async fn gc_workflow(&mut self, interval: Duration) -> anyhow::Result<()> {
         let depth = self.depth;
-        let tipset = (self.get_heaviest_tipset)();
-
-        let mut current_epoch = tipset.epoch();
+        let mut current_tipset = (self.get_heaviest_tipset)();
+        let mut current_epoch = current_tipset.epoch();
         let last_gc_run = self.fetch_last_gc_run()?;
         // Don't run the GC if there aren't enough state-roots yet or if we're too close to the last
         // GC run. Sleep and yield to the main loop in order to refresh the heaviest tipset value.
@@ -205,8 +203,9 @@ impl<DB: Blockstore + Sync + Send + 'static, F: Fn() -> Arc<Tipset> + Sync + Se
             // Make sure we don't run the GC too often.
             time::sleep(interval).await;
 
-            // Refresh `current_epoch` after sleeping.
-            current_epoch = (self.get_heaviest_tipset)().epoch();
+            // Refresh `current_tipset` and `current_epoch` after sleeping.
+            current_tipset = (self.get_heaviest_tipset)();
+            current_epoch = current_tipset.epoch();
 
             info!("populate keys for GC");
             self.populate()?;
@@ -222,7 +221,7 @@ impl<DB: Blockstore + Sync + Send + 'static, F: Fn() -> Arc<Tipset> + Sync + Se
         }
 
         info!("filter keys for GC");
-        self.filter(tipset, depth).await?;
+        self.filter(current_tipset, depth).await?;
 
         info!("GC sweep");
         let deleted = self.sweep()?;
diff --git a/src/ipld/util.rs b/src/ipld/util.rs
index 33cc4bfa288..987b04898e7 100644
--- a/src/ipld/util.rs
+++ b/src/ipld/util.rs
@@ -15,6 +15,7 @@ use futures::Stream;
 use fvm_ipld_blockstore::Blockstore;
 use parking_lot::Mutex;
 use pin_project_lite::pin_project;
+use std::borrow::Borrow;
 use std::ops::DerefMut;
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -139,11 +140,11 @@ impl<DB, T> ChainStream<DB, T> {
 /// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets,
 /// in-depth. This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth`
 /// is the number of [`Tipset`] that needs inspection.
-pub fn stream_chain<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
+pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
     db: DB,
-    tipset_iter: T,
+    tipset_iter: ITER,
     stateroot_limit: ChainEpoch,
-) -> ChainStream<DB, T> {
+) -> ChainStream<DB, T, ITER> {
     ChainStream {
         tipset_iter,
         db,
@@ -156,11 +157,11 @@ pub fn stream_chain<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
 
 // Stream available graph in a depth-first search. All reachable nodes are touched and dead-links
 // are ignored.
-pub fn stream_graph<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
+pub fn stream_graph<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
     db: DB,
-    tipset_iter: T,
+    tipset_iter: ITER,
     stateroot_limit: ChainEpoch,
-) -> ChainStream<DB, T> {
+) -> ChainStream<DB, T, ITER> {
     ChainStream {
         tipset_iter,
         db,
@@ -171,7 +172,9 @@ pub fn stream_graph<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
     }
 }
 
-impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<DB, T> {
+impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
+    for ChainStream<DB, T, ITER>
+{
     type Item = anyhow::Result<Block>;
 
     fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -233,7 +236,7 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<
                     // enclosing loop is processing the queue. Once the desired depth has been reached -
                     // yield the block without walking the graph it represents.
                     if let Some(tipset) = this.tipset_iter.next() {
-                        for block in tipset.into_block_headers().into_iter() {
+                        for block in tipset.borrow().block_headers() {
                             if this.seen.insert(*block.cid()) {
                                 // Make sure we always yield a block otherwise.
                                 this.dfs.push_back(Emit(*block.cid()));
@@ -318,17 +321,18 @@ impl<DB, T> UnorderedChainStream<DB, T> {
 #[allow(dead_code)]
 pub fn unordered_stream_chain<
     DB: Blockstore + Sync + Send + 'static,
-    T: Iterator<Item = Tipset> + Unpin + Send + 'static,
+    T: Borrow<Tipset>,
+    ITER: Iterator<Item = T> + Unpin + Send + 'static,
 >(
     db: Arc<DB>,
-    tipset_iter: T,
+    tipset_iter: ITER,
     stateroot_limit: ChainEpoch,
-) -> UnorderedChainStream<DB, T> {
+) -> UnorderedChainStream<DB, T, ITER> {
     let (sender, receiver) = flume::bounded(BLOCK_CHANNEL_LIMIT);
     let (extract_sender, extract_receiver) = flume::unbounded();
     let fail_on_dead_links = true;
     let seen = Arc::new(Mutex::new(CidHashSet::default()));
-    let handle = UnorderedChainStream::<DB, T>::start_workers(
+    let handle = UnorderedChainStream::<DB, T, ITER>::start_workers(
         db.clone(),
         sender.clone(),
         extract_receiver,
@@ -353,17 +357,18 @@ pub fn unordered_stream_chain<
 // are ignored.
 pub fn unordered_stream_graph<
     DB: Blockstore + Sync + Send + 'static,
-    T: Iterator<Item = Tipset> + Unpin + Send + 'static,
+    T: Borrow<Tipset>,
+    ITER: Iterator<Item = T> + Unpin + Send + 'static,
 >(
     db: Arc<DB>,
-    tipset_iter: T,
+    tipset_iter: ITER,
     stateroot_limit: ChainEpoch,
-) -> UnorderedChainStream<DB, T> {
+) -> UnorderedChainStream<DB, T, ITER> {
     let (sender, receiver) = flume::bounded(2048);
     let (extract_sender, extract_receiver) = flume::unbounded();
     let fail_on_dead_links = false;
     let seen = Arc::new(Mutex::new(CidHashSet::default()));
-    let handle = UnorderedChainStream::<DB, T>::start_workers(
+    let handle = UnorderedChainStream::<DB, T, ITER>::start_workers(
         db.clone(),
         sender.clone(),
         extract_receiver,
@@ -384,8 +389,11 @@ pub fn unordered_stream_graph<
     }
 }
 
-impl<DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Unpin>
-    UnorderedChainStream<DB, T>
+impl<
+        DB: Blockstore + Send + Sync + 'static,
+        T: Borrow<Tipset>,
+        ITER: Iterator<Item = T> + Unpin,
+    > UnorderedChainStream<DB, T, ITER>
 {
     fn start_workers(
         db: Arc<DB>,
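The heart of the change is the relaxed bound on the tipset iterators: `Iterator<Item = Tipset>` becomes `Iterator<Item = T>` with `T: Borrow<Tipset>`, so the same stream constructors now accept owned `Tipset`s, `&Tipset` references, or `Arc<Tipset>` handles, without forcing callers to clone into owned values. A minimal sketch of the pattern, with a hypothetical `Tipset` stand-in and `sum_epochs` function (neither is part of the codebase):

```rust
use std::borrow::Borrow;
use std::sync::Arc;

// Hypothetical stand-in for the real `Tipset` type; illustration only.
struct Tipset {
    epoch: i64,
}

impl Tipset {
    fn epoch(&self) -> i64 {
        self.epoch
    }
}

// Generic over `T: Borrow<Tipset>`: owned `Tipset`, `&Tipset`, and `Arc<Tipset>`
// all satisfy the bound, so no caller has to clone just to produce owned items.
fn sum_epochs<T: Borrow<Tipset>, ITER: Iterator<Item = T>>(tipset_iter: ITER) -> i64 {
    tipset_iter.map(|ts| ts.borrow().epoch()).sum()
}

fn main() {
    let owned = vec![Tipset { epoch: 1 }, Tipset { epoch: 2 }];
    let shared = vec![Arc::new(Tipset { epoch: 3 }), Arc::new(Tipset { epoch: 4 })];

    // The same function consumes both iterator flavors.
    assert_eq!(sum_epochs(owned.into_iter()), 3);
    assert_eq!(sum_epochs(shared.into_iter()), 7);
}
```

`Arc<Tipset>` satisfies the bound through the standard library's `impl<T: ?Sized> Borrow<T> for Arc<T>`, which is also why `poll_next` switches from `tipset.into_block_headers()` to `tipset.borrow().block_headers()`: the stream can no longer assume it owns the tipset it pulls from the iterator.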
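This, in turn, is what lets `filter` in `src/db/gc/mod.rs` drop the `(*tipset).clone()`: assuming `Tipset::chain_arc` walks back through parent tipsets yielding `Arc<Tipset>` handles (as the call site suggests), each step costs a reference-count bump rather than a deep tipset clone. A hedged sketch of that iterator shape, with a hypothetical single-parent `Tipset`:

```rust
use std::sync::Arc;

// Hypothetical minimal tipset with one parent link; illustration only.
struct Tipset {
    epoch: i64,
    parent: Option<Arc<Tipset>>,
}

// An iterator in the spirit of the `chain_arc` call in `filter`: it hands out
// cheap `Arc` clones instead of deep `Tipset` clones.
struct ChainArc {
    next: Option<Arc<Tipset>>,
}

impl Iterator for ChainArc {
    type Item = Arc<Tipset>;

    fn next(&mut self) -> Option<Self::Item> {
        let current = self.next.take()?;
        self.next = current.parent.clone(); // Arc clone: refcount bump only.
        Some(current)
    }
}

fn main() {
    let genesis = Arc::new(Tipset { epoch: 0, parent: None });
    let head = Arc::new(Tipset { epoch: 1, parent: Some(genesis) });

    // Walk head -> genesis, never cloning a tipset's contents.
    let epochs: Vec<i64> = ChainArc { next: Some(head) }.map(|ts| ts.epoch).collect();
    assert_eq!(epochs, vec![1, 0]);
}
```

Because `Arc<Tipset>: Borrow<Tipset>`, an iterator like this satisfies the new `stream_graph` bounds directly.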