Limit initial candidate set fanout to the number of initial peers
If there is a small number of initial peers, and they are slow, the
initial candidate set update can appear to hang. To avoid this issue,
limit the initial candidate set fanout to the number of initial peers.

Once the initial peers have sent us more peer addresses, there is no need
to limit the fanouts for future updates.

Reported by Niklas Long of Equilibrium.
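
The fanout choice described above can be sketched as follows. This is a minimal illustration, not the Zebra code: `effective_fanout` is a hypothetical helper, and the value given to `GET_ADDR_FANOUT` here is an assumed placeholder for `constants::GET_ADDR_FANOUT`, whose real value this commit does not show.

```rust
/// Assumed placeholder for `constants::GET_ADDR_FANOUT`.
/// The real value is not shown in this commit.
const GET_ADDR_FANOUT: usize = 3;

/// Hypothetical helper: the number of `GetPeers` requests to send in one
/// update. `Some(n)` is the initial update, `None` is every later update.
fn effective_fanout(fanout_limit: Option<usize>) -> usize {
    fanout_limit.unwrap_or(GET_ADDR_FANOUT)
}

fn main() {
    // Initial update with a single configured peer: send one request,
    // so the update does not wait on extra responses from that one peer.
    assert_eq!(effective_fanout(Some(1)), 1);

    // Later updates pass `None` and fall back to the default fanout.
    assert_eq!(effective_fanout(None), GET_ADDR_FANOUT);

    println!(
        "initial: {}, later: {}",
        effective_fanout(Some(1)),
        effective_fanout(None)
    );
}
```

Note that `unwrap_or` replaces the default rather than capping it, so a limit larger than the default is used as given.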
teor2345 committed May 17, 2021
1 parent 679920f commit 458c26f
Showing 2 changed files with 27 additions and 5 deletions.
25 changes: 22 additions & 3 deletions zebra-network/src/peer_set/candidate_set.rs
@@ -131,14 +131,25 @@ where
}
}

-/// Update the peer set from the network.
+/// Update the peer set from the network, using the default fanout limit.
+///
+/// See `update_initial` for details.
+pub async fn update(&mut self) -> Result<(), BoxError> {
+self.update_inner(None).await
+}
+
+/// Update the peer set from the network, limiting the fanout to
+/// `fanout_limit`.
///
/// - Ask a few live `Responded` peers to send us more peers.
/// - Process all completed peer responses, adding new peers in the
/// `NeverAttempted` state.
///
/// ## Correctness
///
+/// Pass the initial peer set size as `fanout_limit` during initialization,
+/// so that Zebra does not send duplicate requests to the same peer.
+///
/// The crawler exits when update returns an error, so it must only return
/// errors on permanent failures.
///
@@ -148,7 +159,15 @@ where
/// `report_failed` puts peers into the `Failed` state.
///
/// `next` puts peers into the `AttemptPending` state.
-pub async fn update(&mut self) -> Result<(), BoxError> {
+pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> {
+self.update_inner(Some(fanout_limit)).await
+}
+
+/// Update the peer set from the network, limiting the fanout to
+/// `fanout_limit`.
+///
+/// See `update_initial` for details.
+async fn update_inner(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
// Opportunistically crawl the network on every update call to ensure
// we're actively fetching peers. Continue independently of whether we
// actually receive any peers, but always ask the network for more.
@@ -159,7 +178,7 @@ where
// called while the peer set is already loaded.
let mut responses = FuturesUnordered::new();
trace!("sending GetPeers requests");
-for _ in 0..constants::GET_ADDR_FANOUT {
+for _ in 0..fanout_limit.unwrap_or(constants::GET_ADDR_FANOUT) {
// CORRECTNESS
//
// Use a timeout to avoid deadlocks when there are no connected
7 changes: 5 additions & 2 deletions zebra-network/src/peer_set/initialize.rs
@@ -137,12 +137,14 @@ where
);

// 2. Initial peers, specified in the config.
+let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel();
let initial_peers_fut = {
let config = config.clone();
let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone();
async move {
let initial_peers = config.initial_peers().await;
+let _ = initial_peer_count_tx.send(initial_peers.len());
// Connect the tx end to the 3 peer sources:
add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
}
@@ -157,10 +159,11 @@ where
// We need to await candidates.update() here, because zcashd only sends one
// `addr` message per connection, and if we only have one initial peer we
// need to ensure that its `addr` message is used by the crawler.
-// XXX this should go in CandidateSet::new, but we need init() -> Result<_,_>

info!("Sending initial request for peers");
-let _ = candidates.update().await;
+let _ = candidates
+.update_initial(initial_peer_count_rx.await.expect("value sent before drop"))
+.await;

for _ in 0..config.peerset_initial_target_size {
let _ = demand_tx.try_send(());
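
The count handoff in this file relies on `tokio::sync::oneshot`. As a rough, std-only sketch of the same pattern, the example below substitutes `std::sync::mpsc` and a thread for the oneshot channel and the async task. `handoff_peer_count` and the peer addresses are hypothetical, and nothing here is taken from the Zebra code beyond the send-then-receive ordering.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: resolve the initial peers on another thread, and
/// hand their count back to the caller before connecting to them.
fn handoff_peer_count(initial_peers: Vec<&'static str>) -> usize {
    // Stand-in for `tokio::sync::oneshot::channel()`.
    let (count_tx, count_rx) = mpsc::channel::<usize>();

    let initial_peers_task = thread::spawn(move || {
        // Send the count first. A send error only means the receiver
        // has already gone away, so it is ignored.
        let _ = count_tx.send(initial_peers.len());
        // The real code would now connect to each initial peer.
        initial_peers.len()
    });

    // Block until the count arrives, then use it as the fanout limit.
    let fanout_limit = count_rx.recv().expect("value sent before drop");
    initial_peers_task.join().expect("peer task panicked");
    fanout_limit
}

fn main() {
    let fanout_limit = handoff_peer_count(vec!["peer-a:8233", "peer-b:8233"]);
    assert_eq!(fanout_limit, 2);
    println!("initial fanout limit: {}", fanout_limit);
}
```

Sending the count before connecting means the receiver never waits on slow peer connections, only on resolving the configured peer list.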