Do not drop lookups without peers while awaiting events #5839

Merged May 24, 2024 (1 commit)
57 changes: 54 additions & 3 deletions beacon_node/network/src/sync/block_lookups/mod.rs
@@ -39,7 +39,11 @@ pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
/// Maximum time we allow a lookup to exist before assuming it is stuck and will never make
/// progress. Assume the worst-case processing time per block component set times max depth.
/// 15 * 2 * 32 = 16 minutes.
const LOOKUP_MAX_DURATION_SECS: usize = 15 * PARENT_DEPTH_TOLERANCE;
const LOOKUP_MAX_DURATION_STUCK_SECS: u64 = 15 * PARENT_DEPTH_TOLERANCE as u64;
/// The most common case of child-lookup without peers is receiving block components before the
/// attestation deadline when the node is lagging behind. Once peers start attesting for the child
/// lookup at most after 4 seconds, the lookup should gain peers.
const LOOKUP_MAX_DURATION_NO_PEERS_SECS: u64 = 10;

pub enum BlockComponent<E: EthSpec> {
Block(DownloadResult<Arc<SignedBeaconBlock<E>>>),
@@ -689,6 +693,53 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
}

/// Perform some prune operations on lookups on some interval
pub fn prune_lookups(&mut self) {
self.drop_lookups_without_peers();
self.drop_stuck_lookups();
}

/// Lookups without peers are allowed to exist for some time. See this common race condition:
///
/// 1. Receive unknown block parent event
/// 2. Create child lookup with zero peers
/// 3. Parent is processed, before receiving any attestation for the child block
/// 4. Child lookup is attempted to make progress but has no peers
/// 5. We receive an attestation for child block and add a peer to the child block lookup
///
/// On step 4 we could drop the lookup because we attempt to issue a request with no peers
/// available. This has two issues:
/// - We may drop the lookup while some other block component is processing, triggering an
/// unknown lookup error. This can potentially cause un-related child lookups to also be
/// dropped when calling `drop_lookup_and_children`.
/// - We lose all progress of the lookup, and have to re-download its components that we may
/// already have there cached.
///
/// Instead there's no negative for keeping lookups with no peers around for some time. If we
/// regularly prune them, it should not be a memory concern (TODO: maybe yes!).
fn drop_lookups_without_peers(&mut self) {
for (lookup_id, block_root) in self
.single_block_lookups
.values()
.filter(|lookup| {
// Do not drop lookups that are awaiting events to prevent inconsistencies. If a
// lookup gets stuck, it will eventually be pruned by `drop_stuck_lookups`
lookup.has_no_peers()
&& lookup.elapsed_since_created()
> Duration::from_secs(LOOKUP_MAX_DURATION_NO_PEERS_SECS)
&& !lookup.is_awaiting_event()
})
.map(|lookup| (lookup.id, lookup.block_root()))
.collect::<Vec<_>>()
{
debug!(self.log, "Dropping lookup with no peers";
"id" => lookup_id,
"block_root" => ?block_root
);
self.drop_lookup_and_children(lookup_id);
}
}
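
The pruning rule above can be illustrated in isolation. The following is a minimal sketch, not the Lighthouse implementation: `Lookup`, `should_drop_without_peers` and `prune_without_peers` are hypothetical names, and the lookup's age is passed in as a plain `Duration` instead of being measured from a creation timestamp. Only the constant value and the three-part condition are taken from the diff.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Same value as `LOOKUP_MAX_DURATION_NO_PEERS_SECS` in the diff.
const LOOKUP_MAX_DURATION_NO_PEERS_SECS: u64 = 10;

/// Hypothetical, simplified stand-in for `SingleBlockLookup`.
#[derive(Debug, Clone)]
pub struct Lookup {
    /// Number of peers that claim to have the block.
    pub peer_count: usize,
    /// Time elapsed since the lookup was created.
    pub age: Duration,
    /// True while a download or processing result is still expected.
    pub awaiting_event: bool,
}

/// A lookup is dropped only if all three conditions hold: it has no peers,
/// it is older than the grace period, and it is not awaiting any event.
pub fn should_drop_without_peers(lookup: &Lookup) -> bool {
    lookup.peer_count == 0
        && lookup.age > Duration::from_secs(LOOKUP_MAX_DURATION_NO_PEERS_SECS)
        && !lookup.awaiting_event
}

/// Collect the ids first, then remove them, so the map is never mutated
/// while it is being iterated. Returns the dropped ids in ascending order.
pub fn prune_without_peers(lookups: &mut HashMap<u32, Lookup>) -> Vec<u32> {
    let mut ids: Vec<u32> = lookups
        .iter()
        .filter(|(_, lookup)| should_drop_without_peers(lookup))
        .map(|(id, _)| *id)
        .collect();
    ids.sort_unstable();
    for id in &ids {
        lookups.remove(id);
    }
    ids
}
```

In this sketch a peerless lookup younger than 10 seconds, or one that is still awaiting an event, survives the prune pass. Unlike the diff, the sketch removes only the matching entry and does not model `drop_lookup_and_children`.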

/// Safety mechanism to unstick lookup sync. Lookup sync is purely event driven and depends on
/// external components to feed it events to make progress. If there is a bug in network, in
/// beacon processor, or here internally: lookups can get stuck forever. A stuck lookup can
@@ -702,10 +753,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
///
/// - One single clear warn level log per stuck incident
/// - If the original bug is sporadic, it reduces the time a node is stuck from forever to 15 min
pub fn drop_stuck_lookups(&mut self) {
fn drop_stuck_lookups(&mut self) {
// While loop to find and drop all disjoint trees of potentially stuck lookups.
while let Some(stuck_lookup) = self.single_block_lookups.values().find(|lookup| {
lookup.elapsed_since_created() > Duration::from_secs(LOOKUP_MAX_DURATION_SECS as u64)
lookup.elapsed_since_created() > Duration::from_secs(LOOKUP_MAX_DURATION_STUCK_SECS)
}) {
let ancestor_stuck_lookup = match self.find_oldest_ancestor_lookup(stuck_lookup) {
Ok(lookup) => lookup,
53 changes: 37 additions & 16 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
@@ -137,9 +137,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_root() == block_root
}

/// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter()
/// Returns true if the block has already been downloaded.
pub fn both_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
}

/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| self.blob_request_state.state.is_awaiting_event()
}

/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
@@ -189,13 +197,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}

let Some(peer_id) = self.use_rand_available_peer() else {
if awaiting_parent {
// Allow lookups awaiting for a parent to have zero peers. If when the parent
// resolve they still have zero peers the lookup will fail gracefully.
return Ok(());
} else {
return Err(LookupRequestError::NoPeers);
}
// Allow lookup to not have any peers. In that case do nothing. If the lookup does
// not have peers for some time, it will be dropped.
return Ok(());
};

let request = R::request_state_mut(self);
@@ -226,18 +230,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
Ok(())
}

/// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter()
}

/// Add peer to all request states. The peer must be able to serve this request.
/// Returns true if the peer was newly inserted into some request state.
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
self.peers.insert(peer_id)
}

/// Returns true if the block has already been downloaded.
pub fn both_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
}

/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
@@ -351,6 +354,24 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}

/// Returns true if we can expect some future event to progress this block component request
/// specifically.
pub fn is_awaiting_event(&self) -> bool {
match self.state {
// No event will progress this request specifically, but the request may be put on hold
// due to some external event
State::AwaitingDownload { .. } => false,
// Network will emit a download success / error event
State::Downloading { .. } => true,
// Not awaiting any external event
State::AwaitingProcess { .. } => false,
// Beacon processor will emit a processing result event
State::Processing { .. } => true,
// Request complete, no future event left
State::Processed { .. } => false,
}
}
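
The two `is_awaiting_event` methods in this file compose: the lookup-level check is the OR of the parent flag and the per-component checks. A minimal sketch of that composition follows, assuming a data-free `State` enum (the real variants carry data) and a hypothetical free function `lookup_is_awaiting_event` in place of the method on `SingleBlockLookup`.

```rust
/// Hypothetical, simplified request state. The variant names follow the diff,
/// but the real `State` variants carry payloads that are omitted here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    AwaitingDownload,
    Downloading,
    AwaitingProcess,
    Processing,
    Processed,
}

impl State {
    /// Only the two in-flight states expect an external event: the network
    /// reports a download result, the beacon processor a processing result.
    pub fn is_awaiting_event(self) -> bool {
        matches!(self, State::Downloading | State::Processing)
    }
}

/// Lookup-level check: waiting on a parent, or on either component request.
pub fn lookup_is_awaiting_event(awaiting_parent: bool, block: State, blob: State) -> bool {
    awaiting_parent || block.is_awaiting_event() || blob.is_awaiting_event()
}
```

Under these assumptions, a lookup whose block is `Processing` counts as awaiting an event even when its blob request sits idle in `AwaitingDownload`, which is the case the no-peers prune must skip.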

pub fn peek_downloaded_data(&self) -> Option<&T> {
match &self.state {
State::AwaitingDownload => None,
11 changes: 6 additions & 5 deletions beacon_node/network/src/sync/manager.rs
@@ -547,9 +547,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
futures::stream::iter(ee_responsiveness_watch.await).flatten()
};

// LOOKUP_MAX_DURATION_SECS is 60 seconds. Logging every 30 seconds allows enough timely
// visbility while being sparse and not increasing the debug log volume in a noticeable way
let mut interval = tokio::time::interval(Duration::from_secs(30));
// min(LOOKUP_MAX_DURATION_*) is 15 seconds. The cost of calling prune_lookups more often is
// one iteration over the single lookups HashMap. This map is supposed to be very small < 10
// unless there is a bug.
let mut prune_lookups_interval = tokio::time::interval(Duration::from_secs(15));

// process any inbound messages
loop {
@@ -560,8 +561,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Some(engine_state) = check_ee_stream.next(), if check_ee => {
self.handle_new_execution_engine_state(engine_state);
}
_ = interval.tick() => {
self.block_lookups.drop_stuck_lookups();
_ = prune_lookups_interval.tick() => {
self.block_lookups.prune_lookups();
}
}
}