Skip to content

Commit

Permalink
feat: add pbs mux (#172)
Browse files Browse the repository at this point in the history
* add pbs mux

* comments
  • Loading branch information
ltitanb authored Nov 25, 2024
1 parent 75e8129 commit ca153cc
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 19 deletions.
20 changes: 20 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ target_first_request_ms = 200
# OPTIONAL
frequency_get_header_ms = 300

# Configuration for the PBS multiplexers, which enable different configs to be used for get header requests, depending on validator pubkey
# Note that:
# - multiple sets of keys can be defined by adding multiple [[mux]] sections. The validator pubkey sets need to be disjoint
# - the mux is only used for get header requests
# - if any value is missing from the mux config, the default value from the main config will be used
[[mux]]
# Which validator pubkeys to match against this mux config
validator_pubkeys = [
"0x80c7f782b2467c5898c5516a8b6595d75623960b4afc4f71ee07d40985d20e117ba35e7cd352a3e75fb85a8668a3b745",
"0xa119589bb33ef52acbb8116832bec2b58fca590fe5c85eac5d3230b44d5bc09fe73ccd21f88eab31d6de16194d17782e",
]
timeout_get_header_ms = 900
late_in_slot_time_ms = 1500
# For each mux, one or more [[pbs_mux.relays]] can be defined, which will be used for the matching validator pubkeys
# Only the relays defined here will be used, and the rest of the relays defined in the main config will be ignored
# Any field defined here will override the default value from the relay config with the same id in [[relays]]
[[mux.relays]]
id = "example-relay"
headers = { X-MyCustomHeader = "ADifferentCustomValue" }

# Configuration for the Signer Module, only required if any `commit` module is present, or if `pbs.with_signer = true`
# OPTIONAL
[signer]
Expand Down
29 changes: 29 additions & 0 deletions configs/pbs-mux.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# PBS config with a mux for a single validator

chain = "Holesky"

[pbs]
port = 18550
timeout_get_header_ms = 950
late_in_slot_time_ms = 2000

[[relays]]
id = "relay-1"
url = "http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz"

[[relays]]
id = "relay-2"
url = "http://0xa119589bb33ef52acbb8116832bec2b58fca590fe5c85eac5d3230b44d5bc09fe73ccd21f88eab31d6de16194d17782e@def.xyz"
enable_timing_games = true
target_first_request_ms = 200

[[mux]]
validator_pubkeys = [
"0x80c7f782b2467c5898c5516a8b6595d75623960b4afc4f71ee07d40985d20e117ba35e7cd352a3e75fb85a8668a3b745",
]
timeout_get_header_ms = 900
late_in_slot_time_ms = 1500

[[mux.relays]]
id = "relay-2"
enable_timing_games = false
7 changes: 7 additions & 0 deletions crates/common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod constants;
mod log;
mod metrics;
mod module;
mod mux;
mod pbs;
mod signer;
mod utils;
Expand All @@ -17,6 +18,7 @@ pub use constants::*;
pub use log::*;
pub use metrics::*;
pub use module::*;
pub use mux::*;
pub use pbs::*;
pub use signer::*;
pub use utils::*;
Expand All @@ -26,6 +28,8 @@ pub struct CommitBoostConfig {
pub chain: Chain,
pub relays: Vec<RelayConfig>,
pub pbs: StaticPbsConfig,
#[serde(flatten)]
pub muxes: Option<PbsMuxes>,
pub modules: Option<Vec<StaticModuleConfig>>,
pub signer: Option<SignerConfig>,
pub metrics: Option<MetricsConfig>,
Expand Down Expand Up @@ -57,6 +61,7 @@ impl CommitBoostConfig {
chain,
relays: rest_config.relays,
pbs: rest_config.pbs,
muxes: rest_config.muxes,
modules: rest_config.modules,
signer: rest_config.signer,
metrics: rest_config.metrics,
Expand Down Expand Up @@ -96,6 +101,8 @@ struct ChainConfig {
struct HelperConfig {
relays: Vec<RelayConfig>,
pbs: StaticPbsConfig,
#[serde(flatten)]
muxes: Option<PbsMuxes>,
modules: Option<Vec<StaticModuleConfig>>,
signer: Option<SignerConfig>,
metrics: Option<MetricsConfig>,
Expand Down
138 changes: 138 additions & 0 deletions crates/common/src/config/mux.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use alloy::rpc::types::beacon::BlsPublicKey;
use eyre::{bail, ensure, eyre};
use serde::{Deserialize, Serialize};

use super::{PbsConfig, RelayConfig};
use crate::pbs::{RelayClient, RelayEntry};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PbsMuxes {
/// List of PBS multiplexers
#[serde(rename = "mux")]
pub muxes: Vec<MuxConfig>,
}

#[derive(Debug, Clone)]
pub struct RuntimeMuxConfig {
pub config: Arc<PbsConfig>,
pub relays: Vec<RelayClient>,
}

impl PbsMuxes {
pub fn validate_and_fill(
self,
default_pbs: &PbsConfig,
default_relays: &[RelayConfig],
) -> eyre::Result<HashMap<BlsPublicKey, RuntimeMuxConfig>> {
// check that validator pubkeys are in disjoint sets
let mut unique_pubkeys = HashSet::new();
for mux in self.muxes.iter() {
for pubkey in mux.validator_pubkeys.iter() {
if !unique_pubkeys.insert(pubkey) {
bail!("duplicate validator pubkey in muxes: {pubkey}");
}
}
}

let mut configs = HashMap::new();
// fill the configs using the default pbs config and relay entries
for mux in self.muxes {
ensure!(!mux.relays.is_empty(), "mux config must have at least one relay");
ensure!(
!mux.validator_pubkeys.is_empty(),
"mux config must have at least one validator pubkey"
);

let mut relay_clients = Vec::with_capacity(mux.relays.len());
for partial_relay in mux.relays.into_iter() {
// create a new config overriding only the missing fields
let partial_id = partial_relay.id()?;
// assume that there is always a relay defined in the default config. If this
// becomes too much of a burden, we can change this to allow defining relays
// that are exclusively used by a mux
let default_relay = default_relays
.iter()
.find(|r| r.id() == partial_id)
.ok_or_else(|| eyre!("default relay config not found for: {}", partial_id))?;

let full_config = RelayConfig {
id: Some(partial_id.to_string()),
entry: partial_relay.entry.unwrap_or(default_relay.entry.clone()),
headers: partial_relay.headers.or(default_relay.headers.clone()),
enable_timing_games: partial_relay
.enable_timing_games
.unwrap_or(default_relay.enable_timing_games),
target_first_request_ms: partial_relay
.target_first_request_ms
.or(default_relay.target_first_request_ms),
frequency_get_header_ms: partial_relay
.frequency_get_header_ms
.or(default_relay.frequency_get_header_ms),
};

relay_clients.push(RelayClient::new(full_config)?);
}

let config = PbsConfig {
timeout_get_header_ms: mux
.timeout_get_header_ms
.unwrap_or(default_pbs.timeout_get_header_ms),
late_in_slot_time_ms: mux
.late_in_slot_time_ms
.unwrap_or(default_pbs.late_in_slot_time_ms),
..default_pbs.clone()
};
let config = Arc::new(config);

let runtime_config = RuntimeMuxConfig { config, relays: relay_clients };
for pubkey in mux.validator_pubkeys.iter() {
configs.insert(*pubkey, runtime_config.clone());
}
}

Ok(configs)
}
}

/// Configuration for the PBS Multiplexer
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MuxConfig {
/// Relays to use for this mux config
pub relays: Vec<PartialRelayConfig>,
/// Which validator pubkeys to match against this mux config
pub validator_pubkeys: Vec<BlsPublicKey>,
pub timeout_get_header_ms: Option<u64>,
pub late_in_slot_time_ms: Option<u64>,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
/// A relay config with all optional fields. See [`RelayConfig`] for the
/// description of the fields.
pub struct PartialRelayConfig {
pub id: Option<String>,
#[serde(rename = "url")]
pub entry: Option<RelayEntry>,
pub headers: Option<HashMap<String, String>>,
pub enable_timing_games: Option<bool>,
pub target_first_request_ms: Option<u64>,
pub frequency_get_header_ms: Option<u64>,
}

impl PartialRelayConfig {
pub fn id(&self) -> eyre::Result<&str> {
match &self.id {
Some(id) => Ok(id.as_str()),
None => {
let entry = self.entry.as_ref().ok_or_else(|| {
eyre!("relays in [[mux]] need to specifify either an `id` or a `url`")
})?;
Ok(entry.id.as_str())
}
}
}
}
36 changes: 33 additions & 3 deletions crates/common/src/config/pbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ use std::{
sync::Arc,
};

use alloy::primitives::{utils::format_ether, U256};
use alloy::{
primitives::{utils::format_ether, U256},
rpc::types::beacon::BlsPublicKey,
};
use eyre::{ensure, Result};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use url::Url;

use super::{
constants::PBS_IMAGE_DEFAULT, load_optional_env_var, CommitBoostConfig, PBS_ENDPOINT_ENV,
constants::PBS_IMAGE_DEFAULT, load_optional_env_var, CommitBoostConfig, RuntimeMuxConfig,
PBS_ENDPOINT_ENV,
};
use crate::{
commit::client::SignerClient,
config::{load_env_var, load_file_from_env, CONFIG_ENV, MODULE_JWT_ENV, SIGNER_URL_ENV},
config::{
load_env_var, load_file_from_env, PbsMuxes, CONFIG_ENV, MODULE_JWT_ENV, SIGNER_URL_ENV,
},
pbs::{
BuilderEventPublisher, DefaultTimeout, RelayClient, RelayEntry, DEFAULT_PBS_PORT,
LATE_IN_SLOT_TIME_MS,
Expand Down Expand Up @@ -45,6 +51,12 @@ pub struct RelayConfig {
pub frequency_get_header_ms: Option<u64>,
}

impl RelayConfig {
pub fn id(&self) -> &str {
self.id.as_deref().unwrap_or(self.entry.id.as_str())
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PbsConfig {
/// Host to receive BuilderAPI calls from beacon node
Expand Down Expand Up @@ -149,6 +161,8 @@ pub struct PbsModuleConfig {
pub signer_client: Option<SignerClient>,
/// Event publisher
pub event_publisher: Option<BuilderEventPublisher>,
/// Muxes config
pub muxes: Option<HashMap<BlsPublicKey, RuntimeMuxConfig>>,
}

fn default_pbs() -> String {
Expand All @@ -158,6 +172,7 @@ fn default_pbs() -> String {
/// Loads the default pbs config, i.e. with no signer client or custom data
pub fn load_pbs_config() -> Result<PbsModuleConfig> {
let config = CommitBoostConfig::from_env_path()?;
config.validate()?;

// use endpoint from env if set, otherwise use default host and port
let endpoint = if let Some(endpoint) = load_optional_env_var(PBS_ENDPOINT_ENV) {
Expand All @@ -166,6 +181,11 @@ pub fn load_pbs_config() -> Result<PbsModuleConfig> {
SocketAddr::from((config.pbs.pbs_config.host, config.pbs.pbs_config.port))
};

let muxes = config
.muxes
.map(|muxes| muxes.validate_and_fill(&config.pbs.pbs_config, &config.relays))
.transpose()?;

let relay_clients =
config.relays.into_iter().map(RelayClient::new).collect::<Result<Vec<_>>>()?;
let maybe_publiher = BuilderEventPublisher::new_from_env()?;
Expand All @@ -177,6 +197,7 @@ pub fn load_pbs_config() -> Result<PbsModuleConfig> {
relays: relay_clients,
signer_client: None,
event_publisher: maybe_publiher,
muxes,
})
}

Expand All @@ -195,6 +216,7 @@ pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleConfig,
chain: Chain,
relays: Vec<RelayConfig>,
pbs: CustomPbsConfig<U>,
muxes: Option<PbsMuxes>,
}

// load module config including the extra data (if any)
Expand All @@ -211,6 +233,13 @@ pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleConfig,
))
};

let muxes = match cb_config.muxes {
Some(muxes) => Some(
muxes.validate_and_fill(&cb_config.pbs.static_config.pbs_config, &cb_config.relays)?,
),
None => None,
};

let relay_clients =
cb_config.relays.into_iter().map(RelayClient::new).collect::<Result<Vec<_>>>()?;
let maybe_publiher = BuilderEventPublisher::new_from_env()?;
Expand All @@ -232,6 +261,7 @@ pub fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleConfig,
relays: relay_clients,
signer_client,
event_publisher: maybe_publiher,
muxes,
},
cb_config.pbs.extra,
))
Expand Down
11 changes: 4 additions & 7 deletions crates/common/src/pbs/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{config::RelayConfig, DEFAULT_REQUEST_TIMEOUT};
/// A parsed entry of the relay url in the format: scheme://pubkey@host
#[derive(Debug, Clone)]
pub struct RelayEntry {
/// Default if of the relay, the hostname of the url
/// Default ID of the relay, the hostname of the url
pub id: String,
/// Public key of the relay
pub pubkey: BlsPublicKey,
Expand All @@ -42,8 +42,9 @@ impl<'de> Deserialize<'de> for RelayEntry {
D: serde::Deserializer<'de>,
{
let url = Url::deserialize(deserializer)?;
let pubkey = BlsPublicKey::from_hex(url.username()).map_err(serde::de::Error::custom)?;
let id = url.host().ok_or(serde::de::Error::custom("missing host"))?.to_string();
let pubkey = BlsPublicKey::from_hex(url.username())
.map_err(|_| serde::de::Error::custom("invalid BLS pubkey"))?;

Ok(RelayEntry { pubkey, url, id })
}
Expand Down Expand Up @@ -79,11 +80,7 @@ impl RelayClient {
.timeout(DEFAULT_REQUEST_TIMEOUT)
.build()?;

Ok(Self {
id: Arc::new(config.id.clone().unwrap_or(config.entry.id.clone())),
client,
config: Arc::new(config),
})
Ok(Self { id: Arc::new(config.id().to_owned()), client, config: Arc::new(config) })
}

pub fn pubkey(&self) -> BlsPublicKey {
Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/pbs/types/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use super::{

#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub struct GetHeaderParams {
/// The slot to request the header for
pub slot: u64,
/// The parent hash of the block to request the header for
pub parent_hash: B256,
/// The pubkey of the validator that is requesting the header
pub pubkey: BlsPublicKey,
}

Expand Down
Loading

0 comments on commit ca153cc

Please sign in to comment.