fix(en): Fix reorg detector logic for dealing with last L1 batch (#1906)
## What ❔

Fixes a false positive in the reorg detector that occurs when the latest remote L1 batch doesn't have its state hash computed yet.
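
For context, the sketch below illustrates the intended behavior, under the assumption (not shown in this diff) that the detector compares locally computed L1 batch root hashes with the ones reported by the main node. The function and error names are hypothetical stand-ins, not the actual `zksync_reorg_detector` API:

```rust
use zksync_types::{L1BatchNumber, H256};

/// Hypothetical stand-in for `reorg_detector::Error::ReorgDetected`.
#[derive(Debug)]
pub enum CheckError {
    ReorgDetected(L1BatchNumber),
}

/// Sketch of a per-batch consistency check. If the main node hasn't computed the
/// state hash of the latest batch yet, there is nothing to compare, so the check
/// is skipped instead of being reported as a reorg (the false positive fixed here).
pub fn check_batch_root_hashes(
    batch: L1BatchNumber,
    remote_root_hash: Option<H256>,
    local_root_hash: H256,
) -> Result<(), CheckError> {
    let Some(remote_root_hash) = remote_root_hash else {
        tracing::debug!("Root hash for L1 batch #{batch} is not yet computed on the main node");
        return Ok(());
    };
    if remote_root_hash == local_root_hash {
        Ok(())
    } else {
        // The hashes diverge; the last batch known to be correct is the previous one.
        Err(CheckError::ReorgDetected(L1BatchNumber(batch.0.saturating_sub(1))))
    }
}
```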

## Why ❔

This error was observed in CI and can theoretically appear in real environments.

## Checklist

- [x] PR title corresponds to the body of the PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli authored May 15, 2024
1 parent 1f90ca5 commit 3af5f5b
Showing 4 changed files with 307 additions and 84 deletions.
37 changes: 26 additions & 11 deletions core/bin/external_node/src/main.rs
@@ -686,11 +686,9 @@ async fn init_tasks(
 }
 
 async fn shutdown_components(
-    stop_sender: watch::Sender<bool>,
     tasks: ManagedTasks,
     healthcheck_handle: HealthCheckHandle,
 ) -> anyhow::Result<()> {
-    stop_sender.send(true).ok();
     task::spawn_blocking(RocksDB::await_rocksdb_termination)
         .await
         .context("error waiting for RocksDB instances to drop")?;
@@ -864,7 +862,8 @@ async fn run_node(
 ) -> anyhow::Result<()> {
     tracing::warn!("The external node is in the alpha phase, and should be used with caution.");
     tracing::info!("Started the external node");
-    let (stop_sender, stop_receiver) = watch::channel(false);
+    let (stop_sender, mut stop_receiver) = watch::channel(false);
+    let stop_sender = Arc::new(stop_sender);
 
     let app_health = Arc::new(AppHealthCheck::new(
         config.optional.healthcheck_slow_time_limit(),
@@ -931,6 +930,16 @@
     )
     .await?;
     let sigint_receiver = env.setup_sigint_handler();
+    // Spawn reacting to signals in a separate task so that the node is responsive to signals right away
+    // (e.g., during the initial reorg detection).
+    tokio::spawn({
+        let stop_sender = stop_sender.clone();
+        async move {
+            sigint_receiver.await.ok();
+            tracing::info!("Stop signal received, shutting down");
+            stop_sender.send_replace(true);
+        }
+    });
 
     // Revert the storage if needed.
     let mut reverter = BlockReverter::new(NodeRole::External, connection_pool.clone());
@@ -946,8 +955,15 @@
     // the node lifecycle, the node will exit the same way as it does with any other critical error,
     // and would restart. Then, on the 2nd launch reorg would be detected here, then processed and the node
     // will be able to operate normally afterwards.
-    match reorg_detector.check_consistency().await {
-        Ok(()) => {}
+    match reorg_detector.run_once(stop_receiver.clone()).await {
+        Ok(()) if *stop_receiver.borrow() => {
+            tracing::info!("Stop signal received during initial reorg detection; shutting down");
+            healthcheck_handle.stop().await;
+            return Ok(());
+        }
+        Ok(()) => {
+            tracing::info!("Successfully checked no reorg compared to the main node");
+        }
         Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => {
             tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}");
             reverter.roll_back(last_correct_l1_batch).await?;
@@ -999,15 +1015,14 @@
 
     let mut tasks = ManagedTasks::new(task_handles);
     tokio::select! {
-        _ = tasks.wait_single() => {},
-        _ = sigint_receiver => {
-            tracing::info!("Stop signal received, shutting down");
-        },
+        () = tasks.wait_single() => {},
+        _ = stop_receiver.changed() => {},
     };
 
     // Reaching this point means that either some actor exited unexpectedly or we received a stop signal.
-    // Broadcast the stop signal to all actors and exit.
-    shutdown_components(stop_sender, tasks, healthcheck_handle).await?;
+    // Broadcast the stop signal (in case it wasn't broadcast previously) to all actors and exit.
+    stop_sender.send_replace(true);
+    shutdown_components(tasks, healthcheck_handle).await?;
     tracing::info!("Stopped");
     Ok(())
 }
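
Taken together, the `main.rs` changes replace the single-use SIGINT future with a shared `tokio::sync::watch` channel, so the stop signal can be observed both during startup (e.g., initial reorg detection) and in the main select loop. Below is a minimal, self-contained sketch of that pattern; `tokio::signal::ctrl_c` and the `sleep` stand in for the node's own signal handler and `tasks.wait_single()`, and none of this is the node's actual code:

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (stop_sender, mut stop_receiver) = watch::channel(false);
    let stop_sender = Arc::new(stop_sender);

    // React to Ctrl-C in a separate task so the signal is handled even while
    // long startup steps are still running.
    tokio::spawn({
        let stop_sender = stop_sender.clone();
        async move {
            tokio::signal::ctrl_c().await.ok();
            println!("Stop signal received, shutting down");
            stop_sender.send_replace(true);
        }
    });

    // Startup steps can poll the flag cheaply...
    if *stop_receiver.borrow() {
        return; // the stop signal arrived before initialization finished
    }

    // ...and the main loop waits for either the work to finish or the flag to flip.
    tokio::select! {
        () = tokio::time::sleep(Duration::from_secs(3600)) => {}, // stands in for `tasks.wait_single()`
        _ = stop_receiver.changed() => {},
    }

    // Re-broadcast the stop signal (it may already be set) before shutting components down.
    stop_sender.send_replace(true);
    println!("Stopped");
}
```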
120 changes: 92 additions & 28 deletions core/bin/external_node/src/tests.rs
@@ -8,7 +8,7 @@ use zksync_types::{
     api, ethabi, fee_model::FeeParams, Address, L1BatchNumber, L2BlockNumber, ProtocolVersionId,
     H256, U64,
 };
-use zksync_web3_decl::client::MockClient;
+use zksync_web3_decl::{client::MockClient, jsonrpsee::core::ClientError};
 
 use super::*;
 
@@ -96,6 +96,34 @@ fn expected_health_components(components: &ComponentsToRun) -> Vec<&'static str>
     output
 }
 
+fn mock_eth_client(diamond_proxy_addr: Address) -> MockEthereum {
+    MockEthereum::default().with_call_handler(move |call, _| {
+        tracing::info!("L1 call: {call:?}");
+        if call.to == Some(diamond_proxy_addr) {
+            let call_signature = &call.data.as_ref().unwrap().0[..4];
+            let contract = zksync_contracts::hyperchain_contract();
+            let pricing_mode_sig = contract
+                .function("getPubdataPricingMode")
+                .unwrap()
+                .short_signature();
+            let protocol_version_sig = contract
+                .function("getProtocolVersion")
+                .unwrap()
+                .short_signature();
+            match call_signature {
+                sig if sig == pricing_mode_sig => {
+                    return ethabi::Token::Uint(0.into()); // "rollup" mode encoding
+                }
+                sig if sig == protocol_version_sig => {
+                    return ethabi::Token::Uint((ProtocolVersionId::latest() as u16).into())
+                }
+                _ => { /* unknown call; panic below */ }
+            }
+        }
+        panic!("Unexpected L1 call: {call:?}");
+    })
+}
+
 #[test_casing(5, ["all", "core", "api", "tree", "tree,tree_api"])]
 #[tokio::test]
 #[tracing::instrument] // Add args to the test logs
@@ -159,33 +187,7 @@ async fn external_node_basics(components_str: &'static str) {
         .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0]))
         .build();
     let l2_client = Box::new(l2_client);
-
-    let eth_client = MockEthereum::default().with_call_handler(move |call, _| {
-        tracing::info!("L1 call: {call:?}");
-        if call.to == Some(diamond_proxy_addr) {
-            let call_signature = &call.data.as_ref().unwrap().0[..4];
-            let contract = zksync_contracts::hyperchain_contract();
-            let pricing_mode_sig = contract
-                .function("getPubdataPricingMode")
-                .unwrap()
-                .short_signature();
-            let protocol_version_sig = contract
-                .function("getProtocolVersion")
-                .unwrap()
-                .short_signature();
-            match call_signature {
-                sig if sig == pricing_mode_sig => {
-                    return ethabi::Token::Uint(0.into()); // "rollup" mode encoding
-                }
-                sig if sig == protocol_version_sig => {
-                    return ethabi::Token::Uint((ProtocolVersionId::latest() as u16).into())
-                }
-                _ => { /* unknown call; panic below */ }
-            }
-        }
-        panic!("Unexpected L1 call: {call:?}");
-    });
-    let eth_client = Box::new(eth_client);
+    let eth_client = Box::new(mock_eth_client(diamond_proxy_addr));
 
     let (env, env_handles) = TestEnvironment::new();
     let node_handle = tokio::spawn(async move {
@@ -242,3 +244,65 @@ async fn external_node_basics(components_str: &'static str) {
         assert_matches!(component_health.status(), HealthStatus::ShutDown);
     }
 }
+
+#[tokio::test]
+async fn node_reacts_to_stop_signal_during_initial_reorg_detection() {
+    let _guard = vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging
+    let temp_dir = tempfile::TempDir::new().unwrap();
+
+    let connection_pool = ConnectionPool::test_pool().await;
+    let singleton_pool_builder = ConnectionPool::singleton(connection_pool.database_url().clone());
+    let mut storage = connection_pool.connection().await.unwrap();
+    insert_genesis_batch(&mut storage, &GenesisParams::mock())
+        .await
+        .unwrap();
+    drop(storage);
+
+    let opt = Cli {
+        revert_pending_l1_batch: false,
+        enable_consensus: false,
+        components: "core".parse().unwrap(),
+    };
+    let mut config = ExternalNodeConfig::mock(&temp_dir, &connection_pool);
+    if opt.components.0.contains(&Component::TreeApi) {
+        config.tree_component.api_port = Some(0);
+    }
+
+    let l2_client = MockClient::builder(L2::default())
+        .method("eth_chainId", || Ok(U64::from(270)))
+        .method("zks_L1ChainId", || Ok(U64::from(9)))
+        .method("zks_L1BatchNumber", || {
+            Err::<(), _>(ClientError::RequestTimeout)
+        })
+        .method("eth_blockNumber", || {
+            Err::<(), _>(ClientError::RequestTimeout)
+        })
+        .method("zks_getFeeParams", || Ok(FeeParams::sensible_v1_default()))
+        .method("en_whitelistedTokensForAA", || Ok([] as [Address; 0]))
+        .build();
+    let l2_client = Box::new(l2_client);
+    let diamond_proxy_addr = config.remote.diamond_proxy_addr;
+    let eth_client = Box::new(mock_eth_client(diamond_proxy_addr));
+
+    let (env, env_handles) = TestEnvironment::new();
+    let mut node_handle = tokio::spawn(async move {
+        run_node(
+            env,
+            &opt,
+            &config,
+            connection_pool,
+            singleton_pool_builder,
+            l2_client,
+            eth_client,
+        )
+        .await
+    });
+
+    // Check that the node doesn't stop on its own.
+    let timeout_result = tokio::time::timeout(Duration::from_millis(50), &mut node_handle).await;
+    assert_matches!(timeout_result, Err(tokio::time::error::Elapsed { .. }));
+
+    // Send a stop signal and check that the node reacts to it.
+    env_handles.sigint_sender.send(()).unwrap();
+    node_handle.await.unwrap().unwrap();
+}
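
As a usage note, the timeout-then-signal assertion at the end of this test generalizes to a small standalone pattern. The sketch below is illustrative only: a `oneshot` channel stands in for the test environment's `sigint_sender`, and a trivial task stands in for `run_node`:

```rust
use std::time::Duration;
use tokio::sync::oneshot;

// Illustrative sketch of the shutdown-test pattern; not part of the actual test suite.
#[tokio::test]
async fn reacts_to_stop_signal() {
    let (stop_sender, stop_receiver) = oneshot::channel::<()>();
    let mut handle = tokio::spawn(async move {
        // Stands in for `run_node(..)`: runs until the stop signal arrives.
        stop_receiver.await.ok();
    });

    // The task must still be running after a short grace period...
    let timeout_result = tokio::time::timeout(Duration::from_millis(50), &mut handle).await;
    assert!(timeout_result.is_err(), "task stopped on its own");

    // ...and must finish promptly once the stop signal is sent.
    stop_sender.send(()).unwrap();
    handle.await.unwrap();
}
```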