From f374636c3ed4db74ec826aeff39601463fa7351c Mon Sep 17 00:00:00 2001 From: ltitanb Date: Fri, 9 Aug 2024 13:06:03 +0100 Subject: [PATCH 1/2] add relay monitor support --- config.example.toml | 3 +++ crates/common/src/config/mod.rs | 14 ++++++++++++-- crates/common/src/config/pbs.rs | 21 ++++++++++++++++++++- crates/pbs/src/state.rs | 4 ++++ tests/tests/pbs_integration.rs | 1 + 5 files changed, 40 insertions(+), 3 deletions(-) diff --git a/config.example.toml b/config.example.toml index eec19657..95633156 100644 --- a/config.example.toml +++ b/config.example.toml @@ -34,6 +34,9 @@ skip_sigverify = false # Minimum bid in ETH that will be accepted from `get_header` # OPTIONAL, DEFAULT: 0.0 min_bid_eth = 0.0 +# List of URLs of relay monitors to send registrations to +# OPTIONAL +relay_monitors = [] # How late in milliseconds in the slot is "late". This impacts the `get_header` requests, by shortening timeouts for `get_header` calls to # relays and make sure a header is returned within this deadline. If the request from the CL comes later in the slot, then fetching headers is skipped # to force local building and miniminzing the risk of missed slots. See also the timing games section below diff --git a/crates/common/src/config/mod.rs b/crates/common/src/config/mod.rs index a3ece576..9bd82a14 100644 --- a/crates/common/src/config/mod.rs +++ b/crates/common/src/config/mod.rs @@ -34,11 +34,21 @@ pub struct CommitBoostConfig { } impl CommitBoostConfig { + /// Validate config + pub fn validate(&self) -> Result<()> { + self.pbs.pbs_config.validate()?; + Ok(()) + } + pub fn from_file(path: &str) -> Result { - load_from_file(path) + let config: Self = load_from_file(path)?; + config.validate()?; + Ok(config) } pub fn from_env_path() -> Result { - load_file_from_env(CB_CONFIG_ENV) + let config: Self = load_file_from_env(CB_CONFIG_ENV)?; + config.validate()?; + Ok(config) } } diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index be85e21d..9ce95eb3 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -3,8 +3,9 @@ use std::{collections::HashMap, sync::Arc}; use alloy::primitives::U256; -use eyre::Result; +use eyre::{Context, Result}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use url::Url; use super::{constants::PBS_DEFAULT_IMAGE, CommitBoostConfig}; use crate::{ @@ -55,11 +56,25 @@ pub struct PbsConfig { /// Minimum bid that will be accepted from get_header #[serde(rename = "min_bid_eth", with = "as_eth_str", default = "default_u256")] pub min_bid_wei: U256, + /// List of relay monitor urls in the form of scheme://host + #[serde(default)] + pub relay_monitors: Vec, /// How late in the slot we consider to be "late" #[serde(default = "default_u64::")] pub late_in_slot_time_ms: u64, } +impl PbsConfig { + /// Validate PBS config parameters + pub fn validate(&self) -> Result<()> { + for monitor in &self.relay_monitors { + Url::parse(monitor).wrap_err(format!("Invalid relay monitor URL: {}", monitor))?; + } + + Ok(()) + } +} + /// Static pbs config from config file #[derive(Debug, Default, Deserialize, Serialize)] pub struct StaticPbsConfig { @@ -96,6 +111,8 @@ fn default_pbs() -> String { /// Loads the default pbs config, i.e. with no signer client or custom data pub fn load_pbs_config() -> Result { let config = CommitBoostConfig::from_env_path()?; + config.pbs.pbs_config.validate()?; + let relay_clients = config.relays.into_iter().map(RelayClient::new).collect::>>()?; let maybe_publiher = BuilderEventPublisher::new_from_env(); @@ -128,6 +145,8 @@ pub fn load_pbs_custom_config() -> Result<(PbsModuleConfig, // load module config including the extra data (if any) let cb_config: StubConfig = load_file_from_env(CB_CONFIG_ENV)?; + cb_config.pbs.static_config.pbs_config.validate()?; + let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect::>>()?; let maybe_publiher = BuilderEventPublisher::new_from_env(); diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index 6b095291..46ac0423 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -82,6 +82,10 @@ where &self.config.relays } + pub fn has_monitors(&self) -> bool { + !self.config.pbs_config.relay_monitors.is_empty() + } + /// Add some bids to the cache, the bids are all assumed to be for the /// provided slot Returns the bid with the max value pub fn add_bids(&self, slot: u64, bids: Vec) -> Option { diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index d5fb9f71..eb3cff56 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -38,6 +38,7 @@ fn get_pbs_static_config(port: u16) -> PbsConfig { skip_sigverify: false, min_bid_wei: U256::ZERO, late_in_slot_time_ms: u64::MAX, + relay_monitors: vec![], } } From b493e9f0682b55f2d3ff9792ce557a1dacc63ea2 Mon Sep 17 00:00:00 2001 From: ltitanb Date: Fri, 9 Aug 2024 13:32:54 +0100 Subject: [PATCH 2/2] use `Url` instead of strings --- Cargo.toml | 4 +- crates/common/src/config/pbs.rs | 15 ++-- crates/common/src/pbs/constants.rs | 2 +- crates/common/src/pbs/error.rs | 69 +++++++++++++++ crates/common/src/pbs/mod.rs | 1 + crates/common/src/pbs/relay.rs | 87 ++++++++++++++----- crates/pbs/Cargo.toml | 1 + crates/pbs/src/error.rs | 65 -------------- crates/pbs/src/mev_boost/get_header.rs | 10 +-- .../pbs/src/mev_boost/register_validator.rs | 5 +- crates/pbs/src/mev_boost/status.rs | 8 +- crates/pbs/src/mev_boost/submit_block.rs | 4 +- crates/pbs/src/routes/register_validator.rs | 60 ++++++++++++- crates/pbs/src/routes/router.rs | 4 +- tests/src/mock_relay.rs | 4 +- tests/src/mock_validator.rs | 8 +- tests/src/utils.rs | 12 ++- 17 files changed, 232 insertions(+), 127 deletions(-) create mode 100644 crates/common/src/pbs/error.rs diff --git a/Cargo.toml b/Cargo.toml index 8b3f1e6e..2f682612 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,7 +72,7 @@ clap = { version = "4.5.4", features = ["derive", "env"] } thiserror = "1.0.61" color-eyre = "0.6.3" eyre = "0.6.12" -url = "2.5.0" +url = { version = "2.5.0", features = ["serde"] } uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] } typenum = "1.17.0" rand = "0.8.5" @@ -80,4 +80,4 @@ dotenvy = "0.15.7" indexmap = "2.2.6" lazy_static = "1.5.0" bimap = { version = "0.6.3", features = ["serde"] } -derive_more = "0.99.18" \ No newline at end of file +derive_more = "0.99.18" diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 9ce95eb3..ca7dc332 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use alloy::primitives::U256; -use eyre::{Context, Result}; +use eyre::Result; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use url::Url; @@ -16,7 +16,7 @@ use crate::{ utils::{as_eth_str, default_bool, default_u256, default_u64}, }; -#[derive(Debug, Clone, Default, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct RelayConfig { /// Relay ID, if missing will default to the URL hostname from the entry pub id: Option, @@ -34,7 +34,7 @@ pub struct RelayConfig { pub frequency_get_header_ms: Option, } -#[derive(Debug, Clone, Default, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct PbsConfig { /// Port to receive BuilderAPI calls from beacon node pub port: u16, @@ -58,7 +58,7 @@ pub struct PbsConfig { pub min_bid_wei: U256, /// List of relay monitor urls in the form of scheme://host #[serde(default)] - pub relay_monitors: Vec, + pub relay_monitors: Vec, /// How late in the slot we consider to be "late" #[serde(default = "default_u64::")] pub late_in_slot_time_ms: u64, @@ -67,16 +67,12 @@ pub struct PbsConfig { impl PbsConfig { /// Validate PBS config parameters pub fn validate(&self) -> Result<()> { - for monitor in &self.relay_monitors { - Url::parse(monitor).wrap_err(format!("Invalid relay monitor URL: {}", monitor))?; - } - Ok(()) } } /// Static pbs config from config file -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct StaticPbsConfig { /// Docker image of the module #[serde(default = "default_pbs")] @@ -111,7 +107,6 @@ fn default_pbs() -> String { /// Loads the default pbs config, i.e. with no signer client or custom data pub fn load_pbs_config() -> Result { let config = CommitBoostConfig::from_env_path()?; - config.pbs.pbs_config.validate()?; let relay_clients = config.relays.into_iter().map(RelayClient::new).collect::>>()?; diff --git a/crates/common/src/pbs/constants.rs b/crates/common/src/pbs/constants.rs index b118c536..e8e0c9e1 100644 --- a/crates/common/src/pbs/constants.rs +++ b/crates/common/src/pbs/constants.rs @@ -1,4 +1,4 @@ -pub const BULDER_API_PATH: &str = "/eth/v1/builder"; +pub const BUILDER_API_PATH: &str = "/eth/v1/builder"; pub const GET_HEADER_PATH: &str = "/header/:slot/:parent_hash/:pubkey"; pub const GET_STATUS_PATH: &str = "/status"; diff --git a/crates/common/src/pbs/error.rs b/crates/common/src/pbs/error.rs new file mode 100644 index 00000000..83ea4b6b --- /dev/null +++ b/crates/common/src/pbs/error.rs @@ -0,0 +1,69 @@ +use alloy::{ + primitives::{B256, U256}, + rpc::types::beacon::BlsPublicKey, +}; +use thiserror::Error; + +use crate::error::BlstErrorWrapper; + +#[derive(Debug, Error)] +pub enum PbsError { + #[error("axum error: {0}")] + AxumError(#[from] axum::Error), + + #[error("reqwest error: {0}")] + Reqwest(#[from] reqwest::Error), + + #[error("serde decode error: {0}")] + SerdeDecodeError(#[from] serde_json::Error), + + #[error("relay response error. Code: {code}, err: {error_msg}")] + RelayResponse { error_msg: String, code: u16 }, + + #[error("failed validating relay response: {0}")] + Validation(#[from] ValidationError), + + #[error("URL parsing error: {0}")] + UrlParsing(#[from] url::ParseError), +} + +impl PbsError { + pub fn is_timeout(&self) -> bool { + matches!(self, PbsError::Reqwest(err) if err.is_timeout()) + } +} + +#[derive(Debug, Error, PartialEq, Eq)] +pub enum ValidationError { + #[error("empty blockhash")] + EmptyBlockhash, + + #[error("pubkey mismatch: expected {expected} got {got}")] + PubkeyMismatch { expected: BlsPublicKey, got: BlsPublicKey }, + + #[error("parent hash mismatch: expected {expected} got {got}")] + ParentHashMismatch { expected: B256, got: B256 }, + + #[error("block hash mismatch: expected {expected} got {got}")] + BlockHashMismatch { expected: B256, got: B256 }, + + #[error("mismatch in KZG commitments: exepcted_blobs: {expected_blobs} got_blobs: {got_blobs} got_commitments: {got_commitments} got_proofs: {got_proofs}")] + KzgCommitments { + expected_blobs: usize, + got_blobs: usize, + got_commitments: usize, + got_proofs: usize, + }, + + #[error("mismatch in KZG blob commitment: expected: {expected} got: {got} index: {index}")] + KzgMismatch { expected: String, got: String, index: usize }, + + #[error("bid below minimum: min: {min} got {got}")] + BidTooLow { min: U256, got: U256 }, + + #[error("empty tx root")] + EmptyTxRoot, + + #[error("failed signature verification: {0:?}")] + Sigverify(#[from] BlstErrorWrapper), +} diff --git a/crates/common/src/pbs/mod.rs b/crates/common/src/pbs/mod.rs index 6ec42615..05f4f77b 100644 --- a/crates/common/src/pbs/mod.rs +++ b/crates/common/src/pbs/mod.rs @@ -1,4 +1,5 @@ mod constants; +pub mod error; mod event; mod relay; mod types; diff --git a/crates/common/src/pbs/relay.rs b/crates/common/src/pbs/relay.rs index cbbe89e8..903277b0 100644 --- a/crates/common/src/pbs/relay.rs +++ b/crates/common/src/pbs/relay.rs @@ -4,25 +4,26 @@ use alloy::{ primitives::{hex::FromHex, B256}, rpc::types::beacon::BlsPublicKey, }; -use eyre::{Result, WrapErr}; +use eyre::WrapErr; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::{Deserialize, Serialize}; use url::Url; use super::{ - constants::{BULDER_API_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH}, + constants::{BUILDER_API_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH}, + error::PbsError, HEADER_VERSION_KEY, HEADER_VERSION_VALUE, }; use crate::{config::RelayConfig, DEFAULT_REQUEST_TIMEOUT}; /// A parsed entry of the relay url in the format: scheme://pubkey@host -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct RelayEntry { /// Default if of the relay, the hostname of the url pub id: String, /// Public key of the relay pub pubkey: BlsPublicKey, /// Full url of the relay - pub url: String, + pub url: Url, } impl Serialize for RelayEntry { @@ -30,7 +31,7 @@ impl Serialize for RelayEntry { where S: serde::Serializer, { - serializer.serialize_str(&self.url) + self.url.serialize(serializer) } } @@ -39,12 +40,11 @@ impl<'de> Deserialize<'de> for RelayEntry { where D: serde::Deserializer<'de>, { - let str = String::deserialize(deserializer)?; - let url = Url::parse(&str).map_err(serde::de::Error::custom)?; + let url = Url::deserialize(deserializer)?; let pubkey = BlsPublicKey::from_hex(url.username()).map_err(serde::de::Error::custom)?; let id = url.host().ok_or(serde::de::Error::custom("missing host"))?.to_string(); - Ok(RelayEntry { pubkey, url: str, id }) + Ok(RelayEntry { pubkey, url, id }) } } @@ -60,7 +60,7 @@ pub struct RelayClient { } impl RelayClient { - pub fn new(config: RelayConfig) -> Result { + pub fn new(config: RelayConfig) -> eyre::Result { let mut headers = HeaderMap::new(); headers.insert(HEADER_VERSION_KEY, HeaderValue::from_static(HEADER_VERSION_VALUE)); @@ -90,8 +90,11 @@ impl RelayClient { } // URL builders - pub fn get_url(&self, path: &str) -> String { - format!("{}{path}", &self.config.entry.url) + pub fn get_url(&self, path: &str) -> Result { + self.config.entry.url.join(path).map_err(PbsError::UrlParsing) + } + pub fn builder_api_url(&self, path: &str) -> Result { + self.get_url(&format!("{BUILDER_API_PATH}{path}")) } pub fn get_header_url( @@ -99,37 +102,75 @@ impl RelayClient { slot: u64, parent_hash: B256, validator_pubkey: BlsPublicKey, - ) -> String { - self.get_url(&format!("{BULDER_API_PATH}/header/{slot}/{parent_hash}/{validator_pubkey}")) + ) -> Result { + self.builder_api_url(&format!("/header/{slot}/{parent_hash}/{validator_pubkey}")) } - pub fn get_status_url(&self) -> String { - self.get_url(&format!("{BULDER_API_PATH}{GET_STATUS_PATH}")) + pub fn get_status_url(&self) -> Result { + self.builder_api_url(GET_STATUS_PATH) } - pub fn register_validator_url(&self) -> String { - self.get_url(&format!("{BULDER_API_PATH}{REGISTER_VALIDATOR_PATH}")) + pub fn register_validator_url(&self) -> Result { + self.builder_api_url(REGISTER_VALIDATOR_PATH) } - pub fn submit_block_url(&self) -> String { - self.get_url(&format!("{BULDER_API_PATH}{SUBMIT_BLOCK_PATH}")) + pub fn submit_block_url(&self) -> Result { + self.builder_api_url(SUBMIT_BLOCK_PATH) } } #[cfg(test)] mod tests { - use alloy::{primitives::hex::FromHex, rpc::types::beacon::BlsPublicKey}; + use alloy::{ + primitives::{hex::FromHex, B256}, + rpc::types::beacon::BlsPublicKey, + }; - use super::RelayEntry; + use super::{RelayClient, RelayEntry}; + use crate::config::RelayConfig; #[test] fn test_relay_entry() { - let s = "http://0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae@abc.xyz"; + let s = "http://0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae@abc.xyz/"; let parsed = serde_json::from_str::(&format!("\"{s}\"")).unwrap(); assert_eq!(parsed.pubkey, BlsPublicKey::from_hex("0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae").unwrap()); - assert_eq!(parsed.url, s); + assert_eq!(parsed.url.as_str(), s); assert_eq!(parsed.id, "abc.xyz"); } + + #[test] + fn test_relay_url() { + let slot = 0; + let parent_hash = B256::ZERO; + let validator_pubkey = BlsPublicKey::ZERO; + let expected = format!("http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz/eth/v1/builder/header/{slot}/{parent_hash}/{validator_pubkey}"); + + let relay_config = r#" + { + "url": "http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz" + }"#; + + let config = serde_json::from_str::(relay_config).unwrap(); + let relay = RelayClient::new(config).unwrap(); + + assert_eq!( + relay.get_header_url(slot, parent_hash, validator_pubkey).unwrap().to_string(), + expected + ); + + let relay_config = r#" + { + "url": "http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz//" + }"#; + + let config = serde_json::from_str::(relay_config).unwrap(); + let relay = RelayClient::new(config).unwrap(); + + assert_eq!( + relay.get_header_url(slot, parent_hash, validator_pubkey).unwrap().to_string(), + expected + ); + } } diff --git a/crates/pbs/Cargo.toml b/crates/pbs/Cargo.toml index c07f40c1..913a70e8 100644 --- a/crates/pbs/Cargo.toml +++ b/crates/pbs/Cargo.toml @@ -38,6 +38,7 @@ tree_hash_derive.workspace = true # misc thiserror.workspace = true eyre.workspace = true +url.workspace = true uuid.workspace = true typenum.workspace = true lazy_static.workspace = true diff --git a/crates/pbs/src/error.rs b/crates/pbs/src/error.rs index 92d86b7c..b085114d 100644 --- a/crates/pbs/src/error.rs +++ b/crates/pbs/src/error.rs @@ -1,10 +1,4 @@ -use alloy::{ - primitives::{B256, U256}, - rpc::types::beacon::BlsPublicKey, -}; use axum::{http::StatusCode, response::IntoResponse}; -use cb_common::error::BlstErrorWrapper; -use thiserror::Error; #[derive(Debug)] /// Errors that the PbsService returns to client @@ -32,62 +26,3 @@ impl IntoResponse for PbsClientError { (self.status_code(), msg).into_response() } } - -#[derive(Debug, Error)] -pub enum PbsError { - #[error("axum error: {0}")] - AxumError(#[from] axum::Error), - - #[error("reqwest error: {0}")] - Reqwest(#[from] reqwest::Error), - - #[error("serde decode error: {0}")] - SerdeDecodeError(#[from] serde_json::Error), - - #[error("relay response error. Code: {code}, err: {error_msg}")] - RelayResponse { error_msg: String, code: u16 }, - - #[error("failed validating relay response: {0}")] - Validation(#[from] ValidationError), -} - -impl PbsError { - pub fn is_timeout(&self) -> bool { - matches!(self, PbsError::Reqwest(err) if err.is_timeout()) - } -} - -#[derive(Debug, Error, PartialEq, Eq)] -pub enum ValidationError { - #[error("empty blockhash")] - EmptyBlockhash, - - #[error("pubkey mismatch: expected {expected} got {got}")] - PubkeyMismatch { expected: BlsPublicKey, got: BlsPublicKey }, - - #[error("parent hash mismatch: expected {expected} got {got}")] - ParentHashMismatch { expected: B256, got: B256 }, - - #[error("block hash mismatch: expected {expected} got {got}")] - BlockHashMismatch { expected: B256, got: B256 }, - - #[error("mismatch in KZG commitments: exepcted_blobs: {expected_blobs} got_blobs: {got_blobs} got_commitments: {got_commitments} got_proofs: {got_proofs}")] - KzgCommitments { - expected_blobs: usize, - got_blobs: usize, - got_commitments: usize, - got_proofs: usize, - }, - - #[error("mismatch in KZG blob commitment: expected: {expected} got: {got} index: {index}")] - KzgMismatch { expected: String, got: String, index: usize }, - - #[error("bid below minimum: min: {min} got {got}")] - BidTooLow { min: U256, got: U256 }, - - #[error("empty tx root")] - EmptyTxRoot, - - #[error("failed signature verification: {0:?}")] - Sigverify(#[from] BlstErrorWrapper), -} diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index a1ca1c6a..550aaedc 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -8,6 +8,7 @@ use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ config::PbsConfig, pbs::{ + error::{PbsError, ValidationError}, GetHeaderParams, GetHeaderReponse, RelayClient, SignedExecutionPayloadHeader, EMPTY_TX_ROOT_HASH, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS, }, @@ -19,10 +20,10 @@ use futures::future::join_all; use reqwest::{header::USER_AGENT, StatusCode}; use tokio::time::sleep; use tracing::{debug, error, warn, Instrument}; +use url::Url; use crate::{ constants::{GET_HEADER_ENDPOINT_TAG, TIMEOUT_ERROR_CODE, TIMEOUT_ERROR_CODE_STR}, - error::{PbsError, ValidationError}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, }; @@ -97,7 +98,7 @@ async fn send_timed_get_header( ms_into_slot: u64, mut timeout_left_ms: u64, ) -> Result, PbsError> { - let url = relay.get_header_url(params.slot, params.parent_hash, params.pubkey); + let url = relay.get_header_url(params.slot, params.parent_hash, params.pubkey)?; if relay.config.enable_timing_games { if let Some(target_ms) = relay.config.target_first_request_ms { @@ -193,7 +194,7 @@ async fn send_timed_get_header( } struct RequestConfig { - url: String, + url: Url, timeout_ms: u64, headers: HeaderMap, } @@ -337,13 +338,12 @@ mod tests { }; use blst::min_pk; use cb_common::{ - pbs::{SignedExecutionPayloadHeader, EMPTY_TX_ROOT_HASH}, + pbs::{error::ValidationError, SignedExecutionPayloadHeader, EMPTY_TX_ROOT_HASH}, signature::sign_builder_message, types::Chain, }; use super::validate_header; - use crate::error::ValidationError; #[test] fn test_validate_header() { diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 4cc4b53c..d44ca2d3 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -3,7 +3,7 @@ use std::time::{Duration, Instant}; use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ - pbs::{RelayClient, HEADER_START_TIME_UNIX_MS}, + pbs::{error::PbsError, RelayClient, HEADER_START_TIME_UNIX_MS}, utils::{get_user_agent_with_version, utcnow_ms}, }; use eyre::bail; @@ -13,7 +13,6 @@ use tracing::{debug, error}; use crate::{ constants::{REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, - error::PbsError, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, }; @@ -58,7 +57,7 @@ async fn send_register_validator( headers: HeaderMap, timeout_ms: u64, ) -> Result<(), PbsError> { - let url = relay.register_validator_url(); + let url = relay.register_validator_url()?; let start_request = Instant::now(); let res = match relay diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index 42df281f..13b29035 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -1,14 +1,16 @@ use std::time::{Duration, Instant}; use axum::http::HeaderMap; -use cb_common::{pbs::RelayClient, utils::get_user_agent_with_version}; +use cb_common::{ + pbs::{error::PbsError, RelayClient}, + utils::get_user_agent_with_version, +}; use futures::future::select_ok; use reqwest::header::USER_AGENT; use tracing::{debug, error}; use crate::{ constants::{STATUS_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, - error::PbsError, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, }; @@ -45,7 +47,7 @@ pub async fn get_status( #[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] async fn send_relay_check(relay: &RelayClient, headers: HeaderMap) -> Result<(), PbsError> { - let url = relay.get_status_url(); + let url = relay.get_status_url()?; let start_request = Instant::now(); let res = match relay diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 8f9b81ac..9ae83df0 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -3,6 +3,7 @@ use std::time::{Duration, Instant}; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ pbs::{ + error::{PbsError, ValidationError}, RelayClient, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS, }, @@ -14,7 +15,6 @@ use tracing::{debug, warn}; use crate::{ constants::{SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, - error::{PbsError, ValidationError}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, }; @@ -60,7 +60,7 @@ async fn send_submit_block( headers: HeaderMap, timeout_ms: u64, ) -> Result { - let url = relay.submit_block_url(); + let url = relay.submit_block_url()?; let start_request = Instant::now(); let res = match relay diff --git a/crates/pbs/src/routes/register_validator.rs b/crates/pbs/src/routes/register_validator.rs index d826762e..a13b71f8 100644 --- a/crates/pbs/src/routes/register_validator.rs +++ b/crates/pbs/src/routes/register_validator.rs @@ -1,8 +1,14 @@ +use std::time::Instant; + use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::{extract::State, http::HeaderMap, response::IntoResponse, Json}; -use cb_common::{pbs::BuilderEvent, utils::get_user_agent}; -use reqwest::StatusCode; -use tracing::{error, info, trace}; +use cb_common::{ + pbs::{BuilderEvent, REGISTER_VALIDATOR_PATH}, + utils::get_user_agent, + DEFAULT_REQUEST_TIMEOUT, +}; +use reqwest::{StatusCode, Url}; +use tracing::{debug, error, info, trace}; use uuid::Uuid; use crate::{ @@ -26,6 +32,13 @@ pub async fn handle_register_validator>( info!(ua, num_registrations = registrations.len()); + if state.has_monitors() { + // send registrations to monitors + for relay_monitor in state.pbs_config().relay_monitors.clone() { + tokio::spawn(send_relay_monitor_registrations(registrations.clone(), relay_monitor)); + } + } + if let Err(err) = A::register_validator(registrations, req_headers, state.clone()).await { state.publish_event(BuilderEvent::RegisterValidatorResponse); error!(?err, "all relays failed registration"); @@ -42,3 +55,44 @@ pub async fn handle_register_validator>( Ok(StatusCode::OK) } } + +#[tracing::instrument(skip_all, name = "monitor", fields(url = relay_monitor_url.host_str().unwrap_or_default()))] +async fn send_relay_monitor_registrations( + registrations: Vec, + relay_monitor_url: Url, +) { + let Ok(url) = relay_monitor_url.join(REGISTER_VALIDATOR_PATH) else { + error!("invalid URL"); + return; + }; + + let start_request = Instant::now(); + let res = match reqwest::Client::new() + .post(url) + .timeout(DEFAULT_REQUEST_TIMEOUT) + .json(®istrations) + .send() + .await + { + Ok(res) => res, + Err(err) => { + error!(?err, "failed monitor registration"); + return; + } + }; + let request_latency = start_request.elapsed(); + + let code = res.status(); + match res.bytes().await { + Ok(response_bytes) => { + if code.is_success() { + debug!(?code, latency = ?request_latency, "relay monitor registration successful"); + } else { + let err = String::from_utf8_lossy(&response_bytes); + error!(?code, ?err, "failed monitor registration"); + } + } + + Err(err) => error!(?err, "failed to decode monitor response"), + } +} diff --git a/crates/pbs/src/routes/router.rs b/crates/pbs/src/routes/router.rs index e035702b..f6ae4202 100644 --- a/crates/pbs/src/routes/router.rs +++ b/crates/pbs/src/routes/router.rs @@ -3,7 +3,7 @@ use axum::{ Router, }; use cb_common::pbs::{ - BULDER_API_PATH, GET_HEADER_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, + BUILDER_API_PATH, GET_HEADER_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, }; use super::{handle_get_header, handle_get_status, handle_register_validator, handle_submit_block}; @@ -19,7 +19,7 @@ pub fn create_app_router>(state: PbsState)) .route(SUBMIT_BLOCK_PATH, post(handle_submit_block::)); - let builder_api = Router::new().nest(BULDER_API_PATH, builder_routes); + let builder_api = Router::new().nest(BUILDER_API_PATH, builder_routes); let app = if let Some(extra_routes) = A::extra_routes() { builder_api.merge(extra_routes) diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs index b43a5b53..25a489fa 100644 --- a/tests/src/mock_relay.rs +++ b/tests/src/mock_relay.rs @@ -13,7 +13,7 @@ use axum::{ }; use cb_common::{ pbs::{ - GetHeaderParams, GetHeaderReponse, SubmitBlindedBlockResponse, BULDER_API_PATH, + GetHeaderParams, GetHeaderReponse, SubmitBlindedBlockResponse, BUILDER_API_PATH, GET_HEADER_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, }, signer::Signer, @@ -69,7 +69,7 @@ pub fn mock_relay_app_router(state: Arc) -> Router { .route(SUBMIT_BLOCK_PATH, post(handle_submit_block)) .with_state(state); - Router::new().nest(BULDER_API_PATH, builder_routes) + Router::new().nest(BUILDER_API_PATH, builder_routes) } async fn handle_get_header( diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index ac0a734a..d9afbdbd 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -17,7 +17,7 @@ impl MockValidator { } pub async fn do_get_header(&self) -> Result<(), Error> { - let url = self.comm_boost.get_header_url(0, B256::ZERO, BlsPublicKey::ZERO); + let url = self.comm_boost.get_header_url(0, B256::ZERO, BlsPublicKey::ZERO).unwrap(); let res = self.comm_boost.client.get(url).send().await?.bytes().await?; assert!(serde_json::from_slice::(&res).is_ok()); @@ -25,7 +25,7 @@ impl MockValidator { } pub async fn do_get_status(&self) -> Result<(), Error> { - let url = self.comm_boost.get_status_url(); + let url = self.comm_boost.get_status_url().unwrap(); let _res = self.comm_boost.client.get(url).send().await?; // assert!(res.status().is_success()); @@ -33,7 +33,7 @@ impl MockValidator { } pub async fn do_register_validator(&self) -> Result<(), Error> { - let url = self.comm_boost.register_validator_url(); + let url = self.comm_boost.register_validator_url().unwrap(); let registration: Vec = vec![]; @@ -50,7 +50,7 @@ impl MockValidator { } pub async fn do_submit_block(&self) -> Result<(), Error> { - let url = self.comm_boost.submit_block_url(); + let url = self.comm_boost.submit_block_url().unwrap(); let signed_blinded_block = SignedBlindedBeaconBlock::default(); diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 1072ee0d..0ee7f526 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -19,7 +19,15 @@ pub fn setup_test_env() { } pub fn generate_mock_relay(port: u16, pubkey: BlsPublicKey) -> Result { - let entry = RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port) }; - let config = RelayConfig { entry, ..RelayConfig::default() }; + let entry = + RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port).parse()? }; + let config = RelayConfig { + entry, + id: None, + headers: None, + enable_timing_games: false, + target_first_request_ms: None, + frequency_get_header_ms: None, + }; RelayClient::new(config) }