diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index b65f340652542..d966091a18f24 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -27,13 +27,18 @@ //! In the future, there will be a fallback for allowing sending the same message //! under certain conditions that are used to un-stick the protocol. -use std::sync::Arc; - use futures::{prelude::*, future::Executor as _, sync::mpsc}; -use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, future::TryFutureExt as _}; +use futures03::{ + compat::Compat, + stream::StreamExt, + future::{Future as Future03, FutureExt as _, TryFutureExt as _}, +}; +use log::{debug, trace}; +use parking_lot::Mutex; +use std::{pin::Pin, sync::Arc, task::{Context, Poll as Poll03}}; + use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use finality_grandpa::{voter, voter_set::VoterSet}; -use log::{debug, trace}; use sc_network::{NetworkService, ReputationChange}; use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; use parity_scale_codec::{Encode, Decode}; @@ -134,9 +139,22 @@ pub(crate) struct NetworkBridge> { service: N, gossip_engine: GossipEngine, validator: Arc>, + + /// Sender side of the neighbor packet channel. + /// + /// Packets sent into this channel are processed by the `NeighborPacketWorker` and passed on to + /// the underlying `GossipEngine`. neighbor_sender: periodic::NeighborPacketSender, + + /// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`. + // + // NetworkBridge is required to be clonable, thus one needs to be able to clone its children, + // thus one has to wrap neighor_packet_worker with an Arc Mutex. + neighbor_packet_worker: Arc>>, } +impl> Unpin for NetworkBridge {} + impl> NetworkBridge { /// Create a new NetworkBridge to the given NetworkService. Returns the service /// handle. @@ -195,14 +213,18 @@ impl> NetworkBridge { } } - let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone()); + let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new(); let reporting_job = report_stream.consume(gossip_engine.clone()); - let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender }; + let bridge = NetworkBridge { + service, + gossip_engine, + validator, + neighbor_sender: neighbor_packet_sender, + neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)), + }; let executor = Compat::new(executor); - executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) - .expect("failed to spawn grandpa rebroadcast job task"); executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) .expect("failed to spawn grandpa reporting job task"); @@ -391,6 +413,21 @@ impl> NetworkBridge { } } +impl> Future03 for NetworkBridge { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03 { + loop { + match futures03::ready!((self.neighbor_packet_worker.lock()).poll_next_unpin(cx)) { + None => return Poll03::Ready( + Err(Error::Network("NeighborPacketWorker stream closed.".into())) + ), + Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()), + } + } + } +} + fn incoming_global( mut gossip_engine: GossipEngine, topic: B::Hash, @@ -530,6 +567,7 @@ impl> Clone for NetworkBridge { gossip_engine: self.gossip_engine.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), + neighbor_packet_worker: self.neighbor_packet_worker.clone(), } } } diff --git a/client/finality-grandpa/src/communication/periodic.rs b/client/finality-grandpa/src/communication/periodic.rs index a31203104b61f..d5c8c1e0b856c 100644 --- a/client/finality-grandpa/src/communication/periodic.rs +++ b/client/finality-grandpa/src/communication/periodic.rs @@ -16,21 +16,16 @@ //! Periodic rebroadcast of neighbor packets. -use std::time::{Instant, Duration}; - -use parity_scale_codec::Encode; -use futures::prelude::*; -use futures::sync::mpsc; use futures_timer::Delay; -use futures03::future::{FutureExt as _, TryFutureExt as _}; -use log::{debug, warn}; +use futures03::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream}; +use log::debug; +use std::{pin::Pin, task::{Context, Poll}, time::{Instant, Duration}}; use sc_network::PeerId; -use sc_network_gossip::GossipEngine; use sp_runtime::traits::{NumberFor, Block as BlockT}; use super::gossip::{NeighborPacket, GossipMessage}; -// how often to rebroadcast, if no other +// How often to rebroadcast, in cases where no new packets are created. const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60); fn rebroadcast_instant() -> Instant { @@ -56,56 +51,65 @@ impl NeighborPacketSender { } } -/// Does the work of sending neighbor packets, asynchronously. -/// -/// It may rebroadcast the last neighbor packet periodically when no -/// progress is made. -pub(super) fn neighbor_packet_worker(net: GossipEngine) -> ( - impl Future + Send + 'static, - NeighborPacketSender, -) where - B: BlockT, -{ - let mut last = None; - let (tx, mut rx) = mpsc::unbounded::<(Vec, NeighborPacket>)>(); - let mut delay = Delay::new(REBROADCAST_AFTER); - - let work = futures::future::poll_fn(move || { - loop { - match rx.poll().expect("unbounded receivers do not error; qed") { - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some((to, packet))) => { - // send to peers. - net.send_message(to.clone(), GossipMessage::::from(packet.clone()).encode()); - - // rebroadcasting network. - delay.reset(rebroadcast_instant()); - last = Some((to, packet)); - } - Async::NotReady => break, - } - } +/// NeighborPacketWorker is listening on a channel for new neighbor packets being produced by +/// components within `finality-grandpa` and forwards those packets to the underlying +/// `NetworkEngine` through the `NetworkBridge` that it is being polled by (see `Stream` +/// implementation). Periodically it sends out the last packet in cases where no new ones arrive. +pub(super) struct NeighborPacketWorker { + last: Option<(Vec, NeighborPacket>)>, + delay: Delay, + rx: mpsc::UnboundedReceiver<(Vec, NeighborPacket>)>, +} + +impl Unpin for NeighborPacketWorker {} + +impl NeighborPacketWorker { + pub(super) fn new() -> (Self, NeighborPacketSender){ + let (tx, rx) = mpsc::unbounded::<(Vec, NeighborPacket>)>(); + let delay = Delay::new(REBROADCAST_AFTER); + + (NeighborPacketWorker { + last: None, + delay, + rx, + }, NeighborPacketSender(tx)) + } +} - // has to be done in a loop because it needs to be polled after - // re-scheduling. - loop { - match (&mut delay).unit_error().compat().poll() { - Err(e) => { - warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e); - delay.reset(rebroadcast_instant()); - } - Ok(Async::Ready(())) => { - delay.reset(rebroadcast_instant()); - - if let Some((ref to, ref packet)) = last { - // send to peers. - net.send_message(to.clone(), GossipMessage::::from(packet.clone()).encode()); - } - } - Ok(Async::NotReady) => return Ok(Async::NotReady), +impl Stream for NeighborPacketWorker { + type Item = (Vec, GossipMessage); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> + { + let this = &mut *self; + match this.rx.poll_next_unpin(cx) { + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some((to, packet))) => { + this.delay.reset(rebroadcast_instant()); + this.last = Some((to.clone(), packet.clone())); + + return Poll::Ready(Some((to, GossipMessage::::from(packet.clone())))); } + // Don't return yet, maybe the timer fired. + Poll::Pending => {}, + }; + + ready!(this.delay.poll_unpin(cx)); + + // Getting this far here implies that the timer fired. + + this.delay.reset(rebroadcast_instant()); + + // Make sure the underlying task is scheduled for wake-up. + // + // Note: In case poll_unpin is called after the resetted delay fires again, this + // will drop one tick. Deemed as very unlikely and also not critical. + while let Poll::Ready(()) = this.delay.poll_unpin(cx) {}; + + if let Some((ref to, ref packet)) = this.last { + return Poll::Ready(Some((to.clone(), GossipMessage::::from(packet.clone())))); } - }); - (work, NeighborPacketSender(tx)) + return Poll::Pending; + } } diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index c0d83ba0196a9..f0d80c96394ec 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -650,12 +650,13 @@ struct VoterWork, RA, SC, VR> { voter: Box>> + Send>, env: Arc>, voter_commands_rx: mpsc::UnboundedReceiver>>, + network: futures03::compat::Compat>, } impl VoterWork where Block: BlockT, - N: NetworkT + Sync, + N: NetworkT + Sync, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -681,7 +682,7 @@ where voting_rule, voters: Arc::new(voters), config, - network, + network: network.clone(), set_id: persistent_data.authority_set.set_id(), authority_set: persistent_data.authority_set.clone(), consensus_changes: persistent_data.consensus_changes.clone(), @@ -694,6 +695,7 @@ where voter: Box::new(futures::empty()) as Box<_>, env, voter_commands_rx, + network: futures03::future::TryFutureExt::compat(network), }; work.rebuild_voter(); work @@ -831,7 +833,7 @@ where impl Future for VoterWork where Block: BlockT, - N: NetworkT + Sync, + N: NetworkT + Sync, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -878,6 +880,15 @@ where } } + match self.network.poll() { + Ok(Async::NotReady) => {}, + Ok(Async::Ready(())) => { + // the network bridge future should never conclude. + return Ok(Async::Ready(())) + } + e @ Err(_) => return e, + }; + Ok(Async::NotReady) } }