diff --git a/Cargo.lock b/Cargo.lock index 1d3dc307066..fc7b49de0b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8739,6 +8739,7 @@ name = "warp_utils" version = "0.1.0" dependencies = [ "beacon_chain", + "bytes", "eth2", "headers", "lazy_static", @@ -8746,6 +8747,7 @@ dependencies = [ "safe_arith", "serde", "serde_array_query", + "serde_json", "state_processing", "tokio", "types", diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 1594668e5c8..d4a43806f49 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -682,7 +682,7 @@ pub fn serve( .clone() .and(warp::path("validator_balances")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .then( |state_id: StateId, task_spawner: TaskSpawner, @@ -726,7 +726,7 @@ pub fn serve( .clone() .and(warp::path("validators")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .then( |state_id: StateId, task_spawner: TaskSpawner, @@ -1257,7 +1257,7 @@ pub fn serve( .and(warp::path("beacon")) .and(warp::path("blocks")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) @@ -1327,7 +1327,7 @@ pub fn serve( .and(warp::path("blocks")) .and(warp::query::()) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) @@ -1404,7 +1404,7 @@ pub fn serve( .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) @@ -1472,7 +1472,7 @@ pub fn serve( .and(warp::path("blinded_blocks")) .and(warp::query::()) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) @@ -1754,7 +1754,7 @@ pub fn serve( .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( @@ -1930,7 +1930,7 @@ pub fn serve( .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, @@ -1988,7 +1988,7 @@ pub fn serve( .clone() .and(warp::path("proposer_slashings")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, @@ -2046,7 +2046,7 @@ pub fn serve( .clone() .and(warp::path("voluntary_exits")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( |task_spawner: TaskSpawner, @@ -2102,7 +2102,7 @@ pub fn serve( .clone() .and(warp::path("sync_committees")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( @@ -2139,7 +2139,7 @@ pub fn serve( .clone() .and(warp::path("bls_to_execution_changes")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( @@ -2533,7 +2533,7 @@ pub fn serve( .and(warp::path("attestations")) .and(warp::path::param::()) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .then( |task_spawner: TaskSpawner, chain: Arc>, @@ -2583,7 +2583,7 @@ pub fn serve( .and(warp::path("sync_committee")) .and(block_id_or_err) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(log_filter.clone()) .then( |task_spawner: TaskSpawner, @@ -3326,7 +3326,7 @@ pub fn serve( })) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( @@ -3352,7 +3352,7 @@ pub fn serve( })) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( @@ -3406,7 +3406,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( @@ -3519,7 +3519,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(network_tx_filter) .and(log_filter.clone()) .then( @@ -3545,7 +3545,7 @@ pub fn serve( .and(warp::path("validator")) .and(warp::path("beacon_committee_subscriptions")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(validator_subscription_tx_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) @@ -3601,7 +3601,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .then( |task_spawner: TaskSpawner, chain: Arc>, @@ -3652,7 +3652,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .then( |task_spawner: TaskSpawner, chain: Arc>, @@ -3826,7 +3826,7 @@ pub fn serve( .and(warp::path("validator")) .and(warp::path("sync_committee_subscriptions")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(validator_subscription_tx_filter) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) @@ -3872,7 +3872,7 @@ pub fn serve( .and(warp::path("liveness")) .and(warp::path::param::()) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( @@ -3913,7 +3913,7 @@ pub fn serve( let post_lighthouse_liveness = warp::path("lighthouse") .and(warp::path("liveness")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( @@ -4016,7 +4016,7 @@ pub fn serve( .and(warp::path("ui")) .and(warp::path("validator_metrics")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( @@ -4035,7 +4035,7 @@ pub fn serve( .and(warp::path("ui")) .and(warp::path("validator_info")) .and(warp::path::end()) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( @@ -4368,7 +4368,7 @@ pub fn serve( let post_lighthouse_block_rewards = warp::path("lighthouse") .and(warp::path("analysis")) .and(warp::path("block_rewards")) - .and(warp::body::json()) + .and(warp_utils::json::json()) .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 16801be8e23..6687788d528 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -373,10 +373,30 @@ impl BeaconNodeHttpClient { if let Some(timeout) = timeout { builder = builder.timeout(timeout); } + let response = builder.json(body).send().await?; ok_or_error(response).await } + /// Generic POST function supporting arbitrary responses and timeouts. + /// Does not include Content-Type application/json in the request header. + async fn post_generic_json_without_content_type_header( + &self, + url: U, + body: &T, + timeout: Option, + ) -> Result { + let mut builder = self.client.post(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + + let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?; + + let response = builder.body(serialized_body).send().await?; + ok_or_error(response).await + } + /// Generic POST function supporting arbitrary responses and timeouts. async fn post_generic_with_consensus_version( &self, @@ -1250,7 +1270,8 @@ impl BeaconNodeHttpClient { .push("pool") .push("attester_slashings"); - self.post(path, slashing).await?; + self.post_generic_json_without_content_type_header(path, slashing, None) + .await?; Ok(()) } diff --git a/common/warp_utils/Cargo.toml b/common/warp_utils/Cargo.toml index 85c1901badd..0d33de998ea 100644 --- a/common/warp_utils/Cargo.toml +++ b/common/warp_utils/Cargo.toml @@ -14,8 +14,10 @@ beacon_chain = { workspace = true } state_processing = { workspace = true } safe_arith = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } headers = "0.3.2" lighthouse_metrics = { workspace = true } lazy_static = { workspace = true } serde_array_query = "0.1.0" +bytes = { workspace = true } diff --git a/common/warp_utils/src/json.rs b/common/warp_utils/src/json.rs new file mode 100644 index 00000000000..203a6495a42 --- /dev/null +++ b/common/warp_utils/src/json.rs @@ -0,0 +1,22 @@ +use bytes::Bytes; +use serde::de::DeserializeOwned; +use std::error::Error as StdError; +use warp::{Filter, Rejection}; + +use crate::reject; + +struct Json; + +type BoxError = Box; + +impl Json { + fn decode(bytes: Bytes) -> Result { + serde_json::from_slice(&bytes).map_err(Into::into) + } +} + +pub fn json() -> impl Filter + Copy { + warp::body::bytes().and_then(|bytes: Bytes| async move { + Json::decode(bytes).map_err(|err| reject::custom_deserialize_error(format!("{:?}", err))) + }) +} diff --git a/common/warp_utils/src/lib.rs b/common/warp_utils/src/lib.rs index 77d61251f24..55ee423fa41 100644 --- a/common/warp_utils/src/lib.rs +++ b/common/warp_utils/src/lib.rs @@ -2,6 +2,7 @@ //! Lighthouse project. E.g., the `http_api` and `http_metrics` crates. pub mod cors; +pub mod json; pub mod metrics; pub mod query; pub mod reject; diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index cf3d11af8d7..b6bb5ace3d0 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -82,6 +82,15 @@ pub fn custom_bad_request(msg: String) -> warp::reject::Rejection { warp::reject::custom(CustomBadRequest(msg)) } +#[derive(Debug)] +pub struct CustomDeserializeError(pub String); + +impl Reject for CustomDeserializeError {} + +pub fn custom_deserialize_error(msg: String) -> warp::reject::Rejection { + warp::reject::custom(CustomDeserializeError(msg)) +} + #[derive(Debug)] pub struct CustomServerError(pub String); @@ -161,6 +170,9 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result() { + message = format!("BAD_REQUEST: body deserialize error: {}", e.0); + code = StatusCode::BAD_REQUEST; } else if let Some(e) = err.find::() { message = format!("BAD_REQUEST: body deserialize error: {}", e); code = StatusCode::BAD_REQUEST;