Skip to content

Commit

Permalink
Limit concurrent inbound gossipped block requests
Browse files Browse the repository at this point in the history
Uses the "load shed directly" design pattern from ZcashFoundation#1618.
  • Loading branch information
teor2345 committed Jan 22, 2021
1 parent 840fce4 commit f8a1152
Showing 1 changed file with 52 additions and 6 deletions.
58 changes: 52 additions & 6 deletions zebrad/src/components/inbound/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ use zebra_state as zs;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The maximum number of concurrent inbound download and verify tasks.
///
/// Set to one and a half checkpoint intervals, so that the inbound queue can
/// hold a complete checkpoint interval, if needed. We expect the syncer to
/// download and verify checkpoints, so this bound might never be reached.
const MAX_INBOUND_CONCURRENCY: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3 / 2;

/// The action taken in response to a peer's gossipped block hash.
pub enum DownloadAction {
/// The block hash was successfully queued for download and verification.
AddedToQueue,

/// The block hash is already queued, so this request was ignored.
///
/// Another peer has already gossipped the same hash to us.
AlreadyQueued,

/// The queue is at capacity, so this request was ignored.
///
/// The sync service should discover this block later, when we are closer
/// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
FullQueue,
}

/// Manages download and verification of blocks gossiped to this peer.
#[pin_project]
#[derive(Debug)]
Expand Down Expand Up @@ -116,12 +140,27 @@ where

/// Queue a block for download and verification.
///
/// Returns true if the block was newly queued, and false if it was already queued.
/// Returns the action taken in response to the queue request.
#[instrument(skip(self, hash), fields(hash = %hash))]
pub fn download_and_verify(&mut self, hash: block::Hash) -> bool {
pub fn download_and_verify(&mut self, hash: block::Hash) -> DownloadAction {
if self.cancel_handles.contains_key(&hash) {
tracing::debug!("hash already queued for download");
return false;
tracing::debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"block hash already queued for inbound download: ignored block"
);
return DownloadAction::AlreadyQueued;
}

if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
tracing::info!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"too many blocks queued for inbound download: ignored block"
);
return DownloadAction::FullQueue;
}

// This oneshot is used to signal cancellation to the download task.
Expand Down Expand Up @@ -182,7 +221,14 @@ where
"blocks are only queued once"
);

tracing::debug!("queued hash for download");
true
tracing::debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"queued hash for download"
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);

DownloadAction::AddedToQueue
}
}

0 comments on commit f8a1152

Please sign in to comment.