Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: inline actor bundles #3276

Merged
merged 15 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 84 additions & 74 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 13 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,19 @@ tokio-test = "0.4.2"

[build-dependencies]
anyhow = "1"
protobuf-codegen = "3.2"
walkdir = "2.3"
async-compression = { version = "0.4", features = ["futures-io", "zstd"] }
async-fs = "1"
cid = { version = "0.10", default-features = false, features = ["std"] }
futures = "0.3"
fvm_ipld_car = "0.7"
itertools = "0.11"
once_cell = "1"
protobuf-codegen = "3"
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] }
tempfile = "3"
tokio = { version = "1", features = ['full'] }
url = "2"
walkdir = "2"

# This needs to be set as default. Otherwise, a regular build or test will produce
# gargantuan artifacts (around 70G for all tests). For a debugging session, you can
Expand Down
169 changes: 165 additions & 4 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,187 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::path::PathBuf;
mod src;
use src::{ActorBundleInfo, ACTOR_BUNDLES};

use std::{
env, fs, io,
path::{Path, PathBuf},
pin::pin,
};

use anyhow::Context;
use async_compression::futures::write::ZstdEncoder;
use cid::Cid;
use futures::{
io::{BufReader, BufWriter},
AsyncRead, AsyncWriteExt, Stream, StreamExt, TryStreamExt,
};
use fvm_ipld_car::{CarHeader, CarReader};
use itertools::Itertools;
use once_cell::sync::Lazy;
use protobuf_codegen::Customize;
use reqwest::Url;
use tokio::task::JoinSet;
use walkdir::WalkDir;

const PROTO_DIR: &str = "proto";
const CARGO_OUT_DIR: &str = "proto";

fn main() -> anyhow::Result<()> {
// Using a local path instead of `OUT_DIR` to reuse the cache as much as possible
static ACTOR_BUNDLE_CACHE_DIR: Lazy<PathBuf> =
Lazy::new(|| Path::new("target/actor_bundles/").to_owned());

pub fn global_http_client() -> reqwest::Client {
static CLIENT: Lazy<reqwest::Client> = Lazy::new(reqwest::Client::new);
CLIENT.clone()
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
generate_protobuf_code()?;

generate_compressed_actor_bundles().await?;

Ok(())
}

async fn generate_compressed_actor_bundles() -> anyhow::Result<()> {
println!("cargo:rerun-if-changed=src/mod.rs");
println!(
"cargo:rerun-if-changed={}",
ACTOR_BUNDLE_CACHE_DIR.display()
);

let mut tasks = JoinSet::new();
for ActorBundleInfo { manifest, url } in ACTOR_BUNDLES.iter() {
tasks.spawn(async move {
download_bundle_if_needed(manifest, url)
.await
.with_context(|| format!("Failed to get {manifest}.car from {url}"))
});
}

let mut car_roots = vec![];
let mut car_readers = vec![];
while let Some(path) = tasks.join_next().await {
let car_reader = fvm_ipld_car::CarReader::new(futures::io::BufReader::new(
async_fs::File::open(path??).await?,
))
.await?;
car_roots.extend_from_slice(car_reader.header.roots.as_slice());
car_readers.push(car_reader);
}

let car_writer = CarHeader::from(
car_readers
.iter()
.flat_map(|it| it.header.roots.iter())
.unique()
.cloned()
.collect::<Vec<_>>(),
);

let mut zstd_encoder = ZstdEncoder::with_quality(
async_fs::File::create(Path::new(&env::var("OUT_DIR")?).join("actor_bundles.car.zst"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also defined in src/daemon/bundle.rs. Let's have a single source of truth.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how to share the code, include_bytes!(concat!(env!("OUT_DIR"), "/actor_bundles.car.zst")); requires str literals and is evaluated at compile time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below code does not compile, only literals (like "foo", -42 and 3.14) can be passed to concat!()

const BUNDLE_NAME: &str = "/actor_bundles.car.zst";
pub const ACTOR_BUNDLES_CAR_ZST: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), BUNDLE_NAME));

.await?,
if cfg!(debug_assertions) {
async_compression::Level::Default
} else {
async_compression::Level::Precise(17)
},
);

car_writer
.write_stream_async(&mut zstd_encoder, &mut pin!(merge_car_readers(car_readers)))
.await?;

Ok(())
}

fn read_car_as_stream<R>(reader: CarReader<R>) -> impl Stream<Item = (Cid, Vec<u8>)>
where
R: AsyncRead + Send + Unpin,
{
futures::stream::unfold(reader, move |mut reader| async {
reader
.next_block()
.await
.expect("Failed to call CarReader::next_block")
.map(|b| ((b.cid, b.data), reader))
})
}

fn merge_car_readers<R>(readers: Vec<CarReader<R>>) -> impl Stream<Item = (Cid, Vec<u8>)>
where
R: AsyncRead + Send + Unpin,
{
futures::stream::iter(readers).flat_map(read_car_as_stream)
}

async fn download_bundle_if_needed(root: &Cid, url: &Url) -> anyhow::Result<PathBuf> {
if !ACTOR_BUNDLE_CACHE_DIR.is_dir() {
fs::create_dir_all(ACTOR_BUNDLE_CACHE_DIR.as_path())?;
}
let cached_car_path = ACTOR_BUNDLE_CACHE_DIR.join(format!("{root}.car"));
if cached_car_path.is_file() {
if let Ok(file) = async_fs::File::open(&cached_car_path).await {
if let Ok(true) = is_bundle_valid(root, BufReader::new(file)).await {
return Ok(cached_car_path);
}
}
}

let tmp = tempfile::NamedTempFile::new_in(ACTOR_BUNDLE_CACHE_DIR.as_path())?.into_temp_path();
{
let response = global_http_client().get(url.clone()).send().await?;
let mut writer = BufWriter::new(async_fs::File::create(&tmp).await?);
futures::io::copy(
response
.bytes_stream()
.map_err(|reqwest_error| io::Error::new(io::ErrorKind::Other, reqwest_error))
.into_async_read(),
&mut writer,
)
.await?;
writer.flush().await?;
}
if is_bundle_valid(root, BufReader::new(async_fs::File::open(&tmp).await?)).await? {
tmp.persist(&cached_car_path)?;
Ok(cached_car_path)
} else {
anyhow::bail!("Invalid bundle: {url}");
}
}

async fn is_bundle_valid<R>(root: &Cid, reader: R) -> anyhow::Result<bool>
where
R: AsyncRead + Send + Unpin,
{
is_bundle_car_valid(root, CarReader::new(reader).await?)
}

fn is_bundle_car_valid<R>(root: &Cid, car_reader: CarReader<R>) -> anyhow::Result<bool>
where
R: AsyncRead + Send + Unpin,
{
Ok(car_reader.header.roots.len() == 1 && &car_reader.header.roots[0] == root)
}

fn generate_protobuf_code() -> anyhow::Result<()> {
println!("cargo:rerun-if-changed=proto");

protobuf_codegen::Codegen::new()
.pure()
.cargo_out_dir(CARGO_OUT_DIR)
.inputs(get_inputs()?.as_slice())
.inputs(get_proto_inputs()?.as_slice())
.include(PROTO_DIR)
.customize(Customize::default().lite_runtime(true))
.run()?;
Ok(())
}

fn get_inputs() -> anyhow::Result<Vec<PathBuf>> {
fn get_proto_inputs() -> anyhow::Result<Vec<PathBuf>> {
let mut inputs = Vec::new();
for entry in WalkDir::new(PROTO_DIR).into_iter().flatten() {
let path = entry.path();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,25 @@ NV19.

The first step is to import the actor bundle into Forest. This is done by:

- adding the bundle to the `HeightInfos` struct in the network definitions files
(e.g.,
- adding the bundle cid to the `HeightInfos` struct in the network definitions
files (e.g.,
[calibnet](https://github.com/ChainSafe/forest/blob/main/src/networks/calibnet/mod.rs)).

```rust
HeightInfo {
height: Height::Hygge,
epoch: 322_354,
bundle: Some(ActorBundleInfo {
manifest: Cid::try_from("bafy2bzaced25ta3j6ygs34roprilbtb3f6mxifyfnm7z7ndquaruxzdq3y7lo").unwrap(),
url: Url::parse("https://github.com/filecoin-project/builtin-actors/releases/download/v10.0.0-rc.1/builtin-actors-calibrationnet.car").unwrap()
})
bundle: Some(Cid::try_from("bafy2bzaced25ta3j6ygs34roprilbtb3f6mxifyfnm7z7ndquaruxzdq3y7lo").unwrap()),
}
```

- adding the bundle cid and url to the `ACTOR_BUNDLES` in the `src/mod.rs`.

```rust
ActorBundleInfo{
manifest: Cid::try_from("bafy2bzacedbedgynklc4dgpyxippkxmba2mgtw7ecntoneclsvvl4klqwuyyy").unwrap(),
url: Url::parse("https://forest-continuous-integration.fra1.cdn.digitaloceanspaces.com/builtin-actors/calibnet/Shark.car").unwrap(),
},
```

### Implement the migration
Expand Down
15 changes: 3 additions & 12 deletions src/cli/subcommands/snapshot_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::blocks::{tipset_keys_json::TipsetKeysJson, Tipset, TipsetKeys};
use crate::chain::index::ChainIndex;
use crate::cli::subcommands::{cli_error_and_die, handle_rpc_err};
use crate::cli_shared::snapshot::{self, TrustedVendor};
use crate::daemon::bundle::load_bundles;
use crate::daemon::bundle::load_actor_bundles;
use crate::db::car::AnyCar;
use crate::fil_cns::composition as cns;
use crate::ipld::{recurse_links_hash, CidHashSet};
Expand Down Expand Up @@ -443,17 +443,8 @@ where

let last_epoch = ts.epoch() - epochs as i64;

// Bundles are required when doing state migrations. Download any bundles
// that may be necessary after `last_epoch`.
load_bundles(
last_epoch,
&Config {
chain: chain_config.clone(),
..Default::default()
},
&db,
)
.await?;
// Bundles are required when doing state migrations.
load_actor_bundles(&db).await?;

// Set proof parameter data dir and make sure the proofs are available
if cns::FETCH_PARAMS {
Expand Down
97 changes: 23 additions & 74 deletions src/daemon/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,33 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::cli_shared::cli::Config;
use crate::networks::Height;
use crate::shim::clock::ChainEpoch;
use async_compression::futures::bufread::ZstdDecoder;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use tokio::{fs::File, io::BufWriter};
use tracing::info;

pub async fn load_bundles(
epoch: ChainEpoch,
config: &Config,
db: &impl Blockstore,
) -> anyhow::Result<()> {
// collect bundles to load into the database.
let mut bundles = Vec::new();
for info in &config.chain.height_infos {
if epoch < config.chain.epoch(info.height) {
if let Some(bundle) = &info.bundle {
bundles.push((
bundle.manifest,
get_actors_bundle(config, info.height).await?,
));
}
}
}
pub async fn load_actor_bundles(db: &impl Blockstore) -> anyhow::Result<Vec<Cid>> {
pub const ACTOR_BUNDLES_CAR_ZST: &[u8] =
include_bytes!(concat!(env!("OUT_DIR"), "/actor_bundles.car.zst"));

for (manifest_cid, reader) in bundles {
let roots = fvm_ipld_car::load_car(db, reader).await?;
assert_eq!(
roots.len(),
1,
"expected one root when loading actors bundle"
);
info!("Loaded actors bundle with CID: {}", roots[0]);
anyhow::ensure!(
manifest_cid == roots[0],
"manifest cid in config '{manifest_cid}' does not match manifest cid from bundle '{}'",
roots[0]
);
}
Ok(())
Ok(fvm_ipld_car::load_car(
db,
ZstdDecoder::new(futures::io::BufReader::new(ACTOR_BUNDLES_CAR_ZST)),
)
.await?)
}

/// Downloads the actors bundle (if not already downloaded) and returns a reader
/// to it.
// TODO Get it from IPFS instead of GitHub.
pub async fn get_actors_bundle(
config: &Config,
height: Height,
) -> anyhow::Result<futures::io::BufReader<async_fs::File>> {
let bundle_info = config.chain.height_infos[height as usize]
.bundle
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no bundle for epoch {}", config.chain.epoch(height)))?;

// This is the path where the actors bundle will be stored.
let bundle_path_dir = config
.client
.data_dir
.join("bundles")
.join(config.chain.network.to_string());

tokio::fs::create_dir_all(&bundle_path_dir).await?;
let bundle_path = bundle_path_dir.join(format!("bundle_{height}.car"));

// If the bundle already exists, return a reader to it.
if bundle_path.exists() {
let file = async_fs::File::open(bundle_path).await?;
return Ok(futures::io::BufReader::new(file));
#[cfg(test)]
mod tests {
use super::*;
use crate::build::ACTOR_BUNDLES;
use ahash::HashSet;
use pretty_assertions::assert_eq;

#[tokio::test]
async fn test_load_actor_bundles() {
let db = fvm_ipld_blockstore::MemoryBlockstore::new();
let roots = HashSet::from_iter(load_actor_bundles(&db).await.unwrap());
let roots_expected: HashSet<Cid> = ACTOR_BUNDLES.iter().map(|b| b.manifest).collect();
assert_eq!(roots, roots_expected);
}

// Otherwise, download it.
info!("Downloading actors bundle...");
let mut reader = crate::utils::net::reader(bundle_info.url.as_str()).await?;

let file = File::create(&bundle_path).await?;
let mut writer = BufWriter::new(file);
tokio::io::copy(&mut reader, &mut writer).await?;

let file = async_fs::File::open(bundle_path).await?;
Ok(futures::io::BufReader::new(file))
}
Loading