diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs index d9257650eb8f..0ed2d9ac4395 100644 --- a/network/src/protocol/mod.rs +++ b/network/src/protocol/mod.rs @@ -503,7 +503,10 @@ impl ProtocolHandler { } fn on_connect(&mut self, peer: PeerId, role: ObservedRole) { - let claimed_validator = matches!(role, ObservedRole::OurSentry | ObservedRole::OurGuardedAuthority | ObservedRole::Authority); + let claimed_validator = matches!( + role, + ObservedRole::OurSentry | ObservedRole::OurGuardedAuthority | ObservedRole::Authority + ); self.peers.insert(peer.clone(), PeerData { claimed_validator, @@ -978,7 +981,11 @@ impl Worker where ); } ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => { - debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); + debug!( + target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", + para_id, + relay_parent, + ); self.protocol_handler.await_collation(relay_parent, para_id, sender) } ServiceToWorkerMsg::NoteBadCollator(collator) => { diff --git a/validation/src/error.rs b/validation/src/error.rs index d3b83176d931..834270151f0b 100644 --- a/validation/src/error.rs +++ b/validation/src/error.rs @@ -47,6 +47,9 @@ pub enum Error { /// Proposer destroyed before finishing proposing or evaluating #[display(fmt = "Proposer destroyed before finishing proposing or evaluating")] PrematureDestruction, + /// Failed to build the table router. + #[display(fmt = "Failed to build the table router: {}", _0)] + CouldNotBuildTableRouter(String), /// Timer failed #[display(fmt = "Timer failed: {}", _0)] Timer(std::io::Error), diff --git a/validation/src/validation_service/mod.rs b/validation/src/validation_service/mod.rs index 9bd9dd7242e6..a19e2f0e3bbf 100644 --- a/validation/src/validation_service/mod.rs +++ b/validation/src/validation_service/mod.rs @@ -208,7 +208,7 @@ impl ServiceBuilder where relay_parent, &keystore, max_block_data_size, - )); + ).await); } Message::NotifyImport(notification) => { let relay_parent = notification.hash; @@ -217,7 +217,7 @@ impl ServiceBuilder where relay_parent, &keystore, max_block_data_size, - ); + ).await; if let Err(e) = res { warn!( @@ -299,8 +299,17 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option { + instance_handle: ValidationInstanceHandle, + /// Make sure we keep the table router alive, to respond/receive consensus messages. + _table_router: TR, +} + /// Constructs parachain-agreement instances. -pub(crate) struct ParachainValidationInstances { +pub(crate) struct ParachainValidationInstances { /// The client instance. client: Arc

, /// The backing network handle. @@ -311,7 +320,7 @@ pub(crate) struct ParachainValidationInstances { availability_store: AvailabilityStore, /// Live agreements. Maps relay chain parent hashes to attestation /// instances. - live_instances: HashMap, + live_instances: HashMap>, /// The underlying validation pool of processes to use. /// Only `None` in tests. validation_pool: Option, @@ -339,7 +348,7 @@ impl ParachainValidationInstances where /// /// Additionally, this will trigger broadcast of data to the new block's duty /// roster. - fn get_or_instantiate( + async fn get_or_instantiate( &mut self, parent_hash: Hash, keystore: &KeyStorePtr, @@ -347,8 +356,8 @@ impl ParachainValidationInstances where ) -> Result { use primitives::Pair; - if let Some(tracker) = self.live_instances.get(&parent_hash) { - return Ok(tracker.clone()); + if let Some(instance) = self.live_instances.get(&parent_hash) { + return Ok(instance.instance_handle.clone()); } let id = BlockId::hash(parent_hash); @@ -417,41 +426,39 @@ impl ParachainValidationInstances where self.validation_pool.clone(), )); - let build_router = self.network.build_table_router( + // The router will join the consensus gossip network. This is important + // to receive messages sent for the current round. + let router = match self.network.build_table_router( table.clone(), &validators, - ); - - let availability_store = self.availability_store.clone(); - let client = self.client.clone(); - let collation_fetch = self.collation_fetch.clone(); - - let res = self.spawner.spawn(async move { - // It is important that we build the router as it launches tasks internally - // that are required to receive gossip messages. - let router = match build_router.await { - Ok(res) => res, - Err(e) => { - warn!(target: "validation", "Failed to build router: {:?}", e); - return - } - }; + ).await { + Ok(res) => res, + Err(e) => { + warn!(target: "validation", "Failed to build router: {:?}", e); + return Err(Error::CouldNotBuildTableRouter(format!("{:?}", e))) + } + }; - if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) { - let n_validators = validators.len(); + if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) { + let n_validators = validators.len(); + let availability_store = self.availability_store.clone(); + let client = self.client.clone(); + let collation_fetch = self.collation_fetch.clone(); + let router = router.clone(); + let res = self.spawner.spawn( launch_work( move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators), availability_store, router, n_validators, index, - ).await; - } - }); + ), + ); - if let Err(e) = res { - error!(target: "validation", "Failed to create router and launch work: {:?}", e); + if let Err(e) = res { + error!(target: "validation", "Failed to launch work: {:?}", e); + } } let tracker = ValidationInstanceHandle { @@ -459,7 +466,11 @@ impl ParachainValidationInstances where started: Instant::now(), }; - self.live_instances.insert(parent_hash, tracker.clone()); + let live_instance = LiveInstance { + instance_handle: tracker.clone(), + _table_router: router, + }; + self.live_instances.insert(parent_hash, live_instance); Ok(tracker) } @@ -721,8 +732,9 @@ mod tests { validation_pool: None, }; - parachain_validation.get_or_instantiate(Default::default(), &keystore, None) + executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None)) .expect("Creates new validation round"); + assert!(parachain_validation.live_instances.contains_key(&Default::default())); let mut events = executor::block_on_stream(events); @@ -760,8 +772,9 @@ mod tests { validation_pool: None, }; - parachain_validation.get_or_instantiate(Default::default(), &keystore, None) + executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None)) .expect("Creates new validation round"); + assert!(parachain_validation.live_instances.contains_key(&Default::default())); let mut events = executor::block_on_stream(events);