Skip to content

Commit

Permalink
Improve HTTP API error messages + tweaks (sigp#4595)
Browse files Browse the repository at this point in the history
Closes sigp#3404 (mostly)

- Remove all uses of Warp's `and_then` (which backtracks) in favour of `then` (which doesn't).
- Bump the priority of the `POST` method for `v2/blocks` to `P0`. Publishing a block needs to happen quickly.
- Run the new SSZ POST endpoints on the beacon processor. I think this was missed in between merging sigp#4462 and sigp#4504/sigp#4479.
- Fix a minor issue in the validator registrations endpoint whereby an error from spawning the task on the beacon processor would be dropped.

I've tested this manually and can confirm that we no longer get the dreaded `Unsupported endpoint version` errors for queries like:

```
$ curl -X POST -H "Content-Type: application/json" --data @block.json "http://localhost:5052/eth/v2/beacon/blocks" | jq
{
  "code": 400,
  "message": "BAD_REQUEST: WeakSubjectivityConflict",
  "stacktraces": []
}
```

```
$ curl -X POST -H "Content-Type: application/octet-stream" --data @block.json "http://localhost:5052/eth/v2/beacon/blocks" | jq
{
  "code": 400,
  "message": "BAD_REQUEST: invalid SSZ: OffsetOutOfBounds(572530811)",
  "stacktraces": []
}
```

```
$ curl "http://localhost:5052/eth/v2/validator/blocks/7067595"
{"code":400,"message":"BAD_REQUEST: invalid query: Invalid query string","stacktraces":[]}
```

However, I can still trigger it by leaving off the `Content-Type`. We can re-test this aspect with sigp#4575.
  • Loading branch information
michaelsproul authored and Woodpile37 committed Jan 6, 2024
1 parent 0833331 commit f7d5059
Showing 1 changed file with 55 additions and 37 deletions.
92 changes: 55 additions & 37 deletions beacon_node/http_api/src/task_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ pub struct TaskSpawner<E: EthSpec> {
beacon_processor_send: Option<BeaconProcessorSend<E>>,
}

/// Convert a warp `Rejection` into a `Response`.
///
/// This function should *always* be used to convert rejections into responses. This prevents warp
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404
pub async fn convert_rejection<T: Reply>(res: Result<T, warp::Rejection>) -> Response {
match res {
Ok(response) => response.into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
warp::reply::json(&"unhandled error"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
}

impl<E: EthSpec> TaskSpawner<E> {
pub fn new(beacon_processor_send: Option<BeaconProcessorSend<E>>) -> Self {
Self {
Expand All @@ -43,11 +61,7 @@ impl<E: EthSpec> TaskSpawner<E> {
}

/// Executes a "blocking" (non-async) task which returns a `Response`.
pub async fn blocking_response_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
pub async fn blocking_response_task<F, T>(self, priority: Priority, func: F) -> Response
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Reply + Send + 'static,
Expand All @@ -65,31 +79,25 @@ impl<E: EthSpec> TaskSpawner<E> {
};

// Send the function to the beacon processor for execution at some arbitrary time.
match send_to_beacon_processor(
let result = send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Blocking(Box::new(process_fn)),
rx,
)
.await
{
Ok(result) => result.map(Reply::into_response),
Err(error_response) => Ok(error_response),
}
.and_then(|x| x);
convert_rejection(result).await
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
warp_utils::task::blocking_response_task(func).await
convert_rejection(warp_utils::task::blocking_response_task(func).await).await
}
}

/// Executes a "blocking" (non-async) task which returns a JSON-serializable
/// object.
pub async fn blocking_json_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
pub async fn blocking_json_task<F, T>(self, priority: Priority, func: F) -> Response
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Serialize + Send + 'static,
Expand All @@ -98,11 +106,26 @@ impl<E: EthSpec> TaskSpawner<E> {
self.blocking_response_task(priority, func).await
}

/// Executes an async task which may return a `warp::Rejection`.
/// Executes an async task which may return a `Rejection`, which will be converted to a response.
pub async fn spawn_async_with_rejection(
self,
priority: Priority,
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
) -> Response {
let result = self
.spawn_async_with_rejection_no_conversion(priority, func)
.await;
convert_rejection(result).await
}

/// Same as `spawn_async_with_rejection` but returning a result with the unhandled rejection.
///
/// If you call this function you MUST convert the rejection to a response and not let it
/// propagate into Warp's filters. See `convert_rejection`.
pub async fn spawn_async_with_rejection_no_conversion(
self,
priority: Priority,
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
) -> Result<Response, warp::Rejection> {
if let Some(beacon_processor_send) = &self.beacon_processor_send {
// Create a wrapper future that will execute `func` and send the
Expand All @@ -124,18 +147,16 @@ impl<E: EthSpec> TaskSpawner<E> {
rx,
)
.await
.unwrap_or_else(Result::Ok)
.and_then(|x| x)
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
tokio::task::spawn(func).await.unwrap_or_else(|e| {
let response = warp::reply::with_status(
warp::reply::json(&format!("Tokio did not execute task: {e:?}")),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Ok(response)
})
tokio::task::spawn(func)
.await
.map_err(|_| {
warp_utils::reject::custom_server_error("Tokio failed to spawn task".into())
})
.and_then(|x| x)
}
}

Expand All @@ -158,14 +179,14 @@ impl<E: EthSpec> TaskSpawner<E> {
};

// Send the function to the beacon processor for execution at some arbitrary time.
send_to_beacon_processor(
let result = send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Async(Box::pin(process_fn)),
rx,
)
.await
.unwrap_or_else(|error_response| error_response)
.await;
convert_rejection(result).await
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
Expand All @@ -182,14 +203,14 @@ impl<E: EthSpec> TaskSpawner<E> {

/// Send a task to the beacon processor and await execution.
///
/// If the task is not executed, return an `Err(response)` with an error message
/// If the task is not executed, return an `Err` with an error message
/// for the API consumer.
async fn send_to_beacon_processor<E: EthSpec, T>(
beacon_processor_send: &BeaconProcessorSend<E>,
priority: Priority,
process_fn: BlockingOrAsync,
rx: oneshot::Receiver<T>,
) -> Result<T, Response> {
) -> Result<T, warp::Rejection> {
let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) {
Ok(()) => {
match rx.await {
Expand All @@ -205,10 +226,7 @@ async fn send_to_beacon_processor<E: EthSpec, T>(
Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.",
};

let error_response = warp::reply::with_status(
warp::reply::json(&error_message),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Err(error_response)
Err(warp_utils::reject::custom_server_error(
error_message.to_string(),
))
}

0 comments on commit f7d5059

Please sign in to comment.