Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Parachain node should not recover blocks while syncing (#2462)
Browse files Browse the repository at this point in the history
  • Loading branch information
skunert authored Apr 19, 2023
1 parent d403b49 commit eced0cb
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 13 deletions.
37 changes: 28 additions & 9 deletions client/pov-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};

use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
Expand Down Expand Up @@ -228,6 +228,7 @@ pub struct PoVRecovery<Block: BlockT, PC, RC> {
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
/// Blocks that we are retrying currently
candidates_in_retry: HashSet<Block::Hash>,
parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
}

impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
Expand All @@ -244,6 +245,7 @@ where
relay_chain_interface: RCInterface,
para_id: ParaId,
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
) -> Self {
Self {
candidates: HashMap::new(),
Expand All @@ -256,6 +258,7 @@ where
para_id,
candidates_in_retry: HashSet::new(),
recovery_chan_rx,
parachain_sync_service,
}
}

Expand Down Expand Up @@ -538,14 +541,19 @@ where
pub async fn run(mut self) {
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
let pending_candidates =
match pending_candidates(self.relay_chain_interface.clone(), self.para_id).await {
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
return
},
};
let pending_candidates = match pending_candidates(
self.relay_chain_interface.clone(),
self.para_id,
self.parachain_sync_service.clone(),
)
.await
{
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
return
},
};

futures::pin_mut!(pending_candidates);

Expand Down Expand Up @@ -600,13 +608,24 @@ where
async fn pending_candidates(
relay_chain_client: impl RelayChainInterface + Clone,
para_id: ParaId,
sync_service: Arc<dyn SyncOracle + Sync + Send>,
) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
let import_notification_stream = relay_chain_client.import_notification_stream().await?;

let filtered_stream = import_notification_stream.filter_map(move |n| {
let client_for_closure = relay_chain_client.clone();
let sync_oracle = sync_service.clone();
async move {
let hash = n.hash();
if sync_oracle.is_major_syncing() {
tracing::debug!(
target: LOG_TARGET,
relay_hash = ?hash,
"Skipping candidate due to sync.",
);
return None
}

let pending_availability_result = client_for_closure
.candidate_pending_availability(hash, para_id)
.await
Expand Down
6 changes: 6 additions & 0 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn
pub collator_key: CollatorPair,
pub relay_chain_slot_duration: Duration,
pub recovery_handle: Box<dyn RecoveryHandle>,
pub sync_service: Arc<SyncingService<Block>>,
}

/// Start a collator node for a parachain.
Expand All @@ -91,6 +92,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner
collator_key,
relay_chain_slot_duration,
recovery_handle,
sync_service,
}: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>,
) -> sc_service::error::Result<()>
where
Expand Down Expand Up @@ -136,6 +138,7 @@ where
relay_chain_interface.clone(),
para_id,
recovery_chan_rx,
sync_service,
);

task_manager
Expand Down Expand Up @@ -170,6 +173,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> {
pub relay_chain_slot_duration: Duration,
pub import_queue: Box<dyn ImportQueueService<Block>>,
pub recovery_handle: Box<dyn RecoveryHandle>,
pub sync_service: Arc<SyncingService<Block>>,
}

/// Start a full node for a parachain.
Expand All @@ -186,6 +190,7 @@ pub fn start_full_node<Block, Client, Backend, RCInterface>(
relay_chain_slot_duration,
import_queue,
recovery_handle,
sync_service,
}: StartFullNodeParams<Block, Client, RCInterface>,
) -> sc_service::error::Result<()>
where
Expand Down Expand Up @@ -231,6 +236,7 @@ where
relay_chain_interface,
para_id,
recovery_chan_rx,
sync_service,
);

task_manager
Expand Down
4 changes: 3 additions & 1 deletion parachain-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ async fn start_node_impl(
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
sync_service,
sync_service.clone(),
params.keystore_container.keystore(),
force_authoring,
para_id,
Expand All @@ -291,6 +291,7 @@ async fn start_node_impl(
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_collator(params).await?;
Expand All @@ -304,6 +305,7 @@ async fn start_node_impl(
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_full_node(params)?;
Expand Down
12 changes: 9 additions & 3 deletions polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ where
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
sync_service,
sync_service.clone(),
params.keystore_container.keystore(),
force_authoring,
)?;
Expand All @@ -480,6 +480,7 @@ where
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_collator(params).await?;
Expand All @@ -493,6 +494,7 @@ where
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_full_node(params)?;
Expand Down Expand Up @@ -659,7 +661,7 @@ where
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
sync_service,
sync_service.clone(),
params.keystore_container.keystore(),
force_authoring,
)?;
Expand All @@ -679,6 +681,7 @@ where
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_collator(params).await?;
Expand All @@ -692,6 +695,7 @@ where
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_full_node(params)?;
Expand Down Expand Up @@ -1429,7 +1433,7 @@ where
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
sync_service,
sync_service.clone(),
params.keystore_container.keystore(),
force_authoring,
)?;
Expand All @@ -1449,6 +1453,7 @@ where
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
relay_chain_slot_duration,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_collator(params).await?;
Expand All @@ -1462,6 +1467,7 @@ where
relay_chain_slot_duration,
import_queue: import_queue_service,
recovery_handle: Box::new(overseer_handle),
sync_service,
};

start_full_node(params)?;
Expand Down
2 changes: 2 additions & 0 deletions test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ where
import_queue: import_queue_service,
relay_chain_slot_duration: Duration::from_secs(6),
recovery_handle,
sync_service,
};

start_collator(params).await?;
Expand All @@ -446,6 +447,7 @@ where
import_queue: import_queue_service,
relay_chain_slot_duration: Duration::from_secs(6),
recovery_handle,
sync_service,
};

start_full_node(params)?;
Expand Down

0 comments on commit eced0cb

Please sign in to comment.