Skip to content

Commit

Permalink
test: live sync test to check make canonical (#10131)
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez authored Aug 7, 2024
1 parent 2ab17a4 commit e7214af
Showing 1 changed file with 206 additions and 16 deletions.
222 changes: 206 additions & 16 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,16 @@ where
match self.insert_block(child) {
Ok(res) => {
debug!(target: "engine", child =?child_num_hash, ?res, "connected buffered block");
if self.is_sync_target_head(child_num_hash.hash) &&
matches!(
res,
InsertPayloadOk::Inserted(BlockStatus::Valid(
BlockAttachment::Canonical
))
)
{
self.make_canonical(child_num_hash.hash);
}
}
Err(err) => {
debug!(target: "engine", ?err, "failed to connect buffered block to tree");
Expand Down Expand Up @@ -1973,6 +1983,7 @@ mod tests {
action_rx: Receiver<PersistenceAction>,
executor_provider: MockExecutorProvider,
block_builder: TestBlockBuilder,
provider: MockEthProvider,
}

impl TestHarness {
Expand All @@ -1997,7 +2008,7 @@ mod tests {
let payload_builder = PayloadBuilderHandle::new(to_payload_service);

let tree = EngineApiTreeHandler::new(
provider,
provider.clone(),
executor_provider.clone(),
consensus,
payload_validator,
Expand All @@ -2021,6 +2032,7 @@ mod tests {
action_rx,
executor_provider,
block_builder,
provider,
}
}

Expand Down Expand Up @@ -2056,7 +2068,17 @@ mod tests {
self.tree.canonical_in_memory_state =
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None);

self.blocks = blocks;
self.blocks = blocks.clone();
self.persist_blocks(
blocks
.into_iter()
.map(|b| SealedBlockWithSenders {
block: (*b.block).clone(),
senders: b.senders.to_vec(),
})
.collect(),
);

self
}

Expand Down Expand Up @@ -2138,7 +2160,7 @@ mod tests {

async fn insert_chain(
&mut self,
mut chain: impl Iterator<Item = &SealedBlockWithSenders> + Clone,
chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
) {
for block in chain.clone() {
self.insert_block(block.clone()).unwrap();
Expand All @@ -2149,7 +2171,7 @@ mod tests {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
) => {
assert!(chain.any(|b| b.hash() == block.hash()));
assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
Expand All @@ -2167,6 +2189,49 @@ mod tests {
_ => panic!("Unexpected event: {:#?}", event),
}
}

/// Receives one `ForkBlockAdded` event per block in `chain` and asserts that
/// every announced block hash belongs to the given chain.
///
/// Panics if any other event kind arrives before all expected fork-block
/// events have been received.
async fn check_fork_chain_insertion(
    &mut self,
    chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
) {
    // Collect the expected hashes once up front instead of cloning the whole
    // chain (and every block in it) for each received event.
    let expected_hashes: Vec<_> = chain.into_iter().map(|b| b.hash()).collect();
    for _ in 0..expected_hashes.len() {
        let event = self.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::BeaconConsensus(
                BeaconConsensusEngineEvent::ForkBlockAdded(block),
            ) => {
                assert!(expected_hashes.contains(&block.hash()));
            }
            _ => panic!("Unexpected event: {:#?}", event),
        }
    }
}

/// Registers `blocks` with the mock provider as if they had been persisted,
/// and wires up the matching execution outcomes / state roots so subsequent
/// insertions of this range succeed.
fn persist_blocks(&mut self, blocks: Vec<SealedBlockWithSenders>) {
    self.setup_range_insertion_for_chain(blocks.clone());

    let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
    let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());

    for block in &blocks {
        let hash = block.hash();
        let unsealed_block = block.clone().unseal();
        // Clone only the header; the unsealed block itself can then be moved
        // into `block_data` instead of cloning the entire block again.
        headers_data.push((hash, unsealed_block.header.clone()));
        block_data.push((hash, unsealed_block.block));
    }

    self.provider.extend_blocks(block_data);
    self.provider.extend_headers(headers_data);
}

/// Prepares the harness for inserting `chain`: registers each block's state
/// root with the tree's provider and queues the corresponding execution
/// outcomes on the mock executor.
fn setup_range_insertion_for_chain(&mut self, chain: Vec<SealedBlockWithSenders>) {
    // The chain length is known, so preallocate instead of growing.
    let mut execution_outcomes = Vec::with_capacity(chain.len());
    for block in &chain {
        self.tree.provider.add_state_root(block.state_root);
        execution_outcomes.push(self.block_builder.get_execution_outcome(block.clone()));
    }
    self.extend_execution_outcome(execution_outcomes);
}
}

#[tokio::test]
Expand Down Expand Up @@ -2563,7 +2628,7 @@ mod tests {
// extend main chain
let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 3);

test_harness.insert_chain(main_chain.iter()).await;
test_harness.insert_chain(main_chain).await;
}

#[tokio::test]
Expand All @@ -2584,17 +2649,7 @@ mod tests {
test_harness.send_fcu(fork_chain.last().unwrap().hash(), ForkchoiceStatus::Valid).await;

// check for ForkBlockAdded events, we expect fork_chain.len() blocks added
for _ in 0..fork_chain.len() {
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
block,
)) => {
assert!(fork_chain.iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
test_harness.check_fork_chain_insertion(fork_chain.clone()).await;

// check for CanonicalChainCommitted event
test_harness.check_canon_commit(fork_chain.last().unwrap().hash()).await;
Expand Down Expand Up @@ -2659,4 +2714,139 @@ mod tests {
_ => panic!("Unexpected event: {:#?}", event),
}
}

// Live-sync scenario: starting from a one-block base chain, the harness
// forkchoice-updates to a far-ahead tip, which must first trigger a backfill
// (pipeline) sync for an intermediate target, then a live download of the
// remaining blocks, and finally make the downloaded tip canonical.
// Events are consumed from `from_tree_rx` in the exact order the tree emits
// them, so the steps below are strictly order-dependent.
#[tokio::test]
async fn test_engine_tree_live_sync_transition_eventually_canonical() {
reth_tracing::init_test_tracing();

let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec.clone());

// create base chain and setup test harness with it
let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
test_harness = test_harness.with_blocks(base_chain.clone());

// fcu to the tip of base chain
test_harness
.fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
.await;

// create main chain, extension of base chain, with enough blocks to
// trigger backfill sync
let main_chain = test_harness
.block_builder
.create_fork(base_chain[0].block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);

let main_chain_last = main_chain.last().unwrap();
let main_chain_last_hash = main_chain_last.hash();
// the block at index MIN_BLOCKS_FOR_PIPELINE_RUN is far enough ahead of the
// local head to make the tree choose backfill over live sync
let main_chain_backfill_target =
main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
let main_chain_backfill_target_hash = main_chain_backfill_target.hash();

// fcu to the element of main chain that should trigger backfill sync
test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;

// check download request for target
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
assert_eq!(hash_set, HashSet::from([main_chain_backfill_target_hash]));
}
_ => panic!("Unexpected event: {:#?}", event),
}

// send message to tell the engine the requested block was downloaded
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(vec![
main_chain_backfill_target.clone(),
]));

// check that backfill is triggered
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BackfillAction(BackfillAction::Start(
reth_stages::PipelineTarget::Sync(target_hash),
)) => {
assert_eq!(target_hash, main_chain_backfill_target_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}

// persist blocks of main chain, same as the backfill operation would do
// (blocks 0..=MIN_BLOCKS_FOR_PIPELINE_RUN, i.e. up to and including the
// backfill target)
let backfilled_chain: Vec<_> =
main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
test_harness.persist_blocks(backfilled_chain.clone());

// setting up execution outcomes for the chain, the blocks will be
// executed starting from the oldest, so we need to reverse.
let mut backfilled_chain_rev = backfilled_chain.clone();
backfilled_chain_rev.reverse();
test_harness.setup_range_insertion_for_chain(backfilled_chain_rev.to_vec());

// send message to mark backfill finished
test_harness.tree.on_engine_message(FromEngine::Event(
FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
block_number: main_chain_backfill_target.number,
}),
));

// send fcu to the tip of main
test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

// after backfill the tip is still unknown locally, so the tree first asks
// for the tip block itself
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
assert_eq!(target_hash, HashSet::from([main_chain_last_hash]));
}
_ => panic!("Unexpected event: {:#?}", event),
}

// tell engine main chain tip downloaded
test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]));

// check download range request
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
// expected count is the gap between the backfill target and the tip,
// exclusive of both endpoints (target already persisted, tip already
// downloaded above) — hence the `- 2`
assert_eq!(
total_blocks,
(main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
);
assert_eq!(initial_hash, main_chain_last.parent_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}

// the blocks between the backfill target and the tip, inclusive of the tip
let remaining: Vec<_> = main_chain
.clone()
.drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
.collect();

// setting up execution outcomes for the chain, the blocks will be
// executed starting from the oldest, so we need to reverse.
let mut remaining_rev = remaining.clone();
remaining_rev.reverse();
test_harness.setup_range_insertion_for_chain(remaining_rev.to_vec());

// tell engine block range downloaded
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()));

// each of the remaining blocks is first inserted as a fork block...
test_harness.check_fork_chain_insertion(remaining).await;

// check canonical chain committed event with the hash of the latest block
// (...and then the whole range is made canonical — the behavior under test)
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalChainCommitted(header, ..),
) => {
assert_eq!(header.hash(), main_chain_last_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}

// new head is the tip of the main chain
assert_eq!(test_harness.tree.state.tree_state.canonical_head().hash, main_chain_last_hash);
}
}

0 comments on commit e7214af

Please sign in to comment.