Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(rpc): rpc unit tests with db snapshot #5034

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ where
&self
.settings
.require_obj::<TipsetKey>(HEAD_KEY)
.expect("failed to load heaviest tipset"),
.expect("failed to load heaviest tipset key"),
)
.expect("failed to load heaviest tipset")
}
Expand Down
13 changes: 8 additions & 5 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::beacon::{BeaconEntry, IGNORE_DRAND_VAR};
use crate::blocks::{Tipset, TipsetKey};
use crate::metrics;
use crate::shim::clock::ChainEpoch;
use crate::utils::misc::env::is_env_truthy;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -47,11 +48,13 @@ impl<DB: Blockstore> ChainIndex<DB> {
/// Loads a tipset from memory given the tipset keys and cache. Semantically
/// identical to [`Tipset::load`] but the result is cached.
pub fn load_tipset(&self, tsk: &TipsetKey) -> Result<Option<Arc<Tipset>>, Error> {
if let Some(ts) = self.ts_cache.lock().get(tsk) {
metrics::LRU_CACHE_HIT
.get_or_create(&metrics::values::TIPSET)
.inc();
return Ok(Some(ts.clone()));
if !is_env_truthy("FOREST_TIPSET_CACHE_DISABLED") {
if let Some(ts) = self.ts_cache.lock().get(tsk) {
metrics::LRU_CACHE_HIT
.get_or_create(&metrics::values::TIPSET)
.inc();
return Ok(Some(ts.clone()));
}
}

let ts_opt = Tipset::load(&self.db, tsk)?.map(Arc::new);
Expand Down
3 changes: 3 additions & 0 deletions src/cli_shared/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ fn parse_content_disposition(value: &reqwest::header::HeaderValue) -> Option<Str

/// Download the file at `url` with a private HTTP client, returning the path to the downloaded file
async fn download_http(url: &Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
if !directory.is_dir() {
std::fs::create_dir_all(directory)?;
}
let dst_path = directory.join(filename);
let destination = dst_path.display();
event!(target: "forest::snapshot", tracing::Level::INFO, %url, %destination, "downloading snapshot");
Expand Down
39 changes: 39 additions & 0 deletions src/db/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use parking_lot::RwLock;
use std::ops::Deref;

use super::{EthMappingsStore, SettingsStore};

Expand All @@ -22,6 +23,44 @@ pub struct MemoryDB {
eth_mappings_db: RwLock<HashMap<EthHash, Vec<u8>>>,
}

impl MemoryDB {
#[allow(dead_code)]
pub fn serialize(&self) -> anyhow::Result<Vec<u8>> {
let blockchain_db = self.blockchain_db.read();
let blockchain_persistent_db = self.blockchain_persistent_db.read();
let settings_db = self.settings_db.read();
let eth_mappings_db = self.eth_mappings_db.read();
let tuple = (
blockchain_db.deref(),
blockchain_persistent_db.deref(),
settings_db.deref(),
eth_mappings_db.deref(),
);
Ok(fvm_ipld_encoding::to_vec(&tuple)?)
}

pub fn deserialize_from(bytes: &[u8]) -> anyhow::Result<Self> {
let (blockchain_db, blockchain_persistent_db, settings_db, eth_mappings_db) =
fvm_ipld_encoding::from_slice(bytes)?;
Ok(Self {
blockchain_db: RwLock::new(blockchain_db),
blockchain_persistent_db: RwLock::new(blockchain_persistent_db),
settings_db: RwLock::new(settings_db),
eth_mappings_db: RwLock::new(eth_mappings_db),
})
}

pub fn deserialize_from_legacy(bytes: &[u8]) -> anyhow::Result<Self> {
let (blockchain_db, settings_db, eth_mappings_db) = fvm_ipld_encoding::from_slice(bytes)?;
Ok(Self {
blockchain_db: RwLock::new(blockchain_db),
blockchain_persistent_db: Default::default(),
settings_db: RwLock::new(settings_db),
eth_mappings_db: RwLock::new(eth_mappings_db),
})
}
}

impl GarbageCollectable<CidHashSet> for MemoryDB {
fn get_keys(&self) -> anyhow::Result<CidHashSet> {
let mut set = CidHashSet::new();
Expand Down
402 changes: 206 additions & 196 deletions src/rpc/mod.rs

Large diffs are not rendered by default.

26 changes: 16 additions & 10 deletions src/rpc/reflect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ pub trait RpcMethodExt<const ARITY: usize>: RpcMethod<ARITY> {
)),
}
}

fn parse_params(
params_raw: Option<impl AsRef<str>>,
calling_convention: ParamStructure,
) -> anyhow::Result<Self::Params> {
Ok(Self::Params::parse(
params_raw
.map(|s| serde_json::from_str(s.as_ref()))
.transpose()?,
Self::PARAM_NAMES,
calling_convention,
Self::N_REQUIRED_PARAMS,
)?)
}

/// Generate a full `OpenRPC` method definition for this endpoint.
fn openrpc<'de>(
gen: &mut SchemaGenerator,
Expand Down Expand Up @@ -214,17 +229,8 @@ pub trait RpcMethodExt<const ARITY: usize>: RpcMethod<ARITY> {
);

module.register_async_method(Self::NAME, move |params, ctx, _extensions| async move {
let raw = params
.as_str()
.map(serde_json::from_str)
.transpose()
let params = Self::parse_params(params.as_str(), calling_convention)
.map_err(|e| Error::invalid_params(e, None))?;
let params = Self::Params::parse(
raw,
Self::PARAM_NAMES,
calling_convention,
Self::N_REQUIRED_PARAMS,
)?;
let ok = Self::handle(ctx, params).await?;
Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(ok.into_lotus_json())
})
Expand Down
2 changes: 1 addition & 1 deletion src/shim/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ thread_local! {
///
/// The thread-local network variable is initialized to the value of the global network. This global
/// network variable is set once when Forest has figured out which network it is using.
pub struct CurrentNetwork();
pub struct CurrentNetwork;
impl CurrentNetwork {
pub fn get() -> Network {
FromPrimitive::from_u8(LOCAL_NETWORK.with(|ident| ident.load(Ordering::Acquire)))
Expand Down
13 changes: 13 additions & 0 deletions src/tool/subcommands/api_cmd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod test_snapshot;

use crate::blocks::{ElectionProof, Ticket, Tipset};
use crate::db::car::ManyCar;
use crate::eth::{EthChainId as EthChainIdType, SAFE_EPOCH_DELAY};
Expand Down Expand Up @@ -152,6 +154,10 @@ pub enum ApiCommands {
#[arg(long)]
include_ignored: bool,
},
Test {
#[arg(num_args = 1.., required = true)]
files: Vec<PathBuf>,
},
}

impl ApiCommands {
Expand Down Expand Up @@ -260,6 +266,13 @@ impl ApiCommands {
println!();
}
}
Self::Test { files } => {
for path in files {
print!("Running RPC test with snapshot {} ...", path.display());
test_snapshot::run_test_from_snapshot(&path).await?;
println!(" Success");
}
}
}
Ok(())
}
Expand Down
162 changes: 162 additions & 0 deletions src/tool/subcommands/api_cmd/test_snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::{
chain::ChainStore,
chain_sync::{network_context::SyncNetworkContext, SyncConfig, SyncStage},
db::MemoryDB,
genesis::{get_network_name_from_genesis, read_genesis_header},
libp2p::{NetworkMessage, PeerManager},
lotus_json::HasLotusJson,
message_pool::{MessagePool, MpoolRpcProvider},
networks::ChainConfig,
rpc::{
eth::filter::EthEventHandler, RPCState, RpcCallSnapshot, RpcMethod as _, RpcMethodExt as _,
},
shim::address::{CurrentNetwork, Network},
state_manager::StateManager,
KeyStore, KeyStoreConfig,
};
use base64::prelude::*;
use openrpc_types::ParamStructure;
use parking_lot::RwLock;
use std::{path::Path, sync::Arc};
use tokio::{sync::mpsc, task::JoinSet};

pub async fn run_test_from_snapshot(path: &Path) -> anyhow::Result<()> {
CurrentNetwork::set_global(Network::Testnet);
let mut run = false;
let snapshot_bytes = std::fs::read(path)?;
let snapshot_bytes = if let Ok(bytes) = zstd::decode_all(snapshot_bytes.as_slice()) {
bytes
} else {
snapshot_bytes
};
let snapshot: RpcCallSnapshot = serde_json::from_slice(snapshot_bytes.as_slice())?;
let db_bytes = BASE64_STANDARD.decode(&snapshot.db)?;
let db = Arc::new(match MemoryDB::deserialize_from(db_bytes.as_slice()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use CAR files rather than serialized memory-dbs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's doable. The only downside is that CAR DB is not capable of storing non-content-addressable data(e.g. HEAD -> head_tsk) thus at least 2 files(CAR db, and setting DB+request+desired response) are required for a test case, while with the current solution a single test-snap file is sufficient.

Ok(db) => db,
Err(_) => MemoryDB::deserialize_from_legacy(db_bytes.as_slice())?,
});
let chain_config = Arc::new(ChainConfig::calibnet());
let (ctx, _, _) = ctx(db, chain_config).await?;
let params_raw = if let Some(params) = &snapshot.params {
Some(serde_json::to_string(params)?)
} else {
None
};

macro_rules! run_test {
($ty:ty) => {
if snapshot.name.as_str() == <$ty>::NAME {
let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
let result = <$ty>::handle(ctx.clone(), params).await?;
assert_eq!(snapshot.response, result.into_lotus_json_value()?);
run = true;
}
};
}

crate::for_each_method!(run_test);

assert!(run, "RPC method not found");

Ok(())
}

async fn ctx(
db: Arc<MemoryDB>,
chain_config: Arc<ChainConfig>,
) -> anyhow::Result<(
Arc<RPCState<MemoryDB>>,
flume::Receiver<NetworkMessage>,
tokio::sync::mpsc::Receiver<()>,
)> {
let (network_send, network_rx) = flume::bounded(5);
let (tipset_send, _) = flume::bounded(5);
let sync_config = Arc::new(SyncConfig::default());
let genesis_header =
read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;

let chain_store = Arc::new(
ChainStore::new(
db.clone(),
db.clone(),
db,
chain_config.clone(),
genesis_header.clone(),
)
.unwrap(),
);

let state_manager =
Arc::new(StateManager::new(chain_store.clone(), chain_config, sync_config).unwrap());
let network_name = get_network_name_from_genesis(&genesis_header, &state_manager)?;
let message_pool = MessagePool::new(
MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
network_name.clone(),
network_send.clone(),
Default::default(),
state_manager.chain_config().clone(),
&mut JoinSet::new(),
)?;

let peer_manager = Arc::new(PeerManager::default());
let sync_network_context =
SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
let (shutdown, shutdown_recv) = mpsc::channel(1);
let rpc_state = Arc::new(RPCState {
state_manager,
keystore: Arc::new(tokio::sync::RwLock::new(KeyStore::new(
KeyStoreConfig::Memory,
)?)),
mpool: Arc::new(message_pool),
bad_blocks: Default::default(),
sync_state: Arc::new(RwLock::new(Default::default())),
eth_event_handler: Arc::new(EthEventHandler::new()),
sync_network_context,
network_name,
start_time: chrono::Utc::now(),
shutdown,
tipset_send,
});
rpc_state.sync_state.write().set_stage(SyncStage::Idle);
Ok((rpc_state, network_rx, shutdown_recv))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::daemon::db_util::download_to;
use itertools::Itertools as _;
use url::Url;

#[tokio::test]
async fn rpc_regression_tests() {
let urls = include_str!("test_snapshots.txt")
.trim()
.split("\n")
.filter_map(|n| {
Url::parse(
format!(
"https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/rpc_test/{n}"
)
.as_str(),
)
.ok()
})
.collect_vec();
for url in urls {
print!("Testing {url} ...");
let tmp_dir = tempfile::tempdir().unwrap();
let tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
.unwrap()
.into_temp_path();
println!("start downloading at {}", tmp.display());
download_to(&url, &tmp).await.unwrap();
println!("done downloading {}", tmp.display());
run_test_from_snapshot(&tmp).await.unwrap();
println!(" succeeded.");
}
}
}
2 changes: 2 additions & 0 deletions src/tool/subcommands/api_cmd/test_snapshots.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
f3_gettipsetbyepoch_1730952732441851.json.zst
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The list is to be expanded.

filecoin_statelistactors_1730953255032189.json.zst
Loading