From b9ada4541ea57f21476350088fa4e55f3a791922 Mon Sep 17 00:00:00 2001 From: "Demi M. Obenour" Date: Fri, 26 Jul 2019 09:00:36 -0400 Subject: [PATCH] Remove rhododendron It is unused and unmaintained, and has presumably regressed. Remove it for now. If anyone wants to use it in the future, and has the resources to maintain it, they are encouraged to bring it back from git history. --- Cargo.lock | 46 - Cargo.toml | 1 - README.adoc | 2 +- core/consensus/rhd/Cargo.toml | 37 - core/consensus/rhd/src/error.rs | 50 - core/consensus/rhd/src/lib.rs | 1699 ------------------ core/consensus/rhd/src/misbehaviour_check.rs | 191 -- core/consensus/rhd/src/service.rs | 181 -- node/cli/src/service.rs | 58 - 9 files changed, 1 insertion(+), 2264 deletions(-) delete mode 100644 core/consensus/rhd/Cargo.toml delete mode 100644 core/consensus/rhd/src/error.rs delete mode 100644 core/consensus/rhd/src/lib.rs delete mode 100644 core/consensus/rhd/src/misbehaviour_check.rs delete mode 100644 core/consensus/rhd/src/service.rs diff --git a/Cargo.lock b/Cargo.lock index 515c2e01efaa1..8464c8a268f38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -795,15 +795,6 @@ dependencies = [ "serde 1.0.94 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "error-chain" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "backtrace 0.3.32 (registry+https://github.com/rust-lang/crates.io-index)", - "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "exit-future" version = "0.1.4" @@ -3170,17 +3161,6 @@ dependencies = [ "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rhododendron" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "error-chain 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "ring" version = "0.14.6" @@ -4417,30 +4397,6 @@ dependencies = [ "substrate-test-runtime-client 2.0.0", ] -[[package]] -name = "substrate-consensus-rhd" -version = "2.0.0" -dependencies = [ - "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", - "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rhododendron 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "sr-io 2.0.0", - "sr-primitives 2.0.0", - "sr-version 2.0.0", - "srml-support 2.0.0", - "srml-system 2.0.0", - "substrate-client 2.0.0", - "substrate-consensus-common 2.0.0", - "substrate-keyring 2.0.0", - "substrate-primitives 2.0.0", - "substrate-transaction-pool 2.0.0", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "substrate-consensus-slots" version = "2.0.0" @@ -6042,7 +5998,6 @@ dependencies = [ "checksum env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3" "checksum environmental 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c7464757b80de8930c91c9afe77ddce501826bf9d134a87db2c67d9dc177e2c" "checksum erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3beee4bc16478a1b26f2e80ad819a52d24745e292f521a63c16eea5f74b7eb60" -"checksum error-chain 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3ab49e9dcb602294bc42f9a7dfc9bc6e936fca4418ea300dbfb84fe16de0b7d9" "checksum exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d8013f441e38e31c670e7f34ec8f1d5d3a2bd9d303c1ff83976ca886005e8f48" "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" "checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" @@ -6261,7 +6216,6 @@ dependencies = [ "checksum regex-automata 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "3ed09217220c272b29ef237a974ad58515bde75f194e3ffa7e6d0bf0f3b01f86" "checksum regex-syntax 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)" = "9b01330cce219c1c6b2e209e5ed64ccd587ae5c67bed91c0b49eecf02ae40e21" "checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" -"checksum rhododendron 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "057fecd57cc69e24d9d215c9f283a42133c3f48952e4fc06b088ecf3ce3d90bb" "checksum ring 0.14.6 (registry+https://github.com/rust-lang/crates.io-index)" = "426bc186e3e95cac1e4a4be125a4aca7e84c2d616ffc02244eef36e2a60a093c" "checksum rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f1651697fefd273bfb4fd69466cc2a9d20de557a0213b97233b22b5e95924b5e" "checksum rpassword 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c34fa7bcae7fca3c8471e8417088bbc3ad9af8066b0ecf4f3c0d98a0d772716e" diff --git a/Cargo.toml b/Cargo.toml index a6a7b8d17ba35..e285608105dbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ members = [ "core/consensus/aura", "core/consensus/babe", "core/consensus/common", - "core/consensus/rhd", "core/consensus/slots", "core/executor", "core/executor/runtime-test", diff --git a/README.adoc b/README.adoc index 89c042bb325b7..4edeeddeb1c3c 100644 --- a/README.adoc +++ b/README.adoc @@ -393,7 +393,7 @@ substrate * Substrate Core [source, shell] substrate, substrate-cli, substrate-client, substrate-client-db, -substrate-consensus-common, substrate-consensus-rhd, +substrate-consensus-common, substrate-executor, substrate-finality-grandpa, substrate-keyring, substrate-keystore, substrate-network, substrate-network-libp2p, substrate-primitives, substrate-rpc, substrate-rpc-servers, substrate-serializer, substrate-service, substrate-service-test, substrate-state-db, diff --git a/core/consensus/rhd/Cargo.toml b/core/consensus/rhd/Cargo.toml deleted file mode 100644 index 7eda2d6904517..0000000000000 --- a/core/consensus/rhd/Cargo.toml +++ /dev/null @@ -1,37 +0,0 @@ -[package] -name = "substrate-consensus-rhd" -version = "2.0.0" -authors = ["Parity Technologies "] -description = "Rhododendron Round-Based consensus-algorithm for substrate" -edition = "2018" - -[dependencies] -derive_more = "0.14.0" -futures = "0.1.17" -codec = { package = "parity-codec", version = "4.1.1", features = ["derive"] } -primitives = { package = "substrate-primitives", path = "../../primitives" } -consensus = { package = "substrate-consensus-common", path = "../common" } -client = { package = "substrate-client", path = "../../client" } -transaction_pool = { package = "substrate-transaction-pool", path = "../../transaction-pool" } -runtime_support = { package = "srml-support", path = "../../../srml/support" } -srml-system = { path = "../../../srml/system" } -runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } -runtime_version = { package = "sr-version", path = "../../sr-version" } -runtime_io = { package = "sr-io", path = "../../sr-io" } -tokio = "0.1.7" -parking_lot = "0.8.0" -log = "0.4" -rhododendron = { version = "0.6.0", features = ["codec"] } -exit-future = "0.1" - -[dev-dependencies] -keyring = { package = "substrate-keyring", path = "../../keyring" } - -[features] -default = ["std"] -std = [ - "primitives/std", - "runtime_support/std", - "runtime_primitives/std", - "runtime_version/std", -] diff --git a/core/consensus/rhd/src/error.rs b/core/consensus/rhd/src/error.rs deleted file mode 100644 index 601cf1c963a58..0000000000000 --- a/core/consensus/rhd/src/error.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Error types in the rhododendron Consensus service. -use consensus::error::{Error as CommonError}; -use primitives::AuthorityId; -use client; - -/// A result alias. -pub type Result = std::result::Result; - -/// A RHD error type. -#[derive(Debug, derive_more::Display, derive_more::From)] -pub enum Error { - /// Client error. - Client(client::error::Error), - /// Consensus error. - Common(CommonError), - /// Local account ID not a validator at this block. - #[display(fmt="Local account ID ({:?}) not a validator at this block.", _0)] - NotValidator(AuthorityId), - /// Proposer destroyed before finishing proposing or evaluating - #[display(fmt="Proposer destroyed before finishing proposing or evaluating")] - PrematureDestruction, - /// Failed to register or resolve async timer. - #[display(fmt="Timer failed: {}", _0)] - Timer(tokio::timer::Error), - /// Unable to dispatch agreement future - #[display(fmt="Unable to dispatch agreement future: {:?}", _0)] - Executor(futures::future::ExecuteErrorKind), -} - -impl From<::rhododendron::InputStreamConcluded> for Error { - fn from(_: ::rhododendron::InputStreamConcluded) -> Self { - CommonError::IoTerminated.into() - } -} diff --git a/core/consensus/rhd/src/lib.rs b/core/consensus/rhd/src/lib.rs deleted file mode 100644 index 4670cb5deeae1..0000000000000 --- a/core/consensus/rhd/src/lib.rs +++ /dev/null @@ -1,1699 +0,0 @@ -// Copyright 2017-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! BFT Agreement based on a rotating proposer in different rounds. -//! -//! Where this crate refers to input stream, should never logically conclude. -//! The logic in this crate assumes that messages flushed to the output stream -//! will eventually reach other nodes and that our own messages are not included -//! in the input stream. -//! -//! Note that it is possible to witness agreement being reached without ever -//! seeing the candidate. Any candidates seen will be checked for validity. -//! -//! Although technically the agreement will always complete (given the eventual -//! delivery of messages), in practice it is possible for this future to -//! conclude without having witnessed the conclusion. -//! In general, this future should be pre-empted by the import of a justification -//! set for this block height. - -#![cfg(feature="rhd")] -// FIXME #1020 doesn't compile -// NOTE: this is the legacy constant used for transaction size. No longer used except -// for the rhd code which is not updated. Placed here for compatibility. -const MAX_TRANSACTIONS_SIZE: u32 = 4 * 1024 * 1024; - -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{self, Instant, Duration}; - -use parity_codec::{Decode, Encode}; -use consensus::offline_tracker::OfflineTracker; -use consensus::error::{ErrorKind as CommonErrorKind}; -use consensus::{Authorities, BlockImport, Environment, Proposer as BaseProposer}; -use client::{Client as SubstrateClient, CallExecutor}; -use client::runtime_api::{Core, BlockBuilder as BlockBuilderAPI, OldTxQueue, BlockBuilderError}; -use runtime_primitives::generic::{BlockId, Era, ImportResult, BlockImportParams, BlockOrigin}; -use runtime_primitives::traits::{Block, Header}; -use runtime_primitives::traits::{ - Block as BlockT, Hash as HashT, Header as HeaderT, - BlockNumberToHash, SaturatedConversion -}; -use runtime_primitives::Justification; -use primitives::{AuthorityId, ed25519, Blake2Hasher, ed25519::LocalizedSignature}; -use srml_system::Trait as SystemT; - -use node_runtime::Runtime; -use transaction_pool::txpool::{self, Pool as TransactionPool}; - -use futures::prelude::*; -use futures::future; -use futures::sync::oneshot; -use tokio::runtime::TaskExecutor; -use tokio::timer::Delay; -use parking_lot::{RwLock, Mutex}; - -pub use rhododendron::{ - self, InputStreamConcluded, AdvanceRoundReason, Message as RhdMessage, - Vote as RhdMessageVote, Communication as RhdCommunication, -}; -pub use self::error::{Error, ErrorKind}; - -// pub mod misbehavior_check; -mod error; -mod service; - -// statuses for an agreement -mod status { - pub const LIVE: usize = 0; - pub const BAD: usize = 1; - pub const GOOD: usize = 2; -} - -pub type Timestamp = u64; - -pub type AccountId = ::primitives::H256; - -/// Localized message type. -pub type LocalizedMessage = rhododendron::LocalizedMessage< - B, - ::Hash, - AuthorityId, - LocalizedSignature ->; - -/// Justification of some hash. -pub struct RhdJustification(rhododendron::Justification); - -/// Justification of a prepare message. -pub struct PrepareJustification(rhododendron::PrepareJustification); - -/// Unchecked justification. -#[derive(Encode, Decode)] -pub struct UncheckedJustification(rhododendron::UncheckedJustification); - -impl UncheckedJustification { - /// Create a new, unchecked justification. - pub fn new(digest: H, signatures: Vec, round_number: u32) -> Self { - UncheckedJustification(rhododendron::UncheckedJustification { - digest, - signatures, - round_number, - }) - } -} - -impl UncheckedJustification { - /// Decode a justification. - pub fn decode_justification(justification: Justification) -> Option { - let inner: rhododendron::UncheckedJustification<_, _> = Decode::decode(&mut &justification[..])?; - - Some(UncheckedJustification(inner)) - } -} - -impl Into for UncheckedJustification { - fn into(self) -> Justification { - self.0.encode() - } -} - -impl From> for UncheckedJustification { - fn from(inner: rhododendron::UncheckedJustification) -> Self { - UncheckedJustification(inner) - } -} - -/// Result of a committed round of BFT -pub type Committed = rhododendron::Committed::Hash, LocalizedSignature>; - -/// Communication between BFT participants. -pub type Communication = rhododendron::Communication::Hash, AuthorityId, LocalizedSignature>; - -/// Misbehavior observed from BFT participants. -pub type Misbehavior = rhododendron::Misbehavior; - -/// Shared offline validator tracker. -pub type SharedOfflineTracker = Arc>; - -/// A proposer for a rhododendron instance. This must implement the base proposer logic. -pub trait LocalProposer: BaseProposer { - /// Import witnessed rhododendron misbehavior. - fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, Misbehavior)>); - - /// Determine the proposer for a given round. This should be a deterministic function - /// with consistent results across all authorities. - fn round_proposer(&self, round_number: u32, authorities: &[AuthorityId]) -> AuthorityId; - - /// Hook called when a BFT round advances without a proposal. - fn on_round_end(&self, _round_number: u32, _proposed: bool) { } -} - - -/// Build new blocks. -pub trait BlockBuilder { - /// Push an extrinsic onto the block. Fails if the extrinsic is invalid. - fn push_extrinsic(&mut self, extrinsic: ::Extrinsic) -> Result<(), Error>; -} - -/// Local client abstraction for the consensus. -pub trait AuthoringApi: - Send - + Sync - + BlockBuilderAPI<::Block, InherentData, Error=::Error> - + Core<::Block, AuthorityId, Error=::Error> - + OldTxQueue<::Block, Error=::Error> -{ - /// The block used for this API type. - type Block: BlockT; - /// The error used by this API type. - type Error: std::error::Error; - - /// Build a block on top of the given, with inherent extrinsics pre-pushed. - fn build_block) -> ()>( - &self, - at: &BlockId, - inherent_data: InherentData, - build_ctx: F, - ) -> Result; -} - -/// A long-lived network which can create BFT message routing processes on demand. -pub trait Network { - /// The block used for this API type. - type Block: BlockT; - /// The input stream of BFT messages. Should never logically conclude. - type Input: Stream,Error=Error>; - /// The output sink of BFT messages. Messages sent here should eventually pass to all - /// current authorities. - type Output: Sink,SinkError=Error>; - - /// Instantiate input and output streams. - fn communication_for( - &self, - validators: &[AuthorityId], - local_id: AuthorityId, - parent_hash: ::Hash, - task_executor: TaskExecutor - ) -> (Self::Input, Self::Output); -} - - -// caches the round number to start at if we end up with BFT consensus on the same -// parent hash more than once (happens if block is bad). -// -// this will force a committed but locally-bad block to be considered analogous to -// a round advancement vote. -#[derive(Debug)] -struct RoundCache { - hash: Option, - start_round: u32, -} - -/// Instance of BFT agreement. -struct BftInstance { - key: Arc, - authorities: Vec, - parent_hash: B::Hash, - round_timeout_multiplier: u64, - cache: Arc>>, - proposer: P, -} - -impl> BftInstance - where - B: Clone + Eq, - B::Hash: ::std::hash::Hash - -{ - fn round_timeout_duration(&self, round: u32) -> Duration { - // 2^(min(6, x/8)) * 10 - // Grows exponentially starting from 10 seconds, capped at 640 seconds. - const ROUND_INCREMENT_STEP: u32 = 8; - - let round = round / ROUND_INCREMENT_STEP; - let round = ::std::cmp::min(6, round); - - let timeout = 1u64.checked_shl(round) - .unwrap_or_else(u64::max_value) - .saturating_mul(self.round_timeout_multiplier); - - Duration::from_secs(timeout) - } - - fn update_round_cache(&self, current_round: u32) { - let mut cache = self.cache.lock(); - if cache.hash.as_ref() == Some(&self.parent_hash) { - cache.start_round = current_round + 1; - } - } -} - -impl> rhododendron::Context for BftInstance - where - B: Clone + Eq, - B::Hash: ::std::hash::Hash, -{ - type Error = P::Error; - type AuthorityId = AuthorityId; - type Digest = B::Hash; - type Signature = LocalizedSignature; - type Candidate = B; - type RoundTimeout = Box>; - type CreateProposal = ::Future; - type EvaluateProposal = ::Future; - - fn local_id(&self) -> AuthorityId { - self.key.public().into() - } - - fn proposal(&self) -> Self::CreateProposal { - self.proposer.propose().into_future() - } - - fn candidate_digest(&self, proposal: &B) -> B::Hash { - proposal.hash() - } - - fn sign_local(&self, message: RhdMessage) -> LocalizedMessage { - sign_message(message, &*self.key, self.parent_hash.clone()) - } - - fn round_proposer(&self, round: u32) -> AuthorityId { - self.proposer.round_proposer(round, &self.authorities[..]) - } - - fn proposal_valid(&self, proposal: &B) -> Self::EvaluateProposal { - self.proposer.evaluate(proposal).into_future() - } - - fn begin_round_timeout(&self, round: u32) -> Self::RoundTimeout { - let timeout = self.round_timeout_duration(round); - let fut = Delay::new(Instant::now() + timeout) - .map_err(|e| Error::from(CommonErrorKind::FaultyTimer(e))) - .map_err(Into::into); - - Box::new(fut) - } - - fn on_advance_round( - &self, - accumulator: &rhododendron::Accumulator, - round: u32, - next_round: u32, - reason: AdvanceRoundReason, - ) { - use std::collections::HashSet; - - let collect_pubkeys = |participants: HashSet<&Self::AuthorityId>| participants.into_iter() - .map(|p| ::ed25519::Public::from_raw(p.0)) - .collect::>(); - - let round_timeout = self.round_timeout_duration(next_round); - debug!(target: "rhd", "Advancing to round {} from {}", next_round, round); - debug!(target: "rhd", "Participating authorities: {:?}", - collect_pubkeys(accumulator.participants())); - debug!(target: "rhd", "Voting authorities: {:?}", - collect_pubkeys(accumulator.voters())); - debug!(target: "rhd", "Round {} should end in at most {} seconds from now", next_round, round_timeout.as_secs()); - - self.update_round_cache(next_round); - - if let AdvanceRoundReason::Timeout = reason { - self.proposer.on_round_end(round, accumulator.proposal().is_some()); - } - } -} - -/// A future that resolves either when canceled (witnessing a block from the network at same height) -/// or when agreement completes. -pub struct BftFuture where - B: Block + Clone + Eq, - B::Hash: ::std::hash::Hash, - P: LocalProposer, - P: BaseProposer, - InStream: Stream, Error=Error>, - OutSink: Sink, SinkError=Error>, -{ - inner: rhododendron::Agreement, InStream, OutSink>, - status: Arc, - cancel: oneshot::Receiver<()>, - import: Arc, -} - -impl Future for BftFuture where - B: Block + Clone + Eq, - B::Hash: ::std::hash::Hash, - P: LocalProposer, - P: BaseProposer, - I: BlockImport, - InStream: Stream, Error=Error>, - OutSink: Sink, SinkError=Error>, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> ::futures::Poll<(), ()> { - // service has canceled the future. bail - let cancel = match self.cancel.poll() { - Ok(Async::Ready(())) | Err(_) => true, - Ok(Async::NotReady) => false, - }; - - let committed = match self.inner.poll().map_err(|_| ()) { - Ok(Async::Ready(x)) => x, - Ok(Async::NotReady) => - return Ok(if cancel { Async::Ready(()) } else { Async::NotReady }), - Err(()) => return Err(()), - }; - - // if something was committed, the round leader must have proposed. - self.inner.context().proposer.on_round_end(committed.round_number, true); - - // If we didn't see the proposal (very unlikely), - // we will get the block from the network later. - if let Some(justified_block) = committed.candidate { - let hash = justified_block.hash(); - info!(target: "rhd", "Importing block #{} ({}) directly from BFT consensus", - justified_block.header().number(), hash); - let just: Justification = UncheckedJustification(committed.justification.uncheck()).into(); - let (header, body) = justified_block.deconstruct(); - let import_block = BlockImportParams { - origin: BlockOrigin::ConsensusBroadcast, - header: header, - justification: Some(just), - body: Some(body), - finalized: true, - post_digests: Default::default(), - auxiliary: Default::default() - }; - - let new_status = match self.import.import_block(import_block, None) { - Err(e) => { - warn!(target: "rhd", "Error importing block {:?} in round #{}: {:?}", - hash, committed.round_number, e); - status::BAD - } - Ok(ImportResult::KnownBad) => { - warn!(target: "rhd", "{:?} was bad block agreed on in round #{}", - hash, committed.round_number); - status::BAD - } - _ => status::GOOD - }; - - self.status.store(new_status, Ordering::Release); - - } else { - // assume good unless we received the proposal. - self.status.store(status::GOOD, Ordering::Release); - } - - self.inner.context().update_round_cache(committed.round_number); - - Ok(Async::Ready(())) - } -} - -impl Drop for BftFuture where - B: Block + Clone + Eq, - B::Hash: ::std::hash::Hash, - P: LocalProposer, - P: BaseProposer, - InStream: Stream, Error=Error>, - OutSink: Sink, SinkError=Error>, -{ - fn drop(&mut self) { - let misbehavior = self.inner.drain_misbehavior().collect::>(); - self.inner.context().proposer.import_misbehavior(misbehavior); - } -} - -struct AgreementHandle { - status: Arc, - send_cancel: Option>, -} - -impl AgreementHandle { - fn status(&self) -> usize { - self.status.load(Ordering::Acquire) - } -} - -impl Drop for AgreementHandle { - fn drop(&mut self) { - if let Some(sender) = self.send_cancel.take() { - let _ = sender.send(()); - } - } -} - -/// The BftService kicks off the agreement process on top of any blocks it -/// is notified of. -/// -/// This assumes that it is being run in the context of a tokio runtime. -pub struct BftService { - client: Arc, - live_agreement: Mutex>, - round_cache: Arc>>, - round_timeout_multiplier: u64, - key: Arc, - factory: P, -} - -impl BftService - where - B: Block + Clone + Eq, - P: Environment, - P::Proposer: LocalProposer, - P::Proposer: BaseProposer, - I: BlockImport + Authorities, -{ - /// Create a new service instance. - pub fn new(client: Arc, key: Arc, factory: P) -> BftService { - BftService { - client: client, - live_agreement: Mutex::new(None), - round_cache: Arc::new(Mutex::new(RoundCache { - hash: None, - start_round: 0, - })), - round_timeout_multiplier: 10, - key: key, - factory, - } - } - - /// Get the local Authority ID. - pub fn local_id(&self) -> AuthorityId { - self.key.public().into() - } - - /// Signal that a valid block with the given header has been imported. - /// Provide communication streams that are localized to this block. - /// It's recommended to use the communication primitives provided by this - /// module for signature checking and decoding. See `CheckedStream` and - /// `SigningSink` for more details. - /// - /// Messages received on the stream that don't match the expected format - /// will be dropped. - /// - /// If the local signing key is an authority, this will begin the consensus process to build a - /// block on top of it. If the executor fails to run the future, an error will be returned. - /// Returns `None` if the agreement on the block with given parent is already in progress. - pub fn build_upon(&self, header: &B::Header, input: In, output: Out) - -> Result>::Proposer, - I, - In, - Out, - >>, P::Error> - where - In: Stream, Error=Error>, - Out: Sink, SinkError=Error>, - { - let hash = header.hash(); - - let mut live_agreement = self.live_agreement.lock(); - let can_build = live_agreement.as_ref() - .map_or(true, |x| self.can_build_on_inner(header, x)); - - if !can_build { - return Ok(None) - } - - let authorities = self.client.authorities(&BlockId::Hash(hash.clone())) - .map_err(|e| CommonErrorKind::Other(Box::new(e)).into())?; - - let n = authorities.len(); - let max_faulty = max_faulty_of(n); - trace!(target: "rhd", "Initiating agreement on top of #{}, {:?}", header.number(), hash); - trace!(target: "rhd", "max_faulty_of({})={}", n, max_faulty); - - let local_id = self.local_id(); - - if !authorities.contains(&local_id) { - // cancel current agreement - live_agreement.take(); - Err(CommonErrorKind::InvalidAuthority(local_id).into())?; - } - - let proposer = self.factory.init(header, &authorities, self.key.clone())?; - - let bft_instance = BftInstance { - proposer, - parent_hash: hash.clone(), - cache: self.round_cache.clone(), - round_timeout_multiplier: self.round_timeout_multiplier, - key: self.key.clone(), - authorities: authorities, - }; - - let mut agreement = rhododendron::agree( - bft_instance, - n, - max_faulty, - input, - output, - ); - - // fast forward round number if necessary. - { - let mut cache = self.round_cache.lock(); - trace!(target: "rhd", "Round cache: {:?}", &*cache); - if cache.hash.as_ref() == Some(&hash) { - trace!(target: "rhd", "Fast-forwarding to round {}", cache.start_round); - let start_round = cache.start_round; - cache.start_round += 1; - - drop(cache); - agreement.fast_forward(start_round); - } else { - *cache = RoundCache { - hash: Some(hash.clone()), - start_round: 1, - }; - } - } - - let status = Arc::new(AtomicUsize::new(status::LIVE)); - let (tx, rx) = oneshot::channel(); - - // cancel current agreement. - *live_agreement = Some((header.clone(), AgreementHandle { - send_cancel: Some(tx), - status: status.clone(), - })); - - Ok(Some(BftFuture { - inner: agreement, - status: status, - cancel: rx, - import: self.client.clone(), - })) - } - - /// Cancel current agreement if any. - pub fn cancel_agreement(&self) { - self.live_agreement.lock().take(); - } - - /// Whether we can build using the given header. - pub fn can_build_on(&self, header: &B::Header) -> bool { - self.live_agreement.lock().as_ref() - .map_or(true, |x| self.can_build_on_inner(header, x)) - } - - /// Get a reference to the underlying client. - pub fn client(&self) -> &I { &*self.client } - - fn can_build_on_inner(&self, header: &B::Header, live: &(B::Header, AgreementHandle)) -> bool { - let hash = header.hash(); - let &(ref live_header, ref handle) = live; - match handle.status() { - _ if *header != *live_header && *live_header.parent_hash() != hash => true, // can always follow with next block. - status::BAD => hash == live_header.hash(), // bad block can be re-agreed on. - _ => false, // canceled won't appear since we overwrite the handle before returning. - } - } -} - -/// Stream that decodes rhododendron messages and checks signatures. -/// -/// This stream is localized to a specific parent block-hash, as all messages -/// will be signed in a way that accounts for it. When using this with -/// `BftService::build_upon`, the user should take care to use the same hash as for that. -pub struct CheckedStream { - inner: S, - local_id: AuthorityId, - authorities: Vec, - parent_hash: B::Hash, -} - -impl CheckedStream { - /// Construct a new checked stream. - pub fn new( - inner: S, - local_id: AuthorityId, - authorities: Vec, - parent_hash: B::Hash, - ) -> Self { - CheckedStream { - inner, - local_id, - authorities, - parent_hash, - } - } -} - -impl>> Stream for CheckedStream - where S::Error: From, -{ - type Item = Communication; - type Error = S::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - use rhododendron::LocalizedMessage as RhdLocalized; - loop { - match self.inner.poll()? { - Async::Ready(Some(item)) => { - let comms: Communication = match Decode::decode(&mut &item[..]) { - Some(x) => x, - None => continue, - }; - - match comms { - RhdCommunication::Auxiliary(prepare_just) => { - let checked = check_prepare_justification::( - &self.authorities, - self.parent_hash, - UncheckedJustification(prepare_just.uncheck()), - ); - if let Ok(checked) = checked { - return Ok(Async::Ready( - Some(RhdCommunication::Auxiliary(checked.0)) - )); - } - } - RhdCommunication::Consensus(RhdLocalized::Propose(p)) => { - if p.sender == self.local_id { continue } - - let checked = check_proposal::( - &self.authorities, - &self.parent_hash, - &p, - ); - - if let Ok(()) = checked { - return Ok(Async::Ready( - Some(RhdCommunication::Consensus(RhdLocalized::Propose(p))) - )); - } - } - RhdCommunication::Consensus(RhdLocalized::Vote(v)) => { - if v.sender == self.local_id { continue } - - let checked = check_vote::( - &self.authorities, - &self.parent_hash, - &v, - ); - - if let Ok(()) = checked { - return Ok(Async::Ready( - Some(RhdCommunication::Consensus(RhdLocalized::Vote(v))) - )); - } - } - } - } - Async::Ready(None) => return Ok(Async::Ready(None)), - Async::NotReady => return Ok(Async::NotReady), - } - } - } -} - -/// Given a total number of authorities, yield the maximum faulty that would be allowed. -/// This will always be under 1/3. -pub fn max_faulty_of(n: usize) -> usize { - n.saturating_sub(1) / 3 -} - -/// Given a total number of authorities, yield the minimum required signatures. -/// This will always be over 2/3. -pub fn bft_threshold(n: usize) -> usize { - n - max_faulty_of(n) -} - -// actions in the signature scheme. -#[derive(Encode)] -enum Action { - Prepare(u32, H), - Commit(u32, H), - AdvanceRound(u32), - // signatures of header hash and full candidate are both included. - ProposeHeader(u32, H), - Propose(u32, B), -} - -// encode something in a way which is localized to a specific parent-hash -fn localized_encode(parent_hash: H, value: E) -> Vec { - (parent_hash, value).encode() -} - -fn check_justification_signed_message( - authorities: &[AuthorityId], - message: &[u8], - just: UncheckedJustification) --> Result, UncheckedJustification> { - // additional error information could be useful here. - just.0.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| { - let auth_id = sig.signer.clone().into(); - if !authorities.contains(&auth_id) { return None } - - if ed25519::Pair::verify(&sig.signature, message, &sig.signer) { - Some(sig.signer.0) - } else { - None - } - }).map(RhdJustification).map_err(UncheckedJustification) -} - -/// Check a full justification for a header hash. -/// Provide all valid authorities. -/// -/// On failure, returns the justification back. -pub fn check_justification( - authorities: &[AuthorityId], - parent: B::Hash, - just: UncheckedJustification -) -> Result, UncheckedJustification> { - let vote: Action = Action::Commit(just.0.round_number as u32, just.0.digest.clone()); - let message = localized_encode(parent, vote); - - check_justification_signed_message(authorities, &message[..], just) -} - -/// Check a prepare justification for a header hash. -/// Provide all valid authorities. -/// -/// On failure, returns the justification back. -pub fn check_prepare_justification( - authorities: &[AuthorityId], - parent: B::Hash, - just: UncheckedJustification -) -> Result, UncheckedJustification> { - let vote: Action = Action::Prepare(just.0.round_number as u32, just.0.digest.clone()); - let message = localized_encode(parent, vote); - - check_justification_signed_message(authorities, &message[..], just).map(|e| PrepareJustification(e.0)) -} - -/// Check proposal message signatures and authority. -/// Provide all valid authorities. -pub fn check_proposal( - authorities: &[AuthorityId], - parent_hash: &B::Hash, - propose: &rhododendron::LocalizedProposal) - -> Result<(), Error> -{ - if !authorities.contains(&propose.sender) { - return Err(CommonErrorKind::InvalidAuthority(propose.sender.into()).into()); - } - - let action_header = Action::ProposeHeader(propose.round_number as u32, propose.digest.clone()); - let action_propose = Action::Propose(propose.round_number as u32, propose.proposal.clone()); - check_action::(action_header, parent_hash, &propose.digest_signature)?; - check_action::(action_propose, parent_hash, &propose.full_signature) -} - -/// Check vote message signatures and authority. -/// Provide all valid authorities. -pub fn check_vote( - authorities: &[AuthorityId], - parent_hash: &B::Hash, - vote: &rhododendron::LocalizedVote) - -> Result<(), Error> -{ - if !authorities.contains(&vote.sender) { - return Err(CommonErrorKind::InvalidAuthority(vote.sender.into()).into()); - } - - let action = match vote.vote { - rhododendron::Vote::Prepare(r, ref h) => Action::Prepare(r as u32, h.clone()), - rhododendron::Vote::Commit(r, ref h) => Action::Commit(r as u32, h.clone()), - rhododendron::Vote::AdvanceRound(r) => Action::AdvanceRound(r as u32), - }; - check_action::(action, parent_hash, &vote.signature) -} - -fn check_action( - action: Action, - parent_hash: &B::Hash, - sig: &LocalizedSignature -) -> Result<(), Error> { - let message = localized_encode(*parent_hash, action); - if ed25519::Pair::verify(&sig.signature, &message, &sig.signer) { - Ok(()) - } else { - Err(CommonErrorKind::InvalidSignature(sig.signature.into(), sig.signer.clone().into()).into()) - } -} - -/// Sign a BFT message with the given key. -pub fn sign_message( - message: RhdMessage, - key: &ed25519::Pair, - parent_hash: B::Hash -) -> LocalizedMessage { - let signer = key.public(); - - let sign_action = |action: Action| { - let to_sign = localized_encode(parent_hash.clone(), action); - - LocalizedSignature { - signer: signer.clone(), - signature: key.sign(&to_sign), - } - }; - - match message { - RhdMessage::Propose(r, proposal) => { - let header_hash = proposal.hash(); - let action_header = Action::ProposeHeader(r as u32, header_hash.clone()); - let action_propose = Action::Propose(r as u32, proposal.clone()); - - rhododendron::LocalizedMessage::Propose(rhododendron::LocalizedProposal { - round_number: r, - proposal, - digest: header_hash, - sender: signer.clone().into(), - digest_signature: sign_action(action_header), - full_signature: sign_action(action_propose), - }) - } - RhdMessage::Vote(vote) => rhododendron::LocalizedMessage::Vote({ - let action = match vote { - RhdMessageVote::Prepare(r, h) => Action::Prepare(r as u32, h), - RhdMessageVote::Commit(r, h) => Action::Commit(r as u32, h), - RhdMessageVote::AdvanceRound(r) => Action::AdvanceRound(r as u32), - }; - - rhododendron::LocalizedVote { - vote: vote, - sender: signer.clone().into(), - signature: sign_action(action), - } - }) - } -} - - -impl<'a, B, E, Block> BlockBuilder for client::block_builder::BlockBuilder<'a, B, E, Block, Blake2Hasher> where - B: client::backend::Backend + Send + Sync + 'static, - E: CallExecutor + Send + Sync + Clone + 'static, - Block: BlockT -{ - fn push_extrinsic(&mut self, extrinsic: ::Extrinsic) -> Result<(), Error> { - client::block_builder::BlockBuilder::push(self, extrinsic).map_err(Into::into) - } -} - -impl<'a, B, E, Block> AuthoringApi for SubstrateClient where - B: client::backend::Backend + Send + Sync + 'static, - E: CallExecutor + Send + Sync + Clone + 'static, - Block: BlockT, -{ - type Block = Block; - type Error = client::error::Error; - - fn build_block) -> ()>( - &self, - at: &BlockId, - inherent_data: InherentData, - mut build_ctx: F, - ) -> Result { - let runtime_version = self.runtime_version_at(at)?; - - let mut block_builder = self.new_block_at(at)?; - if runtime_version.has_api(*b"blkbuild", 1) { - for inherent in self.inherent_extrinsics(at, &inherent_data)? { - block_builder.push(inherent)?; - } - } - - build_ctx(&mut block_builder); - - block_builder.bake().map_err(Into::into) - } -} - - -/// Proposer factory. -pub struct ProposerFactory where - C: AuthoringApi, - A: txpool::ChainApi, -{ - /// The client instance. - pub client: Arc, - /// The transaction pool. - pub transaction_pool: Arc>, - /// The backing network handle. - pub network: N, - /// handle to remote task executor - pub handle: TaskExecutor, - /// Offline-tracker. - pub offline: SharedOfflineTracker, - /// Force delay in evaluation this long. - pub force_delay: u64, -} - -impl consensus::Environment<::Block> for ProposerFactory where - N: Network::Block>, - C: AuthoringApi + BlockNumberToHash, - A: txpool::ChainApi::Block>, - // <::Block as BlockT>::Hash: - // Into<::Hash> + PartialEq + Into, - Error: From<::Error> -{ - type Proposer = Proposer; - type Error = Error; - - fn init( - &self, - parent_header: &<::Block as BlockT>::Header, - authorities: &[AuthorityId], - sign_with: Arc, - ) -> Result { - use runtime_primitives::traits::Hash as HashT; - let parent_hash = parent_header.hash(); - - let id = BlockId::hash(parent_hash); - let random_seed = self.client.random_seed(&id)?; - let random_seed = <<::Block as BlockT>::Header as HeaderT> - ::Hashing::hash(random_seed.as_ref()); - - let validators = self.client.validators(&id)?; - self.offline.write().note_new_block(&validators[..]); - - info!("Starting consensus session on top of parent {:?}", parent_hash); - - let local_id = sign_with.public().0.into(); - let (input, output) = self.network.communication_for( - authorities, - local_id, - parent_hash.clone(), - self.handle.clone(), - ); - let now = Instant::now(); - let proposer = Proposer { - client: self.client.clone(), - start: now, - local_key: sign_with, - parent_hash, - parent_id: id, - parent_number: *parent_header.number(), - random_seed, - transaction_pool: self.transaction_pool.clone(), - offline: self.offline.clone(), - validators, - minimum_timestamp: current_timestamp() + self.force_delay, - network: self.network.clone() - }; - - Ok(proposer) - } -} - -/// The proposer logic. -pub struct Proposer { - client: Arc, - start: Instant, - local_key: Arc, - parent_hash: <::Block as BlockT>::Hash, - parent_id: BlockId<::Block>, - parent_number: <<::Block as BlockT>::Header as HeaderT>::Number, - random_seed: <::Block as BlockT>::Hash, - transaction_pool: Arc>, - offline: SharedOfflineTracker, - validators: Vec, - minimum_timestamp: u64, - network: N, -} - -impl Proposer { - fn primary_index(&self, round_number: u32, len: usize) -> usize { - use primitives::uint::U256; - - let big_len = U256::from(len); - let offset = U256::from_big_endian(self.random_seed.as_ref()) % big_len; - let offset = offset.low_u64() as usize + round_number as usize; - offset % len - } -} - -impl BaseProposer<::Block> for Proposer where - C: AuthoringApi + BlockNumberToHash, - A: txpool::ChainApi::Block>, - <::Block as BlockT>::Hash: - Into<::Hash> + PartialEq + Into, - error::Error: From<::Error> -{ - type Create = Result<::Block, Error>; - type Error = Error; - type Evaluate = Box>; - - fn propose(&self) -> Self::Create { - use runtime_primitives::traits::BlakeTwo256; - - const MAX_VOTE_OFFLINE_SECONDS: Duration = Duration::from_secs(60); - - let timestamp = ::std::cmp::max(self.minimum_timestamp, current_timestamp()); - - let elapsed_since_start = self.start.elapsed(); - let offline_indices = if elapsed_since_start > MAX_VOTE_OFFLINE_SECONDS { - Vec::new() - } else { - self.offline.read().reports(&self.validators[..]) - }; - - if !offline_indices.is_empty() { - info!( - "Submitting offline validators {:?} for slash-vote", - offline_indices.iter().map(|&i| self.validators[i as usize]).collect::>(), - ) - } - - let inherent_data = InherentData { - timestamp, - offline_indices, - }; - - let block = self.client.build_block( - &self.parent_id, - inherent_data, - |block_builder| { - let mut unqueue_invalid = Vec::new(); - self.transaction_pool.ready(|pending_iterator| { - let mut pending_size = 0; - for pending in pending_iterator { - let encoded_size = pending.data.encode().len(); - if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE { break } - - match block_builder.push_extrinsic(pending.data.clone()) { - Ok(()) => { - pending_size += encoded_size; - } - Err(e) => { - trace!(target: "transaction-pool", "Invalid transaction: {}", e); - unqueue_invalid.push(pending.hash.clone()); - } - } - } - }); - - self.transaction_pool.remove_invalid(&unqueue_invalid); - })?; - - info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]", - block.header().number(), - <::Block as BlockT>::Hash::from(block.header().hash()), - block.header().parent_hash(), - block.extrinsics().iter() - .map(|xt| format!("{}", BlakeTwo256::hash_of(xt))) - .collect::>() - .join(", ") - ); - - let substrate_block = Decode::decode(&mut block.encode().as_slice()) - .expect("blocks are defined to serialize to substrate blocks correctly; qed"); - - assert!(evaluation::evaluate_initial( - &substrate_block, - &self.parent_hash, - self.parent_number, - ).is_ok()); - - Ok(substrate_block) - } - - fn evaluate(&self, unchecked_proposal: &::Block) -> Self::Evaluate { - debug!(target: "rhd", "evaluating block on top of parent ({}, {:?})", self.parent_number, self.parent_hash); - - // do initial serialization and structural integrity checks. - if let Err(e) = evaluation::evaluate_initial( - unchecked_proposal, - &self.parent_hash, - self.parent_number, - ) { - debug!(target: "rhd", "Invalid proposal: {:?}", e); - return Box::new(future::ok(false)); - }; - - let current_timestamp = current_timestamp(); - let inherent = InherentData::new( - current_timestamp, - self.offline.read().reports(&self.validators) - ); - let proposed_timestamp = match self.client.check_inherents( - &self.parent_id, - &unchecked_proposal, - &inherent, - ) { - Ok(Ok(())) => None, - Ok(Err(BlockBuilderError::ValidAtTimestamp(timestamp))) => Some(timestamp), - Ok(Err(e)) => { - debug!(target: "rhd", "Invalid proposal (check_inherents): {:?}", e); - return Box::new(future::ok(false)); - }, - Err(e) => { - debug!(target: "rhd", "Could not call into runtime: {:?}", e); - return Box::new(future::ok(false)); - } - }; - - let vote_delays = { - - // the duration until the given timestamp is current - let proposed_timestamp = ::std::cmp::max(self.minimum_timestamp, proposed_timestamp.unwrap_or(0)); - let timestamp_delay = if proposed_timestamp > current_timestamp { - let delay_s = proposed_timestamp - current_timestamp; - debug!(target: "rhd", "Delaying evaluation of proposal for {} seconds", delay_s); - Some(Instant::now() + Duration::from_secs(delay_s)) - } else { - None - }; - - match timestamp_delay { - Some(duration) => future::Either::A( - Delay::new(duration).map_err(|e| ErrorKind::Timer(e).into()) - ), - None => future::Either::B(future::ok(())), - } - }; - - // evaluate whether the block is actually valid. - // it may be better to delay this until the delays are finished - let evaluated = match self.client.execute_block(&self.parent_id, &unchecked_proposal.clone()) - .map_err(Error::from) { - Ok(()) => Ok(true), - Err(err) => match err.kind() { - error::ErrorKind::Client(client::error::ErrorKind::Execution(_)) => Ok(false), - _ => Err(err) - } - }; - - let future = future::result(evaluated).and_then(move |good| { - let end_result = future::ok(good); - if good { - // delay a "good" vote. - future::Either::A(vote_delays.and_then(|_| end_result)) - } else { - // don't delay a "bad" evaluation. - future::Either::B(end_result) - } - }); - - Box::new(future) as Box<_> - } -} - -impl LocalProposer<::Block> for Proposer where - C: AuthoringApi + BlockNumberToHash, - A: txpool::ChainApi::Block>, - Self: BaseProposer<::Block, Error=Error>, - <::Block as BlockT>::Hash: - Into<::Hash> + PartialEq + Into, - error::Error: From<::Error> -{ - - fn round_proposer(&self, round_number: u32, authorities: &[AuthorityId]) -> AuthorityId { - let offset = self.primary_index(round_number, authorities.len()); - let proposer = authorities[offset as usize].clone(); - trace!(target: "rhd", "proposer for round {} is {}", round_number, proposer); - - proposer - } - - fn import_misbehavior( - &self, - _misbehavior: Vec<(AuthorityId, Misbehavior<<::Block as BlockT>::Hash>)> - ) { - use rhododendron::Misbehavior as GenericMisbehavior; - use runtime_primitives::bft::{MisbehaviorKind, MisbehaviorReport}; - use node_runtime::{Call, UncheckedExtrinsic, ConsensusCall}; - - let mut next_index = { - let local_id = self.local_key.public().0; - let cur_index = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending| pending - .filter(|tx| tx.verified.sender == local_id) - .last() - .map(|tx| Ok(tx.verified.index())) - .unwrap_or_else(|| self.client.account_nonce(&self.parent_id, local_id)) - .map_err(Error::from) - ); - - match cur_index { - Ok(cur_index) => cur_index + 1, - Err(e) => { - warn!(target: "consensus", "Error computing next transaction index: {:?}", e); - return; - } - } - }; - - for (target, misbehavior) in misbehavior { - let report = MisbehaviorReport { - parent_hash: self.parent_hash.into(), - parent_number: self.parent_number.saturated_into::(), - target, - misbehavior: match misbehavior { - GenericMisbehavior::ProposeOutOfTurn(_, _, _) => continue, - GenericMisbehavior::DoublePropose(_, _, _) => continue, - GenericMisbehavior::DoublePrepare(round, (h1, s1), (h2, s2)) - => MisbehaviorKind::BftDoublePrepare(round as u32, (h1.into(), s1.signature), (h2.into(), s2.signature)), - GenericMisbehavior::DoubleCommit(round, (h1, s1), (h2, s2)) - => MisbehaviorKind::BftDoubleCommit(round as u32, (h1.into(), s1.signature), (h2.into(), s2.signature)), - } - }; - let payload = ( - next_index, - Call::Consensus(ConsensusCall::report_misbehavior(report)), - Era::immortal(), - self.client.genesis_hash() - ); - let signature = self.local_key.sign(&payload.encode()).into(); - next_index += 1; - - let local_id = self.local_key.public().0.into(); - let extrinsic = UncheckedExtrinsic { - signature: Some((node_runtime::RawAddress::Id(local_id), signature, payload.0, Era::immortal())), - function: payload.1, - }; - let uxt: <::Block as BlockT>::Extrinsic = Decode::decode( - &mut extrinsic.encode().as_slice()).expect("Encoded extrinsic is valid"); - let hash = BlockId::<::Block>::hash(self.parent_hash); - if let Err(e) = self.transaction_pool.submit_one(&hash, uxt) { - warn!("Error importing misbehavior report: {:?}", e); - } - } - } - - fn on_round_end(&self, round_number: u32, was_proposed: bool) { - let primary_validator = self.validators[ - self.primary_index(round_number, self.validators.len()) - ]; - - // alter the message based on whether we think the empty proposer was forced to skip the round. - // this is determined by checking if our local validator would have been forced to skip the round. - if !was_proposed { - let public = ed25519::Public::from_raw(primary_validator.0); - info!( - "Potential Offline Validator: {} failed to propose during assigned slot: {}", - public, - round_number, - ); - } - - self.offline.write().note_round_end(primary_validator, was_proposed); - } -} - -fn current_timestamp() -> u64 { - time::SystemTime::now().duration_since(time::UNIX_EPOCH) - .expect("now always later than unix epoch; qed") - .as_secs() -} - - -#[cfg(test)] -mod tests { - use super::*; - use std::collections::HashSet; - use std::marker::PhantomData; - - use runtime_primitives::testing::{Block as GenericTestBlock, Header as TestHeader}; - use primitives::H256; - use keyring::Ed25519Keyring; - - type TestBlock = GenericTestBlock<()>; - - struct FakeClient { - authorities: Vec, - imported_heights: Mutex> - } - - impl BlockImport for FakeClient { - type Error = Error; - - fn import_block(&self, - block: BlockImportParams, - _new_authorities: Option> - ) -> Result { - assert!(self.imported_heights.lock().insert(block.header.number)); - Ok(ImportResult::Queued) - } - } - - impl Authorities for FakeClient { - type Error = Error; - - fn authorities(&self, _at: &BlockId) -> Result, Self::Error> { - Ok(self.authorities.clone()) - } - } - - // "black hole" output sink. - struct Comms(::std::marker::PhantomData); - - impl Sink for Comms { - type SinkItem = Communication; - type SinkError = E; - - fn start_send(&mut self, _item: Communication) -> ::futures::StartSend, E> { - Ok(::futures::AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> ::futures::Poll<(), E> { - Ok(Async::Ready(())) - } - } - - impl Stream for Comms { - type Item = Communication; - type Error = E; - - fn poll(&mut self) -> ::futures::Poll, Self::Error> { - Ok(::futures::Async::NotReady) - } - } - - struct DummyFactory; - struct DummyProposer(u64); - - impl Environment for DummyFactory { - type Proposer = DummyProposer; - type Error = Error; - - fn init(&self, parent_header: &TestHeader, _authorities: &[AuthorityId], _sign_with: Arc) - -> Result - { - Ok(DummyProposer(parent_header.number + 1)) - } - } - - impl BaseProposer for DummyProposer { - type Error = Error; - type Create = Result; - type Evaluate = Result; - - fn propose(&self) -> Result { - - Ok(TestBlock { - header: from_block_number(self.0), - extrinsics: Default::default() - }) - } - - fn evaluate(&self, proposal: &TestBlock) -> Result { - Ok(proposal.header.number == self.0) - } - } - - impl LocalProposer for DummyProposer { - fn import_misbehavior(&self, _misbehavior: Vec<(AuthorityId, Misbehavior)>) {} - - fn round_proposer(&self, round_number: u32, authorities: &[AuthorityId]) -> AuthorityId { - authorities[(round_number as usize) % authorities.len()].clone() - } - } - - fn make_service(client: FakeClient) - -> BftService - { - BftService { - client: Arc::new(client), - live_agreement: Mutex::new(None), - round_cache: Arc::new(Mutex::new(RoundCache { - hash: None, - start_round: 0, - })), - round_timeout_multiplier: 10, - key: Arc::new(Ed25519Keyring::One.into()), - factory: DummyFactory - } - } - - fn sign_vote(vote: rhododendron::Vote, key: &ed25519::Pair, parent_hash: H256) -> LocalizedSignature { - match sign_message::(vote.into(), key, parent_hash) { - rhododendron::LocalizedMessage::Vote(vote) => vote.signature, - _ => panic!("signing vote leads to signed vote"), - } - } - - fn from_block_number(num: u64) -> TestHeader { - TestHeader::new( - num, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - ) - } - - #[test] - fn future_gets_preempted() { - let client = FakeClient { - authorities: vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ], - imported_heights: Mutex::new(HashSet::new()), - }; - - let service = make_service(client); - - let first = from_block_number(2); - let first_hash = first.hash(); - - let mut second = from_block_number(3); - second.parent_hash = first_hash; - let _second_hash = second.hash(); - - let mut first_bft = service.build_upon(&first, Comms(PhantomData), Comms(PhantomData)).unwrap().unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == first); - - let _second_bft = service.build_upon(&second, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 != first); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == second); - - // first_bft has been cancelled. need to swap out so we can check it. - let (_tx, mut rx) = oneshot::channel(); - ::std::mem::swap(&mut rx, &mut first_bft.cancel); - - assert!(rx.wait().is_ok()); - } - - #[test] - fn max_faulty() { - assert_eq!(max_faulty_of(3), 0); - assert_eq!(max_faulty_of(4), 1); - assert_eq!(max_faulty_of(100), 33); - assert_eq!(max_faulty_of(0), 0); - assert_eq!(max_faulty_of(11), 3); - assert_eq!(max_faulty_of(99), 32); - } - - #[test] - fn justification_check_works() { - let parent_hash = Default::default(); - let hash = [0xff; 32].into(); - - let authorities = vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ]; - - let authorities_keys = vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ]; - - let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { - digest: hash, - round_number: 1, - signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) - }).collect(), - }); - - assert!(check_justification::(&authorities, parent_hash, unchecked).is_ok()); - - let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { - digest: hash, - round_number: 0, // wrong round number (vs. the signatures) - signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) - }).collect(), - }); - - assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); - - // not enough signatures. - let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { - digest: hash, - round_number: 1, - signatures: authorities_keys.iter().take(2).map(|key| { - sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) - }).collect(), - }); - - assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); - - // wrong hash. - let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { - digest: [0xfe; 32].into(), - round_number: 1, - signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) - }).collect(), - }); - - assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); - } - - #[test] - fn propose_check_works() { - let parent_hash = Default::default(); - - let authorities = vec![ - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ]; - - let block = TestBlock { - header: from_block_number(1), - extrinsics: Default::default() - }; - - let proposal = sign_message( - rhododendron::Message::Propose(1, block.clone()), - &Ed25519Keyring::Alice.pair(), - parent_hash, - ); - if let rhododendron::LocalizedMessage::Propose(proposal) = proposal { - assert!(check_proposal(&authorities, &parent_hash, &proposal).is_ok()); - let mut invalid_round = proposal.clone(); - invalid_round.round_number = 0; - assert!(check_proposal(&authorities, &parent_hash, &invalid_round).is_err()); - let mut invalid_digest = proposal.clone(); - invalid_digest.digest = [0xfe; 32].into(); - assert!(check_proposal(&authorities, &parent_hash, &invalid_digest).is_err()); - } else { - assert!(false); - } - - // Not an authority - let proposal = sign_message::( - rhododendron::Message::Propose(1, block), - &Ed25519Keyring::Bob.pair(), - parent_hash, - ); - if let rhododendron::LocalizedMessage::Propose(proposal) = proposal { - assert!(check_proposal(&authorities, &parent_hash, &proposal).is_err()); - } else { - assert!(false); - } - } - - #[test] - fn vote_check_works() { - let parent_hash: H256 = Default::default(); - let hash: H256 = [0xff; 32].into(); - - let authorities = vec![ - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ]; - - let vote = sign_message::(rhododendron::Message::Vote(rhododendron::Vote::Prepare(1, hash)), &Keyring::Alice.pair(), parent_hash);; - if let rhododendron::LocalizedMessage::Vote(vote) = vote { - assert!(check_vote::(&authorities, &parent_hash, &vote).is_ok()); - let mut invalid_sender = vote.clone(); - invalid_sender.signature.signer = Keyring::Eve.into(); - assert!(check_vote::(&authorities, &parent_hash, &invalid_sender).is_err()); - } else { - assert!(false); - } - - // Not an authority - let vote = sign_message::(rhododendron::Message::Vote(rhododendron::Vote::Prepare(1, hash)), &Keyring::Bob.pair(), parent_hash);; - if let rhododendron::LocalizedMessage::Vote(vote) = vote { - assert!(check_vote::(&authorities, &parent_hash, &vote).is_err()); - } else { - assert!(false); - } - } - - #[test] - fn drop_bft_future_does_not_deadlock() { - let client = FakeClient { - authorities: vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ], - imported_heights: Mutex::new(HashSet::new()), - }; - - let service = make_service(client); - - let first = from_block_number(2); - let first_hash = first.hash(); - - let mut second = from_block_number(3); - second.parent_hash = first_hash; - - let _ = service.build_upon(&first, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == first); - service.live_agreement.lock().take(); - } - - #[test] - fn bft_can_build_though_skipped() { - let client = FakeClient { - authorities: vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ], - imported_heights: Mutex::new(HashSet::new()), - }; - - let service = make_service(client); - - let first = from_block_number(2); - let first_hash = first.hash(); - - let mut second = from_block_number(3); - second.parent_hash = first_hash; - - let mut third = from_block_number(4); - third.parent_hash = second.hash(); - - let _ = service.build_upon(&first, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == first); - // BFT has not seen second, but will move forward on third - service.build_upon(&third, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == third); - - // but we are not walking backwards - service.build_upon(&second, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == third); - } -} diff --git a/core/consensus/rhd/src/misbehaviour_check.rs b/core/consensus/rhd/src/misbehaviour_check.rs deleted file mode 100644 index a475f5d1ef597..0000000000000 --- a/core/consensus/rhd/src/misbehaviour_check.rs +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2017-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Utility for substrate-based runtimes that want to check misbehavior reports. - -use codec::{Codec, Encode}; -use primitives::{AuthorityId, Signature}; - -use rhododendron::messages::{Action, Message, MisbehaviorKind}; -use runtime_io; - -// check a message signature. returns true if signed by that authority. -fn check_message_sig( - message: Message, - signature: &Signature, - from: &AuthorityId -) -> bool { - let msg: Vec = message.encode(); - runtime_io::ed25519_verify(&signature.0, &msg, from) -} - -fn prepare(parent: H, round_number: u32, hash: H) -> Message { - Message { - parent, - action: Action::Prepare(round_number, hash), - } -} - -fn commit(parent: H, round_number: u32, hash: H) -> Message { - Message { - parent, - action: Action::Commit(round_number, hash), - } -} - -/// Evaluate misbehavior. -/// -/// Doesn't check that the header hash in question is -/// valid or whether the misbehaving authority was part of -/// the set at that block. -pub fn evaluate_misbehavior( - misbehaved: &AuthorityId, - parent_hash: H, - kind: &MisbehaviorKind, -) -> bool { - match *kind { - MisbehaviorKind::BftDoublePrepare(round, (h_1, ref s_1), (h_2, ref s_2)) => { - s_1 != s_2 && - check_message_sig::(prepare::(parent_hash, round, h_1), s_1, misbehaved) && - check_message_sig::(prepare::(parent_hash, round, h_2), s_2, misbehaved) - } - MisbehaviorKind::BftDoubleCommit(round, (h_1, ref s_1), (h_2, ref s_2)) => { - s_1 != s_2 && - check_message_sig::(commit::(parent_hash, round, h_1), s_1, misbehaved) && - check_message_sig::(commit::(parent_hash, round, h_2), s_2, misbehaved) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use keyring::Ed25519Keyring; - use rhododendron; - - use runtime_primitives::testing::{H256, Block as RawBlock}; - - type Block = RawBlock; - - fn sign_prepare(key: &ed25519::Pair, round: u32, hash: H256, parent_hash: H256) -> (H256, Signature) { - let msg = ::sign_message::( - rhododendron::Message::Vote(rhododendron::Vote::Prepare(round as _, hash)), - key, - parent_hash - ); - - match msg { - rhododendron::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature), - _ => panic!("signing vote leads to signed vote"), - } - } - - fn sign_commit(key: &ed25519::Pair, round: u32, hash: H256, parent_hash: H256) -> (H256, Signature) { - let msg = ::sign_message::( - rhododendron::Message::Vote(rhododendron::Vote::Commit(round as _, hash)), - key, - parent_hash - ); - - match msg { - rhododendron::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature), - _ => panic!("signing vote leads to signed vote"), - } - } - - #[test] - fn evaluates_double_prepare() { - let key = Ed25519Keyring::One.pair(); - let parent_hash = [0xff; 32].into(); - let hash_1 = [0; 32].into(); - let hash_2 = [1; 32].into(); - - assert!(evaluate_misbehavior::( - &key.public().into(), - parent_hash, - &MisbehaviorKind::BftDoublePrepare( - 1, - sign_prepare(&key, 1, hash_1, parent_hash), - sign_prepare(&key, 1, hash_2, parent_hash) - ) - )); - - // same signature twice is not misbehavior. - let signed = sign_prepare(&key, 1, hash_1, parent_hash); - assert!(!evaluate_misbehavior::( - &key.public().into(), - parent_hash, - &MisbehaviorKind::BftDoublePrepare( - 1, - signed, - signed, - ) - )); - - // misbehavior has wrong target. - assert!(!evaluate_misbehavior::( - &Ed25519Keyring::Two.into(), - parent_hash, - &MisbehaviorKind::BftDoublePrepare( - 1, - sign_prepare(&key, 1, hash_1, parent_hash), - sign_prepare(&key, 1, hash_2, parent_hash), - ) - )); - } - - #[test] - fn evaluates_double_commit() { - let key = Ed25519Keyring::One.pair(); - let parent_hash = [0xff; 32].into(); - let hash_1 = [0; 32].into(); - let hash_2 = [1; 32].into(); - - assert!(evaluate_misbehavior::( - &key.public().into(), - parent_hash, - &MisbehaviorKind::BftDoubleCommit( - 1, - sign_commit(&key, 1, hash_1, parent_hash), - sign_commit(&key, 1, hash_2, parent_hash) - ) - )); - - // same signature twice is not misbehavior. - let signed = sign_commit(&key, 1, hash_1, parent_hash); - assert!(!evaluate_misbehavior::( - &key.public().into(), - parent_hash, - &MisbehaviorKind::BftDoubleCommit( - 1, - signed, - signed, - ) - )); - - // misbehavior has wrong target. - assert!(!evaluate_misbehavior::( - &Ed25519Keyring::Two.into(), - parent_hash, - &MisbehaviorKind::BftDoubleCommit( - 1, - sign_commit(&key, 1, hash_1, parent_hash), - sign_commit(&key, 1, hash_2, parent_hash), - ) - )); - } -} diff --git a/core/consensus/rhd/src/service.rs b/core/consensus/rhd/src/service.rs deleted file mode 100644 index f59393c530356..0000000000000 --- a/core/consensus/rhd/src/service.rs +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Consensus service. - -/// Consensus service. A long running service that manages BFT agreement -/// the network. -use std::thread; -use std::time::{Duration, Instant}; -use std::sync::Arc; - -use client::{BlockchainEvents, BlockBody}; -use futures::prelude::*; -use transaction_pool::txpool::{Pool as TransactionPool, ChainApi as PoolChainApi}; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, BlockNumberToHash}; - -use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle; -use tokio::runtime::TaskExecutor as ThreadPoolHandle; -use tokio::runtime::current_thread::Runtime as LocalRuntime; -use tokio::timer::Interval; - -use parking_lot::RwLock; -use consensus::{self, offline_tracker::OfflineTracker}; - -use super::{Network, ProposerFactory, AuthoringApi}; -use {consensus, primitives, ed25519, error, BftService, LocalProposer}; - -const TIMER_DELAY_MS: u64 = 5000; -const TIMER_INTERVAL_MS: u64 = 500; - -// spin up an instance of BFT agreement on the current thread's executor. -// panics if there is no current thread executor. -fn start_bft( - header: ::Header, - bft_service: Arc>, -) where - F: consensus::Environment + 'static, - C: consensus::BlockImport + consensus::Authorities + 'static, - F::Error: ::std::fmt::Debug, - >::Error: ::std::fmt::Display + Into, - >::Proposer : LocalProposer, - >::Error: ::std::fmt::Display, - Block: BlockT, -{ - let mut handle = LocalThreadHandle::current(); - match bft_service.build_upon(&header) { - Ok(Some(bft_work)) => if let Err(e) = handle.spawn_local(Box::new(bft_work)) { - warn!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e); - } - Ok(None) => trace!(target: "bft", "Could not start agreement on top of {}", header.hash()), - Err(e) => warn!(target: "bft", "BFT agreement error: {}", e), - } -} - -/// Consensus service. Starts working when created. -pub struct Service { - thread: Option>, - exit_signal: Option<::exit_future::Signal>, -} - -impl Service { - /// Create and start a new instance. - pub fn new( - client: Arc, - api: Arc, - network: N, - transaction_pool: Arc>, - thread_pool: ThreadPoolHandle, - key: ed25519::Pair, - block_delay: u64, - ) -> Service - where - error::Error: From<::Error>, - A: AuthoringApi + BlockNumberToHash + 'static, - P: PoolChainApi::Block> + 'static, - C: BlockchainEvents<::Block> - + BlockBody<::Block> - + consensus::SelectChain<::Block> - + consensus::BlockImport<::Block> - + consensus::Authorities<::Block> + Send + Sync + 'static, - primitives::H256: From<<::Block as BlockT>::Hash>, - <::Block as BlockT>::Hash: PartialEq + PartialEq, - N: Network::Block> + Send + 'static, - { - - let (signal, exit) = ::exit_future::signal(); - let thread = thread::spawn(move || { - let mut runtime = LocalRuntime::new().expect("Could not create local runtime"); - let key = Arc::new(key); - - let factory = ProposerFactory { - client: api.clone(), - transaction_pool: transaction_pool.clone(), - network, - handle: thread_pool.clone(), - offline: Arc::new(RwLock::new(OfflineTracker::new())), - force_delay: block_delay, - }; - let bft_service = Arc::new(BftService::new(client.clone(), key, factory)); - - let notifications = { - let client = client.clone(); - let bft_service = bft_service.clone(); - - client.import_notification_stream().for_each(move |notification| { - if notification.is_new_best { - start_bft(notification.header, bft_service.clone()); - } - Ok(()) - }) - }; - - let interval = Interval::new( - Instant::now() + Duration::from_millis(TIMER_DELAY_MS), - Duration::from_millis(TIMER_INTERVAL_MS), - ); - - let mut prev_best = match client.best_block_header() { - Ok(header) => header.hash(), - Err(e) => { - warn!("Cant's start consensus service. Error reading best block header: {:?}", e); - return; - } - }; - - let timed = { - let c = client.clone(); - let s = bft_service.clone(); - - interval.map_err(|e| debug!(target: "bft", "Timer error: {:?}", e)).for_each(move |_| { - if let Ok(best_block) = c.best_block_header() { - let hash = best_block.hash(); - - if hash == prev_best { - debug!(target: "bft", "Starting consensus round after a timeout"); - start_bft(best_block, s.clone()); - } - prev_best = hash; - } - Ok(()) - }) - }; - - runtime.spawn(notifications); - runtime.spawn(timed); - - if let Err(e) = runtime.block_on(exit) { - debug!("BFT event loop error {:?}", e); - } - }); - Service { - thread: Some(thread), - exit_signal: Some(signal), - } - } -} - -impl Drop for Service { - fn drop(&mut self) { - if let Some(signal) = self.exit_signal.take() { - signal.fire(); - } - - if let Some(thread) = self.thread.take() { - thread.join().expect("The service thread has panicked"); - } - } -} diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index aeeb8e2061fdf..616b6ec722a48 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -278,64 +278,6 @@ mod tests { use service_test::SyncService; use crate::service::Factory; - #[cfg(feature = "rhd")] - fn test_sync() { - use primitives::ed25519::Pair; - - use {service_test, Factory}; - use client::{BlockImportParams, BlockOrigin}; - - let alice: Arc = Arc::new(Keyring::Alice.into()); - let bob: Arc = Arc::new(Keyring::Bob.into()); - let validators = vec![alice.public().0.into(), bob.public().0.into()]; - let keys: Vec<&ed25519::Pair> = vec![&*alice, &*bob]; - let dummy_runtime = ::tokio::runtime::Runtime::new().unwrap(); - let block_factory = |service: &::FullService| { - let block_id = BlockId::number(service.client().info().chain.best_number); - let parent_header = service.client().header(&block_id).unwrap().unwrap(); - let consensus_net = ConsensusNetwork::new(service.network(), service.client().clone()); - let proposer_factory = consensus::ProposerFactory { - client: service.client().clone(), - transaction_pool: service.transaction_pool().clone(), - network: consensus_net, - force_delay: 0, - handle: dummy_runtime.executor(), - }; - let (proposer, _, _) = proposer_factory.init(&parent_header, &validators, alice.clone()).unwrap(); - let block = proposer.propose().expect("Error making test block"); - BlockImportParams { - origin: BlockOrigin::File, - justification: Vec::new(), - internal_justification: Vec::new(), - finalized: true, - body: Some(block.extrinsics), - header: block.header, - auxiliary: Vec::new(), - } - }; - let extrinsic_factory = |service: &SyncService<::FullService>| { - let payload = ( - 0, - Call::Balances(BalancesCall::transfer(RawAddress::Id(bob.public().0.into()), 69.into())), - Era::immortal(), - service.client().genesis_hash() - ); - let signature = alice.sign(&payload.encode()).into(); - let id = alice.public().0.into(); - let xt = UncheckedExtrinsic { - signature: Some((RawAddress::Id(id), signature, payload.0, Era::immortal())), - function: payload.1, - }.encode(); - let v: Vec = Decode::decode(&mut xt.as_slice()).unwrap(); - OpaqueExtrinsic(v) - }; - service_test::sync::( - chain_spec::integration_test_config(), - block_factory, - extrinsic_factory, - ); - } - #[test] #[ignore] fn test_sync() {