diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index f239ff125fb6..bcd9415069f2 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -35,7 +35,8 @@ use zksync_node_consensus as consensus; use zksync_node_db_pruner::{DbPruner, DbPrunerConfig}; use zksync_node_fee_model::l1_gas_price::MainNodeFeeParamsFetcher; use zksync_node_sync::{ - batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, ActionQueue, SyncState, + batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, + tree_data_fetcher::TreeDataFetcher, ActionQueue, SyncState, }; use zksync_reorg_detector::ReorgDetector; use zksync_state::{PostgresStorageCaches, RocksdbStorageOptions}; @@ -625,6 +626,16 @@ async fn init_tasks( None }; + if components.contains(&Component::TreeFetcher) { + tracing::warn!( + "Running tree data fetcher (allows a node to operate w/o a Merkle tree or w/o waiting the tree to catch up). \ + This is an experimental feature; do not use unless you know what you're doing" + ); + let fetcher = TreeDataFetcher::new(main_node_client.clone(), connection_pool.clone()); + app_health.insert_component(fetcher.health_check())?; + task_handles.push(tokio::spawn(fetcher.run(stop_receiver.clone()))); + } + let fee_params_fetcher = Arc::new(MainNodeFeeParamsFetcher::new(main_node_client.clone())); let sync_state = if components.contains(&Component::Core) { @@ -722,6 +733,7 @@ pub enum Component { WsApi, Tree, TreeApi, + TreeFetcher, Core, } @@ -733,6 +745,7 @@ impl Component { "ws_api" => Ok(&[Component::WsApi]), "tree" => Ok(&[Component::Tree]), "tree_api" => Ok(&[Component::TreeApi]), + "tree_fetcher" => Ok(&[Component::TreeFetcher]), "core" => Ok(&[Component::Core]), "all" => Ok(&[ Component::HttpApi, diff --git a/core/lib/dal/.sqlx/query-16e1a17bfc426bb32489595bd8cccb1ef34292fcf694deddc06b6dd5b72a02f3.json b/core/lib/dal/.sqlx/query-16e1a17bfc426bb32489595bd8cccb1ef34292fcf694deddc06b6dd5b72a02f3.json new file mode 100644 index 000000000000..479bc818b9bb --- /dev/null +++ b/core/lib/dal/.sqlx/query-16e1a17bfc426bb32489595bd8cccb1ef34292fcf694deddc06b6dd5b72a02f3.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n MAX(INDEX) AS \"max?\"\n FROM\n initial_writes\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "max?", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "16e1a17bfc426bb32489595bd8cccb1ef34292fcf694deddc06b6dd5b72a02f3" +} diff --git a/core/lib/dal/.sqlx/query-df3256c012f86a9cd3b9260b97be5c6feb8059722149a747c4b6bd46731e2536.json b/core/lib/dal/.sqlx/query-df3256c012f86a9cd3b9260b97be5c6feb8059722149a747c4b6bd46731e2536.json deleted file mode 100644 index 552b6ab2cc8e..000000000000 --- a/core/lib/dal/.sqlx/query-df3256c012f86a9cd3b9260b97be5c6feb8059722149a747c4b6bd46731e2536.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n COUNT(*) AS \"count!\"\n FROM\n l1_batches\n WHERE\n number = $1\n AND hash = $2\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8", - "Bytea" - ] - }, - "nullable": [ - null - ] - }, - "hash": "df3256c012f86a9cd3b9260b97be5c6feb8059722149a747c4b6bd46731e2536" -} diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 3b9b2176b1c0..467e5437c1fa 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ 
b/core/lib/dal/src/blocks_dal.rs @@ -143,7 +143,7 @@ impl BlocksDal<'_, '_> { Ok(row.number.map(|num| L1BatchNumber(num as u32))) } - pub async fn get_last_l1_batch_number_with_metadata( + pub async fn get_last_l1_batch_number_with_tree_data( &mut self, ) -> DalResult> { let row = sqlx::query!( @@ -156,7 +156,7 @@ impl BlocksDal<'_, '_> { hash IS NOT NULL "# ) - .instrument("get_last_block_number_with_metadata") + .instrument("get_last_block_number_with_tree_data") .report_latency() .fetch_one(self.storage) .await?; @@ -805,33 +805,12 @@ impl BlocksDal<'_, '_> { if update_result.rows_affected() == 0 { tracing::debug!("L1 batch #{number}: tree data wasn't updated as it's already present"); - // Batch was already processed. Verify that existing hash matches - let matched: i64 = sqlx::query!( - r#" - SELECT - COUNT(*) AS "count!" - FROM - l1_batches - WHERE - number = $1 - AND hash = $2 - "#, - i64::from(number.0), - tree_data.hash.as_bytes(), - ) - .instrument("get_matching_batch_hash") - .with_arg("number", &number) - .report_latency() - .fetch_one(self.storage) - .await? - .count; - + // Batch was already processed. Verify that the existing tree data matches. + let existing_tree_data = self.get_l1_batch_tree_data(number).await?; anyhow::ensure!( - matched == 1, - "Root hash verification failed. Hash for L1 batch #{} does not match the expected value \ - (expected root hash: {:?})", - number, - tree_data.hash, + existing_tree_data.as_ref() == Some(tree_data), + "Root hash verification failed. Tree data for L1 batch #{number} does not match the expected value \ + (expected: {tree_data:?}, existing: {existing_tree_data:?})", ); } Ok(()) diff --git a/core/lib/dal/src/storage_logs_dedup_dal.rs b/core/lib/dal/src/storage_logs_dedup_dal.rs index 6304869a8261..2ad4f2a3c71a 100644 --- a/core/lib/dal/src/storage_logs_dedup_dal.rs +++ b/core/lib/dal/src/storage_logs_dedup_dal.rs @@ -172,6 +172,30 @@ impl StorageLogsDedupDal<'_, '_> { .map(|max| max as u64)) } + /// Returns the maximum enumeration index assigned in a specific L1 batch. + pub async fn max_enumeration_index_for_l1_batch( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> DalResult> { + let row = sqlx::query!( + r#" + SELECT + MAX(INDEX) AS "max?" 
+ FROM + initial_writes + WHERE + l1_batch_number = $1 + "#, + i64::from(l1_batch_number.0) + ) + .instrument("max_enumeration_index_for_l1_batch") + .with_arg("l1_batch_number", &l1_batch_number) + .fetch_one(self.storage) + .await?; + + Ok(row.max.map(|max| max as u64)) + } + pub async fn initial_writes_for_batch( &mut self, l1_batch_number: L1BatchNumber, @@ -295,3 +319,61 @@ impl StorageLogsDedupDal<'_, '_> { .collect() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ConnectionPool, CoreDal}; + + #[tokio::test] + async fn getting_max_enumeration_index_for_batch() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + let max_index = conn + .storage_logs_dedup_dal() + .max_enumeration_index_for_l1_batch(L1BatchNumber(0)) + .await + .unwrap(); + assert_eq!(max_index, None); + + let account = AccountTreeId::new(Address::repeat_byte(1)); + let initial_writes = [ + StorageKey::new(account, H256::zero()), + StorageKey::new(account, H256::repeat_byte(1)), + ]; + conn.storage_logs_dedup_dal() + .insert_initial_writes(L1BatchNumber(0), &initial_writes) + .await + .unwrap(); + + let max_index = conn + .storage_logs_dedup_dal() + .max_enumeration_index_for_l1_batch(L1BatchNumber(0)) + .await + .unwrap(); + assert_eq!(max_index, Some(2)); + + let initial_writes = [ + StorageKey::new(account, H256::repeat_byte(2)), + StorageKey::new(account, H256::repeat_byte(3)), + ]; + conn.storage_logs_dedup_dal() + .insert_initial_writes(L1BatchNumber(1), &initial_writes) + .await + .unwrap(); + + let max_index = conn + .storage_logs_dedup_dal() + .max_enumeration_index_for_l1_batch(L1BatchNumber(0)) + .await + .unwrap(); + assert_eq!(max_index, Some(2)); + + let max_index = conn + .storage_logs_dedup_dal() + .max_enumeration_index_for_l1_batch(L1BatchNumber(1)) + .await + .unwrap(); + assert_eq!(max_index, Some(4)); + } +} diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index ff1af127c038..6f064d66efce 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -276,7 +276,11 @@ async fn calculate_mock_metadata(ctx: &ctx::Ctx, pool: &ConnectionPool) -> ctx:: return Ok(()); }; let prev = ctx - .wait(conn.0.blocks_dal().get_last_l1_batch_number_with_metadata()) + .wait( + conn.0 + .blocks_dal() + .get_last_l1_batch_number_with_tree_data(), + ) .await? 
.map_err(DalError::generalize)?; let mut first = match prev { diff --git a/core/node/house_keeper/src/blocks_state_reporter.rs b/core/node/house_keeper/src/blocks_state_reporter.rs index a0736c003fe3..5285390a2783 100644 --- a/core/node/house_keeper/src/blocks_state_reporter.rs +++ b/core/node/house_keeper/src/blocks_state_reporter.rs @@ -28,11 +28,11 @@ impl L1BatchMetricsReporter { block_metrics.push((number, BlockStage::Sealed)); } - let last_l1_batch_with_metadata = conn + let last_l1_batch_with_tree_data = conn .blocks_dal() - .get_last_l1_batch_number_with_metadata() + .get_last_l1_batch_number_with_tree_data() .await?; - if let Some(number) = last_l1_batch_with_metadata { + if let Some(number) = last_l1_batch_with_tree_data { block_metrics.push((number, BlockStage::MetadataCalculated)); } diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index 58523b468e3d..d8ef34a68e7b 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -231,28 +231,28 @@ impl TreeUpdater { let mut next_l1_batch_to_seal = tree.next_l1_batch_number(); let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; - let last_l1_batch_with_metadata = storage + let last_l1_batch_with_tree_data = storage .blocks_dal() - .get_last_l1_batch_number_with_metadata() + .get_last_l1_batch_number_with_tree_data() .await?; drop(storage); tracing::info!( "Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \ Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \ - last L1 batch with metadata: {last_l1_batch_with_metadata:?}", + last L1 batch with metadata: {last_l1_batch_with_tree_data:?}", max_batches_per_iter = self.max_l1_batches_per_iter ); // It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after // recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component // responsible for their appearance!), but fortunately most of the updater doesn't depend on it. - if let Some(last_l1_batch_with_metadata) = last_l1_batch_with_metadata { + if let Some(last_l1_batch_with_tree_data) = last_l1_batch_with_tree_data { let backup_lag = - (last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0); + (last_l1_batch_with_tree_data.0 + 1).saturating_sub(next_l1_batch_to_seal.0); METRICS.backup_lag.set(backup_lag.into()); - if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 { + if next_l1_batch_to_seal > last_l1_batch_with_tree_data + 1 { // Check stop signal before proceeding with a potentially time-consuming operation. if *stop_receiver.borrow_and_update() { tracing::info!("Stop signal received, metadata_calculator is shutting down"); @@ -261,10 +261,10 @@ impl TreeUpdater { tracing::warn!( "Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \ - ({last_l1_batch_with_metadata}); this may be a result of restoring Postgres from a snapshot. \ + ({last_l1_batch_with_tree_data}); this may be a result of restoring Postgres from a snapshot. \ Truncating Merkle tree versions so that this mismatch is fixed..." 
); - tree.revert_logs(last_l1_batch_with_metadata); + tree.revert_logs(last_l1_batch_with_tree_data); tree.save().await?; next_l1_batch_to_seal = tree.next_l1_batch_number(); tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index 6e9b5d9e017d..4e868ffb9fdf 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -312,7 +312,7 @@ impl StateKeeperIO for ExternalIO { ); } async fn load_base_system_contracts( - &mut self, + &self, protocol_version: ProtocolVersionId, cursor: &IoCursor, ) -> anyhow::Result { @@ -371,7 +371,7 @@ impl StateKeeperIO for ExternalIO { } async fn load_batch_version_id( - &mut self, + &self, number: L1BatchNumber, ) -> anyhow::Result { let mut storage = self.pool.connection_tagged("sync_layer").await?; @@ -383,17 +383,14 @@ impl StateKeeperIO for ExternalIO { } async fn load_upgrade_tx( - &mut self, + &self, _version_id: ProtocolVersionId, ) -> anyhow::Result> { // External node will fetch upgrade tx from the main node Ok(None) } - async fn load_batch_state_hash( - &mut self, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result { + async fn load_batch_state_hash(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result { tracing::info!("Getting L1 batch hash for L1 batch #{l1_batch_number}"); let mut storage = self.pool.connection_tagged("sync_layer").await?; let wait_latency = KEEPER_METRICS.wait_for_prev_hash_time.start(); diff --git a/core/node/node_sync/src/lib.rs b/core/node/node_sync/src/lib.rs index 663a924e52b7..6a2c5b8c54b2 100644 --- a/core/node/node_sync/src/lib.rs +++ b/core/node/node_sync/src/lib.rs @@ -9,6 +9,7 @@ mod sync_state; pub mod testonly; #[cfg(test)] mod tests; +pub mod tree_data_fetcher; pub use self::{ client::MainNodeClient, diff --git a/core/node/node_sync/src/tree_data_fetcher/metrics.rs b/core/node/node_sync/src/tree_data_fetcher/metrics.rs new file mode 100644 index 000000000000..5d063312f4ca --- /dev/null +++ b/core/node/node_sync/src/tree_data_fetcher/metrics.rs @@ -0,0 +1,85 @@ +//! Metrics for Tree data fetcher. + +use std::time::Duration; + +use vise::{ + Buckets, Counter, DurationAsSecs, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, + Info, Metrics, Unit, +}; + +use super::{StepOutcome, TreeDataFetcher, TreeDataFetcherError}; + +#[derive(Debug, EncodeLabelSet)] +struct TreeDataFetcherInfo { + #[metrics(unit = Unit::Seconds)] + poll_interval: DurationAsSecs, +} + +impl From<&TreeDataFetcher> for TreeDataFetcherInfo { + fn from(fetcher: &TreeDataFetcher) -> Self { + Self { + poll_interval: fetcher.poll_interval.into(), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] +pub(super) enum ProcessingStage { + Fetch, + Persistence, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "kind", rename_all = "snake_case")] +pub(super) enum StepOutcomeLabel { + UpdatedBatch, + NoProgress, + RemoteHashMissing, + TransientError, +} + +#[derive(Debug, Metrics)] +#[metrics(prefix = "external_node_tree_data_fetcher")] +pub(super) struct TreeDataFetcherMetrics { + /// Immutable information about the fetcher. + info: Info, + /// Number of times a fetcher step resulted in a certain outcome. + pub step_outcomes: Family, + /// Last L1 batch with tree data updated by the fetcher. 
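+    /// (With the metrics `prefix` above, this is presumably exported as
+    /// `external_node_tree_data_fetcher_last_updated_batch_number`.)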
+ pub last_updated_batch_number: Gauge, + /// Latency of a particular stage of processing a single L1 batch. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub stage_latency: Family>, +} + +impl TreeDataFetcherMetrics { + pub fn observe_info(&self, fetcher: &TreeDataFetcher) { + let info = TreeDataFetcherInfo::from(fetcher); + tracing::info!("Starting tree data fetcher with {info:?}"); + if let Err(err) = self.info.set(info) { + tracing::warn!( + "Error setting configuration info {:?} for tree data fetcher; already set to {:?}", + err.into_inner(), + self.info.get() + ); + } + } + + pub fn observe_step_outcome(&self, outcome: Result<&StepOutcome, &TreeDataFetcherError>) { + let label = match outcome { + Ok(StepOutcome::UpdatedBatch(number)) => { + self.last_updated_batch_number.set(number.0.into()); + StepOutcomeLabel::UpdatedBatch + } + Ok(StepOutcome::NoProgress) => StepOutcomeLabel::NoProgress, + Ok(StepOutcome::RemoteHashMissing) => StepOutcomeLabel::RemoteHashMissing, + Err(err) if err.is_transient() => StepOutcomeLabel::TransientError, + Err(_) => return, // fatal error; the node will exit soon anyway + }; + self.step_outcomes[&label].inc(); + } +} + +#[vise::register] +pub(super) static METRICS: vise::Global = vise::Global::new(); diff --git a/core/node/node_sync/src/tree_data_fetcher/mod.rs b/core/node/node_sync/src/tree_data_fetcher/mod.rs new file mode 100644 index 000000000000..c09b99c850f4 --- /dev/null +++ b/core/node/node_sync/src/tree_data_fetcher/mod.rs @@ -0,0 +1,295 @@ +//! Fetcher responsible for getting Merkle tree outputs from the main node. + +use std::{fmt, time::Duration}; + +use anyhow::Context as _; +use async_trait::async_trait; +use serde::Serialize; +#[cfg(test)] +use tokio::sync::mpsc; +use tokio::sync::watch; +use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError}; +use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; +use zksync_types::{api, block::L1BatchTreeData, L1BatchNumber}; +use zksync_web3_decl::{ + client::{DynClient, L2}, + error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult}, + namespaces::ZksNamespaceClient, +}; + +use self::metrics::{ProcessingStage, TreeDataFetcherMetrics, METRICS}; + +mod metrics; +#[cfg(test)] +mod tests; + +#[async_trait] +trait MainNodeClient: fmt::Debug + Send + Sync + 'static { + async fn batch_details( + &self, + number: L1BatchNumber, + ) -> EnrichedClientResult>; +} + +#[async_trait] +impl MainNodeClient for Box> { + async fn batch_details( + &self, + number: L1BatchNumber, + ) -> EnrichedClientResult> { + self.get_l1_batch_details(number) + .rpc_context("get_l1_batch_details") + .with_arg("number", &number) + .await + } +} + +#[derive(Debug, thiserror::Error)] +enum TreeDataFetcherError { + #[error("error fetching data from main node")] + Rpc(#[from] EnrichedClientError), + #[error("internal error")] + Internal(#[from] anyhow::Error), +} + +impl From for TreeDataFetcherError { + fn from(err: DalError) -> Self { + Self::Internal(err.generalize()) + } +} + +impl TreeDataFetcherError { + fn is_transient(&self) -> bool { + match self { + Self::Rpc(err) => err.is_transient(), + Self::Internal(_) => false, + } + } +} + +#[derive(Debug, Serialize)] +#[serde(untagged)] +enum TreeDataFetcherHealth { + Ready { + #[serde(skip_serializing_if = "Option::is_none")] + last_updated_l1_batch: Option, + }, + Affected { + error: String, + }, +} + +impl From for Health { + fn from(health: TreeDataFetcherHealth) -> Self { + let status = match health { + 
TreeDataFetcherHealth::Ready { .. } => HealthStatus::Ready, + TreeDataFetcherHealth::Affected { .. } => HealthStatus::Affected, + }; + Self::from(status).with_details(health) + } +} + +#[derive(Debug)] +enum StepOutcome { + UpdatedBatch(L1BatchNumber), + NoProgress, + RemoteHashMissing, +} + +/// Component fetching tree data (i.e., state root hashes for L1 batches) from external sources, such as +/// the main node, and persisting this data to Postgres. +/// +/// # Overview +/// +/// This component allows a node to operate w/o a Merkle tree or w/o waiting the tree to catch up. +/// It can be operated together with Metadata calculator or instead of it. In the first case, Metadata calculator +/// (which is generally expected to be slower) will check that data returned by the main node is correct +/// (i.e., "trust but verify" trust model). Additionally, the persisted data will be checked against L1 commitment transactions +/// by Consistency checker. +#[derive(Debug)] +pub struct TreeDataFetcher { + main_node_client: Box, + pool: ConnectionPool, + metrics: &'static TreeDataFetcherMetrics, + health_updater: HealthUpdater, + poll_interval: Duration, + #[cfg(test)] + updates_sender: mpsc::UnboundedSender, +} + +impl TreeDataFetcher { + const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100); + + /// Creates a new fetcher connected to the main node. + pub fn new(client: Box>, pool: ConnectionPool) -> Self { + Self { + main_node_client: Box::new(client.for_component("tree_data_fetcher")), + pool, + metrics: &METRICS, + health_updater: ReactiveHealthCheck::new("tree_data_fetcher").1, + poll_interval: Self::DEFAULT_POLL_INTERVAL, + #[cfg(test)] + updates_sender: mpsc::unbounded_channel().0, + } + } + + /// Returns a health check for this fetcher. + pub fn health_check(&self) -> ReactiveHealthCheck { + self.health_updater.subscribe() + } + + async fn get_batch_to_fetch(&self) -> anyhow::Result> { + let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?; + // Fetch data in a readonly transaction to have a consistent view of the storage + let mut storage = storage.start_transaction().await?; + + let last_l1_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; + let Some(last_l1_batch) = last_l1_batch else { + tracing::debug!("No L1 batches in the database yet; cannot progress"); + return Ok(None); + }; + + let last_l1_batch_with_tree_data = storage + .blocks_dal() + .get_last_l1_batch_number_with_tree_data() + .await?; + let l1_batch_to_fetch = if let Some(batch) = last_l1_batch_with_tree_data { + batch + 1 + } else { + let earliest_l1_batch = storage.blocks_dal().get_earliest_l1_batch_number().await?; + let earliest_l1_batch = + earliest_l1_batch.context("all L1 batches disappeared from Postgres")?; + tracing::debug!("No L1 batches with metadata present in the storage; will fetch the earliest batch #{earliest_l1_batch}"); + earliest_l1_batch + }; + Ok(if l1_batch_to_fetch <= last_l1_batch { + Some(l1_batch_to_fetch) + } else { + None + }) + } + + async fn get_rollup_last_leaf_index( + storage: &mut Connection<'_, Core>, + mut l1_batch_number: L1BatchNumber, + ) -> anyhow::Result { + // With overwhelming probability, there's at least one initial write in an L1 batch, + // so this loop will execute for 1 iteration. 
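+        // For example, if the query below returns `Some(100)` for this batch, the resulting
+        // `rollup_last_leaf_index` is 101; if it returns `None` (the batch has no initial writes),
+        // the previous batch is queried instead, and so on.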
+ loop { + let maybe_index = storage + .storage_logs_dedup_dal() + .max_enumeration_index_for_l1_batch(l1_batch_number) + .await?; + if let Some(index) = maybe_index { + return Ok(index + 1); + } + tracing::warn!( + "No initial writes in L1 batch #{l1_batch_number}; trying the previous batch" + ); + l1_batch_number -= 1; + } + } + + async fn step(&self) -> Result { + let Some(l1_batch_to_fetch) = self.get_batch_to_fetch().await? else { + return Ok(StepOutcome::NoProgress); + }; + + tracing::debug!("Fetching tree data for L1 batch #{l1_batch_to_fetch} from main node"); + let stage_latency = self.metrics.stage_latency[&ProcessingStage::Fetch].start(); + let batch_details = self + .main_node_client + .batch_details(l1_batch_to_fetch) + .await? + .with_context(|| { + format!( + "L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present on the main node, \ + which is assumed to store batch info indefinitely" + ) + })?; + stage_latency.observe(); + let Some(root_hash) = batch_details.base.root_hash else { + tracing::debug!( + "L1 batch #{l1_batch_to_fetch} does not have root hash computed on the main node" + ); + return Ok(StepOutcome::RemoteHashMissing); + }; + + let stage_latency = self.metrics.stage_latency[&ProcessingStage::Persistence].start(); + let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?; + let rollup_last_leaf_index = + Self::get_rollup_last_leaf_index(&mut storage, l1_batch_to_fetch).await?; + let tree_data = L1BatchTreeData { + hash: root_hash, + rollup_last_leaf_index, + }; + storage + .blocks_dal() + .save_l1_batch_tree_data(l1_batch_to_fetch, &tree_data) + .await?; + stage_latency.observe(); + tracing::debug!("Updated L1 batch #{l1_batch_to_fetch} with tree data: {tree_data:?}"); + Ok(StepOutcome::UpdatedBatch(l1_batch_to_fetch)) + } + + fn update_health(&self, last_updated_l1_batch: Option) { + let health = TreeDataFetcherHealth::Ready { + last_updated_l1_batch, + }; + self.health_updater.update(health.into()); + } + + /// Runs this component until a fatal error occurs or a stop signal is received. Transient errors + /// (e.g., no network connection) are handled gracefully by retrying after a delay. + pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { + self.metrics.observe_info(&self); + self.health_updater + .update(Health::from(HealthStatus::Ready)); + let mut last_updated_l1_batch = None; + + while !*stop_receiver.borrow_and_update() { + let step_outcome = self.step().await; + self.metrics.observe_step_outcome(step_outcome.as_ref()); + let need_to_sleep = match step_outcome { + Ok(StepOutcome::UpdatedBatch(batch_number)) => { + #[cfg(test)] + self.updates_sender.send(batch_number).ok(); + + last_updated_l1_batch = Some(batch_number); + self.update_health(last_updated_l1_batch); + false + } + Ok(StepOutcome::NoProgress | StepOutcome::RemoteHashMissing) => { + // Update health status even if no progress was made to timely clear a previously set + // "affected" health. 
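+                    // (I.e., the reported status flips back from "affected" to "ready" on the first
+                    // step that completes after a transient error, even if it made no progress.)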
+ self.update_health(last_updated_l1_batch); + true + } + Err(err) if err.is_transient() => { + tracing::warn!( + "Transient error in tree data fetcher, will retry after a delay: {err:?}" + ); + let health = TreeDataFetcherHealth::Affected { + error: err.to_string(), + }; + self.health_updater.update(health.into()); + true + } + Err(err) => { + tracing::error!("Fatal error in tree data fetcher: {err:?}"); + return Err(err.into()); + } + }; + + if need_to_sleep + && tokio::time::timeout(self.poll_interval, stop_receiver.changed()) + .await + .is_ok() + { + break; + } + } + tracing::info!("Stop signal received; tree data fetcher is shutting down"); + Ok(()) + } +} diff --git a/core/node/node_sync/src/tree_data_fetcher/tests.rs b/core/node/node_sync/src/tree_data_fetcher/tests.rs new file mode 100644 index 000000000000..b51ec7a3cf61 --- /dev/null +++ b/core/node/node_sync/src/tree_data_fetcher/tests.rs @@ -0,0 +1,354 @@ +use std::{ + collections::HashMap, + ops, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, +}; + +use assert_matches::assert_matches; +use test_casing::test_casing; +use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; +use zksync_node_test_utils::{create_l1_batch, prepare_recovery_snapshot}; +use zksync_types::{AccountTreeId, Address, L2BlockNumber, StorageKey, StorageLog, H256}; +use zksync_web3_decl::jsonrpsee::core::ClientError; + +use super::{metrics::StepOutcomeLabel, *}; + +#[derive(Debug, Default)] +struct MockMainNodeClient { + transient_error: Arc, + batch_details_responses: HashMap, +} + +#[async_trait] +impl MainNodeClient for MockMainNodeClient { + async fn batch_details( + &self, + number: L1BatchNumber, + ) -> EnrichedClientResult> { + if self.transient_error.fetch_and(false, Ordering::Relaxed) { + let err = ClientError::RequestTimeout; + return Err(EnrichedClientError::new(err, "batch_details")); + } + Ok(self.batch_details_responses.get(&number).cloned()) + } +} + +fn mock_l1_batch_details(number: L1BatchNumber, root_hash: Option) -> api::L1BatchDetails { + api::L1BatchDetails { + number, + base: api::BlockDetailsBase { + timestamp: number.0.into(), + l1_tx_count: 0, + l2_tx_count: 10, + root_hash, + status: api::BlockStatus::Sealed, + commit_tx_hash: None, + committed_at: None, + prove_tx_hash: None, + proven_at: None, + execute_tx_hash: None, + executed_at: None, + l1_gas_price: 123, + l2_fair_gas_price: 456, + base_system_contracts_hashes: Default::default(), + }, + } +} + +async fn seal_l1_batch(storage: &mut Connection<'_, Core>, number: L1BatchNumber) { + let mut transaction = storage.start_transaction().await.unwrap(); + transaction + .blocks_dal() + .insert_mock_l1_batch(&create_l1_batch(number.0)) + .await + .unwrap(); + // One initial write per L1 batch + let initial_writes = [StorageKey::new( + AccountTreeId::new(Address::repeat_byte(1)), + H256::from_low_u64_be(number.0.into()), + )]; + transaction + .storage_logs_dedup_dal() + .insert_initial_writes(number, &initial_writes) + .await + .unwrap(); + transaction.commit().await.unwrap(); +} + +#[derive(Debug)] +struct FetcherHarness { + fetcher: TreeDataFetcher, + updates_receiver: mpsc::UnboundedReceiver, + metrics: &'static TreeDataFetcherMetrics, +} + +impl FetcherHarness { + fn new(client: impl MainNodeClient, pool: ConnectionPool) -> Self { + let (updates_sender, updates_receiver) = mpsc::unbounded_channel(); + let metrics = &*Box::leak(Box::::default()); + let fetcher = TreeDataFetcher { + main_node_client: Box::new(client), + pool: pool.clone(), + metrics, + 
health_updater: ReactiveHealthCheck::new("tree_data_fetcher").1, + poll_interval: Duration::from_millis(10), + updates_sender, + }; + Self { + fetcher, + updates_receiver, + metrics, + } + } +} + +#[tokio::test] +async fn tree_data_fetcher_steps() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + let genesis = insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + let mut client = MockMainNodeClient::default(); + for number in 1..=5 { + let number = L1BatchNumber(number); + let details = mock_l1_batch_details(number, Some(H256::from_low_u64_be(number.0.into()))); + client.batch_details_responses.insert(number, details); + seal_l1_batch(&mut storage, number).await; + } + + let fetcher = FetcherHarness::new(client, pool.clone()).fetcher; + for number in 1..=5 { + let step_outcome = fetcher.step().await.unwrap(); + assert_matches!( + step_outcome, + StepOutcome::UpdatedBatch(updated_number) if updated_number == L1BatchNumber(number) + ); + } + let step_outcome = fetcher.step().await.unwrap(); + assert_matches!(step_outcome, StepOutcome::NoProgress); + + // Check tree data in updated batches. + assert_batch_tree_data(&mut storage, 1..=5, genesis.rollup_last_leaf_index).await; +} + +async fn assert_batch_tree_data( + storage: &mut Connection<'_, Core>, + batch_numbers: ops::RangeInclusive, + starting_rollup_last_leaf_index: u64, +) { + for (i, number) in batch_numbers.enumerate() { + let tree_data = storage + .blocks_dal() + .get_l1_batch_tree_data(L1BatchNumber(number)) + .await + .unwrap(); + let tree_data = tree_data.unwrap_or_else(|| { + panic!("No tree data persisted for L1 batch #{number}"); + }); + assert_eq!(tree_data.hash, H256::from_low_u64_be(number.into())); + assert_eq!( + tree_data.rollup_last_leaf_index, + starting_rollup_last_leaf_index + i as u64 + 1 // expecting 1 initial write per batch + ); + } +} + +#[tokio::test] +async fn tree_data_fetcher_steps_after_snapshot_recovery() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + let account = AccountTreeId::new(Address::zero()); + let snapshot_storage_logs: Vec<_> = (0..20) + .map(|i| { + let key = StorageKey::new(account, H256::repeat_byte(i)); + StorageLog::new_write_log(key, H256::repeat_byte(0xff)) + }) + .collect(); + let snapshot = prepare_recovery_snapshot( + &mut storage, + L1BatchNumber(23), + L2BlockNumber(42), + &snapshot_storage_logs, + ) + .await; + + let mut client = MockMainNodeClient::default(); + for i in 1..=5 { + let number = snapshot.l1_batch_number + i; + let details = mock_l1_batch_details(number, Some(H256::from_low_u64_be(number.0.into()))); + client.batch_details_responses.insert(number, details); + seal_l1_batch(&mut storage, number).await; + } + + let fetcher = FetcherHarness::new(client, pool.clone()).fetcher; + for i in 1..=5 { + let step_outcome = fetcher.step().await.unwrap(); + assert_matches!( + step_outcome, + StepOutcome::UpdatedBatch(updated_number) if updated_number == snapshot.l1_batch_number + i + ); + } + let step_outcome = fetcher.step().await.unwrap(); + assert_matches!(step_outcome, StepOutcome::NoProgress); + + let batch_numbers = (snapshot.l1_batch_number.0 + 1)..=(snapshot.l1_batch_number.0 + 5); + assert_batch_tree_data(&mut storage, batch_numbers, 21).await; +} + +#[tokio::test] +async fn tree_data_fetcher_recovers_from_transient_errors() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + 
let genesis = insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + let mut client = MockMainNodeClient::default(); + for number in 1..=5 { + let number = L1BatchNumber(number); + let details = mock_l1_batch_details(number, Some(H256::from_low_u64_be(number.0.into()))); + client.batch_details_responses.insert(number, details); + } + let transient_error = client.transient_error.clone(); + + let FetcherHarness { + fetcher, + mut updates_receiver, + metrics, + } = FetcherHarness::new(client, pool.clone()); + let (stop_sender, stop_receiver) = watch::channel(false); + let fetcher_handle = tokio::spawn(fetcher.run(stop_receiver)); + + for number in 1..=5 { + transient_error.store(true, Ordering::Relaxed); + // Insert L1 batch into a local storage and wait for its tree data to be updated. + seal_l1_batch(&mut storage, L1BatchNumber(number)).await; + let updated_batch = updates_receiver.recv().await.unwrap(); + assert_eq!(updated_batch, L1BatchNumber(number)); + + let tree_data = storage + .blocks_dal() + .get_l1_batch_tree_data(L1BatchNumber(number)) + .await + .unwrap(); + let tree_data = tree_data.unwrap_or_else(|| { + panic!("No tree data persisted for L1 batch #{number}"); + }); + assert_eq!(tree_data.hash, H256::from_low_u64_be(number.into())); + assert_eq!( + tree_data.rollup_last_leaf_index, + genesis.rollup_last_leaf_index + u64::from(number) + ); + } + + // Check metrics. + assert_eq!(metrics.last_updated_batch_number.get(), 5); + assert_eq!( + metrics.step_outcomes[&StepOutcomeLabel::TransientError].get(), + 5 + ); + assert_eq!( + metrics.step_outcomes[&StepOutcomeLabel::UpdatedBatch].get(), + 5 + ); + + stop_sender.send_replace(true); + fetcher_handle.await.unwrap().unwrap(); +} + +#[derive(Debug)] +struct SlowMainNode { + request_count: AtomicUsize, + compute_root_hash_after: usize, +} + +impl SlowMainNode { + fn new(compute_root_hash_after: usize) -> Self { + Self { + request_count: AtomicUsize::new(0), + compute_root_hash_after, + } + } +} + +#[async_trait] +impl MainNodeClient for SlowMainNode { + async fn batch_details( + &self, + number: L1BatchNumber, + ) -> EnrichedClientResult> { + if number != L1BatchNumber(1) { + return Ok(None); + } + let request_count = self.request_count.fetch_add(1, Ordering::Relaxed); + let root_hash = if request_count >= self.compute_root_hash_after { + Some(H256::repeat_byte(1)) + } else { + None + }; + Ok(Some(mock_l1_batch_details(number, root_hash))) + } +} + +#[test_casing(2, [false, true])] +#[tokio::test] +async fn tree_data_fetcher_with_missing_remote_hash(delayed_insertion: bool) { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + let genesis = insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + if !delayed_insertion { + seal_l1_batch(&mut storage, L1BatchNumber(1)).await; + } + + let client = SlowMainNode::new(3); + let FetcherHarness { + fetcher, + mut updates_receiver, + metrics, + } = FetcherHarness::new(client, pool.clone()); + let (stop_sender, stop_receiver) = watch::channel(false); + let fetcher_handle = tokio::spawn(fetcher.run(stop_receiver)); + + if delayed_insertion { + tokio::time::sleep(Duration::from_millis(10)).await; + seal_l1_batch(&mut storage, L1BatchNumber(1)).await; + } + + // Wait until the L1 batch is updated by the fetcher. 
+ let updated_batch = updates_receiver.recv().await.unwrap(); + assert_eq!(updated_batch, L1BatchNumber(1)); + + let tree_data = storage + .blocks_dal() + .get_l1_batch_tree_data(L1BatchNumber(1)) + .await + .unwrap(); + let tree_data = tree_data.expect("no tree data for batch"); + assert_eq!(tree_data.hash, H256::repeat_byte(1)); + assert_eq!( + tree_data.rollup_last_leaf_index, + genesis.rollup_last_leaf_index + 1 + ); + + // Check metrics. + assert_eq!(metrics.last_updated_batch_number.get(), 1); + assert_eq!( + metrics.step_outcomes[&StepOutcomeLabel::RemoteHashMissing].get(), + 3 + ); + assert_eq!( + metrics.step_outcomes[&StepOutcomeLabel::UpdatedBatch].get(), + 1 + ); + + // Check that the fetcher can be stopped. + stop_sender.send_replace(true); + fetcher_handle.await.unwrap().unwrap(); +} diff --git a/core/node/reorg_detector/src/lib.rs b/core/node/reorg_detector/src/lib.rs index 39af91e5c2fb..ff9aa63e29b0 100644 --- a/core/node/reorg_detector/src/lib.rs +++ b/core/node/reorg_detector/src/lib.rs @@ -258,7 +258,7 @@ impl ReorgDetector { let mut storage = self.pool.connection().await.context("connection()")?; let Some(local_l1_batch) = storage .blocks_dal() - .get_last_l1_batch_number_with_metadata() + .get_last_l1_batch_number_with_tree_data() .await .map_err(DalError::generalize)? else { diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index 955050f5117d..8d44e38cc6ee 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -289,7 +289,7 @@ impl StateKeeperIO for MempoolIO { } async fn load_base_system_contracts( - &mut self, + &self, protocol_version: ProtocolVersionId, _cursor: &IoCursor, ) -> anyhow::Result { @@ -308,7 +308,7 @@ impl StateKeeperIO for MempoolIO { } async fn load_batch_version_id( - &mut self, + &self, number: L1BatchNumber, ) -> anyhow::Result { let mut storage = self.pool.connection_tagged("state_keeper").await?; @@ -320,7 +320,7 @@ impl StateKeeperIO for MempoolIO { } async fn load_upgrade_tx( - &mut self, + &self, version_id: ProtocolVersionId, ) -> anyhow::Result> { let mut storage = self.pool.connection_tagged("state_keeper").await?; @@ -331,10 +331,7 @@ impl StateKeeperIO for MempoolIO { .map_err(Into::into) } - async fn load_batch_state_hash( - &mut self, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result { + async fn load_batch_state_hash(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result { tracing::trace!("Getting L1 batch hash for L1 batch #{l1_batch_number}"); let wait_latency = KEEPER_METRICS.wait_for_prev_hash_time.start(); diff --git a/core/node/state_keeper/src/io/mod.rs b/core/node/state_keeper/src/io/mod.rs index 43364b0036ee..6cd6f818f401 100644 --- a/core/node/state_keeper/src/io/mod.rs +++ b/core/node/state_keeper/src/io/mod.rs @@ -1,6 +1,5 @@ use std::{fmt, time::Duration}; -use anyhow::Context as _; use async_trait::async_trait; use multivm::interface::{L1BatchEnv, SystemEnv}; use vm_utils::storage::l1_batch_params; @@ -102,9 +101,12 @@ impl L1BatchParams { /// it's used to receive volatile parameters (such as batch parameters) and sequence transactions /// providing L2 block and L1 batch boundaries for them. /// +/// Methods with `&self` receiver must be cancel-safe; i.e., they should not use interior mutability +/// to change the I/O state. Methods with `&mut self` receiver don't need to be cancel-safe. +/// /// All errors returned from this method are treated as unrecoverable. 
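+///
+/// For instance, `wait_for_new_batch_env()` in `keeper.rs` (further down in this diff) relies on
+/// this contract to race `load_batch_state_hash()` against the stop signal:
+///
+/// ```ignore
+/// tokio::select! {
+///     hash_result = self.io.load_batch_state_hash(cursor.l1_batch - 1) => {
+///         let previous_batch_hash = hash_result.context("cannot load state hash for previous L1 batch")?;
+///         Ok(params.into_env(self.io.chain_id(), contracts, cursor, previous_batch_hash))
+///     }
+///     _ = self.stop_receiver.changed() => Err(Error::Canceled),
+/// }
+/// ```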
#[async_trait] -pub trait StateKeeperIO: 'static + Send + fmt::Debug + IoSealCriteria { +pub trait StateKeeperIO: 'static + Send + Sync + fmt::Debug + IoSealCriteria { /// Returns the ID of the L2 chain. This ID is supposed to be static. fn chain_id(&self) -> L2ChainId; @@ -138,52 +140,21 @@ pub trait StateKeeperIO: 'static + Send + fmt::Debug + IoSealCriteria { /// Loads base system contracts with the specified version. async fn load_base_system_contracts( - &mut self, + &self, protocol_version: ProtocolVersionId, cursor: &IoCursor, ) -> anyhow::Result; /// Loads protocol version of the specified L1 batch, which is guaranteed to exist in the storage. async fn load_batch_version_id( - &mut self, + &self, number: L1BatchNumber, ) -> anyhow::Result; /// Loads protocol upgrade tx for given version. async fn load_upgrade_tx( - &mut self, + &self, version_id: ProtocolVersionId, ) -> anyhow::Result>; /// Loads state hash for the L1 batch with the specified number. The batch is guaranteed to be present /// in the storage. - async fn load_batch_state_hash(&mut self, number: L1BatchNumber) -> anyhow::Result; -} - -impl dyn StateKeeperIO { - pub(super) async fn wait_for_new_batch_env( - &mut self, - cursor: &IoCursor, - max_wait: Duration, - ) -> anyhow::Result> { - let Some(params) = self.wait_for_new_batch_params(cursor, max_wait).await? else { - return Ok(None); - }; - let contracts = self - .load_base_system_contracts(params.protocol_version, cursor) - .await - .with_context(|| { - format!( - "failed loading system contracts for protocol version {:?}", - params.protocol_version - ) - })?; - let previous_batch_hash = self - .load_batch_state_hash(cursor.l1_batch - 1) - .await - .context("cannot load state hash for previous L1 batch")?; - Ok(Some(params.into_env( - self.chain_id(), - contracts, - cursor, - previous_batch_hash, - ))) - } + async fn load_batch_state_hash(&self, number: L1BatchNumber) -> anyhow::Result; } diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index b787b7a65bf2..6aee5bb0c1ea 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -15,7 +15,7 @@ use zksync_types::{ use super::{ batch_executor::{BatchExecutor, BatchExecutorHandle, TxExecutionResult}, - io::{IoCursor, L2BlockParams, OutputHandler, PendingBatchData, StateKeeperIO}, + io::{IoCursor, L1BatchParams, L2BlockParams, OutputHandler, PendingBatchData, StateKeeperIO}, metrics::{AGGREGATION_METRICS, KEEPER_METRICS, L1_BATCH_METRICS}, seal_criteria::{ConditionalSealer, SealData, SealResolution}, types::ExecutionMetricsForCriteria, @@ -274,23 +274,50 @@ impl ZkSyncStateKeeper { .with_context(|| format!("failed loading upgrade transaction for {protocol_version:?}")) } - async fn wait_for_new_batch_env( + async fn wait_for_new_batch_params( &mut self, cursor: &IoCursor, - ) -> Result<(SystemEnv, L1BatchEnv), Error> { + ) -> Result { while !self.is_canceled() { - if let Some(envs) = self + if let Some(params) = self .io - .wait_for_new_batch_env(cursor, POLL_WAIT_DURATION) - .await - .context("error waiting for new L1 batch environment")? + .wait_for_new_batch_params(cursor, POLL_WAIT_DURATION) + .await? 
{ - return Ok(envs); + return Ok(params); } } Err(Error::Canceled) } + async fn wait_for_new_batch_env( + &mut self, + cursor: &IoCursor, + ) -> Result<(SystemEnv, L1BatchEnv), Error> { + // `io.wait_for_new_batch_params(..)` is not cancel-safe; once we get new batch params, we must hold onto them + // until we get the rest of parameters from I/O or receive a stop signal. + let params = self.wait_for_new_batch_params(cursor).await?; + let contracts = self + .io + .load_base_system_contracts(params.protocol_version, cursor) + .await + .with_context(|| { + format!( + "failed loading system contracts for protocol version {:?}", + params.protocol_version + ) + })?; + + // `select!` is safe to use here; `io.load_batch_state_hash(..)` is cancel-safe by contract + tokio::select! { + hash_result = self.io.load_batch_state_hash(cursor.l1_batch - 1) => { + let previous_batch_hash = hash_result.context("cannot load state hash for previous L1 batch")?; + Ok(params.into_env(self.io.chain_id(), contracts, cursor, previous_batch_hash)) + } + _ = self.stop_receiver.changed() => Err(Error::Canceled), + } + } + async fn wait_for_new_l2_block_params( &mut self, updates: &UpdatesManager, diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index 9934d9145767..39bc20a5d9fc 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -56,7 +56,7 @@ pub(crate) struct TestScenario { l2_block_seal_fn: Box, } -type SealFn = dyn FnMut(&UpdatesManager) -> bool + Send; +type SealFn = dyn FnMut(&UpdatesManager) -> bool + Send + Sync; impl fmt::Debug for TestScenario { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -176,7 +176,7 @@ impl TestScenario { pub(crate) fn seal_l1_batch_when(mut self, seal_fn: F) -> Self where - F: FnMut(&UpdatesManager) -> bool + Send + 'static, + F: FnMut(&UpdatesManager) -> bool + Send + Sync + 'static, { self.l1_batch_seal_fn = Box::new(seal_fn); self @@ -184,7 +184,7 @@ impl TestScenario { pub(crate) fn seal_l2_block_when(mut self, seal_fn: F) -> Self where - F: FnMut(&UpdatesManager) -> bool + Send + 'static, + F: FnMut(&UpdatesManager) -> bool + Send + Sync + 'static, { self.l2_block_seal_fn = Box::new(seal_fn); self @@ -675,6 +675,9 @@ impl StateKeeperIO for TestIO { l1_batch: self.batch_number, }; let pending_batch = self.pending_batch.take(); + if pending_batch.is_some() { + self.batch_number += 1; + } Ok((cursor, pending_batch)) } @@ -772,7 +775,7 @@ impl StateKeeperIO for TestIO { } async fn load_base_system_contracts( - &mut self, + &self, _protocol_version: ProtocolVersionId, _cursor: &IoCursor, ) -> anyhow::Result { @@ -780,23 +783,20 @@ impl StateKeeperIO for TestIO { } async fn load_batch_version_id( - &mut self, + &self, _number: L1BatchNumber, ) -> anyhow::Result { Ok(self.previous_batch_protocol_version) } async fn load_upgrade_tx( - &mut self, + &self, version_id: ProtocolVersionId, ) -> anyhow::Result> { Ok(self.protocol_upgrade_txs.get(&version_id).cloned()) } - async fn load_batch_state_hash( - &mut self, - _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result { + async fn load_batch_state_hash(&self, _l1_batch_number: L1BatchNumber) -> anyhow::Result { Ok(H256::zero()) } }
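Usage note for the `main.rs` change at the top of this diff: `tree_fetcher` is not added to the `all` component set here, so the experimental fetcher has to be requested explicitly in the node's comma-separated component list. A rough sketch of the resulting behavior (the parser's name is elided by the hunk header, so `components_from_str` below is a placeholder):

    // Hypothetical sketch; `components_from_str` stands in for the parser whose match arms
    // are shown in the `impl Component` hunk above.
    let parsed = Component::components_from_str("tree_fetcher").unwrap();
    assert_eq!(parsed, &[Component::TreeFetcher]);
    // An operator would therefore configure a component list such as "core,tree_fetcher" to run
    // the regular node plus the experimental tree data fetcher.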