diff --git a/primitives/consensus/common/Cargo.toml b/primitives/consensus/common/Cargo.toml index a6f8c01928d96..e8eaa06ee005f 100644 --- a/primitives/consensus/common/Cargo.toml +++ b/primitives/consensus/common/Cargo.toml @@ -36,6 +36,7 @@ prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../.. wasm-timer = "0.2.4" [dev-dependencies] +futures = "0.3.4" sp-test-primitives = { version = "2.0.0", path = "../../test-primitives" } [features] diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs index 77cb49abf5e06..ea0ca2cf3ee88 100644 --- a/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -36,8 +36,10 @@ use crate::{ /// Interface to a basic block import queue that is importing blocks sequentially in a separate /// task, with plugable verification. pub struct BasicQueue { - /// Channel to send messages to the background task. - sender: TracingUnboundedSender>, + /// Channel to send finality work messages to the background task. + finality_sender: TracingUnboundedSender>, + /// Channel to send block import messages to the background task. + block_import_sender: TracingUnboundedSender>, /// Results coming from the worker task. result_port: BufferedLinkReceiver, _phantom: PhantomData, @@ -46,7 +48,8 @@ pub struct BasicQueue { impl Drop for BasicQueue { fn drop(&mut self) { // Flush the queue and close the receiver to terminate the future. - self.sender.close_channel(); + self.finality_sender.close_channel(); + self.block_import_sender.close_channel(); self.result_port.close(); } } @@ -65,12 +68,16 @@ impl BasicQueue { prometheus_registry: Option<&Registry>, ) -> Self { let (result_sender, result_port) = buffered_link::buffered_link(); - let metrics = prometheus_registry.and_then(|r| + + let metrics = prometheus_registry.and_then(|r| { Metrics::register(r) - .map_err(|err| { log::warn!("Failed to register Prometheus metrics: {}", err); }) - .ok() - ); - let (future, worker_sender) = BlockImportWorker::new( + .map_err(|err| { + log::warn!("Failed to register Prometheus metrics: {}", err); + }) + .ok() + }); + + let (future, finality_sender, block_import_sender) = BlockImportWorker::new( result_sender, verifier, block_import, @@ -82,7 +89,8 @@ impl BasicQueue { spawner.spawn_blocking("basic-block-import-worker", future.boxed()); Self { - sender: worker_sender, + finality_sender, + block_import_sender, result_port, _phantom: PhantomData, } @@ -96,7 +104,9 @@ impl ImportQueue for BasicQueue } trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); - let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); + let res = + self.block_import_sender.unbounded_send(worker_messages::ImportBlocks(origin, blocks)); + if res.is_err() { log::error!( target: "sync", @@ -110,12 +120,12 @@ impl ImportQueue for BasicQueue who: Origin, hash: B::Hash, number: NumberFor, - justification: Justification + justification: Justification, ) { - let res = self.sender - .unbounded_send( - ToWorkerMsg::ImportJustification(who, hash, number, justification) - ); + let res = self.finality_sender.unbounded_send( + worker_messages::Finality::ImportJustification(who, hash, number, justification), + ); + if res.is_err() { log::error!( target: "sync", @@ -132,10 +142,10 @@ impl ImportQueue for BasicQueue finality_proof: Vec, ) { trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); - let res = self.sender - .unbounded_send( - ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof) - ); + let res = self.finality_sender.unbounded_send( + worker_messages::Finality::ImportFinalityProof(who, hash, number, finality_proof), + ); + if res.is_err() { log::error!( target: "sync", @@ -151,12 +161,16 @@ impl ImportQueue for BasicQueue } } -/// Message destinated to the background worker. -#[derive(Debug)] -enum ToWorkerMsg { - ImportBlocks(BlockOrigin, Vec>), - ImportJustification(Origin, B::Hash, NumberFor, Justification), - ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), +/// Messages destinated to the background worker. +mod worker_messages { + use super::*; + + pub struct ImportBlocks(pub BlockOrigin, pub Vec>); + + pub enum Finality { + ImportJustification(Origin, B::Hash, NumberFor, Justification), + ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), + } } struct BlockImportWorker { @@ -176,8 +190,18 @@ impl BlockImportWorker { justification_import: Option>, finality_proof_import: Option>, metrics: Option, - ) -> (impl Future + Send, TracingUnboundedSender>) { - let (sender, mut port) = tracing_unbounded("mpsc_block_import_worker"); + ) -> ( + impl Future + Send, + TracingUnboundedSender>, + TracingUnboundedSender>, + ) { + use worker_messages::*; + + let (finality_sender, mut finality_port) = + tracing_unbounded("mpsc_import_queue_worker_finality"); + + let (block_import_sender, mut block_import_port) = + tracing_unbounded("mpsc_import_queue_worker_blocks"); let mut worker = BlockImportWorker { result_sender, @@ -206,6 +230,8 @@ impl BlockImportWorker { // `Future`, and `block_import` is `None`. // - Something else, in which case `block_import` is `Some` and `importing` is None. // + // Additionally, the task will prioritize processing of finality work messages over + // block import messages, hence why two distinct channels are used. let mut block_import_verifier = Some((block_import, verifier)); let mut importing = None; @@ -217,7 +243,30 @@ impl BlockImportWorker { return Poll::Ready(()) } - // If we are in the process of importing a bunch of block, let's resume this + // Grab the next finality action request sent to the import queue. + let finality_work = match Stream::poll_next(Pin::new(&mut finality_port), cx) { + Poll::Ready(Some(msg)) => Some(msg), + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => None, + }; + + match finality_work { + Some(Finality::ImportFinalityProof(who, hash, number, proof)) => { + let (_, verif) = block_import_verifier + .as_mut() + .expect("block_import_verifier is always Some; qed"); + + worker.import_finality_proof(verif, who, hash, number, proof); + continue; + } + Some(Finality::ImportJustification(who, hash, number, justification)) => { + worker.import_justification(who, hash, number, justification); + continue; + } + None => {} + } + + // If we are in the process of importing a bunch of blocks, let's resume this // process before doing anything more. if let Some(imp_fut) = importing.as_mut() { match Future::poll(Pin::new(imp_fut), cx) { @@ -232,34 +281,25 @@ impl BlockImportWorker { debug_assert!(importing.is_none()); debug_assert!(block_import_verifier.is_some()); - // Grab the next action request sent to the import queue. - let msg = match Stream::poll_next(Pin::new(&mut port), cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => return Poll::Pending, - }; + // Grab the next block import request sent to the import queue. + let ImportBlocks(origin, blocks) = + match Stream::poll_next(Pin::new(&mut block_import_port), cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + }; - match msg { - ToWorkerMsg::ImportBlocks(origin, blocks) => { - // On blocks import request, we merely *start* the process and store - // a `Future` into `importing`. - let (bi, verif) = block_import_verifier.take() - .expect("block_import_verifier is always Some; qed"); - importing = Some(worker.import_batch(bi, verif, origin, blocks)); - }, - ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { - let (_, verif) = block_import_verifier.as_mut() - .expect("block_import_verifier is always Some; qed"); - worker.import_finality_proof(verif, who, hash, number, proof); - }, - ToWorkerMsg::ImportJustification(who, hash, number, justification) => { - worker.import_justification(who, hash, number, justification); - } - } + // On blocks import request, we merely *start* the process and store + // a `Future` into `importing`. + let (block_import, verifier) = block_import_verifier + .take() + .expect("block_import_verifier is always Some; qed"); + + importing = Some(worker.import_batch(block_import, verifier, origin, blocks)); } }); - (future, sender) + (future, finality_sender, block_import_sender) } /// Returns a `Future` that imports the given blocks and sends the results on @@ -363,12 +403,11 @@ fn import_many_blocks, Transaction>( Output = ( usize, usize, - Vec<(Result>, BlockImportError>, B::Hash,)>, + Vec<(Result>, BlockImportError>, B::Hash)>, BoxBlockImport, - V - ) -> -{ + V, + ), +> { let count = blocks.len(); let blocks_range = match ( @@ -461,3 +500,187 @@ fn import_many_blocks, Transaction>( Poll::Pending }) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + import_queue::{CacheKeyId, Verifier}, + BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport, + }; + use futures::{executor::block_on, Future}; + use sp_test_primitives::{Block, BlockNumber, Extrinsic, Hash, Header}; + use std::collections::HashMap; + + impl Verifier for () { + fn verify( + &mut self, + origin: BlockOrigin, + header: Header, + _justification: Option, + _body: Option>, + ) -> Result<(BlockImportParams, Option)>>), String> { + Ok((BlockImportParams::new(origin, header), None)) + } + } + + impl BlockImport for () { + type Error = crate::Error; + type Transaction = Extrinsic; + + fn check_block( + &mut self, + _block: BlockCheckParams, + ) -> Result { + Ok(ImportResult::imported(false)) + } + + fn import_block( + &mut self, + _block: BlockImportParams, + _cache: HashMap>, + ) -> Result { + Ok(ImportResult::imported(true)) + } + } + + impl JustificationImport for () { + type Error = crate::Error; + + fn import_justification( + &mut self, + _hash: Hash, + _number: BlockNumber, + _justification: Justification, + ) -> Result<(), Self::Error> { + Ok(()) + } + } + + #[derive(Debug, PartialEq)] + enum Event { + JustificationImported(Hash), + BlockImported(Hash), + } + + #[derive(Default)] + struct TestLink { + events: Vec, + } + + impl Link for TestLink { + fn blocks_processed( + &mut self, + _imported: usize, + _count: usize, + results: Vec<(Result, BlockImportError>, Hash)>, + ) { + if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) { + self.events.push(Event::BlockImported(hash)); + } + } + + fn justification_imported( + &mut self, + _who: Origin, + hash: &Hash, + _number: BlockNumber, + _success: bool, + ) { + self.events.push(Event::JustificationImported(hash.clone())) + } + } + + #[test] + fn prioritizes_finality_work_over_block_import() { + let (result_sender, mut result_port) = buffered_link::buffered_link(); + + let (mut worker, mut finality_sender, mut block_import_sender) = + BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None, None); + + let mut import_block = |n| { + let header = Header { + parent_hash: Hash::random(), + number: n, + extrinsics_root: Hash::random(), + state_root: Default::default(), + digest: Default::default(), + }; + + let hash = header.hash(); + + block_on(block_import_sender.send(worker_messages::ImportBlocks( + BlockOrigin::Own, + vec![IncomingBlock { + hash, + header: Some(header), + body: None, + justification: None, + origin: None, + allow_missing_state: false, + import_existing: false, + }], + ))) + .unwrap(); + + hash + }; + + let mut import_justification = || { + let hash = Hash::random(); + + block_on(finality_sender.send(worker_messages::Finality::ImportJustification( + libp2p::PeerId::random(), + hash, + 1, + Vec::new(), + ))) + .unwrap(); + + hash + }; + + let mut link = TestLink::default(); + + // we send a bunch of tasks to the worker + let block1 = import_block(1); + let block2 = import_block(2); + let block3 = import_block(3); + let justification1 = import_justification(); + let justification2 = import_justification(); + let block4 = import_block(4); + let block5 = import_block(5); + let block6 = import_block(6); + let justification3 = import_justification(); + + // we poll the worker until we have processed 9 events + block_on(futures::future::poll_fn(|cx| { + while link.events.len() < 9 { + match Future::poll(Pin::new(&mut worker), cx) { + Poll::Pending => {} + Poll::Ready(()) => panic!("import queue worker should not conclude."), + } + + result_port.poll_actions(cx, &mut link).unwrap(); + } + + Poll::Ready(()) + })); + + // all justification tasks must be done before any block import work + assert_eq!( + link.events, + vec![ + Event::JustificationImported(justification1), + Event::JustificationImported(justification2), + Event::JustificationImported(justification3), + Event::BlockImported(block1), + Event::BlockImported(block2), + Event::BlockImported(block3), + Event::BlockImported(block4), + Event::BlockImported(block5), + Event::BlockImported(block6), + ] + ); + } +}