Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/stable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Jun 28, 2024
2 parents 784ef5f + 9e12c21 commit a64cee3
Show file tree
Hide file tree
Showing 18 changed files with 294 additions and 200 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beacon_node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "beacon_node"
version = "5.2.0"
version = "5.2.1"
authors = [
"Paul Hauner <paul@paulhauner.com>",
"Age Manning <Age@AgeManning.com",
Expand Down
31 changes: 0 additions & 31 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,37 +352,6 @@ where
!matches!(self.state, HandlerState::Deactivated)
}

// NOTE: This function gets polled to completion upon a connection close.
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
// Inform the network behaviour of any failed requests

while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
let outbound_info = self
.outbound_substreams
.remove(&substream_id)
.expect("The value must exist for a key");
// If the state of the connection is closing, we do not need to report this case to
// the behaviour, as the connection has just closed non-gracefully
if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
continue;
}

// Register this request as an RPC Error
return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: outbound_info.proto,
id: outbound_info.req_id,
})));
}

// Also handle any events that are awaiting to be sent to the behaviour
if !self.events_out.is_empty() {
return Poll::Ready(Some(self.events_out.remove(0)));
}

Poll::Ready(None)
}

fn poll(
&mut self,
cx: &mut Context<'_>,
Expand Down
94 changes: 1 addition & 93 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod common;

use common::Protocol;
use lighthouse_network::rpc::{methods::*, RPCError};
use lighthouse_network::rpc::methods::*;
use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
use slog::{debug, warn, Level};
use ssz::Encode;
Expand Down Expand Up @@ -1012,98 +1012,6 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
})
}

#[test]
fn test_disconnect_triggers_rpc_error() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Debug;
let enable_logging = false;

let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();

let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
Protocol::Tcp,
)
.await;

// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
// Must have at least one root for the request to create a stream
vec![Hash256::from_low_u64_be(0)],
&spec,
));

// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
.send_request(peer_id, 42, rpc_request.clone())
.unwrap();
}
NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
RPCError::Disconnected => return,
other => panic!("received unexpected error {:?}", other),
},
other => {
warn!(log, "Ignoring other event {:?}", other);
}
}
}
};

// determine messages to send (PeerId, RequestId). If some, indicates we still need to send
// messages
let mut sending_peer = None;
let receiver_future = async {
loop {
// this future either drives the sending/receiving or times out allowing messages to be
// sent in the timeout
match futures::future::select(
Box::pin(receiver.next_event()),
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
)
.await
{
futures::future::Either::Left((ev, _)) => match ev {
NetworkEvent::RequestReceived { peer_id, .. } => {
sending_peer = Some(peer_id);
}
other => {
warn!(log, "Ignoring other event {:?}", other);
}
},
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
}

// if we need to send messages send them here. This will happen after a delay
if let Some(peer_id) = sending_peer.take() {
warn!(log, "Receiver got request, disconnecting peer");
receiver.__hard_disconnect_testing_only(peer_id);
}
}
};

tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
})
}

/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
/// Goodbye message.
fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
Expand Down
38 changes: 36 additions & 2 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,49 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// A peer has disconnected.
/// If the peer has active batches, those are considered failed and re-requested.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
pub fn peer_disconnected(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> {
if matches!(
self.state(),
BackFillState::Failed | BackFillState::NotRequired
) {
return Ok(());
}

self.active_requests.remove(peer_id);
if let Some(batch_ids) = self.active_requests.remove(peer_id) {
// fail the batches.
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
match batch.download_failed(false) {
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
}
Ok(BatchOperationOutcome::Continue) => {}
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
}
}
// If we have run out of peers in which to retry this batch, the backfill state
// transitions to a paused state.
// We still need to reset the state for all the affected batches, so we should not
// short circuit early.
if self.retry_batch_download(network, id).is_err() {
debug!(
self.log,
"Batch could not be retried";
"batch_id" => id,
"error" => "no synced peers"
);
}
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}

// Remove the peer from the participation list
self.participating_peers.remove(peer_id);
Expand Down
64 changes: 40 additions & 24 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
//! Implements block lookup sync.
//!
//! Block lookup sync is triggered when a peer claims to have imported a block we don't know about.
//! For example, a peer attesting to a head block root that is not in our fork-choice. Lookup sync
//! is recursive in nature, as we may discover that this attested head block root has a parent that
//! is also unknown to us.
//!
//! Block lookup is implemented as an event-driven state machine. It sends events to the network and
//! beacon processor, and expects some set of events back. A discrepancy in the expected event API
//! will result in lookups getting "stuck". A lookup becomes stuck when there is no future event
//! that will trigger the lookup to make progress. There's a fallback mechanism that drops lookups
//! that live for too long, logging the line "Notify the devs a sync lookup is stuck".
//!
//! The expected event API is documented in the code paths that are making assumptions with the
//! comment prefix "Lookup sync event safety:"
//!
//! Block lookup sync attempts to not re-download or re-process data that we already have. Block
//! components are cached temporarily in multiple places before they are imported into fork-choice.
//! Therefore, block lookup sync must peek these caches correctly to decide when to skip a download
//! or consider a lookup complete. These caches are read from the `SyncNetworkContext` and its state
//! returned to this module as `LookupRequestResult` variants.
use self::parent_chain::{compute_parent_chains, NodeChain};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
Expand Down Expand Up @@ -277,7 +299,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}

if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers) {
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
warn!(self.log, "Error adding peers to ancestor lookup"; "error" => ?e);
}

Expand Down Expand Up @@ -426,21 +448,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Error responses */

pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
self.single_block_lookups.retain(|_, lookup| {
for (_, lookup) in self.single_block_lookups.iter_mut() {
lookup.remove_peer(peer_id);

// Note: this condition should be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
if lookup.has_no_peers() {
debug!(self.log,
"Dropping single lookup after peer disconnection";
"block_root" => ?lookup.block_root()
);
false
} else {
true
}
});
}
}

/* Processing responses */
Expand Down Expand Up @@ -803,12 +813,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};

if stuck_lookup.id == ancestor_stuck_lookup.id {
warn!(self.log, "Notify the devs, a sync lookup is stuck";
warn!(self.log, "Notify the devs a sync lookup is stuck";
"block_root" => ?stuck_lookup.block_root(),
"lookup" => ?stuck_lookup,
);
} else {
warn!(self.log, "Notify the devs, a sync lookup is stuck";
warn!(self.log, "Notify the devs a sync lookup is stuck";
"block_root" => ?stuck_lookup.block_root(),
"lookup" => ?stuck_lookup,
"ancestor_block_root" => ?ancestor_stuck_lookup.block_root(),
Expand Down Expand Up @@ -850,37 +860,43 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
lookup_id: SingleLookupId,
peers: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> Result<(), String> {
let lookup = self
.single_block_lookups
.get_mut(&lookup_id)
.ok_or(format!("Unknown lookup for id {lookup_id}"))?;

let mut added_some_peer = false;
for peer in peers {
if lookup.add_peer(*peer) {
added_some_peer = true;
debug!(self.log, "Adding peer to existing single block lookup";
"block_root" => ?lookup.block_root(),
"peer" => ?peer
);
}
}

// We may choose to attempt to continue a lookup here. It is possible that a lookup had zero
// peers and after adding this set of peers it can make progress again. Note that this
// recursive function iterates from child to parent, so continuing the child first is weird.
// However, we choose to not attempt to continue the lookup for simplicity. It's not
// strictly required and just and optimization for a rare corner case.

if let Some(parent_root) = lookup.awaiting_parent() {
if let Some((&child_id, _)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == parent_root)
{
self.add_peers_to_lookup_and_ancestors(child_id, peers)
self.add_peers_to_lookup_and_ancestors(child_id, peers, cx)
} else {
Err(format!("Lookup references unknown parent {parent_root:?}"))
}
} else if added_some_peer {
// If this lookup is not awaiting a parent and we added at least one peer, attempt to
// make progress. It is possible that a lookup is created with zero peers, attempted to
// make progress, and then receives peers. After that time the lookup will never be
// pruned with `drop_lookups_without_peers` because it has peers. This is rare corner
// case, but it can result in stuck lookups.
let result = lookup.continue_requests(cx);
self.on_lookup_result(lookup_id, result, "add_peers", cx);
Ok(())
} else {
Ok(())
}
Expand Down
Loading

0 comments on commit a64cee3

Please sign in to comment.