diff --git a/client/finality-grandpa/src/communication/gossip.rs b/client/finality-grandpa/src/communication/gossip.rs index 1135cc4f8674f..ec74393d80ff1 100644 --- a/client/finality-grandpa/src/communication/gossip.rs +++ b/client/finality-grandpa/src/communication/gossip.rs @@ -83,15 +83,15 @@ //! We only send polite messages to peers, use sp_runtime::traits::{NumberFor, Block as BlockT, Zero}; -use sc_network_gossip::{GossipEngine, MessageIntent, ValidatorContext}; +use sc_network_gossip::{MessageIntent, ValidatorContext}; use sc_network::{config::Roles, PeerId, ReputationChange}; use parity_scale_codec::{Encode, Decode}; use sp_finality_grandpa::AuthorityId; use sc_telemetry::{telemetry, CONSENSUS_DEBUG}; -use log::{trace, debug, warn}; +use log::{trace, debug}; use futures::prelude::*; -use futures::sync::mpsc; +use futures03::channel::mpsc; use rand::seq::SliceRandom; use crate::{environment, CatchUp, CompactCommit, SignedMessage}; @@ -1178,7 +1178,7 @@ impl GossipValidator { pub(super) fn new( config: crate::Config, set_state: environment::SharedVoterSetState, - ) -> (GossipValidator, ReportStream) { + ) -> (GossipValidator, mpsc::UnboundedReceiver) { let (tx, rx) = mpsc::unbounded(); let val = GossipValidator { inner: parking_lot::RwLock::new(Inner::new(config)), @@ -1186,7 +1186,7 @@ impl GossipValidator { report_sender: tx, }; - (val, ReportStream { reports: rx }) + (val, rx) } /// Note a round in the current set has started. @@ -1445,57 +1445,9 @@ impl sc_network_gossip::Validator for GossipValidator, -} - -impl ReportStream { - /// Consume the report stream, converting it into a future that - /// handles all reports. - pub(super) fn consume(self, net: GossipEngine) - -> impl Future + Send + 'static - where - B: BlockT, - { - ReportingTask { - reports: self.reports, - net, - } - } -} - -/// A future for reporting peers. -#[must_use = "Futures do nothing unless polled"] -struct ReportingTask { - reports: mpsc::UnboundedReceiver, - net: GossipEngine, -} - -impl Future for ReportingTask { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - loop { - match self.reports.poll() { - Err(_) => { - warn!(target: "afg", "Report stream terminated unexpectedly"); - return Ok(Async::Ready(())) - } - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(PeerReport { who, cost_benefit }))) => - self.net.report(who, cost_benefit), - Ok(Async::NotReady) => return Ok(Async::NotReady), - } - } - } +pub(super) struct PeerReport { + pub who: PeerId, + pub cost_benefit: ReputationChange, } #[cfg(test)] diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index d966091a18f24..7723047d1b423 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -27,11 +27,12 @@ //! In the future, there will be a fallback for allowing sending the same message //! under certain conditions that are used to un-stick the protocol. -use futures::{prelude::*, future::Executor as _, sync::mpsc}; +use futures::{prelude::*, sync::mpsc}; use futures03::{ + channel::mpsc as mpsc03, compat::Compat, + future::{Future as Future03}, stream::StreamExt, - future::{Future as Future03, FutureExt as _, TryFutureExt as _}, }; use log::{debug, trace}; use parking_lot::Mutex; @@ -52,7 +53,12 @@ use crate::{ }; use crate::environment::HasVoted; use gossip::{ - GossipMessage, FullCatchUpMessage, FullCommitMessage, VoteMessage, GossipValidator + FullCatchUpMessage, + FullCommitMessage, + GossipMessage, + GossipValidator, + PeerReport, + VoteMessage, }; use sp_finality_grandpa::{ AuthorityPair, AuthorityId, AuthoritySignature, SetId as SetIdNumber, RoundNumber, @@ -148,9 +154,18 @@ pub(crate) struct NetworkBridge> { /// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`. // - // NetworkBridge is required to be clonable, thus one needs to be able to clone its children, - // thus one has to wrap neighor_packet_worker with an Arc Mutex. + // `NetworkBridge` is required to be clonable, thus one needs to be able to clone its children, + // thus one has to wrap neighor_packet_worker with an `Arc` `Mutex`. neighbor_packet_worker: Arc>>, + + /// Receiver side of the peer report stream populated by the gossip validator, forwarded to the + /// gossip engine. + // + // `NetworkBridge` is required to be clonable, thus one needs to be able to clone its children, + // thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given that it is + // just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer + // channel implementation. + gossip_validator_report_stream: Arc>>, } impl> Unpin for NetworkBridge {} @@ -165,7 +180,6 @@ impl> NetworkBridge { config: crate::Config, set_state: crate::environment::SharedVoterSetState, executor: &impl futures03::task::Spawn, - on_exit: impl futures03::Future + Clone + Send + Unpin + 'static, ) -> Self { let (validator, report_stream) = GossipValidator::new( config, @@ -214,7 +228,6 @@ impl> NetworkBridge { } let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new(); - let reporting_job = report_stream.consume(gossip_engine.clone()); let bridge = NetworkBridge { service, @@ -222,12 +235,9 @@ impl> NetworkBridge { validator, neighbor_sender: neighbor_packet_sender, neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)), + gossip_validator_report_stream: Arc::new(Mutex::new(report_stream)), }; - let executor = Compat::new(executor); - executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) - .expect("failed to spawn grandpa reporting job task"); - bridge } @@ -418,13 +428,30 @@ impl> Future03 for NetworkBridge { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03 { loop { - match futures03::ready!((self.neighbor_packet_worker.lock()).poll_next_unpin(cx)) { - None => return Poll03::Ready( - Err(Error::Network("NeighborPacketWorker stream closed.".into())) + match self.neighbor_packet_worker.lock().poll_next_unpin(cx) { + Poll03::Ready(Some((to, packet))) => { + self.gossip_engine.send_message(to, packet.encode()); + }, + Poll03::Ready(None) => return Poll03::Ready( + Err(Error::Network("Neighbor packet worker stream closed.".into())) ), - Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()), + Poll03::Pending => break, } } + + loop { + match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) { + Poll03::Ready(Some(PeerReport { who, cost_benefit })) => { + self.gossip_engine.report(who, cost_benefit); + }, + Poll03::Ready(None) => return Poll03::Ready( + Err(Error::Network("Gossip validator report stream closed.".into())) + ), + Poll03::Pending => break, + } + } + + Poll03::Pending } } @@ -568,6 +595,7 @@ impl> Clone for NetworkBridge { validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), neighbor_packet_worker: self.neighbor_packet_worker.clone(), + gossip_validator_report_stream: self.gossip_validator_report_stream.clone(), } } } diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index a016940a05652..c104af0339260 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -172,7 +172,6 @@ fn make_test_network(executor: &impl futures03::task::Spawn) -> ( config(), voter_set_state(), executor, - Exit, ); ( diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index f0d80c96394ec..071214961f9a1 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -584,7 +584,6 @@ pub fn run_grandpa_voter( config.clone(), persistent_data.set_state.clone(), &executor, - on_exit.clone(), ); register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index b97d80a33c3d9..989a1e1655e8d 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -178,7 +178,6 @@ pub fn run_grandpa_observer( config.clone(), persistent_data.set_state.clone(), &executor, - on_exit.clone(), ); let observer_work = ObserverWork::new( diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index fc0b6d17ae0d7..9ad12c6c317ad 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -25,7 +25,7 @@ use sc_network_test::{ use sc_network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder}; use parking_lot::Mutex; use futures_timer::Delay; -use futures03::{StreamExt as _, TryStreamExt as _}; +use futures03::TryStreamExt as _; use tokio::runtime::current_thread; use sp_keyring::Ed25519Keyring; use sc_client::LongestChain; @@ -1270,7 +1270,6 @@ fn voter_persists_its_votes() { config.clone(), set_state, &threads_pool, - Exit, ); let (round_rx, round_tx) = network.round_communication( @@ -1675,7 +1674,6 @@ fn grandpa_environment_respects_voting_rules() { config.clone(), set_state.clone(), &threads_pool, - Exit, ); Environment {