Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pbs): stateless pbs module #212

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions crates/common/src/config/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,7 @@ async fn fetch_lido_registry_keys(
chain: Chain,
node_operator_id: U256,
) -> eyre::Result<Vec<BlsPublicKey>> {
debug!(
"loading operator keys from Lido registry: chain={:?}, node_operator_id={}",
chain, node_operator_id
);
debug!(?chain, %node_operator_id, "loading operator keys from Lido registry");

let provider = ProviderBuilder::new().on_http(rpc_url);
let registry_address = lido_registry_address(chain)?;
Expand Down Expand Up @@ -263,7 +260,7 @@ async fn fetch_lido_registry_keys(
}

ensure!(keys.len() == total_keys as usize, "expected {total_keys} keys, got {}", keys.len());
let unique: Vec<_> = keys.iter().collect::<HashSet<_>>().into_iter().collect();
let unique = keys.iter().collect::<HashSet<_>>();
ensure!(unique.len() == keys.len(), "found duplicate keys in registry");

Ok(keys)
Expand Down
3 changes: 2 additions & 1 deletion crates/common/src/pbs/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ pub const SUBMIT_BLOCK_PATH: &str = "/blinded_blocks";

// https://ethereum.github.io/builder-specs/#/Builder

pub const HEADER_SLOT_UUID_KEY: &str = "X-MEVBoost-SlotID";
// Currently unused to enable a stateless default PBS module
// const HEADER_SLOT_UUID_KEY: &str = "X-MEVBoost-SlotID";
pub const HEADER_VERSION_KEY: &str = "X-CommitBoost-Version";
pub const HEADER_VERSION_VALUE: &str = COMMIT_BOOST_VERSION;
pub const HEADER_START_TIME_UNIX_MS: &str = "X-MEVBoost-StartTimeUnixMS";
Expand Down
3 changes: 0 additions & 3 deletions crates/common/src/pbs/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ pub enum BuilderEvent {
MissedPayload {
/// Hash for the block for which no payload was received
block_hash: B256,
/// Relays which delivered the header but for which no payload was
/// received
missing_relays: String,
},
RegisterValidatorRequest(Vec<ValidatorRegistration>),
RegisterValidatorResponse,
Expand Down
9 changes: 4 additions & 5 deletions crates/pbs/src/mev_boost/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use cb_common::{
pbs::{
error::{PbsError, ValidationError},
GetHeaderParams, GetHeaderResponse, RelayClient, SignedExecutionPayloadHeader,
EMPTY_TX_ROOT_HASH, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS,
EMPTY_TX_ROOT_HASH, HEADER_START_TIME_UNIX_MS,
},
signature::verify_signed_message,
types::Chain,
Expand Down Expand Up @@ -73,11 +73,8 @@ pub async fn get_header<S: BuilderApiState>(
return Ok(None);
}

let (_, slot_uuid) = state.get_slot_and_uuid();

// prepare headers, except for start time which is set in `send_one_get_header`
let mut send_headers = HeaderMap::new();
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

let mut handles = Vec::with_capacity(relays.len());
Expand Down Expand Up @@ -118,7 +115,9 @@ pub async fn get_header<S: BuilderApiState>(
}
}

Ok(state.add_bids(params.slot, relay_bids))
let max_bid = relay_bids.into_iter().max_by_key(|bid| bid.value());

Ok(max_bid)
}

/// Fetch the parent block from the RPC URL for extra validation of the header.
Expand Down
5 changes: 1 addition & 4 deletions crates/pbs/src/mev_boost/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
pbs::{
error::{PbsError, ValidationError},
RelayClient, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse, HEADER_SLOT_UUID_KEY,
RelayClient, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse,
HEADER_START_TIME_UNIX_MS,
},
utils::{get_user_agent_with_version, utcnow_ms},
Expand All @@ -27,11 +27,8 @@ pub async fn submit_block<S: BuilderApiState>(
req_headers: HeaderMap,
state: PbsState<S>,
) -> eyre::Result<SubmitBlindedBlockResponse> {
let (_, slot_uuid) = state.get_slot_and_uuid();

// prepare headers
let mut send_headers = HeaderMap::new();
send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?);
send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms()));
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

Expand Down
1 change: 0 additions & 1 deletion crates/pbs/src/routes/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(
Path(params): Path<GetHeaderParams>,
) -> Result<impl IntoResponse, PbsClientError> {
state.publish_event(BuilderEvent::GetHeaderRequest(params));
state.get_or_update_slot_uuid(params.slot);

let ua = get_user_agent(&req_headers);
let ms_into_slot = ms_into_slot(params.slot, state.config.chain);
Expand Down
29 changes: 4 additions & 25 deletions crates/pbs/src/routes/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use cb_common::{
utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms},
};
use reqwest::StatusCode;
use tracing::{error, info, trace, warn};
use tracing::{error, info, trace};
use uuid::Uuid;

use crate::{
Expand All @@ -29,13 +29,8 @@ pub async fn handle_submit_block<S: BuilderApiState, A: BuilderApi<S>>(
let block_hash = signed_blinded_block.message.body.execution_payload_header.block_hash;
let slot_start_ms = timestamp_of_slot_start_millis(slot, state.config.chain);
let ua = get_user_agent(&req_headers);
let (curr_slot, slot_uuid) = state.get_slot_and_uuid();

info!(ua, %slot_uuid, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash);

if curr_slot != signed_blinded_block.message.slot {
warn!(expected = curr_slot, got = slot, "blinded beacon slot mismatch")
}
info!(ua, ms_into_slot=now.saturating_sub(slot_start_ms), %block_hash);

match A::submit_block(signed_blinded_block, req_headers, state.clone()).await {
Ok(res) => {
Expand All @@ -48,24 +43,8 @@ pub async fn handle_submit_block<S: BuilderApiState, A: BuilderApi<S>>(
}

Err(err) => {
if let Some(fault_pubkeys) = state.get_relays_by_block_hash(slot, block_hash) {
let missing_relays = state
.relays()
.iter()
.filter(|relay| fault_pubkeys.contains(&relay.pubkey()))
.map(|relay| &**relay.id)
.collect::<Vec<_>>()
.join(",");

error!(%err, %block_hash, missing_relays, "CRITICAL: no payload received from relays");
state.publish_event(BuilderEvent::MissedPayload { block_hash, missing_relays });
} else {
error!(%err, %block_hash, "CRITICAL: no payload delivered and no relay for block hash. Was getHeader even called?");
state.publish_event(BuilderEvent::MissedPayload {
block_hash,
missing_relays: String::default(),
});
};
error!(%err, %block_hash, "CRITICAL: no payload received from relays. Check previous logs or use the Relay Data API");
state.publish_event(BuilderEvent::MissedPayload { block_hash });

let err = PbsClientError::NoPayload;
BEACON_NODE_STATUS
Expand Down
81 changes: 7 additions & 74 deletions crates/pbs/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,30 @@
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};

use alloy::{primitives::B256, rpc::types::beacon::BlsPublicKey};
use alloy::rpc::types::beacon::BlsPublicKey;
use cb_common::{
config::{PbsConfig, PbsModuleConfig},
pbs::{BuilderEvent, GetHeaderResponse, RelayClient},
pbs::{BuilderEvent, RelayClient},
};
use dashmap::DashMap;
use uuid::Uuid;

pub trait BuilderApiState: Clone + Sync + Send + 'static {}
impl BuilderApiState for () {}

/// State for the Pbs module. It can be extended by adding extra data to the
/// state
/// Config for the Pbs module. It can be extended by adding extra data to the
/// state for modules that need it
// TODO: consider remove state from the PBS module altogether
#[derive(Clone)]
pub struct PbsState<S: BuilderApiState = ()> {
/// Config data for the Pbs service
pub config: PbsModuleConfig,
/// Opaque extra data for library use
pub data: S,
/// Info about the latest slot and its uuid
current_slot_info: Arc<Mutex<(u64, Uuid)>>,
/// Keeps track of which relays delivered which block for which slot
bid_cache: Arc<DashMap<u64, Vec<GetHeaderResponse>>>,
}

impl PbsState<()> {
pub fn new(config: PbsModuleConfig) -> Self {
Self {
config,
data: (),
current_slot_info: Arc::new(Mutex::new((0, Uuid::new_v4()))),
bid_cache: Arc::new(DashMap::new()),
}
Self { config, data: () }
}

pub fn with_data<S: BuilderApiState>(self, data: S) -> PbsState<S> {
PbsState {
data,
config: self.config,
current_slot_info: self.current_slot_info,
bid_cache: self.bid_cache,
}
PbsState { data, config: self.config }
}
}

Expand All @@ -58,22 +38,6 @@ where
}
}

pub fn get_or_update_slot_uuid(&self, last_slot: u64) -> Uuid {
let mut guard = self.current_slot_info.lock().expect("poisoned");
if guard.0 < last_slot {
// new slot
guard.0 = last_slot;
guard.1 = Uuid::new_v4();
self.clear(last_slot);
}
guard.1
}

pub fn get_slot_and_uuid(&self) -> (u64, Uuid) {
let guard = self.current_slot_info.lock().expect("poisoned");
*guard
}

// Getters
pub fn pbs_config(&self) -> &PbsConfig {
&self.config.pbs_config
Expand Down Expand Up @@ -102,35 +66,4 @@ where
pub fn extra_validation_enabled(&self) -> bool {
self.config.pbs_config.extra_validation_enabled
}

/// Add some bids to the cache, the bids are all assumed to be for the
/// provided slot Returns the bid with the max value
pub fn add_bids(&self, slot: u64, bids: Vec<GetHeaderResponse>) -> Option<GetHeaderResponse> {
let mut slot_entry = self.bid_cache.entry(slot).or_default();
slot_entry.extend(bids);
slot_entry.iter().max_by_key(|bid| bid.value()).cloned()
}

/// Retrieves a list of relays pubkeys that delivered a given block hash
/// Returns None if we dont have bids for the slot or for the block hash
pub fn get_relays_by_block_hash(
&self,
slot: u64,
block_hash: B256,
) -> Option<HashSet<BlsPublicKey>> {
self.bid_cache.get(&slot).and_then(|bids| {
let filtered: HashSet<_> = bids
.iter()
.filter(|&bid| (bid.block_hash() == block_hash))
.map(|bid| bid.pubkey())
.collect();

(!filtered.is_empty()).then_some(filtered)
})
}

/// Clear bids which are more than ~3 minutes old
fn clear(&self, last_slot: u64) {
self.bid_cache.retain(|slot, _| last_slot.saturating_sub(*slot) < 15)
}
}
Loading