-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathserver.rs
216 lines (199 loc) · 9.04 KB
/
server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
use alloy::primitives::B256;
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId,
PayloadStatus,
};
use jsonrpsee::core::{async_trait, ClientError, RpcResult};
use jsonrpsee::http_client::transport::HttpBackend;
use jsonrpsee::http_client::HttpClient;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::error::INVALID_REQUEST_CODE;
use jsonrpsee::types::{ErrorCode, ErrorObject};
use op_alloy_rpc_types_engine::{
AsInnerPayload, OptimismExecutionPayloadEnvelopeV3, OptimismPayloadAttributes,
};
use reth_rpc_layer::AuthClientService;
use std::sync::Arc;
use tracing::{error, info};
#[rpc(server, client, namespace = "engine")]
pub trait EngineApi {
#[method(name = "forkchoiceUpdatedV3")]
async fn fork_choice_updated_v3(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<OptimismPayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated>;
#[method(name = "getPayloadV3")]
async fn get_payload_v3(
&self,
payload_id: PayloadId,
) -> RpcResult<OptimismExecutionPayloadEnvelopeV3>;
#[method(name = "newPayloadV3")]
async fn new_payload_v3(
&self,
payload: ExecutionPayloadV3,
versioned_hashes: Vec<B256>,
parent_beacon_block_root: B256,
) -> RpcResult<PayloadStatus>;
}
pub struct EthEngineApi<S = AuthClientService<HttpBackend>> {
l2_client: Arc<HttpClient<S>>,
builder_client: Arc<HttpClient<S>>,
boost_sync: bool,
}
impl<S> EthEngineApi<S> {
pub fn new(
l2_client: Arc<HttpClient<S>>,
builder_client: Arc<HttpClient<S>>,
boost_sync: bool,
) -> Self {
Self {
l2_client,
builder_client,
boost_sync,
}
}
}
#[async_trait]
impl EngineApiServer for EthEngineApi {
async fn fork_choice_updated_v3(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<OptimismPayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated> {
info!(
message = "received fork_choice_updated_v3",
"head_block_hash" = %fork_choice_state.head_block_hash,
"has_attributes" = payload_attributes.is_some(),
);
let use_tx_pool = payload_attributes
.as_ref()
.map(|attr| !attr.no_tx_pool.unwrap_or_default());
let should_send_to_builder = if self.boost_sync {
// don't send to builder only if no_tx_pool is set
use_tx_pool.unwrap_or(true)
} else {
// send to builder if there are payload attributes and no_tx_pool is not set
use_tx_pool.is_some()
};
if should_send_to_builder {
// async call to builder to trigger payload building and sync
let builder = self.builder_client.clone();
let attr = payload_attributes.clone();
tokio::spawn(async move {
builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default();
if response.is_invalid() {
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
} else {
info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str);
}
}).map_err(|e| {
error!(message = "error calling fork_choice_updated_v3 to builder", "error" = %e, "head_block_hash" = %fork_choice_state.head_block_hash);
})
});
} else {
info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash);
}
self.l2_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes)
.await
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling fork_choice_updated_v3 for l2 client",
"error" = %other_error,
"head_block_hash" = %fork_choice_state.head_block_hash,
);
ErrorCode::InternalError.into()
}
})
}
async fn get_payload_v3(
&self,
payload_id: PayloadId,
) -> RpcResult<OptimismExecutionPayloadEnvelopeV3> {
info!(message = "received get_payload_v3", "payload_id" = %payload_id);
let l2_client_future = self.l2_client.get_payload_v3(payload_id);
let builder_client_future = Box::pin(async {
let payload = self.builder_client.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id);
e
})?;
info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash);
// Send the payload to the local execution engine with engine_newPayload to validate the block from the builder.
// Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block.
// If validation fails, return the local block since that one has already been validated.
let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id);
e
})?;
if payload_status.is_invalid() {
error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %payload_id);
Err(ClientError::Call(ErrorObject::owned(
INVALID_REQUEST_CODE,
"Builder payload was not valid",
None::<String>,
)))
} else {
info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id);
Ok(payload)
}
});
let (l2_payload, builder_payload) = tokio::join!(l2_client_future, builder_client_future);
builder_payload.or(l2_payload).map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling get_payload_v3",
"error" = %other_error,
"payload_id" = %payload_id
);
ErrorCode::InternalError.into()
}
})
}
async fn new_payload_v3(
&self,
payload: ExecutionPayloadV3,
versioned_hashes: Vec<B256>,
parent_beacon_block_root: B256,
) -> RpcResult<PayloadStatus> {
let block_hash = ExecutionPayload::from(payload.clone()).block_hash();
info!(message = "received new_payload_v3", "block_hash" = %block_hash);
// async call to builder to sync the builder node
if self.boost_sync {
let builder = self.builder_client.clone();
let builder_payload = payload.clone();
let builder_versioned_hashes = versioned_hashes.clone();
tokio::spawn(async move {
builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await
.map(|response: PayloadStatus| {
if response.is_invalid() {
error!(message = "builder rejected new_payload_v3", "block_hash" = %block_hash);
} else {
info!(message = "called new_payload_v3 to builder", "payload_status" = %response.status, "block_hash" = %block_hash);
}
}).map_err(|e| {
error!(message = "error calling new_payload_v3 to builder", "error" = %e, "block_hash" = %block_hash);
e
})
});
}
self.l2_client
.new_payload_v3(payload, versioned_hashes, parent_beacon_block_root)
.await
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling new_payload_v3",
"error" = %other_error,
"block_hash" = %block_hash
);
ErrorCode::InternalError.into()
}
})
}
}