diff --git a/Cargo.lock b/Cargo.lock index f04e00cf8a33..e460a317f99c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5540,6 +5540,15 @@ dependencies = [ "cc", ] +[[package]] +name = "secrecy" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +dependencies = [ + "zeroize", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -8104,6 +8113,7 @@ version = "0.1.0" dependencies = [ "anyhow", "rand 0.8.5", + "secrecy", "serde", "url", "zksync_basic_types", @@ -8315,6 +8325,7 @@ dependencies = [ "prover_dal", "rand 0.8.5", "reqwest", + "secrecy", "serde", "serde_json", "serde_yaml", @@ -8884,6 +8895,7 @@ dependencies = [ "hex", "prost 0.12.1", "rand 0.8.5", + "secrecy", "serde_json", "serde_yaml", "zksync_basic_types", diff --git a/Cargo.toml b/Cargo.toml index d0112a46bbc0..6f08b0c0a360 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -133,6 +133,7 @@ rlp = "0.5" rocksdb = "0.21.0" rustc_version = "0.4.0" secp256k1 = { version = "0.27.0", features = ["recovery", "global-context"] } +secrecy = "0.8.0" semver = "1" sentry = "0.31" serde = "1" diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 8ef60a8fa9a0..058c13e0f750 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -289,7 +289,7 @@ async fn run_core( // but we only need to wait for stop signal once, and it will be propagated to all child contexts. let ctx = ctx::root(); scope::run!(&ctx, |ctx, s| async move { - s.spawn_bg(consensus::era::run_fetcher( + s.spawn_bg(consensus::era::run_en( ctx, cfg, pool, diff --git a/core/lib/config/Cargo.toml b/core/lib/config/Cargo.toml index 29c493d76cce..fe2c4287903e 100644 --- a/core/lib/config/Cargo.toml +++ b/core/lib/config/Cargo.toml @@ -16,5 +16,6 @@ zksync_consensus_utils.workspace = true anyhow.workspace = true rand.workspace = true +secrecy.workspace = true serde = { workspace = true, features = ["derive"] } url.workspace = true diff --git a/core/lib/config/src/configs/consensus.rs b/core/lib/config/src/configs/consensus.rs index 2033360e406a..c31d34941d2b 100644 --- a/core/lib/config/src/configs/consensus.rs +++ b/core/lib/config/src/configs/consensus.rs @@ -1,40 +1,70 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt, -}; +use std::collections::{BTreeMap, BTreeSet}; -/// Public key of the validator (consensus participant) of the form "validator:public::" +use secrecy::{ExposeSecret as _, Secret}; +use zksync_basic_types::L2ChainId; + +/// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::validator::PublicKey`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ValidatorPublicKey(pub String); -// Secret key of the validator (consensus participant) of the form "validator:secret::" -#[derive(PartialEq)] -pub struct ValidatorSecretKey(pub String); +/// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::validator::SecretKey`. +#[derive(Debug, Clone)] +pub struct ValidatorSecretKey(pub Secret<String>); -/// Public key of the node (gossip network participant) of the form "node:public::" +/// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::node::PublicKey`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NodePublicKey(pub String); -// Secret key of the node (gossip network participant) of the form "node:secret::" -#[derive(PartialEq)] -pub struct NodeSecretKey(pub String); +/// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::node::SecretKey`. +#[derive(Debug, Clone)] +pub struct NodeSecretKey(pub Secret<String>); -impl fmt::Debug for ValidatorSecretKey { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.write_str("") +impl PartialEq for ValidatorSecretKey { + fn eq(&self, other: &Self) -> bool { + self.0.expose_secret().eq(other.0.expose_secret()) } } -impl fmt::Debug for NodeSecretKey { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.write_str("") +impl PartialEq for NodeSecretKey { + fn eq(&self, other: &Self) -> bool { + self.0.expose_secret().eq(other.0.expose_secret()) } } -/// Network address in the `<host>:port` format. +/// Copy-paste of `zksync_consensus_roles::validator::WeightedValidator`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WeightedValidator { + /// Validator key + pub key: ValidatorPublicKey, + /// Validator weight inside the Committee. + pub weight: u64, +} + +/// Copy-paste of `zksync_concurrency::net::Host`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Host(pub String); +/// Copy-paste of `zksync_consensus_roles::validator::ProtocolVersion`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ProtocolVersion(pub u32); + +/// Consensus genesis specification. +/// It is a digest of the `validator::Genesis`, +/// which allows one to initialize the genesis (if not present) and to +/// decide whether a hard fork is necessary (if present). +#[derive(Clone, Debug, PartialEq)] +pub struct GenesisSpec { + /// Chain ID. + pub chain_id: L2ChainId, + /// Consensus protocol version. + pub protocol_version: ProtocolVersion, + /// The validator committee. Represents `zksync_consensus_roles::validator::Committee`. + pub validators: Vec<WeightedValidator>, + /// Leader of the committee. Represents + /// `zksync_consensus_roles::validator::LeaderSelectionMode::Sticky`. + pub leader: ValidatorPublicKey, +} + /// Config (shared between main node and external node). #[derive(Clone, Debug, PartialEq)] pub struct ConsensusConfig { @@ -56,10 +86,15 @@ pub struct ConsensusConfig { /// Outbound gossip connections that the node should actively try to /// establish and maintain. pub gossip_static_outbound: BTreeMap<NodePublicKey, Host>, + + /// MAIN NODE ONLY: consensus genesis specification. + /// Used to (re)initialize genesis if needed. + /// External nodes fetch the genesis from the main node. + pub genesis_spec: Option<GenesisSpec>, } /// Secrets needed for consensus.
-#[derive(Debug, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub struct ConsensusSecrets { pub validator_key: Option<ValidatorSecretKey>, pub node_key: Option<NodeSecretKey>, } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 77036cf8620b..13e8b142676d 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -705,6 +705,28 @@ impl Distribution for EncodeDist { } } +impl Distribution<configs::consensus::WeightedValidator> for EncodeDist { + fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> configs::consensus::WeightedValidator { + use configs::consensus::{ValidatorPublicKey, WeightedValidator}; + WeightedValidator { + key: ValidatorPublicKey(self.sample(rng)), + weight: self.sample(rng), + } + } +} + +impl Distribution<configs::consensus::GenesisSpec> for EncodeDist { + fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> configs::consensus::GenesisSpec { + use configs::consensus::{GenesisSpec, ProtocolVersion, ValidatorPublicKey}; + GenesisSpec { + chain_id: L2ChainId::default(), + protocol_version: ProtocolVersion(self.sample(rng)), + validators: self.sample_collect(rng), + leader: ValidatorPublicKey(self.sample(rng)), + } + } +} + impl Distribution<configs::consensus::ConsensusConfig> for EncodeDist { fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> configs::consensus::ConsensusConfig { use configs::consensus::{ConsensusConfig, Host, NodePublicKey}; @@ -721,6 +743,7 @@ impl Distribution<configs::consensus::ConsensusConfig> for EncodeDist { .sample_range(rng) .map(|_| (NodePublicKey(self.sample(rng)), Host(self.sample(rng)))) .collect(), + genesis_spec: self.sample(rng), } } } @@ -729,8 +752,8 @@ impl Distribution<configs::consensus::ConsensusSecrets> for EncodeDist { fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> configs::consensus::ConsensusSecrets { use configs::consensus::{ConsensusSecrets, NodeSecretKey, ValidatorSecretKey}; ConsensusSecrets { - validator_key: self.sample_opt(|| ValidatorSecretKey(self.sample(rng))), - node_key: self.sample_opt(|| NodeSecretKey(self.sample(rng))), + validator_key: self.sample_opt(|| ValidatorSecretKey(String::into(self.sample(rng)))), + node_key: self.sample_opt(|| NodeSecretKey(String::into(self.sample(rng)))), } } } diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 6dab5f0ae2f5..041bd5c39a81 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -47,6 +47,8 @@ impl ConsensusDal<'_, '_> { } /// Attempts to update the genesis. + /// Fails if the new genesis is invalid. + /// Fails if the new genesis has a different `chain_id`. /// Fails if the storage contains a newer genesis (higher fork number). /// Noop if the new genesis is the same as the current one. /// Resets the stored consensus state otherwise and purges all certificates.
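For reference, the `Secret<String>` wrappers introduced above get their redaction and comparison behavior from the `secrecy` crate instead of hand-written `fmt::Debug` impls. A minimal sketch of that behavior, assuming the `secrecy` 0.8 API pinned in `Cargo.toml` (the key literal is a placeholder):

    use secrecy::{ExposeSecret as _, Secret};

    fn main() {
        // `Secret<String>` redacts its contents in `Debug` output, which is what
        // makes `#[derive(Debug)]` safe on `ValidatorSecretKey`/`NodeSecretKey`.
        let key: Secret<String> = Secret::new("validator:secret:...".to_string());
        println!("{key:?}"); // prints something like `Secret([REDACTED alloc::string::String])`
        // Reading the value is an explicit, greppable operation; this is also why
        // the `PartialEq` impls above go through `expose_secret()`.
        assert_eq!(key.expose_secret().as_str(), "validator:secret:...");
        // On drop, the inner `String` is zeroized via the `zeroize` dependency.
    }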
@@ -57,12 +59,19 @@ impl ConsensusDal<'_, '_> { if &got == genesis { return Ok(()); } + anyhow::ensure!( + got.chain_id == genesis.chain_id, + "changing chain_id is not allowed: old = {:?}, new = {:?}", + got.chain_id, + genesis.chain_id, + ); anyhow::ensure!( got.fork_number < genesis.fork_number, "transition to a past fork is not allowed: old = {:?}, new = {:?}", got.fork_number, genesis.fork_number, ); + genesis.verify().context("genesis.verify()")?; } let genesis = zksync_protobuf::serde::serialize(genesis, serde_json::value::Serializer).unwrap(); @@ -144,7 +153,7 @@ impl ConsensusDal<'_, '_> { fork_number: old.fork_number.next(), first_block, - protocol_version: validator::ProtocolVersion::CURRENT, + protocol_version: old.protocol_version, committee: old.committee.clone(), leader_selection: old.leader_selection.clone(), } diff --git a/core/lib/protobuf_config/Cargo.toml b/core/lib/protobuf_config/Cargo.toml index bfc988659763..ee52d8d5472f 100644 --- a/core/lib/protobuf_config/Cargo.toml +++ b/core/lib/protobuf_config/Cargo.toml @@ -22,7 +22,8 @@ zksync_types.workspace = true anyhow.workspace = true prost.workspace = true rand.workspace = true -hex = "0.4.3" +hex.workspace = true +secrecy.workspace = true [build-dependencies] zksync_protobuf_build.workspace = true diff --git a/core/lib/protobuf_config/src/consensus.rs b/core/lib/protobuf_config/src/consensus.rs index 555cd1e64e0b..f4825e166ac0 100644 --- a/core/lib/protobuf_config/src/consensus.rs +++ b/core/lib/protobuf_config/src/consensus.rs @@ -1,10 +1,59 @@ use anyhow::Context as _; +use secrecy::ExposeSecret as _; +use zksync_basic_types::L2ChainId; use zksync_config::configs::consensus::{ - ConsensusConfig, ConsensusSecrets, Host, NodePublicKey, NodeSecretKey, ValidatorSecretKey, + ConsensusConfig, ConsensusSecrets, GenesisSpec, Host, NodePublicKey, NodeSecretKey, + ProtocolVersion, ValidatorPublicKey, ValidatorSecretKey, WeightedValidator, }; use zksync_protobuf::{repr::ProtoRepr, required}; -use crate::proto::consensus as proto; +use crate::{proto::consensus as proto, read_optional_repr}; + +impl ProtoRepr for proto::WeightedValidator { + type Type = WeightedValidator; + fn read(&self) -> anyhow::Result<Self::Type> { + Ok(Self::Type { + key: ValidatorPublicKey(required(&self.key).context("key")?.clone()), + weight: *required(&self.weight).context("weight")?, + }) + } + fn build(this: &Self::Type) -> Self { + Self { + key: Some(this.key.0.clone()), + weight: Some(this.weight), + } + } +} + +impl ProtoRepr for proto::GenesisSpec { + type Type = GenesisSpec; + fn read(&self) -> anyhow::Result<Self::Type> { + Ok(Self::Type { + chain_id: required(&self.chain_id) + .and_then(|x| L2ChainId::try_from(*x).map_err(|a| anyhow::anyhow!(a))) + .context("chain_id")?, + protocol_version: ProtocolVersion( + *required(&self.protocol_version).context("protocol_version")?, + ), + validators: self + .validators + .iter() + .enumerate() + .map(|(i, x)| x.read().context(i)) + .collect::<anyhow::Result<_>>() + .context("validators")?, + leader: ValidatorPublicKey(required(&self.leader).context("leader")?.clone()), + }) + } + fn build(this: &Self::Type) -> Self { + Self { + chain_id: Some(this.chain_id.as_u64()), + protocol_version: Some(this.protocol_version.0), + validators: this.validators.iter().map(ProtoRepr::build).collect(), + leader: Some(this.leader.0.clone()), + } + } +} impl ProtoRepr for proto::Config { type Type = ConsensusConfig; @@ -36,6 +85,7 @@ impl ProtoRepr for proto::Config { .enumerate() .map(|(i, e)| read_addr(e).context(i)) .collect::<anyhow::Result<_>>()?, + genesis_spec:
read_optional_repr(&self.genesis_spec).context("genesis_spec")?, }) } @@ -60,6 +110,7 @@ impl ProtoRepr for proto::Config { addr: Some(x.1 .0.clone()), }) .collect(), + genesis_spec: this.genesis_spec.as_ref().map(ProtoRepr::build), } } } @@ -71,15 +122,21 @@ impl ProtoRepr for proto::Secrets { validator_key: self .validator_key .as_ref() - .map(|x| ValidatorSecretKey(x.clone())), - node_key: self.node_key.as_ref().map(|x| NodeSecretKey(x.clone())), + .map(|x| ValidatorSecretKey(x.clone().into())), + node_key: self + .node_key + .as_ref() + .map(|x| NodeSecretKey(x.clone().into())), }) } fn build(this: &Self::Type) -> Self { Self { - validator_key: this.validator_key.as_ref().map(|x| x.0.clone()), - node_key: this.node_key.as_ref().map(|x| x.0.clone()), + validator_key: this + .validator_key + .as_ref() + .map(|x| x.0.expose_secret().clone()), + node_key: this.node_key.as_ref().map(|x| x.0.expose_secret().clone()), } } } diff --git a/core/lib/protobuf_config/src/proto/core/consensus.proto b/core/lib/protobuf_config/src/proto/core/consensus.proto index 6bc1ba023eea..120ce56ce5af 100644 --- a/core/lib/protobuf_config/src/proto/core/consensus.proto +++ b/core/lib/protobuf_config/src/proto/core/consensus.proto @@ -35,6 +35,20 @@ message NodeAddr { optional string addr = 2; // required; IpAddr } +// Weighted member of a validator committee. +message WeightedValidator { + optional string key = 1; // required; ValidatorPublicKey + optional uint64 weight = 2; // required +} + +// Consensus genesis specification. +message GenesisSpec { + optional uint64 chain_id = 1; // required; L2ChainId, should be the same as `l2_chain_id` in the `zksync.config.genesis.Genesis`. + optional uint32 protocol_version = 2; // required; validator::ProtocolVersion + repeated WeightedValidator validators = 3; // must be non-empty; validator committee. + optional string leader = 4; // required; ValidatorPublicKey +} + message Config { reserved 3; reserved "validators"; @@ -60,6 +74,11 @@ message Config { // Outbound gossip network connections that the node should actively try to // establish and maintain. repeated NodeAddr gossip_static_outbound = 7; + + // MAIN NODE ONLY: consensus genesis specification. + // Used to (re)initialize genesis if needed. + // External nodes fetch the genesis from the main node. 
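+  // For illustration only (placeholder values, not real keys), a text-format
+  // instance of this field could look like:
+  //   genesis_spec {
+  //     chain_id: 270
+  //     protocol_version: 1
+  //     validators { key: "validator:public:..." weight: 1 }
+  //     leader: "validator:public:..."
+  //   }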
+ optional GenesisSpec genesis_spec = 8; } message Secrets { diff --git a/core/lib/protobuf_config/src/tests.rs b/core/lib/protobuf_config/src/tests.rs index 0db119e718bd..c19af354a06c 100644 --- a/core/lib/protobuf_config/src/tests.rs +++ b/core/lib/protobuf_config/src/tests.rs @@ -20,6 +20,8 @@ fn test_encoding() { test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); + test_encode_all_formats::<ReprConv<proto::consensus::WeightedValidator>>(rng); + test_encode_all_formats::<ReprConv<proto::consensus::GenesisSpec>>(rng); test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); diff --git a/core/lib/zksync_core/Cargo.toml b/core/lib/zksync_core/Cargo.toml index 2648e9319dd7..9b8ff351e815 100644 --- a/core/lib/zksync_core/Cargo.toml +++ b/core/lib/zksync_core/Cargo.toml @@ -63,6 +63,7 @@ zksync_consensus_utils.workspace = true zksync_protobuf.workspace = true prost.workspace = true +secrecy.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true serde_yaml.workspace = true diff --git a/core/lib/zksync_core/src/consensus/config.rs b/core/lib/zksync_core/src/consensus/config.rs index 998a56a554a0..b0dfd3fbfef6 100644 --- a/core/lib/zksync_core/src/consensus/config.rs +++ b/core/lib/zksync_core/src/consensus/config.rs @@ -2,47 +2,82 @@ use std::collections::HashMap; use anyhow::Context as _; +use secrecy::{ExposeSecret as _, Secret}; use zksync_concurrency::net; -use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets, Host, NodePublicKey}; +use zksync_config::{ + configs, + configs::consensus::{ConsensusConfig, ConsensusSecrets, Host, NodePublicKey}, +}; use zksync_consensus_crypto::{Text, TextFmt}; use zksync_consensus_executor as executor; use zksync_consensus_roles::{node, validator}; -use zksync_types::L2ChainId; -use crate::consensus::{fetcher::P2PConfig, MainNodeConfig}; - -fn read_secret_text<T: TextFmt>(text: Option<&String>) -> anyhow::Result<T> { - Text::new(text.context("missing")?) - .decode() +fn read_secret_text<T: TextFmt>(text: Option<&Secret<String>>) -> anyhow::Result<Option<T>> { + text.map(|text| Text::new(text.expose_secret()).decode()) + .transpose() .map_err(|_| anyhow::format_err!("invalid format")) } -fn validator_key(secrets: &ConsensusSecrets) -> anyhow::Result<validator::SecretKey> { +pub(super) fn validator_key( + secrets: &ConsensusSecrets, +) -> anyhow::Result<Option<validator::SecretKey>> { read_secret_text(secrets.validator_key.as_ref().map(|x| &x.0)) } -fn node_key(secrets: &ConsensusSecrets) -> anyhow::Result<node::SecretKey> { - read_secret_text(secrets.node_key.as_ref().map(|x| &x.0)) +/// Consensus genesis specification. +/// It is a digest of the `validator::Genesis`, +/// which allows one to initialize the genesis (if not present) and to +/// decide whether a hard fork is necessary (if present). +#[derive(Debug, PartialEq)] +pub(super) struct GenesisSpec { + pub(super) chain_id: validator::ChainId, + pub(super) protocol_version: validator::ProtocolVersion, + pub(super) validators: validator::Committee, + pub(super) leader_selection: validator::LeaderSelectionMode, } -/// Constructs a main node config from raw config.
-pub fn main_node( - cfg: &ConsensusConfig, - secrets: &ConsensusSecrets, - chain_id: L2ChainId, -) -> anyhow::Result<MainNodeConfig> { - Ok(MainNodeConfig { - executor: executor(cfg, secrets)?, - validator_key: validator_key(secrets).context("validator_key")?, - chain_id: validator::ChainId(chain_id.as_u64()), - }) +impl GenesisSpec { + pub(super) fn from_genesis(g: &validator::Genesis) -> Self { + Self { + chain_id: g.chain_id, + protocol_version: g.protocol_version, + validators: g.committee.clone(), + leader_selection: g.leader_selection.clone(), + } + } + + pub(super) fn parse(x: &configs::consensus::GenesisSpec) -> anyhow::Result<Self> { + let validators: Vec<_> = x + .validators + .iter() + .enumerate() + .map(|(i, v)| { + Ok(validator::WeightedValidator { + key: Text::new(&v.key.0).decode().context("key").context(i)?, + weight: v.weight, + }) + }) + .collect::<anyhow::Result<_>>() + .context("validators")?; + Ok(Self { + chain_id: validator::ChainId(x.chain_id.as_u64()), + protocol_version: validator::ProtocolVersion(x.protocol_version.0), + leader_selection: validator::LeaderSelectionMode::Sticky( + Text::new(&x.leader.0).decode().context("leader")?, + ), + validators: validator::Committee::new(validators).context("validators")?, + }) + } } -pub(super) fn p2p(cfg: &ConsensusConfig, secrets: &ConsensusSecrets) -> anyhow::Result<P2PConfig> { - executor(cfg, secrets) +pub(super) fn node_key(secrets: &ConsensusSecrets) -> anyhow::Result<Option<node::SecretKey>> { + read_secret_text(secrets.node_key.as_ref().map(|x| &x.0)) } -fn executor(cfg: &ConsensusConfig, secrets: &ConsensusSecrets) -> anyhow::Result<executor::Config> { +pub(super) fn executor( + cfg: &ConsensusConfig, + secrets: &ConsensusSecrets, +) -> anyhow::Result<executor::Config> { let mut gossip_static_outbound = HashMap::new(); { let mut append = |key: &NodePublicKey, addr: &Host| { @@ -60,7 +95,9 @@ fn executor(cfg: &ConsensusConfig, secrets: &ConsensusSecrets) -> anyhow::Result<executor::Config> server_addr: cfg.server_addr, public_addr: net::Host(cfg.public_addr.0.clone()), max_payload_size: cfg.max_payload_size, - node_key: node_key(secrets).context("node_key")?, + node_key: node_key(secrets) + .context("node_key")? + .context("missing node_key")?, gossip_dynamic_inbound_limit: cfg.gossip_dynamic_inbound_limit, gossip_static_inbound: cfg .gossip_static_inbound diff --git a/core/lib/zksync_core/src/consensus/fetcher.rs b/core/lib/zksync_core/src/consensus/en.rs similarity index 78% rename from core/lib/zksync_core/src/consensus/fetcher.rs rename to core/lib/zksync_core/src/consensus/en.rs index dcea154d04fe..adf246c34a15 100644 --- a/core/lib/zksync_core/src/consensus/fetcher.rs +++ b/core/lib/zksync_core/src/consensus/en.rs @@ -2,33 +2,37 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; use zksync_consensus_executor as executor; use zksync_consensus_roles::validator; +use zksync_consensus_storage::BlockStore; use zksync_types::L2BlockNumber; use zksync_web3_decl::client::{DynClient, L2}; +use super::{config, storage::Store, ConnectionPool, ConsensusConfig, ConsensusSecrets}; use crate::{ - consensus::{storage, Store}, + consensus::storage, sync_layer::{ fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState, }, }; -pub type P2PConfig = executor::Config; - -/// L2 block fetcher. -pub struct Fetcher { - pub store: Store, - pub sync_state: SyncState, - pub client: Box<DynClient<L2>>, +/// External node.
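+/// Bundles everything the external node needs here: a ctx-aware connection pool,
+/// the shared `SyncState`, and a JSON-RPC client for the main node; used by both
+/// `run` (consensus mode) and `run_fetcher` (JSON-RPC-only mode) below.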
+pub(super) struct EN { + pub(super) pool: ConnectionPool, + pub(super) sync_state: SyncState, + pub(super) client: Box<DynClient<L2>>, } -impl Fetcher { - /// Task fetching L2 blocks using peer-to-peer gossip network. - /// NOTE: it still uses main node json RPC in some cases for now. - pub async fn run_p2p( +impl EN { + /// Task running a consensus node for the external node. + /// It may be a validator, but it cannot be a leader (cannot propose blocks). + /// + /// NOTE: Before starting the consensus node, it fetches all the blocks + /// older than the consensus genesis from the main node using JSON RPC. + pub async fn run( self, ctx: &ctx::Ctx, actions: ActionQueueSender, - p2p: P2PConfig, + cfg: ConsensusConfig, + secrets: ConsensusSecrets, ) -> anyhow::Result<()> { let res: ctx::Result<()> = scope::run!(ctx, |ctx, s| async { // Update sync state in the background. @@ -36,12 +40,12 @@ impl Fetcher { // Initialize genesis. let genesis = self.fetch_genesis(ctx).await.wrap("fetch_genesis()")?; - let mut conn = self.store.access(ctx).await.wrap("access()")?; + let mut conn = self.pool.connection(ctx).await.wrap("connection()")?; conn.try_update_genesis(ctx, &genesis) .await .wrap("set_genesis()")?; let mut payload_queue = conn - .new_payload_queue(ctx, actions) + .new_payload_queue(ctx, actions, self.sync_state.clone()) .await .wrap("new_payload_queue()")?; drop(conn); @@ -67,17 +71,24 @@ impl Fetcher { }); // Run consensus component. - let (block_store, runner) = self - .store - .clone() - .into_block_store(ctx, Some(payload_queue)) + let (store, runner) = Store::new(ctx, self.pool.clone(), Some(payload_queue)) + .await + .wrap("Store::new()")?; + s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) .await - .wrap("into_block_store()")?; + .wrap("BlockStore::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); let executor = executor::Executor { - config: p2p.clone(), + config: config::executor(&cfg, &secrets)?, block_store, - validator: None, + validator: config::validator_key(&secrets) + .context("validator_key")? + .map(|key| executor::Validator { + key, + replica_store: Box::new(store.clone()), + payload_manager: Box::new(store.clone()), + }), }; executor.run(ctx).await?; Ok(()) @@ -90,7 +101,7 @@ } /// Task fetching L2 blocks using JSON-RPC endpoint of the main node. - pub async fn run_centralized( + pub async fn run_fetcher( self, ctx: &ctx::Ctx, actions: ActionQueueSender, @@ -99,11 +110,11 @@ // Update sync state in the background. s.spawn_bg(self.fetch_state_loop(ctx)); let mut payload_queue = self - .store - .access(ctx) + .pool + .connection(ctx) .await - .wrap("access()")? - .new_payload_queue(ctx, actions) + .wrap("connection()")? + .new_payload_queue(ctx, actions, self.sync_state.clone()) .await .wrap("new_fetcher_cursor()")?; self.fetch_blocks(ctx, &mut payload_queue, None).await @@ -193,7 +204,7 @@ .await?; // If fetched anything, wait for the last block to be stored persistently.
if first < queue.next() { - self.store + self.pool .wait_for_payload(ctx, queue.next().prev().unwrap()) .await?; } diff --git a/core/lib/zksync_core/src/consensus/era.rs b/core/lib/zksync_core/src/consensus/era.rs index fc6910ad4531..f042fef2fad7 100644 --- a/core/lib/zksync_core/src/consensus/era.rs +++ b/core/lib/zksync_core/src/consensus/era.rs @@ -6,22 +6,23 @@ use zksync_concurrency::ctx; use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; -use zksync_dal::{ConnectionPool, Core}; +use zksync_dal::Core; use zksync_web3_decl::client::{DynClient, L2}; -use super::{config, fetcher::Fetcher, storage::Store}; +use super::{en, storage::ConnectionPool}; use crate::sync_layer::{sync_action::ActionQueueSender, SyncState}; /// Runs the consensus task in the main node mode. pub async fn run_main_node( ctx: &ctx::Ctx, - cfg: super::MainNodeConfig, - pool: ConnectionPool<Core>, + cfg: ConsensusConfig, + secrets: ConsensusSecrets, + pool: zksync_dal::ConnectionPool<Core>, ) -> anyhow::Result<()> { // Consensus is a new component. // For now in case of error we just log it and allow the server // to continue running. - if let Err(err) = cfg.run(ctx, Store(pool)).await { + if let Err(err) = super::run_main_node(ctx, cfg, secrets, ConnectionPool(pool)).await { tracing::error!(%err, "Consensus actor failed"); } else { tracing::info!("Consensus actor stopped"); } Ok(()) } -/// Runs the consensus in the fetcher mode (e.g. for the external node needs). -/// The fetcher implementation may either be p2p or centralized. -pub async fn run_fetcher( +/// Runs the consensus node for the external node. +/// If `cfg` is `None`, it will just fetch blocks from the main node +/// using JSON RPC, without starting the consensus node. +pub async fn run_en( ctx: &ctx::Ctx, cfg: Option<(ConsensusConfig, ConsensusSecrets)>, - pool: ConnectionPool<Core>, + pool: zksync_dal::ConnectionPool<Core>, sync_state: SyncState, main_node_client: Box<DynClient<L2>>, actions: ActionQueueSender, ) -> anyhow::Result<()> { - let fetcher = Fetcher { - store: Store(pool), + let en = en::EN { + pool: ConnectionPool(pool), sync_state: sync_state.clone(), client: main_node_client, }; let res = match cfg { - Some((cfg, secrets)) => { - fetcher - .run_p2p(ctx, actions, config::p2p(&cfg, &secrets)?) - .await - } - None => fetcher.run_centralized(ctx, actions).await, + Some((cfg, secrets)) => en.run(ctx, actions, cfg, secrets).await, + None => en.run_fetcher(ctx, actions).await, }; tracing::info!("Consensus actor stopped"); res diff --git a/core/lib/zksync_core/src/consensus/mod.rs b/core/lib/zksync_core/src/consensus/mod.rs index 449532fdc5ef..35bee505ec46 100644 --- a/core/lib/zksync_core/src/consensus/mod.rs +++ b/core/lib/zksync_core/src/consensus/mod.rs @@ -2,56 +2,67 @@ #![allow(clippy::redundant_locals)] #![allow(clippy::needless_pass_by_ref_mut)] - +use anyhow::Context as _; +use storage::{ConnectionPool, Store}; use zksync_concurrency::{ctx, error::Wrap as _, scope}; +use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; use zksync_consensus_executor as executor; use zksync_consensus_roles::validator; +use zksync_consensus_storage::BlockStore; -pub use self::{fetcher::*, storage::Store}; - -pub mod config; +mod config; +mod en; pub mod era; -mod fetcher; mod storage; #[cfg(test)] pub(crate) mod testonly; #[cfg(test)] mod tests; -/// Main node consensus config.
-#[derive(Debug, Clone)] -pub struct MainNodeConfig { - pub executor: executor::Config, - pub validator_key: validator::SecretKey, - pub chain_id: validator::ChainId, -} - -impl MainNodeConfig { - /// Task generating consensus certificates for the L2 blocks generated by `StateKeeper`. - /// Broadcasts the blocks with certificates to gossip network peers. - pub async fn run(self, ctx: &ctx::Ctx, store: Store) -> anyhow::Result<()> { - scope::run!(&ctx, |ctx, s| async { - store - .try_init_genesis(ctx, self.chain_id, &self.validator_key.public()) +/// Task running a consensus validator for the main node. +/// Main node is currently the only leader of the consensus - i.e. it proposes all the +/// L2 blocks (generated by `StateKeeper`). +async fn run_main_node( + ctx: &ctx::Ctx, + cfg: ConsensusConfig, + secrets: ConsensusSecrets, + pool: ConnectionPool, +) -> anyhow::Result<()> { + let validator_key = config::validator_key(&secrets) + .context("validator_key")? + .context("missing validator_key")?; + scope::run!(&ctx, |ctx, s| async { + if let Some(spec) = &cfg.genesis_spec { + let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?; + pool.connection(ctx) .await - .wrap("block_store.try_init_genesis()")?; - let (block_store, runner) = store - .clone() - .into_block_store(ctx, None) + .wrap("connection()")? + .adjust_genesis(ctx, &spec) .await - .wrap("into_block_store()")?; - s.spawn_bg(runner.run(ctx)); - let executor = executor::Executor { - config: self.executor, - block_store, - validator: Some(executor::Validator { - key: self.validator_key, - replica_store: Box::new(store.clone()), - payload_manager: Box::new(store.clone()), - }), - }; - executor.run(ctx).await - }) - .await - } + .wrap("adjust_genesis()")?; + } + let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?; + s.spawn_bg(runner.run(ctx)); + let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) + .await + .wrap("BlockStore::new()")?; + s.spawn_bg(runner.run(ctx)); + anyhow::ensure!( + block_store.genesis().leader_selection + == validator::LeaderSelectionMode::Sticky(validator_key.public()), + "unsupported leader selection mode - main node has to be the leader" + ); + + let executor = executor::Executor { + config: config::executor(&cfg, &secrets)?, + block_store, + validator: Some(executor::Validator { + key: validator_key, + replica_store: Box::new(store.clone()), + payload_manager: Box::new(store.clone()), + }), + }; + executor.run(ctx).await + }) + .await } diff --git a/core/lib/zksync_core/src/consensus/storage/mod.rs b/core/lib/zksync_core/src/consensus/storage/mod.rs index 0083481deed5..f63d09cbdd0d 100644 --- a/core/lib/zksync_core/src/consensus/storage/mod.rs +++ b/core/lib/zksync_core/src/consensus/storage/mod.rs @@ -1,26 +1,64 @@ //! Storage implementation based on DAL.
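//!
//! Rough layout, as implied by the code below: `ConnectionPool`/`Connection` are
//! ctx-aware wrappers over the DAL, `Store` implements the consensus storage traits
//! on top of them, and `StoreRunner` is the background task persisting certificates.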
- use std::sync::Arc; use anyhow::Context as _; -use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; +use zksync_concurrency::{ctx, error::Wrap as _, sync, time}; use zksync_consensus_bft::PayloadManager; use zksync_consensus_roles::validator; use zksync_consensus_storage as storage; -use zksync_dal::{consensus_dal::Payload, ConnectionPool, Core, CoreDal, DalError}; +use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError}; use zksync_types::L2BlockNumber; -#[cfg(test)] -mod testonly; - +use super::config; use crate::{ state_keeper::io::common::IoCursor, sync_layer::{ fetcher::{FetchedBlock, FetchedTransaction}, sync_action::ActionQueueSender, + SyncState, }, }; +#[cfg(test)] +mod testonly; + +/// Context-aware `zksync_dal::ConnectionPool` wrapper. +#[derive(Debug, Clone)] +pub(super) struct ConnectionPool(pub(super) zksync_dal::ConnectionPool<Core>); + +impl ConnectionPool { + /// Wrapper for `connection_tagged()`. + pub(super) async fn connection<'a>(&'a self, ctx: &ctx::Ctx) -> ctx::Result<Connection<'a>> { + Ok(Connection( + ctx.wait(self.0.connection_tagged("consensus")) + .await? + .map_err(DalError::generalize)?, + )) + } + + /// Waits for the `number` L2 block. + pub async fn wait_for_payload( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result<Payload> { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + loop { + if let Some(payload) = self + .connection(ctx) + .await + .wrap("connection()")? + .payload(ctx, number) + .await + .wrap("payload()")? + { + return Ok(payload); + } + ctx.sleep(POLL_INTERVAL).await?; + } + } +} + /// Context-aware `zksync_dal::Connection` wrapper. pub(super) struct Connection<'a>(pub(super) zksync_dal::Connection<'a, Core>); @@ -135,10 +173,12 @@ impl<'a> Connection<'a> { &mut self, ctx: &ctx::Ctx, actions: ActionQueueSender, + sync_state: SyncState, ) -> ctx::Result<PayloadQueue> { Ok(PayloadQueue { inner: ctx.wait(IoCursor::for_fetcher(&mut self.0)).await??, actions, + sync_state, }) } @@ -158,99 +198,23 @@ impl<'a> Connection<'a> { .wait(self.0.consensus_dal().try_update_genesis(genesis)) .await??) } -} - -#[derive(Debug)] -pub(super) struct PayloadQueue { - inner: IoCursor, - actions: ActionQueueSender, -} - -impl PayloadQueue { - pub(super) fn next(&self) -> validator::BlockNumber { - validator::BlockNumber(self.inner.next_l2_block.0.into()) - } - - /// Converts the block into actions and pushes them to the actions queue. - /// Does nothing and returns Ok() if the block has been already processed. - /// Returns an error if a block with an earlier block number was expected. - pub(super) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> { - let want = self.inner.next_l2_block; - // Some blocks are missing. - if block.number > want { - anyhow::bail!("expected {want:?}, got {:?}", block.number); - } - // Block already processed. - if block.number < want { - return Ok(()); - } - self.actions.push_actions(self.inner.advance(block)).await; - Ok(()) - } -} - -/// Wrapper of `ConnectionPool` implementing `ReplicaStore` and `PayloadManager`. -#[derive(Clone, Debug)] -pub struct Store(pub ConnectionPool<Core>); - -impl Store { - /// Wrapper for `connection_tagged()`. - pub(super) async fn access<'a>(&'a self, ctx: &ctx::Ctx) -> ctx::Result<Connection<'a>> { - Ok(Connection( - ctx.wait(self.0.connection_tagged("consensus")) - .await? - .map_err(DalError::generalize)?, - )) - } - - /// Waits for the `number` L2 block.
- pub async fn wait_for_payload( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result<Payload> { - const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); - loop { - if let Some(payload) = self - .access(ctx) - .await - .wrap("access()")? - .payload(ctx, number) - .await - .wrap("payload()")? - { - return Ok(payload); - } - ctx.sleep(POLL_INTERVAL).await?; - } - } - - pub(super) async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::Genesis> { - Ok(self - .access(ctx) - .await - .wrap("access()")? - .genesis(ctx) - .await - .wrap("genesis()")? - .context("genesis is missing")?) - } /// Fetches and verifies consistency of certificates in storage. - async fn certificates_range(&self, ctx: &ctx::Ctx) -> ctx::Result<storage::BlockStoreState> { - let mut conn = self.access(ctx).await.wrap("access()")?; - + async fn certificates_range( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result<storage::BlockStoreState> { // Fetch the range of L2 blocks in storage. - let block_range = conn.block_range(ctx).await.context("block_range")?; + let block_range = self.block_range(ctx).await.context("block_range")?; // Fetch the range of certificates in storage. - let genesis = conn + let genesis = self .genesis(ctx) .await .wrap("genesis()")? .context("genesis missing")?; let first_expected_cert = genesis.first_block.max(block_range.start); - let last_cert = conn + let last_cert = self .last_certificate(ctx) .await .wrap("last_certificate()")?; @@ -259,7 +223,7 @@ .map_or(first_expected_cert, |cert| cert.header().number.next()); // Check that the first certificate in storage has the expected L2 block number. - if let Some(got) = conn + if let Some(got) = self .first_certificate(ctx) .await .wrap("first_certificate()")? @@ -283,69 +247,36 @@ }) } - pub(super) async fn block( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result<Option<validator::FinalBlock>> { - let conn = &mut self.access(ctx).await.wrap("access()")?; - let Some(justification) = conn.certificate(ctx, number).await.wrap("certificate()")? else { - return Ok(None); - }; - let payload = conn - .payload(ctx, number) - .await - .wrap("payload()")? - .context("L2 block disappeared from storage")?; - Ok(Some(validator::FinalBlock { - payload: payload.encode(), - justification, - })) - } - - /// Initializes consensus genesis (with 1 validator) to start at the last L2 block in storage. - /// No-op if db already contains a genesis. - pub(super) async fn try_init_genesis( - &self, + /// (Re)initializes consensus genesis to start at the last L2 block in storage. + /// Noop if `spec` matches the current genesis. + pub(super) async fn adjust_genesis( + &mut self, ctx: &ctx::Ctx, - chain_id: validator::ChainId, - validator_key: &validator::PublicKey, + spec: &config::GenesisSpec, ) -> ctx::Result<()> { - let mut conn = self.access(ctx).await.wrap("access()")?; - let block_range = conn.block_range(ctx).await.wrap("block_range()")?; - let mut txn = conn + let block_range = self.block_range(ctx).await.wrap("block_range()")?; + let mut txn = self .start_transaction(ctx) .await .wrap("start_transaction()")?; - // `Committee::new()` with a single validator should never fail. - let committee = validator::Committee::new([validator::WeightedValidator { - key: validator_key.clone(), - weight: 1, - }]) - .unwrap(); - let leader_selection = validator::LeaderSelectionMode::Sticky(validator_key.clone()); let old = txn.genesis(ctx).await.wrap("genesis()")?; - // Check if the current config of the main node is compatible with the stored genesis.
- if old.as_ref().map_or(false, |old| { - old.chain_id == chain_id - && old.protocol_version == validator::ProtocolVersion::CURRENT - && old.committee == committee - && old.leader_selection == leader_selection - }) { - return Ok(()); + if let Some(old) = &old { + if &config::GenesisSpec::from_genesis(old) == spec { + // Hard fork is not needed. + return Ok(()); + } } - // If not, perform a hard fork. tracing::info!("Performing a hard fork of consensus."); let genesis = validator::GenesisRaw { - chain_id, + chain_id: spec.chain_id, fork_number: old .as_ref() .map_or(validator::ForkNumber(0), |old| old.fork_number.next()), first_block: block_range.end, - protocol_version: validator::ProtocolVersion::CURRENT, - committee, - leader_selection, + protocol_version: spec.protocol_version, + committee: spec.validators.clone(), + leader_selection: spec.leader_selection.clone(), } .with_hash(); txn.try_update_genesis(ctx, &genesis) @@ -355,76 +286,155 @@ Ok(()) } - pub(super) async fn into_block_store( - self, + pub(super) async fn block( + &mut self, ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result<Option<validator::FinalBlock>> { + let Some(justification) = self.certificate(ctx, number).await.wrap("certificate()")? else { + return Ok(None); + }; + let payload = self + .payload(ctx, number) + .await + .wrap("payload()")? + .context("L2 block disappeared from storage")?; + Ok(Some(validator::FinalBlock { + payload: payload.encode(), + justification, + })) + } +} + +#[derive(Debug)] +pub(super) struct PayloadQueue { + inner: IoCursor, + actions: ActionQueueSender, + sync_state: SyncState, +} + +impl PayloadQueue { + pub(super) fn next(&self) -> validator::BlockNumber { + validator::BlockNumber(self.inner.next_l2_block.0.into()) + } + + /// Advances the cursor by converting the block into actions and pushing them + /// to the actions queue. + /// Does nothing and returns Ok() if the block has been already processed. + /// Returns an error if a block with an earlier block number was expected. + pub(super) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> { + let want = self.inner.next_l2_block; + // Some blocks are missing. + if block.number > want { + anyhow::bail!("expected {want:?}, got {:?}", block.number); + } + // Block already processed. + if block.number < want { + return Ok(()); + } + self.actions.push_actions(self.inner.advance(block)).await; + Ok(()) + } +} + +fn to_fetched_block( + number: validator::BlockNumber, + payload: &validator::Payload, ) -> anyhow::Result<FetchedBlock> { + let number = L2BlockNumber( + number + .0 + .try_into() + .context("Integer overflow converting block number")?, + ); + let payload = Payload::decode(payload).context("Payload::decode()")?; + Ok(FetchedBlock { + number, + l1_batch_number: payload.l1_batch_number, + last_in_batch: payload.last_in_batch, + protocol_version: payload.protocol_version, + timestamp: payload.timestamp, + reference_hash: Some(payload.hash), + l1_gas_price: payload.l1_gas_price, + l2_fair_gas_price: payload.l2_fair_gas_price, + fair_pubdata_price: payload.fair_pubdata_price, + virtual_blocks: payload.virtual_blocks, + operator_address: payload.operator_address, + transactions: payload + .transactions + .into_iter() + .map(FetchedTransaction::new) + .collect(), + }) +} + +/// Wrapper of `ConnectionPool` implementing `ReplicaStore`, `PayloadManager` and +/// `PersistentBlockStore`.
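+/// Certificate persistence is asynchronous: `Store` queues each certificate on an
+/// unbounded channel, and `StoreRunner` inserts it once the corresponding payload is
+/// in storage, then advances the `persisted` watermark.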
+#[derive(Clone, Debug)] +pub(super) struct Store { + pub(super) pool: ConnectionPool, + payloads: Arc<sync::Mutex<Option<PayloadQueue>>>, + certificates: ctx::channel::UnboundedSender<validator::FinalBlock>, + persisted: sync::watch::Receiver<storage::BlockStoreState>, +} + +/// Background task of the `Store`. +pub struct StoreRunner { + pool: ConnectionPool, + persisted: sync::watch::Sender<storage::BlockStoreState>, + certificates: ctx::channel::UnboundedReceiver<validator::FinalBlock>, +} + +impl Store { + pub(super) async fn new( + ctx: &ctx::Ctx, + pool: ConnectionPool, payload_queue: Option<PayloadQueue>, - ) -> ctx::Result<(Arc<storage::BlockStore>, BlockStoreRunner)> { - let persisted = self + ) -> ctx::Result<(Store, StoreRunner)> { + let persisted = pool + .connection(ctx) + .await + .wrap("connection()")? .certificates_range(ctx) .await .wrap("certificates_range()")?; let persisted = sync::watch::channel(persisted).0; let (certs_send, certs_recv) = ctx::channel::unbounded(); - let (block_store, runner) = storage::BlockStore::new( - ctx, - Box::new(BlockStore { - inner: self.clone(), + Ok(( + Store { + pool: pool.clone(), certificates: certs_send, - payloads: payload_queue.map(sync::Mutex::new), + payloads: Arc::new(sync::Mutex::new(payload_queue)), persisted: persisted.subscribe(), - }), - ) - .await?; - Ok(( - block_store, - BlockStoreRunner { - store: self, + }, + StoreRunner { + pool, persisted, certificates: certs_recv, - inner: runner, }, )) } } -/// Wrapper of `ConnectionPool` implementing `PersistentBlockStore`. -#[derive(Debug)] -struct BlockStore { - inner: Store, - payloads: Option<sync::Mutex<PayloadQueue>>, - certificates: ctx::channel::UnboundedSender<validator::FinalBlock>, - persisted: sync::watch::Receiver<storage::BlockStoreState>, -} - -/// Background task of the `BlockStore`. -pub struct BlockStoreRunner { - store: Store, - persisted: sync::watch::Sender<storage::BlockStoreState>, - certificates: ctx::channel::UnboundedReceiver<validator::FinalBlock>, - inner: storage::BlockStoreRunner, -} - -impl BlockStoreRunner { +impl StoreRunner { pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let res = scope::run!(ctx, |ctx, s| async { - s.spawn(async { Ok(self.inner.run(ctx).await?) }); + let res = async { loop { let cert = self.certificates.recv(ctx).await?; - self.store + self.pool .wait_for_payload(ctx, cert.header().number) .await .wrap("wait_for_payload()")?; - self.store - .access(ctx) + self.pool + .connection(ctx) .await - .wrap("access()")? + .wrap("connection()")? .insert_certificate(ctx, &cert) .await .wrap("insert_certificate()")?; self.persisted.send_modify(|p| p.last = Some(cert)); } - }) + } .await; match res { Err(ctx::Error::Canceled(_)) | Ok(()) => Ok(()), @@ -434,9 +444,16 @@ } #[async_trait::async_trait] -impl storage::PersistentBlockStore for BlockStore { +impl storage::PersistentBlockStore for Store { async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::Genesis> { - self.inner.genesis(ctx).await + Ok(self + .pool + .connection(ctx) + .await + .wrap("connection")? + .genesis(ctx) + .await? + .context("not found")?) } fn persisted(&self) -> sync::watch::Receiver<storage::BlockStoreState> { @@ -448,7 +465,14 @@ ctx: &ctx::Ctx, number: validator::BlockNumber, ) -> ctx::Result<validator::FinalBlock> { - Ok(self.inner.block(ctx, number).await?.context("not found")?) + Ok(self + .pool + .connection(ctx) + .await + .wrap("connection")? + .block(ctx, number) + .await? + .context("not found")?)
} /// If actions queue is set (and the block has not been stored yet), @@ -464,35 +488,12 @@ ctx: &ctx::Ctx, block: validator::FinalBlock, ) -> ctx::Result<()> { - if let Some(payloads) = &self.payloads { - let mut payloads = sync::lock(ctx, payloads).await?.into_async(); - let number = L2BlockNumber( - block - .number() - .0 - .try_into() - .context("Integer overflow converting block number")?, - ); - let payload = Payload::decode(&block.payload).context("Payload::decode()")?; - let block = FetchedBlock { - number, - l1_batch_number: payload.l1_batch_number, - last_in_batch: payload.last_in_batch, - protocol_version: payload.protocol_version, - timestamp: payload.timestamp, - reference_hash: Some(payload.hash), - l1_gas_price: payload.l1_gas_price, - l2_fair_gas_price: payload.l2_fair_gas_price, - fair_pubdata_price: payload.fair_pubdata_price, - virtual_blocks: payload.virtual_blocks, - operator_address: payload.operator_address, - transactions: payload - .transactions - .into_iter() - .map(FetchedTransaction::new) - .collect(), - }; - payloads.send(block).await.context("payload_queue.send()")?; + let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + if let Some(payloads) = &mut *payloads { + payloads + .send(to_fetched_block(block.number(), &block.payload).context("to_fetched_block")?) + .await + .context("payload_queue.send()")?; } self.certificates.send(block.justification); Ok(()) @@ -502,18 +503,20 @@ #[async_trait::async_trait] impl storage::ReplicaStore for Store { async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result<storage::ReplicaState> { - self.access(ctx) + self.pool + .connection(ctx) .await - .wrap("access()")? + .wrap("connection()")? .replica_state(ctx) .await .wrap("replica_state()") } async fn set_state(&self, ctx: &ctx::Ctx, state: &storage::ReplicaState) -> ctx::Result<()> { - self.access(ctx) + self.pool + .connection(ctx) .await - .wrap("access()")? + .wrap("connection()")? .set_replica_state(ctx, state) .await .wrap("set_replica_state()") } @@ -529,8 +532,7 @@ impl PayloadManager for Store { block_number: validator::BlockNumber, ) -> ctx::Result<validator::Payload> { const LARGE_PAYLOAD_SIZE: usize = 1 << 20; - tracing::info!("proposing block {block_number}"); - let payload = self.wait_for_payload(ctx, block_number).await?; + let payload = self.pool.wait_for_payload(ctx, block_number).await?; let encoded_payload = payload.encode(); if encoded_payload.0.len() > LARGE_PAYLOAD_SIZE { tracing::warn!( ... payload.transactions.len() ); } - tracing::info!("proposing block {block_number} DONE"); Ok(encoded_payload) } /// Verify that `payload` is a correct proposal for the block `block_number`. - /// Currently, (for the main node) it is implemented as checking whether the received payload - /// matches the L2 block in the db. + /// * for the main node it checks whether the same block is already present in storage. + /// * for the EN validator: + /// * if the block with this number was already applied, it checks that it was the + /// same block. This should always hold, because the main node is the only proposer + /// and proposing a different block would require a hard fork. + /// * otherwise, the EN attempts to apply the received block. If the block is incorrect, + /// the state keeper is expected to crash the whole EN. Otherwise `Ok(())` is returned.
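+ /// (An already-applied block can only differ from a proposal after a hard fork on
+ /// the main node; `adjust_genesis` above is the operation that performs such forks.)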
async fn verify( &self, ctx: &ctx::Ctx, block_number: validator::BlockNumber, payload: &validator::Payload, ) -> ctx::Result<()> { - tracing::info!("verifying block {block_number}"); - let got = Payload::decode(payload).context("Payload::decode(got)")?; - let want = self.wait_for_payload(ctx, block_number).await?; - if got != want { - return Err( - anyhow::format_err!("unexpected payload: got {got:?} want {want:?}").into(), - ); + let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + if let Some(payloads) = &mut *payloads { + let block = to_fetched_block(block_number, payload).context("to_fetched_block")?; + let n = block.number; + payloads.send(block).await.context("payload_queue.send()")?; + // Wait for the block to be processed, without waiting for it to be stored. + // TODO(BFT-459): this is not ideal, because we don't check here whether the + // processed block is the same as `payload`. It will work correctly + // with the current implementation of EN, but we should make it more + // precise when block reverting support is implemented. + ctx.wait(payloads.sync_state.wait_for_local_block(n)) + .await?; + } else { + let want = self.pool.wait_for_payload(ctx, block_number).await?; + let got = Payload::decode(payload).context("Payload::decode(got)")?; + if got != want { + return Err( + anyhow::format_err!("unexpected payload: got {got:?} want {want:?}").into(), + ); + } } - tracing::info!("verifying block {block_number} DONE"); Ok(()) } } diff --git a/core/lib/zksync_core/src/consensus/storage/testonly.rs b/core/lib/zksync_core/src/consensus/storage/testonly.rs index beae80632cc0..48feba61e15d 100644 --- a/core/lib/zksync_core/src/consensus/storage/testonly.rs +++ b/core/lib/zksync_core/src/consensus/storage/testonly.rs @@ -3,13 +3,12 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, time}; use zksync_consensus_roles::validator; -use zksync_dal::ConnectionPool; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; use zksync_node_test_utils::{recover, snapshot, Snapshot}; -use super::Store; +use super::ConnectionPool; -impl Store { +impl ConnectionPool { /// Waits for the `number` L2 block to have a certificate. pub async fn wait_for_certificate( &self, ctx: &ctx::Ctx, ) -> ctx::Result<()> { const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(100); while self - .access(ctx) + .connection(ctx) .await - .wrap("access()")? + .wrap("connection()")? .certificate(ctx, number) .await .wrap("certificate()")? } /// Takes a storage snapshot at the last sealed L1 batch. pub(crate) async fn snapshot(&self, ctx: &ctx::Ctx) -> ctx::Result<Snapshot> { - let mut conn = self.access(ctx).await.wrap("access()")?; + let mut conn = self.connection(ctx).await.wrap("connection()")?; Ok(ctx.wait(snapshot(&mut conn.0)).await?) } /// Constructs a new db initialized with genesis state. pub(crate) async fn from_genesis() -> Self { - let pool = ConnectionPool::test_pool().await; + let pool = zksync_dal::ConnectionPool::test_pool().await; { let mut storage = pool.connection().await.unwrap(); insert_genesis_batch(&mut storage, &GenesisParams::mock()) } /// Recovers storage from a snapshot.
pub(crate) async fn from_snapshot(snapshot: Snapshot) -> Self { - let pool = ConnectionPool::test_pool().await; + let pool = zksync_dal::ConnectionPool::test_pool().await; { let mut storage = pool.connection().await.unwrap(); recover(&mut storage, snapshot).await; } @@ -66,7 +65,7 @@ want_last: validator::BlockNumber, ) -> ctx::Result<Vec<validator::FinalBlock>> { self.wait_for_certificate(ctx, want_last).await?; - let mut conn = self.access(ctx).await.wrap("access()")?; + let mut conn = self.connection(ctx).await.wrap("connection()")?; let last_cert = conn .last_certificate(ctx) .await .wrap("last_certificate()")?; @@ -81,7 +80,7 @@ let mut blocks: Vec<validator::FinalBlock> = vec![]; for i in first_cert.header().number.0..=last_cert.header().number.0 { let i = validator::BlockNumber(i); - let block = self.block(ctx, i).await.context("block()")?.unwrap(); + let block = conn.block(ctx, i).await.context("block()")?.unwrap(); blocks.push(block); } Ok(blocks) } @@ -94,7 +93,14 @@ want_last: validator::BlockNumber, ) -> ctx::Result<Vec<validator::FinalBlock>> { let blocks = self.wait_for_certificates(ctx, want_last).await?; - let genesis = self.genesis(ctx).await.wrap("genesis()")?; + let genesis = self + .connection(ctx) + .await + .wrap("connection()")? + .genesis(ctx) + .await + .wrap("genesis()")? + .context("genesis is missing")?; for block in &blocks { block.verify(&genesis).context(block.number())?; } diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs index e4831255052d..62e770b3444a 100644 --- a/core/lib/zksync_core/src/consensus/testonly.rs +++ b/core/lib/zksync_core/src/consensus/testonly.rs @@ -4,8 +4,10 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::Context as _; use rand::Rng; -use zksync_concurrency::{ctx, error::Wrap as _, scope, sync}; -use zksync_config::{configs, GenesisConfig}; +use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; +use zksync_config::{configs, configs::consensus as config, GenesisConfig}; +use zksync_consensus_crypto::TextFmt as _; +use zksync_consensus_network as network; use zksync_consensus_roles::validator; use zksync_contracts::BaseSystemContractsHashes; use zksync_dal::{CoreDal, DalError}; @@ -22,7 +24,7 @@ use zksync_web3_decl::{ use crate::{ api_server::web3::{state::InternalApiConfig, tests::spawn_http_server}, - consensus::{fetcher::P2PConfig, Fetcher, Store}, + consensus::{en, ConnectionPool}, state_keeper::{ io::{IoCursor, L1BatchParams, L2BlockParams}, seal_criteria::NoopSealer, @@ -158,22 +160,66 @@ pub(super) struct StateKeeper { gas_per_pubdata: u64, actions_sender: ActionQueueSender, + sync_state: SyncState, addr: sync::watch::Receiver<Option<std::net::SocketAddr>>, - store: Store, + pool: ConnectionPool, +} + +pub(super) fn config(cfg: &network::Config) -> (config::ConsensusConfig, config::ConsensusSecrets) { ( config::ConsensusConfig { server_addr: *cfg.server_addr, public_addr: config::Host(cfg.public_addr.0.clone()), max_payload_size: usize::MAX, gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, gossip_static_inbound: cfg .gossip .static_inbound .iter() .map(|k| config::NodePublicKey(k.encode())) .collect(), gossip_static_outbound: cfg .gossip .static_outbound .iter() .map(|(k, v)| (config::NodePublicKey(k.encode()), config::Host(v.0.clone()))) .collect(), genesis_spec: cfg.validator_key.as_ref().map(|key| config::GenesisSpec { chain_id: L2ChainId::default(), protocol_version: config::ProtocolVersion(validator::ProtocolVersion::CURRENT.0), validators: vec![config::WeightedValidator { key:
config::ValidatorPublicKey(key.public().encode()), + weight: 1, + }], + leader: config::ValidatorPublicKey(key.public().encode()), + }), + }, + config::ConsensusSecrets { + node_key: Some(config::NodeSecretKey(cfg.gossip.key.encode().into())), + validator_key: cfg + .validator_key + .as_ref() + .map(|k| config::ValidatorSecretKey(k.encode().into())), + }, + ) } /// Fake StateKeeper task to be executed in the background. pub(super) struct StateKeeperRunner { actions_queue: ActionQueue, - store: Store, + sync_state: SyncState, + pool: ConnectionPool, addr: sync::watch::Sender<Option<std::net::SocketAddr>>, } impl StateKeeper { /// Constructs and initializes a new `StateKeeper`. /// Caller has to run `StateKeeperRunner.run()` task in the background. - pub async fn new(ctx: &ctx::Ctx, store: Store) -> ctx::Result<(Self, StateKeeperRunner)> { - let mut conn = store.access(ctx).await.wrap("access()")?; + pub async fn new( + ctx: &ctx::Ctx, + pool: ConnectionPool, + ) -> ctx::Result<(Self, StateKeeperRunner)> { + let mut conn = pool.connection(ctx).await.wrap("connection()")?; let cursor = ctx .wait(IoCursor::for_fetcher(&mut conn.0)) .await? .context("pending_batch_exists()")?; let (actions_sender, actions_queue) = ActionQueue::new(); let addr = sync::watch::channel(None).0; + let sync_state = SyncState::default(); Ok(( Self { last_batch: cursor.l1_batch, fee_per_gas: 10, gas_per_pubdata: 100, actions_sender, + sync_state: sync_state.clone(), addr: addr.subscribe(), - store: store.clone(), + pool: pool.clone(), }, StateKeeperRunner { actions_queue, - store: store.clone(), + sync_state, + pool: pool.clone(), addr, }, )) @@ -285,43 +334,57 @@ impl StateKeeper { let client = Client::http(format!("http://{addr}/").parse().context("url")?) .context("json_rpc()")? .build(); - Ok(Box::new(client)) + let client: Box<DynClient<L2>> = Box::new(client); + // Wait until the server is actually available. + loop { + let res = ctx.wait(client.fetch_l2_block_number()).await?; + match res { + Ok(_) => return Ok(client), + Err(err) if err.is_transient() => { + ctx.sleep(time::Duration::seconds(5)).await?; + } + Err(err) => { + return Err(anyhow::format_err!("{err}").into()); + } + } + } } /// Runs the centralized fetcher. - pub async fn run_centralized_fetcher( + pub async fn run_fetcher( self, ctx: &ctx::Ctx, client: Box<DynClient<L2>>, ) -> anyhow::Result<()> { - Fetcher { - store: self.store, + en::EN { + pool: self.pool, client, - sync_state: SyncState::default(), + sync_state: self.sync_state.clone(), } - .run_centralized(ctx, self.actions_sender) + .run_fetcher(ctx, self.actions_sender) .await } - /// Runs the p2p fetcher. - pub async fn run_p2p_fetcher( + /// Runs the consensus node for the external node.
+ pub async fn run_consensus( self, ctx: &ctx::Ctx, client: Box<DynClient<L2>>, - cfg: P2PConfig, + cfg: &network::Config, ) -> anyhow::Result<()> { - Fetcher { - store: self.store, + let (cfg, secrets) = config(cfg); + en::EN { + pool: self.pool, client, - sync_state: SyncState::default(), + sync_state: self.sync_state.clone(), } - .run_p2p(ctx, self.actions_sender, cfg) + .run(ctx, self.actions_sender, cfg, secrets) .await } } -async fn calculate_mock_metadata(ctx: &ctx::Ctx, store: &Store) -> ctx::Result<()> { - let mut conn = store.access(ctx).await.wrap("access()")?; +async fn calculate_mock_metadata(ctx: &ctx::Ctx, pool: &ConnectionPool) -> ctx::Result<()> { + let mut conn = pool.connection(ctx).await.wrap("connection()")?; let Some(last) = ctx .wait(conn.0.blocks_dal().get_sealed_l1_batch_number()) .await? @@ -361,10 +424,10 @@ impl StateKeeperRunner { let res = scope::run!(ctx, |ctx, s| async { let (stop_send, stop_recv) = sync::watch::channel(false); let (persistence, l2_block_sealer) = - StateKeeperPersistence::new(self.store.0.clone(), Address::repeat_byte(11), 5); + StateKeeperPersistence::new(self.pool.0.clone(), Address::repeat_byte(11), 5); let io = ExternalIO::new( - self.store.0.clone(), + self.pool.0.clone(), self.actions_queue, Box::<MockMainNodeClient>::default(), L2ChainId::default(), }); s.spawn_bg::<()>(async { loop { - calculate_mock_metadata(ctx, &self.store).await?; + calculate_mock_metadata(ctx, &self.pool).await?; // Sleep real time. ctx.wait(tokio::time::sleep(tokio::time::Duration::from_millis(100))) .await?; stop_recv, Box::new(io), Box::new(MockBatchExecutor), - OutputHandler::new(Box::new(persistence.with_tx_insertion())), + OutputHandler::new(Box::new(persistence.with_tx_insertion())) + .with_handler(Box::new(self.sync_state.clone())), Arc::new(NoopSealer), ) .run() ); let mut server = spawn_http_server( cfg, - self.store.0.clone(), + self.pool.0.clone(), Default::default(), Arc::default(), stop_recv, diff --git a/core/lib/zksync_core/src/consensus/tests.rs b/core/lib/zksync_core/src/consensus/tests.rs index c250fac35d26..6ed65161362f 100644 --- a/core/lib/zksync_core/src/consensus/tests.rs +++ b/core/lib/zksync_core/src/consensus/tests.rs @@ -2,23 +2,25 @@ use anyhow::Context as _; use test_casing::test_casing; use tracing::Instrument as _; use zksync_concurrency::{ctx, scope}; -use zksync_consensus_executor as executor; -use zksync_consensus_network as network; +use zksync_config::configs::consensus::{ValidatorPublicKey, WeightedValidator}; +use zksync_consensus_crypto::TextFmt as _; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; -use zksync_consensus_roles::validator::testonly::{Setup, SetupSpec}; +use zksync_consensus_roles::{ + validator, + validator::testonly::{Setup, SetupSpec}, +}; use zksync_node_test_utils::Snapshot; use zksync_types::{L1BatchNumber, L2BlockNumber}; use super::*; -const CHAIN_ID: validator::ChainId = validator::ChainId(1337); - -async fn new_store(from_snapshot: bool) -> Store { +async fn new_pool(from_snapshot: bool) -> ConnectionPool { match from_snapshot { true => { - Store::from_snapshot(Snapshot::make(L1BatchNumber(23), L2BlockNumber(87), &[])).await + ConnectionPool::from_snapshot(Snapshot::make(L1BatchNumber(23), L2BlockNumber(87), &[])) + .await } - false => Store::from_genesis().await, + false => ConnectionPool::from_genesis().await, } } @@ -27,20 +29,20 @@ async fn test_validator_block_store() {
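    // Test plan, matching the body below: fill storage with unsigned L2 blocks via a
    // state keeper, install a test genesis, sign certificates for a suffix of the
    // blocks, then feed those blocks through `Store`/`BlockStore` one by one and
    // check that each certificate becomes persistent.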
diff --git a/core/lib/zksync_core/src/consensus/tests.rs b/core/lib/zksync_core/src/consensus/tests.rs
index c250fac35d26..6ed65161362f 100644
--- a/core/lib/zksync_core/src/consensus/tests.rs
+++ b/core/lib/zksync_core/src/consensus/tests.rs
@@ -2,23 +2,25 @@ use anyhow::Context as _;
 use test_casing::test_casing;
 use tracing::Instrument as _;
 use zksync_concurrency::{ctx, scope};
-use zksync_consensus_executor as executor;
-use zksync_consensus_network as network;
+use zksync_config::configs::consensus::{ValidatorPublicKey, WeightedValidator};
+use zksync_consensus_crypto::TextFmt as _;
 use zksync_consensus_network::testonly::{new_configs, new_fullnode};
-use zksync_consensus_roles::validator::testonly::{Setup, SetupSpec};
+use zksync_consensus_roles::{
+    validator,
+    validator::testonly::{Setup, SetupSpec},
+};
 use zksync_node_test_utils::Snapshot;
 use zksync_types::{L1BatchNumber, L2BlockNumber};

 use super::*;

-const CHAIN_ID: validator::ChainId = validator::ChainId(1337);
-
-async fn new_store(from_snapshot: bool) -> Store {
+async fn new_pool(from_snapshot: bool) -> ConnectionPool {
     match from_snapshot {
         true => {
-            Store::from_snapshot(Snapshot::make(L1BatchNumber(23), L2BlockNumber(87), &[])).await
+            ConnectionPool::from_snapshot(Snapshot::make(L1BatchNumber(23), L2BlockNumber(87), &[]))
+                .await
         }
-        false => Store::from_genesis().await,
+        false => ConnectionPool::from_genesis().await,
     }
 }

@@ -27,20 +29,20 @@ async fn test_validator_block_store() {
     zksync_concurrency::testonly::abort_on_panic();
     let ctx = &ctx::test_root(&ctx::RealClock);
     let rng = &mut ctx.rng();
-    let store = new_store(false).await;
+    let pool = new_pool(false).await;

     // Fill storage with unsigned L2 blocks.
     // Fetch a suffix of blocks that we will generate (fake) certs for.
     let want = scope::run!(ctx, |ctx, s| async {
         // Start state keeper.
-        let (mut sk, runner) = testonly::StateKeeper::new(ctx, store.clone()).await?;
+        let (mut sk, runner) = testonly::StateKeeper::new(ctx, pool.clone()).await?;
         s.spawn_bg(runner.run(ctx));
         sk.push_random_blocks(rng, 10).await;
-        store.wait_for_payload(ctx, sk.last_block()).await?;
+        pool.wait_for_payload(ctx, sk.last_block()).await?;
         let mut setup = SetupSpec::new(rng, 3);
         setup.first_block = validator::BlockNumber(4);
         let mut setup = Setup::from(setup);
-        let mut conn = store.access(ctx).await.wrap("access()")?;
+        let mut conn = pool.connection(ctx).await.wrap("connection()")?;
         conn.try_update_genesis(ctx, &setup.genesis)
             .await
             .wrap("try_update_genesis()")?;
@@ -62,14 +64,17 @@ async fn test_validator_block_store() {
     // Insert blocks one by one and check the storage state.
     for (i, block) in want.iter().enumerate() {
         scope::run!(ctx, |ctx, s| async {
-            let (block_store, runner) = store.clone().into_block_store(ctx, None).await.unwrap();
+            let (store, runner) = Store::new(ctx, pool.clone(), None).await.unwrap();
+            s.spawn_bg(runner.run(ctx));
+            let (block_store, runner) =
+                BlockStore::new(ctx, Box::new(store.clone())).await.unwrap();
             s.spawn_bg(runner.run(ctx));
             block_store.queue_block(ctx, block.clone()).await.unwrap();
             block_store
                 .wait_until_persisted(ctx, block.number())
                 .await
                 .unwrap();
-            let got = store
+            let got = pool
                 .wait_for_certificates(ctx, block.number())
                 .await
                 .unwrap();
@@ -81,18 +86,6 @@
     }
 }

-fn executor_config(cfg: &network::Config) -> executor::Config {
-    executor::Config {
-        server_addr: *cfg.server_addr,
-        public_addr: cfg.public_addr.clone(),
-        max_payload_size: usize::MAX,
-        node_key: cfg.gossip.key.clone(),
-        gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
-        gossip_static_inbound: cfg.gossip.static_inbound.clone(),
-        gossip_static_outbound: cfg.gossip.static_outbound.clone(),
-    }
-}
-
 // In the current implementation, consensus certificates are created asynchronously
 // for the L2 blocks constructed by the StateKeeper. This means that consensus actor
 // is effectively just back filling the consensus certificates for the L2 blocks in storage.
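Every test below exercises this backfilling property with the same structured-concurrency shape: infrastructure (state keeper, consensus actor, fetchers) goes into `s.spawn_bg(...)`, assertions run in the scope's main future, and returning from that future cancels the context and tears the background tasks down. A self-contained sketch of those semantics (assuming `zksync_concurrency`'s documented behavior):

    use zksync_concurrency::{ctx, scope, time};

    async fn demo() -> ctx::Result<()> {
        let ctx = &ctx::test_root(&ctx::RealClock);
        scope::run!(ctx, |ctx, s| async {
            // Stands in for a state keeper or consensus actor: runs until
            // the main future below returns and `ctx` is canceled.
            s.spawn_bg::<()>(async {
                loop {
                    ctx.sleep(time::Duration::milliseconds(50)).await?;
                }
            });
            // Assertions would go here; returning cancels the background task.
            Ok(())
        })
        .await
    }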
@@ -107,13 +100,13 @@ async fn test_validator(from_snapshot: bool) {
     scope::run!(ctx, |ctx, s| async {
         tracing::info!("Start state keeper.");
-        let store = new_store(from_snapshot).await;
-        let (mut sk, runner) = testonly::StateKeeper::new(ctx, store.clone()).await?;
+        let pool = new_pool(from_snapshot).await;
+        let (mut sk, runner) = testonly::StateKeeper::new(ctx, pool.clone()).await?;
         s.spawn_bg(runner.run(ctx));

         tracing::info!("Populate storage with a bunch of blocks.");
         sk.push_random_blocks(rng, 5).await;
-        store
+        pool
             .wait_for_payload(ctx, sk.last_block())
             .await
             .context("sk.wait_for_payload(<1st phase>)")?;
@@ -124,16 +117,12 @@ async fn test_validator(from_snapshot: bool) {
         scope::run!(ctx, |ctx, s| async {
             tracing::info!("Start consensus actor");
             // In the first iteration it will initialize genesis.
-            let cfg = MainNodeConfig {
-                executor: executor_config(&cfgs[0]),
-                validator_key: setup.keys[0].clone(),
-                chain_id: CHAIN_ID,
-            };
-            s.spawn_bg(cfg.run(ctx, store.clone()));
+            let (cfg, secrets) = testonly::config(&cfgs[0]);
+            s.spawn_bg(run_main_node(ctx, cfg, secrets, pool.clone()));

             tracing::info!("Generate couple more blocks and wait for consensus to catch up.");
             sk.push_random_blocks(rng, 3).await;
-            store
+            pool
                 .wait_for_certificate(ctx, sk.last_block())
                 .await
                 .context("wait_for_certificate(<2nd phase>)")?;
@@ -141,14 +130,14 @@
             tracing::info!("Synchronously produce blocks one by one, and wait for consensus.");
             for _ in 0..2 {
                 sk.push_random_blocks(rng, 1).await;
-                store
+                pool
                     .wait_for_certificate(ctx, sk.last_block())
                     .await
                     .context("wait_for_certificate(<3rd phase>)")?;
             }

             tracing::info!("Verify all certificates");
-            store
+            pool
                 .wait_for_certificates_and_verify(ctx, sk.last_block())
                 .await
                 .context("wait_for_certificates_and_verify()")?;
@@ -174,58 +163,60 @@ async fn test_nodes_from_various_snapshots() {
     scope::run!(ctx, |ctx, s| async {
         tracing::info!("spawn validator");
-        let validator_store = Store::from_genesis().await;
+        let validator_pool = ConnectionPool::from_genesis().await;
         let (mut validator, runner) =
-            testonly::StateKeeper::new(ctx, validator_store.clone()).await?;
+            testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
         s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("validator")));
-        let cfg = MainNodeConfig {
-            executor: executor_config(&validator_cfg),
-            validator_key: setup.keys[0].clone(),
-            chain_id: CHAIN_ID,
-        };
-        s.spawn_bg(cfg.run(ctx, validator_store.clone()));
+        let (cfg, secrets) = testonly::config(&validator_cfg);
+        s.spawn_bg(run_main_node(ctx, cfg, secrets, validator_pool.clone()));

         tracing::info!("produce some batches");
         validator.push_random_blocks(rng, 5).await;
         validator.seal_batch().await;
-        validator_store
+        validator_pool
             .wait_for_certificate(ctx, validator.last_block())
             .await?;

         tracing::info!("take snapshot and start a node from it");
-        let snapshot = validator_store.snapshot(ctx).await?;
-        let node_store = Store::from_snapshot(snapshot).await;
-        let (node, runner) = testonly::StateKeeper::new(ctx, node_store.clone()).await?;
+        let snapshot = validator_pool.snapshot(ctx).await?;
+        let node_pool = ConnectionPool::from_snapshot(snapshot).await;
+        let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
         s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node1")));
-        let node_cfg = executor_config(&new_fullnode(rng, &validator_cfg));
-        s.spawn_bg(node.run_p2p_fetcher(ctx, validator.connect(ctx).await?, node_cfg));
+        let conn = validator.connect(ctx).await?;
+        s.spawn_bg(async {
+            let cfg = new_fullnode(&mut ctx.rng(), &validator_cfg);
+            node.run_consensus(ctx, conn, &cfg).await
+        });

         tracing::info!("produce more batches");
         validator.push_random_blocks(rng, 5).await;
         validator.seal_batch().await;
-        node_store
+        node_pool
             .wait_for_certificate(ctx, validator.last_block())
             .await?;

         tracing::info!("take another snapshot and start a node from it");
-        let snapshot = validator_store.snapshot(ctx).await?;
-        let node_store2 = Store::from_snapshot(snapshot).await;
-        let (node, runner) = testonly::StateKeeper::new(ctx, node_store2.clone()).await?;
+        let snapshot = validator_pool.snapshot(ctx).await?;
+        let node_pool2 = ConnectionPool::from_snapshot(snapshot).await;
+        let (node, runner) = testonly::StateKeeper::new(ctx, node_pool2.clone()).await?;
         s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node2")));
-        let node_cfg = executor_config(&new_fullnode(rng, &validator_cfg));
-        s.spawn_bg(node.run_p2p_fetcher(ctx, validator.connect(ctx).await?, node_cfg));
+        let conn = validator.connect(ctx).await?;
+        s.spawn_bg(async {
+            let cfg = new_fullnode(&mut ctx.rng(), &validator_cfg);
+            node.run_consensus(ctx, conn, &cfg).await
+        });

         tracing::info!("produce more blocks and compare storages");
         validator.push_random_blocks(rng, 5).await;
-        let want = validator_store
+        let want = validator_pool
             .wait_for_certificates_and_verify(ctx, validator.last_block())
             .await?;
         // node stores should be suffixes for validator store.
         for got in [
-            node_store
+            node_pool
                 .wait_for_certificates_and_verify(ctx, validator.last_block())
                 .await?,
-            node_store2
+            node_pool2
                 .wait_for_certificates_and_verify(ctx, validator.last_block())
                 .await?,
         ] {
@@ -263,9 +254,9 @@ async fn test_full_nodes(from_snapshot: bool) {
     // Run validator and fetchers in parallel.
     scope::run!(ctx, |ctx, s| async {
-        let validator_store = new_store(from_snapshot).await;
+        let validator_pool = new_pool(from_snapshot).await;
         let (mut validator, runner) =
-            testonly::StateKeeper::new(ctx, validator_store.clone()).await?;
+            testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
         s.spawn_bg(async {
             runner
                 .run(ctx)
@@ -277,26 +268,22 @@ async fn test_full_nodes(from_snapshot: bool) {
         validator.push_random_blocks(rng, 5).await;
         // API server needs at least 1 L1 batch to start.
         validator.seal_batch().await;
-        validator_store
+        validator_pool
             .wait_for_payload(ctx, validator.last_block())
             .await
             .unwrap();

         tracing::info!("Run validator.");
-        let cfg = MainNodeConfig {
-            executor: executor_config(&validator_cfgs[0]),
-            validator_key: setup.keys[0].clone(),
-            chain_id: CHAIN_ID,
-        };
-        s.spawn_bg(cfg.run(ctx, validator_store.clone()));
+        let (cfg, secrets) = testonly::config(&validator_cfgs[0]);
+        s.spawn_bg(run_main_node(ctx, cfg, secrets, validator_pool.clone()));

         tracing::info!("Run nodes.");
-        let mut node_stores = vec![];
+        let mut node_pools = vec![];
         for (i, cfg) in node_cfgs.iter().enumerate() {
             let i = ctx::NoCopy(i);
-            let store = new_store(from_snapshot).await;
-            let (node, runner) = testonly::StateKeeper::new(ctx, store.clone()).await?;
-            node_stores.push(store.clone());
+            let pool = new_pool(from_snapshot).await;
+            let (node, runner) = testonly::StateKeeper::new(ctx, pool.clone()).await?;
+            node_pools.push(pool.clone());
             s.spawn_bg(async {
                 let i = i;
                 runner
@@ -305,25 +292,106 @@ async fn test_full_nodes(from_snapshot: bool) {
                     .await
                     .with_context(|| format!("node{}", *i))
             });
-            s.spawn_bg(node.run_p2p_fetcher(
-                ctx,
-                validator.connect(ctx).await?,
-                executor_config(cfg),
-            ));
+            s.spawn_bg(node.run_consensus(ctx, validator.connect(ctx).await?, cfg));
         }

         tracing::info!("Make validator produce blocks and wait for fetchers to get them.");
         // Note that block from before and after genesis have to be fetched.
         validator.push_random_blocks(rng, 5).await;
         let want_last = validator.last_block();
-        let want = validator_store
+        let want = validator_pool
+            .wait_for_certificates_and_verify(ctx, want_last)
+            .await?;
+        for pool in &node_pools {
+            assert_eq!(
+                want,
+                pool.wait_for_certificates_and_verify(ctx, want_last)
+                    .await?
+            );
+        }
+        Ok(())
+    })
+    .await
+    .unwrap();
+}
+
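The new test below is parametrized the same way as its neighbors via `test_casing`. For readers unfamiliar with the crate, a minimal sketch of what the attribute does (illustrative only):

    use test_casing::test_casing;

    // Expands into two test cases, `demo(false)` and `demo(true)`; in this
    // file the flag selects snapshot-based vs genesis-based node storage.
    #[test_casing(2, [false, true])]
    #[tokio::test]
    async fn demo(from_snapshot: bool) {
        assert!(from_snapshot || !from_snapshot);
    }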
+// Test running external node (non-leader) validators.
+#[test_casing(2, [false, true])]
+#[tokio::test(flavor = "multi_thread")]
+async fn test_en_validators(from_snapshot: bool) {
+    const NODES: usize = 3;
+
+    zksync_concurrency::testonly::abort_on_panic();
+    let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
+    let rng = &mut ctx.rng();
+    let setup = Setup::new(rng, NODES);
+    let cfgs = new_configs(rng, &setup, 1);
+
+    // Run all nodes in parallel.
+    scope::run!(ctx, |ctx, s| async {
+        let main_node_pool = new_pool(from_snapshot).await;
+        let (mut main_node, runner) =
+            testonly::StateKeeper::new(ctx, main_node_pool.clone()).await?;
+        s.spawn_bg(async {
+            runner
+                .run(ctx)
+                .instrument(tracing::info_span!("main_node"))
+                .await
+                .context("main_node")
+        });
+        tracing::info!("Generate a couple of blocks, before initializing consensus genesis.");
+        main_node.push_random_blocks(rng, 5).await;
+        // API server needs at least 1 L1 batch to start.
+        main_node.seal_batch().await;
+        main_node_pool
+            .wait_for_payload(ctx, main_node.last_block())
+            .await
+            .unwrap();
+
+        tracing::info!("wait until the API server is actually available");
+        // as otherwise waiting for view synchronization will take a while.
+        main_node.connect(ctx).await?;
+
+        tracing::info!("Run main node with all nodes being validators.");
+        let (mut cfg, secrets) = testonly::config(&cfgs[0]);
+        cfg.genesis_spec.as_mut().unwrap().validators = setup
+            .keys
+            .iter()
+            .map(|k| WeightedValidator {
+                key: ValidatorPublicKey(k.public().encode()),
+                weight: 1,
+            })
+            .collect();
+        s.spawn_bg(run_main_node(ctx, cfg, secrets, main_node_pool.clone()));
+
+        tracing::info!("Run external nodes.");
+        let mut ext_node_pools = vec![];
+        for (i, cfg) in cfgs[1..].iter().enumerate() {
+            let i = ctx::NoCopy(i);
+            let pool = new_pool(from_snapshot).await;
+            let (ext_node, runner) = testonly::StateKeeper::new(ctx, pool.clone()).await?;
+            ext_node_pools.push(pool.clone());
+            s.spawn_bg(async {
+                let i = i;
+                runner
+                    .run(ctx)
+                    .instrument(tracing::info_span!("en", i = *i))
+                    .await
+                    .with_context(|| format!("en{}", *i))
+            });
+            s.spawn_bg(ext_node.run_consensus(ctx, main_node.connect(ctx).await?, cfg));
+        }
+
+        tracing::info!("Make the main node produce blocks and wait for consensus to finalize them");
+        main_node.push_random_blocks(rng, 5).await;
+        let want_last = main_node.last_block();
+        let want = main_node_pool
             .wait_for_certificates_and_verify(ctx, want_last)
             .await?;
-        for store in &node_stores {
+        for pool in &ext_node_pools {
             assert_eq!(
                 want,
-                store
-                    .wait_for_certificates_and_verify(ctx, want_last)
+                pool.wait_for_certificates_and_verify(ctx, want_last)
                     .await?
             );
         }
@@ -342,35 +410,29 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool) {
     let rng = &mut ctx.rng();
     let setup = Setup::new(rng, 1);
     let validator_cfg = new_configs(rng, &setup, 0)[0].clone();
-    let node_cfg = executor_config(&new_fullnode(rng, &validator_cfg));
+    let node_cfg = new_fullnode(rng, &validator_cfg);

     scope::run!(ctx, |ctx, s| async {
         tracing::info!("Spawn validator.");
-        let validator_store = new_store(from_snapshot).await;
+        let validator_pool = new_pool(from_snapshot).await;
         let (mut validator, runner) =
-            testonly::StateKeeper::new(ctx, validator_store.clone()).await?;
+            testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
         s.spawn_bg(runner.run(ctx));
-        s.spawn_bg(
-            MainNodeConfig {
-                executor: executor_config(&validator_cfg),
-                validator_key: setup.keys[0].clone(),
-                chain_id: CHAIN_ID,
-            }
-            .run(ctx, validator_store.clone()),
-        );
+        let (cfg, secrets) = testonly::config(&validator_cfg);
+        s.spawn_bg(run_main_node(ctx, cfg, secrets, validator_pool.clone()));
         // API server needs at least 1 L1 batch to start.
         validator.seal_batch().await;
         let client = validator.connect(ctx).await?;

-        let node_store = new_store(from_snapshot).await;
+        let node_pool = new_pool(from_snapshot).await;

         tracing::info!("Run p2p fetcher.");
         scope::run!(ctx, |ctx, s| async {
-            let (node, runner) = testonly::StateKeeper::new(ctx, node_store.clone()).await?;
+            let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
             s.spawn_bg(runner.run(ctx));
-            s.spawn_bg(node.run_p2p_fetcher(ctx, client.clone(), node_cfg.clone()));
+            s.spawn_bg(node.run_consensus(ctx, client.clone(), &node_cfg));
             validator.push_random_blocks(rng, 3).await;
-            node_store
+            node_pool
                 .wait_for_certificate(ctx, validator.last_block())
                 .await?;
             Ok(())
@@ -380,11 +442,11 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool) {
         tracing::info!("Run centralized fetcher.");
         scope::run!(ctx, |ctx, s| async {
-            let (node, runner) = testonly::StateKeeper::new(ctx, node_store.clone()).await?;
+            let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
             s.spawn_bg(runner.run(ctx));
-            s.spawn_bg(node.run_centralized_fetcher(ctx, client.clone()));
+            s.spawn_bg(node.run_fetcher(ctx, client.clone()));
             validator.push_random_blocks(rng, 3).await;
-            node_store
+            node_pool
                 .wait_for_payload(ctx, validator.last_block())
                 .await?;
             Ok(())
@@ -394,14 +456,14 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool) {
         tracing::info!("Run p2p fetcher again.");
         scope::run!(ctx, |ctx, s| async {
-            let (node, runner) = testonly::StateKeeper::new(ctx, node_store.clone()).await?;
+            let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
             s.spawn_bg(runner.run(ctx));
-            s.spawn_bg(node.run_p2p_fetcher(ctx, client.clone(), node_cfg.clone()));
+            s.spawn_bg(node.run_consensus(ctx, client.clone(), &node_cfg));
             validator.push_random_blocks(rng, 3).await;
-            let want = validator_store
+            let want = validator_pool
                 .wait_for_certificates_and_verify(ctx, validator.last_block())
                 .await?;
-            let got = node_store
+            let got = node_pool
                 .wait_for_certificates_and_verify(ctx, validator.last_block())
                 .await?;
             assert_eq!(want, got);
@@ -424,9 +486,9 @@ async fn test_centralized_fetcher(from_snapshot: bool) {
     scope::run!(ctx, |ctx, s| async {
         tracing::info!("Spawn a validator.");
-        let validator_store = new_store(from_snapshot).await;
+        let validator_pool = new_pool(from_snapshot).await;
         let (mut validator, runner) =
-            testonly::StateKeeper::new(ctx, validator_store.clone()).await?;
+            testonly::StateKeeper::new(ctx, validator_pool.clone()).await?;
         s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("validator")));

         tracing::info!("Produce a batch (to make api server start)");
@@ -434,17 +496,17 @@ async fn test_centralized_fetcher(from_snapshot: bool) {
         validator.seal_batch().await;

         tracing::info!("Spawn a node.");
-        let node_store = new_store(from_snapshot).await;
-        let (node, runner) = testonly::StateKeeper::new(ctx, node_store.clone()).await?;
+        let node_pool = new_pool(from_snapshot).await;
+        let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?;
         s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("fetcher")));
-        s.spawn_bg(node.run_centralized_fetcher(ctx, validator.connect(ctx).await?));
+        s.spawn_bg(node.run_fetcher(ctx, validator.connect(ctx).await?));

         tracing::info!("Produce some blocks and wait for node to fetch them");
         validator.push_random_blocks(rng, 10).await;
-        let want = validator_store
+        let want = validator_pool
             .wait_for_payload(ctx, validator.last_block())
             .await?;
-        let got = node_store
+        let got = node_pool
             .wait_for_payload(ctx, validator.last_block())
             .await?;
         assert_eq!(want, got);
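The main-node wiring below hands the raw `ConsensusConfig`/`ConsensusSecrets` straight to `consensus::era::run_main_node`, so decoding the `Secret`-wrapped strings into typed keys now happens inside the consensus crate. A hedged sketch of that decoding direction (assuming `zksync_consensus_crypto`'s `Text`/`TextFmt` API; not a verbatim excerpt):

    use anyhow::Context as _;
    use secrecy::ExposeSecret as _;
    use zksync_config::configs::consensus::ValidatorSecretKey;
    use zksync_consensus_crypto::Text;
    use zksync_consensus_roles::validator;

    // Expose the secret only for the duration of the decode call.
    fn validator_key(secret: &ValidatorSecretKey) -> anyhow::Result<validator::SecretKey> {
        Text::new(secret.0.expose_secret())
            .decode()
            .context("invalid validator key")
    }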
diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs
index 3042bae022d4..4655b5a3b775 100644
--- a/core/lib/zksync_core/src/lib.rs
+++ b/core/lib/zksync_core/src/lib.rs
@@ -549,14 +549,13 @@ pub async fn initialize_components(
         .map(|a| a.state_transition_proxy_addr);

     if components.contains(&Component::Consensus) {
-        let secrets = secrets.consensus.as_ref().context("Secrets are missing")?;
-        let cfg = consensus::config::main_node(
-            consensus_config
-                .as_ref()
-                .context("consensus component's config is missing")?,
-            secrets,
-            l2_chain_id,
-        )?;
+        let cfg = consensus_config
+            .clone()
+            .context("consensus component's config is missing")?;
+        let secrets = secrets
+            .consensus
+            .clone()
+            .context("consensus component's secrets are missing")?;
         let started_at = Instant::now();
         tracing::info!("initializing Consensus");
         let pool = connection_pool.clone();
@@ -569,7 +568,7 @@ pub async fn initialize_components(
             // but we only need to wait for stop signal once, and it will be propagated to all child contexts.
             let root_ctx = ctx::root();
             scope::run!(&root_ctx, |ctx, s| async move {
-                s.spawn_bg(consensus::era::run_main_node(ctx, cfg, pool));
+                s.spawn_bg(consensus::era::run_main_node(ctx, cfg, secrets, pool));
                 let _ = stop_receiver.wait_for(|stop| *stop).await?;
                 Ok(())
             })
diff --git a/core/lib/zksync_core/src/sync_layer/sync_state.rs b/core/lib/zksync_core/src/sync_layer/sync_state.rs
index 91f66a5d3e8f..ecc999f7ecc5 100644
--- a/core/lib/zksync_core/src/sync_layer/sync_state.rs
+++ b/core/lib/zksync_core/src/sync_layer/sync_state.rs
@@ -43,7 +43,6 @@ impl SyncState {
         self.0.borrow().local_block.unwrap_or_default()
     }

-    #[cfg(test)]
     pub(crate) async fn wait_for_local_block(&self, want: L2BlockNumber) {
         self.0
             .subscribe()
diff --git a/core/node/node_framework/examples/main_node.rs b/core/node/node_framework/examples/main_node.rs
index fa1898c0b6a0..a91ff415d5a5 100644
--- a/core/node/node_framework/examples/main_node.rs
+++ b/core/node/node_framework/examples/main_node.rs
@@ -384,7 +384,6 @@ impl MainNodeBuilder {
             ))
         }

-        let genesis = GenesisConfig::from_env()?;
         let config = read_consensus_config().context("read_consensus_config()")?;
         let secrets = read_consensus_secrets().context("read_consensus_secrets()")?;

@@ -392,7 +391,6 @@ impl MainNodeBuilder {
             mode: ConsensusMode::Main,
             config,
             secrets,
-            chain_id: genesis.l2_chain_id,
         });

         Ok(self)
diff --git a/core/node/node_framework/src/implementations/layers/consensus.rs b/core/node/node_framework/src/implementations/layers/consensus.rs
index e9f199d878cc..3af88b0505f3 100644
--- a/core/node/node_framework/src/implementations/layers/consensus.rs
+++ b/core/node/node_framework/src/implementations/layers/consensus.rs
@@ -2,11 +2,10 @@ use anyhow::Context as _;
 use zksync_concurrency::{ctx, scope};
 use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
 use zksync_core::{
-    consensus::{self, MainNodeConfig},
+    consensus,
     sync_layer::{ActionQueueSender, SyncState},
 };
 use zksync_dal::{ConnectionPool, Core};
-use zksync_types::L2ChainId;
 use zksync_web3_decl::client::{DynClient, L2};

 use crate::{
@@ -32,7 +31,6 @@ pub struct ConsensusLayer {
     pub mode: Mode,
     pub config: Option<ConsensusConfig>,
     pub secrets: Option<ConsensusSecrets>,
-    pub chain_id: L2ChainId,
 }

 #[async_trait::async_trait]
@@ -56,12 +54,9 @@ impl WiringLayer for ConsensusLayer {
                 let secrets = self.secrets.ok_or_else(|| {
                     WiringError::Configuration("Missing private consensus config".to_string())
                 })?;
-
-                let main_node_config =
-                    consensus::config::main_node(&config, &secrets, self.chain_id)?;
-
                 let task = MainNodeConsensusTask {
-                    config: main_node_config,
+                    config,
+                    secrets,
                     pool,
                 };
                 context.add_task(Box::new(task));
@@ -110,7 +105,8 @@ impl WiringLayer for ConsensusLayer {
 #[derive(Debug)]
 pub struct MainNodeConsensusTask {
-    config: MainNodeConfig,
+    config: ConsensusConfig,
+    secrets: ConsensusSecrets,
     pool: ConnectionPool<Core>,
 }

@@ -129,7 +125,12 @@ impl Task for MainNodeConsensusTask {
         // but we only need to wait for stop signal once, and it will be propagated to all child contexts.
         let root_ctx = ctx::root();
         scope::run!(&root_ctx, |ctx, s| async move {
-            s.spawn_bg(consensus::era::run_main_node(ctx, self.config, self.pool));
+            s.spawn_bg(consensus::era::run_main_node(
+                ctx,
+                self.config,
+                self.secrets,
+                self.pool,
+            ));
             let _ = stop_receiver.0.wait_for(|stop| *stop).await?;
             Ok(())
         })
@@ -161,7 +162,7 @@ impl Task for FetcherTask {
         // but we only need to wait for stop signal once, and it will be propagated to all child contexts.
         let root_ctx = ctx::root();
         scope::run!(&root_ctx, |ctx, s| async {
-            s.spawn_bg(zksync_core::consensus::era::run_fetcher(
+            s.spawn_bg(zksync_core::consensus::era::run_en(
                 &root_ctx,
                 self.config,
                 self.pool,
diff --git a/etc/env/consensus_config.yaml b/etc/env/consensus_config.yaml
index 2b1b543b34f5..4a1f24c58e71 100644
--- a/etc/env/consensus_config.yaml
+++ b/etc/env/consensus_config.yaml
@@ -2,3 +2,11 @@ server_addr: '127.0.0.1:3054'
 public_addr: '127.0.0.1:3054'
 max_payload_size: 2500000
 gossip_dynamic_inbound_limit: 1
+# LOCALHOST TEST CONFIGURATION ONLY, don't copy to other environments.
+genesis_spec:
+  chain_id: 1337
+  protocol_version: 1
+  validators:
+    - key: 'validator:public:bls12_381:b14e3126668ae79e689a2d65c56522889a3812ef5433097c33bd7af601b073dcdddf46e188883aa381725c49e08f90c705df1f78bf918e1978912cebeadff0d0084b1a4fe2ddee243e826348045f528803207f5de303c6a95bc1a701a190dbcf'
+      weight: 1
+  leader: 'validator:public:bls12_381:b14e3126668ae79e689a2d65c56522889a3812ef5433097c33bd7af601b073dcdddf46e188883aa381725c49e08f90c705df1f78bf918e1978912cebeadff0d0084b1a4fe2ddee243e826348045f528803207f5de303c6a95bc1a701a190dbcf'
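For reference, the YAML added above is the on-disk form of the new `GenesisSpec` section of `ConsensusConfig`. A sketch of the equivalent programmatic value (field and type names as introduced by this patch; the validator key is elided here, and `L2ChainId::from` is assumed to accept a `u32`):

    use zksync_basic_types::L2ChainId;
    use zksync_config::configs::consensus::{
        GenesisSpec, ProtocolVersion, ValidatorPublicKey, WeightedValidator,
    };

    // Single-validator localhost spec mirroring the YAML above; `key` must
    // be the full TextFmt-encoded public key shown there.
    fn localhost_genesis_spec() -> GenesisSpec {
        let key = ValidatorPublicKey("validator:public:bls12_381:...".into());
        GenesisSpec {
            chain_id: L2ChainId::from(1337),
            protocol_version: ProtocolVersion(1),
            validators: vec![WeightedValidator {
                key: key.clone(),
                weight: 1,
            }],
            leader: key,
        }
    }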
diff --git a/prover/Cargo.lock b/prover/Cargo.lock
index 0d95356ae12f..8ce0a4e32788 100644
--- a/prover/Cargo.lock
+++ b/prover/Cargo.lock
@@ -5315,6 +5315,15 @@ dependencies = [
  "cc",
 ]

+[[package]]
+name = "secrecy"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e"
+dependencies = [
+ "zeroize",
+]
+
 [[package]]
 name = "security-framework"
 version = "2.9.2"
@@ -7789,6 +7798,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "rand 0.8.5",
+ "secrecy",
  "serde",
  "url",
  "zksync_basic_types",