Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
consensus: prioritize finality work over block import in queue
Browse files Browse the repository at this point in the history
  • Loading branch information
andresilva committed Oct 12, 2020
1 parent 8efe8b8 commit 2f57ae8
Showing 1 changed file with 92 additions and 52 deletions.
144 changes: 92 additions & 52 deletions primitives/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ use crate::{
/// Interface to a basic block import queue that is importing blocks sequentially in a separate
/// task, with plugable verification.
pub struct BasicQueue<B: BlockT, Transaction> {
/// Channel to send messages to the background task.
sender: TracingUnboundedSender<ToWorkerMsg<B>>,
/// Channel to send finality work messages to the background task.
finality_sender: TracingUnboundedSender<worker_messages::Finality<B>>,
/// Channel to send block import messages to the background task.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
/// Results coming from the worker task.
result_port: BufferedLinkReceiver<B>,
_phantom: PhantomData<Transaction>,
Expand All @@ -46,7 +48,8 @@ pub struct BasicQueue<B: BlockT, Transaction> {
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
fn drop(&mut self) {
// Flush the queue and close the receiver to terminate the future.
self.sender.close_channel();
self.finality_sender.close_channel();
self.block_import_sender.close_channel();
self.result_port.close();
}
}
Expand All @@ -65,12 +68,16 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
prometheus_registry: Option<&Registry>,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let metrics = prometheus_registry.and_then(|r|

let metrics = prometheus_registry.and_then(|r| {
Metrics::register(r)
.map_err(|err| { log::warn!("Failed to register Prometheus metrics: {}", err); })
.ok()
);
let (future, worker_sender) = BlockImportWorker::new(
.map_err(|err| {
log::warn!("Failed to register Prometheus metrics: {}", err);
})
.ok()
});

let (future, finality_sender, block_import_sender) = BlockImportWorker::new(
result_sender,
verifier,
block_import,
Expand All @@ -82,7 +89,8 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
spawner.spawn_blocking("basic-block-import-worker", future.boxed());

Self {
sender: worker_sender,
finality_sender,
block_import_sender,
result_port,
_phantom: PhantomData,
}
Expand All @@ -96,7 +104,9 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
}

trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks));
let res =
self.block_import_sender.unbounded_send(worker_messages::ImportBlocks(origin, blocks));

if res.is_err() {
log::error!(
target: "sync",
Expand All @@ -110,12 +120,12 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
who: Origin,
hash: B::Hash,
number: NumberFor<B>,
justification: Justification
justification: Justification,
) {
let res = self.sender
.unbounded_send(
ToWorkerMsg::ImportJustification(who, hash, number, justification)
);
let res = self.finality_sender.unbounded_send(
worker_messages::Finality::ImportJustification(who, hash, number, justification),
);

if res.is_err() {
log::error!(
target: "sync",
Expand All @@ -132,10 +142,10 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
finality_proof: Vec<u8>,
) {
trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
let res = self.sender
.unbounded_send(
ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)
);
let res = self.finality_sender.unbounded_send(
worker_messages::Finality::ImportFinalityProof(who, hash, number, finality_proof),
);

if res.is_err() {
log::error!(
target: "sync",
Expand All @@ -151,12 +161,16 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
}
}

/// Message destinated to the background worker.
#[derive(Debug)]
enum ToWorkerMsg<B: BlockT> {
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Messages destinated to the background worker.
mod worker_messages {
use super::*;

pub struct ImportBlocks<B: BlockT>(pub BlockOrigin, pub Vec<IncomingBlock<B>>);

pub enum Finality<B: BlockT> {
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
}
}

struct BlockImportWorker<B: BlockT, Transaction> {
Expand All @@ -176,8 +190,18 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
metrics: Option<Metrics>,
) -> (impl Future<Output = ()> + Send, TracingUnboundedSender<ToWorkerMsg<B>>) {
let (sender, mut port) = tracing_unbounded("mpsc_block_import_worker");
) -> (
impl Future<Output = ()> + Send,
TracingUnboundedSender<worker_messages::Finality<B>>,
TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
) {
use worker_messages::*;

let (finality_sender, mut finality_port) =
tracing_unbounded("mpsc_import_queue_worker_finality");

let (block_import_sender, mut block_import_port) =
tracing_unbounded("mpsc_import_queue_worker_blocks");

let mut worker = BlockImportWorker {
result_sender,
Expand Down Expand Up @@ -206,6 +230,8 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
// `Future`, and `block_import` is `None`.
// - Something else, in which case `block_import` is `Some` and `importing` is None.
//
// Additionally, the task will prioritize processing of finality work messages over
// block import messages, hence why two distinct channels are used.
let mut block_import_verifier = Some((block_import, verifier));
let mut importing = None;

Expand All @@ -217,7 +243,30 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
return Poll::Ready(())
}

// If we are in the process of importing a bunch of block, let's resume this
// Grab the next finality action request sent to the import queue.
let finality_work = match Stream::poll_next(Pin::new(&mut finality_port), cx) {
Poll::Ready(Some(msg)) => Some(msg),
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => None,
};

match finality_work {
Some(Finality::ImportFinalityProof(who, hash, number, proof)) => {
let (_, verif) = block_import_verifier
.as_mut()
.expect("block_import_verifier is always Some; qed");

worker.import_finality_proof(verif, who, hash, number, proof);
continue;
}
Some(Finality::ImportJustification(who, hash, number, justification)) => {
worker.import_justification(who, hash, number, justification);
continue;
}
None => {}
}

// If we are in the process of importing a bunch of blocks, let's resume this
// process before doing anything more.
if let Some(imp_fut) = importing.as_mut() {
match Future::poll(Pin::new(imp_fut), cx) {
Expand All @@ -232,34 +281,25 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
debug_assert!(importing.is_none());
debug_assert!(block_import_verifier.is_some());

// Grab the next action request sent to the import queue.
let msg = match Stream::poll_next(Pin::new(&mut port), cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
};
// Grab the next block import request sent to the import queue.
let ImportBlocks(origin, blocks) =
match Stream::poll_next(Pin::new(&mut block_import_port), cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
};

match msg {
ToWorkerMsg::ImportBlocks(origin, blocks) => {
// On blocks import request, we merely *start* the process and store
// a `Future` into `importing`.
let (bi, verif) = block_import_verifier.take()
.expect("block_import_verifier is always Some; qed");
importing = Some(worker.import_batch(bi, verif, origin, blocks));
},
ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
let (_, verif) = block_import_verifier.as_mut()
.expect("block_import_verifier is always Some; qed");
worker.import_finality_proof(verif, who, hash, number, proof);
},
ToWorkerMsg::ImportJustification(who, hash, number, justification) => {
worker.import_justification(who, hash, number, justification);
}
}
// On blocks import request, we merely *start* the process and store
// a `Future` into `importing`.
let (block_import, verifier) = block_import_verifier
.take()
.expect("block_import_verifier is always Some; qed");

importing = Some(worker.import_batch(block_import, verifier, origin, blocks));
}
});

(future, sender)
(future, finality_sender, block_import_sender)
}

/// Returns a `Future` that imports the given blocks and sends the results on
Expand Down

0 comments on commit 2f57ae8

Please sign in to comment.