diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs index 77cb49abf5e06..7389db8cc2f10 100644 --- a/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -36,8 +36,10 @@ use crate::{ /// Interface to a basic block import queue that is importing blocks sequentially in a separate /// task, with plugable verification. pub struct BasicQueue { - /// Channel to send messages to the background task. - sender: TracingUnboundedSender>, + /// Channel to send finality work messages to the background task. + finality_sender: TracingUnboundedSender>, + /// Channel to send block import messages to the background task. + block_import_sender: TracingUnboundedSender>, /// Results coming from the worker task. result_port: BufferedLinkReceiver, _phantom: PhantomData, @@ -46,7 +48,8 @@ pub struct BasicQueue { impl Drop for BasicQueue { fn drop(&mut self) { // Flush the queue and close the receiver to terminate the future. - self.sender.close_channel(); + self.finality_sender.close_channel(); + self.block_import_sender.close_channel(); self.result_port.close(); } } @@ -65,12 +68,16 @@ impl BasicQueue { prometheus_registry: Option<&Registry>, ) -> Self { let (result_sender, result_port) = buffered_link::buffered_link(); - let metrics = prometheus_registry.and_then(|r| + + let metrics = prometheus_registry.and_then(|r| { Metrics::register(r) - .map_err(|err| { log::warn!("Failed to register Prometheus metrics: {}", err); }) - .ok() - ); - let (future, worker_sender) = BlockImportWorker::new( + .map_err(|err| { + log::warn!("Failed to register Prometheus metrics: {}", err); + }) + .ok() + }); + + let (future, finality_sender, block_import_sender) = BlockImportWorker::new( result_sender, verifier, block_import, @@ -82,7 +89,8 @@ impl BasicQueue { spawner.spawn_blocking("basic-block-import-worker", future.boxed()); Self { - sender: worker_sender, + finality_sender, + block_import_sender, result_port, _phantom: PhantomData, } @@ -96,7 +104,9 @@ impl ImportQueue for BasicQueue } trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); - let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); + let res = + self.block_import_sender.unbounded_send(worker_messages::ImportBlocks(origin, blocks)); + if res.is_err() { log::error!( target: "sync", @@ -110,12 +120,12 @@ impl ImportQueue for BasicQueue who: Origin, hash: B::Hash, number: NumberFor, - justification: Justification + justification: Justification, ) { - let res = self.sender - .unbounded_send( - ToWorkerMsg::ImportJustification(who, hash, number, justification) - ); + let res = self.finality_sender.unbounded_send( + worker_messages::Finality::ImportJustification(who, hash, number, justification), + ); + if res.is_err() { log::error!( target: "sync", @@ -132,10 +142,10 @@ impl ImportQueue for BasicQueue finality_proof: Vec, ) { trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); - let res = self.sender - .unbounded_send( - ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof) - ); + let res = self.finality_sender.unbounded_send( + worker_messages::Finality::ImportFinalityProof(who, hash, number, finality_proof), + ); + if res.is_err() { log::error!( target: "sync", @@ -151,12 +161,16 @@ impl ImportQueue for BasicQueue } } -/// Message destinated to the background worker. -#[derive(Debug)] -enum ToWorkerMsg { - ImportBlocks(BlockOrigin, Vec>), - ImportJustification(Origin, B::Hash, NumberFor, Justification), - ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), +/// Messages destinated to the background worker. +mod worker_messages { + use super::*; + + pub struct ImportBlocks(pub BlockOrigin, pub Vec>); + + pub enum Finality { + ImportJustification(Origin, B::Hash, NumberFor, Justification), + ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), + } } struct BlockImportWorker { @@ -176,8 +190,18 @@ impl BlockImportWorker { justification_import: Option>, finality_proof_import: Option>, metrics: Option, - ) -> (impl Future + Send, TracingUnboundedSender>) { - let (sender, mut port) = tracing_unbounded("mpsc_block_import_worker"); + ) -> ( + impl Future + Send, + TracingUnboundedSender>, + TracingUnboundedSender>, + ) { + use worker_messages::*; + + let (finality_sender, mut finality_port) = + tracing_unbounded("mpsc_import_queue_worker_finality"); + + let (block_import_sender, mut block_import_port) = + tracing_unbounded("mpsc_import_queue_worker_blocks"); let mut worker = BlockImportWorker { result_sender, @@ -206,6 +230,8 @@ impl BlockImportWorker { // `Future`, and `block_import` is `None`. // - Something else, in which case `block_import` is `Some` and `importing` is None. // + // Additionally, the task will prioritize processing of finality work messages over + // block import messages, hence why two distinct channels are used. let mut block_import_verifier = Some((block_import, verifier)); let mut importing = None; @@ -217,7 +243,30 @@ impl BlockImportWorker { return Poll::Ready(()) } - // If we are in the process of importing a bunch of block, let's resume this + // Grab the next finality action request sent to the import queue. + let finality_work = match Stream::poll_next(Pin::new(&mut finality_port), cx) { + Poll::Ready(Some(msg)) => Some(msg), + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => None, + }; + + match finality_work { + Some(Finality::ImportFinalityProof(who, hash, number, proof)) => { + let (_, verif) = block_import_verifier + .as_mut() + .expect("block_import_verifier is always Some; qed"); + + worker.import_finality_proof(verif, who, hash, number, proof); + continue; + } + Some(Finality::ImportJustification(who, hash, number, justification)) => { + worker.import_justification(who, hash, number, justification); + continue; + } + None => {} + } + + // If we are in the process of importing a bunch of blocks, let's resume this // process before doing anything more. if let Some(imp_fut) = importing.as_mut() { match Future::poll(Pin::new(imp_fut), cx) { @@ -232,34 +281,25 @@ impl BlockImportWorker { debug_assert!(importing.is_none()); debug_assert!(block_import_verifier.is_some()); - // Grab the next action request sent to the import queue. - let msg = match Stream::poll_next(Pin::new(&mut port), cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => return Poll::Pending, - }; + // Grab the next block import request sent to the import queue. + let ImportBlocks(origin, blocks) = + match Stream::poll_next(Pin::new(&mut block_import_port), cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + }; - match msg { - ToWorkerMsg::ImportBlocks(origin, blocks) => { - // On blocks import request, we merely *start* the process and store - // a `Future` into `importing`. - let (bi, verif) = block_import_verifier.take() - .expect("block_import_verifier is always Some; qed"); - importing = Some(worker.import_batch(bi, verif, origin, blocks)); - }, - ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { - let (_, verif) = block_import_verifier.as_mut() - .expect("block_import_verifier is always Some; qed"); - worker.import_finality_proof(verif, who, hash, number, proof); - }, - ToWorkerMsg::ImportJustification(who, hash, number, justification) => { - worker.import_justification(who, hash, number, justification); - } - } + // On blocks import request, we merely *start* the process and store + // a `Future` into `importing`. + let (block_import, verifier) = block_import_verifier + .take() + .expect("block_import_verifier is always Some; qed"); + + importing = Some(worker.import_batch(block_import, verifier, origin, blocks)); } }); - (future, sender) + (future, finality_sender, block_import_sender) } /// Returns a `Future` that imports the given blocks and sends the results on