diff --git a/Cargo.lock b/Cargo.lock index b242c7337d920..335932c5956ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,6 +876,11 @@ dependencies = [ "static_assertions 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fixedbitset" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "flate2" version = "1.0.9" @@ -2236,6 +2241,11 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "multimap" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "multistream-select" version = "0.5.1" @@ -2324,6 +2334,7 @@ dependencies = [ "srml-system 2.0.0", "srml-timestamp 2.0.0", "structopt 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-authority-discovery 2.0.0", "substrate-basic-authorship 2.0.0", "substrate-cli 2.0.0", "substrate-client 2.0.0", @@ -2934,6 +2945,14 @@ name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "petgraph" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "pin-utils" version = "0.1.0-alpha.4" @@ -3005,6 +3024,54 @@ dependencies = [ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "prost" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-build" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "protobuf" version = "2.8.0" @@ -4440,6 +4507,32 @@ dependencies = [ "substrate-test-runtime-client 2.0.0", ] +[[package]] +name = "substrate-authority-discovery" +version = "2.0.0" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-scale-codec 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 2.0.0", + "substrate-authority-discovery-primitives 2.0.0", + "substrate-client 2.0.0", + "substrate-keystore 2.0.0", + "substrate-network 2.0.0", + "substrate-peerset 2.0.0", + "substrate-primitives 2.0.0", + "substrate-test-runtime-client 2.0.0", + "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "substrate-authority-discovery-primitives" version = "2.0.0" @@ -5134,6 +5227,8 @@ dependencies = [ "sr-io 2.0.0", "sr-primitives 2.0.0", "substrate-application-crypto 2.0.0", + "substrate-authority-discovery 2.0.0", + "substrate-authority-discovery-primitives 2.0.0", "substrate-client 2.0.0", "substrate-client-db 2.0.0", "substrate-consensus-babe-primitives 2.0.0", @@ -6433,6 +6528,7 @@ dependencies = [ "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum finality-grandpa 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9681c1f75941ea47584573dd2bc10558b2067d460612945887e00744e43393be" "checksum fixed-hash 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "516877b7b9a1cc2d0293cbce23cd6203f0edbfd4090e6ca4489fecb5aa73050e" +"checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" "checksum flate2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "550934ad4808d5d39365e5d61727309bf18b3b02c6c56b729cb92e7dd84bc3d8" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" @@ -6561,6 +6657,7 @@ dependencies = [ "checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40" "checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" "checksum multistream-select 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e8f3cb4c93f2d79811fc11fa01faab99d8b7b8cbe024b602c27434ff2b08a59d" "checksum names 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef320dab323286b50fb5cdda23f61c796a72a89998ab565ca32525c5c556f2da" "checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e" @@ -6607,6 +6704,7 @@ dependencies = [ "checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +"checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c1d2cfa5a714db3b5f24f0915e74fcdf91d09d496ba61329705dda7774d2af" "checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b" @@ -6616,6 +6714,10 @@ dependencies = [ "checksum proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)" = "982a35d1194084ba319d65c4a68d24ca28f5fdb5b8bc20899e4eef8641ea5178" "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" "checksum proc-macro2 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "175a40b9cf564ce9bf050654633dbf339978706b8ead1a907bb970b63185dd95" +"checksum prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" +"checksum prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" +"checksum prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5e7dc378b94ac374644181a2247cebf59a6ec1c88b49ac77f3a94b86b79d0e11" +"checksum prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f" "checksum protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8aefcec9f142b524d98fc81d07827743be89dd6586a1ba6ab21fa66a500b3fa5" "checksum pwasm-utils 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "efb0dcbddbb600f47a7098d33762a00552c671992171637f5bb310b37fe1f0e4" "checksum quick-error 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb6ccf8db7bbcb9c2eae558db5ab4f3da1c2a87e4e597ed394726bc8ea6ca1d" diff --git a/Cargo.toml b/Cargo.toml index aaa8c372fb182..29f5307610794 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ members = [ "core/utils/fork-tree", "core/utils/wasm-builder", "core/utils/wasm-builder-runner", + "core/authority-discovery", "srml/support", "srml/support/procedural", "srml/support/procedural/tools", diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml new file mode 100644 index 0000000000000..ac7f8ac3685ee --- /dev/null +++ b/core/authority-discovery/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "substrate-authority-discovery" +version = "2.0.0" +authors = ["Parity Technologies "] +edition = "2018" +build = "build.rs" + +[build-dependencies] +prost-build = "0.5" + +[dependencies] +authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "./primitives", default-features = false } +bytes = "0.4" +client = { package = "substrate-client", path = "../../core/client" } +codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" } +derive_more = "0.14.0" +futures = "0.1" +keystore = { package = "substrate-keystore", path = "../../core/keystore" } +libp2p = { version = "0.12.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } +log = "0.4" +network = { package = "substrate-network", path = "../../core/network" } +primitives = { package = "substrate-primitives", path = "../primitives" } +prost = "0.5" +serde_json = "1.0" +sr-primitives = { path = "../../core/sr-primitives" } +tokio-timer = "0.2" + +[dev-dependencies] +parking_lot = { version = "0.9.0" } +peerset = { package = "substrate-peerset", path = "../../core/peerset" } +test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } +tokio = { version = "0.1"} diff --git a/core/authority-discovery/build.rs b/core/authority-discovery/build.rs new file mode 100644 index 0000000000000..ed632575f3ba8 --- /dev/null +++ b/core/authority-discovery/build.rs @@ -0,0 +1,3 @@ +fn main() { + prost_build::compile_protos(&["src/schema/dht.proto"], &["src/schema"]).unwrap(); +} diff --git a/core/authority-discovery/primitives/src/lib.rs b/core/authority-discovery/primitives/src/lib.rs index 556b758aa61fc..13da4de020466 100644 --- a/core/authority-discovery/primitives/src/lib.rs +++ b/core/authority-discovery/primitives/src/lib.rs @@ -19,9 +19,15 @@ #![cfg_attr(not(feature = "std"), no_std)] use client::decl_runtime_apis; -use codec::Codec; use rstd::vec::Vec; +#[derive(codec::Encode, codec::Decode, Eq, PartialEq, Clone)] +#[cfg_attr(feature = "std", derive(Debug, Hash))] +pub struct Signature(pub Vec); +#[derive(codec::Encode, codec::Decode, Eq, PartialEq, Clone)] +#[cfg_attr(feature = "std", derive(Debug, Hash))] +pub struct AuthorityId(pub Vec); + decl_runtime_apis! { /// The authority discovery api. /// @@ -29,21 +35,15 @@ decl_runtime_apis! { /// own authority identifier, to retrieve identifiers of the current authority /// set, as well as sign and verify Kademlia Dht external address payloads /// from and to other authorities. - pub trait AuthorityDiscoveryApi { - /// Returns own authority identifier iff it is part of the current authority - /// set, otherwise this function returns None. The restriction might be - /// softened in the future in case a consumer needs to learn own authority - /// identifier. - fn authority_id() -> Option; - + pub trait AuthorityDiscoveryApi { /// Retrieve authority identifiers of the current authority set. fn authorities() -> Vec; /// Sign the given payload with the private key corresponding to the given authority id. - fn sign(payload: Vec, authority_id: AuthorityId) -> Option>; + fn sign(payload: &Vec) -> Option<(Signature, AuthorityId)>; /// Verify the given signature for the given payload with the given /// authority identifier. - fn verify(payload: Vec, signature: Vec, authority_id: AuthorityId) -> bool; + fn verify(payload: &Vec, signature: &Signature, authority_id: &AuthorityId) -> bool; } } diff --git a/core/authority-discovery/src/error.rs b/core/authority-discovery/src/error.rs new file mode 100644 index 0000000000000..e8c1ad9705f0c --- /dev/null +++ b/core/authority-discovery/src/error.rs @@ -0,0 +1,47 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Authority discovery errors. + +/// AuthorityDiscovery Result. +pub type Result = std::result::Result; + +/// Error type for the authority discovery module. +#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum Error { + /// Failed to verify a dht payload with the given signature. + VerifyingDhtPayload, + /// Failed to hash the authority id to be used as a dht key. + HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError), + /// Failed calling into the Substrate runtime. + CallingRuntime(client::error::Error), + /// Failed signing the dht payload via the Substrate runtime. + SigningDhtPayload, + /// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it + /// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This + /// error is the result of the above failing. + MatchingHashedAuthorityIdWithAuthorityId, + /// Failed to set the authority discovery peerset priority group in the peerset module. + SettingPeersetPriorityGroup(String), + /// Failed to encode a dht payload. + Encoding(prost::EncodeError), + /// Failed to decode a dht payload. + Decoding(prost::DecodeError), + /// Failed to parse a libp2p multi address. + ParsingMultiaddress(libp2p::core::multiaddr::Error), + /// Tokio timer error. + PollingTokioTimer(tokio_timer::Error) +} diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs new file mode 100644 index 0000000000000..987169ead90b1 --- /dev/null +++ b/core/authority-discovery/src/lib.rs @@ -0,0 +1,698 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +#![warn(missing_docs)] + +//! Substrate authority discovery. +//! +//! This crate enables Substrate authorities to directly connect to other authorities. [`AuthorityDiscovery`] implements +//! the Future trait. By polling [`AuthorityDiscovery`] an authority: +//! +//! +//! 1. **Makes itself discoverable** +//! +//! 1. Retrieves its external addresses. +//! +//! 2. Adds its network peer id to the addresses. +//! +//! 3. Signs the above. +//! +//! 4. Puts the signature and the addresses on the libp2p Kademlia DHT. +//! +//! +//! 2. **Discovers other authorities** +//! +//! 1. Retrieves the current set of authorities. +//! +//! 2. Starts DHT queries for the ids of the authorities. +//! +//! 3. Validates the signatures of the retrieved key value pairs. +//! +//! 4. Adds the retrieved external addresses as priority nodes to the peerset. + +use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature}; +use client::blockchain::HeaderBackend; +use error::{Error, Result}; +use futures::{prelude::*, sync::mpsc::Receiver}; +use log::{debug, error, log_enabled, warn}; +use network::specialization::NetworkSpecialization; +use network::{DhtEvent, ExHashT}; +use prost::Message; +use sr_primitives::generic::BlockId; +use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi}; +use std::collections::{HashMap, HashSet}; +use std::convert::TryInto; +use std::iter::FromIterator; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +mod error; +/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs. +mod schema { + include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); +} + +/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities. +pub struct AuthorityDiscovery +where + Block: BlockT + 'static, + Network: NetworkProvider, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + client: Arc, + + network: Arc, + /// Channel we receive Dht events on. + dht_event_rx: Receiver, + + /// Interval to be proactive, publishing own addresses. + publish_interval: tokio_timer::Interval, + /// Interval on which to query for addresses of other authorities. + query_interval: tokio_timer::Interval, + + /// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the + /// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the + /// addresses and always overwrite the entire peerset priority group. To ensure this map doesn't grow indefinitely + /// `purge_old_authorities_from_cache` function is called each time we add a new entry. + address_cache: HashMap>, + + phantom: PhantomData, +} + +impl AuthorityDiscovery +where + Block: BlockT + 'static, + Network: NetworkProvider, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + /// Return a new authority discovery. + pub fn new( + client: Arc, + network: Arc, + dht_event_rx: futures::sync::mpsc::Receiver, + ) -> AuthorityDiscovery { + // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node + // could restart at any point in time, one can not depend on the republishing process, thus publishing own + // external addresses should happen on an interval < 36h. + let publish_interval = + tokio_timer::Interval::new(Instant::now(), Duration::from_secs(12 * 60 * 60)); + + // External addresses of other authorities can change at any given point in time. The interval on which to query + // for external addresses of other authorities is a trade off between efficiency and performance. + let query_interval = + tokio_timer::Interval::new(Instant::now(), Duration::from_secs(10 * 60)); + + let address_cache = HashMap::new(); + + AuthorityDiscovery { + client, + network, + dht_event_rx, + publish_interval, + query_interval, + address_cache, + phantom: PhantomData, + } + } + + fn publish_own_ext_addresses(&mut self) -> Result<()> { + let id = BlockId::hash(self.client.info().best_hash); + + let addresses = self + .network + .external_addresses() + .into_iter() + .map(|a| { + a.with(libp2p::core::multiaddr::Protocol::P2p( + self.network.local_peer_id().into(), + )) + }) + .map(|a| a.to_vec()) + .collect(); + + let mut serialized_addresses = vec![]; + schema::AuthorityAddresses { addresses } + .encode(&mut serialized_addresses) + .map_err(Error::Encoding)?; + + let (signature, authority_id) = self + .client + .runtime_api() + .sign(&id, &serialized_addresses) + .map_err(Error::CallingRuntime)? + .ok_or(Error::SigningDhtPayload)?; + + let mut signed_addresses = vec![]; + schema::SignedAuthorityAddresses { + addresses: serialized_addresses, + signature: signature.0, + } + .encode(&mut signed_addresses) + .map_err(Error::Encoding)?; + + self.network.put_value( + hash_authority_id(authority_id.0.as_ref())?, + signed_addresses, + ); + + Ok(()) + } + + fn request_addresses_of_others(&mut self) -> Result<()> { + let id = BlockId::hash(self.client.info().best_hash); + + let authorities = self + .client + .runtime_api() + .authorities(&id) + .map_err(Error::CallingRuntime)?; + + for authority_id in authorities.iter() { + self.network + .get_value(&hash_authority_id(authority_id.0.as_ref())?); + } + + Ok(()) + } + + fn handle_dht_events(&mut self) -> Result<()> { + while let Ok(Async::Ready(Some(event))) = self.dht_event_rx.poll() { + match event { + DhtEvent::ValueFound(v) => { + if log_enabled!(log::Level::Debug) { + let hashes = v.iter().map(|(hash, _value)| hash.clone()); + debug!(target: "sub-authority-discovery", "Value for hash '{:?}' found on Dht.", hashes); + } + + self.handle_dht_value_found_event(v)?; + } + DhtEvent::ValueNotFound(hash) => { + warn!(target: "sub-authority-discovery", "Value for hash '{:?}' not found on Dht.", hash) + } + DhtEvent::ValuePut(hash) => { + debug!(target: "sub-authority-discovery", "Successfully put hash '{:?}' on Dht.", hash) + } + DhtEvent::ValuePutFailed(hash) => { + warn!(target: "sub-authority-discovery", "Failed to put hash '{:?}' on Dht.", hash) + } + } + } + + Ok(()) + } + + fn handle_dht_value_found_event( + &mut self, + values: Vec<(libp2p::kad::record::Key, Vec)>, + ) -> Result<()> { + debug!(target: "sub-authority-discovery", "Got Dht value from network."); + + let id = BlockId::hash(self.client.info().best_hash); + + // From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure + // it is actually an authority, we match the hash against the hash of the authority id of all other authorities. + let authorities = self.client.runtime_api().authorities(&id)?; + self.purge_old_authorities_from_cache(&authorities); + + let authorities = authorities + .into_iter() + .map(|a| hash_authority_id(a.0.as_ref()).map(|h| (h, a))) + .collect::>>()?; + + for (key, value) in values.iter() { + // Check if the event origins from an authority in the current authority set. + let authority_id: &AuthorityId = authorities + .get(key) + .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; + + let schema::SignedAuthorityAddresses { + signature, + addresses, + } = schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?; + let signature = Signature(signature); + + let is_verified = self + .client + .runtime_api() + .verify(&id, &addresses, &signature, &authority_id.clone()) + .map_err(Error::CallingRuntime)?; + + if !is_verified { + return Err(Error::VerifyingDhtPayload); + } + + let addresses: Vec = schema::AuthorityAddresses::decode(addresses) + .map(|a| a.addresses) + .map_err(Error::Decoding)? + .into_iter() + .map(|a| a.try_into()) + .collect::>() + .map_err(Error::ParsingMultiaddress)?; + + self.address_cache.insert(authority_id.clone(), addresses); + } + + // Let's update the peerset priority group with the all the addresses we have in our cache. + + let addresses = HashSet::from_iter( + self.address_cache + .iter() + .map(|(_peer_id, addresses)| addresses.clone()) + .flatten(), + ); + + debug!(target: "sub-authority-discovery", "Applying priority group {:#?} to peerset.", addresses); + self.network + .set_priority_group("authorities".to_string(), addresses) + .map_err(Error::SettingPeersetPriorityGroup)?; + + Ok(()) + } + + fn purge_old_authorities_from_cache(&mut self, current_authorities: &Vec) { + self.address_cache + .retain(|peer_id, _addresses| current_authorities.contains(peer_id)) + } +} + +impl futures::Future for AuthorityDiscovery +where + Block: BlockT + 'static, + Network: NetworkProvider, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll { + let mut inner = || -> Result<()> { + // Process incoming events before triggering new ones. + self.handle_dht_events()?; + + if let Async::Ready(_) = self + .publish_interval + .poll() + .map_err(Error::PollingTokioTimer)? + { + // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the + // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the + // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval + // tick. + while let Async::Ready(_) = self + .publish_interval + .poll() + .map_err(Error::PollingTokioTimer)? + {} + + self.publish_own_ext_addresses()?; + } + + if let Async::Ready(_) = self + .query_interval + .poll() + .map_err(Error::PollingTokioTimer)? + { + // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the + // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the + // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval + // tick. + while let Async::Ready(_) = self + .query_interval + .poll() + .map_err(Error::PollingTokioTimer)? + {} + + self.request_addresses_of_others()?; + } + + Ok(()) + }; + + match inner() { + Ok(()) => {} + Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), + }; + + // Make sure to always return NotReady as this is a long running task with the same lifetime as the node itself. + Ok(futures::Async::NotReady) + } +} + +/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying Substrate networking. Using +/// this trait abstraction instead of NetworkService directly is necessary to unit test AuthorityDiscovery. +pub trait NetworkProvider { + /// Returns the local external addresses. + fn external_addresses(&self) -> Vec; + + /// Returns the network identity of the node. + fn local_peer_id(&self) -> libp2p::PeerId; + + /// Modify a peerset priority group. + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String>; + + /// Start putting a value in the Dht. + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec); + + /// Start getting a value from the Dht. + fn get_value(&self, key: &libp2p::kad::record::Key); +} + +impl NetworkProvider for network::NetworkService +where + B: BlockT + 'static, + S: NetworkSpecialization, + H: ExHashT, +{ + fn external_addresses(&self) -> Vec { + self.external_addresses() + } + fn local_peer_id(&self) -> libp2p::PeerId { + self.local_peer_id() + } + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String> { + self.set_priority_group(group_id, peers) + } + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) { + self.put_value(key, value) + } + fn get_value(&self, key: &libp2p::kad::record::Key) { + self.get_value(key) + } +} + +fn hash_authority_id(id: &[u8]) -> Result { + libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, id) + .map(|k| libp2p::kad::record::Key::new(&k)) + .map_err(Error::HashingAuthorityId) +} + +#[cfg(test)] +mod tests { + use super::*; + use client::runtime_api::{ApiExt, Core, RuntimeVersion}; + use futures::future::poll_fn; + use primitives::{ExecutionContext, NativeOrEncoded}; + use sr_primitives::traits::Zero; + use sr_primitives::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi}; + use std::sync::{Arc, Mutex}; + use test_client::runtime::Block; + use tokio::runtime::current_thread; + + #[derive(Clone)] + struct TestApi {} + + impl ProvideRuntimeApi for TestApi { + type Api = RuntimeApi; + + fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> { + RuntimeApi {}.into() + } + } + + /// Blockchain database header backend. Does not perform any validation. + impl HeaderBackend for TestApi { + fn header( + &self, + _id: BlockId, + ) -> std::result::Result, client::error::Error> { + Ok(None) + } + + fn info(&self) -> client::blockchain::Info { + client::blockchain::Info { + best_hash: Default::default(), + best_number: Zero::zero(), + finalized_hash: Default::default(), + finalized_number: Zero::zero(), + genesis_hash: Default::default(), + } + } + + fn status( + &self, + _id: BlockId, + ) -> std::result::Result { + Ok(client::blockchain::BlockStatus::Unknown) + } + + fn number( + &self, + _hash: Block::Hash, + ) -> std::result::Result>, client::error::Error> { + Ok(None) + } + + fn hash( + &self, + _number: NumberFor, + ) -> std::result::Result, client::error::Error> { + Ok(None) + } + } + + struct RuntimeApi {} + + impl Core for RuntimeApi { + fn Core_version_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + + fn Core_execute_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<(Block)>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + + fn Core_initialize_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<&::Header>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + } + + impl ApiExt for RuntimeApi { + fn map_api_result std::result::Result, R, E>( + &self, + _: F, + ) -> std::result::Result { + unimplemented!("Not required for testing!") + } + + fn runtime_version_at( + &self, + _: &BlockId, + ) -> std::result::Result { + unimplemented!("Not required for testing!") + } + + fn record_proof(&mut self) { + unimplemented!("Not required for testing!") + } + + fn extract_proof(&mut self) -> Option>> { + unimplemented!("Not required for testing!") + } + } + + impl AuthorityDiscoveryApi for RuntimeApi { + fn AuthorityDiscoveryApi_authorities_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> std::result::Result>, client::error::Error> { + return Ok(NativeOrEncoded::Native(vec![ + AuthorityId("test-authority-id-1".as_bytes().to_vec()), + AuthorityId("test-authority-id-2".as_bytes().to_vec()), + ])); + } + fn AuthorityDiscoveryApi_sign_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<&std::vec::Vec>, + _: Vec, + ) -> std::result::Result< + NativeOrEncoded>, + client::error::Error, + > { + return Ok(NativeOrEncoded::Native(Some(( + Signature("test-signature-1".as_bytes().to_vec()), + AuthorityId("test-authority-id-1".as_bytes().to_vec()), + )))); + } + fn AuthorityDiscoveryApi_verify_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + args: Option<(&Vec, &Signature, &AuthorityId)>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + if *args.unwrap().1 == Signature("test-signature-1".as_bytes().to_vec()) { + return Ok(NativeOrEncoded::Native(true)); + } + return Ok(NativeOrEncoded::Native(false)); + } + } + + #[derive(Default)] + struct TestNetwork { + // Whenever functions on `TestNetwork` are called, the function arguments are added to the vectors below. + pub put_value_call: Arc)>>>, + pub get_value_call: Arc>>, + pub set_priority_group_call: Arc)>>>, + } + + impl NetworkProvider for TestNetwork { + fn external_addresses(&self) -> Vec { + vec![] + } + fn local_peer_id(&self) -> libp2p::PeerId { + libp2p::PeerId::random() + } + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String> { + self.set_priority_group_call + .lock() + .unwrap() + .push((group_id, peers)); + Ok(()) + } + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) { + self.put_value_call.lock().unwrap().push((key, value)); + } + fn get_value(&self, key: &libp2p::kad::record::Key) { + self.get_value_call.lock().unwrap().push(key.clone()); + } + } + + #[test] + fn publish_own_ext_addresses_puts_record_on_dht() { + let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + authority_discovery.publish_own_ext_addresses().unwrap(); + + // Expect authority discovery to put a new record onto the dht. + assert_eq!(network.put_value_call.lock().unwrap().len(), 1); + } + + #[test] + fn request_addresses_of_others_triggers_dht_get_query() { + let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + authority_discovery.request_addresses_of_others().unwrap(); + + // Expect authority discovery to request new records from the dht. + assert_eq!(network.get_value_call.lock().unwrap().len(), 2); + } + + #[test] + fn handle_dht_events_with_value_found_should_call_set_priority_group() { + // Create authority discovery. + + let (mut dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + // Create sample dht event. + + let authority_id_1 = hash_authority_id("test-authority-id-1".as_bytes()).unwrap(); + let address_1: libp2p::Multiaddr = "/ip6/2001:db8::".parse().unwrap(); + + let mut serialized_addresses = vec![]; + schema::AuthorityAddresses { + addresses: vec![address_1.to_vec()], + } + .encode(&mut serialized_addresses) + .unwrap(); + + let mut signed_addresses = vec![]; + schema::SignedAuthorityAddresses { + addresses: serialized_addresses, + signature: "test-signature-1".as_bytes().to_vec(), + } + .encode(&mut signed_addresses) + .unwrap(); + + let dht_event = network::DhtEvent::ValueFound(vec![(authority_id_1, signed_addresses)]); + dht_event_tx.try_send(dht_event).unwrap(); + + // Make authority discovery handle the event. + + let f = || { + authority_discovery.handle_dht_events().unwrap(); + + // Expect authority discovery to set the priority set. + assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1); + + assert_eq!( + network.set_priority_group_call.lock().unwrap()[0], + ( + "authorities".to_string(), + HashSet::from_iter(vec![address_1.clone()].into_iter()) + ) + ); + + Ok(Async::Ready(())) + }; + + let mut runtime = current_thread::Runtime::new().unwrap(); + runtime.block_on(poll_fn::<(), (), _>(f)).unwrap(); + } +} diff --git a/core/authority-discovery/src/schema/dht.proto b/core/authority-discovery/src/schema/dht.proto new file mode 100644 index 0000000000000..9dbe9d559f4b1 --- /dev/null +++ b/core/authority-discovery/src/schema/dht.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package authority_discovery; + +// First we need to serialize the addresses in order to be able to sign them. +message AuthorityAddresses { + repeated bytes addresses = 1; +} + +// Then we need to serialize addresses and signature to send them over the wire. +message SignedAuthorityAddresses { + bytes addresses = 1; + bytes signature = 2; +} diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index e797ffb208ec1..7e9fd51a41533 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -192,6 +192,7 @@ pub use service::{ NetworkStateInfo, }; pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization}; +pub use protocol::event::{Event, DhtEvent}; pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, PeerId}; #[doc(inline)] diff --git a/core/network/src/protocol/event.rs b/core/network/src/protocol/event.rs index c0c26da515f0b..c8bee5588c704 100644 --- a/core/network/src/protocol/event.rs +++ b/core/network/src/protocol/event.rs @@ -20,6 +20,7 @@ use libp2p::kad::record::Key; /// Events generated by DHT as a response to get_value and put_value requests. +#[derive(Debug, Clone)] pub enum DhtEvent { /// The value was found. ValueFound(Vec<(Key, Vec)>), @@ -35,6 +36,7 @@ pub enum DhtEvent { } /// Type for events generated by networking layer. +#[derive(Debug, Clone)] pub enum Event { /// Event generated by a DHT. Dht(DhtEvent), diff --git a/core/network/src/service.rs b/core/network/src/service.rs index c3f773e232e7a..ac6bd1ac05dd5 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -612,11 +612,11 @@ pub struct NetworkWorker, H: Ex light_client_rqs: Option>>, } -impl, H: ExHashT> Future for NetworkWorker { - type Item = (); +impl, H: ExHashT> Stream for NetworkWorker { + type Item = Event; type Error = io::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll, Self::Error> { // Poll the import queue for actions to perform. let _ = futures03::future::poll_fn(|cx| { self.import_queue.poll_actions(cx, &mut NetworkLink { @@ -636,7 +636,7 @@ impl, H: ExHashT> Future for Ne // Process the next message coming from the `NetworkService`. let msg = match self.from_worker.poll() { Ok(Async::Ready(Some(msg))) => msg, - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(None)), Ok(Async::NotReady) => break, }; @@ -677,8 +677,9 @@ impl, H: ExHashT> Future for Ne Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { self.network_service.user_protocol_mut() - .on_event(Event::Dht(ev)); - CustomMessageOutcome::None + .on_event(Event::Dht(ev.clone())); + + return Ok(Async::Ready(Some(Event::Dht(ev)))); }, Ok(Async::Ready(None)) => CustomMessageOutcome::None, Err(err) => { diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index 7afd59ebc0679..7f17d83931564 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -31,12 +31,14 @@ client = { package = "substrate-client", path = "../../core/client" } client_db = { package = "substrate-client-db", path = "../../core/client/db", features = ["kvdb-rocksdb"] } codec = { package = "parity-scale-codec", version = "1.0.0" } substrate-executor = { path = "../../core/executor" } +substrate-authority-discovery = { path = "../../core/authority-discovery"} transaction_pool = { package = "substrate-transaction-pool", path = "../../core/transaction-pool" } rpc-servers = { package = "substrate-rpc-servers", path = "../../core/rpc-servers" } rpc = { package = "substrate-rpc", path = "../../core/rpc" } tel = { package = "substrate-telemetry", path = "../../core/telemetry" } offchain = { package = "substrate-offchain", path = "../../core/offchain" } parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" } +authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "../authority-discovery/primitives", default-features = false } [dev-dependencies] substrate-test-runtime-client = { path = "../test-runtime/client" } diff --git a/core/service/src/builder.rs b/core/service/src/builder.rs index c675710e540a0..53cec940d7cc5 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -28,7 +28,7 @@ use futures::{prelude::*, sync::mpsc}; use futures03::{FutureExt as _, compat::Compat, StreamExt as _, TryStreamExt as _}; use keystore::{Store as Keystore, KeyStorePtr}; use log::{info, warn}; -use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo}; +use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent}; use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization}; use parking_lot::{Mutex, RwLock}; use primitives::{Blake2Hasher, H256, Hasher}; @@ -76,6 +76,7 @@ pub struct ServiceBuilder, rpc_extensions: TRpc, rpc_builder: TRpcB, + dht_event_tx: Option>, marker: PhantomData<(TBl, TRtApi)>, } @@ -197,6 +198,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { transaction_pool: Arc::new(()), rpc_extensions: Default::default(), rpc_builder, + dht_event_tx: None, marker: PhantomData, }) } @@ -266,6 +268,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { transaction_pool: Arc::new(()), rpc_extensions: Default::default(), rpc_builder, + dht_event_tx: None, marker: PhantomData, }) } @@ -312,6 +315,7 @@ impl, + ) -> Result, Error> { + Ok(ServiceBuilder { + config: self.config, + client: self.client, + backend: self.backend, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: self.transaction_pool, + rpc_extensions: self.rpc_extensions, + rpc_builder: self.rpc_builder, + dht_event_tx: Some(dht_event_tx), + marker: self.marker, + }) + } } /// RPC handlers builder. @@ -798,6 +834,7 @@ ServiceBuilder< network_protocol, transaction_pool, rpc_extensions, + dht_event_tx, rpc_builder, ) = ( self.client, @@ -811,6 +848,7 @@ ServiceBuilder< self.network_protocol, self.transaction_pool, self.rpc_extensions, + self.dht_event_tx, self.rpc_builder, ); @@ -829,7 +867,8 @@ ServiceBuilder< finality_proof_provider, network_protocol, transaction_pool, - rpc_extensions + rpc_extensions, + dht_event_tx, )) }, |h, c, tx| maintain_transaction_pool(h, c, tx), diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 2056c8a2f2da3..9fc305560f45a 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -39,7 +39,7 @@ use client::{runtime_api::BlockT, Client}; use exit_future::Signal; use futures::prelude::*; use futures03::stream::{StreamExt as _, TryStreamExt as _}; -use network::{NetworkService, NetworkState, specialization::NetworkSpecialization}; +use network::{NetworkService, NetworkState, specialization::NetworkSpecialization, Event, DhtEvent}; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; use primitives::{Blake2Hasher, H256}; @@ -154,7 +154,8 @@ macro_rules! new_impl { finality_proof_provider, network_protocol, transaction_pool, - rpc_extensions + rpc_extensions, + dht_event_tx, ) = $build_components(&$config)?; let import_queue = Box::new(import_queue); let chain_info = client.info().chain; @@ -357,12 +358,14 @@ macro_rules! new_impl { let rpc_handlers = gen_handler(); let rpc = start_rpc_servers(&$config, gen_handler)?; + let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future( network_mut, client.clone(), network_status_sinks.clone(), system_rpc_rx, - has_bootnodes + has_bootnodes, + dht_event_tx, ) .map_err(|_| ()) .select(exit.clone()) @@ -653,6 +656,7 @@ fn build_network_future< status_sinks: Arc, NetworkState)>>>>, rpc_rx: futures03::channel::mpsc::UnboundedReceiver>, should_have_peers: bool, + dht_event_tx: Option>, ) -> impl Future { // Compatibility shim while we're transitionning to stable Futures. // See https://github.com/paritytech/substrate/issues/3099 @@ -730,11 +734,21 @@ fn build_network_future< } // Main network polling. - match network.poll() { - Ok(Async::NotReady) => {} - Err(err) => warn!(target: "service", "Error in network: {:?}", err), - Ok(Async::Ready(())) => warn!(target: "service", "Network service finished"), - } + while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| { + warn!(target: "service", "Error in network: {:?}", err); + }) { + // Given that core/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht + // events are being passed on to the authority-discovery module. In the future there might be multiple + // consumers of these events. In that case this would need to be refactored to properly dispatch the events, + // e.g. via a subscriber model. + if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) { + if e.is_full() { + warn!(target: "service", "Dht event channel to authority discovery is full, dropping event."); + } else if e.is_disconnected() { + warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event."); + } + } + }; // Now some diagnostic for performances. let polling_dur = before_polling.elapsed(); diff --git a/node/cli/Cargo.toml b/node/cli/Cargo.toml index 1f35f7b86b41c..c28a517639654 100644 --- a/node/cli/Cargo.toml +++ b/node/cli/Cargo.toml @@ -46,7 +46,8 @@ system = { package = "srml-system", path = "../../srml/system" } balances = { package = "srml-balances", path = "../../srml/balances" } support = { package = "srml-support", path = "../../srml/support", default-features = false } im_online = { package = "srml-im-online", path = "../../srml/im-online", default-features = false } -authority-discovery = { package = "srml-authority-discovery", path = "../../srml/authority-discovery", default-features = false } +sr-authority-discovery = { package = "srml-authority-discovery", path = "../../srml/authority-discovery", default-features = false } +authority-discovery = { package = "substrate-authority-discovery", path = "../../core/authority-discovery"} [dev-dependencies] keystore = { package = "substrate-keystore", path = "../../core/keystore" } diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 509de7a943815..8522ce6d124a4 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -103,6 +103,8 @@ macro_rules! new_full_start { macro_rules! new_full { ($config:expr) => {{ use futures::Future; + use futures::sync::mpsc; + use network::DhtEvent; let ( is_authority, @@ -118,10 +120,18 @@ macro_rules! new_full { let (builder, mut import_setup, inherent_data_providers, mut tasks_to_spawn) = new_full_start!($config); + // Dht event channel from the network to the authority discovery module. Use bounded channel to ensure + // back-pressure. Authority discovery is triggering one event per authority within the current authority set. + // This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to + // 10 000. + let (dht_event_tx, dht_event_rx) = + mpsc::channel::(10000); + let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _) )? + .with_dht_event_tx(dht_event_tx)? .build()?; let (block_import, link_half, babe_link) = import_setup.take() @@ -162,6 +172,13 @@ macro_rules! new_full { let babe = babe::start_babe(babe_config)?; let select = babe.select(service.on_exit()).then(|_| Ok(())); service.spawn_task(Box::new(select)); + + let authority_discovery = authority_discovery::AuthorityDiscovery::new( + service.client(), + service.network(), + dht_event_rx, + ); + service.spawn_task(Box::new(authority_discovery)); } let config = grandpa::Config { diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index ded3fa483de8b..9d7ae5d9df85d 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -47,7 +47,9 @@ use elections::VoteIndex; use version::NativeVersion; use primitives::OpaqueMetadata; use grandpa::{AuthorityId as GrandpaId, AuthorityWeight as GrandpaWeight}; -use im_online::sr25519::{AuthorityId as ImOnlineId}; +use im_online::sr25519::{AuthorityId as ImOnlineId, AuthoritySignature as ImOnlineSignature}; +use authority_discovery_primitives::{AuthorityId as EncodedAuthorityId, Signature as EncodedSignature}; +use codec::{Encode, Decode}; use system::offchain::TransactionSubmitter; #[cfg(any(feature = "std", test))] @@ -191,7 +193,7 @@ impl authorship::Trait for Runtime { type EventHandler = Staking; } -type SessionHandlers = (Grandpa, Babe, ImOnline); +type SessionHandlers = (Grandpa, Babe, ImOnline, AuthorityDiscovery); impl_opaque_keys! { pub struct SessionKeys { @@ -617,20 +619,32 @@ impl_runtime_apis! { } } - impl authority_discovery_primitives::AuthorityDiscoveryApi for Runtime { - fn authority_id() -> Option { - AuthorityDiscovery::authority_id() - } - fn authorities() -> Vec { - AuthorityDiscovery::authorities() + impl authority_discovery_primitives::AuthorityDiscoveryApi for Runtime { + fn authorities() -> Vec { + AuthorityDiscovery::authorities().into_iter() + .map(|id| id.encode()) + .map(EncodedAuthorityId) + .collect() } - fn sign(payload: Vec, authority_id: ImOnlineId) -> Option> { - AuthorityDiscovery::sign(payload, authority_id) + fn sign(payload: &Vec) -> Option<(EncodedSignature, EncodedAuthorityId)> { + AuthorityDiscovery::sign(payload).map(|(sig, id)| { + (EncodedSignature(sig.encode()), EncodedAuthorityId(id.encode())) + }) } - fn verify(payload: Vec, signature: Vec, public_key: ImOnlineId) -> bool { - AuthorityDiscovery::verify(payload, signature, public_key) + fn verify(payload: &Vec, signature: &EncodedSignature, authority_id: &EncodedAuthorityId) -> bool { + let signature = match ImOnlineSignature::decode(&mut &signature.0[..]) { + Ok(s) => s, + _ => return false, + }; + + let authority_id = match ImOnlineId::decode(&mut &authority_id.0[..]) { + Ok(id) => id, + _ => return false, + }; + + AuthorityDiscovery::verify(payload, signature, authority_id) } } diff --git a/srml/authority-discovery/src/lib.rs b/srml/authority-discovery/src/lib.rs index 76b0c93c4ffb5..1c46822dfef6f 100644 --- a/srml/authority-discovery/src/lib.rs +++ b/srml/authority-discovery/src/lib.rs @@ -29,13 +29,14 @@ #![cfg_attr(not(feature = "std"), no_std)] use app_crypto::RuntimeAppPublic; -use codec::{Decode, Encode}; use rstd::prelude::*; use support::{decl_module, decl_storage, StorageValue}; pub trait Trait: system::Trait + session::Trait + im_online::Trait {} type AuthorityIdFor = ::AuthorityId; +type AuthoritySignatureFor = + <::AuthorityId as RuntimeAppPublic>::Signature; decl_storage! { trait Store for Module as AuthorityDiscovery { @@ -58,7 +59,7 @@ impl Module { /// set, otherwise this function returns None. The restriction might be /// softened in the future in case a consumer needs to learn own authority /// identifier. - pub fn authority_id() -> Option> { + fn authority_id() -> Option> { let authorities = Keys::::get(); let local_keys = >::all(); @@ -78,20 +79,19 @@ impl Module { } /// Sign the given payload with the private key corresponding to the given authority id. - pub fn sign(payload: Vec, authority_id: AuthorityIdFor) -> Option> { - authority_id.sign(&payload).map(|s| s.encode()) + pub fn sign(payload: &Vec) -> Option<(AuthoritySignatureFor, AuthorityIdFor)> { + let authority_id = Module::::authority_id()?; + authority_id.sign(payload).map(|s| (s, authority_id)) } /// Verify the given signature for the given payload with the given /// authority identifier. pub fn verify( - payload: Vec, - signature: Vec, + payload: &Vec, + signature: AuthoritySignatureFor, authority_id: AuthorityIdFor, ) -> bool { - as RuntimeAppPublic>::Signature::decode(&mut &signature[..]) - .map(|s| authority_id.verify(&payload, &s)) - .unwrap_or(false) + authority_id.verify(payload, &signature) } fn initialize_keys(keys: &[AuthorityIdFor]) { @@ -158,10 +158,7 @@ mod tests { pub struct TestOnSessionEnding; impl session::OnSessionEnding for TestOnSessionEnding { - fn on_session_ending( - _: SessionIndex, - _: SessionIndex, - ) -> Option> { + fn on_session_ending(_: SessionIndex, _: SessionIndex) -> Option> { None } } @@ -351,19 +348,13 @@ mod tests { externalities.set_keystore(key_store); with_externalities(&mut externalities, || { - let authority_id = AuthorityDiscovery::authority_id().expect("authority id"); let payload = String::from("test payload").into_bytes(); - let sig = - AuthorityDiscovery::sign(payload.clone(), authority_id.clone()).expect("signature"); + let (sig, authority_id) = AuthorityDiscovery::sign(&payload).expect("signature"); - assert!(AuthorityDiscovery::verify( - payload, - sig.clone(), - authority_id.clone() - )); + assert!(AuthorityDiscovery::verify(&payload, sig.clone(), authority_id.clone(),)); assert!(!AuthorityDiscovery::verify( - String::from("other payload").into_bytes(), + &String::from("other payload").into_bytes(), sig, authority_id )) diff --git a/srml/im-online/src/lib.rs b/srml/im-online/src/lib.rs index 6106b1e45c153..3fd57a2e24027 100644 --- a/srml/im-online/src/lib.rs +++ b/srml/im-online/src/lib.rs @@ -67,7 +67,7 @@ // Ensure we're `no_std` when compiling for Wasm. #![cfg_attr(not(feature = "std"), no_std)] -use app_crypto::{AppPublic, RuntimeAppPublic}; +use app_crypto::{AppPublic, RuntimeAppPublic, AppSignature}; use codec::{Encode, Decode}; use primitives::offchain::{OpaqueNetworkState, StorageKind}; use rstd::prelude::*;