diff --git a/CHANGELOG.md b/CHANGELOG.md index 22080d75f381..d21d5d0129c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,8 @@ option to `forest-cli state fetch` command. - [#3213](https://github.com/ChainSafe/forest/pull/3213): Add support for loading forest.car.zst files. +- [#3284](https://github.com/ChainSafe/forest/pull/3284): Add `--diff` flag to + `archive export`. ### Changed diff --git a/scripts/tests/forest_cli_check.sh b/scripts/tests/forest_cli_check.sh index 03476a386c6a..0b0d54d6469f 100755 --- a/scripts/tests/forest_cli_check.sh +++ b/scripts/tests/forest_cli_check.sh @@ -32,11 +32,29 @@ pushd "$(mktemp --directory)" "$FOREST_CLI_PATH" --chain calibnet snapshot fetch --vendor filops # this will fail if they happen to have the same height - we should change the format of our filenames test "$(num-files-here)" -eq 2 -# verify that we are byte-for-byte identical with filops + +: verify that we are byte-for-byte identical with filops zstd -d filops_*.car.zst "$FOREST_CLI_PATH" archive export filops_*.car -o exported_snapshot.car.zst zstd -d exported_snapshot.car.zst cmp --silent filops_*.car exported_snapshot.car + +: verify that the exported snapshot is in ForestCAR.zst format +assert_eq "$(forest_query_format exported_snapshot.car.zst)" "ForestCARv1.zst" + +: verify that diff exports contain the expected number of state roots +EPOCH=$(forest_query_epoch exported_snapshot.car.zst) +"$FOREST_CLI_PATH" archive export --epoch $((EPOCH-1100)) --depth 900 --output-path base_snapshot.forest.car.zst exported_snapshot.car.zst + +BASE_EPOCH=$(forest_query_epoch base_snapshot.forest.car.zst) +assert_eq "$BASE_EPOCH" $((EPOCH-1100)) + +BASE_STATE_ROOTS=$(forest_query_state_roots base_snapshot.forest.car.zst) +assert_eq "$BASE_STATE_ROOTS" 900 + +"$FOREST_CLI_PATH" archive export --diff "$BASE_EPOCH" -o diff_snapshot.forest.car.zst exported_snapshot.car.zst +DIFF_STATE_ROOTS=$(forest_query_state_roots diff_snapshot.forest.car.zst) +assert_eq 
"$DIFF_STATE_ROOTS" 1100 rm -- * popd diff --git a/scripts/tests/harness.sh b/scripts/tests/harness.sh index a7ac2bf36cda..805819f8e760 100644 --- a/scripts/tests/harness.sh +++ b/scripts/tests/harness.sh @@ -22,6 +22,18 @@ function forest_check_db_stats { $FOREST_CLI_PATH --chain calibnet db stats } +function forest_query_epoch { + $FOREST_CLI_PATH archive info "$1" | grep Epoch | awk '{print $2}' +} + +function forest_query_state_roots { + $FOREST_CLI_PATH archive info "$1" | grep State-roots | awk '{print $2}' +} + +function forest_query_format { + $FOREST_CLI_PATH archive info "$1" | grep "CAR format" | awk '{print $3}' +} + function forest_run_node_detached { echo "Running forest in detached mode" $FOREST_PATH --chain calibnet --encrypt-keystore false --log-dir "$LOG_DIRECTORY" --detach --save-token ./admin_token --track-peak-rss @@ -68,4 +80,17 @@ function forest_cleanup { fi } +function assert_eq { + local expected="$1" + local actual="$2" + local msg="${3-}" + + if [ "$expected" == "$actual" ]; then + return 0 + else + [ "${#msg}" -gt 0 ] && echo "$expected == $actual :: $msg" + return 1 + fi +} + trap forest_cleanup EXIT diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 65a1d9e25c3d..00309359d8d3 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -4,7 +4,7 @@ pub mod store; mod weight; use crate::blocks::Tipset; use crate::db::car::forest; -use crate::ipld::stream_chain; +use crate::ipld::{stream_chain, CidHashSet}; use crate::utils::io::{AsyncWriterWithChecksum, Checksum}; use anyhow::{Context, Result}; use digest::Digest; @@ -18,6 +18,7 @@ pub async fn export( tipset: &Tipset, lookup_depth: ChainEpochDelta, writer: impl AsyncWrite + Unpin, + seen: CidHashSet, skip_checksum: bool, ) -> Result<Option<digest::Output<D>>, Error> { let stateroot_lookup_limit = tipset.epoch() - lookup_depth; @@ -28,7 +29,8 @@ pub async fn export( // Stream stateroots in range stateroot_lookup_limit..=tipset.epoch(). Also // stream all block headers until genesis.
- let blocks = stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit); + let blocks = + stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit).with_seen(seen); // Encode Ipld key-value pairs in zstd frames let frames = forest::Encoder::compress_stream(8000usize.next_power_of_two(), 3, blocks); diff --git a/src/cli/subcommands/archive_cmd.rs b/src/cli/subcommands/archive_cmd.rs index 8cb9ca6b7f25..8471cf5845e8 100644 --- a/src/cli/subcommands/archive_cmd.rs +++ b/src/cli/subcommands/archive_cmd.rs @@ -31,12 +31,15 @@ use crate::chain::index::{ChainIndex, ResolveNullTipset}; use crate::chain::ChainEpochDelta; use crate::cli_shared::{snapshot, snapshot::TrustedVendor}; use crate::db::car::AnyCar; +use crate::ipld::stream_graph; +use crate::ipld::CidHashSet; use crate::networks::{calibnet, mainnet, ChainConfig, NetworkChain}; use crate::shim::clock::EPOCH_DURATION_SECONDS; use crate::shim::clock::{ChainEpoch, EPOCHS_IN_DAY}; use anyhow::{bail, Context as _}; use chrono::NaiveDateTime; use clap::Subcommand; +use futures::TryStreamExt; use fvm_ipld_blockstore::Blockstore; use indicatif::ProgressIterator; use itertools::Itertools; @@ -69,6 +72,9 @@ pub enum ArchiveCommands { /// How many state-roots to include. Lower limit is 900 for `calibnet` and `mainnet`. #[arg(short, long, default_value_t = 2000)] depth: ChainEpochDelta, + /// Do not include any values reachable from epoch-diff. 
+ #[arg(short, long)] + diff: Option<ChainEpoch>, }, /// Print block headers at 30 day interval for a snapshot file Checkpoints { /// Path to snapshot file. snapshot: PathBuf, }, } @@ -89,6 +95,7 @@ impl ArchiveCommands { output_path, epoch, depth, + diff, } => { info!( "indexing a car-backed store using snapshot: {}", @@ -97,7 +104,7 @@ impl ArchiveCommands { let reader = move || std::fs::File::open(&input_path); - do_export(reader, output_path, epoch, depth).await + do_export(reader, output_path, epoch, depth, diff).await } Self::Checkpoints { snapshot } => print_checkpoints(snapshot), } @@ -134,6 +141,7 @@ async fn do_export( output_path: PathBuf, epoch_option: Option<ChainEpoch>, depth: ChainEpochDelta, + diff: Option<ChainEpoch>, ) -> anyhow::Result<()> { let store = Arc::new(AnyCar::new(reader).context("couldn't read input CAR file")?); @@ -165,6 +173,18 @@ async fn do_export( .tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder) .context("unable to get a tipset at given height")?; + let seen = if let Some(diff) = diff { + let diff_ts: Arc<Tipset> = index + .tipset_by_height(diff, ts.clone(), ResolveNullTipset::TakeOlder) + .context("diff epoch must be smaller than target epoch")?; + let diff_ts: &Tipset = &diff_ts; + let mut stream = stream_graph(&store, diff_ts.clone().chain(&store)); + while stream.try_next().await?.is_some() {} + stream.into_seen() + } else { + CidHashSet::default() + }; + let output_path = build_output_path(network.to_string(), genesis.timestamp(), epoch, output_path); @@ -180,7 +200,7 @@ async fn do_export( output_path.to_str().unwrap_or_default() ); - crate::chain::export::<Sha256>(store, &ts, depth, writer, true).await?; + crate::chain::export::<Sha256>(store, &ts, depth, writer, seen, true).await?; Ok(()) } @@ -385,6 +405,7 @@ mod tests { output_path.path().into(), Some(0), 1, + None, ) .await .unwrap(); diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 39c56626c12f..1584b8f91412 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -270,17 +270,29 @@ enum Task { } pin_project!
{ - struct ChainStream { + pub struct ChainStream { #[pin] tipset_iter: T, db: DB, dfs: VecDeque, // Depth-first work queue. seen: CidHashSet, stateroot_limit: ChainEpoch, + fail_on_dead_links: bool, } } -/// Initializes a stream of blocks. +impl ChainStream { + pub fn with_seen(self, seen: CidHashSet) -> Self { + ChainStream { seen, ..self } + } + + pub fn into_seen(self) -> CidHashSet { + self.seen + } +} + +/// Stream all blocks that are reachable before the `stateroot_limit` epoch. After this limit, only +/// block headers are streamed. Any dead links are reported as errors. /// /// # Arguments /// @@ -293,13 +305,30 @@ pub fn stream_chain + Unpin>( db: DB, tipset_iter: T, stateroot_limit: ChainEpoch, -) -> impl Stream> { +) -> ChainStream { ChainStream { tipset_iter, db, dfs: VecDeque::new(), seen: CidHashSet::default(), stateroot_limit, + fail_on_dead_links: true, + } +} + +// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links +// are ignored. +pub fn stream_graph + Unpin>( + db: DB, + tipset_iter: T, +) -> ChainStream { + ChainStream { + tipset_iter, + db, + dfs: VecDeque::new(), + seen: CidHashSet::default(), + stateroot_limit: 0, + fail_on_dead_links: false, } } @@ -316,9 +345,12 @@ impl + Unpin> Stream for ChainStream< match task { Emit(cid) => { let cid = *cid; - let data = this.db.get(&cid)?.ok_or(anyhow::anyhow!("missing key"))?; this.dfs.pop_front(); - return Poll::Ready(Some(Ok(Block { cid, data }))); + if let Some(data) = this.db.get(&cid)? { + return Poll::Ready(Some(Ok(Block { cid, data }))); + } else if *this.fail_on_dead_links { + return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid)))); + } } Iterate(dfs_iter) => { while let Some(ipld) = dfs_iter.next() { @@ -329,15 +361,18 @@ impl + Unpin> Stream for ChainStream< // 3. _: ignore all other links // Don't revisit what's already been visited. 
if should_save_block_to_snapshot(cid) && this.seen.insert(cid) { - let data = - this.db.get(&cid)?.ok_or(anyhow::anyhow!("missing key"))?; - - if cid.codec() == fvm_ipld_encoding::DAG_CBOR { - let ipld: Ipld = from_slice(&data)?; - dfs_iter.walk_next(ipld); + if let Some(data) = this.db.get(&cid)? { + if cid.codec() == fvm_ipld_encoding::DAG_CBOR { + let ipld: Ipld = from_slice(&data)?; + dfs_iter.walk_next(ipld); + } + return Poll::Ready(Some(Ok(Block { cid, data }))); + } else if *this.fail_on_dead_links { + return Poll::Ready(Some(Err(anyhow::anyhow!( + "missing key: {}", + cid + )))); } - - return Poll::Ready(Some(Ok(Block { cid, data }))); } } } @@ -351,29 +386,31 @@ impl + Unpin> Stream for ChainStream< // yield the block without walking the graph it represents. if let Some(tipset) = this.tipset_iter.as_mut().next() { for block in tipset.into_blocks().into_iter() { - // Make sure we always yield a block otherwise. - this.dfs.push_back(Emit(*block.cid())); - - if block.epoch() == 0 { - // The genesis block has some kind of dummy parent that needs to be emitted. - for p in block.parents().cids() { - this.dfs.push_back(Emit(*p)); + if this.seen.insert(*block.cid()) { + // Make sure we always yield a block otherwise. + this.dfs.push_back(Emit(*block.cid())); + + if block.epoch() == 0 { + // The genesis block has some kind of dummy parent that needs to be emitted. + for p in block.parents().cids() { + this.dfs.push_back(Emit(*p)); + } } - } - // Process block messages. - if block.epoch() > stateroot_limit { - this.dfs - .push_back(Iterate(DfsIter::from(*block.messages()))); - } + // Process block messages. + if block.epoch() > stateroot_limit { + this.dfs + .push_back(Iterate(DfsIter::from(*block.messages()))); + } - // Visit the block if it's within required depth. And a special case for `0` - // epoch to match Lotus' implementation. 
- if block.epoch() == 0 || block.epoch() > stateroot_limit { - // NOTE: In the original `walk_snapshot` implementation we walk the dag - // immediately. Which is what we do here as well, but using a queue. - this.dfs - .push_back(Iterate(DfsIter::from(*block.state_root()))); + // Visit the block if it's within required depth. And a special case for `0` + // epoch to match Lotus' implementation. + if block.epoch() == 0 || block.epoch() > stateroot_limit { + // NOTE: In the original `walk_snapshot` implementation we walk the dag + // immediately. Which is what we do here as well, but using a queue. + this.dfs + .push_back(Iterate(DfsIter::from(*block.state_root()))); + } } } } else { diff --git a/src/rpc/chain_api.rs b/src/rpc/chain_api.rs index 79b5b575763b..9ecae8f05229 100644 --- a/src/rpc/chain_api.rs +++ b/src/rpc/chain_api.rs @@ -9,6 +9,7 @@ use crate::blocks::{ BlockHeader, Tipset, }; use crate::chain::index::ResolveNullTipset; +use crate::ipld::CidHashSet; use crate::json::{cid::CidJson, message::json::MessageJson}; use crate::rpc_api::{ chain_api::*, @@ -85,6 +86,7 @@ where &start_ts, recent_roots, VoidAsyncWriter, + CidHashSet::default(), skip_checksum, ) .await @@ -95,6 +97,7 @@ where &start_ts, recent_roots, file, + CidHashSet::default(), skip_checksum, ) .await diff --git a/src/utils/net.rs b/src/utils/net.rs index 4f25bfa5c84d..8b4f4a248bf5 100644 --- a/src/utils/net.rs +++ b/src/utils/net.rs @@ -97,7 +97,11 @@ pub async fn decompress_if_needed( mut reader: impl AsyncBufRead + Unpin, ) -> anyhow::Result { Ok(match is_zstd(reader.fill_buf().await?) { - true => Left(ZstdDecoder::new(reader)), + true => { + let mut decoder = ZstdDecoder::new(reader); + decoder.multiple_members(true); + Left(decoder) + } false => Right(reader), }) }