Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(debug): engine reorg util depth #10575

Merged
merged 5 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions book/cli/reth/node.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ Debug:
--debug.reorg-frequency <REORG_FREQUENCY>
If provided, the chain will be reorged at specified frequency

--debug.reorg-depth <REORG_DEPTH>
The reorg depth for chain reorgs

--debug.engine-api-store <PATH>
The path to store engine API messages at. If specified, all of the intercepted engine API messages will be written to specified location

Expand Down
12 changes: 11 additions & 1 deletion crates/engine/util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,19 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: usize,
depth: Option<usize>,
) -> EngineReorg<Self, Engine, Provider, Evm>
where
Self: Sized,
{
EngineReorg::new(self, provider, evm_config, payload_validator, frequency)
EngineReorg::new(
self,
provider,
evm_config,
payload_validator,
frequency,
depth.unwrap_or_default(),
)
}

/// If frequency is [Some], returns the stream that creates reorgs with
Expand All @@ -116,6 +124,7 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: Option<usize>,
depth: Option<usize>,
) -> Either<EngineReorg<Self, Engine, Provider, Evm>, Self>
where
Self: Sized,
Expand All @@ -127,6 +136,7 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
evm_config,
payload_validator,
frequency,
depth.unwrap_or_default(),
))
} else {
Either::Right(self)
Expand Down
172 changes: 94 additions & 78 deletions crates/engine/util/src/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm> {
payload_validator: ExecutionPayloadValidator,
/// The frequency of reorgs.
frequency: usize,
/// The depth of reorgs.
depth: usize,
/// The number of forwarded forkchoice states.
/// This is reset after a reorg.
forkchoice_states_forwarded: usize,
Expand All @@ -80,13 +82,15 @@ impl<S, Engine: EngineTypes, Provider, Evm> EngineReorg<S, Engine, Provider, Evm
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: usize,
depth: usize,
) -> Self {
Self {
stream,
provider,
evm_config,
payload_validator,
frequency,
depth,
state: EngineReorgState::Forward,
forkchoice_states_forwarded: 0,
last_forkchoice_state: None,
Expand Down Expand Up @@ -138,88 +142,84 @@ where
}

let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) => {
if this.forkchoice_states_forwarded > this.frequency {
if let Some(last_forkchoice_state) = this
.last_forkchoice_state
// Only enter reorg state if new payload attaches to current head.
.filter(|state| state.head_block_hash == payload.parent_hash())
{
// Enter the reorg state.
// The current payload will be immediately forwarded by being in front
// of the queue. Then we attempt to reorg the current head by generating
// a payload that attaches to the head's parent and is based on the
// non-conflicting transactions (txs from block `n + 1` that are valid
// at block `n` according to consensus checks) from the current payload
// as well as the corresponding forkchoice state. We will rely on CL to
// reorg us back to canonical chain.
// TODO: This is an expensive blocking operation, ideally it's spawned
// as a task so that the stream could yield the control back.
let (reorg_payload, reorg_cancun_fields) = match create_reorg_head(
this.provider,
this.evm_config,
this.payload_validator,
payload.clone(),
cancun_fields.clone(),
) {
Ok(result) => result,
Err(error) => {
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
// Forward the payload and attempt to create reorg on top of the
// next one
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
cancun_fields,
tx,
}))
}
};
let reorg_forkchoice_state = ForkchoiceState {
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
safe_block_hash: last_forkchoice_state.safe_block_hash,
head_block_hash: reorg_payload.block_hash(),
};

let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);

*this.state = EngineReorgState::Reorg {
queue: VecDeque::from([
// Current payload
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx },
// Reorg payload
BeaconEngineMessage::NewPayload {
payload: reorg_payload,
cancun_fields: reorg_cancun_fields,
tx: reorg_payload_tx,
},
// Reorg forkchoice state
BeaconEngineMessage::ForkchoiceUpdated {
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
},
]),
};
continue
let item = match (next, &this.last_forkchoice_state) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a refactor, nothing to see here

(
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }),
Some(last_forkchoice_state),
) if this.forkchoice_states_forwarded > this.frequency &&
// Only enter reorg state if new payload attaches to current head.
last_forkchoice_state.head_block_hash == payload.parent_hash() =>
{
// Enter the reorg state.
// The current payload will be immediately forwarded by being in front of the
// queue. Then we attempt to reorg the current head by generating a payload that
// attaches to the head's parent and is based on the non-conflicting
// transactions (txs from block `n + 1` that are valid at block `n` according to
// consensus checks) from the current payload as well as the corresponding
// forkchoice state. We will rely on CL to reorg us back to canonical chain.
// TODO: This is an expensive blocking operation, ideally it's spawned as a task
// so that the stream could yield the control back.
let (reorg_payload, reorg_cancun_fields) = match create_reorg_head(
this.provider,
this.evm_config,
this.payload_validator,
*this.depth,
payload.clone(),
cancun_fields.clone(),
) {
Ok(result) => result,
Err(error) => {
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
// Forward the payload and attempt to create reorg on top of
// the next one
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
cancun_fields,
tx,
}))
}
}
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx })
};
let reorg_forkchoice_state = ForkchoiceState {
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
safe_block_hash: last_forkchoice_state.safe_block_hash,
head_block_hash: reorg_payload.block_hash(),
};

let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);

let queue = VecDeque::from([
// Current payload
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx },
// Reorg payload
BeaconEngineMessage::NewPayload {
payload: reorg_payload,
cancun_fields: reorg_cancun_fields,
tx: reorg_payload_tx,
},
// Reorg forkchoice state
BeaconEngineMessage::ForkchoiceUpdated {
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
},
]);
*this.state = EngineReorgState::Reorg { queue };
continue
}
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
(Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
// Record last forkchoice state forwarded to the engine.
// We do not care if it's valid since engine should be able to handle
// reorgs that rely on invalid forkchoice state.
*this.last_forkchoice_state = Some(state);
*this.forkchoice_states_forwarded += 1;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
}
item => item,
(item, _) => item,
};
return Poll::Ready(item)
}
Expand All @@ -230,6 +230,7 @@ fn create_reorg_head<Provider, Evm>(
provider: &Provider,
evm_config: &Evm,
payload_validator: &ExecutionPayloadValidator,
mut depth: usize,
next_payload: ExecutionPayload,
next_cancun_fields: Option<CancunPayloadFields>,
) -> RethResult<(ExecutionPayload, Option<CancunPayloadFields>)>
Expand All @@ -244,14 +245,29 @@ where
.ensure_well_formed_payload(next_payload, next_cancun_fields.into())
.map_err(RethError::msg)?;

// Fetch reorg target block and its parent
let reorg_target = provider
.block_by_hash(next_block.parent_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(next_block.parent_hash.into()))?;
// Fetch reorg target block depending on its depth and its parent.
let mut previous_hash = next_block.parent_hash;
let mut candidate_transactions = next_block.body;
let reorg_target = 'target: {
loop {
let reorg_target = provider
.block_by_hash(previous_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
if depth == 0 {
break 'target reorg_target
}

depth -= 1;
previous_hash = reorg_target.parent_hash;
candidate_transactions = reorg_target.body;
}
};
let reorg_target_parent = provider
.block_by_hash(reorg_target.parent_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.parent_hash.into()))?;

debug!(target: "engine::stream::reorg", number = reorg_target.number, hash = %previous_hash, "Selected reorg target");

// Configure state
let state_provider = provider.state_by_block_hash(reorg_target.parent_hash)?;
let mut state = State::builder()
Expand Down Expand Up @@ -287,7 +303,7 @@ where
let mut transactions = Vec::new();
let mut receipts = Vec::new();
let mut versioned_hashes = Vec::new();
for tx in next_block.body {
for tx in candidate_transactions {
// ensure we still have capacity for this transaction
if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit {
continue
Expand Down
1 change: 1 addition & 0 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ where
ctx.components().evm_config().clone(),
reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
node_config.debug.reorg_frequency,
node_config.debug.reorg_depth,
)
// Store messages _after_ skipping so that `replay-engine` command
// would replay only the messages that were observed by the engine
Expand Down
1 change: 1 addition & 0 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ where
ctx.components().evm_config().clone(),
reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
node_config.debug.reorg_frequency,
node_config.debug.reorg_depth,
)
// Store messages _after_ skipping so that `replay-engine` command
// would replay only the messages that were observed by the engine
Expand Down
4 changes: 4 additions & 0 deletions crates/node/core/src/args/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ pub struct DebugArgs {
#[arg(long = "debug.reorg-frequency", help_heading = "Debug")]
pub reorg_frequency: Option<usize>,

/// The reorg depth for chain reorgs.
#[arg(long = "debug.reorg-depth", requires = "reorg_frequency", help_heading = "Debug")]
pub reorg_depth: Option<usize>,

/// The path to store engine API messages at.
/// If specified, all of the intercepted engine API messages
/// will be written to specified location.
Expand Down
Loading