diff --git a/Cargo.lock b/Cargo.lock index 515c2e01efaa1..8464c8a268f38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -795,15 +795,6 @@ dependencies = [ "serde 1.0.94 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "error-chain" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "backtrace 0.3.32 (registry+https://github.com/rust-lang/crates.io-index)", - "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "exit-future" version = "0.1.4" @@ -3170,17 +3161,6 @@ dependencies = [ "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rhododendron" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "error-chain 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "ring" version = "0.14.6" @@ -4417,30 +4397,6 @@ dependencies = [ "substrate-test-runtime-client 2.0.0", ] -[[package]] -name = "substrate-consensus-rhd" -version = "2.0.0" -dependencies = [ - "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", - "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parity-codec 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rhododendron 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "sr-io 2.0.0", - "sr-primitives 2.0.0", - "sr-version 2.0.0", - "srml-support 2.0.0", - "srml-system 2.0.0", - "substrate-client 2.0.0", - "substrate-consensus-common 2.0.0", - "substrate-keyring 2.0.0", - "substrate-primitives 2.0.0", - "substrate-transaction-pool 2.0.0", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "substrate-consensus-slots" version = "2.0.0" @@ -6042,7 +5998,6 @@ dependencies = [ "checksum env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3" "checksum environmental 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c7464757b80de8930c91c9afe77ddce501826bf9d134a87db2c67d9dc177e2c" "checksum erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3beee4bc16478a1b26f2e80ad819a52d24745e292f521a63c16eea5f74b7eb60" -"checksum error-chain 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3ab49e9dcb602294bc42f9a7dfc9bc6e936fca4418ea300dbfb84fe16de0b7d9" "checksum exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d8013f441e38e31c670e7f34ec8f1d5d3a2bd9d303c1ff83976ca886005e8f48" "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" "checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" @@ -6261,7 +6216,6 @@ dependencies = [ "checksum regex-automata 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "3ed09217220c272b29ef237a974ad58515bde75f194e3ffa7e6d0bf0f3b01f86" "checksum regex-syntax 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)" = "9b01330cce219c1c6b2e209e5ed64ccd587ae5c67bed91c0b49eecf02ae40e21" "checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" -"checksum rhododendron 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "057fecd57cc69e24d9d215c9f283a42133c3f48952e4fc06b088ecf3ce3d90bb" "checksum ring 0.14.6 (registry+https://github.com/rust-lang/crates.io-index)" = "426bc186e3e95cac1e4a4be125a4aca7e84c2d616ffc02244eef36e2a60a093c" "checksum rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f1651697fefd273bfb4fd69466cc2a9d20de557a0213b97233b22b5e95924b5e" "checksum rpassword 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c34fa7bcae7fca3c8471e8417088bbc3ad9af8066b0ecf4f3c0d98a0d772716e" diff --git a/Cargo.toml b/Cargo.toml index a6a7b8d17ba35..e285608105dbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ members = [ "core/consensus/aura", "core/consensus/babe", "core/consensus/common", - "core/consensus/rhd", "core/consensus/slots", "core/executor", "core/executor/runtime-test", diff --git a/README.adoc b/README.adoc index 89c042bb325b7..4edeeddeb1c3c 100644 --- a/README.adoc +++ b/README.adoc @@ -393,7 +393,7 @@ substrate * Substrate Core [source, shell] substrate, substrate-cli, substrate-client, substrate-client-db, -substrate-consensus-common, substrate-consensus-rhd, +substrate-consensus-common, substrate-executor, substrate-finality-grandpa, substrate-keyring, substrate-keystore, substrate-network, substrate-network-libp2p, substrate-primitives, substrate-rpc, substrate-rpc-servers, substrate-serializer, substrate-service, substrate-service-test, substrate-state-db, diff --git a/core/consensus/rhd/Cargo.toml b/core/consensus/rhd/Cargo.toml deleted file mode 100644 index 7eda2d6904517..0000000000000 --- a/core/consensus/rhd/Cargo.toml +++ /dev/null @@ -1,37 +0,0 @@ -[package] -name = "substrate-consensus-rhd" -version = "2.0.0" -authors = ["Parity Technologies "] -description = "Rhododendron Round-Based consensus-algorithm for substrate" -edition = "2018" - -[dependencies] -derive_more = "0.14.0" -futures = "0.1.17" -codec = { package = "parity-codec", version = "4.1.1", features = ["derive"] } -primitives = { package = "substrate-primitives", path = "../../primitives" } -consensus = { package = "substrate-consensus-common", path = "../common" } -client = { package = "substrate-client", path = "../../client" } -transaction_pool = { package = "substrate-transaction-pool", path = "../../transaction-pool" } -runtime_support = { package = "srml-support", path = "../../../srml/support" } -srml-system = { path = "../../../srml/system" } -runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" } -runtime_version = { package = "sr-version", path = "../../sr-version" } -runtime_io = { package = "sr-io", path = "../../sr-io" } -tokio = "0.1.7" -parking_lot = "0.8.0" -log = "0.4" -rhododendron = { version = "0.6.0", features = ["codec"] } -exit-future = "0.1" - -[dev-dependencies] -keyring = { package = "substrate-keyring", path = "../../keyring" } - -[features] -default = ["std"] -std = [ - "primitives/std", - "runtime_support/std", - "runtime_primitives/std", - "runtime_version/std", -] diff --git a/core/consensus/rhd/src/error.rs b/core/consensus/rhd/src/error.rs deleted file mode 100644 index 601cf1c963a58..0000000000000 --- a/core/consensus/rhd/src/error.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Error types in the rhododendron Consensus service. -use consensus::error::{Error as CommonError}; -use primitives::AuthorityId; -use client; - -/// A result alias. -pub type Result = std::result::Result; - -/// A RHD error type. -#[derive(Debug, derive_more::Display, derive_more::From)] -pub enum Error { - /// Client error. - Client(client::error::Error), - /// Consensus error. - Common(CommonError), - /// Local account ID not a validator at this block. - #[display(fmt="Local account ID ({:?}) not a validator at this block.", _0)] - NotValidator(AuthorityId), - /// Proposer destroyed before finishing proposing or evaluating - #[display(fmt="Proposer destroyed before finishing proposing or evaluating")] - PrematureDestruction, - /// Failed to register or resolve async timer. - #[display(fmt="Timer failed: {}", _0)] - Timer(tokio::timer::Error), - /// Unable to dispatch agreement future - #[display(fmt="Unable to dispatch agreement future: {:?}", _0)] - Executor(futures::future::ExecuteErrorKind), -} - -impl From<::rhododendron::InputStreamConcluded> for Error { - fn from(_: ::rhododendron::InputStreamConcluded) -> Self { - CommonError::IoTerminated.into() - } -} diff --git a/core/consensus/rhd/src/lib.rs b/core/consensus/rhd/src/lib.rs deleted file mode 100644 index 4670cb5deeae1..0000000000000 --- a/core/consensus/rhd/src/lib.rs +++ /dev/null @@ -1,1699 +0,0 @@ -// Copyright 2017-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! BFT Agreement based on a rotating proposer in different rounds. -//! -//! Where this crate refers to input stream, should never logically conclude. -//! The logic in this crate assumes that messages flushed to the output stream -//! will eventually reach other nodes and that our own messages are not included -//! in the input stream. -//! -//! Note that it is possible to witness agreement being reached without ever -//! seeing the candidate. Any candidates seen will be checked for validity. -//! -//! Although technically the agreement will always complete (given the eventual -//! delivery of messages), in practice it is possible for this future to -//! conclude without having witnessed the conclusion. -//! In general, this future should be pre-empted by the import of a justification -//! set for this block height. - -#![cfg(feature="rhd")] -// FIXME #1020 doesn't compile -// NOTE: this is the legacy constant used for transaction size. No longer used except -// for the rhd code which is not updated. Placed here for compatibility. -const MAX_TRANSACTIONS_SIZE: u32 = 4 * 1024 * 1024; - -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{self, Instant, Duration}; - -use parity_codec::{Decode, Encode}; -use consensus::offline_tracker::OfflineTracker; -use consensus::error::{ErrorKind as CommonErrorKind}; -use consensus::{Authorities, BlockImport, Environment, Proposer as BaseProposer}; -use client::{Client as SubstrateClient, CallExecutor}; -use client::runtime_api::{Core, BlockBuilder as BlockBuilderAPI, OldTxQueue, BlockBuilderError}; -use runtime_primitives::generic::{BlockId, Era, ImportResult, BlockImportParams, BlockOrigin}; -use runtime_primitives::traits::{Block, Header}; -use runtime_primitives::traits::{ - Block as BlockT, Hash as HashT, Header as HeaderT, - BlockNumberToHash, SaturatedConversion -}; -use runtime_primitives::Justification; -use primitives::{AuthorityId, ed25519, Blake2Hasher, ed25519::LocalizedSignature}; -use srml_system::Trait as SystemT; - -use node_runtime::Runtime; -use transaction_pool::txpool::{self, Pool as TransactionPool}; - -use futures::prelude::*; -use futures::future; -use futures::sync::oneshot; -use tokio::runtime::TaskExecutor; -use tokio::timer::Delay; -use parking_lot::{RwLock, Mutex}; - -pub use rhododendron::{ - self, InputStreamConcluded, AdvanceRoundReason, Message as RhdMessage, - Vote as RhdMessageVote, Communication as RhdCommunication, -}; -pub use self::error::{Error, ErrorKind}; - -// pub mod misbehavior_check; -mod error; -mod service; - -// statuses for an agreement -mod status { - pub const LIVE: usize = 0; - pub const BAD: usize = 1; - pub const GOOD: usize = 2; -} - -pub type Timestamp = u64; - -pub type AccountId = ::primitives::H256; - -/// Localized message type. -pub type LocalizedMessage = rhododendron::LocalizedMessage< - B, - ::Hash, - AuthorityId, - LocalizedSignature ->; - -/// Justification of some hash. -pub struct RhdJustification(rhododendron::Justification); - -/// Justification of a prepare message. -pub struct PrepareJustification(rhododendron::PrepareJustification); - -/// Unchecked justification. -#[derive(Encode, Decode)] -pub struct UncheckedJustification(rhododendron::UncheckedJustification); - -impl UncheckedJustification { - /// Create a new, unchecked justification. - pub fn new(digest: H, signatures: Vec, round_number: u32) -> Self { - UncheckedJustification(rhododendron::UncheckedJustification { - digest, - signatures, - round_number, - }) - } -} - -impl UncheckedJustification { - /// Decode a justification. - pub fn decode_justification(justification: Justification) -> Option { - let inner: rhododendron::UncheckedJustification<_, _> = Decode::decode(&mut &justification[..])?; - - Some(UncheckedJustification(inner)) - } -} - -impl Into for UncheckedJustification { - fn into(self) -> Justification { - self.0.encode() - } -} - -impl From> for UncheckedJustification { - fn from(inner: rhododendron::UncheckedJustification) -> Self { - UncheckedJustification(inner) - } -} - -/// Result of a committed round of BFT -pub type Committed = rhododendron::Committed::Hash, LocalizedSignature>; - -/// Communication between BFT participants. -pub type Communication = rhododendron::Communication::Hash, AuthorityId, LocalizedSignature>; - -/// Misbehavior observed from BFT participants. -pub type Misbehavior = rhododendron::Misbehavior; - -/// Shared offline validator tracker. -pub type SharedOfflineTracker = Arc>; - -/// A proposer for a rhododendron instance. This must implement the base proposer logic. -pub trait LocalProposer: BaseProposer { - /// Import witnessed rhododendron misbehavior. - fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, Misbehavior)>); - - /// Determine the proposer for a given round. This should be a deterministic function - /// with consistent results across all authorities. - fn round_proposer(&self, round_number: u32, authorities: &[AuthorityId]) -> AuthorityId; - - /// Hook called when a BFT round advances without a proposal. - fn on_round_end(&self, _round_number: u32, _proposed: bool) { } -} - - -/// Build new blocks. -pub trait BlockBuilder { - /// Push an extrinsic onto the block. Fails if the extrinsic is invalid. - fn push_extrinsic(&mut self, extrinsic: ::Extrinsic) -> Result<(), Error>; -} - -/// Local client abstraction for the consensus. -pub trait AuthoringApi: - Send - + Sync - + BlockBuilderAPI<::Block, InherentData, Error=::Error> - + Core<::Block, AuthorityId, Error=::Error> - + OldTxQueue<::Block, Error=::Error> -{ - /// The block used for this API type. - type Block: BlockT; - /// The error used by this API type. - type Error: std::error::Error; - - /// Build a block on top of the given, with inherent extrinsics pre-pushed. - fn build_block) -> ()>( - &self, - at: &BlockId, - inherent_data: InherentData, - build_ctx: F, - ) -> Result; -} - -/// A long-lived network which can create BFT message routing processes on demand. -pub trait Network { - /// The block used for this API type. - type Block: BlockT; - /// The input stream of BFT messages. Should never logically conclude. - type Input: Stream,Error=Error>; - /// The output sink of BFT messages. Messages sent here should eventually pass to all - /// current authorities. - type Output: Sink,SinkError=Error>; - - /// Instantiate input and output streams. - fn communication_for( - &self, - validators: &[AuthorityId], - local_id: AuthorityId, - parent_hash: ::Hash, - task_executor: TaskExecutor - ) -> (Self::Input, Self::Output); -} - - -// caches the round number to start at if we end up with BFT consensus on the same -// parent hash more than once (happens if block is bad). -// -// this will force a committed but locally-bad block to be considered analogous to -// a round advancement vote. -#[derive(Debug)] -struct RoundCache { - hash: Option, - start_round: u32, -} - -/// Instance of BFT agreement. -struct BftInstance { - key: Arc, - authorities: Vec, - parent_hash: B::Hash, - round_timeout_multiplier: u64, - cache: Arc>>, - proposer: P, -} - -impl> BftInstance - where - B: Clone + Eq, - B::Hash: ::std::hash::Hash - -{ - fn round_timeout_duration(&self, round: u32) -> Duration { - // 2^(min(6, x/8)) * 10 - // Grows exponentially starting from 10 seconds, capped at 640 seconds. - const ROUND_INCREMENT_STEP: u32 = 8; - - let round = round / ROUND_INCREMENT_STEP; - let round = ::std::cmp::min(6, round); - - let timeout = 1u64.checked_shl(round) - .unwrap_or_else(u64::max_value) - .saturating_mul(self.round_timeout_multiplier); - - Duration::from_secs(timeout) - } - - fn update_round_cache(&self, current_round: u32) { - let mut cache = self.cache.lock(); - if cache.hash.as_ref() == Some(&self.parent_hash) { - cache.start_round = current_round + 1; - } - } -} - -impl> rhododendron::Context for BftInstance - where - B: Clone + Eq, - B::Hash: ::std::hash::Hash, -{ - type Error = P::Error; - type AuthorityId = AuthorityId; - type Digest = B::Hash; - type Signature = LocalizedSignature; - type Candidate = B; - type RoundTimeout = Box>; - type CreateProposal = ::Future; - type EvaluateProposal = ::Future; - - fn local_id(&self) -> AuthorityId { - self.key.public().into() - } - - fn proposal(&self) -> Self::CreateProposal { - self.proposer.propose().into_future() - } - - fn candidate_digest(&self, proposal: &B) -> B::Hash { - proposal.hash() - } - - fn sign_local(&self, message: RhdMessage) -> LocalizedMessage { - sign_message(message, &*self.key, self.parent_hash.clone()) - } - - fn round_proposer(&self, round: u32) -> AuthorityId { - self.proposer.round_proposer(round, &self.authorities[..]) - } - - fn proposal_valid(&self, proposal: &B) -> Self::EvaluateProposal { - self.proposer.evaluate(proposal).into_future() - } - - fn begin_round_timeout(&self, round: u32) -> Self::RoundTimeout { - let timeout = self.round_timeout_duration(round); - let fut = Delay::new(Instant::now() + timeout) - .map_err(|e| Error::from(CommonErrorKind::FaultyTimer(e))) - .map_err(Into::into); - - Box::new(fut) - } - - fn on_advance_round( - &self, - accumulator: &rhododendron::Accumulator, - round: u32, - next_round: u32, - reason: AdvanceRoundReason, - ) { - use std::collections::HashSet; - - let collect_pubkeys = |participants: HashSet<&Self::AuthorityId>| participants.into_iter() - .map(|p| ::ed25519::Public::from_raw(p.0)) - .collect::>(); - - let round_timeout = self.round_timeout_duration(next_round); - debug!(target: "rhd", "Advancing to round {} from {}", next_round, round); - debug!(target: "rhd", "Participating authorities: {:?}", - collect_pubkeys(accumulator.participants())); - debug!(target: "rhd", "Voting authorities: {:?}", - collect_pubkeys(accumulator.voters())); - debug!(target: "rhd", "Round {} should end in at most {} seconds from now", next_round, round_timeout.as_secs()); - - self.update_round_cache(next_round); - - if let AdvanceRoundReason::Timeout = reason { - self.proposer.on_round_end(round, accumulator.proposal().is_some()); - } - } -} - -/// A future that resolves either when canceled (witnessing a block from the network at same height) -/// or when agreement completes. -pub struct BftFuture where - B: Block + Clone + Eq, - B::Hash: ::std::hash::Hash, - P: LocalProposer, - P: BaseProposer, - InStream: Stream, Error=Error>, - OutSink: Sink, SinkError=Error>, -{ - inner: rhododendron::Agreement, InStream, OutSink>, - status: Arc, - cancel: oneshot::Receiver<()>, - import: Arc, -} - -impl Future for BftFuture where - B: Block + Clone + Eq, - B::Hash: ::std::hash::Hash, - P: LocalProposer, - P: BaseProposer, - I: BlockImport, - InStream: Stream, Error=Error>, - OutSink: Sink, SinkError=Error>, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> ::futures::Poll<(), ()> { - // service has canceled the future. bail - let cancel = match self.cancel.poll() { - Ok(Async::Ready(())) | Err(_) => true, - Ok(Async::NotReady) => false, - }; - - let committed = match self.inner.poll().map_err(|_| ()) { - Ok(Async::Ready(x)) => x, - Ok(Async::NotReady) => - return Ok(if cancel { Async::Ready(()) } else { Async::NotReady }), - Err(()) => return Err(()), - }; - - // if something was committed, the round leader must have proposed. - self.inner.context().proposer.on_round_end(committed.round_number, true); - - // If we didn't see the proposal (very unlikely), - // we will get the block from the network later. - if let Some(justified_block) = committed.candidate { - let hash = justified_block.hash(); - info!(target: "rhd", "Importing block #{} ({}) directly from BFT consensus", - justified_block.header().number(), hash); - let just: Justification = UncheckedJustification(committed.justification.uncheck()).into(); - let (header, body) = justified_block.deconstruct(); - let import_block = BlockImportParams { - origin: BlockOrigin::ConsensusBroadcast, - header: header, - justification: Some(just), - body: Some(body), - finalized: true, - post_digests: Default::default(), - auxiliary: Default::default() - }; - - let new_status = match self.import.import_block(import_block, None) { - Err(e) => { - warn!(target: "rhd", "Error importing block {:?} in round #{}: {:?}", - hash, committed.round_number, e); - status::BAD - } - Ok(ImportResult::KnownBad) => { - warn!(target: "rhd", "{:?} was bad block agreed on in round #{}", - hash, committed.round_number); - status::BAD - } - _ => status::GOOD - }; - - self.status.store(new_status, Ordering::Release); - - } else { - // assume good unless we received the proposal. - self.status.store(status::GOOD, Ordering::Release); - } - - self.inner.context().update_round_cache(committed.round_number); - - Ok(Async::Ready(())) - } -} - -impl Drop for BftFuture where - B: Block + Clone + Eq, - B::Hash: ::std::hash::Hash, - P: LocalProposer, - P: BaseProposer, - InStream: Stream, Error=Error>, - OutSink: Sink, SinkError=Error>, -{ - fn drop(&mut self) { - let misbehavior = self.inner.drain_misbehavior().collect::>(); - self.inner.context().proposer.import_misbehavior(misbehavior); - } -} - -struct AgreementHandle { - status: Arc, - send_cancel: Option>, -} - -impl AgreementHandle { - fn status(&self) -> usize { - self.status.load(Ordering::Acquire) - } -} - -impl Drop for AgreementHandle { - fn drop(&mut self) { - if let Some(sender) = self.send_cancel.take() { - let _ = sender.send(()); - } - } -} - -/// The BftService kicks off the agreement process on top of any blocks it -/// is notified of. -/// -/// This assumes that it is being run in the context of a tokio runtime. -pub struct BftService { - client: Arc, - live_agreement: Mutex>, - round_cache: Arc>>, - round_timeout_multiplier: u64, - key: Arc, - factory: P, -} - -impl BftService - where - B: Block + Clone + Eq, - P: Environment, - P::Proposer: LocalProposer, - P::Proposer: BaseProposer, - I: BlockImport + Authorities, -{ - /// Create a new service instance. - pub fn new(client: Arc, key: Arc, factory: P) -> BftService { - BftService { - client: client, - live_agreement: Mutex::new(None), - round_cache: Arc::new(Mutex::new(RoundCache { - hash: None, - start_round: 0, - })), - round_timeout_multiplier: 10, - key: key, - factory, - } - } - - /// Get the local Authority ID. - pub fn local_id(&self) -> AuthorityId { - self.key.public().into() - } - - /// Signal that a valid block with the given header has been imported. - /// Provide communication streams that are localized to this block. - /// It's recommended to use the communication primitives provided by this - /// module for signature checking and decoding. See `CheckedStream` and - /// `SigningSink` for more details. - /// - /// Messages received on the stream that don't match the expected format - /// will be dropped. - /// - /// If the local signing key is an authority, this will begin the consensus process to build a - /// block on top of it. If the executor fails to run the future, an error will be returned. - /// Returns `None` if the agreement on the block with given parent is already in progress. - pub fn build_upon(&self, header: &B::Header, input: In, output: Out) - -> Result>::Proposer, - I, - In, - Out, - >>, P::Error> - where - In: Stream, Error=Error>, - Out: Sink, SinkError=Error>, - { - let hash = header.hash(); - - let mut live_agreement = self.live_agreement.lock(); - let can_build = live_agreement.as_ref() - .map_or(true, |x| self.can_build_on_inner(header, x)); - - if !can_build { - return Ok(None) - } - - let authorities = self.client.authorities(&BlockId::Hash(hash.clone())) - .map_err(|e| CommonErrorKind::Other(Box::new(e)).into())?; - - let n = authorities.len(); - let max_faulty = max_faulty_of(n); - trace!(target: "rhd", "Initiating agreement on top of #{}, {:?}", header.number(), hash); - trace!(target: "rhd", "max_faulty_of({})={}", n, max_faulty); - - let local_id = self.local_id(); - - if !authorities.contains(&local_id) { - // cancel current agreement - live_agreement.take(); - Err(CommonErrorKind::InvalidAuthority(local_id).into())?; - } - - let proposer = self.factory.init(header, &authorities, self.key.clone())?; - - let bft_instance = BftInstance { - proposer, - parent_hash: hash.clone(), - cache: self.round_cache.clone(), - round_timeout_multiplier: self.round_timeout_multiplier, - key: self.key.clone(), - authorities: authorities, - }; - - let mut agreement = rhododendron::agree( - bft_instance, - n, - max_faulty, - input, - output, - ); - - // fast forward round number if necessary. - { - let mut cache = self.round_cache.lock(); - trace!(target: "rhd", "Round cache: {:?}", &*cache); - if cache.hash.as_ref() == Some(&hash) { - trace!(target: "rhd", "Fast-forwarding to round {}", cache.start_round); - let start_round = cache.start_round; - cache.start_round += 1; - - drop(cache); - agreement.fast_forward(start_round); - } else { - *cache = RoundCache { - hash: Some(hash.clone()), - start_round: 1, - }; - } - } - - let status = Arc::new(AtomicUsize::new(status::LIVE)); - let (tx, rx) = oneshot::channel(); - - // cancel current agreement. - *live_agreement = Some((header.clone(), AgreementHandle { - send_cancel: Some(tx), - status: status.clone(), - })); - - Ok(Some(BftFuture { - inner: agreement, - status: status, - cancel: rx, - import: self.client.clone(), - })) - } - - /// Cancel current agreement if any. - pub fn cancel_agreement(&self) { - self.live_agreement.lock().take(); - } - - /// Whether we can build using the given header. - pub fn can_build_on(&self, header: &B::Header) -> bool { - self.live_agreement.lock().as_ref() - .map_or(true, |x| self.can_build_on_inner(header, x)) - } - - /// Get a reference to the underlying client. - pub fn client(&self) -> &I { &*self.client } - - fn can_build_on_inner(&self, header: &B::Header, live: &(B::Header, AgreementHandle)) -> bool { - let hash = header.hash(); - let &(ref live_header, ref handle) = live; - match handle.status() { - _ if *header != *live_header && *live_header.parent_hash() != hash => true, // can always follow with next block. - status::BAD => hash == live_header.hash(), // bad block can be re-agreed on. - _ => false, // canceled won't appear since we overwrite the handle before returning. - } - } -} - -/// Stream that decodes rhododendron messages and checks signatures. -/// -/// This stream is localized to a specific parent block-hash, as all messages -/// will be signed in a way that accounts for it. When using this with -/// `BftService::build_upon`, the user should take care to use the same hash as for that. -pub struct CheckedStream { - inner: S, - local_id: AuthorityId, - authorities: Vec, - parent_hash: B::Hash, -} - -impl CheckedStream { - /// Construct a new checked stream. - pub fn new( - inner: S, - local_id: AuthorityId, - authorities: Vec, - parent_hash: B::Hash, - ) -> Self { - CheckedStream { - inner, - local_id, - authorities, - parent_hash, - } - } -} - -impl>> Stream for CheckedStream - where S::Error: From, -{ - type Item = Communication; - type Error = S::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - use rhododendron::LocalizedMessage as RhdLocalized; - loop { - match self.inner.poll()? { - Async::Ready(Some(item)) => { - let comms: Communication = match Decode::decode(&mut &item[..]) { - Some(x) => x, - None => continue, - }; - - match comms { - RhdCommunication::Auxiliary(prepare_just) => { - let checked = check_prepare_justification::( - &self.authorities, - self.parent_hash, - UncheckedJustification(prepare_just.uncheck()), - ); - if let Ok(checked) = checked { - return Ok(Async::Ready( - Some(RhdCommunication::Auxiliary(checked.0)) - )); - } - } - RhdCommunication::Consensus(RhdLocalized::Propose(p)) => { - if p.sender == self.local_id { continue } - - let checked = check_proposal::( - &self.authorities, - &self.parent_hash, - &p, - ); - - if let Ok(()) = checked { - return Ok(Async::Ready( - Some(RhdCommunication::Consensus(RhdLocalized::Propose(p))) - )); - } - } - RhdCommunication::Consensus(RhdLocalized::Vote(v)) => { - if v.sender == self.local_id { continue } - - let checked = check_vote::( - &self.authorities, - &self.parent_hash, - &v, - ); - - if let Ok(()) = checked { - return Ok(Async::Ready( - Some(RhdCommunication::Consensus(RhdLocalized::Vote(v))) - )); - } - } - } - } - Async::Ready(None) => return Ok(Async::Ready(None)), - Async::NotReady => return Ok(Async::NotReady), - } - } - } -} - -/// Given a total number of authorities, yield the maximum faulty that would be allowed. -/// This will always be under 1/3. -pub fn max_faulty_of(n: usize) -> usize { - n.saturating_sub(1) / 3 -} - -/// Given a total number of authorities, yield the minimum required signatures. -/// This will always be over 2/3. -pub fn bft_threshold(n: usize) -> usize { - n - max_faulty_of(n) -} - -// actions in the signature scheme. -#[derive(Encode)] -enum Action { - Prepare(u32, H), - Commit(u32, H), - AdvanceRound(u32), - // signatures of header hash and full candidate are both included. - ProposeHeader(u32, H), - Propose(u32, B), -} - -// encode something in a way which is localized to a specific parent-hash -fn localized_encode(parent_hash: H, value: E) -> Vec { - (parent_hash, value).encode() -} - -fn check_justification_signed_message( - authorities: &[AuthorityId], - message: &[u8], - just: UncheckedJustification) --> Result, UncheckedJustification> { - // additional error information could be useful here. - just.0.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| { - let auth_id = sig.signer.clone().into(); - if !authorities.contains(&auth_id) { return None } - - if ed25519::Pair::verify(&sig.signature, message, &sig.signer) { - Some(sig.signer.0) - } else { - None - } - }).map(RhdJustification).map_err(UncheckedJustification) -} - -/// Check a full justification for a header hash. -/// Provide all valid authorities. -/// -/// On failure, returns the justification back. -pub fn check_justification( - authorities: &[AuthorityId], - parent: B::Hash, - just: UncheckedJustification -) -> Result, UncheckedJustification> { - let vote: Action = Action::Commit(just.0.round_number as u32, just.0.digest.clone()); - let message = localized_encode(parent, vote); - - check_justification_signed_message(authorities, &message[..], just) -} - -/// Check a prepare justification for a header hash. -/// Provide all valid authorities. -/// -/// On failure, returns the justification back. -pub fn check_prepare_justification( - authorities: &[AuthorityId], - parent: B::Hash, - just: UncheckedJustification -) -> Result, UncheckedJustification> { - let vote: Action = Action::Prepare(just.0.round_number as u32, just.0.digest.clone()); - let message = localized_encode(parent, vote); - - check_justification_signed_message(authorities, &message[..], just).map(|e| PrepareJustification(e.0)) -} - -/// Check proposal message signatures and authority. -/// Provide all valid authorities. -pub fn check_proposal( - authorities: &[AuthorityId], - parent_hash: &B::Hash, - propose: &rhododendron::LocalizedProposal) - -> Result<(), Error> -{ - if !authorities.contains(&propose.sender) { - return Err(CommonErrorKind::InvalidAuthority(propose.sender.into()).into()); - } - - let action_header = Action::ProposeHeader(propose.round_number as u32, propose.digest.clone()); - let action_propose = Action::Propose(propose.round_number as u32, propose.proposal.clone()); - check_action::(action_header, parent_hash, &propose.digest_signature)?; - check_action::(action_propose, parent_hash, &propose.full_signature) -} - -/// Check vote message signatures and authority. -/// Provide all valid authorities. -pub fn check_vote( - authorities: &[AuthorityId], - parent_hash: &B::Hash, - vote: &rhododendron::LocalizedVote) - -> Result<(), Error> -{ - if !authorities.contains(&vote.sender) { - return Err(CommonErrorKind::InvalidAuthority(vote.sender.into()).into()); - } - - let action = match vote.vote { - rhododendron::Vote::Prepare(r, ref h) => Action::Prepare(r as u32, h.clone()), - rhododendron::Vote::Commit(r, ref h) => Action::Commit(r as u32, h.clone()), - rhododendron::Vote::AdvanceRound(r) => Action::AdvanceRound(r as u32), - }; - check_action::(action, parent_hash, &vote.signature) -} - -fn check_action( - action: Action, - parent_hash: &B::Hash, - sig: &LocalizedSignature -) -> Result<(), Error> { - let message = localized_encode(*parent_hash, action); - if ed25519::Pair::verify(&sig.signature, &message, &sig.signer) { - Ok(()) - } else { - Err(CommonErrorKind::InvalidSignature(sig.signature.into(), sig.signer.clone().into()).into()) - } -} - -/// Sign a BFT message with the given key. -pub fn sign_message( - message: RhdMessage, - key: &ed25519::Pair, - parent_hash: B::Hash -) -> LocalizedMessage { - let signer = key.public(); - - let sign_action = |action: Action| { - let to_sign = localized_encode(parent_hash.clone(), action); - - LocalizedSignature { - signer: signer.clone(), - signature: key.sign(&to_sign), - } - }; - - match message { - RhdMessage::Propose(r, proposal) => { - let header_hash = proposal.hash(); - let action_header = Action::ProposeHeader(r as u32, header_hash.clone()); - let action_propose = Action::Propose(r as u32, proposal.clone()); - - rhododendron::LocalizedMessage::Propose(rhododendron::LocalizedProposal { - round_number: r, - proposal, - digest: header_hash, - sender: signer.clone().into(), - digest_signature: sign_action(action_header), - full_signature: sign_action(action_propose), - }) - } - RhdMessage::Vote(vote) => rhododendron::LocalizedMessage::Vote({ - let action = match vote { - RhdMessageVote::Prepare(r, h) => Action::Prepare(r as u32, h), - RhdMessageVote::Commit(r, h) => Action::Commit(r as u32, h), - RhdMessageVote::AdvanceRound(r) => Action::AdvanceRound(r as u32), - }; - - rhododendron::LocalizedVote { - vote: vote, - sender: signer.clone().into(), - signature: sign_action(action), - } - }) - } -} - - -impl<'a, B, E, Block> BlockBuilder for client::block_builder::BlockBuilder<'a, B, E, Block, Blake2Hasher> where - B: client::backend::Backend + Send + Sync + 'static, - E: CallExecutor + Send + Sync + Clone + 'static, - Block: BlockT -{ - fn push_extrinsic(&mut self, extrinsic: ::Extrinsic) -> Result<(), Error> { - client::block_builder::BlockBuilder::push(self, extrinsic).map_err(Into::into) - } -} - -impl<'a, B, E, Block> AuthoringApi for SubstrateClient where - B: client::backend::Backend + Send + Sync + 'static, - E: CallExecutor + Send + Sync + Clone + 'static, - Block: BlockT, -{ - type Block = Block; - type Error = client::error::Error; - - fn build_block) -> ()>( - &self, - at: &BlockId, - inherent_data: InherentData, - mut build_ctx: F, - ) -> Result { - let runtime_version = self.runtime_version_at(at)?; - - let mut block_builder = self.new_block_at(at)?; - if runtime_version.has_api(*b"blkbuild", 1) { - for inherent in self.inherent_extrinsics(at, &inherent_data)? { - block_builder.push(inherent)?; - } - } - - build_ctx(&mut block_builder); - - block_builder.bake().map_err(Into::into) - } -} - - -/// Proposer factory. -pub struct ProposerFactory where - C: AuthoringApi, - A: txpool::ChainApi, -{ - /// The client instance. - pub client: Arc, - /// The transaction pool. - pub transaction_pool: Arc>, - /// The backing network handle. - pub network: N, - /// handle to remote task executor - pub handle: TaskExecutor, - /// Offline-tracker. - pub offline: SharedOfflineTracker, - /// Force delay in evaluation this long. - pub force_delay: u64, -} - -impl consensus::Environment<::Block> for ProposerFactory where - N: Network::Block>, - C: AuthoringApi + BlockNumberToHash, - A: txpool::ChainApi::Block>, - // <::Block as BlockT>::Hash: - // Into<::Hash> + PartialEq + Into, - Error: From<::Error> -{ - type Proposer = Proposer; - type Error = Error; - - fn init( - &self, - parent_header: &<::Block as BlockT>::Header, - authorities: &[AuthorityId], - sign_with: Arc, - ) -> Result { - use runtime_primitives::traits::Hash as HashT; - let parent_hash = parent_header.hash(); - - let id = BlockId::hash(parent_hash); - let random_seed = self.client.random_seed(&id)?; - let random_seed = <<::Block as BlockT>::Header as HeaderT> - ::Hashing::hash(random_seed.as_ref()); - - let validators = self.client.validators(&id)?; - self.offline.write().note_new_block(&validators[..]); - - info!("Starting consensus session on top of parent {:?}", parent_hash); - - let local_id = sign_with.public().0.into(); - let (input, output) = self.network.communication_for( - authorities, - local_id, - parent_hash.clone(), - self.handle.clone(), - ); - let now = Instant::now(); - let proposer = Proposer { - client: self.client.clone(), - start: now, - local_key: sign_with, - parent_hash, - parent_id: id, - parent_number: *parent_header.number(), - random_seed, - transaction_pool: self.transaction_pool.clone(), - offline: self.offline.clone(), - validators, - minimum_timestamp: current_timestamp() + self.force_delay, - network: self.network.clone() - }; - - Ok(proposer) - } -} - -/// The proposer logic. -pub struct Proposer { - client: Arc, - start: Instant, - local_key: Arc, - parent_hash: <::Block as BlockT>::Hash, - parent_id: BlockId<::Block>, - parent_number: <<::Block as BlockT>::Header as HeaderT>::Number, - random_seed: <::Block as BlockT>::Hash, - transaction_pool: Arc>, - offline: SharedOfflineTracker, - validators: Vec, - minimum_timestamp: u64, - network: N, -} - -impl Proposer { - fn primary_index(&self, round_number: u32, len: usize) -> usize { - use primitives::uint::U256; - - let big_len = U256::from(len); - let offset = U256::from_big_endian(self.random_seed.as_ref()) % big_len; - let offset = offset.low_u64() as usize + round_number as usize; - offset % len - } -} - -impl BaseProposer<::Block> for Proposer where - C: AuthoringApi + BlockNumberToHash, - A: txpool::ChainApi::Block>, - <::Block as BlockT>::Hash: - Into<::Hash> + PartialEq + Into, - error::Error: From<::Error> -{ - type Create = Result<::Block, Error>; - type Error = Error; - type Evaluate = Box>; - - fn propose(&self) -> Self::Create { - use runtime_primitives::traits::BlakeTwo256; - - const MAX_VOTE_OFFLINE_SECONDS: Duration = Duration::from_secs(60); - - let timestamp = ::std::cmp::max(self.minimum_timestamp, current_timestamp()); - - let elapsed_since_start = self.start.elapsed(); - let offline_indices = if elapsed_since_start > MAX_VOTE_OFFLINE_SECONDS { - Vec::new() - } else { - self.offline.read().reports(&self.validators[..]) - }; - - if !offline_indices.is_empty() { - info!( - "Submitting offline validators {:?} for slash-vote", - offline_indices.iter().map(|&i| self.validators[i as usize]).collect::>(), - ) - } - - let inherent_data = InherentData { - timestamp, - offline_indices, - }; - - let block = self.client.build_block( - &self.parent_id, - inherent_data, - |block_builder| { - let mut unqueue_invalid = Vec::new(); - self.transaction_pool.ready(|pending_iterator| { - let mut pending_size = 0; - for pending in pending_iterator { - let encoded_size = pending.data.encode().len(); - if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE { break } - - match block_builder.push_extrinsic(pending.data.clone()) { - Ok(()) => { - pending_size += encoded_size; - } - Err(e) => { - trace!(target: "transaction-pool", "Invalid transaction: {}", e); - unqueue_invalid.push(pending.hash.clone()); - } - } - } - }); - - self.transaction_pool.remove_invalid(&unqueue_invalid); - })?; - - info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]", - block.header().number(), - <::Block as BlockT>::Hash::from(block.header().hash()), - block.header().parent_hash(), - block.extrinsics().iter() - .map(|xt| format!("{}", BlakeTwo256::hash_of(xt))) - .collect::>() - .join(", ") - ); - - let substrate_block = Decode::decode(&mut block.encode().as_slice()) - .expect("blocks are defined to serialize to substrate blocks correctly; qed"); - - assert!(evaluation::evaluate_initial( - &substrate_block, - &self.parent_hash, - self.parent_number, - ).is_ok()); - - Ok(substrate_block) - } - - fn evaluate(&self, unchecked_proposal: &::Block) -> Self::Evaluate { - debug!(target: "rhd", "evaluating block on top of parent ({}, {:?})", self.parent_number, self.parent_hash); - - // do initial serialization and structural integrity checks. - if let Err(e) = evaluation::evaluate_initial( - unchecked_proposal, - &self.parent_hash, - self.parent_number, - ) { - debug!(target: "rhd", "Invalid proposal: {:?}", e); - return Box::new(future::ok(false)); - }; - - let current_timestamp = current_timestamp(); - let inherent = InherentData::new( - current_timestamp, - self.offline.read().reports(&self.validators) - ); - let proposed_timestamp = match self.client.check_inherents( - &self.parent_id, - &unchecked_proposal, - &inherent, - ) { - Ok(Ok(())) => None, - Ok(Err(BlockBuilderError::ValidAtTimestamp(timestamp))) => Some(timestamp), - Ok(Err(e)) => { - debug!(target: "rhd", "Invalid proposal (check_inherents): {:?}", e); - return Box::new(future::ok(false)); - }, - Err(e) => { - debug!(target: "rhd", "Could not call into runtime: {:?}", e); - return Box::new(future::ok(false)); - } - }; - - let vote_delays = { - - // the duration until the given timestamp is current - let proposed_timestamp = ::std::cmp::max(self.minimum_timestamp, proposed_timestamp.unwrap_or(0)); - let timestamp_delay = if proposed_timestamp > current_timestamp { - let delay_s = proposed_timestamp - current_timestamp; - debug!(target: "rhd", "Delaying evaluation of proposal for {} seconds", delay_s); - Some(Instant::now() + Duration::from_secs(delay_s)) - } else { - None - }; - - match timestamp_delay { - Some(duration) => future::Either::A( - Delay::new(duration).map_err(|e| ErrorKind::Timer(e).into()) - ), - None => future::Either::B(future::ok(())), - } - }; - - // evaluate whether the block is actually valid. - // it may be better to delay this until the delays are finished - let evaluated = match self.client.execute_block(&self.parent_id, &unchecked_proposal.clone()) - .map_err(Error::from) { - Ok(()) => Ok(true), - Err(err) => match err.kind() { - error::ErrorKind::Client(client::error::ErrorKind::Execution(_)) => Ok(false), - _ => Err(err) - } - }; - - let future = future::result(evaluated).and_then(move |good| { - let end_result = future::ok(good); - if good { - // delay a "good" vote. - future::Either::A(vote_delays.and_then(|_| end_result)) - } else { - // don't delay a "bad" evaluation. - future::Either::B(end_result) - } - }); - - Box::new(future) as Box<_> - } -} - -impl LocalProposer<::Block> for Proposer where - C: AuthoringApi + BlockNumberToHash, - A: txpool::ChainApi::Block>, - Self: BaseProposer<::Block, Error=Error>, - <::Block as BlockT>::Hash: - Into<::Hash> + PartialEq + Into, - error::Error: From<::Error> -{ - - fn round_proposer(&self, round_number: u32, authorities: &[AuthorityId]) -> AuthorityId { - let offset = self.primary_index(round_number, authorities.len()); - let proposer = authorities[offset as usize].clone(); - trace!(target: "rhd", "proposer for round {} is {}", round_number, proposer); - - proposer - } - - fn import_misbehavior( - &self, - _misbehavior: Vec<(AuthorityId, Misbehavior<<::Block as BlockT>::Hash>)> - ) { - use rhododendron::Misbehavior as GenericMisbehavior; - use runtime_primitives::bft::{MisbehaviorKind, MisbehaviorReport}; - use node_runtime::{Call, UncheckedExtrinsic, ConsensusCall}; - - let mut next_index = { - let local_id = self.local_key.public().0; - let cur_index = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending| pending - .filter(|tx| tx.verified.sender == local_id) - .last() - .map(|tx| Ok(tx.verified.index())) - .unwrap_or_else(|| self.client.account_nonce(&self.parent_id, local_id)) - .map_err(Error::from) - ); - - match cur_index { - Ok(cur_index) => cur_index + 1, - Err(e) => { - warn!(target: "consensus", "Error computing next transaction index: {:?}", e); - return; - } - } - }; - - for (target, misbehavior) in misbehavior { - let report = MisbehaviorReport { - parent_hash: self.parent_hash.into(), - parent_number: self.parent_number.saturated_into::(), - target, - misbehavior: match misbehavior { - GenericMisbehavior::ProposeOutOfTurn(_, _, _) => continue, - GenericMisbehavior::DoublePropose(_, _, _) => continue, - GenericMisbehavior::DoublePrepare(round, (h1, s1), (h2, s2)) - => MisbehaviorKind::BftDoublePrepare(round as u32, (h1.into(), s1.signature), (h2.into(), s2.signature)), - GenericMisbehavior::DoubleCommit(round, (h1, s1), (h2, s2)) - => MisbehaviorKind::BftDoubleCommit(round as u32, (h1.into(), s1.signature), (h2.into(), s2.signature)), - } - }; - let payload = ( - next_index, - Call::Consensus(ConsensusCall::report_misbehavior(report)), - Era::immortal(), - self.client.genesis_hash() - ); - let signature = self.local_key.sign(&payload.encode()).into(); - next_index += 1; - - let local_id = self.local_key.public().0.into(); - let extrinsic = UncheckedExtrinsic { - signature: Some((node_runtime::RawAddress::Id(local_id), signature, payload.0, Era::immortal())), - function: payload.1, - }; - let uxt: <::Block as BlockT>::Extrinsic = Decode::decode( - &mut extrinsic.encode().as_slice()).expect("Encoded extrinsic is valid"); - let hash = BlockId::<::Block>::hash(self.parent_hash); - if let Err(e) = self.transaction_pool.submit_one(&hash, uxt) { - warn!("Error importing misbehavior report: {:?}", e); - } - } - } - - fn on_round_end(&self, round_number: u32, was_proposed: bool) { - let primary_validator = self.validators[ - self.primary_index(round_number, self.validators.len()) - ]; - - // alter the message based on whether we think the empty proposer was forced to skip the round. - // this is determined by checking if our local validator would have been forced to skip the round. - if !was_proposed { - let public = ed25519::Public::from_raw(primary_validator.0); - info!( - "Potential Offline Validator: {} failed to propose during assigned slot: {}", - public, - round_number, - ); - } - - self.offline.write().note_round_end(primary_validator, was_proposed); - } -} - -fn current_timestamp() -> u64 { - time::SystemTime::now().duration_since(time::UNIX_EPOCH) - .expect("now always later than unix epoch; qed") - .as_secs() -} - - -#[cfg(test)] -mod tests { - use super::*; - use std::collections::HashSet; - use std::marker::PhantomData; - - use runtime_primitives::testing::{Block as GenericTestBlock, Header as TestHeader}; - use primitives::H256; - use keyring::Ed25519Keyring; - - type TestBlock = GenericTestBlock<()>; - - struct FakeClient { - authorities: Vec, - imported_heights: Mutex> - } - - impl BlockImport for FakeClient { - type Error = Error; - - fn import_block(&self, - block: BlockImportParams, - _new_authorities: Option> - ) -> Result { - assert!(self.imported_heights.lock().insert(block.header.number)); - Ok(ImportResult::Queued) - } - } - - impl Authorities for FakeClient { - type Error = Error; - - fn authorities(&self, _at: &BlockId) -> Result, Self::Error> { - Ok(self.authorities.clone()) - } - } - - // "black hole" output sink. - struct Comms(::std::marker::PhantomData); - - impl Sink for Comms { - type SinkItem = Communication; - type SinkError = E; - - fn start_send(&mut self, _item: Communication) -> ::futures::StartSend, E> { - Ok(::futures::AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> ::futures::Poll<(), E> { - Ok(Async::Ready(())) - } - } - - impl Stream for Comms { - type Item = Communication; - type Error = E; - - fn poll(&mut self) -> ::futures::Poll, Self::Error> { - Ok(::futures::Async::NotReady) - } - } - - struct DummyFactory; - struct DummyProposer(u64); - - impl Environment for DummyFactory { - type Proposer = DummyProposer; - type Error = Error; - - fn init(&self, parent_header: &TestHeader, _authorities: &[AuthorityId], _sign_with: Arc) - -> Result - { - Ok(DummyProposer(parent_header.number + 1)) - } - } - - impl BaseProposer for DummyProposer { - type Error = Error; - type Create = Result; - type Evaluate = Result; - - fn propose(&self) -> Result { - - Ok(TestBlock { - header: from_block_number(self.0), - extrinsics: Default::default() - }) - } - - fn evaluate(&self, proposal: &TestBlock) -> Result { - Ok(proposal.header.number == self.0) - } - } - - impl LocalProposer for DummyProposer { - fn import_misbehavior(&self, _misbehavior: Vec<(AuthorityId, Misbehavior)>) {} - - fn round_proposer(&self, round_number: u32, authorities: &[AuthorityId]) -> AuthorityId { - authorities[(round_number as usize) % authorities.len()].clone() - } - } - - fn make_service(client: FakeClient) - -> BftService - { - BftService { - client: Arc::new(client), - live_agreement: Mutex::new(None), - round_cache: Arc::new(Mutex::new(RoundCache { - hash: None, - start_round: 0, - })), - round_timeout_multiplier: 10, - key: Arc::new(Ed25519Keyring::One.into()), - factory: DummyFactory - } - } - - fn sign_vote(vote: rhododendron::Vote, key: &ed25519::Pair, parent_hash: H256) -> LocalizedSignature { - match sign_message::(vote.into(), key, parent_hash) { - rhododendron::LocalizedMessage::Vote(vote) => vote.signature, - _ => panic!("signing vote leads to signed vote"), - } - } - - fn from_block_number(num: u64) -> TestHeader { - TestHeader::new( - num, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - ) - } - - #[test] - fn future_gets_preempted() { - let client = FakeClient { - authorities: vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ], - imported_heights: Mutex::new(HashSet::new()), - }; - - let service = make_service(client); - - let first = from_block_number(2); - let first_hash = first.hash(); - - let mut second = from_block_number(3); - second.parent_hash = first_hash; - let _second_hash = second.hash(); - - let mut first_bft = service.build_upon(&first, Comms(PhantomData), Comms(PhantomData)).unwrap().unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == first); - - let _second_bft = service.build_upon(&second, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 != first); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == second); - - // first_bft has been cancelled. need to swap out so we can check it. - let (_tx, mut rx) = oneshot::channel(); - ::std::mem::swap(&mut rx, &mut first_bft.cancel); - - assert!(rx.wait().is_ok()); - } - - #[test] - fn max_faulty() { - assert_eq!(max_faulty_of(3), 0); - assert_eq!(max_faulty_of(4), 1); - assert_eq!(max_faulty_of(100), 33); - assert_eq!(max_faulty_of(0), 0); - assert_eq!(max_faulty_of(11), 3); - assert_eq!(max_faulty_of(99), 32); - } - - #[test] - fn justification_check_works() { - let parent_hash = Default::default(); - let hash = [0xff; 32].into(); - - let authorities = vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ]; - - let authorities_keys = vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ]; - - let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { - digest: hash, - round_number: 1, - signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) - }).collect(), - }); - - assert!(check_justification::(&authorities, parent_hash, unchecked).is_ok()); - - let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { - digest: hash, - round_number: 0, // wrong round number (vs. the signatures) - signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) - }).collect(), - }); - - assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); - - // not enough signatures. - let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { - digest: hash, - round_number: 1, - signatures: authorities_keys.iter().take(2).map(|key| { - sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) - }).collect(), - }); - - assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); - - // wrong hash. - let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { - digest: [0xfe; 32].into(), - round_number: 1, - signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) - }).collect(), - }); - - assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); - } - - #[test] - fn propose_check_works() { - let parent_hash = Default::default(); - - let authorities = vec![ - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ]; - - let block = TestBlock { - header: from_block_number(1), - extrinsics: Default::default() - }; - - let proposal = sign_message( - rhododendron::Message::Propose(1, block.clone()), - &Ed25519Keyring::Alice.pair(), - parent_hash, - ); - if let rhododendron::LocalizedMessage::Propose(proposal) = proposal { - assert!(check_proposal(&authorities, &parent_hash, &proposal).is_ok()); - let mut invalid_round = proposal.clone(); - invalid_round.round_number = 0; - assert!(check_proposal(&authorities, &parent_hash, &invalid_round).is_err()); - let mut invalid_digest = proposal.clone(); - invalid_digest.digest = [0xfe; 32].into(); - assert!(check_proposal(&authorities, &parent_hash, &invalid_digest).is_err()); - } else { - assert!(false); - } - - // Not an authority - let proposal = sign_message::( - rhododendron::Message::Propose(1, block), - &Ed25519Keyring::Bob.pair(), - parent_hash, - ); - if let rhododendron::LocalizedMessage::Propose(proposal) = proposal { - assert!(check_proposal(&authorities, &parent_hash, &proposal).is_err()); - } else { - assert!(false); - } - } - - #[test] - fn vote_check_works() { - let parent_hash: H256 = Default::default(); - let hash: H256 = [0xff; 32].into(); - - let authorities = vec![ - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ]; - - let vote = sign_message::(rhododendron::Message::Vote(rhododendron::Vote::Prepare(1, hash)), &Keyring::Alice.pair(), parent_hash);; - if let rhododendron::LocalizedMessage::Vote(vote) = vote { - assert!(check_vote::(&authorities, &parent_hash, &vote).is_ok()); - let mut invalid_sender = vote.clone(); - invalid_sender.signature.signer = Keyring::Eve.into(); - assert!(check_vote::(&authorities, &parent_hash, &invalid_sender).is_err()); - } else { - assert!(false); - } - - // Not an authority - let vote = sign_message::(rhododendron::Message::Vote(rhododendron::Vote::Prepare(1, hash)), &Keyring::Bob.pair(), parent_hash);; - if let rhododendron::LocalizedMessage::Vote(vote) = vote { - assert!(check_vote::(&authorities, &parent_hash, &vote).is_err()); - } else { - assert!(false); - } - } - - #[test] - fn drop_bft_future_does_not_deadlock() { - let client = FakeClient { - authorities: vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ], - imported_heights: Mutex::new(HashSet::new()), - }; - - let service = make_service(client); - - let first = from_block_number(2); - let first_hash = first.hash(); - - let mut second = from_block_number(3); - second.parent_hash = first_hash; - - let _ = service.build_upon(&first, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == first); - service.live_agreement.lock().take(); - } - - #[test] - fn bft_can_build_though_skipped() { - let client = FakeClient { - authorities: vec![ - Ed25519Keyring::One.into(), - Ed25519Keyring::Two.into(), - Ed25519Keyring::Alice.into(), - Ed25519Keyring::Eve.into(), - ], - imported_heights: Mutex::new(HashSet::new()), - }; - - let service = make_service(client); - - let first = from_block_number(2); - let first_hash = first.hash(); - - let mut second = from_block_number(3); - second.parent_hash = first_hash; - - let mut third = from_block_number(4); - third.parent_hash = second.hash(); - - let _ = service.build_upon(&first, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == first); - // BFT has not seen second, but will move forward on third - service.build_upon(&third, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == third); - - // but we are not walking backwards - service.build_upon(&second, Comms(PhantomData), Comms(PhantomData)).unwrap(); - assert!(service.live_agreement.lock().as_ref().unwrap().0 == third); - } -} diff --git a/core/consensus/rhd/src/misbehaviour_check.rs b/core/consensus/rhd/src/misbehaviour_check.rs deleted file mode 100644 index a475f5d1ef597..0000000000000 --- a/core/consensus/rhd/src/misbehaviour_check.rs +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2017-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Utility for substrate-based runtimes that want to check misbehavior reports. - -use codec::{Codec, Encode}; -use primitives::{AuthorityId, Signature}; - -use rhododendron::messages::{Action, Message, MisbehaviorKind}; -use runtime_io; - -// check a message signature. returns true if signed by that authority. -fn check_message_sig( - message: Message, - signature: &Signature, - from: &AuthorityId -) -> bool { - let msg: Vec = message.encode(); - runtime_io::ed25519_verify(&signature.0, &msg, from) -} - -fn prepare(parent: H, round_number: u32, hash: H) -> Message { - Message { - parent, - action: Action::Prepare(round_number, hash), - } -} - -fn commit(parent: H, round_number: u32, hash: H) -> Message { - Message { - parent, - action: Action::Commit(round_number, hash), - } -} - -/// Evaluate misbehavior. -/// -/// Doesn't check that the header hash in question is -/// valid or whether the misbehaving authority was part of -/// the set at that block. -pub fn evaluate_misbehavior( - misbehaved: &AuthorityId, - parent_hash: H, - kind: &MisbehaviorKind, -) -> bool { - match *kind { - MisbehaviorKind::BftDoublePrepare(round, (h_1, ref s_1), (h_2, ref s_2)) => { - s_1 != s_2 && - check_message_sig::(prepare::(parent_hash, round, h_1), s_1, misbehaved) && - check_message_sig::(prepare::(parent_hash, round, h_2), s_2, misbehaved) - } - MisbehaviorKind::BftDoubleCommit(round, (h_1, ref s_1), (h_2, ref s_2)) => { - s_1 != s_2 && - check_message_sig::(commit::(parent_hash, round, h_1), s_1, misbehaved) && - check_message_sig::(commit::(parent_hash, round, h_2), s_2, misbehaved) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use keyring::Ed25519Keyring; - use rhododendron; - - use runtime_primitives::testing::{H256, Block as RawBlock}; - - type Block = RawBlock; - - fn sign_prepare(key: &ed25519::Pair, round: u32, hash: H256, parent_hash: H256) -> (H256, Signature) { - let msg = ::sign_message::( - rhododendron::Message::Vote(rhododendron::Vote::Prepare(round as _, hash)), - key, - parent_hash - ); - - match msg { - rhododendron::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature), - _ => panic!("signing vote leads to signed vote"), - } - } - - fn sign_commit(key: &ed25519::Pair, round: u32, hash: H256, parent_hash: H256) -> (H256, Signature) { - let msg = ::sign_message::( - rhododendron::Message::Vote(rhododendron::Vote::Commit(round as _, hash)), - key, - parent_hash - ); - - match msg { - rhododendron::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature), - _ => panic!("signing vote leads to signed vote"), - } - } - - #[test] - fn evaluates_double_prepare() { - let key = Ed25519Keyring::One.pair(); - let parent_hash = [0xff; 32].into(); - let hash_1 = [0; 32].into(); - let hash_2 = [1; 32].into(); - - assert!(evaluate_misbehavior::( - &key.public().into(), - parent_hash, - &MisbehaviorKind::BftDoublePrepare( - 1, - sign_prepare(&key, 1, hash_1, parent_hash), - sign_prepare(&key, 1, hash_2, parent_hash) - ) - )); - - // same signature twice is not misbehavior. - let signed = sign_prepare(&key, 1, hash_1, parent_hash); - assert!(!evaluate_misbehavior::( - &key.public().into(), - parent_hash, - &MisbehaviorKind::BftDoublePrepare( - 1, - signed, - signed, - ) - )); - - // misbehavior has wrong target. - assert!(!evaluate_misbehavior::( - &Ed25519Keyring::Two.into(), - parent_hash, - &MisbehaviorKind::BftDoublePrepare( - 1, - sign_prepare(&key, 1, hash_1, parent_hash), - sign_prepare(&key, 1, hash_2, parent_hash), - ) - )); - } - - #[test] - fn evaluates_double_commit() { - let key = Ed25519Keyring::One.pair(); - let parent_hash = [0xff; 32].into(); - let hash_1 = [0; 32].into(); - let hash_2 = [1; 32].into(); - - assert!(evaluate_misbehavior::( - &key.public().into(), - parent_hash, - &MisbehaviorKind::BftDoubleCommit( - 1, - sign_commit(&key, 1, hash_1, parent_hash), - sign_commit(&key, 1, hash_2, parent_hash) - ) - )); - - // same signature twice is not misbehavior. - let signed = sign_commit(&key, 1, hash_1, parent_hash); - assert!(!evaluate_misbehavior::( - &key.public().into(), - parent_hash, - &MisbehaviorKind::BftDoubleCommit( - 1, - signed, - signed, - ) - )); - - // misbehavior has wrong target. - assert!(!evaluate_misbehavior::( - &Ed25519Keyring::Two.into(), - parent_hash, - &MisbehaviorKind::BftDoubleCommit( - 1, - sign_commit(&key, 1, hash_1, parent_hash), - sign_commit(&key, 1, hash_2, parent_hash), - ) - )); - } -} diff --git a/core/consensus/rhd/src/service.rs b/core/consensus/rhd/src/service.rs deleted file mode 100644 index f59393c530356..0000000000000 --- a/core/consensus/rhd/src/service.rs +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Consensus service. - -/// Consensus service. A long running service that manages BFT agreement -/// the network. -use std::thread; -use std::time::{Duration, Instant}; -use std::sync::Arc; - -use client::{BlockchainEvents, BlockBody}; -use futures::prelude::*; -use transaction_pool::txpool::{Pool as TransactionPool, ChainApi as PoolChainApi}; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, BlockNumberToHash}; - -use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle; -use tokio::runtime::TaskExecutor as ThreadPoolHandle; -use tokio::runtime::current_thread::Runtime as LocalRuntime; -use tokio::timer::Interval; - -use parking_lot::RwLock; -use consensus::{self, offline_tracker::OfflineTracker}; - -use super::{Network, ProposerFactory, AuthoringApi}; -use {consensus, primitives, ed25519, error, BftService, LocalProposer}; - -const TIMER_DELAY_MS: u64 = 5000; -const TIMER_INTERVAL_MS: u64 = 500; - -// spin up an instance of BFT agreement on the current thread's executor. -// panics if there is no current thread executor. -fn start_bft( - header: ::Header, - bft_service: Arc>, -) where - F: consensus::Environment + 'static, - C: consensus::BlockImport + consensus::Authorities + 'static, - F::Error: ::std::fmt::Debug, - >::Error: ::std::fmt::Display + Into, - >::Proposer : LocalProposer, - >::Error: ::std::fmt::Display, - Block: BlockT, -{ - let mut handle = LocalThreadHandle::current(); - match bft_service.build_upon(&header) { - Ok(Some(bft_work)) => if let Err(e) = handle.spawn_local(Box::new(bft_work)) { - warn!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e); - } - Ok(None) => trace!(target: "bft", "Could not start agreement on top of {}", header.hash()), - Err(e) => warn!(target: "bft", "BFT agreement error: {}", e), - } -} - -/// Consensus service. Starts working when created. -pub struct Service { - thread: Option>, - exit_signal: Option<::exit_future::Signal>, -} - -impl Service { - /// Create and start a new instance. - pub fn new( - client: Arc, - api: Arc, - network: N, - transaction_pool: Arc>, - thread_pool: ThreadPoolHandle, - key: ed25519::Pair, - block_delay: u64, - ) -> Service - where - error::Error: From<::Error>, - A: AuthoringApi + BlockNumberToHash + 'static, - P: PoolChainApi::Block> + 'static, - C: BlockchainEvents<::Block> - + BlockBody<::Block> - + consensus::SelectChain<::Block> - + consensus::BlockImport<::Block> - + consensus::Authorities<::Block> + Send + Sync + 'static, - primitives::H256: From<<::Block as BlockT>::Hash>, - <::Block as BlockT>::Hash: PartialEq + PartialEq, - N: Network::Block> + Send + 'static, - { - - let (signal, exit) = ::exit_future::signal(); - let thread = thread::spawn(move || { - let mut runtime = LocalRuntime::new().expect("Could not create local runtime"); - let key = Arc::new(key); - - let factory = ProposerFactory { - client: api.clone(), - transaction_pool: transaction_pool.clone(), - network, - handle: thread_pool.clone(), - offline: Arc::new(RwLock::new(OfflineTracker::new())), - force_delay: block_delay, - }; - let bft_service = Arc::new(BftService::new(client.clone(), key, factory)); - - let notifications = { - let client = client.clone(); - let bft_service = bft_service.clone(); - - client.import_notification_stream().for_each(move |notification| { - if notification.is_new_best { - start_bft(notification.header, bft_service.clone()); - } - Ok(()) - }) - }; - - let interval = Interval::new( - Instant::now() + Duration::from_millis(TIMER_DELAY_MS), - Duration::from_millis(TIMER_INTERVAL_MS), - ); - - let mut prev_best = match client.best_block_header() { - Ok(header) => header.hash(), - Err(e) => { - warn!("Cant's start consensus service. Error reading best block header: {:?}", e); - return; - } - }; - - let timed = { - let c = client.clone(); - let s = bft_service.clone(); - - interval.map_err(|e| debug!(target: "bft", "Timer error: {:?}", e)).for_each(move |_| { - if let Ok(best_block) = c.best_block_header() { - let hash = best_block.hash(); - - if hash == prev_best { - debug!(target: "bft", "Starting consensus round after a timeout"); - start_bft(best_block, s.clone()); - } - prev_best = hash; - } - Ok(()) - }) - }; - - runtime.spawn(notifications); - runtime.spawn(timed); - - if let Err(e) = runtime.block_on(exit) { - debug!("BFT event loop error {:?}", e); - } - }); - Service { - thread: Some(thread), - exit_signal: Some(signal), - } - } -} - -impl Drop for Service { - fn drop(&mut self) { - if let Some(signal) = self.exit_signal.take() { - signal.fire(); - } - - if let Some(thread) = self.thread.take() { - thread.join().expect("The service thread has panicked"); - } - } -} diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index aeeb8e2061fdf..616b6ec722a48 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -278,64 +278,6 @@ mod tests { use service_test::SyncService; use crate::service::Factory; - #[cfg(feature = "rhd")] - fn test_sync() { - use primitives::ed25519::Pair; - - use {service_test, Factory}; - use client::{BlockImportParams, BlockOrigin}; - - let alice: Arc = Arc::new(Keyring::Alice.into()); - let bob: Arc = Arc::new(Keyring::Bob.into()); - let validators = vec![alice.public().0.into(), bob.public().0.into()]; - let keys: Vec<&ed25519::Pair> = vec![&*alice, &*bob]; - let dummy_runtime = ::tokio::runtime::Runtime::new().unwrap(); - let block_factory = |service: &::FullService| { - let block_id = BlockId::number(service.client().info().chain.best_number); - let parent_header = service.client().header(&block_id).unwrap().unwrap(); - let consensus_net = ConsensusNetwork::new(service.network(), service.client().clone()); - let proposer_factory = consensus::ProposerFactory { - client: service.client().clone(), - transaction_pool: service.transaction_pool().clone(), - network: consensus_net, - force_delay: 0, - handle: dummy_runtime.executor(), - }; - let (proposer, _, _) = proposer_factory.init(&parent_header, &validators, alice.clone()).unwrap(); - let block = proposer.propose().expect("Error making test block"); - BlockImportParams { - origin: BlockOrigin::File, - justification: Vec::new(), - internal_justification: Vec::new(), - finalized: true, - body: Some(block.extrinsics), - header: block.header, - auxiliary: Vec::new(), - } - }; - let extrinsic_factory = |service: &SyncService<::FullService>| { - let payload = ( - 0, - Call::Balances(BalancesCall::transfer(RawAddress::Id(bob.public().0.into()), 69.into())), - Era::immortal(), - service.client().genesis_hash() - ); - let signature = alice.sign(&payload.encode()).into(); - let id = alice.public().0.into(); - let xt = UncheckedExtrinsic { - signature: Some((RawAddress::Id(id), signature, payload.0, Era::immortal())), - function: payload.1, - }.encode(); - let v: Vec = Decode::decode(&mut xt.as_slice()).unwrap(); - OpaqueExtrinsic(v) - }; - service_test::sync::( - chain_spec::integration_test_config(), - block_factory, - extrinsic_factory, - ); - } - #[test] #[ignore] fn test_sync() {