From eced0cb3d55c04c3687ff0dc62c480d098f497b0 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Wed, 19 Apr 2023 16:02:35 +0200 Subject: [PATCH] Parachain node should not recover blocks while syncing (#2462) --- client/pov-recovery/src/lib.rs | 37 +++++++++++++++++++------- client/service/src/lib.rs | 6 +++++ parachain-template/node/src/service.rs | 4 ++- polkadot-parachain/src/service.rs | 12 ++++++--- test/service/src/lib.rs | 2 ++ 5 files changed, 48 insertions(+), 13 deletions(-) diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 60fbdab310c..7d92934c784 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -47,7 +47,7 @@ use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider}; use sc_consensus::import_queue::{ImportQueueService, IncomingBlock}; -use sp_consensus::{BlockOrigin, BlockStatus}; +use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT}; @@ -228,6 +228,7 @@ pub struct PoVRecovery { recovery_chan_rx: Receiver>, /// Blocks that we are retrying currently candidates_in_retry: HashSet, + parachain_sync_service: Arc, } impl PoVRecovery @@ -244,6 +245,7 @@ where relay_chain_interface: RCInterface, para_id: ParaId, recovery_chan_rx: Receiver>, + parachain_sync_service: Arc, ) -> Self { Self { candidates: HashMap::new(), @@ -256,6 +258,7 @@ where para_id, candidates_in_retry: HashSet::new(), recovery_chan_rx, + parachain_sync_service, } } @@ -538,14 +541,19 @@ where pub async fn run(mut self) { let mut imported_blocks = self.parachain_client.import_notification_stream().fuse(); let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse(); - let pending_candidates = - match pending_candidates(self.relay_chain_interface.clone(), self.para_id).await { - Ok(pending_candidate_stream) => pending_candidate_stream.fuse(), - Err(err) => { - tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream."); - return - }, - }; + let pending_candidates = match pending_candidates( + self.relay_chain_interface.clone(), + self.para_id, + self.parachain_sync_service.clone(), + ) + .await + { + Ok(pending_candidate_stream) => pending_candidate_stream.fuse(), + Err(err) => { + tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream."); + return + }, + }; futures::pin_mut!(pending_candidates); @@ -600,13 +608,24 @@ where async fn pending_candidates( relay_chain_client: impl RelayChainInterface + Clone, para_id: ParaId, + sync_service: Arc, ) -> RelayChainResult> { let import_notification_stream = relay_chain_client.import_notification_stream().await?; let filtered_stream = import_notification_stream.filter_map(move |n| { let client_for_closure = relay_chain_client.clone(); + let sync_oracle = sync_service.clone(); async move { let hash = n.hash(); + if sync_oracle.is_major_syncing() { + tracing::debug!( + target: LOG_TARGET, + relay_hash = ?hash, + "Skipping candidate due to sync.", + ); + return None + } + let pending_availability_result = client_for_closure .candidate_pending_availability(hash, para_id) .await diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 6f0d5790586..ec6fc5e3c30 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -70,6 +70,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn pub collator_key: CollatorPair, pub relay_chain_slot_duration: Duration, pub recovery_handle: Box, + pub sync_service: Arc>, } /// Start a collator node for a parachain. @@ -91,6 +92,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner collator_key, relay_chain_slot_duration, recovery_handle, + sync_service, }: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>, ) -> sc_service::error::Result<()> where @@ -136,6 +138,7 @@ where relay_chain_interface.clone(), para_id, recovery_chan_rx, + sync_service, ); task_manager @@ -170,6 +173,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> { pub relay_chain_slot_duration: Duration, pub import_queue: Box>, pub recovery_handle: Box, + pub sync_service: Arc>, } /// Start a full node for a parachain. @@ -186,6 +190,7 @@ pub fn start_full_node( relay_chain_slot_duration, import_queue, recovery_handle, + sync_service, }: StartFullNodeParams, ) -> sc_service::error::Result<()> where @@ -231,6 +236,7 @@ where relay_chain_interface, para_id, recovery_chan_rx, + sync_service, ); task_manager diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index 6fc04ef91d5..4c9e1febf70 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -271,7 +271,7 @@ async fn start_node_impl( &task_manager, relay_chain_interface.clone(), transaction_pool, - sync_service, + sync_service.clone(), params.keystore_container.keystore(), force_authoring, para_id, @@ -291,6 +291,7 @@ async fn start_node_impl( collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, recovery_handle: Box::new(overseer_handle), + sync_service, }; start_collator(params).await?; @@ -304,6 +305,7 @@ async fn start_node_impl( relay_chain_slot_duration, import_queue: import_queue_service, recovery_handle: Box::new(overseer_handle), + sync_service, }; start_full_node(params)?; diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index 000ebef80fd..803d0987844 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -460,7 +460,7 @@ where &task_manager, relay_chain_interface.clone(), transaction_pool, - sync_service, + sync_service.clone(), params.keystore_container.keystore(), force_authoring, )?; @@ -480,6 +480,7 @@ where collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, recovery_handle: Box::new(overseer_handle), + sync_service, }; start_collator(params).await?; @@ -493,6 +494,7 @@ where relay_chain_slot_duration, import_queue: import_queue_service, recovery_handle: Box::new(overseer_handle), + sync_service, }; start_full_node(params)?; @@ -659,7 +661,7 @@ where &task_manager, relay_chain_interface.clone(), transaction_pool, - sync_service, + sync_service.clone(), params.keystore_container.keystore(), force_authoring, )?; @@ -679,6 +681,7 @@ where collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, recovery_handle: Box::new(overseer_handle), + sync_service, }; start_collator(params).await?; @@ -692,6 +695,7 @@ where relay_chain_slot_duration, import_queue: import_queue_service, recovery_handle: Box::new(overseer_handle), + sync_service, }; start_full_node(params)?; @@ -1429,7 +1433,7 @@ where &task_manager, relay_chain_interface.clone(), transaction_pool, - sync_service, + sync_service.clone(), params.keystore_container.keystore(), force_authoring, )?; @@ -1449,6 +1453,7 @@ where collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, recovery_handle: Box::new(overseer_handle), + sync_service, }; start_collator(params).await?; @@ -1462,6 +1467,7 @@ where relay_chain_slot_duration, import_queue: import_queue_service, recovery_handle: Box::new(overseer_handle), + sync_service, }; start_full_node(params)?; diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 5b38a53afb4..84d5636f9b1 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -433,6 +433,7 @@ where import_queue: import_queue_service, relay_chain_slot_duration: Duration::from_secs(6), recovery_handle, + sync_service, }; start_collator(params).await?; @@ -446,6 +447,7 @@ where import_queue: import_queue_service, relay_chain_slot_duration: Duration::from_secs(6), recovery_handle, + sync_service, }; start_full_node(params)?;