Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pbs mux #172

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ target_first_request_ms = 200
# OPTIONAL
frequency_get_header_ms = 300

# Configuration for the PBS multiplexers, which enable different configs to be used for get header requests, depending on validator pubkey
# Note that:
# - multiple sets of keys can be defined by adding multiple [[mux]] sections. The validator pubkey sets need to be disjoint
# - the mux is only used for get header requests
# - if any value is missing from the mux config, the default value from the main config will be used
[[mux]]
# Which validator pubkeys to match against this mux config
validator_pubkeys = [
"0x80c7f782b2467c5898c5516a8b6595d75623960b4afc4f71ee07d40985d20e117ba35e7cd352a3e75fb85a8668a3b745",
"0xa119589bb33ef52acbb8116832bec2b58fca590fe5c85eac5d3230b44d5bc09fe73ccd21f88eab31d6de16194d17782e",
]
timeout_get_header_ms = 900
late_in_slot_time_ms = 1500
# For each mux, one or more [[pbs_mux.relays]] can be defined, which will be used for the matching validator pubkeys
# Only the relays defined here will be used, and the rest of the relays defined in the main config will be ignored
# Any field defined here will override the default value from the relay config with the same id in [[relays]]
[[mux.relays]]
id = "example-relay"
headers = { X-MyCustomHeader = "ADifferentCustomValue" }

# Configuration for the Signer Module, only required if any `commit` module is present, or if `pbs.with_signer = true`
# OPTIONAL
[signer]
Expand Down
29 changes: 29 additions & 0 deletions configs/pbs-mux.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# PBS config with a mux for a single validator

chain = "Holesky"

[pbs]
port = 18550
timeout_get_header_ms = 950
late_in_slot_time_ms = 2000

[[relays]]
id = "relay-1"
url = "http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz"

[[relays]]
id = "relay-2"
url = "http://0xa119589bb33ef52acbb8116832bec2b58fca590fe5c85eac5d3230b44d5bc09fe73ccd21f88eab31d6de16194d17782e@def.xyz"
enable_timing_games = true
target_first_request_ms = 200

[[mux]]
validator_pubkeys = [
"0x80c7f782b2467c5898c5516a8b6595d75623960b4afc4f71ee07d40985d20e117ba35e7cd352a3e75fb85a8668a3b745",
]
timeout_get_header_ms = 900
late_in_slot_time_ms = 1500

[[mux.relays]]
id = "relay-2"
enable_timing_games = false
7 changes: 7 additions & 0 deletions crates/common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod constants;
mod log;
mod metrics;
mod module;
mod mux;
mod pbs;
mod signer;
mod utils;
Expand All @@ -17,6 +18,7 @@ pub use constants::*;
pub use log::*;
pub use metrics::*;
pub use module::*;
pub use mux::*;
pub use pbs::*;
pub use signer::*;
pub use utils::*;
Expand All @@ -26,6 +28,8 @@ pub struct CommitBoostConfig {
pub chain: Chain,
pub relays: Vec<RelayConfig>,
pub pbs: StaticPbsConfig,
#[serde(flatten)]
pub muxes: Option<PbsMuxes>,
pub modules: Option<Vec<StaticModuleConfig>>,
pub signer: Option<SignerConfig>,
pub metrics: Option<MetricsConfig>,
Expand Down Expand Up @@ -57,6 +61,7 @@ impl CommitBoostConfig {
chain,
relays: rest_config.relays,
pbs: rest_config.pbs,
muxes: rest_config.muxes,
modules: rest_config.modules,
signer: rest_config.signer,
metrics: rest_config.metrics,
Expand Down Expand Up @@ -96,6 +101,8 @@ struct ChainConfig {
struct HelperConfig {
relays: Vec<RelayConfig>,
pbs: StaticPbsConfig,
#[serde(flatten)]
muxes: Option<PbsMuxes>,
modules: Option<Vec<StaticModuleConfig>>,
signer: Option<SignerConfig>,
metrics: Option<MetricsConfig>,
Expand Down
138 changes: 138 additions & 0 deletions crates/common/src/config/mux.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use alloy::rpc::types::beacon::BlsPublicKey;
use eyre::{bail, ensure, eyre};
use serde::{Deserialize, Serialize};

use super::{PbsConfig, RelayConfig};
use crate::pbs::{RelayClient, RelayEntry};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PbsMuxes {
/// List of PBS multiplexers
#[serde(rename = "mux")]
pub muxes: Vec<MuxConfig>,
}

#[derive(Debug, Clone)]
pub struct RuntimeMuxConfig {
pub config: Arc<PbsConfig>,
pub relays: Vec<RelayClient>,
}

impl PbsMuxes {
pub fn validate_and_fill(
self,
default_pbs: &PbsConfig,
default_relays: &[RelayConfig],
) -> eyre::Result<HashMap<BlsPublicKey, RuntimeMuxConfig>> {
// check that validator pubkeys are in disjoint sets
let mut unique_pubkeys = HashSet::new();
for mux in self.muxes.iter() {
for pubkey in mux.validator_pubkeys.iter() {
if !unique_pubkeys.insert(pubkey) {
bail!("duplicate validator pubkey in muxes: {pubkey}");
}
}
}

let mut configs = HashMap::new();
// fill the configs using the default pbs config and relay entries
for mux in self.muxes {
ensure!(!mux.relays.is_empty(), "mux config must have at least one relay");
ensure!(
!mux.validator_pubkeys.is_empty(),
"mux config must have at least one validator pubkey"
);

let mut relay_clients = Vec::with_capacity(mux.relays.len());
for partial_relay in mux.relays.into_iter() {
// create a new config overriding only the missing fields
let partial_id = partial_relay.id()?;
// assume that there is always a relay defined in the default config. If this
// becomes too much of a burden, we can change this to allow defining relays
// that are exclusively used by a mux
let default_relay = default_relays
.iter()
.find(|r| r.id() == partial_id)
.ok_or_else(|| eyre!("default relay config not found for: {}", partial_id))?;

let full_config = RelayConfig {
id: Some(partial_id.to_string()),
entry: partial_relay.entry.unwrap_or(default_relay.entry.clone()),
headers: partial_relay.headers.or(default_relay.headers.clone()),
enable_timing_games: partial_relay
.enable_timing_games
.unwrap_or(default_relay.enable_timing_games),
target_first_request_ms: partial_relay
.target_first_request_ms
.or(default_relay.target_first_request_ms),
frequency_get_header_ms: partial_relay
.frequency_get_header_ms
.or(default_relay.frequency_get_header_ms),
};

relay_clients.push(RelayClient::new(full_config)?);
}

let config = PbsConfig {
timeout_get_header_ms: mux
.timeout_get_header_ms
.unwrap_or(default_pbs.timeout_get_header_ms),
late_in_slot_time_ms: mux
.late_in_slot_time_ms
.unwrap_or(default_pbs.late_in_slot_time_ms),
..default_pbs.clone()
};
let config = Arc::new(config);

let runtime_config = RuntimeMuxConfig { config, relays: relay_clients };
for pubkey in mux.validator_pubkeys.iter() {
configs.insert(*pubkey, runtime_config.clone());
}
}

Ok(configs)
}
}

/// Configuration for the PBS Multiplexer
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MuxConfig {
/// Relays to use for this mux config
pub relays: Vec<PartialRelayConfig>,
/// Which validator pubkeys to match against this mux config
pub validator_pubkeys: Vec<BlsPublicKey>,
pub timeout_get_header_ms: Option<u64>,
pub late_in_slot_time_ms: Option<u64>,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
/// A relay config with all optional fields. See [`RelayConfig`] for the
/// description of the fields.
pub struct PartialRelayConfig {
pub id: Option<String>,
#[serde(rename = "url")]
pub entry: Option<RelayEntry>,
pub headers: Option<HashMap<String, String>>,
pub enable_timing_games: Option<bool>,
pub target_first_request_ms: Option<u64>,
pub frequency_get_header_ms: Option<u64>,
}

impl PartialRelayConfig {
pub fn id(&self) -> eyre::Result<&str> {
match &self.id {
Some(id) => Ok(id.as_str()),
None => {
let entry = self.entry.as_ref().ok_or_else(|| {
eyre!("relays in [[mux]] need to specifify either an `id` or a `url`")
})?;
Ok(entry.id.as_str())
}
}
}
}
36 changes: 33 additions & 3 deletions crates/common/src/config/pbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ use std::{
sync::Arc,
};

use alloy::primitives::{utils::format_ether, U256};
use alloy::{
primitives::{utils::format_ether, U256},
rpc::types::beacon::BlsPublicKey,
};
use eyre::{ensure, Result};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use url::Url;

use super::{
constants::PBS_IMAGE_DEFAULT, load_optional_env_var, CommitBoostConfig, PBS_ENDPOINT_ENV,
constants::PBS_IMAGE_DEFAULT, load_optional_env_var, CommitBoostConfig, RuntimeMuxConfig,
PBS_ENDPOINT_ENV,
};
use crate::{
commit::client::SignerClient,
config::{load_env_var, load_file_from_env, CONFIG_ENV, MODULE_JWT_ENV, SIGNER_URL_ENV},
config::{
load_env_var, load_file_from_env, PbsMuxes, CONFIG_ENV, MODULE_JWT_ENV, SIGNER_URL_ENV,
},
pbs::{
BuilderEventPublisher, DefaultTimeout, RelayClient, RelayEntry, DEFAULT_PBS_PORT,
LATE_IN_SLOT_TIME_MS,
Expand Down Expand Up @@ -45,6 +51,12 @@ pub struct RelayConfig {
pub frequency_get_header_ms: Option<u64>,
}

impl RelayConfig {
pub fn id(&self) -> &str {
self.id.as_deref().unwrap_or(self.entry.id.as_str())
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PbsConfig {
/// Host to receive BuilderAPI calls from beacon node
Expand Down Expand Up @@ -149,6 +161,8 @@ pub struct PbsModuleConfig {
pub signer_client: Option<SignerClient>,
/// Event publisher
pub event_publisher: Option<BuilderEventPublisher>,
/// Muxes config
pub muxes: Option<HashMap<BlsPublicKey, RuntimeMuxConfig>>,
}

fn default_pbs() -> String {
Expand All @@ -158,6 +172,7 @@ fn default_pbs() -> String {
/// Loads the default pbs config, i.e. with no signer client or custom data
pub fn load_pbs_config() -> Result<PbsModuleConfig> {
let config = CommitBoostConfig::from_env_path()?;
config.validate()?;

// use endpoint from env if set, otherwise use default host and port
let endpoint = if let Some(endpoint) = load_optional_env_var(PBS_ENDPOINT_ENV) {
Expand All @@ -166,6 +181,11 @@ pub fn load_pbs_config() -> Result<PbsModuleConfig> {
SocketAddr::from((config.pbs.pbs_config.host, config.pbs.pbs_config.port))
};

let muxes = config
.muxes
.map(|muxes| muxes.validate_and_fill(&config.pbs.pbs_config, &config.relays))
.transpose()?;

let relay_clients =
config.relays.into_iter().map(RelayClient::new).collect::<Result<Vec<_>>>()?;
let maybe_publiher = BuilderEventPublisher::new_from_env()?;
Expand All @@ -177,6 +197,7 @@ pub fn load_pbs_config() -> Result<PbsModuleConfig> {
relays: relay_clients,
signer_client: None,
event_publisher: maybe_publiher,
muxes,
})
}

Expand All @@ -195,6 +216,7 @@ pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleConfig,
chain: Chain,
relays: Vec<RelayConfig>,
pbs: CustomPbsConfig<U>,
muxes: Option<PbsMuxes>,
}

// load module config including the extra data (if any)
Expand All @@ -211,6 +233,13 @@ pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleConfig,
))
};

let muxes = match cb_config.muxes {
Some(muxes) => Some(
muxes.validate_and_fill(&cb_config.pbs.static_config.pbs_config, &cb_config.relays)?,
),
None => None,
};

let relay_clients =
cb_config.relays.into_iter().map(RelayClient::new).collect::<Result<Vec<_>>>()?;
let maybe_publiher = BuilderEventPublisher::new_from_env()?;
Expand All @@ -232,6 +261,7 @@ pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleConfig,
relays: relay_clients,
signer_client,
event_publisher: maybe_publiher,
muxes,
},
cb_config.pbs.extra,
))
Expand Down
11 changes: 4 additions & 7 deletions crates/common/src/pbs/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{config::RelayConfig, DEFAULT_REQUEST_TIMEOUT};
/// A parsed entry of the relay url in the format: scheme://pubkey@host
#[derive(Debug, Clone)]
pub struct RelayEntry {
/// Default if of the relay, the hostname of the url
/// Default ID of the relay, the hostname of the url
pub id: String,
/// Public key of the relay
pub pubkey: BlsPublicKey,
Expand All @@ -42,8 +42,9 @@ impl<'de> Deserialize<'de> for RelayEntry {
D: serde::Deserializer<'de>,
{
let url = Url::deserialize(deserializer)?;
let pubkey = BlsPublicKey::from_hex(url.username()).map_err(serde::de::Error::custom)?;
let id = url.host().ok_or(serde::de::Error::custom("missing host"))?.to_string();
let pubkey = BlsPublicKey::from_hex(url.username())
.map_err(|_| serde::de::Error::custom("invalid BLS pubkey"))?;

Ok(RelayEntry { pubkey, url, id })
}
Expand Down Expand Up @@ -79,11 +80,7 @@ impl RelayClient {
.timeout(DEFAULT_REQUEST_TIMEOUT)
.build()?;

Ok(Self {
id: Arc::new(config.id.clone().unwrap_or(config.entry.id.clone())),
client,
config: Arc::new(config),
})
Ok(Self { id: Arc::new(config.id().to_owned()), client, config: Arc::new(config) })
}

pub fn pubkey(&self) -> BlsPublicKey {
Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/pbs/types/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use super::{

#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub struct GetHeaderParams {
/// The slot to request the header for
pub slot: u64,
/// The parent hash of the block to request the header for
pub parent_hash: B256,
/// The pubkey of the validator that is requesting the header
pub pubkey: BlsPublicKey,
}

Expand Down
Loading