Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
grandpa: reannounce voted blocks periodically (#3602)
Browse files Browse the repository at this point in the history
* grandpa: reannounce latest voted blocks periodically

* grandpa: add test for background block announcement

* grandpa: configurable delay for background block announcer

* grandpa: nits
  • Loading branch information
andresilva authored and gavofyork committed Sep 12, 2019
1 parent 6c89631 commit a0dd5e8
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 10 deletions.
15 changes: 11 additions & 4 deletions core/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
validator: Arc<GossipValidator<B>>,
neighbor_sender: periodic::NeighborPacketSender<B>,
announce_sender: periodic::BlockAnnounceSender<B>,
}

impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
Expand Down Expand Up @@ -299,16 +300,19 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}

let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone());
let (announce_job, announce_sender) = periodic::block_announce_worker(service.clone());
let reporting_job = report_stream.consume(service.clone());

let bridge = NetworkBridge { service, validator, neighbor_sender };
let bridge = NetworkBridge { service, validator, neighbor_sender, announce_sender };

let startup_work = futures::future::lazy(move || {
// lazily spawn these jobs onto their own tasks. the lazy future has access
// to tokio globals, which aren't available outside.
let mut executor = tokio_executor::DefaultExecutor::current();
executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(()))))
.expect("failed to spawn grandpa rebroadcast job task");
executor.spawn(Box::new(announce_job.select(on_exit.clone()).then(|_| Ok(()))))
.expect("failed to spawn grandpa block announce job task");
executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(()))))
.expect("failed to spawn grandpa reporting job task");
Ok(())
Expand Down Expand Up @@ -424,6 +428,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
network: self.service.clone(),
locals,
sender: tx,
announce_sender: self.announce_sender.clone(),
has_voted,
};

Expand Down Expand Up @@ -616,6 +621,7 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
service: self.service.clone(),
validator: Arc::clone(&self.validator),
neighbor_sender: self.neighbor_sender.clone(),
announce_sender: self.announce_sender.clone(),
}
}
}
Expand Down Expand Up @@ -662,6 +668,7 @@ struct OutgoingMessages<Block: BlockT, N: Network<Block>> {
set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
announce_sender: periodic::BlockAnnounceSender<Block>,
network: N,
has_voted: HasVoted<Block>,
}
Expand Down Expand Up @@ -719,10 +726,10 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
"block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id,
);

// announce our block hash to peers and propagate the
// message.
self.network.announce(target_hash);
// send the target block hash to the background block announcer
self.announce_sender.send(target_hash);

// propagate the message to peers
let topic = round_topic::<Block>(self.round, self.set_id);
self.network.gossip_message(topic, message.encode(), false);

Expand Down
159 changes: 153 additions & 6 deletions core/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,27 @@

//! Periodic rebroadcast of neighbor packets.
use super::{gossip::{NeighborPacket, GossipMessage}, Network};
use std::collections::VecDeque;
use std::time::{Instant, Duration};

use codec::Encode;
use futures::prelude::*;
use futures::sync::mpsc;
use sr_primitives::traits::{NumberFor, Block as BlockT};
use network::PeerId;
use tokio_timer::Delay;
use log::{debug, warn};
use codec::Encode;
use tokio_timer::Delay;

use std::time::{Instant, Duration};
use network::PeerId;
use sr_primitives::traits::{NumberFor, Block as BlockT};
use super::{gossip::{NeighborPacket, GossipMessage}, Network};

// how often to rebroadcast, if no other
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);

/// The number of block hashes that we have previously voted on that we should
/// keep around for announcement. The current value should be enough for 3
/// rounds assuming we have prevoted and precommited on different blocks.
const LATEST_VOTED_BLOCKS_TO_ANNOUNCE: usize = 6;

fn rebroadcast_instant() -> Instant {
Instant::now() + REBROADCAST_AFTER
}
Expand All @@ -41,6 +48,7 @@ pub(super) struct NeighborPacketSender<B: BlockT>(
);

impl<B: BlockT> NeighborPacketSender<B> {
/// Send a neighbor packet for the background worker to gossip to peers.
pub fn send(
&self,
who: Vec<network::PeerId>,
Expand Down Expand Up @@ -106,3 +114,142 @@ pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (

(work, NeighborPacketSender(tx))
}

/// A background worker for performing block announcements.
struct BlockAnnouncer<B: BlockT, N> {
net: N,
block_rx: mpsc::UnboundedReceiver<B::Hash>,
latest_voted_blocks: VecDeque<B::Hash>,
reannounce_after: Duration,
delay: Delay,
}

/// A background worker for announcing block hashes to peers. The worker keeps
/// track of `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` and periodically announces these
/// blocks to all peers if no new blocks to announce are noted (i.e. presumably
/// GRANDPA progress is stalled).
pub(super) fn block_announce_worker<B: BlockT, N: Network<B>>(net: N) -> (
impl Future<Item = (), Error = ()>,
BlockAnnounceSender<B>,
) {
block_announce_worker_aux(net, REBROADCAST_AFTER)
}

#[cfg(test)]
pub(super) fn block_announce_worker_with_delay<B: BlockT, N: Network<B>>(
net: N,
reannounce_after: Duration,
) -> (
impl Future<Item = (), Error = ()>,
BlockAnnounceSender<B>,
) {
block_announce_worker_aux(net, reannounce_after)
}

fn block_announce_worker_aux<B: BlockT, N: Network<B>>(
net: N,
reannounce_after: Duration,
) -> (
impl Future<Item = (), Error = ()>,
BlockAnnounceSender<B>,
) {
let latest_voted_blocks = VecDeque::with_capacity(LATEST_VOTED_BLOCKS_TO_ANNOUNCE);

let (block_tx, block_rx) = mpsc::unbounded();

let announcer = BlockAnnouncer {
net,
block_rx,
latest_voted_blocks,
reannounce_after,
delay: Delay::new(Instant::now() + reannounce_after),
};

(announcer, BlockAnnounceSender(block_tx))
}


impl<B: BlockT, N> BlockAnnouncer<B, N> {
fn note_block(&mut self, block: B::Hash) -> bool {
if !self.latest_voted_blocks.contains(&block) {
if self.latest_voted_blocks.len() >= LATEST_VOTED_BLOCKS_TO_ANNOUNCE {
self.latest_voted_blocks.pop_front();
}

self.latest_voted_blocks.push_back(block);

true
} else {
false
}
}

fn reset_delay(&mut self) {
self.delay.reset(Instant::now() + self.reannounce_after);
}
}

impl<B: BlockT, N: Network<B>> Future for BlockAnnouncer<B, N> {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// note any new blocks to announce and announce them
loop {
match self.block_rx.poll().expect("unbounded receivers do not error; qed") {
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some(block)) => {
if self.note_block(block) {
self.net.announce(block);
self.reset_delay();
}
},
Async::NotReady => break,
}
}

// check the reannouncement delay timer, has to be done in a loop
// because it needs to be polled after re-scheduling.
loop {
match self.delay.poll() {
Err(e) => {
warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e);
self.reset_delay();
},
// after the delay fires announce all blocks that we have
// stored. note that this only happens if we don't receive any
// new blocks above for the duration of `reannounce_after`.
Ok(Async::Ready(())) => {
self.reset_delay();

debug!(
target: "afg",
"Re-announcing latest voted blocks due to lack of progress: {:?}",
self.latest_voted_blocks,
);

for block in self.latest_voted_blocks.iter() {
self.net.announce(*block);
}
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
}
}
}
}

/// A sender used to send block hashes to announce to a background job.
#[derive(Clone)]
pub(super) struct BlockAnnounceSender<B: BlockT>(mpsc::UnboundedSender<B::Hash>);

impl<B: BlockT> BlockAnnounceSender<B> {
/// Send a block hash for the background worker to announce.
pub fn send(
&self,
block: B::Hash,
) {
if let Err(err) = self.0.unbounded_send(block) {
debug!(target: "afg", "Failed to send block to background announcer: {:?}", err);
}
}
}
76 changes: 76 additions & 0 deletions core/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,3 +498,79 @@ fn peer_with_higher_view_leads_to_catch_up_request() {

current_thread::block_on_all(test).unwrap();
}

#[test]
fn periodically_reannounce_voted_blocks_on_stall() {
use futures::try_ready;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;

let (tester, net) = make_test_network();
let (announce_worker, announce_sender) = super::periodic::block_announce_worker_with_delay(
net,
Duration::from_secs(1),
);

let hashes = Arc::new(Mutex::new(Vec::new()));

fn wait_all(tester: Tester, hashes: &[Hash]) -> impl Future<Item = Tester, Error = ()> {
struct WaitAll {
remaining_hashes: Arc<Mutex<HashSet<Hash>>>,
events_fut: Box<dyn Future<Item = Tester, Error = ()>>,
}

impl Future for WaitAll {
type Item = Tester;
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let tester = try_ready!(self.events_fut.poll());

if self.remaining_hashes.lock().is_empty() {
return Ok(Async::Ready(tester));
}

let remaining_hashes = self.remaining_hashes.clone();
self.events_fut = Box::new(tester.filter_network_events(move |event| match event {
Event::Announce(h) =>
remaining_hashes.lock().remove(&h) || panic!("unexpected announce"),
_ => false,
}));

self.poll()
}
}

WaitAll {
remaining_hashes: Arc::new(Mutex::new(hashes.iter().cloned().collect())),
events_fut: Box::new(futures::future::ok(tester)),
}
}

let test = tester
.and_then(move |tester| {
current_thread::spawn(announce_worker);
Ok(tester)
})
.and_then(|tester| {
// announce 12 blocks
for _ in 0..=12 {
let hash = Hash::random();
hashes.lock().push(hash);
announce_sender.send(hash);
}

// we should see an event for each of those announcements
wait_all(tester, &hashes.lock())
})
.and_then(|tester| {
// after a period of inactivity we should see the last
// `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` being rebroadcast
wait_all(tester, &hashes.lock()[7..=12])
});

let mut runtime = current_thread::Runtime::new().unwrap();
runtime.block_on(test).unwrap();
}

0 comments on commit a0dd5e8

Please sign in to comment.