feat(en): Fetch L1 batch root hashes from main node (#1923)
## What ❔

Implements a PoC for the treeless node mode, i.e., a mode in which a
node doesn't maintain a Merkle tree locally but instead downloads the
necessary data from a trusted peer (i.e., the main node). This also
covers the case when a node is configured to start a Merkle tree
immediately or after some time: the root hashes produced by the tree
(which is expected to lag behind the tree data fetcher) are used to
double-check the fetched hashes. In addition, the fetched hashes are
checked by the consistency checker against L1 commitment transactions.
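
For illustration, here is a minimal sketch of the fetch flow described above. All trait and function names in it (`RootHashSource`, `LocalStore`, `fetch_missing_root_hashes`) are hypothetical stand-ins rather than the actual `TreeDataFetcher` API from this PR:

```rust
/// Trusted peer (e.g., the main node) that can report L1 batch root hashes.
trait RootHashSource {
    /// Returns the root hash of the given batch, or `None` if the peer
    /// hasn't computed it yet.
    fn root_hash(&self, l1_batch: u32) -> Option<[u8; 32]>;
}

/// Local persistence on the node running the fetcher.
trait LocalStore {
    /// First L1 batch that is sealed locally but has no tree data yet.
    fn first_batch_without_tree_data(&self) -> u32;
    /// Latest L1 batch sealed locally.
    fn last_sealed_batch(&self) -> u32;
    /// Persists the fetched root hash for a batch.
    fn save_root_hash(&mut self, l1_batch: u32, hash: [u8; 32]);
}

/// One fetcher iteration: copy root hashes for all locally sealed batches
/// that still lack tree data. A local Merkle tree (if it is started at all)
/// and the consistency checker can later verify the same hashes independently.
fn fetch_missing_root_hashes(peer: &impl RootHashSource, store: &mut impl LocalStore) {
    for batch in store.first_batch_without_tree_data()..=store.last_sealed_batch() {
        match peer.root_hash(batch) {
            // The trusted peer may lag behind this node; stop and retry later.
            None => break,
            Some(hash) => store.save_root_hash(batch, hash),
        }
    }
}
```

The real component additionally exposes a health check and reacts to the stop signal, as visible in the `main.rs` changes below.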

## Why ❔

- Merkle tree recovery takes more than 50% of the overall recovery
process on the mainnet, so with this mode a recovering node can start
operating a couple of hours earlier.
- The Merkle tree introduces non-trivial operational requirements (~2.5 TB
disk space / ~16 GB RAM on the mainnet), so not running the tree
reduces these requirements.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.

fix(state-keeper): Fix hang during shutdown if L1 batch root hashes are not being computed
slowli authored May 16, 2024
1 parent 966a278 commit 72a3571
Showing 18 changed files with 938 additions and 134 deletions.
15 changes: 14 additions & 1 deletion core/bin/external_node/src/main.rs
@@ -35,7 +35,8 @@ use zksync_node_consensus as consensus;
use zksync_node_db_pruner::{DbPruner, DbPrunerConfig};
use zksync_node_fee_model::l1_gas_price::MainNodeFeeParamsFetcher;
use zksync_node_sync::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, ActionQueue, SyncState,
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO,
tree_data_fetcher::TreeDataFetcher, ActionQueue, SyncState,
};
use zksync_reorg_detector::ReorgDetector;
use zksync_state::{PostgresStorageCaches, RocksdbStorageOptions};
@@ -625,6 +626,16 @@ async fn init_tasks(
None
};

if components.contains(&Component::TreeFetcher) {
tracing::warn!(
"Running tree data fetcher (allows a node to operate w/o a Merkle tree or w/o waiting the tree to catch up). \
This is an experimental feature; do not use unless you know what you're doing"
);
let fetcher = TreeDataFetcher::new(main_node_client.clone(), connection_pool.clone());
app_health.insert_component(fetcher.health_check())?;
task_handles.push(tokio::spawn(fetcher.run(stop_receiver.clone())));
}

let fee_params_fetcher = Arc::new(MainNodeFeeParamsFetcher::new(main_node_client.clone()));

let sync_state = if components.contains(&Component::Core) {
@@ -722,6 +733,7 @@ pub enum Component {
WsApi,
Tree,
TreeApi,
TreeFetcher,
Core,
}

@@ -733,6 +745,7 @@ impl Component {
"ws_api" => Ok(&[Component::WsApi]),
"tree" => Ok(&[Component::Tree]),
"tree_api" => Ok(&[Component::TreeApi]),
"tree_fetcher" => Ok(&[Component::TreeFetcher]),
"core" => Ok(&[Component::Core]),
"all" => Ok(&[
Component::HttpApi,
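Note on usage: with the mapping above, the fetcher is opted into by listing the new `tree_fetcher` component name next to the existing ones. Assuming the usual comma-separated components list of the external node, something like `core,tree_fetcher` would run a node without a local Merkle tree, while `core,tree,tree_fetcher` would let the fetcher run ahead of the local tree, which then double-checks the fetched hashes.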


35 changes: 7 additions & 28 deletions core/lib/dal/src/blocks_dal.rs
@@ -143,7 +143,7 @@ impl BlocksDal<'_, '_> {
Ok(row.number.map(|num| L1BatchNumber(num as u32)))
}

pub async fn get_last_l1_batch_number_with_metadata(
pub async fn get_last_l1_batch_number_with_tree_data(
&mut self,
) -> DalResult<Option<L1BatchNumber>> {
let row = sqlx::query!(
@@ -156,7 +156,7 @@ impl BlocksDal<'_, '_> {
hash IS NOT NULL
"#
)
.instrument("get_last_block_number_with_metadata")
.instrument("get_last_block_number_with_tree_data")
.report_latency()
.fetch_one(self.storage)
.await?;
@@ -805,33 +805,12 @@ impl BlocksDal<'_, '_> {
if update_result.rows_affected() == 0 {
tracing::debug!("L1 batch #{number}: tree data wasn't updated as it's already present");

// Batch was already processed. Verify that existing hash matches
let matched: i64 = sqlx::query!(
r#"
SELECT
COUNT(*) AS "count!"
FROM
l1_batches
WHERE
number = $1
AND hash = $2
"#,
i64::from(number.0),
tree_data.hash.as_bytes(),
)
.instrument("get_matching_batch_hash")
.with_arg("number", &number)
.report_latency()
.fetch_one(self.storage)
.await?
.count;

// Batch was already processed. Verify that the existing tree data matches.
let existing_tree_data = self.get_l1_batch_tree_data(number).await?;
anyhow::ensure!(
matched == 1,
"Root hash verification failed. Hash for L1 batch #{} does not match the expected value \
(expected root hash: {:?})",
number,
tree_data.hash,
existing_tree_data.as_ref() == Some(tree_data),
"Root hash verification failed. Tree data for L1 batch #{number} does not match the expected value \
(expected: {tree_data:?}, existing: {existing_tree_data:?})",
);
}
Ok(())
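As a side note, the verification change above boils down to an idempotent-write check: if tree data for the batch already exists, the new data must match it exactly. A standalone sketch of that pattern, assuming a hypothetical in-memory store and a simplified `TreeData` type (the real struct lives in the DAL and also carries the rollup last leaf index):

```rust
use std::collections::HashMap;

/// Simplified stand-in for the tree data stored per L1 batch.
#[derive(Debug, Clone, PartialEq)]
struct TreeData {
    root_hash: [u8; 32],
    rollup_last_leaf_index: u64,
}

/// Idempotent save: persisting the same data twice is a no-op, persisting
/// conflicting data for an already-processed batch is an error.
fn save_tree_data(
    store: &mut HashMap<u32, TreeData>,
    l1_batch: u32,
    data: TreeData,
) -> Result<(), String> {
    match store.get(&l1_batch) {
        // First write for this batch: just persist the data.
        None => {
            store.insert(l1_batch, data);
            Ok(())
        }
        // Batch was already processed and the data matches: nothing to do.
        Some(existing) if *existing == data => Ok(()),
        // A mismatch means either the peer or the local state is inconsistent.
        Some(existing) => Err(format!(
            "tree data for L1 batch #{l1_batch} does not match: new {data:?}, existing {existing:?}"
        )),
    }
}
```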
82 changes: 82 additions & 0 deletions core/lib/dal/src/storage_logs_dedup_dal.rs
@@ -172,6 +172,30 @@ impl StorageLogsDedupDal<'_, '_> {
.map(|max| max as u64))
}

/// Returns the maximum enumeration index assigned in a specific L1 batch.
pub async fn max_enumeration_index_for_l1_batch(
&mut self,
l1_batch_number: L1BatchNumber,
) -> DalResult<Option<u64>> {
let row = sqlx::query!(
r#"
SELECT
MAX(INDEX) AS "max?"
FROM
initial_writes
WHERE
l1_batch_number = $1
"#,
i64::from(l1_batch_number.0)
)
.instrument("max_enumeration_index_for_l1_batch")
.with_arg("l1_batch_number", &l1_batch_number)
.fetch_one(self.storage)
.await?;

Ok(row.max.map(|max| max as u64))
}

pub async fn initial_writes_for_batch(
&mut self,
l1_batch_number: L1BatchNumber,
@@ -295,3 +319,61 @@ impl StorageLogsDedupDal<'_, '_> {
.collect()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{ConnectionPool, CoreDal};

#[tokio::test]
async fn getting_max_enumeration_index_for_batch() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
let max_index = conn
.storage_logs_dedup_dal()
.max_enumeration_index_for_l1_batch(L1BatchNumber(0))
.await
.unwrap();
assert_eq!(max_index, None);

let account = AccountTreeId::new(Address::repeat_byte(1));
let initial_writes = [
StorageKey::new(account, H256::zero()),
StorageKey::new(account, H256::repeat_byte(1)),
];
conn.storage_logs_dedup_dal()
.insert_initial_writes(L1BatchNumber(0), &initial_writes)
.await
.unwrap();

let max_index = conn
.storage_logs_dedup_dal()
.max_enumeration_index_for_l1_batch(L1BatchNumber(0))
.await
.unwrap();
assert_eq!(max_index, Some(2));

let initial_writes = [
StorageKey::new(account, H256::repeat_byte(2)),
StorageKey::new(account, H256::repeat_byte(3)),
];
conn.storage_logs_dedup_dal()
.insert_initial_writes(L1BatchNumber(1), &initial_writes)
.await
.unwrap();

let max_index = conn
.storage_logs_dedup_dal()
.max_enumeration_index_for_l1_batch(L1BatchNumber(0))
.await
.unwrap();
assert_eq!(max_index, Some(2));

let max_index = conn
.storage_logs_dedup_dal()
.max_enumeration_index_for_l1_batch(L1BatchNumber(1))
.await
.unwrap();
assert_eq!(max_index, Some(4));
}
}
6 changes: 5 additions & 1 deletion core/node/consensus/src/testonly.rs
@@ -276,7 +276,11 @@ async fn calculate_mock_metadata(ctx: &ctx::Ctx, pool: &ConnectionPool) -> ctx::
return Ok(());
};
let prev = ctx
.wait(conn.0.blocks_dal().get_last_l1_batch_number_with_metadata())
.wait(
conn.0
.blocks_dal()
.get_last_l1_batch_number_with_tree_data(),
)
.await?
.map_err(DalError::generalize)?;
let mut first = match prev {
6 changes: 3 additions & 3 deletions core/node/house_keeper/src/blocks_state_reporter.rs
@@ -28,11 +28,11 @@ impl L1BatchMetricsReporter {
block_metrics.push((number, BlockStage::Sealed));
}

let last_l1_batch_with_metadata = conn
let last_l1_batch_with_tree_data = conn
.blocks_dal()
.get_last_l1_batch_number_with_metadata()
.get_last_l1_batch_number_with_tree_data()
.await?;
if let Some(number) = last_l1_batch_with_metadata {
if let Some(number) = last_l1_batch_with_tree_data {
block_metrics.push((number, BlockStage::MetadataCalculated));
}

16 changes: 8 additions & 8 deletions core/node/metadata_calculator/src/updater.rs
@@ -231,28 +231,28 @@ impl TreeUpdater {
let mut next_l1_batch_to_seal = tree.next_l1_batch_number();

let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?;
let last_l1_batch_with_metadata = storage
let last_l1_batch_with_tree_data = storage
.blocks_dal()
.get_last_l1_batch_number_with_metadata()
.get_last_l1_batch_number_with_tree_data()
.await?;
drop(storage);

tracing::info!(
"Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \
Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \
last L1 batch with metadata: {last_l1_batch_with_metadata:?}",
last L1 batch with metadata: {last_l1_batch_with_tree_data:?}",
max_batches_per_iter = self.max_l1_batches_per_iter
);

// It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after
// recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component
// responsible for their appearance!), but fortunately most of the updater doesn't depend on it.
if let Some(last_l1_batch_with_metadata) = last_l1_batch_with_metadata {
if let Some(last_l1_batch_with_tree_data) = last_l1_batch_with_tree_data {
let backup_lag =
(last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0);
(last_l1_batch_with_tree_data.0 + 1).saturating_sub(next_l1_batch_to_seal.0);
METRICS.backup_lag.set(backup_lag.into());

if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 {
if next_l1_batch_to_seal > last_l1_batch_with_tree_data + 1 {
// Check stop signal before proceeding with a potentially time-consuming operation.
if *stop_receiver.borrow_and_update() {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
@@ -261,10 +261,10 @@ impl TreeUpdater {

tracing::warn!(
"Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \
({last_l1_batch_with_metadata}); this may be a result of restoring Postgres from a snapshot. \
({last_l1_batch_with_tree_data}); this may be a result of restoring Postgres from a snapshot. \
Truncating Merkle tree versions so that this mismatch is fixed..."
);
tree.revert_logs(last_l1_batch_with_metadata);
tree.revert_logs(last_l1_batch_with_tree_data);
tree.save().await?;
next_l1_batch_to_seal = tree.next_l1_batch_number();
tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}");
11 changes: 4 additions & 7 deletions core/node/node_sync/src/external_io.rs
@@ -312,7 +312,7 @@ impl StateKeeperIO for ExternalIO {
);
}
async fn load_base_system_contracts(
&mut self,
&self,
protocol_version: ProtocolVersionId,
cursor: &IoCursor,
) -> anyhow::Result<BaseSystemContracts> {
@@ -371,7 +371,7 @@ impl StateKeeperIO for ExternalIO {
}

async fn load_batch_version_id(
&mut self,
&self,
number: L1BatchNumber,
) -> anyhow::Result<ProtocolVersionId> {
let mut storage = self.pool.connection_tagged("sync_layer").await?;
@@ -383,17 +383,14 @@ impl StateKeeperIO for ExternalIO {
}

async fn load_upgrade_tx(
&mut self,
&self,
_version_id: ProtocolVersionId,
) -> anyhow::Result<Option<ProtocolUpgradeTx>> {
// External node will fetch upgrade tx from the main node
Ok(None)
}

async fn load_batch_state_hash(
&mut self,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<H256> {
async fn load_batch_state_hash(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<H256> {
tracing::info!("Getting L1 batch hash for L1 batch #{l1_batch_number}");
let mut storage = self.pool.connection_tagged("sync_layer").await?;
let wait_latency = KEEPER_METRICS.wait_for_prev_hash_time.start();
1 change: 1 addition & 0 deletions core/node/node_sync/src/lib.rs
@@ -9,6 +9,7 @@ mod sync_state;
pub mod testonly;
#[cfg(test)]
mod tests;
pub mod tree_data_fetcher;

pub use self::{
client::MainNodeClient,