Skip to content

Commit

Permalink
test(rpc): rpc unit tests with db snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 committed Dec 4, 2024
1 parent 816fa80 commit f5d98d7
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 213 deletions.
2 changes: 1 addition & 1 deletion src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ where
&self
.settings
.require_obj::<TipsetKey>(HEAD_KEY)
.expect("failed to load heaviest tipset"),
.expect("failed to load heaviest tipset key"),
)
.expect("failed to load heaviest tipset")
}
Expand Down
13 changes: 8 additions & 5 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::beacon::{BeaconEntry, IGNORE_DRAND_VAR};
use crate::blocks::{Tipset, TipsetKey};
use crate::metrics;
use crate::shim::clock::ChainEpoch;
use crate::utils::misc::env::is_env_truthy;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -47,11 +48,13 @@ impl<DB: Blockstore> ChainIndex<DB> {
/// Loads a tipset from memory given the tipset keys and cache. Semantically
/// identical to [`Tipset::load`] but the result is cached.
pub fn load_tipset(&self, tsk: &TipsetKey) -> Result<Option<Arc<Tipset>>, Error> {
if let Some(ts) = self.ts_cache.lock().get(tsk) {
metrics::LRU_CACHE_HIT
.get_or_create(&metrics::values::TIPSET)
.inc();
return Ok(Some(ts.clone()));
if !is_env_truthy("FOREST_TIPSET_CACHE_DISABLED") {
if let Some(ts) = self.ts_cache.lock().get(tsk) {
metrics::LRU_CACHE_HIT
.get_or_create(&metrics::values::TIPSET)
.inc();
return Ok(Some(ts.clone()));
}
}

let ts_opt = Tipset::load(&self.db, tsk)?.map(Arc::new);
Expand Down
3 changes: 3 additions & 0 deletions src/cli_shared/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ fn parse_content_disposition(value: &reqwest::header::HeaderValue) -> Option<Str

/// Download the file at `url` with a private HTTP client, returning the path to the downloaded file
async fn download_http(url: &Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
if !directory.is_dir() {
std::fs::create_dir_all(directory)?;
}
let dst_path = directory.join(filename);
let destination = dst_path.display();
event!(target: "forest::snapshot", tracing::Level::INFO, %url, %destination, "downloading snapshot");
Expand Down
39 changes: 39 additions & 0 deletions src/db/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use parking_lot::RwLock;
use std::ops::Deref;

use super::{EthMappingsStore, SettingsStore};

Expand All @@ -22,6 +23,44 @@ pub struct MemoryDB {
eth_mappings_db: RwLock<HashMap<EthHash, Vec<u8>>>,
}

impl MemoryDB {
#[allow(dead_code)]
pub fn serialize(&self) -> anyhow::Result<Vec<u8>> {
let blockchain_db = self.blockchain_db.read();
let blockchain_persistent_db = self.blockchain_persistent_db.read();
let settings_db = self.settings_db.read();
let eth_mappings_db = self.eth_mappings_db.read();
let tuple = (
blockchain_db.deref(),
blockchain_persistent_db.deref(),
settings_db.deref(),
eth_mappings_db.deref(),
);
Ok(fvm_ipld_encoding::to_vec(&tuple)?)
}

pub fn deserialize_from(bytes: &[u8]) -> anyhow::Result<Self> {
let (blockchain_db, blockchain_persistent_db, settings_db, eth_mappings_db) =
fvm_ipld_encoding::from_slice(bytes)?;
Ok(Self {
blockchain_db: RwLock::new(blockchain_db),
blockchain_persistent_db: RwLock::new(blockchain_persistent_db),
settings_db: RwLock::new(settings_db),
eth_mappings_db: RwLock::new(eth_mappings_db),
})
}

pub fn deserialize_from_legacy(bytes: &[u8]) -> anyhow::Result<Self> {
let (blockchain_db, settings_db, eth_mappings_db) = fvm_ipld_encoding::from_slice(bytes)?;
Ok(Self {
blockchain_db: RwLock::new(blockchain_db),
blockchain_persistent_db: Default::default(),
settings_db: RwLock::new(settings_db),
eth_mappings_db: RwLock::new(eth_mappings_db),
})
}
}

impl GarbageCollectable<CidHashSet> for MemoryDB {
fn get_keys(&self) -> anyhow::Result<CidHashSet> {
let mut set = CidHashSet::new();
Expand Down
402 changes: 206 additions & 196 deletions src/rpc/mod.rs

Large diffs are not rendered by default.

26 changes: 16 additions & 10 deletions src/rpc/reflect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,21 @@ pub trait RpcMethodExt<const ARITY: usize>: RpcMethod<ARITY> {
)),
}
}

fn parse_params(
params_raw: Option<impl AsRef<str>>,
calling_convention: ParamStructure,
) -> anyhow::Result<Self::Params> {
Ok(Self::Params::parse(
params_raw
.map(|s| serde_json::from_str(s.as_ref()))
.transpose()?,
Self::PARAM_NAMES,
calling_convention,
Self::N_REQUIRED_PARAMS,
)?)
}

/// Generate a full `OpenRPC` method definition for this endpoint.
fn openrpc<'de>(
gen: &mut SchemaGenerator,
Expand Down Expand Up @@ -213,17 +228,8 @@ pub trait RpcMethodExt<const ARITY: usize>: RpcMethod<ARITY> {
);

module.register_async_method(Self::NAME, move |params, ctx, _extensions| async move {
let raw = params
.as_str()
.map(serde_json::from_str)
.transpose()
let params = Self::parse_params(params.as_str(), calling_convention)
.map_err(|e| Error::invalid_params(e, None))?;
let params = Self::Params::parse(
raw,
Self::PARAM_NAMES,
calling_convention,
Self::N_REQUIRED_PARAMS,
)?;
let ok = Self::handle(ctx, params).await?;
Result::<_, jsonrpsee::types::ErrorObjectOwned>::Ok(ok.into_lotus_json())
})
Expand Down
2 changes: 1 addition & 1 deletion src/shim/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ thread_local! {
///
/// The thread-local network variable is initialized to the value of the global network. This global
/// network variable is set once when Forest has figured out which network it is using.
pub struct CurrentNetwork();
pub struct CurrentNetwork;
impl CurrentNetwork {
pub fn get() -> Network {
FromPrimitive::from_u8(LOCAL_NETWORK.with(|ident| ident.load(Ordering::Acquire)))
Expand Down
13 changes: 13 additions & 0 deletions src/tool/subcommands/api_cmd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod test_snapshot;

use crate::blocks::{ElectionProof, Ticket, Tipset};
use crate::db::car::ManyCar;
use crate::eth::{EthChainId as EthChainIdType, SAFE_EPOCH_DELAY};
Expand Down Expand Up @@ -149,6 +151,10 @@ pub enum ApiCommands {
#[arg(long)]
include_ignored: bool,
},
Test {
#[arg(num_args = 1.., required = true)]
files: Vec<PathBuf>,
},
}

impl ApiCommands {
Expand Down Expand Up @@ -255,6 +261,13 @@ impl ApiCommands {
println!();
}
}
Self::Test { files } => {
for path in files {
print!("Running RPC test with snapshot {} ...", path.display());
test_snapshot::run_test_from_snapshot(&path).await?;
println!(" Success");
}
}
}
Ok(())
}
Expand Down
161 changes: 161 additions & 0 deletions src/tool/subcommands/api_cmd/test_snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::filter::EthEventHandler;
use crate::{
chain::ChainStore,
chain_sync::{network_context::SyncNetworkContext, SyncConfig, SyncStage},
db::MemoryDB,
genesis::{get_network_name_from_genesis, read_genesis_header},
libp2p::{NetworkMessage, PeerManager},
lotus_json::HasLotusJson,
message_pool::{MessagePool, MpoolRpcProvider},
networks::ChainConfig,
rpc::{RPCState, RpcCallSnapshot, RpcMethod as _, RpcMethodExt as _},
shim::address::{CurrentNetwork, Network},
state_manager::StateManager,
KeyStore, KeyStoreConfig,
};
use base64::prelude::*;
use openrpc_types::ParamStructure;
use parking_lot::RwLock;
use std::{path::Path, sync::Arc};
use tokio::{sync::mpsc, task::JoinSet};

pub async fn run_test_from_snapshot(path: &Path) -> anyhow::Result<()> {
CurrentNetwork::set_global(Network::Testnet);
let mut run = false;
let snapshot_bytes = std::fs::read(path)?;
let snapshot_bytes = if let Ok(bytes) = zstd::decode_all(snapshot_bytes.as_slice()) {
bytes
} else {
snapshot_bytes
};
let snapshot: RpcCallSnapshot = serde_json::from_slice(snapshot_bytes.as_slice())?;
let db_bytes = BASE64_STANDARD.decode(&snapshot.db)?;
let db = Arc::new(match MemoryDB::deserialize_from(db_bytes.as_slice()) {
Ok(db) => db,
Err(_) => MemoryDB::deserialize_from_legacy(db_bytes.as_slice())?,
});
let chain_config = Arc::new(ChainConfig::calibnet());
let (ctx, _, _) = ctx(db, chain_config).await?;
let params_raw = if let Some(params) = &snapshot.params {
Some(serde_json::to_string(params)?)
} else {
None
};

macro_rules! run_test {
($ty:ty) => {
if snapshot.name.as_str() == <$ty>::NAME {
let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?;
let result = <$ty>::handle(ctx.clone(), params).await?;
assert_eq!(snapshot.response, result.into_lotus_json_value()?);
run = true;
}
};
}

crate::for_each_method!(run_test);

assert!(run, "RPC method not found");

Ok(())
}

async fn ctx(
db: Arc<MemoryDB>,
chain_config: Arc<ChainConfig>,
) -> anyhow::Result<(
Arc<RPCState<MemoryDB>>,
flume::Receiver<NetworkMessage>,
tokio::sync::mpsc::Receiver<()>,
)> {
let (network_send, network_rx) = flume::bounded(5);
let (tipset_send, _) = flume::bounded(5);
let sync_config = Arc::new(SyncConfig::default());
let genesis_header =
read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db).await?;

let chain_store = Arc::new(
ChainStore::new(
db.clone(),
db.clone(),
db,
chain_config.clone(),
genesis_header.clone(),
)
.unwrap(),
);

let state_manager =
Arc::new(StateManager::new(chain_store.clone(), chain_config, sync_config).unwrap());
let network_name = get_network_name_from_genesis(&genesis_header, &state_manager)?;
let message_pool = MessagePool::new(
MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()),
network_name.clone(),
network_send.clone(),
Default::default(),
state_manager.chain_config().clone(),
&mut JoinSet::new(),
)?;

let peer_manager = Arc::new(PeerManager::default());
let sync_network_context =
SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned());
let (shutdown, shutdown_recv) = mpsc::channel(1);
let rpc_state = Arc::new(RPCState {
state_manager,
keystore: Arc::new(tokio::sync::RwLock::new(KeyStore::new(
KeyStoreConfig::Memory,
)?)),
mpool: Arc::new(message_pool),
bad_blocks: Default::default(),
sync_state: Arc::new(RwLock::new(Default::default())),
eth_event_handler: Arc::new(EthEventHandler::new()),
sync_network_context,
network_name,
start_time: chrono::Utc::now(),
shutdown,
tipset_send,
});
rpc_state.sync_state.write().set_stage(SyncStage::Idle);
Ok((rpc_state, network_rx, shutdown_recv))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::daemon::db_util::download_to;
use itertools::Itertools as _;
use url::Url;

#[tokio::test]
async fn rpc_regression_tests() {
let urls = include_str!("test_snapshots.txt")
.trim()
.split("\n")
.filter_map(|n| {
Url::parse(
format!(
"https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/rpc_test/{n}"
)
.as_str(),
)
.ok()
})
.collect_vec();
for url in urls {
print!("Testing {url} ...");
let tmp_dir = tempfile::tempdir().unwrap();
let tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
.unwrap()
.into_temp_path();
println!("start downloading at {}", tmp.display());
download_to(&url, &tmp).await.unwrap();
println!("done downloading {}", tmp.display());
run_test_from_snapshot(&tmp).await.unwrap();
println!(" succeeded.");
}
}
}
2 changes: 2 additions & 0 deletions src/tool/subcommands/api_cmd/test_snapshots.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
f3_gettipsetbyepoch_1730952732441851.json.zst
filecoin_statelistactors_1730953255032189.json.zst

0 comments on commit f5d98d7

Please sign in to comment.