Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add --diff flag to archive export #3284

Merged
merged 14 commits into from
Jul 31, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
option to `forest-cli state fetch` command.
- [#3213](https://github.com/ChainSafe/forest/pull/3213): Add support for
loading forest.car.zst files.
- [#3284](https://github.com/ChainSafe/forest/pull/3284): Add `--diff` flag to
`archive export`.

### Changed

Expand Down
20 changes: 19 additions & 1 deletion scripts/tests/forest_cli_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,29 @@ pushd "$(mktemp --directory)"
"$FOREST_CLI_PATH" --chain calibnet snapshot fetch --vendor filops
# this will fail if they happen to have the same height - we should change the format of our filenames
test "$(num-files-here)" -eq 2
# verify that we are byte-for-byte identical with filops

: verify that we are byte-for-byte identical with filops
zstd -d filops_*.car.zst
"$FOREST_CLI_PATH" archive export filops_*.car -o exported_snapshot.car.zst
zstd -d exported_snapshot.car.zst
cmp --silent filops_*.car exported_snapshot.car

: verify that the exported snapshot is in ForestCAR.zst format
assert_eq "$(forest_query_format exported_snapshot.car.zst)" "ForestCARv1.zst"

: verify that diff exports contain the expected number of state roots
EPOCH=$(forest_query_epoch exported_snapshot.car.zst)
"$FOREST_CLI_PATH" archive export --epoch $((EPOCH-1100)) --depth 900 --output-path base_snapshot.forest.car.zst exported_snapshot.car.zst

BASE_EPOCH=$(forest_query_epoch base_snapshot.forest.car.zst)
assert_eq "$BASE_EPOCH" $((EPOCH-1100))

BASE_STATE_ROOTS=$(forest_query_state_roots base_snapshot.forest.car.zst)
assert_eq "$BASE_STATE_ROOTS" 900

"$FOREST_CLI_PATH" archive export --diff "$BASE_EPOCH" -o diff_snapshot.forest.car.zst exported_snapshot.car.zst
DIFF_STATE_ROOTS=$(forest_query_state_roots diff_snapshot.forest.car.zst)
assert_eq "$DIFF_STATE_ROOTS" 1100
rm -- *
popd

Expand Down
25 changes: 25 additions & 0 deletions scripts/tests/harness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ function forest_check_db_stats {
$FOREST_CLI_PATH --chain calibnet db stats
}

# Print the epoch of a snapshot file, as reported by `forest-cli archive info`.
# $1: path to a snapshot (CAR) file.
forest_query_epoch() {
    $FOREST_CLI_PATH archive info "$1" | awk '/Epoch/ { print $2 }'
}

# Print the number of state roots contained in a snapshot file, as reported
# by `forest-cli archive info`.
# $1: path to a snapshot (CAR) file.
forest_query_state_roots() {
    $FOREST_CLI_PATH archive info "$1" | awk '/State-roots/ { print $2 }'
}

# Print the CAR format (e.g. "ForestCARv1.zst") of a snapshot file, as
# reported by `forest-cli archive info`.
# $1: path to a snapshot (CAR) file.
forest_query_format() {
    $FOREST_CLI_PATH archive info "$1" | awk '/CAR format/ { print $3 }'
}

function forest_run_node_detached {
echo "Running forest in detached mode"
$FOREST_PATH --chain calibnet --encrypt-keystore false --log-dir "$LOG_DIRECTORY" --detach --save-token ./admin_token --track-peak-rss
Expand Down Expand Up @@ -68,4 +80,17 @@ function forest_cleanup {
fi
}

# assert_eq EXPECTED ACTUAL [MESSAGE]
#
# Compares two strings for equality. Returns 0 when they match; otherwise
# prints a diagnostic to stderr (including the optional MESSAGE, if given)
# and returns 1 so `set -e` scripts abort.
#
# Fixes over the previous version: a failing assertion with no MESSAGE was
# completely silent, and the diagnostic went to stdout where it could be
# mixed into captured command output.
function assert_eq {
    local expected="$1"
    local actual="$2"
    local msg="${3-}"

    if [ "$expected" = "$actual" ]; then
        return 0
    fi

    # Always report the mismatch, and keep diagnostics on stderr.
    if [ -n "$msg" ]; then
        echo "assert_eq failed: '$expected' != '$actual' :: $msg" >&2
    else
        echo "assert_eq failed: '$expected' != '$actual'" >&2
    fi
    return 1
}

trap forest_cleanup EXIT
6 changes: 4 additions & 2 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod store;
mod weight;
use crate::blocks::Tipset;
use crate::db::car::forest;
use crate::ipld::stream_chain;
use crate::ipld::{stream_chain, CidHashSet};
use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
use anyhow::{Context, Result};
use digest::Digest;
Expand All @@ -18,6 +18,7 @@ pub async fn export<D: Digest>(
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
seen: CidHashSet,
skip_checksum: bool,
) -> Result<Option<digest::Output<D>>, Error> {
let stateroot_lookup_limit = tipset.epoch() - lookup_depth;
Expand All @@ -28,7 +29,8 @@ pub async fn export<D: Digest>(

// Stream stateroots in range stateroot_lookup_limit..=tipset.epoch(). Also
// stream all block headers until genesis.
let blocks = stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit);
let blocks =
stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit).with_seen(seen);

// Encode Ipld key-value pairs in zstd frames
let frames = forest::Encoder::compress_stream(8000usize.next_power_of_two(), 3, blocks);
Expand Down
25 changes: 23 additions & 2 deletions src/cli/subcommands/archive_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ use crate::chain::index::{ChainIndex, ResolveNullTipset};
use crate::chain::ChainEpochDelta;
use crate::cli_shared::{snapshot, snapshot::TrustedVendor};
use crate::db::car::AnyCar;
use crate::ipld::stream_graph;
use crate::ipld::CidHashSet;
use crate::networks::{calibnet, mainnet, ChainConfig, NetworkChain};
use crate::shim::clock::EPOCH_DURATION_SECONDS;
use crate::shim::clock::{ChainEpoch, EPOCHS_IN_DAY};
use anyhow::{bail, Context as _};
use chrono::NaiveDateTime;
use clap::Subcommand;
use futures::TryStreamExt;
use fvm_ipld_blockstore::Blockstore;
use indicatif::ProgressIterator;
use itertools::Itertools;
Expand Down Expand Up @@ -69,6 +72,9 @@ pub enum ArchiveCommands {
/// How many state-roots to include. Lower limit is 900 for `calibnet` and `mainnet`.
#[arg(short, long, default_value_t = 2000)]
depth: ChainEpochDelta,
/// Do not include any values reachable from this epoch
#[arg(short, long)]
diff: Option<ChainEpochDelta>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit confusing: the doc comment says "epoch" while the code's type is an epoch delta.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to epoch-diff.

},
/// Print block headers at 30 day interval for a snapshot file
Checkpoints {
Expand All @@ -89,6 +95,7 @@ impl ArchiveCommands {
output_path,
epoch,
depth,
diff,
} => {
info!(
"indexing a car-backed store using snapshot: {}",
Expand All @@ -97,7 +104,7 @@ impl ArchiveCommands {

let reader = move || std::fs::File::open(&input_path);

do_export(reader, output_path, epoch, depth).await
do_export(reader, output_path, epoch, depth, diff).await
}
Self::Checkpoints { snapshot } => print_checkpoints(snapshot),
}
Expand Down Expand Up @@ -134,6 +141,7 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
output_path: PathBuf,
epoch_option: Option<ChainEpoch>,
depth: ChainEpochDelta,
diff: Option<ChainEpochDelta>,
) -> anyhow::Result<()> {
let store = Arc::new(AnyCar::new(reader).context("couldn't read input CAR file")?);

Expand Down Expand Up @@ -165,6 +173,18 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
.tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder)
.context("unable to get a tipset at given height")?;

let seen = if let Some(diff) = diff {
let diff_ts: Arc<Tipset> = index
.tipset_by_height(diff, ts.clone(), ResolveNullTipset::TakeOlder)
.context("diff epoch must be smaller than target epoch")?;
let diff_ts: &Tipset = &diff_ts;
let mut stream = stream_graph(&store, diff_ts.clone().chain(&store));
while stream.try_next().await?.is_some() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to warn in `into_seen` when the stream has not been fully consumed, or to move the stream-consuming logic into `into_seen` itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't always want to consume the entire stream.

stream.into_seen()
} else {
CidHashSet::default()
};

let output_path =
build_output_path(network.to_string(), genesis.timestamp(), epoch, output_path);

Expand All @@ -180,7 +200,7 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
output_path.to_str().unwrap_or_default()
);

crate::chain::export::<Sha256>(store, &ts, depth, writer, true).await?;
crate::chain::export::<Sha256>(store, &ts, depth, writer, seen, true).await?;

Ok(())
}
Expand Down Expand Up @@ -385,6 +405,7 @@ mod tests {
output_path.path().into(),
Some(0),
1,
None,
)
.await
.unwrap();
Expand Down
98 changes: 66 additions & 32 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,24 @@ enum Task {
}

pin_project! {
struct ChainStream<DB, T> {
pub struct ChainStream<DB, T> {
#[pin]
tipset_iter: T,
db: DB,
dfs: VecDeque<Task>, // Depth-first work queue.
seen: CidHashSet,
stateroot_limit: ChainEpoch,
fail_on_dead_links: bool,
}
}

/// Builder-style and consuming accessors for the set of already-visited CIDs
/// carried by a [`ChainStream`].
impl<DB, T> ChainStream<DB, T> {
/// Replaces the stream's `seen` set. CIDs already present in `seen` are
/// treated as visited and will not be emitted by the stream — this is how
/// diff exports exclude everything reachable from a base snapshot.
pub fn with_seen(self, seen: CidHashSet) -> Self {
ChainStream { seen, ..self }
}

/// Consumes the stream and returns the set of CIDs visited so far.
/// NOTE(review): callers may drop the stream before fully draining it, so
/// this is not necessarily the complete closure of the walked graph.
pub fn into_seen(self) -> CidHashSet {
self.seen
}
}

Expand All @@ -293,13 +304,28 @@ pub fn stream_chain<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
db: DB,
tipset_iter: T,
stateroot_limit: ChainEpoch,
) -> impl Stream<Item = anyhow::Result<Block>> {
) -> ChainStream<DB, T> {
ChainStream {
tipset_iter,
db,
dfs: VecDeque::new(),
seen: CidHashSet::default(),
stateroot_limit,
fail_on_dead_links: true,
}
}

pub fn stream_graph<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way to indicate fail_on_dead_links: false in the function name or doc? e.g. stream_graph_unstrict

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the documentation of both stream_graph and stream_chain.

db: DB,
tipset_iter: T,
) -> ChainStream<DB, T> {
ChainStream {
tipset_iter,
db,
dfs: VecDeque::new(),
seen: CidHashSet::default(),
stateroot_limit: 0,
fail_on_dead_links: false,
}
}

Expand All @@ -316,9 +342,12 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<
match task {
Emit(cid) => {
let cid = *cid;
let data = this.db.get(&cid)?.ok_or(anyhow::anyhow!("missing key"))?;
this.dfs.pop_front();
return Poll::Ready(Some(Ok(Block { cid, data })));
if let Some(data) = this.db.get(&cid)? {
return Poll::Ready(Some(Ok(Block { cid, data })));
} else if *this.fail_on_dead_links {
return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid))));
}
}
Iterate(dfs_iter) => {
while let Some(ipld) = dfs_iter.next() {
Expand All @@ -329,15 +358,18 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<
// 3. _: ignore all other links
// Don't revisit what's already been visited.
if should_save_block_to_snapshot(cid) && this.seen.insert(cid) {
let data =
this.db.get(&cid)?.ok_or(anyhow::anyhow!("missing key"))?;

if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
let ipld: Ipld = from_slice(&data)?;
dfs_iter.walk_next(ipld);
if let Some(data) = this.db.get(&cid)? {
if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
let ipld: Ipld = from_slice(&data)?;
dfs_iter.walk_next(ipld);
}
return Poll::Ready(Some(Ok(Block { cid, data })));
} else if *this.fail_on_dead_links {
return Poll::Ready(Some(Err(anyhow::anyhow!(
"missing key: {}",
cid
))));
}

return Poll::Ready(Some(Ok(Block { cid, data })));
}
}
}
Expand All @@ -351,29 +383,31 @@ impl<DB: Blockstore, T: Iterator<Item = Tipset> + Unpin> Stream for ChainStream<
// yield the block without walking the graph it represents.
if let Some(tipset) = this.tipset_iter.as_mut().next() {
for block in tipset.into_blocks().into_iter() {
// Make sure we always yield a block otherwise.
this.dfs.push_back(Emit(*block.cid()));

if block.epoch() == 0 {
// The genesis block has some kind of dummy parent that needs to be emitted.
for p in block.parents().cids() {
this.dfs.push_back(Emit(*p));
if this.seen.insert(*block.cid()) {
// Make sure we always yield a block otherwise.
this.dfs.push_back(Emit(*block.cid()));

if block.epoch() == 0 {
// The genesis block has some kind of dummy parent that needs to be emitted.
for p in block.parents().cids() {
this.dfs.push_back(Emit(*p));
}
}
}

// Process block messages.
if block.epoch() > stateroot_limit {
this.dfs
.push_back(Iterate(DfsIter::from(*block.messages())));
}
// Process block messages.
if block.epoch() > stateroot_limit {
this.dfs
.push_back(Iterate(DfsIter::from(*block.messages())));
}

// Visit the block if it's within required depth. And a special case for `0`
// epoch to match Lotus' implementation.
if block.epoch() == 0 || block.epoch() > stateroot_limit {
// NOTE: In the original `walk_snapshot` implementation we walk the dag
// immediately. Which is what we do here as well, but using a queue.
this.dfs
.push_back(Iterate(DfsIter::from(*block.state_root())));
// Visit the block if it's within required depth. And a special case for `0`
// epoch to match Lotus' implementation.
if block.epoch() == 0 || block.epoch() > stateroot_limit {
// NOTE: In the original `walk_snapshot` implementation we walk the dag
// immediately. Which is what we do here as well, but using a queue.
this.dfs
.push_back(Iterate(DfsIter::from(*block.state_root())));
}
}
}
} else {
Expand Down
3 changes: 3 additions & 0 deletions src/rpc/chain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::blocks::{
BlockHeader, Tipset,
};
use crate::chain::index::ResolveNullTipset;
use crate::ipld::CidHashSet;
use crate::json::{cid::CidJson, message::json::MessageJson};
use crate::rpc_api::{
chain_api::*,
Expand Down Expand Up @@ -85,6 +86,7 @@ where
&start_ts,
recent_roots,
VoidAsyncWriter,
CidHashSet::default(),
skip_checksum,
)
.await
Expand All @@ -95,6 +97,7 @@ where
&start_ts,
recent_roots,
file,
CidHashSet::default(),
skip_checksum,
)
.await
Expand Down
6 changes: 5 additions & 1 deletion src/utils/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ pub async fn decompress_if_needed(
mut reader: impl AsyncBufRead + Unpin,
) -> anyhow::Result<impl AsyncRead> {
Ok(match is_zstd(reader.fill_buf().await?) {
true => Left(ZstdDecoder::new(reader)),
true => {
let mut decoder = ZstdDecoder::new(reader);
decoder.multiple_members(true);
Left(decoder)
}
false => Right(reader),
})
}
Expand Down