From 030f26fdae6f5376a02315eb7465587342097156 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 13 Jan 2020 21:27:45 +0100 Subject: [PATCH 1/5] client/finality-grandpa: Reintegrate periodic neighbor packet worker The `NeighborPacketWorker` within `client/finality-grandpa` does two things: 1. It receives neighbor packets from components within `client/finality-grandpa`, sends them down to the `GossipEngine` in order for neighboring nodes to receive. 2. It periodically sends out the most recent neighbor packet to the `GossipEngine`. In order to send out packets it had a clone to a `GossipEgine` within an atomic reference counter and a mutex. The `NeighborPacketWorker` was then spawned onto its own asynchronous task. Instead of running in its own task, this patch reintegrates the `NeighborPacketWorker` into the main `client/finality-grandpa` task not requiring the `NeighborPacketWorker` to own a clone of the `GossipEngine`. The greater picture This is a tiny change within a greater refactoring. The overall goal is to **simplify** how finality-grandpa interacts with the network and to **reduce** the amount of **unbounded channels** within the logic. Why no unbounded channels: Bounding channels is needed for backpressure and proper scheduling. With unbounded channels there is no way of telling the producer side to slow down for the consumer side to catch up. Rephrased, there is no way for the scheduler to know when to favour the consumer task over the producer task on a crowded channel and the other way round for an empty channel. Reducing the amount of shared ownership simplifies the logic and enables one to use async-await syntax-suggar, given that one does not need to hold a lock across poll invocations. Using async-await enables one to use bounded channels without complex logic. --- .../finality-grandpa/src/communication/mod.rs | 55 +++++++-- .../src/communication/periodic.rs | 113 +++++++++--------- client/finality-grandpa/src/lib.rs | 29 +++-- 3 files changed, 124 insertions(+), 73 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index b65f340652542..d289feb1fc046 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -27,13 +27,18 @@ //! In the future, there will be a fallback for allowing sending the same message //! under certain conditions that are used to un-stick the protocol. -use std::sync::Arc; - use futures::{prelude::*, future::Executor as _, sync::mpsc}; -use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, future::TryFutureExt as _}; +use futures03::{ + compat::Compat, + stream::StreamExt, + future::{Future as Future03, FutureExt as _, TryFutureExt as _}, +}; +use log::{debug, trace}; +use parking_lot::Mutex; +use std::{pin::Pin, sync::Arc, task::{Context, Poll as Poll03}}; + use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use finality_grandpa::{voter, voter_set::VoterSet}; -use log::{debug, trace}; use sc_network::{NetworkService, ReputationChange}; use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; use parity_scale_codec::{Encode, Decode}; @@ -134,7 +139,18 @@ pub(crate) struct NetworkBridge> { service: N, gossip_engine: GossipEngine, validator: Arc>, + + /// Sender side of the neighbor packet channel. + /// + /// Packets send into this channel are processed by the `NeighborPacketWorker` and passed on to + /// the underlying `GossipEngine`. neighbor_sender: periodic::NeighborPacketSender, + + /// `NeighborPacketWorker` processing packets send through the `NeighborPacketSender`. + // + // NetworkBridge is required to be clonable, thus one needs to be able to clone its children, + // thus one has to wrap neighor_packet_worker with an Arc Mutex. + neighbor_packet_worker: Arc>>, } impl> NetworkBridge { @@ -195,14 +211,18 @@ impl> NetworkBridge { } } - let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone()); + let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new(); let reporting_job = report_stream.consume(gossip_engine.clone()); - let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender }; + let bridge = NetworkBridge { + service, + gossip_engine, + validator, + neighbor_sender: neighbor_packet_sender, + neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)), + }; let executor = Compat::new(executor); - executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) - .expect("failed to spawn grandpa rebroadcast job task"); executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) .expect("failed to spawn grandpa reporting job task"); @@ -391,6 +411,24 @@ impl> NetworkBridge { } } +impl> Future03 for NetworkBridge +where + NumberFor: Unpin, +{ + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03 { + loop { + match futures03::ready!(self.neighbor_packet_worker.lock().poll_next_unpin(cx)) { + None => return Poll03::Ready( + Err(Error::Network("NeighborPacketWorker stream closed.".into())) + ), + Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()), + } + } + } +} + fn incoming_global( mut gossip_engine: GossipEngine, topic: B::Hash, @@ -530,6 +568,7 @@ impl> Clone for NetworkBridge { gossip_engine: self.gossip_engine.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), + neighbor_packet_worker: self.neighbor_packet_worker.clone(), } } } diff --git a/client/finality-grandpa/src/communication/periodic.rs b/client/finality-grandpa/src/communication/periodic.rs index a31203104b61f..0437479dccecd 100644 --- a/client/finality-grandpa/src/communication/periodic.rs +++ b/client/finality-grandpa/src/communication/periodic.rs @@ -16,21 +16,16 @@ //! Periodic rebroadcast of neighbor packets. -use std::time::{Instant, Duration}; - -use parity_scale_codec::Encode; -use futures::prelude::*; -use futures::sync::mpsc; use futures_timer::Delay; -use futures03::future::{FutureExt as _, TryFutureExt as _}; -use log::{debug, warn}; +use futures03::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream}; +use log::debug; +use std::{pin::Pin, task::{Context, Poll}, time::{Instant, Duration}}; use sc_network::PeerId; -use sc_network_gossip::GossipEngine; use sp_runtime::traits::{NumberFor, Block as BlockT}; use super::gossip::{NeighborPacket, GossipMessage}; -// how often to rebroadcast, if no other +// How often to rebroadcast, in cases where no new packets are created. const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60); fn rebroadcast_instant() -> Instant { @@ -56,56 +51,62 @@ impl NeighborPacketSender { } } -/// Does the work of sending neighbor packets, asynchronously. -/// -/// It may rebroadcast the last neighbor packet periodically when no -/// progress is made. -pub(super) fn neighbor_packet_worker(net: GossipEngine) -> ( - impl Future + Send + 'static, - NeighborPacketSender, -) where - B: BlockT, +pub(super) struct NeighborPacketWorker { + last: Option<(Vec, NeighborPacket>)>, + delay: Delay, + rx: mpsc::UnboundedReceiver<(Vec, NeighborPacket>)>, +} + +impl NeighborPacketWorker { + pub(super) fn new() -> (Self, NeighborPacketSender){ + let (tx, rx) = mpsc::unbounded::<(Vec, NeighborPacket>)>(); + let delay = Delay::new(REBROADCAST_AFTER); + + (NeighborPacketWorker { + last: None, + delay, + rx, + }, NeighborPacketSender(tx)) + } +} + +impl Stream for NeighborPacketWorker +where + NumberFor: Unpin, { - let mut last = None; - let (tx, mut rx) = mpsc::unbounded::<(Vec, NeighborPacket>)>(); - let mut delay = Delay::new(REBROADCAST_AFTER); - - let work = futures::future::poll_fn(move || { - loop { - match rx.poll().expect("unbounded receivers do not error; qed") { - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some((to, packet))) => { - // send to peers. - net.send_message(to.clone(), GossipMessage::::from(packet.clone()).encode()); - - // rebroadcasting network. - delay.reset(rebroadcast_instant()); - last = Some((to, packet)); - } - Async::NotReady => break, + type Item = (Vec, GossipMessage); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> + { + let this = &mut *self; + match this.rx.poll_next_unpin(cx) { + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some((to, packet))) => { + this.delay.reset(rebroadcast_instant()); + this.last = Some((to.clone(), packet.clone())); + + return Poll::Ready(Some((to, GossipMessage::::from(packet.clone())))); } - } + // Don't do anything, maybe the timer fired. + Poll::Pending => {}, + }; - // has to be done in a loop because it needs to be polled after - // re-scheduling. - loop { - match (&mut delay).unit_error().compat().poll() { - Err(e) => { - warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e); - delay.reset(rebroadcast_instant()); - } - Ok(Async::Ready(())) => { - delay.reset(rebroadcast_instant()); - - if let Some((ref to, ref packet)) = last { - // send to peers. - net.send_message(to.clone(), GossipMessage::::from(packet.clone()).encode()); - } - } - Ok(Async::NotReady) => return Ok(Async::NotReady), - } + ready!(this.delay.poll_unpin(cx)); + + // Getting this far here implies that the timer fired. + + this.delay.reset(rebroadcast_instant()); + + // Make sure the underlying task is scheduled for wake-up. + // + // Note: In case poll_unpin is called after the resetted delay fires again, this + // will drop one tick. Deemed as very unlikely and also not critical. + while let Poll::Ready(()) = this.delay.poll_unpin(cx) {}; + + if let Some((ref to, ref packet)) = this.last { + return Poll::Ready(Some((to.clone(), GossipMessage::::from(packet.clone())))); } - }); - (work, NeighborPacketSender(tx)) + return Poll::Pending; + } } diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index dbecd9c9a4b7f..9e16a84ae469f 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -551,10 +551,10 @@ pub fn run_grandpa_voter( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: NetworkT + Send + Sync + Clone + 'static, + N: NetworkT + Send + Sync + Clone + Unpin + 'static, SC: SelectChain + 'static, VR: VotingRule> + Clone + 'static, - NumberFor: BlockNumberOps, + NumberFor: BlockNumberOps + Unpin, DigestFor: Encode, RA: Send + Sync + 'static, X: futures03::Future + Clone + Send + Unpin + 'static, @@ -650,13 +650,14 @@ struct VoterWork, RA, SC, VR> { voter: Box>> + Send>, env: Arc>, voter_commands_rx: mpsc::UnboundedReceiver>>, + network: futures03::compat::Compat>, } impl VoterWork where Block: BlockT, - N: NetworkT + Sync, - NumberFor: BlockNumberOps, + N: NetworkT + Sync + Unpin, + NumberFor: BlockNumberOps + Unpin, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, B: Backend + 'static, @@ -681,7 +682,7 @@ where voting_rule, voters: Arc::new(voters), config, - network, + network: network.clone(), set_id: persistent_data.authority_set.set_id(), authority_set: persistent_data.authority_set.clone(), consensus_changes: persistent_data.consensus_changes.clone(), @@ -694,6 +695,7 @@ where voter: Box::new(futures::empty()) as Box<_>, env, voter_commands_rx, + network: futures03::future::TryFutureExt::compat(network), }; work.rebuild_voter(); work @@ -831,8 +833,8 @@ where impl Future for VoterWork where Block: BlockT, - N: NetworkT + Sync, - NumberFor: BlockNumberOps, + N: NetworkT + Sync + Unpin, + NumberFor: BlockNumberOps + Unpin, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, B: Backend + 'static, @@ -878,6 +880,15 @@ where } } + match self.network.poll() { + Ok(Async::NotReady) => {}, + Ok(Async::Ready(())) => { + // the network bridge future should never conclude. + return Ok(Async::Ready(())) + } + e @ Err(_) => return e, + }; + Ok(Async::NotReady) } } @@ -889,9 +900,9 @@ pub fn run_grandpa( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: NetworkT + Send + Sync + Clone + 'static, + N: NetworkT + Send + Sync + Clone + Unpin + 'static, SC: SelectChain + 'static, - NumberFor: BlockNumberOps, + NumberFor: BlockNumberOps + Unpin, DigestFor: Encode, RA: Send + Sync + 'static, VR: VotingRule> + Clone + 'static, From 799356ebef149009c5aa497ead2a6f980d9b8ab3 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 16 Jan 2020 14:59:51 +0100 Subject: [PATCH 2/5] client/finality-grandpa: Implement Unpin on structs instead of bound Instead of requiring `Unpin` on all generic trait bounds involved in anything ever `poll`ed, one can implement `Unpin` on the structs being `poll`ed themselves. --- client/finality-grandpa/src/communication/mod.rs | 9 ++++----- .../src/communication/periodic.rs | 9 ++++----- client/finality-grandpa/src/lib.rs | 16 ++++++++-------- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index d289feb1fc046..5bc15b98a7c78 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -153,6 +153,8 @@ pub(crate) struct NetworkBridge> { neighbor_packet_worker: Arc>>, } +impl> Unpin for NetworkBridge {} + impl> NetworkBridge { /// Create a new NetworkBridge to the given NetworkService. Returns the service /// handle. @@ -411,15 +413,12 @@ impl> NetworkBridge { } } -impl> Future03 for NetworkBridge -where - NumberFor: Unpin, -{ +impl> Future03 for NetworkBridge { type Output = Result<(), Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03 { loop { - match futures03::ready!(self.neighbor_packet_worker.lock().poll_next_unpin(cx)) { + match futures03::ready!((self.neighbor_packet_worker.lock()).poll_next_unpin(cx)) { None => return Poll03::Ready( Err(Error::Network("NeighborPacketWorker stream closed.".into())) ), diff --git a/client/finality-grandpa/src/communication/periodic.rs b/client/finality-grandpa/src/communication/periodic.rs index 0437479dccecd..9b20777fc78d9 100644 --- a/client/finality-grandpa/src/communication/periodic.rs +++ b/client/finality-grandpa/src/communication/periodic.rs @@ -57,6 +57,8 @@ pub(super) struct NeighborPacketWorker { rx: mpsc::UnboundedReceiver<(Vec, NeighborPacket>)>, } +impl Unpin for NeighborPacketWorker {} + impl NeighborPacketWorker { pub(super) fn new() -> (Self, NeighborPacketSender){ let (tx, rx) = mpsc::unbounded::<(Vec, NeighborPacket>)>(); @@ -70,10 +72,7 @@ impl NeighborPacketWorker { } } -impl Stream for NeighborPacketWorker -where - NumberFor: Unpin, -{ +impl Stream for NeighborPacketWorker { type Item = (Vec, GossipMessage); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> @@ -87,7 +86,7 @@ where return Poll::Ready(Some((to, GossipMessage::::from(packet.clone())))); } - // Don't do anything, maybe the timer fired. + // Don't return yet, maybe the timer fired. Poll::Pending => {}, }; diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index 9e16a84ae469f..8b53bb63a9e1a 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -551,10 +551,10 @@ pub fn run_grandpa_voter( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: NetworkT + Send + Sync + Clone + Unpin + 'static, + N: NetworkT + Send + Sync + Clone + 'static, SC: SelectChain + 'static, VR: VotingRule> + Clone + 'static, - NumberFor: BlockNumberOps + Unpin, + NumberFor: BlockNumberOps, DigestFor: Encode, RA: Send + Sync + 'static, X: futures03::Future + Clone + Send + Unpin + 'static, @@ -656,8 +656,8 @@ struct VoterWork, RA, SC, VR> { impl VoterWork where Block: BlockT, - N: NetworkT + Sync + Unpin, - NumberFor: BlockNumberOps + Unpin, + N: NetworkT + Sync, + NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, B: Backend + 'static, @@ -833,8 +833,8 @@ where impl Future for VoterWork where Block: BlockT, - N: NetworkT + Sync + Unpin, - NumberFor: BlockNumberOps + Unpin, + N: NetworkT + Sync, + NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, B: Backend + 'static, @@ -900,9 +900,9 @@ pub fn run_grandpa( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: NetworkT + Send + Sync + Clone + Unpin + 'static, + N: NetworkT + Send + Sync + Clone + 'static, SC: SelectChain + 'static, - NumberFor: BlockNumberOps + Unpin, + NumberFor: BlockNumberOps, DigestFor: Encode, RA: Send + Sync + 'static, VR: VotingRule> + Clone + 'static, From 5ff36e64b237af2a235412129198ab6feb5889fb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 16 Jan 2020 15:17:29 +0100 Subject: [PATCH 3/5] client/finality-grandpa/src/communication/periodic: Add doc comment for Worker --- client/finality-grandpa/src/communication/periodic.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/finality-grandpa/src/communication/periodic.rs b/client/finality-grandpa/src/communication/periodic.rs index 9b20777fc78d9..d5c8c1e0b856c 100644 --- a/client/finality-grandpa/src/communication/periodic.rs +++ b/client/finality-grandpa/src/communication/periodic.rs @@ -51,6 +51,10 @@ impl NeighborPacketSender { } } +/// NeighborPacketWorker is listening on a channel for new neighbor packets being produced by +/// components within `finality-grandpa` and forwards those packets to the underlying +/// `NetworkEngine` through the `NetworkBridge` that it is being polled by (see `Stream` +/// implementation). Periodically it sends out the last packet in cases where no new ones arrive. pub(super) struct NeighborPacketWorker { last: Option<(Vec, NeighborPacket>)>, delay: Delay, From a72797b4bfc1ded4279a327433912a7c4fcdbdac Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 17 Jan 2020 10:58:17 +0100 Subject: [PATCH 4/5] client/finality-grandpa/src/communication/mod.rs: Fix typo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: André Silva --- client/finality-grandpa/src/communication/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 5bc15b98a7c78..b1ee0da7f837c 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -142,7 +142,7 @@ pub(crate) struct NetworkBridge> { /// Sender side of the neighbor packet channel. /// - /// Packets send into this channel are processed by the `NeighborPacketWorker` and passed on to + /// Packets sent into this channel are processed by the `NeighborPacketWorker` and passed on to /// the underlying `GossipEngine`. neighbor_sender: periodic::NeighborPacketSender, From 01d8d4e83a3f34f016a384fbd662a3cef3b82619 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 17 Jan 2020 10:58:35 +0100 Subject: [PATCH 5/5] client/finality-grandpa/src/communication/mod.rs: Fix typo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: André Silva --- client/finality-grandpa/src/communication/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index b1ee0da7f837c..d966091a18f24 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -146,7 +146,7 @@ pub(crate) struct NetworkBridge> { /// the underlying `GossipEngine`. neighbor_sender: periodic::NeighborPacketSender, - /// `NeighborPacketWorker` processing packets send through the `NeighborPacketSender`. + /// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`. // // NetworkBridge is required to be clonable, thus one needs to be able to clone its children, // thus one has to wrap neighor_packet_worker with an Arc Mutex.