Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/finality-grandpa: Reintegrate periodic neighbor packet worker #4631

Merged
merged 5 commits into from
Jan 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
//! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol.

use std::sync::Arc;

use futures::{prelude::*, future::Executor as _, sync::mpsc};
use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, future::TryFutureExt as _};
use futures03::{
compat::Compat,
stream::StreamExt,
future::{Future as Future03, FutureExt as _, TryFutureExt as _},
};
use log::{debug, trace};
use parking_lot::Mutex;
use std::{pin::Pin, sync::Arc, task::{Context, Poll as Poll03}};

use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use finality_grandpa::{voter, voter_set::VoterSet};
use log::{debug, trace};
use sc_network::{NetworkService, ReputationChange};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use parity_scale_codec::{Encode, Decode};
Expand Down Expand Up @@ -134,7 +139,18 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
gossip_engine: GossipEngine<B>,
validator: Arc<GossipValidator<B>>,

/// Sender side of the neighbor packet channel.
///
/// Packets send into this channel are processed by the `NeighborPacketWorker` and passed on to
mxinden marked this conversation as resolved.
Show resolved Hide resolved
/// the underlying `GossipEngine`.
neighbor_sender: periodic::NeighborPacketSender<B>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
neighbor_sender: periodic::NeighborPacketSender<B>,
neighbor_packet_sender: periodic::NeighborPacketSender<B>,

Happy to do this change, but rather as a follow up to reduce the noise within this pull request.


/// `NeighborPacketWorker` processing packets send through the `NeighborPacketSender`.
mxinden marked this conversation as resolved.
Show resolved Hide resolved
//
// NetworkBridge is required to be clonable, thus one needs to be able to clone its children,
// thus one has to wrap neighor_packet_worker with an Arc Mutex.
neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in order to reduce the amount of Mutexes, I introduce a new one here. This one is within the crate only and can be removed once NetworkBridge does not need to be clonable anymore.

}

impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
Expand Down Expand Up @@ -195,14 +211,18 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
}

let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone());
let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new();
let reporting_job = report_stream.consume(gossip_engine.clone());

let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender };
let bridge = NetworkBridge {
service,
gossip_engine,
validator,
neighbor_sender: neighbor_packet_sender,
neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
};

let executor = Compat::new(executor);
executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
.expect("failed to spawn grandpa rebroadcast job task");
executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
.expect("failed to spawn grandpa reporting job task");

Expand Down Expand Up @@ -391,6 +411,24 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
}

impl<B: BlockT, N: Network<B>> Future03 for NetworkBridge<B, N>
where
NumberFor<B>: Unpin,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
NumberFor<B>: Unpin,

You probably have to add impl<B, N> Unpin for NetworkBridge<B, N> {} somewhere instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, thank you and thank you @tomaka ❤️

{
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> {
loop {
match futures03::ready!(self.neighbor_packet_worker.lock().poll_next_unpin(cx)) {
None => return Poll03::Ready(
Err(Error::Network("NeighborPacketWorker stream closed.".into()))
),
Some((to, packet)) => self.gossip_engine.send_message(to, packet.encode()),
}
}
}
}

fn incoming_global<B: BlockT>(
mut gossip_engine: GossipEngine<B>,
topic: B::Hash,
Expand Down Expand Up @@ -530,6 +568,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
gossip_engine: self.gossip_engine.clone(),
validator: Arc::clone(&self.validator),
neighbor_sender: self.neighbor_sender.clone(),
neighbor_packet_worker: self.neighbor_packet_worker.clone(),
}
}
}
Expand Down
113 changes: 57 additions & 56 deletions client/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@

//! Periodic rebroadcast of neighbor packets.

use std::time::{Instant, Duration};

use parity_scale_codec::Encode;
use futures::prelude::*;
use futures::sync::mpsc;
use futures_timer::Delay;
use futures03::future::{FutureExt as _, TryFutureExt as _};
use log::{debug, warn};
use futures03::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream};
use log::debug;
use std::{pin::Pin, task::{Context, Poll}, time::{Instant, Duration}};

use sc_network::PeerId;
use sc_network_gossip::GossipEngine;
use sp_runtime::traits::{NumberFor, Block as BlockT};
use super::gossip::{NeighborPacket, GossipMessage};

// how often to rebroadcast, if no other
// How often to rebroadcast, in cases where no new packets are created.
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);

fn rebroadcast_instant() -> Instant {
Expand All @@ -56,56 +51,62 @@ impl<B: BlockT> NeighborPacketSender<B> {
}
}

/// Does the work of sending neighbor packets, asynchronously.
///
/// It may rebroadcast the last neighbor packet periodically when no
/// progress is made.
pub(super) fn neighbor_packet_worker<B>(net: GossipEngine<B>) -> (
impl Future<Item = (), Error = ()> + Send + 'static,
NeighborPacketSender<B>,
) where
B: BlockT,
pub(super) struct NeighborPacketWorker<B: BlockT> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could use a doc-string

last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
delay: Delay,
rx: mpsc::UnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
}

impl<B: BlockT> NeighborPacketWorker<B> {
pub(super) fn new() -> (Self, NeighborPacketSender<B>){
let (tx, rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
let delay = Delay::new(REBROADCAST_AFTER);

(NeighborPacketWorker {
last: None,
delay,
rx,
}, NeighborPacketSender(tx))
}
}

impl <B: BlockT> Stream for NeighborPacketWorker<B>
where
NumberFor<B>: Unpin,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
NumberFor<B>: Unpin,

{
let mut last = None;
let (tx, mut rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
let mut delay = Delay::new(REBROADCAST_AFTER);

let work = futures::future::poll_fn(move || {
loop {
match rx.poll().expect("unbounded receivers do not error; qed") {
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some((to, packet))) => {
// send to peers.
net.send_message(to.clone(), GossipMessage::<B>::from(packet.clone()).encode());

// rebroadcasting network.
delay.reset(rebroadcast_instant());
last = Some((to, packet));
}
Async::NotReady => break,
type Item = (Vec<PeerId>, GossipMessage<B>);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>
{
let this = &mut *self;
match this.rx.poll_next_unpin(cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some((to, packet))) => {
this.delay.reset(rebroadcast_instant());
this.last = Some((to.clone(), packet.clone()));

return Poll::Ready(Some((to, GossipMessage::<B>::from(packet.clone()))));
}
}
// Don't do anything, maybe the timer fired.
Poll::Pending => {},
};

// has to be done in a loop because it needs to be polled after
// re-scheduling.
loop {
match (&mut delay).unit_error().compat().poll() {
Err(e) => {
warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e);
delay.reset(rebroadcast_instant());
}
Ok(Async::Ready(())) => {
delay.reset(rebroadcast_instant());

if let Some((ref to, ref packet)) = last {
// send to peers.
net.send_message(to.clone(), GossipMessage::<B>::from(packet.clone()).encode());
}
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
}
ready!(this.delay.poll_unpin(cx));

// Getting this far here implies that the timer fired.

this.delay.reset(rebroadcast_instant());

// Make sure the underlying task is scheduled for wake-up.
//
// Note: In case poll_unpin is called after the resetted delay fires again, this
// will drop one tick. Deemed as very unlikely and also not critical.
while let Poll::Ready(()) = this.delay.poll_unpin(cx) {};

if let Some((ref to, ref packet)) = this.last {
return Poll::Ready(Some((to.clone(), GossipMessage::<B>::from(packet.clone()))));
}
});

(work, NeighborPacketSender(tx))
return Poll::Pending;
}
}
29 changes: 20 additions & 9 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,10 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
Block::Hash: Ord,
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
N: NetworkT<Block> + Send + Sync + Clone + 'static,
N: NetworkT<Block> + Send + Sync + Clone + Unpin + 'static,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this Unpin

SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
NumberFor<Block>: BlockNumberOps,
NumberFor<Block>: BlockNumberOps + Unpin,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
NumberFor<Block>: BlockNumberOps + Unpin,
NumberFor<Block>: BlockNumberOps,

DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
Expand Down Expand Up @@ -650,13 +650,14 @@ struct VoterWork<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> {
voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
env: Arc<Environment<B, E, Block, N, RA, SC, VR>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
network: futures03::compat::Compat<NetworkBridge<Block, N>>,
}

impl<B, E, Block, N, RA, SC, VR> VoterWork<B, E, Block, N, RA, SC, VR>
where
Block: BlockT,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
N: NetworkT<Block> + Sync + Unpin,
NumberFor<Block>: BlockNumberOps + Unpin,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, you get the idea

RA: 'static + Send + Sync,
E: CallExecutor<Block> + Send + Sync + 'static,
B: Backend<Block> + 'static,
Expand All @@ -681,7 +682,7 @@ where
voting_rule,
voters: Arc::new(voters),
config,
network,
network: network.clone(),
set_id: persistent_data.authority_set.set_id(),
authority_set: persistent_data.authority_set.clone(),
consensus_changes: persistent_data.consensus_changes.clone(),
Expand All @@ -694,6 +695,7 @@ where
voter: Box::new(futures::empty()) as Box<_>,
env,
voter_commands_rx,
network: futures03::future::TryFutureExt::compat(network),
};
work.rebuild_voter();
work
Expand Down Expand Up @@ -831,8 +833,8 @@ where
impl<B, E, Block, N, RA, SC, VR> Future for VoterWork<B, E, Block, N, RA, SC, VR>
where
Block: BlockT,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
N: NetworkT<Block> + Sync + Unpin,
NumberFor<Block>: BlockNumberOps + Unpin,
RA: 'static + Send + Sync,
E: CallExecutor<Block> + Send + Sync + 'static,
B: Backend<Block> + 'static,
Expand Down Expand Up @@ -878,6 +880,15 @@ where
}
}

match self.network.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(())) => {
// the network bridge future should never conclude.
return Ok(Async::Ready(()))
}
e @ Err(_) => return e,
};

Ok(Async::NotReady)
}
}
Expand All @@ -889,9 +900,9 @@ pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
Block::Hash: Ord,
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
N: NetworkT<Block> + Send + Sync + Clone + 'static,
N: NetworkT<Block> + Send + Sync + Clone + Unpin + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
NumberFor<Block>: BlockNumberOps + Unpin,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong

DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
Expand Down