-
Notifications
You must be signed in to change notification settings - Fork 159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add `--diff` flag to archive export #3284
Changes from all commits
2854302
8ac9af7
f150365
dd08e87
257da91
0590d2c
56ca426
bf72b2c
2ddd7b9
e5af484
4ab0c56
911474e
4b7bbad
50590d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,12 +31,15 @@ use crate::chain::index::{ChainIndex, ResolveNullTipset}; | |
use crate::chain::ChainEpochDelta; | ||
use crate::cli_shared::{snapshot, snapshot::TrustedVendor}; | ||
use crate::db::car::AnyCar; | ||
use crate::ipld::stream_graph; | ||
use crate::ipld::CidHashSet; | ||
use crate::networks::{calibnet, mainnet, ChainConfig, NetworkChain}; | ||
use crate::shim::clock::EPOCH_DURATION_SECONDS; | ||
use crate::shim::clock::{ChainEpoch, EPOCHS_IN_DAY}; | ||
use anyhow::{bail, Context as _}; | ||
use chrono::NaiveDateTime; | ||
use clap::Subcommand; | ||
use futures::TryStreamExt; | ||
use fvm_ipld_blockstore::Blockstore; | ||
use indicatif::ProgressIterator; | ||
use itertools::Itertools; | ||
|
@@ -69,6 +72,9 @@ pub enum ArchiveCommands { | |
/// How many state-roots to include. Lower limit is 900 for `calibnet` and `mainnet`. | ||
#[arg(short, long, default_value_t = 2000)] | ||
depth: ChainEpochDelta, | ||
/// Do not include any values reachable from epoch-diff. | ||
#[arg(short, long)] | ||
diff: Option<ChainEpochDelta>, | ||
}, | ||
/// Print block headers at 30 day interval for a snapshot file | ||
Checkpoints { | ||
|
@@ -89,6 +95,7 @@ impl ArchiveCommands { | |
output_path, | ||
epoch, | ||
depth, | ||
diff, | ||
} => { | ||
info!( | ||
"indexing a car-backed store using snapshot: {}", | ||
|
@@ -97,7 +104,7 @@ impl ArchiveCommands { | |
|
||
let reader = move || std::fs::File::open(&input_path); | ||
|
||
do_export(reader, output_path, epoch, depth).await | ||
do_export(reader, output_path, epoch, depth, diff).await | ||
} | ||
Self::Checkpoints { snapshot } => print_checkpoints(snapshot), | ||
} | ||
|
@@ -134,6 +141,7 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>( | |
output_path: PathBuf, | ||
epoch_option: Option<ChainEpoch>, | ||
depth: ChainEpochDelta, | ||
diff: Option<ChainEpochDelta>, | ||
) -> anyhow::Result<()> { | ||
let store = Arc::new(AnyCar::new(reader).context("couldn't read input CAR file")?); | ||
|
||
|
@@ -165,6 +173,18 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>( | |
.tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder) | ||
.context("unable to get a tipset at given height")?; | ||
|
||
let seen = if let Some(diff) = diff { | ||
let diff_ts: Arc<Tipset> = index | ||
.tipset_by_height(diff, ts.clone(), ResolveNullTipset::TakeOlder) | ||
.context("diff epoch must be smaller than target epoch")?; | ||
let diff_ts: &Tipset = &diff_ts; | ||
let mut stream = stream_graph(&store, diff_ts.clone().chain(&store)); | ||
while stream.try_next().await?.is_some() {} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does it make sense to warn in […]? [inline code span lost in extraction] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You don't always want to consume the entire stream. |
||
stream.into_seen() | ||
} else { | ||
CidHashSet::default() | ||
}; | ||
|
||
let output_path = | ||
build_output_path(network.to_string(), genesis.timestamp(), epoch, output_path); | ||
|
||
|
@@ -180,7 +200,7 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>( | |
output_path.to_str().unwrap_or_default() | ||
); | ||
|
||
crate::chain::export::<Sha256>(store, &ts, depth, writer, true).await?; | ||
crate::chain::export::<Sha256>(store, &ts, depth, writer, seen, true).await?; | ||
|
||
Ok(()) | ||
} | ||
|
@@ -385,6 +405,7 @@ mod tests { | |
output_path.path().into(), | ||
Some(0), | ||
1, | ||
None, | ||
) | ||
.await | ||
.unwrap(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -270,17 +270,29 @@ enum Task { | |
} | ||
|
||
pin_project! { | ||
struct ChainStream<DB, T> { | ||
pub struct ChainStream<DB, T> { | ||
#[pin] | ||
tipset_iter: T, | ||
db: DB, | ||
dfs: VecDeque<Task>, // Depth-first work queue. | ||
seen: CidHashSet, | ||
stateroot_limit: ChainEpoch, | ||
fail_on_dead_links: bool, | ||
} | ||
} | ||
|
||
/// Initializes a stream of blocks. | ||
impl<DB, T> ChainStream<DB, T> { | ||
pub fn with_seen(self, seen: CidHashSet) -> Self { | ||
ChainStream { seen, ..self } | ||
} | ||
|
||
pub fn into_seen(self) -> CidHashSet { | ||
self.seen | ||
} | ||
} | ||
|
||
/// Stream all blocks that are reachable before the `stateroot_limit` epoch. After this limit, only | ||
/// block headers are streamed. Any dead links are reported as errors. | ||
/// | ||
/// # Arguments | ||
/// | ||
|
@@ -293,13 +305,30 @@ pub fn stream_chain<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>( | |
db: DB, | ||
tipset_iter: T, | ||
stateroot_limit: ChainEpoch, | ||
) -> impl Stream<Item = anyhow::Result<Block>> { | ||
) -> ChainStream<DB, T> { | ||
ChainStream { | ||
tipset_iter, | ||
db, | ||
dfs: VecDeque::new(), | ||
seen: CidHashSet::default(), | ||
stateroot_limit, | ||
fail_on_dead_links: true, | ||
} | ||
} | ||
|
||
// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links | ||
// are ignored. | ||
pub fn stream_graph<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Any way to indicate […]? [inline code span lost in extraction] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've updated the documentation of both […]. [inline code spans lost in extraction] |
||
db: DB, | ||
tipset_iter: T, | ||
) -> ChainStream<DB, T> { | ||
ChainStream { | ||
tipset_iter, | ||
db, | ||
dfs: VecDeque::new(), | ||
seen: CidHashSet::default(), | ||
stateroot_limit: 0, | ||
fail_on_dead_links: false, | ||
} | ||
} | ||
|
||
|
@@ -316,9 +345,12 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream< | |
match task { | ||
Emit(cid) => { | ||
let cid = *cid; | ||
let data = this.db.get(&cid)?.ok_or(anyhow::anyhow!("missing key"))?; | ||
this.dfs.pop_front(); | ||
return Poll::Ready(Some(Ok(Block { cid, data }))); | ||
if let Some(data) = this.db.get(&cid)? { | ||
return Poll::Ready(Some(Ok(Block { cid, data }))); | ||
} else if *this.fail_on_dead_links { | ||
return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid)))); | ||
} | ||
} | ||
Iterate(dfs_iter) => { | ||
while let Some(ipld) = dfs_iter.next() { | ||
|
@@ -329,15 +361,18 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream< | |
// 3. _: ignore all other links | ||
// Don't revisit what's already been visited. | ||
if should_save_block_to_snapshot(cid) && this.seen.insert(cid) { | ||
let data = | ||
this.db.get(&cid)?.ok_or(anyhow::anyhow!("missing key"))?; | ||
|
||
if cid.codec() == fvm_ipld_encoding::DAG_CBOR { | ||
let ipld: Ipld = from_slice(&data)?; | ||
dfs_iter.walk_next(ipld); | ||
if let Some(data) = this.db.get(&cid)? { | ||
if cid.codec() == fvm_ipld_encoding::DAG_CBOR { | ||
let ipld: Ipld = from_slice(&data)?; | ||
dfs_iter.walk_next(ipld); | ||
} | ||
return Poll::Ready(Some(Ok(Block { cid, data }))); | ||
} else if *this.fail_on_dead_links { | ||
return Poll::Ready(Some(Err(anyhow::anyhow!( | ||
"missing key: {}", | ||
cid | ||
)))); | ||
} | ||
|
||
return Poll::Ready(Some(Ok(Block { cid, data }))); | ||
} | ||
} | ||
} | ||
|
@@ -351,29 +386,31 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream< | |
// yield the block without walking the graph it represents. | ||
if let Some(tipset) = this.tipset_iter.as_mut().next() { | ||
for block in tipset.into_blocks().into_iter() { | ||
// Make sure we always yield a block otherwise. | ||
this.dfs.push_back(Emit(*block.cid())); | ||
|
||
if block.epoch() == 0 { | ||
// The genesis block has some kind of dummy parent that needs to be emitted. | ||
for p in block.parents().cids() { | ||
this.dfs.push_back(Emit(*p)); | ||
if this.seen.insert(*block.cid()) { | ||
// Make sure we always yield a block otherwise. | ||
this.dfs.push_back(Emit(*block.cid())); | ||
|
||
if block.epoch() == 0 { | ||
// The genesis block has some kind of dummy parent that needs to be emitted. | ||
for p in block.parents().cids() { | ||
this.dfs.push_back(Emit(*p)); | ||
} | ||
} | ||
} | ||
|
||
// Process block messages. | ||
if block.epoch() > stateroot_limit { | ||
this.dfs | ||
.push_back(Iterate(DfsIter::from(*block.messages()))); | ||
} | ||
// Process block messages. | ||
if block.epoch() > stateroot_limit { | ||
this.dfs | ||
.push_back(Iterate(DfsIter::from(*block.messages()))); | ||
} | ||
|
||
// Visit the block if it's within required depth. And a special case for `0` | ||
// epoch to match Lotus' implementation. | ||
if block.epoch() == 0 || block.epoch() > stateroot_limit { | ||
// NOTE: In the original `walk_snapshot` implementation we walk the dag | ||
// immediately. Which is what we do here as well, but using a queue. | ||
this.dfs | ||
.push_back(Iterate(DfsIter::from(*block.state_root()))); | ||
// Visit the block if it's within required depth. And a special case for `0` | ||
// epoch to match Lotus' implementation. | ||
if block.epoch() == 0 || block.epoch() > stateroot_limit { | ||
// NOTE: In the original `walk_snapshot` implementation we walk the dag | ||
// immediately. Which is what we do here as well, but using a queue. | ||
this.dfs | ||
.push_back(Iterate(DfsIter::from(*block.state_root()))); | ||
} | ||
} | ||
} | ||
} else { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit confusing: the doc says "epoch" while the code says "epoch delta".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to `epoch-diff`.