Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: forest-cli car concat #3150

Merged
merged 44 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f984ba9
feat: forest-cli car concat
hanabi1224 Jul 7, 2023
5d3adb9
changelog
hanabi1224 Jul 7, 2023
a005591
Merge remote-tracking branch 'origin/main' into hm/car-concat
hanabi1224 Jul 13, 2023
8ab4d7d
BufReader
hanabi1224 Jul 13, 2023
11bc145
Apply suggestions from code review
hanabi1224 Jul 13, 2023
d9992f4
BufWriter
hanabi1224 Jul 13, 2023
cca30ac
stream API
hanabi1224 Jul 13, 2023
c752e05
Merge branch 'main' into hm/car-concat
hanabi1224 Jul 13, 2023
83330a0
typo
hanabi1224 Jul 13, 2023
08df700
accept arbitrary num of car files
hanabi1224 Jul 13, 2023
3b2e465
rm tokio::spawn
hanabi1224 Jul 13, 2023
d98fca5
Update src/cli/subcommands/car_cmd.rs
hanabi1224 Jul 13, 2023
fd3fce3
looping instead of async_recursion
hanabi1224 Jul 13, 2023
601c2f4
Merge remote-tracking branch 'origin/main' into hm/car-concat
hanabi1224 Jul 14, 2023
efd59be
quickcheck tests
hanabi1224 Jul 14, 2023
4059a00
more stream API
hanabi1224 Jul 14, 2023
9525bec
Apply suggestions from code review
hanabi1224 Jul 14, 2023
1cab372
Merge remote-tracking branch 'origin/main' into hm/car-concat
hanabi1224 Jul 14, 2023
a47238c
remove Box<Pin< from dedup_block_stream
hanabi1224 Jul 14, 2023
059de74
more stream api
hanabi1224 Jul 14, 2023
204a159
Merge remote-tracking branch 'origin/main' into hm/car-concat
hanabi1224 Jul 14, 2023
c6180d0
simplify flat_map
hanabi1224 Jul 14, 2023
ebdcb66
Update src/cli/subcommands/car_cmd.rs
hanabi1224 Jul 14, 2023
3995c44
add notes
hanabi1224 Jul 14, 2023
abe8b5d
simplify code
hanabi1224 Jul 14, 2023
c3241ea
Merge remote-tracking branch 'origin/main' into hm/car-concat
hanabi1224 Jul 17, 2023
72e6280
tests
hanabi1224 Jul 17, 2023
122f06c
Update src/cli/subcommands/car_cmd.rs
hanabi1224 Jul 17, 2023
9d9879f
Merge remote-tracking branch 'origin/main' into hm/car-concat
hanabi1224 Jul 17, 2023
bc5704c
union properties
hanabi1224 Jul 17, 2023
662dedc
test
hanabi1224 Jul 17, 2023
952a984
test
hanabi1224 Jul 17, 2023
4b3d1dc
test
hanabi1224 Jul 17, 2023
859c2e6
update test
hanabi1224 Jul 17, 2023
d051094
From trait
hanabi1224 Jul 17, 2023
dc54cce
to_stream
hanabi1224 Jul 17, 2023
40d408f
Merge branch 'main' into hm/car-concat
hanabi1224 Jul 17, 2023
46653f0
cleanup
hanabi1224 Jul 17, 2023
d879e0b
resolve comments
hanabi1224 Jul 17, 2023
f6e5171
block_on from futures
hanabi1224 Jul 17, 2023
0a12ef4
resolve comments
hanabi1224 Jul 17, 2023
d03dbf4
Update src/cli/subcommands/car_cmd.rs
hanabi1224 Jul 17, 2023
7f768fa
Merge branch 'main' into hm/car-concat
hanabi1224 Jul 17, 2023
d518d8f
fix build
hanabi1224 Jul 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
`forest-cli archive info` command for inspecting archives.
- [#3159](https://github.com/ChainSafe/forest/issues/3159): Add
`forest-cli archive export -e=X` command for exporting archives.
- [#3150](https://github.com/ChainSafe/forest/pull/3150):
`forest-cli car concat` subcommand for concatenating 2 `.car` files.
hanabi1224 marked this conversation as resolved.
Show resolved Hide resolved

### Changed

Expand Down
1 change: 1 addition & 0 deletions src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ where
Subcommand::Archive(cmd) => cmd.run(config).await,
Subcommand::Attach(cmd) => cmd.run(config),
Subcommand::Shutdown(cmd) => cmd.run(config).await,
Subcommand::Car(cmd) => cmd.run().await,
}
}
Err(e) => {
Expand Down
125 changes: 125 additions & 0 deletions src/cli/subcommands/car_cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::path::PathBuf;

use async_recursion::async_recursion;
use clap::Subcommand;
use futures::{AsyncRead, StreamExt};
use fvm_ipld_car::Block;
use fvm_ipld_car::CarHeader;
use fvm_ipld_car::CarReader;
use tokio_util::compat::TokioAsyncReadCompatExt;

use crate::ipld::CidHashSet;

#[derive(Debug, Subcommand)]
pub enum CarCommands {
Concat {
/// A list of `.car` file paths
car_files: Vec<PathBuf>,
/// The output `.car` file path
#[arg(short, long)]
output: PathBuf,
},
}

impl CarCommands {
pub async fn run(self) -> anyhow::Result<()> {
match self {
Self::Concat { car_files, output } => {
let mut readers = Vec::with_capacity(car_files.len());
for f in car_files {
readers.push(
CarReader::new(
tokio::io::BufReader::new(tokio::fs::File::open(f).await?).compat(),
)
.await?,
);
}
hanabi1224 marked this conversation as resolved.
Show resolved Hide resolved
let mut roots = vec![];
{
let mut seen = CidHashSet::default();
for reader in &readers {
for &root in &reader.header.roots {
if seen.insert(root) {
println!("roots.push {root}");
roots.push(root);
}
}
}
}

let mut stream = Box::pin(
futures::stream::unfold(
MultiCarDedupReader::new(readers),
move |mut reader| async {
reader
.next_block()
.await
.expect("Failed calling `MultiCarDedupReader::next_block`")
.map(|b| (b, reader))
},
)
.map(|out| (out.cid, out.data)),
);

let car_writer = CarHeader::from(roots);
let mut output_file =
tokio::io::BufWriter::new(tokio::fs::File::create(output).await?).compat();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

car_writer
.write_stream_async(&mut output_file, &mut stream)
.await?;
}
}
Ok(())
}
}

struct MultiCarDedupReader<R>
where
R: AsyncRead + Send + Unpin,
{
readers: Vec<CarReader<R>>,
index: usize,
seen: CidHashSet,
}

impl<R> MultiCarDedupReader<R>
where
R: AsyncRead + Send + Unpin,
{
fn new(readers: Vec<CarReader<R>>) -> Self {
Self {
readers,
index: 0,
seen: Default::default(),
}
}

#[async_recursion]
lemmih marked this conversation as resolved.
Show resolved Hide resolved
async fn next_block(&mut self) -> Result<Option<Block>, fvm_ipld_car::Error> {
while let Some(block) = if self.index >= self.readers.len() {
Ok(None)
} else if let Some(block) = self.readers[self.index].next_block().await? {
// Note: Using while loop here because below code causes stack overflow in unit tests
// ```rust
// if self.seen.insert(block.cid) {
// Ok(Some(block))
// } else {
// self.next_block().await
// }
// ```
Ok(Some(block))
} else {
self.index += 1;
self.next_block().await
}? {
if self.seen.insert(block.cid) {
return Ok(Some(block));
}
}

Ok(None)
}
}
7 changes: 6 additions & 1 deletion src/cli/subcommands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
mod archive_cmd;
mod attach_cmd;
mod auth_cmd;
mod car_cmd;
mod chain_cmd;
mod config_cmd;
mod db_cmd;
Expand Down Expand Up @@ -37,7 +38,7 @@ use tracing::error;

pub(super) use self::{
archive_cmd::ArchiveCommands, attach_cmd::AttachCommand, auth_cmd::AuthCommands,
chain_cmd::ChainCommands, config_cmd::ConfigCommands, db_cmd::DBCommands,
car_cmd::CarCommands, chain_cmd::ChainCommands, config_cmd::ConfigCommands, db_cmd::DBCommands,
fetch_params_cmd::FetchCommands, mpool_cmd::MpoolCommands, net_cmd::NetCommands,
send_cmd::SendCommand, shutdown_cmd::ShutdownCommand, snapshot_cmd::SnapshotCommands,
state_cmd::StateCommands, sync_cmd::SyncCommands, wallet_cmd::WalletCommands,
Expand Down Expand Up @@ -118,6 +119,10 @@ pub enum Subcommand {

/// Shutdown Forest
Shutdown(ShutdownCommand),

/// Utilities for manipulating CAR files
#[command(subcommand)]
Car(CarCommands),
}

/// Pretty-print a JSON-RPC error and exit
Expand Down
118 changes: 118 additions & 0 deletions tests/car_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

pub mod common;

use std::path::Path;

use anyhow::*;
use cid::{
multihash::{self, MultihashDigest},
Cid,
};
use futures::StreamExt;
use fvm_ipld_car::{CarHeader, CarReader};
use fvm_ipld_encoding::DAG_CBOR;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use tempfile::NamedTempFile;
use tokio_util::compat::TokioAsyncReadCompatExt;

use crate::common::cli;

#[tokio::test]
async fn forest_cli_car_concat() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these tests are useful. Correct me if I'm wrong, but they just check if the output file is a valid CAR file, right? I can imagine lots of incorrect implementations that would pass these tests.

Let's either remove the tests or make them more rigorous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike unit tests, the main purpose of these bin tests ensure the executable runs without failures, and the assertions ensure the output file exists and can be parsed by CarReader. Any suggestions on adding more checks of the output car file?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would love unit tests that didn't use any files at all.

This PR is about a function that computes the union of a list of sets. We can test that thoroughly with quickcheck. Generate arbitrary sets of data and verify that the union contains all the elements of the input sets with no duplicates.

∀AB. A⊆(A∪B)
∀AB. B⊆(A∪B)
∀AB. A∪B = B∪A
∀ABC. (A ∪ B) ∪ C = A ∪ (B ∪ C)
∀A. A∪A = A

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asides:

  • I think we should invest in some mock IPLD/ car creation utils rather than checking in test files
  • I think we should have better guidelines on when and where to write integration tests. Part of the "engineering values" piece

let a = NamedTempFile::new()?;
new_car(1024, a.path()).await?;
let b = NamedTempFile::new()?;
new_car(2048, b.path()).await?;
let output = NamedTempFile::new()?;

cli()?
.arg("car")
.arg("concat")
.arg(a.path().as_os_str().to_str().unwrap())
.arg(b.path().as_os_str().to_str().unwrap())
.arg("-o")
.arg(output.path().as_os_str().to_str().unwrap())
.assert()
.success();

validate_car(output.path()).await?;

Ok(())
}

#[tokio::test]
async fn forest_cli_car_concat_same_file() -> Result<()> {
let output = NamedTempFile::new()?;

cli()?
.arg("car")
.arg("concat")
.arg("./test-snapshots/chain4.car")
.arg("./test-snapshots/chain4.car")
.arg("-o")
.arg(output.path().as_os_str().to_str().unwrap())
.assert()
.success();

validate_car(output.path()).await?;

Ok(())
}

#[tokio::test]
async fn forest_cli_car_concat_same_file_3_times() -> Result<()> {
let output = NamedTempFile::new()?;

cli()?
.arg("car")
.arg("concat")
.arg("./test-snapshots/chain4.car")
.arg("./test-snapshots/chain4.car")
.arg("./test-snapshots/chain4.car")
.arg("-o")
.arg(output.path().as_os_str().to_str().unwrap())
.assert()
.success();

validate_car(output.path()).await?;

Ok(())
}

async fn new_car(size: usize, path: impl AsRef<Path>) -> Result<()> {
let rng = SmallRng::seed_from_u64(0xdeadbeef);
let (cid, _data) = new_block(&mut rng.clone());
let header = CarHeader::from(vec![cid]);

let mut block_stream = Box::pin(
futures::stream::unfold(rng, |mut rng| async { Some((new_block(&mut rng), rng)) })
.take(size),
);

let mut writer = tokio::fs::File::create(path).await?.compat();
header
.write_stream_async(&mut writer, &mut block_stream)
.await?;

Ok(())
}

fn new_block(rng: &mut SmallRng) -> (Cid, Vec<u8>) {
let mut data = [0; 64];
rng.fill(&mut data);
let cid = Cid::new_v1(DAG_CBOR, multihash::Code::Blake2b256.digest(&data));
(cid, data.to_vec())
}

async fn validate_car(path: impl AsRef<Path>) -> Result<()> {
let mut reader = CarReader::new(tokio::fs::File::open(path).await?.compat()).await?;
assert!(reader.validate);
let mut count = 0;
while reader.next_block().await?.is_some() {
count += 1;
}
println!("Result car block count: {count}");
Ok(())
}
4 changes: 4 additions & 0 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use assert_cmd::Command;
use tempfile::TempDir;

pub fn cli() -> Result<Command> {
Ok(Command::cargo_bin("forest-cli")?)
}

pub fn daemon() -> Result<Command> {
Ok(Command::cargo_bin("forest")?)
}

Expand Down
4 changes: 2 additions & 2 deletions tests/import_snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ pub mod common;

use anyhow::Result;

use crate::common::{cli, create_tmp_config, CommonEnv};
use crate::common::{create_tmp_config, daemon, CommonEnv};

#[test]
fn importing_bad_snapshot_should_fail() -> Result<()> {
let (config_file, data_dir) = create_tmp_config()?;
let temp_file = data_dir.path().join("bad-snapshot.car");
std::fs::write(&temp_file, "bad-snapshot")?;
cli()?
daemon()?
.common_env()
.arg("--rpc-address")
.arg("127.0.0.1:0")
Expand Down
10 changes: 5 additions & 5 deletions tests/keystore_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use forest_filecoin::{
KeyStore, KeyStoreConfig, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, KEYSTORE_NAME,
};

use crate::common::{cli, create_tmp_config, CommonArgs};
use crate::common::{create_tmp_config, daemon, CommonArgs};

// https://github.com/ChainSafe/forest/issues/2499
#[test]
fn forest_headless_encrypt_keystore_no_passphrase_should_fail() -> Result<()> {
let (config_file, _data_dir) = create_tmp_config()?;
cli()?
daemon()?
.common_args()
.arg("--config")
.arg(config_file)
Expand All @@ -28,7 +28,7 @@ fn forest_headless_encrypt_keystore_no_passphrase_should_fail() -> Result<()> {
#[test]
fn forest_headless_no_encrypt_no_passphrase_should_succeed() -> Result<()> {
let (config_file, data_dir) = create_tmp_config()?;
cli()?
daemon()?
.common_args()
.arg("--config")
.arg(config_file)
Expand All @@ -45,7 +45,7 @@ fn forest_headless_no_encrypt_no_passphrase_should_succeed() -> Result<()> {
#[test]
fn forest_headless_encrypt_keystore_with_passphrase_should_succeed() -> Result<()> {
let (config_file, data_dir) = create_tmp_config()?;
cli()?
daemon()?
.env(FOREST_KEYSTORE_PHRASE_ENV, "hunter2")
.common_args()
.arg("--config")
Expand All @@ -61,7 +61,7 @@ fn forest_headless_encrypt_keystore_with_passphrase_should_succeed() -> Result<(
fn should_create_jwt_admin_token() -> Result<()> {
let (config_file, data_dir) = create_tmp_config()?;
let token_path = data_dir.path().join("admin-token");
cli()?
daemon()?
.common_args()
.arg("--config")
.arg(config_file)
Expand Down