Skip to content

Commit

Permalink
Request tracing without task local storage (#3251)
Browse files Browse the repository at this point in the history
# Description
Alternative to #3247

All of the context still applies to this PR.
But instead of looking up the request id from the task local storage we
are now able to look it up from the tracing span.
When I introduced the request id feature I already wanted to have it
work this way but I was not able to get it working.

Finally after some more research I figured out that this can be
accomplished by implementing a custom tracing layer which makes the
request id available for lookup later on.

Now the request id is easily available wherever the current span or
any of its parents contains the request id.
This means we no longer have to worry about 2 different ways of making
important data available when spawning a new task.

# Changes
* reworked request_id handling to use a tracing layer instead of task
local storage
* piped the auction id as a request id into the ethrpc http transport
layer on `/settle` calls so we can associate rpc proxy logs with the
solution we tried to submit
* since the driver now has an internal settle queue the additional
indirection of spawning a new task in the `/settle` handler is no longer
needed => I removed it to make the code simpler

## How to test
Ran local_node test with extra logging to make sure the request ids
arrive in the http transport layer with and without request buffering.
Since I used the e2e test for multiple winners again we try to submit 2
solutions.
without buffering (applies to mevblocker):
```
2025-01-22T10:07:46.274Z ERROR request{id="554"}:/settle{solver="test_solver" auction_id=554}:mempool{kind="Mempool(MEVBlocker)"}: ethrpc::http: metadata="554" call="eth_sendRawTransaction"
...
2025-01-22T10:07:46.277Z ERROR request{id="554"}:/settle{solver="solver2" auction_id=554}:mempool{kind="Mempool(MEVBlocker)"}: ethrpc::http: metadata="554" call="eth_sendRawTransaction"
```
with buffering (applies to public mempool submissions):
```
2025-01-22T10:02:10.347Z ERROR ethrpc::http::metadata="548:eth_sendRawTransaction(0,1)"
```

Also added unit tests to show that we can correctly get the request_id
from the tracing span under various conditions.
  • Loading branch information
MartinquaXD authored Jan 24, 2025
1 parent 9cf2e35 commit 589e52d
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 118 deletions.
3 changes: 2 additions & 1 deletion crates/autopilot/src/infra/solvers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Driver {
.post(url)
.json(request)
.timeout(timeout)
.header("X-REQUEST-ID", request.auction_id.to_string())
.send()
.await
.context("send")?;
Expand Down Expand Up @@ -94,7 +95,7 @@ impl Driver {
if let Some(timeout) = timeout {
request = request.timeout(timeout);
}
if let Some(request_id) = observe::request_id::get_task_local_storage() {
if let Some(request_id) = observe::request_id::from_current_span() {
request = request.header("X-REQUEST-ID", request_id);
}

Expand Down
8 changes: 3 additions & 5 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,10 @@ impl RunLoop {
.await;
if let Some(auction) = auction {
let auction_id = auction.id;
let auction_task = self_arc
self_arc
.single_run(auction)
.instrument(tracing::info_span!("auction", auction_id));

::observe::request_id::set_task_local_storage(auction_id.to_string(), auction_task)
.await;
.instrument(tracing::info_span!("auction", auction_id))
.await
};
}
}
Expand Down
6 changes: 4 additions & 2 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ impl Competition {
solution_id,
submission_deadline,
response_sender,
tracing_span: tracing::Span::current(),
};

self.settle_queue.try_send(request).map_err(|err| {
Expand Down Expand Up @@ -387,8 +388,8 @@ impl Competition {
solution_id,
submission_deadline,
response_sender,
tracing_span,
} = request;
let solver = self.solver.name().as_str();
async {
if self.eth.current_block().borrow().number >= submission_deadline {
if let Err(err) = response_sender.send(Err(DeadlineExceeded.into())) {
Expand All @@ -410,7 +411,7 @@ impl Competition {
tracing::error!(?err, "Failed to send /settle response");
}
}
.instrument(tracing::info_span!("/settle", solver, %auction_id))
.instrument(tracing_span)
.await
}
}
Expand Down Expand Up @@ -533,6 +534,7 @@ struct SettleRequest {
solution_id: u64,
submission_deadline: BlockNo,
response_sender: oneshot::Sender<Result<Settled, Error>>,
tracing_span: tracing::Span,
}

/// Solution information sent to the protocol by the driver before the solution
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/infra/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Api {
.layer(axum::extract::DefaultBodyLimit::disable());
}

let make_svc = observe::make_service_with_task_local_storage!(app);
let make_svc = observe::make_service_with_request_tracing!(app);

// Start the server.
let server = axum::Server::bind(&self.addr).serve(make_svc);
Expand Down
18 changes: 4 additions & 14 deletions crates/driver/src/infra/api/routes/settle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod dto;

use {
crate::{
domain::{competition, competition::auction},
domain::competition::auction,
infra::{
api::{self, Error, State},
observe,
Expand All @@ -23,7 +23,7 @@ async fn route(
auction::Id::try_from(req.auction_id).map_err(api::routes::AuctionError::from)?;
let solver = state.solver().name().to_string();

let handle_request = async move {
async move {
observe::settling();
let result = state
.competition()
Expand All @@ -36,16 +36,6 @@ async fn route(
observe::settled(state.solver().name(), &result);
result.map(|_| ()).map_err(Into::into)
}
.instrument(tracing::info_span!("/settle", solver, %auction_id));

// Handle `/settle` call in a background task to ensure that we correctly
// submit the settlement (or cancellation) on-chain even if the server
// aborts the endpoint handler code.
// This can happen due to connection issues or when the autopilot aborts
// the `/settle` call when we reach the submission deadline.
Ok(
::observe::request_id::spawn_task_with_current_request_id(handle_request)
.await
.unwrap_or_else(|_| Err(competition::Error::SubmissionError))?,
)
.instrument(tracing::info_span!("/settle", solver, %auction_id))
.await
}
4 changes: 2 additions & 2 deletions crates/driver/src/infra/solver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl Solver {
.post(url.clone())
.body(body)
.timeout(auction.deadline().solvers().remaining().unwrap_or_default());
if let Some(id) = observe::request_id::get_task_local_storage() {
if let Some(id) = observe::request_id::from_current_span() {
req = req.header("X-REQUEST-ID", id);
}
let res = util::http::send(self.config.response_size_limit_max_bytes, req).await;
Expand All @@ -268,7 +268,7 @@ impl Solver {
let url = shared::url::join(&self.config.endpoint, "notify");
super::observe::solver_request(&url, &body);
let mut req = self.client.post(url).body(body);
if let Some(id) = observe::request_id::get_task_local_storage() {
if let Some(id) = observe::request_id::from_current_span() {
req = req.header("X-REQUEST-ID", id);
}
let response_size = self.config.response_size_limit_max_bytes;
Expand Down
2 changes: 1 addition & 1 deletion crates/e2e/src/setup/solver/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Default for Mock {
.route("/solve", axum::routing::post(solve))
.with_state(state.clone());

let make_svc = observe::make_service_with_task_local_storage!(app);
let make_svc = observe::make_service_with_request_tracing!(app);
let server = axum::Server::bind(&"0.0.0.0:0".parse().unwrap()).serve(make_svc);

let mock = Mock {
Expand Down
16 changes: 5 additions & 11 deletions crates/ethrpc/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,8 @@ where
(requests.remove(0), trace_ids.remove(0), senders.remove(0));
let result = match (&request, trace_id) {
(Call::MethodCall(_), Some(trace_id)) => {
observe::request_id::set_task_local_storage(
trace_id,
inner.send(id, request),
)
.await
let span = observe::request_id::info_span(trace_id);
inner.send(id, request).instrument(span).await
}
_ => inner.send(id, request).await,
};
Expand All @@ -120,11 +117,8 @@ where
n => {
let results = match build_rpc_metadata(&requests, &trace_ids) {
Ok(metadata) => {
observe::request_id::set_task_local_storage(
metadata,
inner.send_batch(requests),
)
.await
let span = observe::request_id::info_span(metadata);
inner.send_batch(requests).instrument(span).await
}
Err(err) => {
tracing::error!(
Expand All @@ -148,7 +142,7 @@ where
/// Queue a call by sending it over calls channel to the background worker.
fn queue_call(&self, id: RequestId, request: Call) -> oneshot::Receiver<RpcResult> {
let (sender, receiver) = oneshot::channel();
let trace_id = observe::request_id::get_task_local_storage();
let trace_id = observe::request_id::from_current_span();
let context = (id, request, trace_id, sender);
self.calls
.unbounded_send(context)
Expand Down
4 changes: 2 additions & 2 deletions crates/ethrpc/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ async fn execute_rpc<T: DeserializeOwned>(
.body(body);
match request {
Request::Single(Call::MethodCall(call)) => {
if let Some(metadata) = observe::request_id::get_task_local_storage() {
if let Some(metadata) = observe::request_id::from_current_span() {
request_builder = request_builder.header("X-REQUEST-ID", metadata);
}
request_builder = request_builder.header("X-RPC-METHOD", call.method.clone());
}
Request::Batch(_) => {
if let Some(metadata) = observe::request_id::get_task_local_storage() {
if let Some(metadata) = observe::request_id::from_current_span() {
request_builder = request_builder.header("X-RPC-BATCH-METADATA", metadata);
}
}
Expand Down
Loading

0 comments on commit 589e52d

Please sign in to comment.