Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Update import metrics and add verification time (#6170)
Browse files Browse the repository at this point in the history
* refactor import reporting and add time

* Update primitives/consensus/common/src/metrics.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* remove (crate)

* fix longer lines

* swap names to avoid api breaking

Co-authored-by: Max Inden <mail@max-inden.de>
  • Loading branch information
NikVolf and mxinden authored Jun 1, 2020
1 parent 4077246 commit 9ebcda4
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 55 deletions.
18 changes: 15 additions & 3 deletions client/network/test/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ fn import_single_good_block_works() {
let mut expected_aux = ImportedAux::default();
expected_aux.is_new_best = true;

match import_single_block(&mut substrate_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(
&mut substrate_test_runtime_client::new(),
BlockOrigin::File, block,
&mut PassThroughVerifier(true),
) {
Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org))
if *num == number && *aux == expected_aux && *org == Some(peer_id) => {}
r @ _ => panic!("{:?}", r)
Expand All @@ -65,7 +69,11 @@ fn import_single_good_block_works() {
#[test]
fn import_single_good_known_block_is_ignored() {
let (mut client, _hash, number, _, block) = prepare_good_block();
match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(
&mut client, BlockOrigin::File,
block,
&mut PassThroughVerifier(true)
) {
Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number => {}
_ => panic!()
}
Expand All @@ -75,7 +83,11 @@ fn import_single_good_known_block_is_ignored() {
fn import_single_good_block_without_header_fails() {
let (_, _, _, peer_id, mut block) = prepare_good_block();
block.header = None;
match import_single_block(&mut substrate_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(
&mut substrate_test_runtime_client::new(),
BlockOrigin::File, block,
&mut PassThroughVerifier(true),
) {
Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {}
_ => panic!()
}
Expand Down
42 changes: 33 additions & 9 deletions primitives/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
//! queues to be instantiated simply.
use std::collections::HashMap;

use sp_runtime::{Justification, traits::{Block as BlockT, Header as _, NumberFor}};
use crate::error::Error as ConsensusError;
use crate::block_import::{
BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult,
BlockCheckParams, FinalityProofImport,
};

use crate::{
error::Error as ConsensusError,
block_import::{
BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult,
BlockCheckParams, FinalityProofImport,
},
metrics::Metrics,
};
pub use basic_queue::BasicQueue;

mod basic_queue;
Expand Down Expand Up @@ -186,6 +190,17 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &mut V,
) -> Result<BlockImportResult<NumberFor<B>>, BlockImportError> {
import_single_block_metered(import_handle, block_origin, block, verifier, None)
}

/// Single block import function with metering.
pub(crate) fn import_single_block_metered<B: BlockT, V: Verifier<B>, Transaction>(
import_handle: &mut dyn BlockImport<B, Transaction = Transaction, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &mut V,
metrics: Option<Metrics>,
) -> Result<BlockImportResult<NumberFor<B>>, BlockImportError> {
let peer = block.origin;

Expand All @@ -207,8 +222,8 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
let hash = header.hash();
let parent_hash = header.parent_hash().clone();

let import_error = |e| {
match e {
let import_handler = |import| {
match import {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(number))
Expand All @@ -232,7 +247,8 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
}
}
};
match import_error(import_handle.check_block(BlockCheckParams {

match import_handler(import_handle.check_block(BlockCheckParams {
hash,
number,
parent_hash,
Expand All @@ -243,21 +259,29 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
r => return Ok(r), // Any other successful result means that the block is already imported.
}

let started = std::time::Instant::now();
let (mut import_block, maybe_keys) = verifier.verify(block_origin, header, justification, block.body)
.map_err(|msg| {
if let Some(ref peer) = peer {
trace!(target: "sync", "Verifying {}({}) from {} failed: {}", number, hash, peer, msg);
} else {
trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg);
}
if let Some(metrics) = metrics.as_ref() {
metrics.report_verification(false, started.elapsed());
}
BlockImportError::VerificationFailed(peer.clone(), msg)
})?;

if let Some(metrics) = metrics.as_ref() {
metrics.report_verification(true, started.elapsed());
}

let mut cache = HashMap::new();
if let Some(keys) = maybe_keys {
cache.extend(keys.into_iter());
}
import_block.allow_missing_state = block.allow_missing_state;

import_error(import_handle.import_block(import_block.convert_transaction(), cache))
import_handler(import_handle.import_block(import_block.convert_transaction(), cache))
}
66 changes: 24 additions & 42 deletions primitives/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, Num
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
use prometheus_endpoint::Registry;

use crate::block_import::BlockOrigin;
use crate::metrics::Metrics;
use crate::import_queue::{
BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport,
BoxJustificationImport, ImportQueue, Link, Origin,
IncomingBlock, import_single_block,
buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver}
use crate::{
block_import::BlockOrigin,
import_queue::{
BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport,
BoxJustificationImport, ImportQueue, Link, Origin,
IncomingBlock, import_single_block_metered,
buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver},
},
metrics::Metrics,
};

/// Interface to a basic block import queue that is importing blocks sequentially in a separate
Expand Down Expand Up @@ -146,11 +148,6 @@ struct BlockImportWorker<B: BlockT, Transaction> {
_phantom: PhantomData<Transaction>,
}

const METRIC_SUCCESS_FIELDS: [&'static str; 8] = [
"success", "incomplete_header", "verification_failed", "bad_block",
"missing_state", "unknown_parent", "cancelled", "failed"
];

impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
fn new<V: 'static + Verifier<B>>(
result_sender: BufferedLinkSender<B>,
Expand Down Expand Up @@ -228,7 +225,7 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
// a `Future` into `importing`.
let (bi, verif) = block_import_verifier.take()
.expect("block_import_verifier is always Some; qed");
importing = Some(worker.import_a_batch_of_blocks(bi, verif, origin, blocks));
importing = Some(worker.import_batch(bi, verif, origin, blocks));
},
ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
let (_, verif) = block_import_verifier.as_mut()
Expand All @@ -250,39 +247,18 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
///
/// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is
/// yielded back in the output once the import is finished.
fn import_a_batch_of_blocks<V: 'static + Verifier<B>>(
fn import_batch<V: 'static + Verifier<B>>(
&mut self,
block_import: BoxBlockImport<B, Transaction>,
verifier: V,
origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>
blocks: Vec<IncomingBlock<B>>,
) -> impl Future<Output = (BoxBlockImport<B, Transaction>, V)> {
let mut result_sender = self.result_sender.clone();
let metrics = self.metrics.clone();

import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks)
import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks, metrics)
.then(move |(imported, count, results, block_import, verifier)| {
if let Some(metrics) = metrics {
let amounts = results.iter().fold([0u64; 8], |mut acc, result| {
match result.0 {
Ok(_) => acc[0] += 1,
Err(BlockImportError::IncompleteHeader(_)) => acc[1] += 1,
Err(BlockImportError::VerificationFailed(_,_)) => acc[2] += 1,
Err(BlockImportError::BadBlock(_)) => acc[3] += 1,
Err(BlockImportError::MissingState) => acc[4] += 1,
Err(BlockImportError::UnknownParent) => acc[5] += 1,
Err(BlockImportError::Cancelled) => acc[6] += 1,
Err(BlockImportError::Other(_)) => acc[7] += 1,
};
acc
});
for (idx, field) in METRIC_SUCCESS_FIELDS.iter().enumerate() {
let amount = amounts[idx];
if amount > 0 {
metrics.import_queue_processed.with_label_values(&[&field]).inc_by(amount)
}
};
}
result_sender.blocks_processed(imported, count, results);
future::ready((block_import, verifier))
})
Expand Down Expand Up @@ -352,6 +328,7 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
blocks: Vec<IncomingBlock<B>>,
verifier: V,
delay_between_blocks: Duration,
metrics: Option<Metrics>,
) -> impl Future<
Output = (
usize,
Expand Down Expand Up @@ -401,9 +378,9 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
None => {
// No block left to import, success!
let import_handle = import_handle.take()
.expect("Future polled again after it has finished");
.expect("Future polled again after it has finished (import handle is None)");
let verifier = verifier.take()
.expect("Future polled again after it has finished");
.expect("Future polled again after it has finished (verifier handle is None)");
let results = mem::replace(&mut results, Vec::new());
return Poll::Ready((imported, count, results, import_handle, verifier));
},
Expand All @@ -413,24 +390,29 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
// therefore `import_handle` and `verifier` are always `Some` here. It is illegal to poll
// a `Future` again after it has ended.
let import_handle = import_handle.as_mut()
.expect("Future polled again after it has finished");
.expect("Future polled again after it has finished (import handle is None)");
let verifier = verifier.as_mut()
.expect("Future polled again after it has finished");
.expect("Future polled again after it has finished (verifier handle is None)");

let block_number = block.header.as_ref().map(|h| h.number().clone());
let block_hash = block.hash;
let import_result = if has_error {
Err(BlockImportError::Cancelled)
} else {
// The actual import.
import_single_block(
import_single_block_metered(
&mut **import_handle,
blocks_origin.clone(),
block,
verifier,
metrics.clone(),
)
};

if let Some(metrics) = metrics.as_ref() {
metrics.report_import::<B>(&import_result);
}

if import_result.is_ok() {
trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash);
imported += 1;
Expand Down
43 changes: 42 additions & 1 deletion primitives/consensus/common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

//! Metering tools for consensus
use prometheus_endpoint::{register, U64, Registry, PrometheusError, Opts, CounterVec};
use prometheus_endpoint::{register, U64, Registry, PrometheusError, Opts, CounterVec, HistogramVec, HistogramOpts};

use sp_runtime::traits::{Block as BlockT, NumberFor};

use crate::import_queue::{BlockImportResult, BlockImportError};

/// Generic Prometheus metrics for common consensus functionality.
#[derive(Clone)]
pub(crate) struct Metrics {
pub import_queue_processed: CounterVec<U64>,
pub block_verification_time: HistogramVec,
}

impl Metrics {
Expand All @@ -34,6 +39,42 @@ impl Metrics {
)?,
registry,
)?,
block_verification_time: register(
HistogramVec::new(
HistogramOpts::new(
"block_verification_time",
"Histogram of time taken to import blocks",
),
&["result"],
)?,
registry,
)?,
})
}

pub fn report_import<B: BlockT>(
&self,
result: &Result<BlockImportResult<NumberFor<B>>, BlockImportError>,
) {
let label = match result {
Ok(_) => "success",
Err(BlockImportError::IncompleteHeader(_)) => "incomplete_header",
Err(BlockImportError::VerificationFailed(_,_)) => "verification_failed",
Err(BlockImportError::BadBlock(_)) => "bad_block",
Err(BlockImportError::MissingState) => "missing_state",
Err(BlockImportError::UnknownParent) => "unknown_parent",
Err(BlockImportError::Cancelled) => "cancelled",
Err(BlockImportError::Other(_)) => "failed",
};

self.import_queue_processed.with_label_values(
&[label]
).inc();
}

pub fn report_verification(&self, success: bool, time: std::time::Duration) {
self.block_verification_time.with_label_values(
&[if success { "success" } else { "verification_failed" }]
).observe(time.as_secs_f64());
}
}

0 comments on commit 9ebcda4

Please sign in to comment.