Skip to content

Commit

Permalink
feat(debug): engine reorg util depth (#10575)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk authored Aug 28, 2024
1 parent 5b7d637 commit f600ff0
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 79 deletions.
3 changes: 3 additions & 0 deletions book/cli/reth/node.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ Debug:
--debug.reorg-frequency <REORG_FREQUENCY>
If provided, the chain will be reorged at specified frequency

--debug.reorg-depth <REORG_DEPTH>
The reorg depth for chain reorgs

--debug.engine-api-store <PATH>
The path to store engine API messages at. If specified, all of the intercepted engine API messages will be written to specified location

Expand Down
12 changes: 11 additions & 1 deletion crates/engine/util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,19 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: usize,
depth: Option<usize>,
) -> EngineReorg<Self, Engine, Provider, Evm>
where
Self: Sized,
{
EngineReorg::new(self, provider, evm_config, payload_validator, frequency)
EngineReorg::new(
self,
provider,
evm_config,
payload_validator,
frequency,
depth.unwrap_or_default(),
)
}

/// If frequency is [Some], returns the stream that creates reorgs with
Expand All @@ -116,6 +124,7 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: Option<usize>,
depth: Option<usize>,
) -> Either<EngineReorg<Self, Engine, Provider, Evm>, Self>
where
Self: Sized,
Expand All @@ -127,6 +136,7 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config,
payload_validator,
frequency,
depth.unwrap_or_default(),
))
} else {
Either::Right(self)
Expand Down
172 changes: 94 additions & 78 deletions crates/engine/util/src/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm> {
payload_validator: ExecutionPayloadValidator,
/// The frequency of reorgs.
frequency: usize,
/// The depth of reorgs.
depth: usize,
/// The number of forwarded forkchoice states.
/// This is reset after a reorg.
forkchoice_states_forwarded: usize,
Expand All @@ -80,13 +82,15 @@ impl<S, Engine: EngineTypes, Provider, Evm> EngineReorg<S, Engine, Provider, Evm
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: usize,
depth: usize,
) -> Self {
Self {
stream,
provider,
evm_config,
payload_validator,
frequency,
depth,
state: EngineReorgState::Forward,
forkchoice_states_forwarded: 0,
last_forkchoice_state: None,
Expand Down Expand Up @@ -138,88 +142,84 @@ where
}

let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) => {
if this.forkchoice_states_forwarded > this.frequency {
if let Some(last_forkchoice_state) = this
.last_forkchoice_state
// Only enter reorg state if new payload attaches to current head.
.filter(|state| state.head_block_hash == payload.parent_hash())
{
// Enter the reorg state.
// The current payload will be immediately forwarded by being in front
// of the queue. Then we attempt to reorg the current head by generating
// a payload that attaches to the head's parent and is based on the
// non-conflicting transactions (txs from block `n + 1` that are valid
// at block `n` according to consensus checks) from the current payload
// as well as the corresponding forkchoice state. We will rely on CL to
// reorg us back to canonical chain.
// TODO: This is an expensive blocking operation, ideally it's spawned
// as a task so that the stream could yield the control back.
let (reorg_payload, reorg_cancun_fields) = match create_reorg_head(
this.provider,
this.evm_config,
this.payload_validator,
payload.clone(),
cancun_fields.clone(),
) {
Ok(result) => result,
Err(error) => {
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
// Forward the payload and attempt to create reorg on top of the
// next one
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
cancun_fields,
tx,
}))
}
};
let reorg_forkchoice_state = ForkchoiceState {
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
safe_block_hash: last_forkchoice_state.safe_block_hash,
head_block_hash: reorg_payload.block_hash(),
};

let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);

*this.state = EngineReorgState::Reorg {
queue: VecDeque::from([
// Current payload
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx },
// Reorg payload
BeaconEngineMessage::NewPayload {
payload: reorg_payload,
cancun_fields: reorg_cancun_fields,
tx: reorg_payload_tx,
},
// Reorg forkchoice state
BeaconEngineMessage::ForkchoiceUpdated {
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
},
]),
};
continue
let item = match (next, &this.last_forkchoice_state) {
(
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }),
Some(last_forkchoice_state),
) if this.forkchoice_states_forwarded > this.frequency &&
// Only enter reorg state if new payload attaches to current head.
last_forkchoice_state.head_block_hash == payload.parent_hash() =>
{
// Enter the reorg state.
// The current payload will be immediately forwarded by being in front of the
// queue. Then we attempt to reorg the current head by generating a payload that
// attaches to the head's parent and is based on the non-conflicting
// transactions (txs from block `n + 1` that are valid at block `n` according to
// consensus checks) from the current payload as well as the corresponding
// forkchoice state. We will rely on CL to reorg us back to canonical chain.
// TODO: This is an expensive blocking operation, ideally it's spawned as a task
// so that the stream could yield the control back.
let (reorg_payload, reorg_cancun_fields) = match create_reorg_head(
this.provider,
this.evm_config,
this.payload_validator,
*this.depth,
payload.clone(),
cancun_fields.clone(),
) {
Ok(result) => result,
Err(error) => {
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
// Forward the payload and attempt to create reorg on top of
// the next one
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
cancun_fields,
tx,
}))
}
}
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx })
};
let reorg_forkchoice_state = ForkchoiceState {
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
safe_block_hash: last_forkchoice_state.safe_block_hash,
head_block_hash: reorg_payload.block_hash(),
};

let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);

let queue = VecDeque::from([
// Current payload
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx },
// Reorg payload
BeaconEngineMessage::NewPayload {
payload: reorg_payload,
cancun_fields: reorg_cancun_fields,
tx: reorg_payload_tx,
},
// Reorg forkchoice state
BeaconEngineMessage::ForkchoiceUpdated {
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
},
]);
*this.state = EngineReorgState::Reorg { queue };
continue
}
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
(Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
// Record last forkchoice state forwarded to the engine.
// We do not care if it's valid since engine should be able to handle
// reorgs that rely on invalid forkchoice state.
*this.last_forkchoice_state = Some(state);
*this.forkchoice_states_forwarded += 1;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
}
item => item,
(item, _) => item,
};
return Poll::Ready(item)
}
Expand All @@ -230,6 +230,7 @@ fn create_reorg_head<Provider, Evm>(
provider: &Provider,
evm_config: &Evm,
payload_validator: &ExecutionPayloadValidator,
mut depth: usize,
next_payload: ExecutionPayload,
next_cancun_fields: Option<CancunPayloadFields>,
) -> RethResult<(ExecutionPayload, Option<CancunPayloadFields>)>
Expand All @@ -244,14 +245,29 @@ where
.ensure_well_formed_payload(next_payload, next_cancun_fields.into())
.map_err(RethError::msg)?;

// Fetch reorg target block and its parent
let reorg_target = provider
.block_by_hash(next_block.parent_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(next_block.parent_hash.into()))?;
// Fetch reorg target block depending on its depth and its parent.
let mut previous_hash = next_block.parent_hash;
let mut candidate_transactions = next_block.body;
let reorg_target = 'target: {
loop {
let reorg_target = provider
.block_by_hash(previous_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
if depth == 0 {
break 'target reorg_target
}

depth -= 1;
previous_hash = reorg_target.parent_hash;
candidate_transactions = reorg_target.body;
}
};
let reorg_target_parent = provider
.block_by_hash(reorg_target.parent_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.parent_hash.into()))?;

debug!(target: "engine::stream::reorg", number = reorg_target.number, hash = %previous_hash, "Selected reorg target");

// Configure state
let state_provider = provider.state_by_block_hash(reorg_target.parent_hash)?;
let mut state = State::builder()
Expand Down Expand Up @@ -287,7 +303,7 @@ where
let mut transactions = Vec::new();
let mut receipts = Vec::new();
let mut versioned_hashes = Vec::new();
for tx in next_block.body {
for tx in candidate_transactions {
// ensure we still have capacity for this transaction
if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit {
continue
Expand Down
1 change: 1 addition & 0 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ where
ctx.components().evm_config().clone(),
reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
node_config.debug.reorg_frequency,
node_config.debug.reorg_depth,
)
// Store messages _after_ skipping so that `replay-engine` command
// would replay only the messages that were observed by the engine
Expand Down
1 change: 1 addition & 0 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ where
ctx.components().evm_config().clone(),
reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
node_config.debug.reorg_frequency,
node_config.debug.reorg_depth,
)
// Store messages _after_ skipping so that `replay-engine` command
// would replay only the messages that were observed by the engine
Expand Down
4 changes: 4 additions & 0 deletions crates/node/core/src/args/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ pub struct DebugArgs {
#[arg(long = "debug.reorg-frequency", help_heading = "Debug")]
pub reorg_frequency: Option<usize>,

/// The reorg depth for chain reorgs.
#[arg(long = "debug.reorg-depth", requires = "reorg_frequency", help_heading = "Debug")]
pub reorg_depth: Option<usize>,

/// The path to store engine API messages at.
/// If specified, all of the intercepted engine API messages
/// will be written to specified location.
Expand Down

0 comments on commit f600ff0

Please sign in to comment.