From ca153cc4ff4bcb60e6a1148c5448cba91157b890 Mon Sep 17 00:00:00 2001 From: ltitanb <163874448+ltitanb@users.noreply.github.com> Date: Mon, 25 Nov 2024 11:46:57 +0000 Subject: [PATCH] feat: add pbs mux (#172) * add pbs mux * comments --- config.example.toml | 20 ++++ configs/pbs-mux.toml | 29 +++++ crates/common/src/config/mod.rs | 7 ++ crates/common/src/config/mux.rs | 138 ++++++++++++++++++++++ crates/common/src/config/pbs.rs | 36 +++++- crates/common/src/pbs/relay.rs | 11 +- crates/common/src/pbs/types/get_header.rs | 3 + crates/pbs/src/mev_boost/get_header.rs | 16 ++- crates/pbs/src/state.rs | 12 ++ tests/src/mock_validator.rs | 7 +- tests/tests/pbs_integration.rs | 51 +++++++- 11 files changed, 311 insertions(+), 19 deletions(-) create mode 100644 configs/pbs-mux.toml create mode 100644 crates/common/src/config/mux.rs diff --git a/config.example.toml b/config.example.toml index 4c019706..a58c335d 100644 --- a/config.example.toml +++ b/config.example.toml @@ -97,6 +97,26 @@ target_first_request_ms = 200 # OPTIONAL frequency_get_header_ms = 300 +# Configuration for the PBS multiplexers, which enable different configs to be used for get header requests, depending on validator pubkey +# Note that: +# - multiple sets of keys can be defined by adding multiple [[mux]] sections. The validator pubkey sets need to be disjoint +# - the mux is only used for get header requests +# - if any value is missing from the mux config, the default value from the main config will be used +[[mux]] +# Which validator pubkeys to match against this mux config +validator_pubkeys = [ + "0x80c7f782b2467c5898c5516a8b6595d75623960b4afc4f71ee07d40985d20e117ba35e7cd352a3e75fb85a8668a3b745", + "0xa119589bb33ef52acbb8116832bec2b58fca590fe5c85eac5d3230b44d5bc09fe73ccd21f88eab31d6de16194d17782e", +] +timeout_get_header_ms = 900 +late_in_slot_time_ms = 1500 +# For each mux, one or more [[pbs_mux.relays]] can be defined, which will be used for the matching validator pubkeys +# Only the relays defined here will be used, and the rest of the relays defined in the main config will be ignored +# Any field defined here will override the default value from the relay config with the same id in [[relays]] +[[mux.relays]] +id = "example-relay" +headers = { X-MyCustomHeader = "ADifferentCustomValue" } + # Configuration for the Signer Module, only required if any `commit` module is present, or if `pbs.with_signer = true` # OPTIONAL [signer] diff --git a/configs/pbs-mux.toml b/configs/pbs-mux.toml new file mode 100644 index 00000000..5f77aa9d --- /dev/null +++ b/configs/pbs-mux.toml @@ -0,0 +1,29 @@ +# PBS config with a mux for a single validator + +chain = "Holesky" + +[pbs] +port = 18550 +timeout_get_header_ms = 950 +late_in_slot_time_ms = 2000 + +[[relays]] +id = "relay-1" +url = "http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz" + +[[relays]] +id = "relay-2" +url = "http://0xa119589bb33ef52acbb8116832bec2b58fca590fe5c85eac5d3230b44d5bc09fe73ccd21f88eab31d6de16194d17782e@def.xyz" +enable_timing_games = true +target_first_request_ms = 200 + +[[mux]] +validator_pubkeys = [ + "0x80c7f782b2467c5898c5516a8b6595d75623960b4afc4f71ee07d40985d20e117ba35e7cd352a3e75fb85a8668a3b745", +] +timeout_get_header_ms = 900 +late_in_slot_time_ms = 1500 + +[[mux.relays]] +id = "relay-2" +enable_timing_games = false diff --git a/crates/common/src/config/mod.rs b/crates/common/src/config/mod.rs index ac537cbd..94c5e368 100644 --- a/crates/common/src/config/mod.rs +++ b/crates/common/src/config/mod.rs @@ -9,6 +9,7 @@ mod constants; mod log; mod metrics; mod module; +mod mux; mod pbs; mod signer; mod utils; @@ -17,6 +18,7 @@ pub use constants::*; pub use log::*; pub use metrics::*; pub use module::*; +pub use mux::*; pub use pbs::*; pub use signer::*; pub use utils::*; @@ -26,6 +28,8 @@ pub struct CommitBoostConfig { pub chain: Chain, pub relays: Vec, pub pbs: StaticPbsConfig, + #[serde(flatten)] + pub muxes: Option, pub modules: Option>, pub signer: Option, pub metrics: Option, @@ -57,6 +61,7 @@ impl CommitBoostConfig { chain, relays: rest_config.relays, pbs: rest_config.pbs, + muxes: rest_config.muxes, modules: rest_config.modules, signer: rest_config.signer, metrics: rest_config.metrics, @@ -96,6 +101,8 @@ struct ChainConfig { struct HelperConfig { relays: Vec, pbs: StaticPbsConfig, + #[serde(flatten)] + muxes: Option, modules: Option>, signer: Option, metrics: Option, diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs new file mode 100644 index 00000000..85bc609d --- /dev/null +++ b/crates/common/src/config/mux.rs @@ -0,0 +1,138 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use alloy::rpc::types::beacon::BlsPublicKey; +use eyre::{bail, ensure, eyre}; +use serde::{Deserialize, Serialize}; + +use super::{PbsConfig, RelayConfig}; +use crate::pbs::{RelayClient, RelayEntry}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct PbsMuxes { + /// List of PBS multiplexers + #[serde(rename = "mux")] + pub muxes: Vec, +} + +#[derive(Debug, Clone)] +pub struct RuntimeMuxConfig { + pub config: Arc, + pub relays: Vec, +} + +impl PbsMuxes { + pub fn validate_and_fill( + self, + default_pbs: &PbsConfig, + default_relays: &[RelayConfig], + ) -> eyre::Result> { + // check that validator pubkeys are in disjoint sets + let mut unique_pubkeys = HashSet::new(); + for mux in self.muxes.iter() { + for pubkey in mux.validator_pubkeys.iter() { + if !unique_pubkeys.insert(pubkey) { + bail!("duplicate validator pubkey in muxes: {pubkey}"); + } + } + } + + let mut configs = HashMap::new(); + // fill the configs using the default pbs config and relay entries + for mux in self.muxes { + ensure!(!mux.relays.is_empty(), "mux config must have at least one relay"); + ensure!( + !mux.validator_pubkeys.is_empty(), + "mux config must have at least one validator pubkey" + ); + + let mut relay_clients = Vec::with_capacity(mux.relays.len()); + for partial_relay in mux.relays.into_iter() { + // create a new config overriding only the missing fields + let partial_id = partial_relay.id()?; + // assume that there is always a relay defined in the default config. If this + // becomes too much of a burden, we can change this to allow defining relays + // that are exclusively used by a mux + let default_relay = default_relays + .iter() + .find(|r| r.id() == partial_id) + .ok_or_else(|| eyre!("default relay config not found for: {}", partial_id))?; + + let full_config = RelayConfig { + id: Some(partial_id.to_string()), + entry: partial_relay.entry.unwrap_or(default_relay.entry.clone()), + headers: partial_relay.headers.or(default_relay.headers.clone()), + enable_timing_games: partial_relay + .enable_timing_games + .unwrap_or(default_relay.enable_timing_games), + target_first_request_ms: partial_relay + .target_first_request_ms + .or(default_relay.target_first_request_ms), + frequency_get_header_ms: partial_relay + .frequency_get_header_ms + .or(default_relay.frequency_get_header_ms), + }; + + relay_clients.push(RelayClient::new(full_config)?); + } + + let config = PbsConfig { + timeout_get_header_ms: mux + .timeout_get_header_ms + .unwrap_or(default_pbs.timeout_get_header_ms), + late_in_slot_time_ms: mux + .late_in_slot_time_ms + .unwrap_or(default_pbs.late_in_slot_time_ms), + ..default_pbs.clone() + }; + let config = Arc::new(config); + + let runtime_config = RuntimeMuxConfig { config, relays: relay_clients }; + for pubkey in mux.validator_pubkeys.iter() { + configs.insert(*pubkey, runtime_config.clone()); + } + } + + Ok(configs) + } +} + +/// Configuration for the PBS Multiplexer +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct MuxConfig { + /// Relays to use for this mux config + pub relays: Vec, + /// Which validator pubkeys to match against this mux config + pub validator_pubkeys: Vec, + pub timeout_get_header_ms: Option, + pub late_in_slot_time_ms: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +/// A relay config with all optional fields. See [`RelayConfig`] for the +/// description of the fields. +pub struct PartialRelayConfig { + pub id: Option, + #[serde(rename = "url")] + pub entry: Option, + pub headers: Option>, + pub enable_timing_games: Option, + pub target_first_request_ms: Option, + pub frequency_get_header_ms: Option, +} + +impl PartialRelayConfig { + pub fn id(&self) -> eyre::Result<&str> { + match &self.id { + Some(id) => Ok(id.as_str()), + None => { + let entry = self.entry.as_ref().ok_or_else(|| { + eyre!("relays in [[mux]] need to specifify either an `id` or a `url`") + })?; + Ok(entry.id.as_str()) + } + } + } +} diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 3b36d40f..0144ce88 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -6,17 +6,23 @@ use std::{ sync::Arc, }; -use alloy::primitives::{utils::format_ether, U256}; +use alloy::{ + primitives::{utils::format_ether, U256}, + rpc::types::beacon::BlsPublicKey, +}; use eyre::{ensure, Result}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use url::Url; use super::{ - constants::PBS_IMAGE_DEFAULT, load_optional_env_var, CommitBoostConfig, PBS_ENDPOINT_ENV, + constants::PBS_IMAGE_DEFAULT, load_optional_env_var, CommitBoostConfig, RuntimeMuxConfig, + PBS_ENDPOINT_ENV, }; use crate::{ commit::client::SignerClient, - config::{load_env_var, load_file_from_env, CONFIG_ENV, MODULE_JWT_ENV, SIGNER_URL_ENV}, + config::{ + load_env_var, load_file_from_env, PbsMuxes, CONFIG_ENV, MODULE_JWT_ENV, SIGNER_URL_ENV, + }, pbs::{ BuilderEventPublisher, DefaultTimeout, RelayClient, RelayEntry, DEFAULT_PBS_PORT, LATE_IN_SLOT_TIME_MS, @@ -45,6 +51,12 @@ pub struct RelayConfig { pub frequency_get_header_ms: Option, } +impl RelayConfig { + pub fn id(&self) -> &str { + self.id.as_deref().unwrap_or(self.entry.id.as_str()) + } +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct PbsConfig { /// Host to receive BuilderAPI calls from beacon node @@ -149,6 +161,8 @@ pub struct PbsModuleConfig { pub signer_client: Option, /// Event publisher pub event_publisher: Option, + /// Muxes config + pub muxes: Option>, } fn default_pbs() -> String { @@ -158,6 +172,7 @@ fn default_pbs() -> String { /// Loads the default pbs config, i.e. with no signer client or custom data pub fn load_pbs_config() -> Result { let config = CommitBoostConfig::from_env_path()?; + config.validate()?; // use endpoint from env if set, otherwise use default host and port let endpoint = if let Some(endpoint) = load_optional_env_var(PBS_ENDPOINT_ENV) { @@ -166,6 +181,11 @@ pub fn load_pbs_config() -> Result { SocketAddr::from((config.pbs.pbs_config.host, config.pbs.pbs_config.port)) }; + let muxes = config + .muxes + .map(|muxes| muxes.validate_and_fill(&config.pbs.pbs_config, &config.relays)) + .transpose()?; + let relay_clients = config.relays.into_iter().map(RelayClient::new).collect::>>()?; let maybe_publiher = BuilderEventPublisher::new_from_env()?; @@ -177,6 +197,7 @@ pub fn load_pbs_config() -> Result { relays: relay_clients, signer_client: None, event_publisher: maybe_publiher, + muxes, }) } @@ -195,6 +216,7 @@ pub fn load_pbs_custom_config() -> Result<(PbsModuleConfig, chain: Chain, relays: Vec, pbs: CustomPbsConfig, + muxes: Option, } // load module config including the extra data (if any) @@ -211,6 +233,13 @@ pub fn load_pbs_custom_config() -> Result<(PbsModuleConfig, )) }; + let muxes = match cb_config.muxes { + Some(muxes) => Some( + muxes.validate_and_fill(&cb_config.pbs.static_config.pbs_config, &cb_config.relays)?, + ), + None => None, + }; + let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect::>>()?; let maybe_publiher = BuilderEventPublisher::new_from_env()?; @@ -232,6 +261,7 @@ pub fn load_pbs_custom_config() -> Result<(PbsModuleConfig, relays: relay_clients, signer_client, event_publisher: maybe_publiher, + muxes, }, cb_config.pbs.extra, )) diff --git a/crates/common/src/pbs/relay.rs b/crates/common/src/pbs/relay.rs index 3a0702e1..db18466a 100644 --- a/crates/common/src/pbs/relay.rs +++ b/crates/common/src/pbs/relay.rs @@ -19,7 +19,7 @@ use crate::{config::RelayConfig, DEFAULT_REQUEST_TIMEOUT}; /// A parsed entry of the relay url in the format: scheme://pubkey@host #[derive(Debug, Clone)] pub struct RelayEntry { - /// Default if of the relay, the hostname of the url + /// Default ID of the relay, the hostname of the url pub id: String, /// Public key of the relay pub pubkey: BlsPublicKey, @@ -42,8 +42,9 @@ impl<'de> Deserialize<'de> for RelayEntry { D: serde::Deserializer<'de>, { let url = Url::deserialize(deserializer)?; - let pubkey = BlsPublicKey::from_hex(url.username()).map_err(serde::de::Error::custom)?; let id = url.host().ok_or(serde::de::Error::custom("missing host"))?.to_string(); + let pubkey = BlsPublicKey::from_hex(url.username()) + .map_err(|_| serde::de::Error::custom("invalid BLS pubkey"))?; Ok(RelayEntry { pubkey, url, id }) } @@ -79,11 +80,7 @@ impl RelayClient { .timeout(DEFAULT_REQUEST_TIMEOUT) .build()?; - Ok(Self { - id: Arc::new(config.id.clone().unwrap_or(config.entry.id.clone())), - client, - config: Arc::new(config), - }) + Ok(Self { id: Arc::new(config.id().to_owned()), client, config: Arc::new(config) }) } pub fn pubkey(&self) -> BlsPublicKey { diff --git a/crates/common/src/pbs/types/get_header.rs b/crates/common/src/pbs/types/get_header.rs index f8591a24..ebffa946 100644 --- a/crates/common/src/pbs/types/get_header.rs +++ b/crates/common/src/pbs/types/get_header.rs @@ -12,8 +12,11 @@ use super::{ #[derive(Debug, Serialize, Deserialize, Clone, Copy)] pub struct GetHeaderParams { + /// The slot to request the header for pub slot: u64, + /// The parent hash of the block to request the header for pub parent_hash: B256, + /// The pubkey of the validator that is requesting the header pub pubkey: BlsPublicKey, } diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index b13a4e0c..42eec95c 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -51,15 +51,22 @@ pub async fn get_header( } let ms_into_slot = ms_into_slot(params.slot, state.config.chain); - let max_timeout_ms = state - .pbs_config() + let (pbs_config, relays, is_mux) = state.mux_config_and_relays(¶ms.pubkey); + + if is_mux { + debug!(pubkey = %params.pubkey, relays = relays.len(), "using mux config"); + } else { + debug!(pubkey = %params.pubkey, relays = relays.len(), "using default config"); + } + + let max_timeout_ms = pbs_config .timeout_get_header_ms - .min(state.pbs_config().late_in_slot_time_ms.saturating_sub(ms_into_slot)); + .min(pbs_config.late_in_slot_time_ms.saturating_sub(ms_into_slot)); if max_timeout_ms == 0 { warn!( ms_into_slot, - threshold = state.pbs_config().late_in_slot_time_ms, + threshold = pbs_config.late_in_slot_time_ms, "late in slot, skipping relay requests" ); @@ -73,7 +80,6 @@ pub async fn get_header( send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let relays = state.relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { handles.push(send_timed_get_header( diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index eb910f0a..bee228eb 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -82,6 +82,18 @@ where pub fn relays(&self) -> &[RelayClient] { &self.config.relays } + /// Returns the PBS config and relay clients for the given validator pubkey. + /// If the pubkey is not found in any mux, the default configs are + /// returned + pub fn mux_config_and_relays( + &self, + pubkey: &BlsPublicKey, + ) -> (&PbsConfig, &[RelayClient], bool) { + match self.config.muxes.as_ref().and_then(|muxes| muxes.get(pubkey)) { + Some(mux) => (&mux.config, mux.relays.as_slice(), true), + None => (self.pbs_config(), self.relays(), false), + } + } pub fn has_monitors(&self) -> bool { !self.config.pbs_config.relay_monitors.is_empty() diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index 44301235..a8f6a8a3 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -16,8 +16,11 @@ impl MockValidator { Ok(Self { comm_boost: generate_mock_relay(port, BlsPublicKey::default())? }) } - pub async fn do_get_header(&self) -> Result<(), Error> { - let url = self.comm_boost.get_header_url(0, B256::ZERO, BlsPublicKey::ZERO).unwrap(); + pub async fn do_get_header(&self, pubkey: Option) -> Result<(), Error> { + let url = self + .comm_boost + .get_header_url(0, B256::ZERO, pubkey.unwrap_or(BlsPublicKey::ZERO)) + .unwrap(); let res = self.comm_boost.client.get(url).send().await?.bytes().await?; assert!(serde_json::from_slice::(&res).is_ok()); diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index 9fdf2bab..28596d2a 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, net::{Ipv4Addr, SocketAddr}, sync::Arc, time::Duration, @@ -7,7 +8,7 @@ use std::{ use alloy::primitives::U256; use cb_common::{ - config::{PbsConfig, PbsModuleConfig}, + config::{PbsConfig, PbsModuleConfig, RuntimeMuxConfig}, pbs::RelayClient, signer::{random_secret, BlsPublicKey}, types::Chain, @@ -48,6 +49,7 @@ fn to_pbs_config(chain: Chain, pbs_config: PbsConfig, relays: Vec) signer_client: None, event_publisher: None, relays, + muxes: None, } } @@ -73,7 +75,7 @@ async fn test_get_header() -> Result<()> { let mock_validator = MockValidator::new(port)?; info!("Sending get header"); - let res = mock_validator.do_get_header().await; + let res = mock_validator.do_get_header(None).await; assert!(res.is_ok()); assert_eq!(mock_state.received_get_header(), 1); @@ -197,3 +199,48 @@ async fn test_submit_block_too_large() -> Result<()> { assert_eq!(mock_state.received_submit_block(), 1); Ok(()) } + +#[tokio::test] +async fn test_mux() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey_1: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + let signer_2 = random_secret(); + let pubkey_2: BlsPublicKey = blst_pubkey_to_alloy(&signer_2.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let port = 3600; + + let mux_relay = generate_mock_relay(port + 1, *pubkey_1)?; + let relays = vec![mux_relay.clone(), generate_mock_relay(port + 2, *pubkey_2)?]; + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 2)); + + let mut config = to_pbs_config(chain, get_pbs_static_config(port), relays); + let mux = RuntimeMuxConfig { config: config.pbs_config.clone(), relays: vec![mux_relay] }; + + let validator_pubkey = blst_pubkey_to_alloy(&random_secret().sk_to_pk()); + + config.muxes = Some(HashMap::from([(validator_pubkey, mux)])); + + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + // leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(port)?; + info!("Sending get header with default"); + let res = mock_validator.do_get_header(None).await; + + assert!(res.is_ok()); + assert_eq!(mock_state.received_get_header(), 2); // both relays were used + + info!("Sending get header with mux"); + let res = mock_validator.do_get_header(Some(validator_pubkey)).await; + + assert!(res.is_ok()); + assert_eq!(mock_state.received_get_header(), 3); // only one relay was used + Ok(()) +}