Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Try to fix out of view statements #5177

Merged
merged 16 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ impl State {
// to all peers in the BlockEntry's known_by set who know about the block,
// excluding the peer in the source, if source has kind MessageSource::Peer.
let maybe_peer_id = source.peer_id();
let peers = entry
let mut peers = entry
.known_by
.keys()
.cloned()
Expand All @@ -729,8 +729,7 @@ impl State {

let assignments = vec![(assignment, claimed_candidate_index)];
let gossip_peers = &self.gossip_peers;
let peers =
util::choose_random_subset(|e| gossip_peers.contains(e), peers, MIN_GOSSIP_PEERS);
util::choose_random_subset(|e| gossip_peers.contains(e), &mut peers, MIN_GOSSIP_PEERS);

// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
Expand Down Expand Up @@ -943,16 +942,15 @@ impl State {
// to all peers in the BlockEntry's known_by set who know about the block,
// excluding the peer in the source, if source has kind MessageSource::Peer.
let maybe_peer_id = source.peer_id();
let peers = entry
let mut peers = entry
.known_by
.keys()
.cloned()
.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
.collect::<Vec<_>>();

let gossip_peers = &self.gossip_peers;
let peers =
util::choose_random_subset(|e| gossip_peers.contains(e), peers, MIN_GOSSIP_PEERS);
util::choose_random_subset(|e| gossip_peers.contains(e), &mut peers, MIN_GOSSIP_PEERS);

// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
Expand Down
6 changes: 3 additions & 3 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ async fn relay_message<Context>(

let _span = span.child("interested-peers");
// pass on the bitfield distribution to all interested peers
let interested_peers = peer_views
let mut interested_peers = peer_views
.iter()
.filter_map(|(peer, view)| {
// check interest in the peer in this message's relay parent
Expand All @@ -363,9 +363,9 @@ async fn relay_message<Context>(
}
})
.collect::<Vec<PeerId>>();
let interested_peers = util::choose_random_subset(
util::choose_random_subset(
|e| gossip_peers.contains(e),
interested_peers,
&mut interested_peers,
MIN_GOSSIP_PEERS,
);
interested_peers.iter().for_each(|peer| {
Expand Down
63 changes: 41 additions & 22 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use polkadot_node_network_protocol::{
IfDisconnected, PeerId, UnifiedReputationChange as Rep, View,
};
use polkadot_node_primitives::{SignedFullStatement, Statement, UncheckedSignedFullStatement};
use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS};
use polkadot_node_subsystem_util::{self as util, rand, MIN_GOSSIP_PEERS};

use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash,
Expand Down Expand Up @@ -61,7 +61,6 @@ use util::runtime::RuntimeInfo;
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};

use fatality::Nested;

vstakhov marked this conversation as resolved.
Show resolved Hide resolved
mod error;
pub use error::{Error, FatalError, JfyiError, Result};

Expand Down Expand Up @@ -115,16 +114,19 @@ const LOG_TARGET: &str = "parachain::statement-distribution";
const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20;

/// The statement distribution subsystem.
pub struct StatementDistributionSubsystem {
pub struct StatementDistributionSubsystem<R: rand::Rng> {
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
/// Pointer to a keystore, which is required for determining this node's validator index.
keystore: SyncCryptoStorePtr,
/// Receiver for incoming large statement requests.
req_receiver: Option<IncomingRequestReceiver<request_v1::StatementFetchingRequest>>,
/// Prometheus metrics
metrics: Metrics,
/// PRG for peers selection logic
rng: R,
}

impl<Context> overseer::Subsystem<Context, SubsystemError> for StatementDistributionSubsystem
impl<Context, R: rand::Rng + Send + Sync + 'static> overseer::Subsystem<Context, SubsystemError>
for StatementDistributionSubsystem<R>
where
Context: SubsystemContext<Message = StatementDistributionMessage>,
Context: overseer::SubsystemContext<Message = StatementDistributionMessage>,
Expand All @@ -142,17 +144,6 @@ where
}
}

impl StatementDistributionSubsystem {
/// Create a new Statement Distribution Subsystem
pub fn new(
keystore: SyncCryptoStorePtr,
req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { keystore, req_receiver: Some(req_receiver), metrics }
}
}

#[derive(Default)]
struct RecentOutdatedHeads {
buf: VecDeque<Hash>,
Expand Down Expand Up @@ -906,6 +897,7 @@ async fn circulate_statement_and_dependents(
statement: SignedFullStatement,
priority_peers: Vec<PeerId>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
let active_head = match active_heads.get_mut(&relay_parent) {
Some(res) => res,
Expand All @@ -932,6 +924,7 @@ async fn circulate_statement_and_dependents(
stored,
priority_peers,
metrics,
rng,
)
.await,
)),
Expand Down Expand Up @@ -1019,6 +1012,7 @@ async fn circulate_statement<'a>(
stored: StoredStatement<'a>,
mut priority_peers: Vec<PeerId>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) -> Vec<PeerId> {
let fingerprint = stored.fingerprint();

Expand All @@ -1041,8 +1035,12 @@ async fn circulate_statement<'a>(
let priority_set: HashSet<&PeerId> = priority_peers.iter().collect();
peers_to_send.retain(|p| !priority_set.contains(p));

let mut peers_to_send =
util::choose_random_subset(|e| gossip_peers.contains(e), peers_to_send, MIN_GOSSIP_PEERS);
util::choose_random_subset_with_rng(
|e| gossip_peers.contains(e),
&mut peers_to_send,
rng,
MIN_GOSSIP_PEERS,
);
// We don't want to use less peers, than we would without any priority peers:
let min_size = std::cmp::max(peers_to_send.len(), MIN_GOSSIP_PEERS);
// Make set full:
Expand Down Expand Up @@ -1313,6 +1311,7 @@ async fn handle_incoming_message_and_circulate<'a>(
message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
let handled_incoming = match peers.get_mut(&peer) {
Some(data) =>
Expand Down Expand Up @@ -1348,6 +1347,7 @@ async fn handle_incoming_message_and_circulate<'a>(
statement,
Vec::new(),
metrics,
rng,
)
.await;
}
Expand Down Expand Up @@ -1458,11 +1458,11 @@ async fn handle_incoming_message<'a>(
Ok(()) => {},
Err(DeniedStatement::NotUseful) => return None,
Err(DeniedStatement::UsefulButKnown) => {
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await;
// Note a received statement in the peer data
peer_data
.receive(&relay_parent, &fingerprint, max_message_count)
.expect("checked in `check_can_receive` above; qed");
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await;

return None
},
Expand Down Expand Up @@ -1563,6 +1563,7 @@ async fn update_peer_view_and_maybe_send_unlocked(
active_heads: &HashMap<Hash, ActiveHeadData>,
new_view: View,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
let old_view = std::mem::replace(&mut peer_data.view, new_view);

Expand All @@ -1573,9 +1574,10 @@ async fn update_peer_view_and_maybe_send_unlocked(

let is_gossip_peer = gossip_peers.contains(&peer);
let lucky = is_gossip_peer ||
util::gen_ratio(
util::gen_ratio_rng(
util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()),
util::MIN_GOSSIP_PEERS,
rng,
);

// Add entries for all relay-parents in the new view but not the old.
Expand All @@ -1602,6 +1604,7 @@ async fn handle_network_update(
req_sender: &mpsc::Sender<RequesterMessage>,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
rng: &mut impl rand::Rng,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, role, maybe_authority) => {
Expand Down Expand Up @@ -1643,6 +1646,7 @@ async fn handle_network_update(
&*active_heads,
view,
metrics,
rng,
)
.await
}
Expand All @@ -1659,6 +1663,7 @@ async fn handle_network_update(
message,
req_sender,
metrics,
rng,
)
.await;
},
Expand All @@ -1675,6 +1680,7 @@ async fn handle_network_update(
&*active_heads,
view,
metrics,
rng,
)
.await,
None => (),
Expand All @@ -1686,7 +1692,17 @@ async fn handle_network_update(
}
}

impl StatementDistributionSubsystem {
impl<R: rand::Rng> StatementDistributionSubsystem<R> {
/// Create a new Statement Distribution Subsystem
pub fn new(
keystore: SyncCryptoStorePtr,
req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingRequest>,
metrics: Metrics,
rng: R,
) -> Self {
Self { keystore, req_receiver: Some(req_receiver), metrics, rng }
}

async fn run(
mut self,
mut ctx: (impl SubsystemContext<Message = StatementDistributionMessage>
Expand Down Expand Up @@ -1808,7 +1824,7 @@ impl StatementDistributionSubsystem {
}

async fn handle_requester_message(
&self,
&mut self,
ctx: &mut impl SubsystemContext,
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
Expand Down Expand Up @@ -1866,6 +1882,7 @@ impl StatementDistributionSubsystem {
message,
req_sender,
&self.metrics,
&mut self.rng,
)
.await;
}
Expand Down Expand Up @@ -1915,7 +1932,7 @@ impl StatementDistributionSubsystem {
}

async fn handle_subsystem_message(
&self,
&mut self,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
runtime: &mut RuntimeInfo,
peers: &mut HashMap<PeerId, PeerData>,
Expand Down Expand Up @@ -2027,6 +2044,7 @@ impl StatementDistributionSubsystem {
statement,
group_peers,
metrics,
&mut self.rng,
)
.await;
},
Expand All @@ -2041,6 +2059,7 @@ impl StatementDistributionSubsystem {
req_sender,
event,
metrics,
&mut self.rng,
)
.await;
},
Expand Down
Loading