Skip to content

Commit

Permalink
feat: add --diff flag to archive export (#3284)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemmih authored Jul 31, 2023
1 parent 1fcea0c commit 25be5be
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
option to `forest-cli state fetch` command.
- [#3213](https://github.com/ChainSafe/forest/pull/3213): Add support for
loading forest.car.zst files.
- [#3284](https://github.com/ChainSafe/forest/pull/3284): Add `--diff` flag to
`archive export`.

### Changed

Expand Down
20 changes: 19 additions & 1 deletion scripts/tests/forest_cli_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,29 @@ pushd "$(mktemp --directory)"
"$FOREST_CLI_PATH" --chain calibnet snapshot fetch --vendor filops
# this will fail if they happen to have the same height - we should change the format of our filenames
test "$(num-files-here)" -eq 2
# verify that we are byte-for-byte identical with filops

: verify that we are byte-for-byte identical with filops
zstd -d filops_*.car.zst
"$FOREST_CLI_PATH" archive export filops_*.car -o exported_snapshot.car.zst
zstd -d exported_snapshot.car.zst
cmp --silent filops_*.car exported_snapshot.car

: verify that the exported snapshot is in ForestCAR.zst format
assert_eq "$(forest_query_format exported_snapshot.car.zst)" "ForestCARv1.zst"

: verify that diff exports contain the expected number of state roots
EPOCH=$(forest_query_epoch exported_snapshot.car.zst)
"$FOREST_CLI_PATH" archive export --epoch $((EPOCH-1100)) --depth 900 --output-path base_snapshot.forest.car.zst exported_snapshot.car.zst

BASE_EPOCH=$(forest_query_epoch base_snapshot.forest.car.zst)
assert_eq "$BASE_EPOCH" $((EPOCH-1100))

BASE_STATE_ROOTS=$(forest_query_state_roots base_snapshot.forest.car.zst)
assert_eq "$BASE_STATE_ROOTS" 900

"$FOREST_CLI_PATH" archive export --diff "$BASE_EPOCH" -o diff_snapshot.forest.car.zst exported_snapshot.car.zst
DIFF_STATE_ROOTS=$(forest_query_state_roots diff_snapshot.forest.car.zst)
assert_eq "$DIFF_STATE_ROOTS" 1100
rm -- *
popd

Expand Down
25 changes: 25 additions & 0 deletions scripts/tests/harness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ function forest_check_db_stats {
$FOREST_CLI_PATH --chain calibnet db stats
}

function forest_query_epoch {
$FOREST_CLI_PATH archive info "$1" | grep Epoch | awk '{print $2}'
}

function forest_query_state_roots {
$FOREST_CLI_PATH archive info "$1" | grep State-roots | awk '{print $2}'
}

function forest_query_format {
$FOREST_CLI_PATH archive info "$1" | grep "CAR format" | awk '{print $3}'
}

function forest_run_node_detached {
echo "Running forest in detached mode"
$FOREST_PATH --chain calibnet --encrypt-keystore false --log-dir "$LOG_DIRECTORY" --detach --save-token ./admin_token --track-peak-rss
Expand Down Expand Up @@ -68,4 +80,17 @@ function forest_cleanup {
fi
}

function assert_eq {
local expected="$1"
local actual="$2"
local msg="${3-}"

if [ "$expected" == "$actual" ]; then
return 0
else
[ "${#msg}" -gt 0 ] && echo "$expected == $actual :: $msg"
return 1
fi
}

trap forest_cleanup EXIT
6 changes: 4 additions & 2 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod store;
mod weight;
use crate::blocks::Tipset;
use crate::db::car::forest;
use crate::ipld::stream_chain;
use crate::ipld::{stream_chain, CidHashSet};
use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
use anyhow::{Context, Result};
use digest::Digest;
Expand All @@ -18,6 +18,7 @@ pub async fn export<D: Digest>(
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
seen: CidHashSet,
skip_checksum: bool,
) -> Result<Option<digest::Output<D>>, Error> {
let stateroot_lookup_limit = tipset.epoch() - lookup_depth;
Expand All @@ -28,7 +29,8 @@ pub async fn export<D: Digest>(

// Stream stateroots in range stateroot_lookup_limit..=tipset.epoch(). Also
// stream all block headers until genesis.
let blocks = stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit);
let blocks =
stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit).with_seen(seen);

// Encode Ipld key-value pairs in zstd frames
let frames = forest::Encoder::compress_stream(8000usize.next_power_of_two(), 3, blocks);
Expand Down
25 changes: 23 additions & 2 deletions src/cli/subcommands/archive_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ use crate::chain::index::{ChainIndex, ResolveNullTipset};
use crate::chain::ChainEpochDelta;
use crate::cli_shared::{snapshot, snapshot::TrustedVendor};
use crate::db::car::AnyCar;
use crate::ipld::stream_graph;
use crate::ipld::CidHashSet;
use crate::networks::{calibnet, mainnet, ChainConfig, NetworkChain};
use crate::shim::clock::EPOCH_DURATION_SECONDS;
use crate::shim::clock::{ChainEpoch, EPOCHS_IN_DAY};
use anyhow::{bail, Context as _};
use chrono::NaiveDateTime;
use clap::Subcommand;
use futures::TryStreamExt;
use fvm_ipld_blockstore::Blockstore;
use indicatif::ProgressIterator;
use itertools::Itertools;
Expand Down Expand Up @@ -69,6 +72,9 @@ pub enum ArchiveCommands {
/// How many state-roots to include. Lower limit is 900 for `calibnet` and `mainnet`.
#[arg(short, long, default_value_t = 2000)]
depth: ChainEpochDelta,
/// Do not include any values reachable from epoch-diff.
#[arg(short, long)]
diff: Option<ChainEpochDelta>,
},
/// Print block headers at 30 day interval for a snapshot file
Checkpoints {
Expand All @@ -89,6 +95,7 @@ impl ArchiveCommands {
output_path,
epoch,
depth,
diff,
} => {
info!(
"indexing a car-backed store using snapshot: {}",
Expand All @@ -97,7 +104,7 @@ impl ArchiveCommands {

let reader = move || std::fs::File::open(&input_path);

do_export(reader, output_path, epoch, depth).await
do_export(reader, output_path, epoch, depth, diff).await
}
Self::Checkpoints { snapshot } => print_checkpoints(snapshot),
}
Expand Down Expand Up @@ -134,6 +141,7 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
output_path: PathBuf,
epoch_option: Option<ChainEpoch>,
depth: ChainEpochDelta,
diff: Option<ChainEpochDelta>,
) -> anyhow::Result<()> {
let store = Arc::new(AnyCar::new(reader).context("couldn't read input CAR file")?);

Expand Down Expand Up @@ -165,6 +173,18 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
.tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder)
.context("unable to get a tipset at given height")?;

let seen = if let Some(diff) = diff {
let diff_ts: Arc<Tipset> = index
.tipset_by_height(diff, ts.clone(), ResolveNullTipset::TakeOlder)
.context("diff epoch must be smaller than target epoch")?;
let diff_ts: &Tipset = &diff_ts;
let mut stream = stream_graph(&store, diff_ts.clone().chain(&store));
while stream.try_next().await?.is_some() {}
stream.into_seen()
} else {
CidHashSet::default()
};

let output_path =
build_output_path(network.to_string(), genesis.timestamp(), epoch, output_path);

Expand All @@ -180,7 +200,7 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
output_path.to_str().unwrap_or_default()
);

crate::chain::export::<Sha256>(store, &ts, depth, writer, true).await?;
crate::chain::export::<Sha256>(store, &ts, depth, writer, seen, true).await?;

Ok(())
}
Expand Down Expand Up @@ -385,6 +405,7 @@ mod tests {
output_path.path().into(),
Some(0),
1,
None,
)
.await
.unwrap();
Expand Down
103 changes: 70 additions & 33 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,29 @@ enum Task {
}

pin_project! {
struct ChainStream<DB, T> {
pub struct ChainStream<DB, T> {
#[pin]
tipset_iter: T,
db: DB,
dfs: VecDeque<Task>, // Depth-first work queue.
seen: CidHashSet,
stateroot_limit: ChainEpoch,
fail_on_dead_links: bool,
}
}

/// Initializes a stream of blocks.
impl<DB, T> ChainStream<DB, T> {
pub fn with_seen(self, seen: CidHashSet) -> Self {
ChainStream { seen, ..self }
}

pub fn into_seen(self) -> CidHashSet {
self.seen
}
}

/// Stream all blocks that are reachable before the `stateroot_limit` epoch. After this limit, only
/// block headers are streamed. Any dead links are reported as errors.
///
/// # Arguments
///
Expand All @@ -293,13 +305,30 @@ pub fn stream_chain<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
db: DB,
tipset_iter: T,
stateroot_limit: ChainEpoch,
) -> impl Stream<Item = anyhow::Result<Block>> {
) -> ChainStream<DB, T> {
ChainStream {
tipset_iter,
db,
dfs: VecDeque::new(),
seen: CidHashSet::default(),
stateroot_limit,
fail_on_dead_links: true,
}
}

// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links
// are ignored.
pub fn stream_graph<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
db: DB,
tipset_iter: T,
) -> ChainStream<DB, T> {
ChainStream {
tipset_iter,
db,
dfs: VecDeque::new(),
seen: CidHashSet::default(),
stateroot_limit: 0,
fail_on_dead_links: false,
}
}

Expand All @@ -316,9 +345,12 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<
match task {
Emit(cid) => {
let cid = *cid;
let data = this.db.get(&cid)?.ok_or(anyhow::anyhow!("missing key"))?;
this.dfs.pop_front();
return Poll::Ready(Some(Ok(Block { cid, data })));
if let Some(data) = this.db.get(&cid)? {
return Poll::Ready(Some(Ok(Block { cid, data })));
} else if *this.fail_on_dead_links {
return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid))));
}
}
Iterate(dfs_iter) => {
while let Some(ipld) = dfs_iter.next() {
Expand All @@ -329,15 +361,18 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<
// 3. _: ignore all other links
// Don't revisit what's already been visited.
if should_save_block_to_snapshot(cid) && this.seen.insert(cid) {
let data =
this.db.get(&cid)?.ok_or(anyhow::anyhow!("missing key"))?;

if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
let ipld: Ipld = from_slice(&data)?;
dfs_iter.walk_next(ipld);
if let Some(data) = this.db.get(&cid)? {
if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
let ipld: Ipld = from_slice(&data)?;
dfs_iter.walk_next(ipld);
}
return Poll::Ready(Some(Ok(Block { cid, data })));
} else if *this.fail_on_dead_links {
return Poll::Ready(Some(Err(anyhow::anyhow!(
"missing key: {}",
cid
))));
}

return Poll::Ready(Some(Ok(Block { cid, data })));
}
}
}
Expand All @@ -351,29 +386,31 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<
// yield the block without walking the graph it represents.
if let Some(tipset) = this.tipset_iter.as_mut().next() {
for block in tipset.into_blocks().into_iter() {
// Make sure we always yield a block otherwise.
this.dfs.push_back(Emit(*block.cid()));

if block.epoch() == 0 {
// The genesis block has some kind of dummy parent that needs to be emitted.
for p in block.parents().cids() {
this.dfs.push_back(Emit(*p));
if this.seen.insert(*block.cid()) {
// Make sure we always yield a block otherwise.
this.dfs.push_back(Emit(*block.cid()));

if block.epoch() == 0 {
// The genesis block has some kind of dummy parent that needs to be emitted.
for p in block.parents().cids() {
this.dfs.push_back(Emit(*p));
}
}
}

// Process block messages.
if block.epoch() > stateroot_limit {
this.dfs
.push_back(Iterate(DfsIter::from(*block.messages())));
}
// Process block messages.
if block.epoch() > stateroot_limit {
this.dfs
.push_back(Iterate(DfsIter::from(*block.messages())));
}

// Visit the block if it's within required depth. And a special case for `0`
// epoch to match Lotus' implementation.
if block.epoch() == 0 || block.epoch() > stateroot_limit {
// NOTE: In the original `walk_snapshot` implementation we walk the dag
// immediately. Which is what we do here as well, but using a queue.
this.dfs
.push_back(Iterate(DfsIter::from(*block.state_root())));
// Visit the block if it's within required depth. And a special case for `0`
// epoch to match Lotus' implementation.
if block.epoch() == 0 || block.epoch() > stateroot_limit {
// NOTE: In the original `walk_snapshot` implementation we walk the dag
// immediately. Which is what we do here as well, but using a queue.
this.dfs
.push_back(Iterate(DfsIter::from(*block.state_root())));
}
}
}
} else {
Expand Down
3 changes: 3 additions & 0 deletions src/rpc/chain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::blocks::{
BlockHeader, Tipset,
};
use crate::chain::index::ResolveNullTipset;
use crate::ipld::CidHashSet;
use crate::json::{cid::CidJson, message::json::MessageJson};
use crate::rpc_api::{
chain_api::*,
Expand Down Expand Up @@ -85,6 +86,7 @@ where
&start_ts,
recent_roots,
VoidAsyncWriter,
CidHashSet::default(),
skip_checksum,
)
.await
Expand All @@ -95,6 +97,7 @@ where
&start_ts,
recent_roots,
file,
CidHashSet::default(),
skip_checksum,
)
.await
Expand Down
6 changes: 5 additions & 1 deletion src/utils/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ pub async fn decompress_if_needed(
mut reader: impl AsyncBufRead + Unpin,
) -> anyhow::Result<impl AsyncRead> {
Ok(match is_zstd(reader.fill_buf().await?) {
true => Left(ZstdDecoder::new(reader)),
true => {
let mut decoder = ZstdDecoder::new(reader);
decoder.multiple_members(true);
Left(decoder)
}
false => Right(reader),
})
}
Expand Down

0 comments on commit 25be5be

Please sign in to comment.