Skip to content

Commit

Permalink
fix: read body in chunks (#164)
Browse files Browse the repository at this point in the history
* read body in chunks

* fix test

* add test
  • Loading branch information
ltitanb authored Oct 24, 2024
1 parent d0abdb4 commit a293bdb
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 56 deletions.
19 changes: 17 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ethereum_serde_utils = "0.7.0"
# networking
axum = { version = "0.7.5", features = ["macros"] }
axum-extra = { version = "0.9.3", features = ["typed-header"] }
reqwest = { version = "0.12.4", features = ["json"] }
reqwest = { version = "0.12.4", features = ["json", "stream"] }
headers = "0.4.0"

# async / threads
Expand Down
10 changes: 5 additions & 5 deletions crates/common/src/pbs/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ use crate::error::BlstErrorWrapper;

#[derive(Debug, Error)]
pub enum PbsError {
#[error("axum error: {0}")]
#[error("axum error: {0:?}")]
AxumError(#[from] axum::Error),

#[error("reqwest error: {0}")]
#[error("reqwest error: {0:?}")]
Reqwest(#[from] reqwest::Error),

#[error("json decode error: {err}, raw: {raw}")]
#[error("json decode error: {err:?}, raw: {raw}")]
JsonDecode { err: serde_json::Error, raw: String },

#[error("relay response error. Code: {code}, err: {error_msg}")]
RelayResponse { error_msg: String, code: u16 },

#[error("response size exceeds max size: max: {max} got: {got}")]
PayloadTooLarge { max: usize, got: usize },
#[error("response size exceeds max size: max: {max} raw: {raw}")]
PayloadTooLarge { max: usize, raw: String },

#[error("failed validating relay response: {0}")]
Validation(#[from] ValidationError),
Expand Down
2 changes: 0 additions & 2 deletions crates/common/src/pbs/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use super::{
};
use crate::{config::RelayConfig, DEFAULT_REQUEST_TIMEOUT};

pub const MAX_SIZE: usize = 10 * 1024 * 1024;

/// A parsed entry of the relay url in the format: scheme://pubkey@host
#[derive(Debug, Clone)]
pub struct RelayEntry {
Expand Down
21 changes: 15 additions & 6 deletions crates/pbs/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
pub(crate) const STATUS_ENDPOINT_TAG: &str = "status";
pub(crate) const REGISTER_VALIDATOR_ENDPOINT_TAG: &str = "register_validator";
pub(crate) const SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG: &str = "submit_blinded_block";
pub(crate) const GET_HEADER_ENDPOINT_TAG: &str = "get_header";
pub const STATUS_ENDPOINT_TAG: &str = "status";
pub const REGISTER_VALIDATOR_ENDPOINT_TAG: &str = "register_validator";
pub const SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG: &str = "submit_blinded_block";
pub const GET_HEADER_ENDPOINT_TAG: &str = "get_header";

/// For metrics recorded when a request times out
pub(crate) const TIMEOUT_ERROR_CODE: u16 = 555;
pub(crate) const TIMEOUT_ERROR_CODE_STR: &str = "555";
pub const TIMEOUT_ERROR_CODE: u16 = 555;
pub const TIMEOUT_ERROR_CODE_STR: &str = "555";

/// 20 MiB to cover edge cases for heavy blocks and also add a bit of slack for
/// any Ethereum upgrades in the near future
pub const MAX_SIZE_SUBMIT_BLOCK: usize = 20 * 1024 * 1024;

/// 10 KiB, headers are around 700 bytes + buffer for encoding
pub const MAX_SIZE_GET_HEADER: usize = 10 * 1024;

pub const MAX_SIZE_DEFAULT: usize = 1024;
2 changes: 2 additions & 0 deletions crates/pbs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ mod mev_boost;
mod routes;
mod service;
mod state;
mod utils;

pub use api::*;
pub use constants::*;
pub use mev_boost::*;
pub use service::PbsService;
pub use state::{BuilderApiState, PbsState};
13 changes: 6 additions & 7 deletions crates/pbs/src/mev_boost/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use cb_common::{
pbs::{
error::{PbsError, ValidationError},
GetHeaderParams, GetHeaderResponse, RelayClient, SignedExecutionPayloadHeader,
EMPTY_TX_ROOT_HASH, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS, MAX_SIZE,
EMPTY_TX_ROOT_HASH, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS,
},
signature::verify_signed_message,
types::Chain,
Expand All @@ -24,9 +24,12 @@ use tracing::{debug, error, warn, Instrument};
use url::Url;

use crate::{
constants::{GET_HEADER_ENDPOINT_TAG, TIMEOUT_ERROR_CODE, TIMEOUT_ERROR_CODE_STR},
constants::{
GET_HEADER_ENDPOINT_TAG, MAX_SIZE_GET_HEADER, TIMEOUT_ERROR_CODE, TIMEOUT_ERROR_CODE_STR,
},
metrics::{RELAY_HEADER_VALUE, RELAY_LAST_SLOT, RELAY_LATENCY, RELAY_STATUS_CODE},
state::{BuilderApiState, PbsState},
utils::read_chunked_body_with_max,
};

/// Implements https://ethereum.github.io/builder-specs/#/Builder/getHeader
Expand Down Expand Up @@ -247,11 +250,7 @@ async fn send_one_get_header(
let code = res.status();
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc();

let response_bytes = res.bytes().await?;
if response_bytes.len() > MAX_SIZE {
return Err(PbsError::PayloadTooLarge { max: MAX_SIZE, got: response_bytes.len() });
}

let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER).await?;
if !code.is_success() {
return Err(PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
Expand Down
10 changes: 4 additions & 6 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
pbs::{error::PbsError, RelayClient, HEADER_START_TIME_UNIX_MS, MAX_SIZE},
pbs::{error::PbsError, RelayClient, HEADER_START_TIME_UNIX_MS},
utils::{get_user_agent_with_version, utcnow_ms},
};
use eyre::bail;
Expand All @@ -12,9 +12,10 @@ use reqwest::header::USER_AGENT;
use tracing::{debug, error, Instrument};

use crate::{
constants::{REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
constants::{MAX_SIZE_DEFAULT, REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
state::{BuilderApiState, PbsState},
utils::read_chunked_body_with_max,
};

/// Implements https://ethereum.github.io/builder-specs/#/Builder/registerValidator
Expand Down Expand Up @@ -103,11 +104,8 @@ async fn send_register_validator(
.with_label_values(&[code.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;
if response_bytes.len() > MAX_SIZE {
return Err(PbsError::PayloadTooLarge { max: MAX_SIZE, got: response_bytes.len() });
}
if !code.is_success() {
let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_DEFAULT).await?;
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: code.as_u16(),
Expand Down
10 changes: 4 additions & 6 deletions crates/pbs/src/mev_boost/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ use std::time::{Duration, Instant};

use axum::http::HeaderMap;
use cb_common::{
pbs::{error::PbsError, RelayClient, MAX_SIZE},
pbs::{error::PbsError, RelayClient},
utils::get_user_agent_with_version,
};
use futures::future::select_ok;
use reqwest::header::USER_AGENT;
use tracing::{debug, error};

use crate::{
constants::{STATUS_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
constants::{MAX_SIZE_DEFAULT, STATUS_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
state::{BuilderApiState, PbsState},
utils::read_chunked_body_with_max,
};

/// Implements https://ethereum.github.io/builder-specs/#/Builder/status
Expand Down Expand Up @@ -74,11 +75,8 @@ async fn send_relay_check(relay: &RelayClient, headers: HeaderMap) -> Result<(),
let code = res.status();
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc();

let response_bytes = res.bytes().await?;
if response_bytes.len() > MAX_SIZE {
return Err(PbsError::PayloadTooLarge { max: MAX_SIZE, got: response_bytes.len() });
}
if !code.is_success() {
let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_DEFAULT).await?;
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: code.as_u16(),
Expand Down
13 changes: 5 additions & 8 deletions crates/pbs/src/mev_boost/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cb_common::{
pbs::{
error::{PbsError, ValidationError},
RelayClient, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse, HEADER_SLOT_UUID_KEY,
HEADER_START_TIME_UNIX_MS, MAX_SIZE,
HEADER_START_TIME_UNIX_MS,
},
utils::{get_user_agent_with_version, utcnow_ms},
};
Expand All @@ -14,9 +14,10 @@ use reqwest::header::USER_AGENT;
use tracing::{debug, warn};

use crate::{
constants::{SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
constants::{MAX_SIZE_SUBMIT_BLOCK, SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
state::{BuilderApiState, PbsState},
utils::read_chunked_body_with_max,
};

/// Implements https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock
Expand Down Expand Up @@ -94,18 +95,14 @@ async fn send_submit_block(
.with_label_values(&[code.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
.inc();

let response_bytes = res.bytes().await?;

if response_bytes.len() > MAX_SIZE {
return Err(PbsError::PayloadTooLarge { max: MAX_SIZE, got: response_bytes.len() });
}
let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK).await?;
if !code.is_success() {
let err = PbsError::RelayResponse {
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
code: code.as_u16(),
};

// we request payload to all relays, but some may have not received it
// we requested the payload from all relays, but some may have not received it
warn!(%err, "failed to get payload (this might be ok if other relays have it)");
return Err(err);
};
Expand Down
27 changes: 27 additions & 0 deletions crates/pbs/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use cb_common::pbs::error::PbsError;
use futures::StreamExt;
use reqwest::Response;

pub async fn read_chunked_body_with_max(
res: Response,
max_size: usize,
) -> Result<Vec<u8>, PbsError> {
let mut stream = res.bytes_stream();
let mut response_bytes = Vec::new();

while let Some(chunk) = stream.next().await {
let chunk = chunk?;
if response_bytes.len() + chunk.len() > max_size {
// avoid spamming logs if the message is too large
response_bytes.truncate(1024);
return Err(PbsError::PayloadTooLarge {
max: max_size,
raw: String::from_utf8_lossy(&response_bytes).into_owned(),
});
}

response_bytes.extend_from_slice(&chunk);
}

Ok(response_bytes)
}
15 changes: 14 additions & 1 deletion tests/src/mock_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use cb_common::{
types::Chain,
utils::blst_pubkey_to_alloy,
};
use cb_pbs::MAX_SIZE_SUBMIT_BLOCK;
use tokio::net::TcpListener;
use tracing::debug;
use tree_hash::TreeHash;
Expand All @@ -41,6 +42,7 @@ pub async fn start_mock_relay_service(state: Arc<MockRelayState>, port: u16) ->
pub struct MockRelayState {
pub chain: Chain,
pub signer: BlsSecretKey,
large_body: bool,
received_get_header: Arc<AtomicU64>,
received_get_status: Arc<AtomicU64>,
received_register_validator: Arc<AtomicU64>,
Expand All @@ -67,12 +69,17 @@ impl MockRelayState {
Self {
chain,
signer,
large_body: false,
received_get_header: Default::default(),
received_get_status: Default::default(),
received_register_validator: Default::default(),
received_submit_block: Default::default(),
}
}

pub fn with_large_body(self) -> Self {
Self { large_body: true, ..self }
}
}

pub fn mock_relay_app_router(state: Arc<MockRelayState>) -> Router {
Expand Down Expand Up @@ -119,6 +126,12 @@ async fn handle_register_validator(

async fn handle_submit_block(State(state): State<Arc<MockRelayState>>) -> impl IntoResponse {
state.received_submit_block.fetch_add(1, Ordering::Relaxed);
let response = SubmitBlindedBlockResponse::default();

let response = if state.large_body {
vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK]
} else {
serde_json::to_vec(&SubmitBlindedBlockResponse::default()).unwrap()
};

(StatusCode::OK, Json(response)).into_response()
}
12 changes: 2 additions & 10 deletions tests/src/mock_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,7 @@ impl MockValidator {

let registration: Vec<ValidatorRegistration> = vec![];

self.comm_boost
.client
.post(url)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&registration).unwrap())
.send()
.await?
.error_for_status()?;
self.comm_boost.client.post(url).json(&registration).send().await?.error_for_status()?;

Ok(())
}
Expand All @@ -57,8 +50,7 @@ impl MockValidator {
self.comm_boost
.client
.post(url)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&signed_blinded_block).unwrap())
.json(&signed_blinded_block)
.send()
.await?
.error_for_status()?;
Expand Down
2 changes: 1 addition & 1 deletion tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub fn get_local_address(port: u16) -> String {
static SYNC_SETUP: Once = Once::new();
pub fn setup_test_env() {
SYNC_SETUP.call_once(|| {
tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();
tracing_subscriber::fmt().with_max_level(tracing::Level::TRACE).init();
});
}

Expand Down
Loading

0 comments on commit a293bdb

Please sign in to comment.