From 578a99ddfdff79c8e6a07641c09c7670a0b2900a Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Fri, 6 Oct 2023 15:49:33 -0700 Subject: [PATCH 1/4] Check for parent being on chain When retrieveing the ready blocks, verify that the parent of the first ready block is on chain. If the parent is not on chain, we are downloading from a fork. In this case, keep downloading until we have a parent on chain (common ancestor) --- substrate/client/network/sync/src/blocks.rs | 24 ++ substrate/client/network/sync/src/lib.rs | 437 +++++++++++++++++++- 2 files changed, 439 insertions(+), 22 deletions(-) diff --git a/substrate/client/network/sync/src/blocks.rs b/substrate/client/network/sync/src/blocks.rs index 240c1ca1f8b2..56019f768fc1 100644 --- a/substrate/client/network/sync/src/blocks.rs +++ b/substrate/client/network/sync/src/blocks.rs @@ -212,6 +212,30 @@ impl BlockCollection { ready } + /// Returns the block header of the first block that is ready for importing. + /// `from` is the maximum block number for the start of the range that we are interested in. + /// The function will return None if the first block ready is higher than `from`. + pub fn first_ready_block_hdr(&self, from: NumberFor) -> Option { + let mut prev = from; + for (&start, range_data) in &self.blocks { + if start > prev { + break + } + + match range_data { + BlockRangeState::Complete(blocks) => { + let len = (blocks.len() as u32).into(); + prev = start + len; + if let Some(BlockData { block, .. }) = blocks.first() { + return block.header.clone() + } + }, + _ => continue, + } + } + None + } + pub fn clear_queued(&mut self, hash: &B::Hash) { if let Some((from, to)) = self.queued_blocks.remove(hash) { let mut block_num = from; diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 10eaa2450518..0a01442922fe 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -1405,28 +1405,45 @@ where /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec> { - self.blocks - .ready_blocks(self.best_queued_number + One::one()) - .into_iter() - .map(|block_data| { - let justifications = block_data - .block - .justifications - .or_else(|| legacy_justification_mapping(block_data.block.justification)); - IncomingBlock { - hash: block_data.block.hash, - header: block_data.block.header, - body: block_data.block.body, - indexed_body: block_data.block.indexed_body, - justifications, - origin: block_data.origin, - allow_missing_state: true, - import_existing: self.import_existing, - skip_execution: self.skip_execution(), - state: None, - } - }) - .collect() + let target_block = self.best_queued_number + One::one(); + + // Verify that the parent of the first available block is in the chain. + // If not, we are downloading from a fork. In this case, wait until + // the start block has a parent on chain. + let parent_on_chain = + self.blocks.first_ready_block_hdr(target_block).map_or(false, |hdr| { + let parent_status = + self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown); + parent_status == BlockStatus::InChainWithState || + parent_status == BlockStatus::Queued + }); + + if parent_on_chain { + self.blocks + .ready_blocks(target_block) + .into_iter() + .map(|block_data| { + let justifications = block_data + .block + .justifications + .or_else(|| legacy_justification_mapping(block_data.block.justification)); + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + indexed_body: block_data.block.indexed_body, + justifications, + origin: block_data.origin, + allow_missing_state: true, + import_existing: self.import_existing, + skip_execution: self.skip_execution(), + state: None, + } + }) + .collect() + } else { + vec![] + } } /// Set warp sync target block externally in case we skip warp proof downloading. @@ -3364,4 +3381,380 @@ mod test { pending_responses.remove(&peers[1]); assert_eq!(pending_responses.len(), 0); } + + #[test] + fn syncs_fork_with_partial_response_extends_tip() { + sp_tracing::try_init_simple(); + + // Set up: the two chains share the first 15 blocks before + // diverging. The other(canonical) chain fork is longer. + let max_blocks_per_request = 64; + let common_ancestor = 15; + let non_canonical_chain_length = common_ancestor + 3; + let canonical_chain_length = common_ancestor + max_blocks_per_request + 10; + + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let mut client = Arc::new(TestClientBuilder::new().build()); + + // Blocks on the non-canonical chain. + let non_canonical_blocks = (0..non_canonical_chain_length) + .map(|_| build_block(&mut client, None, false)) + .collect::>(); + + // Blocks on the canonical chain. + let canonical_blocks = { + let mut client = Arc::new(TestClientBuilder::new().build()); + let common_blocks = non_canonical_blocks[..common_ancestor as usize] + .into_iter() + .inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()) + .cloned() + .collect::>(); + + common_blocks + .into_iter() + .chain( + (0..(canonical_chain_length - common_ancestor as u32)) + .map(|_| build_block(&mut client, None, true)), + ) + .collect::>() + }; + + let mut sync = ChainSync::new( + SyncMode::Full, + client.clone(), + ProtocolName::from("test-block-announce-protocol"), + 1, + 64, + None, + chain_sync_network_handle, + ) + .unwrap(); + + // Connect the node we will sync from + let peer_id = PeerId::random(); + let canonical_tip = canonical_blocks.last().unwrap().clone(); + let mut request = sync + .new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number()) + .unwrap() + .unwrap(); + assert_eq!(FromBlock::Number(client.info().best_number), request.from); + assert_eq!(Some(1), request.max); + + // Do the ancestor search + loop { + let block = + &canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; + let response = create_block_response(vec![block.clone()]); + + let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + request = if let OnBlockData::Request(_peer, request) = on_block_data { + request + } else { + // We found the ancestor + break + }; + + log::trace!(target: LOG_TARGET, "Request: {request:?}"); + } + + // The response for the 64 blocks is returned in two parts: + // part 1: last 61 blocks [19..79], part 2: first 3 blocks [16-18]. + // Even though the first part extends the current chain ending at 18, + // it should not result in an import yet. + let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64; + let resp_2_from = common_ancestor as u64 + 3; + + // No import expected. + let request = get_block_request( + &mut sync, + FromBlock::Number(resp_1_from), + max_blocks_per_request as u32, + &peer_id, + ); + + let from = unwrap_from_block_number(request.from.clone()); + let mut resp_blocks = canonical_blocks[18..from as usize].to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() + ),); + + // Gap filled, expect max_blocks_per_request being imported now. + let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 3, &peer_id); + let mut resp_blocks = canonical_blocks[common_ancestor as usize..18].to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + let to_import: Vec<_> = match &res { + OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => { + assert_eq!(blocks.len(), sync.max_blocks_per_request as usize); + blocks + .iter() + .map(|b| { + let num = *b.header.as_ref().unwrap().number() as usize; + canonical_blocks[num - 1].clone() + }) + .collect() + }, + _ => { + panic!("Unexpected response: {res:?}"); + }, + }; + + let _ = sync.on_blocks_processed( + max_blocks_per_request as usize, + resp_blocks.len(), + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + *b.header().number(), + Default::default(), + Some(peer_id), + )), + b.hash(), + ) + }) + .collect(), + ); + to_import.into_iter().for_each(|b| { + assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); + block_on(client.import(BlockOrigin::Own, b)).unwrap(); + }); + let expected_number = common_ancestor as u32 + max_blocks_per_request as u32; + assert_eq!(sync.best_queued_number as u32, expected_number); + assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); + // Sync rest of the chain. + let request = + get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id); + let mut resp_blocks = canonical_blocks + [(canonical_chain_length - 10) as usize..canonical_chain_length as usize] + .to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize + ),); + let _ = sync.on_blocks_processed( + max_blocks_per_request as usize, + resp_blocks.len(), + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + *b.header().number(), + Default::default(), + Some(peer_id), + )), + b.hash(), + ) + }) + .collect(), + ); + resp_blocks.into_iter().rev().for_each(|b| { + assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); + block_on(client.import(BlockOrigin::Own, b)).unwrap(); + }); + let expected_number = canonical_chain_length as u32; + assert_eq!(sync.best_queued_number as u32, expected_number); + assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); + } + + #[test] + fn syncs_fork_with_partial_response_does_not_extend_tip() { + sp_tracing::try_init_simple(); + + // Set up: the two chains share the first 15 blocks before + // diverging. The other(canonical) chain fork is longer. + let max_blocks_per_request = 64; + let common_ancestor = 15; + let non_canonical_chain_length = common_ancestor + 3; + let canonical_chain_length = common_ancestor + max_blocks_per_request + 10; + + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let mut client = Arc::new(TestClientBuilder::new().build()); + + // Blocks on the non-canonical chain. + let non_canonical_blocks = (0..non_canonical_chain_length) + .map(|_| build_block(&mut client, None, false)) + .collect::>(); + + // Blocks on the canonical chain. + let canonical_blocks = { + let mut client = Arc::new(TestClientBuilder::new().build()); + let common_blocks = non_canonical_blocks[..common_ancestor as usize] + .into_iter() + .inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()) + .cloned() + .collect::>(); + + common_blocks + .into_iter() + .chain( + (0..(canonical_chain_length - common_ancestor as u32)) + .map(|_| build_block(&mut client, None, true)), + ) + .collect::>() + }; + + let mut sync = ChainSync::new( + SyncMode::Full, + client.clone(), + ProtocolName::from("test-block-announce-protocol"), + 1, + 64, + None, + chain_sync_network_handle, + ) + .unwrap(); + + // Connect the node we will sync from + let peer_id = PeerId::random(); + let canonical_tip = canonical_blocks.last().unwrap().clone(); + let mut request = sync + .new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number()) + .unwrap() + .unwrap(); + assert_eq!(FromBlock::Number(client.info().best_number), request.from); + assert_eq!(Some(1), request.max); + + // Do the ancestor search + loop { + let block = + &canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; + let response = create_block_response(vec![block.clone()]); + + let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + request = if let OnBlockData::Request(_peer, request) = on_block_data { + request + } else { + // We found the ancestor + break + }; + + log::trace!(target: LOG_TARGET, "Request: {request:?}"); + } + + // The response for the 64 blocks is returned in two parts: + // part 1: last 61 blocks [18..79], part 2: first 3 blocks [16-17]. + // Even though the first part extends the current chain ending at 18, + // it should not result in an import yet. + let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64; + let resp_2_from = common_ancestor as u64 + 2; + + // No import expected. + let request = get_block_request( + &mut sync, + FromBlock::Number(resp_1_from), + max_blocks_per_request as u32, + &peer_id, + ); + + let from = unwrap_from_block_number(request.from.clone()); + let mut resp_blocks = canonical_blocks[17..from as usize].to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() + ),); + + // Gap filled, expect max_blocks_per_request being imported now. + let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 2, &peer_id); + let mut resp_blocks = canonical_blocks[common_ancestor as usize..17].to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + let to_import: Vec<_> = match &res { + OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => { + assert_eq!(blocks.len(), sync.max_blocks_per_request as usize); + blocks + .iter() + .map(|b| { + let num = *b.header.as_ref().unwrap().number() as usize; + canonical_blocks[num - 1].clone() + }) + .collect() + }, + _ => { + panic!("Unexpected response: {res:?}"); + }, + }; + + let _ = sync.on_blocks_processed( + max_blocks_per_request as usize, + resp_blocks.len(), + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + *b.header().number(), + Default::default(), + Some(peer_id), + )), + b.hash(), + ) + }) + .collect(), + ); + to_import.into_iter().for_each(|b| { + assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); + block_on(client.import(BlockOrigin::Own, b)).unwrap(); + }); + let expected_number = common_ancestor as u32 + max_blocks_per_request as u32; + assert_eq!(sync.best_queued_number as u32, expected_number); + assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); + // Sync rest of the chain. + let request = + get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id); + let mut resp_blocks = canonical_blocks + [(canonical_chain_length - 10) as usize..canonical_chain_length as usize] + .to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize + ),); + let _ = sync.on_blocks_processed( + max_blocks_per_request as usize, + resp_blocks.len(), + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + *b.header().number(), + Default::default(), + Some(peer_id), + )), + b.hash(), + ) + }) + .collect(), + ); + resp_blocks.into_iter().rev().for_each(|b| { + assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); + block_on(client.import(BlockOrigin::Own, b)).unwrap(); + }); + let expected_number = canonical_chain_length as u32; + assert_eq!(sync.best_queued_number as u32, expected_number); + assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); + } } From 51c34309ea1e95d1710d08d04b018d6938c6e0c7 Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Sat, 7 Oct 2023 12:00:26 -0700 Subject: [PATCH 2/4] Address comments --- substrate/client/network/sync/src/lib.rs | 70 ++++++++++++------------ 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 0a01442922fe..1addee773815 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -1405,45 +1405,43 @@ where /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec> { - let target_block = self.best_queued_number + One::one(); + let start_block = self.best_queued_number + One::one(); // Verify that the parent of the first available block is in the chain. // If not, we are downloading from a fork. In this case, wait until // the start block has a parent on chain. - let parent_on_chain = - self.blocks.first_ready_block_hdr(target_block).map_or(false, |hdr| { - let parent_status = - self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown); - parent_status == BlockStatus::InChainWithState || - parent_status == BlockStatus::Queued - }); + let parent_on_chain = self.blocks.first_ready_block_hdr(start_block).map_or(false, |hdr| { + let parent_status = + self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown); + parent_status == BlockStatus::InChainWithState || parent_status == BlockStatus::Queued + }); - if parent_on_chain { - self.blocks - .ready_blocks(target_block) - .into_iter() - .map(|block_data| { - let justifications = block_data - .block - .justifications - .or_else(|| legacy_justification_mapping(block_data.block.justification)); - IncomingBlock { - hash: block_data.block.hash, - header: block_data.block.header, - body: block_data.block.body, - indexed_body: block_data.block.indexed_body, - justifications, - origin: block_data.origin, - allow_missing_state: true, - import_existing: self.import_existing, - skip_execution: self.skip_execution(), - state: None, - } - }) - .collect() - } else { - vec![] + if !parent_on_chain { + return vec![] } + + self.blocks + .ready_blocks(start_block) + .into_iter() + .map(|block_data| { + let justifications = block_data + .block + .justifications + .or_else(|| legacy_justification_mapping(block_data.block.justification)); + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + indexed_body: block_data.block.indexed_body, + justifications, + origin: block_data.origin, + allow_missing_state: true, + import_existing: self.import_existing, + skip_execution: self.skip_execution(), + state: None, + } + }) + .collect() } /// Set warp sync target block externally in case we skip warp proof downloading. @@ -3425,7 +3423,7 @@ mod test { client.clone(), ProtocolName::from("test-block-announce-protocol"), 1, - 64, + max_blocks_per_request, None, chain_sync_network_handle, ) @@ -3613,7 +3611,7 @@ mod test { client.clone(), ProtocolName::from("test-block-announce-protocol"), 1, - 64, + max_blocks_per_request, None, chain_sync_network_handle, ) @@ -3647,7 +3645,7 @@ mod test { } // The response for the 64 blocks is returned in two parts: - // part 1: last 61 blocks [18..79], part 2: first 3 blocks [16-17]. + // part 1: last 62 blocks [18..79], part 2: first 2 blocks [16-17]. // Even though the first part extends the current chain ending at 18, // it should not result in an import yet. let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64; From bba2f37b2692ddeb57b3c3bc26bc557f0393cafa Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Sat, 7 Oct 2023 12:11:17 -0700 Subject: [PATCH 3/4] Address more comments --- substrate/client/network/sync/src/blocks.rs | 2 +- substrate/client/network/sync/src/lib.rs | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/substrate/client/network/sync/src/blocks.rs b/substrate/client/network/sync/src/blocks.rs index 56019f768fc1..22861b259d7f 100644 --- a/substrate/client/network/sync/src/blocks.rs +++ b/substrate/client/network/sync/src/blocks.rs @@ -215,7 +215,7 @@ impl BlockCollection { /// Returns the block header of the first block that is ready for importing. /// `from` is the maximum block number for the start of the range that we are interested in. /// The function will return None if the first block ready is higher than `from`. - pub fn first_ready_block_hdr(&self, from: NumberFor) -> Option { + pub fn first_ready_block_header(&self, from: NumberFor) -> Option { let mut prev = from; for (&start, range_data) in &self.blocks { if start > prev { diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 1addee773815..9c93d8fa30ca 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -1410,11 +1410,13 @@ where // Verify that the parent of the first available block is in the chain. // If not, we are downloading from a fork. In this case, wait until // the start block has a parent on chain. - let parent_on_chain = self.blocks.first_ready_block_hdr(start_block).map_or(false, |hdr| { - let parent_status = - self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown); - parent_status == BlockStatus::InChainWithState || parent_status == BlockStatus::Queued - }); + let parent_on_chain = + self.blocks.first_ready_block_header(start_block).map_or(false, |hdr| { + std::matches!( + self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown), + BlockStatus::InChainWithState | BlockStatus::Queued + ) + }); if !parent_on_chain { return vec![] From 91bac2f61219afc97c83fef6d09da2075e90836c Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam Date: Mon, 9 Oct 2023 08:11:55 -0700 Subject: [PATCH 4/4] Include InChainPruned in parent check --- substrate/client/network/sync/src/blocks.rs | 1 + substrate/client/network/sync/src/lib.rs | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/substrate/client/network/sync/src/blocks.rs b/substrate/client/network/sync/src/blocks.rs index 22861b259d7f..cad50fef3e32 100644 --- a/substrate/client/network/sync/src/blocks.rs +++ b/substrate/client/network/sync/src/blocks.rs @@ -215,6 +215,7 @@ impl BlockCollection { /// Returns the block header of the first block that is ready for importing. /// `from` is the maximum block number for the start of the range that we are interested in. /// The function will return None if the first block ready is higher than `from`. + /// The logic is structured to be consistent with ready_blocks(). pub fn first_ready_block_header(&self, from: NumberFor) -> Option { let mut prev = from; for (&start, range_data) in &self.blocks { diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 9c93d8fa30ca..a291da4a90d5 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -1414,7 +1414,9 @@ where self.blocks.first_ready_block_header(start_block).map_or(false, |hdr| { std::matches!( self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown), - BlockStatus::InChainWithState | BlockStatus::Queued + BlockStatus::InChainWithState | + BlockStatus::InChainPruned | + BlockStatus::Queued ) });