Skip to content

Commit

Permalink
feat: da_dispatcher refactoring (#3409)
Browse files Browse the repository at this point in the history
## What ❔

- Don't assume NoDA Validium by default, make the config required even
for no DA, that helps avoid misconfiguration issues
- Separate threads in da_dispatcer for `dispatch` and
`poll_for_inclusion`
- Change the default amount of rows that are fetched for dispatching

## Why ❔

To make configuration more resilient and main logic more efficient.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.
  • Loading branch information
dimazhornyk authored Jan 8, 2025
1 parent 1000f61 commit 591cd86
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 47 deletions.
39 changes: 25 additions & 14 deletions core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ impl MainNodeBuilder {
self.node.runtime_handle()
}

pub fn get_pubdata_type(&self) -> PubdataType {
pub fn get_pubdata_type(&self) -> anyhow::Result<PubdataType> {
if self.genesis_config.l1_batch_commit_data_generator_mode == L1BatchCommitmentMode::Rollup
{
return PubdataType::Rollup;
return Ok(PubdataType::Rollup);
}

let Some(da_client_config) = self.configs.da_client_config.clone() else {
return PubdataType::NoDA;
};

match da_client_config {
DAClientConfig::Avail(_) => PubdataType::Avail,
DAClientConfig::Celestia(_) => PubdataType::Celestia,
DAClientConfig::Eigen(_) => PubdataType::Eigen,
DAClientConfig::ObjectStore(_) => PubdataType::ObjectStore,
match self.configs.da_client_config.clone() {
None => Err(anyhow::anyhow!("No config for DA client")),
Some(da_client_config) => Ok(match da_client_config {
DAClientConfig::Avail(_) => PubdataType::Avail,
DAClientConfig::Celestia(_) => PubdataType::Celestia,
DAClientConfig::Eigen(_) => PubdataType::Eigen,
DAClientConfig::ObjectStore(_) => PubdataType::ObjectStore,
DAClientConfig::NoDA => PubdataType::NoDA,
}),
}
}

Expand Down Expand Up @@ -273,7 +273,7 @@ impl MainNodeBuilder {
try_load_config!(self.configs.mempool_config),
try_load_config!(wallets.state_keeper),
self.contracts_config.l2_da_validator_addr,
self.get_pubdata_type(),
self.get_pubdata_type()?,
);
let db_config = try_load_config!(self.configs.db_config);
let experimental_vm_config = self
Expand Down Expand Up @@ -551,11 +551,22 @@ impl MainNodeBuilder {
}

fn add_da_client_layer(mut self) -> anyhow::Result<Self> {
let eth_sender_config = try_load_config!(self.configs.eth);
if let Some(sender_config) = eth_sender_config.sender {
if sender_config.pubdata_sending_mode != PubdataSendingMode::Custom {
tracing::warn!("DA dispatcher is enabled, but the pubdata sending mode is not `Custom`. DA client will not be started.");
return Ok(self);
}
}

let Some(da_client_config) = self.configs.da_client_config.clone() else {
tracing::warn!("No config for DA client, using the NoDA client");
bail!("No config for DA client");
};

if let DAClientConfig::NoDA = da_client_config {
self.node.add_layer(NoDAClientWiringLayer);
return Ok(self);
};
}

let secrets = try_load_config!(self.secrets.data_availability);
match (da_client_config, secrets) {
Expand Down
21 changes: 21 additions & 0 deletions core/lib/config/src/configs/da_client/avail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use zksync_basic_types::secrets::{APIKey, SeedPhrase};
pub const AVAIL_GAS_RELAY_CLIENT_NAME: &str = "GasRelay";
pub const AVAIL_FULL_CLIENT_NAME: &str = "FullClient";

pub const IN_BLOCK_FINALITY_STATE: &str = "inBlock";
pub const FINALIZED_FINALITY_STATE: &str = "finalized";

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "avail_client")]
pub enum AvailClientConfig {
Expand All @@ -23,6 +26,7 @@ pub struct AvailConfig {
pub struct AvailDefaultConfig {
pub api_node_url: String,
pub app_id: u32,
pub finality_state: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand All @@ -36,3 +40,20 @@ pub struct AvailSecrets {
pub seed_phrase: Option<SeedPhrase>,
pub gas_relay_api_key: Option<APIKey>,
}

impl AvailDefaultConfig {
pub fn finality_state(&self) -> anyhow::Result<String> {
match self.finality_state.clone() {
Some(finality_state) => match finality_state.as_str() {
IN_BLOCK_FINALITY_STATE | FINALIZED_FINALITY_STATE => Ok(finality_state),
_ => Err(anyhow::anyhow!(
"Invalid finality state: {}. Supported values are: {}, {}",
finality_state,
IN_BLOCK_FINALITY_STATE,
FINALIZED_FINALITY_STATE
)),
},
None => Ok(IN_BLOCK_FINALITY_STATE.to_string()),
}
}
}
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/da_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ pub const AVAIL_CLIENT_CONFIG_NAME: &str = "Avail";
pub const CELESTIA_CLIENT_CONFIG_NAME: &str = "Celestia";
pub const EIGEN_CLIENT_CONFIG_NAME: &str = "Eigen";
pub const OBJECT_STORE_CLIENT_CONFIG_NAME: &str = "ObjectStore";
pub const NO_DA_CLIENT_CONFIG_NAME: &str = "NoDA";

#[derive(Debug, Clone, PartialEq)]
pub enum DAClientConfig {
Avail(AvailConfig),
Celestia(CelestiaConfig),
Eigen(EigenConfig),
ObjectStore(ObjectStoreConfig),
NoDA,
}

impl From<AvailConfig> for DAClientConfig {
Expand Down
7 changes: 6 additions & 1 deletion core/lib/config/src/configs/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ use std::time::Duration;

use serde::Deserialize;

/// The default interval between the `da_dispatcher's` iterations.
pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000;
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100;
/// The maximum number of rows to fetch from the database in a single query. The value has to be
/// not too high to avoid the dispatcher iteration taking too much time.
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 3;
/// The maximum number of retries for the dispatch of a blob.
pub const DEFAULT_MAX_RETRIES: u16 = 5;
/// Use dummy value as inclusion proof instead of getting it from the client.
pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false;

#[derive(Debug, Clone, PartialEq, Deserialize)]
Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,7 @@ impl Distribution<configs::da_client::DAClientConfig> for EncodeDist {
config: AvailClientConfig::FullClient(AvailDefaultConfig {
api_node_url: self.sample(rng),
app_id: self.sample(rng),
finality_state: None,
}),
})
}
Expand Down
1 change: 1 addition & 0 deletions core/lib/env_config/src/da_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ mod tests {
config: AvailClientConfig::FullClient(AvailDefaultConfig {
api_node_url: api_node_url.to_string(),
app_id,
finality_state: None,
}),
})
}
Expand Down
9 changes: 6 additions & 3 deletions core/lib/protobuf_config/src/da_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use zksync_config::configs::{
avail::{AvailClientConfig, AvailConfig, AvailDefaultConfig, AvailGasRelayConfig},
celestia::CelestiaConfig,
eigen::EigenConfig,
DAClientConfig::{Avail, Celestia, Eigen, ObjectStore},
DAClientConfig::{Avail, Celestia, Eigen, NoDA, ObjectStore},
},
};
use zksync_protobuf::{required, ProtoRepr};
Expand All @@ -16,8 +16,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
type Type = configs::DAClientConfig;

fn read(&self) -> anyhow::Result<Self::Type> {
let config = required(&self.config).context("config")?;

let config = required(&self.config).context("da_client config")?;
let client = match config {
proto::data_availability_client::Config::Avail(conf) => Avail(AvailConfig {
bridge_api_url: required(&conf.bridge_api_url)
Expand All @@ -31,6 +30,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
.context("api_node_url")?
.clone(),
app_id: *required(&full_client_conf.app_id).context("app_id")?,
finality_state: full_client_conf.finality_state.clone(),
})
}
Some(proto::avail_config::Config::GasRelay(gas_relay_conf)) => {
Expand Down Expand Up @@ -62,6 +62,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
proto::data_availability_client::Config::ObjectStore(conf) => {
ObjectStore(object_store_proto::ObjectStore::read(conf)?)
}
proto::data_availability_client::Config::NoDa(_) => NoDA,
};

Ok(client)
Expand All @@ -77,6 +78,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
proto::avail_config::Config::FullClient(proto::AvailClientConfig {
api_node_url: Some(conf.api_node_url.clone()),
app_id: Some(conf.app_id),
finality_state: conf.finality_state.clone(),
}),
),
AvailClientConfig::GasRelay(conf) => Some(
Expand All @@ -102,6 +104,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
ObjectStore(config) => proto::data_availability_client::Config::ObjectStore(
object_store_proto::ObjectStore::build(config),
),
NoDA => proto::data_availability_client::Config::NoDa(proto::NoDaConfig {}),
};

Self {
Expand Down
4 changes: 4 additions & 0 deletions core/lib/protobuf_config/src/proto/config/da_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message AvailConfig {
message AvailClientConfig {
optional string api_node_url = 1;
optional uint32 app_id = 2;
optional string finality_state = 3;
}

message AvailGasRelayConfig {
Expand All @@ -41,12 +42,15 @@ message EigenConfig {
optional uint64 inclusion_polling_interval_ms = 2;
}

message NoDAConfig {}

message DataAvailabilityClient {
// oneof in protobuf allows for None
oneof config {
AvailConfig avail = 1;
object_store.ObjectStore object_store = 2;
CelestiaConfig celestia = 3;
EigenConfig eigen = 4;
NoDAConfig no_da = 5;
}
}
8 changes: 6 additions & 2 deletions core/node/da_clients/src/avail/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,12 @@ impl AvailClient {
.seed_phrase
.ok_or_else(|| anyhow::anyhow!("Seed phrase is missing"))?;
// these unwraps are safe because we validate in protobuf config
let sdk_client =
RawAvailClient::new(conf.app_id, seed_phrase.0.expose_secret()).await?;
let sdk_client = RawAvailClient::new(
conf.app_id,
seed_phrase.0.expose_secret(),
conf.finality_state()?,
)
.await?;

Ok(Self {
config,
Expand Down
15 changes: 12 additions & 3 deletions core/node/da_clients/src/avail/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const PROTOCOL_VERSION: u8 = 4;
pub(crate) struct RawAvailClient {
app_id: u32,
keypair: Keypair,
finality_state: String,
}

/// Utility type needed for encoding the call data
Expand All @@ -44,11 +45,19 @@ struct BoundedVec<_0>(pub Vec<_0>);
impl RawAvailClient {
pub(crate) const MAX_BLOB_SIZE: usize = 512 * 1024; // 512kb

pub(crate) async fn new(app_id: u32, seed: &str) -> anyhow::Result<Self> {
pub(crate) async fn new(
app_id: u32,
seed: &str,
finality_state: String,
) -> anyhow::Result<Self> {
let mnemonic = Mnemonic::parse(seed)?;
let keypair = Keypair::from_phrase(&mnemonic, None)?;

Ok(Self { app_id, keypair })
Ok(Self {
app_id,
keypair,
finality_state,
})
}

/// Returns a hex-encoded extrinsic
Expand Down Expand Up @@ -291,7 +300,7 @@ impl RawAvailClient {
let status = sub.next().await.transpose()?;

if status.is_some() && status.as_ref().unwrap().is_object() {
if let Some(block_hash) = status.unwrap().get("finalized") {
if let Some(block_hash) = status.unwrap().get(self.finality_state.as_str()) {
break block_hash
.as_str()
.ok_or_else(|| anyhow::anyhow!("Invalid block hash"))?
Expand Down
70 changes: 46 additions & 24 deletions core/node/da_dispatcher/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, time::Duration};
use std::{future::Future, sync::Arc, time::Duration};

use anyhow::Context;
use chrono::Utc;
Expand All @@ -14,7 +14,7 @@ use zksync_types::L1BatchNumber;

use crate::metrics::METRICS;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DataAvailabilityDispatcher {
client: Box<dyn DataAvailabilityClient>,
pool: ConnectionPool<Core>,
Expand All @@ -35,37 +35,59 @@ impl DataAvailabilityDispatcher {
}

pub async fn run(self, mut stop_receiver: Receiver<bool>) -> anyhow::Result<()> {
loop {
if *stop_receiver.borrow() {
break;
}
let self_arc = Arc::new(self.clone());

let subtasks = futures::future::join(
async {
if let Err(err) = self.dispatch().await {
tracing::error!("dispatch error {err:?}");
}
},
async {
if let Err(err) = self.poll_for_inclusion().await {
tracing::error!("poll_for_inclusion error {err:?}");
}
},
);
let mut stop_receiver_dispatch = stop_receiver.clone();
let mut stop_receiver_poll_for_inclusion = stop_receiver.clone();

let dispatch_task = tokio::spawn(async move {
loop {
if *stop_receiver_dispatch.borrow() {
break;
}

tokio::select! {
_ = subtasks => {},
_ = stop_receiver.changed() => {
if let Err(err) = self_arc.dispatch().await {
tracing::error!("dispatch error {err:?}");
}

if tokio::time::timeout(
self_arc.config.polling_interval(),
stop_receiver_dispatch.changed(),
)
.await
.is_ok()
{
break;
}
}
});

if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed())
let inclusion_task = tokio::spawn(async move {
loop {
if *stop_receiver_poll_for_inclusion.borrow() {
break;
}

if let Err(err) = self.poll_for_inclusion().await {
tracing::error!("poll_for_inclusion error {err:?}");
}

if tokio::time::timeout(
self.config.polling_interval(),
stop_receiver_poll_for_inclusion.changed(),
)
.await
.is_ok()
{
break;
{
break;
}
}
});

tokio::select! {
_ = dispatch_task => {},
_ = inclusion_task => {},
_ = stop_receiver.changed() => {},
}

tracing::info!("Stop signal received, da_dispatcher is shutting down");
Expand Down
2 changes: 2 additions & 0 deletions etc/env/file_based/overrides/validium.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ eth:
state_keeper:
pubdata_overhead_part: 0
compute_overhead_part: 1
da_client:
no_da: {}

0 comments on commit 591cd86

Please sign in to comment.