diff --git a/beacon_node/http_api/src/metrics.rs b/beacon_node/http_api/src/metrics.rs index 26ee183c83f..3eada3a3d46 100644 --- a/beacon_node/http_api/src/metrics.rs +++ b/beacon_node/http_api/src/metrics.rs @@ -31,7 +31,13 @@ lazy_static::lazy_static! { ); pub static ref HTTP_API_BLOCK_BROADCAST_DELAY_TIMES: Result = try_create_histogram_vec( "http_api_block_broadcast_delay_times", - "Time between start of the slot and when the block was broadcast", + "Time between start of the slot and when the block completed broadcast and processing", + &["provenance"] + ); + pub static ref HTTP_API_BLOCK_GOSSIP_TIMES: Result = try_create_histogram_vec_with_buckets( + "http_api_block_gossip_times", + "Time between receiving the block on HTTP and publishing it on gossip", + decimal_buckets(-2, 2), &["provenance"] ); pub static ref HTTP_API_BLOCK_PUBLISHED_LATE_TOTAL: Result = try_create_int_counter( diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 0d176e6a53a..e23768ebb6f 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -60,6 +60,11 @@ pub async fn publish_block (block_contents, true), ProvenancedBlock::Builder(block_contents, _) => (block_contents, false), }; + let provenance = if is_locally_built_block { + "local" + } else { + "builder" + }; let block = block_contents.inner_block().clone(); let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); debug!(log, "Signed block received in HTTP API"; "slot" => block.slot()); @@ -75,7 +80,18 @@ pub async fn publish_block block.slot(), "publish_delay" => ?publish_delay); + metrics::observe_timer_vec( + &metrics::HTTP_API_BLOCK_GOSSIP_TIMES, + &[provenance], + publish_delay, + ); + + info!( + log, + "Signed block published to network via HTTP API"; + "slot" => block.slot(), + "publish_delay_ms" => publish_delay.as_millis() + ); match block.as_ref() { SignedBeaconBlock::Base(_) diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index df5bbba99c8..daf95fb8c91 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -352,6 +352,31 @@ where !matches!(self.state, HandlerState::Deactivated) } + // NOTE: This function gets polled to completion upon a connection close. + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { + // Inform the network behaviour of any failed requests + + while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() { + let outbound_info = self + .outbound_substreams + .remove(&substream_id) + .expect("The value must exist for a key"); + // If the state of the connection is closing, we do not need to report this case to + // the behaviour, as the connection has just closed non-gracefully + if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) { + continue; + } + + // Register this request as an RPC Error + return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound { + error: RPCError::Disconnected, + proto: outbound_info.proto, + id: outbound_info.req_id, + }))); + } + Poll::Ready(None) + } + fn poll( &mut self, cx: &mut Context<'_>, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 1397c80fbd3..8ed8658a4bb 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -972,6 +972,12 @@ impl Network { .goodbye_peer(peer_id, reason, source); } + /// Hard (ungraceful) disconnect for testing purposes only + /// Use goodbye_peer for disconnections, do not use this function. + pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) { + let _ = self.swarm.disconnect_peer_id(peer_id); + } + /// Returns an iterator over all enr entries in the DHT. pub fn enr_entries(&self) -> Vec { self.discovery().table_entries_enr() @@ -1373,12 +1379,18 @@ impl Network { let peer_id = event.peer_id; if !self.peer_manager().is_connected(&peer_id) { - debug!( - self.log, - "Ignoring rpc message of disconnecting peer"; - event - ); - return None; + // Sync expects a RPCError::Disconnected to drop associated lookups with this peer. + // Silencing this event breaks the API contract with RPC where every request ends with + // - A stream termination event, or + // - An RPCError event + if !matches!(event.event, HandlerEvent::Err(HandlerErr::Outbound { .. })) { + debug!( + self.log, + "Ignoring rpc message of disconnecting peer"; + event + ); + return None; + } } let handler_id = event.conn_id; diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index a60af4db3db..e2b72f86732 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -3,7 +3,7 @@ mod common; use common::Protocol; -use lighthouse_network::rpc::methods::*; +use lighthouse_network::rpc::{methods::*, RPCError}; use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response}; use slog::{debug, warn, Level}; use ssz::Encode; @@ -996,6 +996,96 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { }) } +#[test] +fn test_disconnect_triggers_rpc_error() { + // set up the logging. The level and enabled logging or not + let log_level = Level::Debug; + let enable_logging = false; + + let log = common::build_log(log_level, enable_logging); + let spec = E::default_spec(); + + let rt = Arc::new(Runtime::new().unwrap()); + // get sender/receiver + rt.block_on(async { + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Base, + &spec, + Protocol::Tcp, + ) + .await; + + // BlocksByRoot Request + let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new( + // Must have at least one root for the request to create a stream + vec![Hash256::from_low_u64_be(0)], + &spec, + )); + + // build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + // Send a STATUS message + debug!(log, "Sending RPC"); + sender.send_request(peer_id, 42, rpc_request.clone()); + } + NetworkEvent::RPCFailed { error, id: 42, .. } => match error { + RPCError::Disconnected => return, + other => panic!("received unexpected error {:?}", other), + }, + other => { + warn!(log, "Ignoring other event {:?}", other); + } + } + } + }; + + // determine messages to send (PeerId, RequestId). If some, indicates we still need to send + // messages + let mut sending_peer = None; + let receiver_future = async { + loop { + // this future either drives the sending/receiving or times out allowing messages to be + // sent in the timeout + match futures::future::select( + Box::pin(receiver.next_event()), + Box::pin(tokio::time::sleep(Duration::from_secs(1))), + ) + .await + { + futures::future::Either::Left((ev, _)) => match ev { + NetworkEvent::RequestReceived { peer_id, .. } => { + sending_peer = Some(peer_id); + } + other => { + warn!(log, "Ignoring other event {:?}", other); + } + }, + futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required + } + + // if we need to send messages send them here. This will happen after a delay + if let Some(peer_id) = sending_peer.take() { + warn!(log, "Receiver got request, disconnecting peer"); + receiver.__hard_disconnect_testing_only(peer_id); + } + } + }; + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } + }) +} + /// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC /// Goodbye message. fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) { diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 9075bb15f08..4be92d59a4b 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -307,11 +307,7 @@ impl BackFillSync { /// A peer has disconnected. /// If the peer has active batches, those are considered failed and re-requested. #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] - pub fn peer_disconnected( - &mut self, - peer_id: &PeerId, - network: &mut SyncNetworkContext, - ) -> Result<(), BackFillError> { + pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> { if matches!( self.state(), BackFillState::Failed | BackFillState::NotRequired @@ -319,37 +315,7 @@ impl BackFillSync { return Ok(()); } - if let Some(batch_ids) = self.active_requests.remove(peer_id) { - // fail the batches - for id in batch_ids { - if let Some(batch) = self.batches.get_mut(&id) { - match batch.download_failed(false) { - Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { - self.fail_sync(BackFillError::BatchDownloadFailed(id))?; - } - Ok(BatchOperationOutcome::Continue) => {} - Err(e) => { - self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?; - } - } - // If we have run out of peers in which to retry this batch, the backfill state - // transitions to a paused state. - // We still need to reset the state for all the affected batches, so we should not - // short circuit early - if self.retry_batch_download(network, id).is_err() { - debug!( - self.log, - "Batch could not be retried"; - "batch_id" => id, - "error" => "no synced peers" - ); - } - } else { - debug!(self.log, "Batch not found while removing peer"; - "peer" => %peer_id, "batch" => id) - } - } - } + self.active_requests.remove(peer_id); // Remove the peer from the participation list self.participating_peers.remove(peer_id); diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 06e00ea6d1e..fa63e37c1b3 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,9 +1,7 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; -use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, -}; +use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE}; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::block_verification_types::RpcBlock; @@ -26,11 +24,6 @@ pub enum ResponseType { /// is further back than the most recent head slot. pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; -/// Wrapper around bool to prevent mixing this argument with `BlockIsProcessed` -pub(crate) struct AwaitingParent(pub bool); -/// Wrapper around bool to prevent mixing this argument with `AwaitingParent` -pub(crate) struct BlockIsProcessed(pub bool); - /// This trait unifies common single block lookup functionality across blocks and blobs. This /// includes making requests, verifying responses, and handling processing results. A /// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is @@ -43,52 +36,6 @@ pub trait RequestState { /// The type created after validation. type VerifiedResponseType: Clone; - /// Potentially makes progress on this request if it's in a progress-able state - fn continue_request( - &mut self, - id: Id, - awaiting_parent: AwaitingParent, - downloaded_block_expected_blobs: Option, - block_is_processed: BlockIsProcessed, - cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - // Attempt to progress awaiting downloads - if self.get_state().is_awaiting_download() { - // Verify the current request has not exceeded the maximum number of attempts. - let request_state = self.get_state(); - if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { - let cannot_process = request_state.more_failed_processing_attempts(); - return Err(LookupRequestError::TooManyAttempts { cannot_process }); - } - - let peer_id = self - .get_state_mut() - .use_rand_available_peer() - .ok_or(LookupRequestError::NoPeers)?; - - // make_request returns true only if a request needs to be made - if self.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { - self.get_state_mut().on_download_start()?; - } else { - self.get_state_mut().on_completed_request()?; - } - - // Otherwise, attempt to progress awaiting processing - // If this request is awaiting a parent lookup to be processed, do not send for processing. - // The request will be rejected with unknown parent error. - } else if !awaiting_parent.0 - && (block_is_processed.0 || matches!(Self::response_type(), ResponseType::Block)) - { - // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is - // useful to conditionally access the result data. - if let Some(result) = self.get_state_mut().maybe_start_processing() { - return Self::send_for_processing(id, result, cx); - } - } - - Ok(()) - } - /// Request the network context to prepare a request of a component of `block_root`. If the /// request is not necessary because the component is already known / processed, return false. /// Return true if it sent a request and we can expect an event back from the network. diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 6852761d8bf..3da2577114c 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -16,7 +16,7 @@ use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; -use slog::{debug, error, trace, warn, Logger}; +use slog::{debug, error, warn, Logger}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; @@ -233,13 +233,17 @@ impl BlockLookups { .iter_mut() .find(|(_id, lookup)| lookup.is_for_block(block_root)) { - trace!(self.log, "Adding peer to existing single block lookup"; "block_root" => %block_root); - lookup.add_peers(peers); + for peer in peers { + if lookup.add_peer(*peer) { + debug!(self.log, "Adding peer to existing single block lookup"; "block_root" => ?block_root, "peer" => ?peer); + } + } + if let Some(block_component) = block_component { let component_type = block_component.get_type(); let imported = lookup.add_child_components(block_component); if !imported { - debug!(self.log, "Lookup child component ignored"; "block_root" => %block_root, "type" => component_type); + debug!(self.log, "Lookup child component ignored"; "block_root" => ?block_root, "type" => component_type); } } return true; @@ -252,10 +256,15 @@ impl BlockLookups { .iter() .any(|(_, lookup)| lookup.is_for_block(awaiting_parent)) { + warn!(self.log, "Ignoring child lookup parent lookup not found"; "block_root" => ?awaiting_parent); return false; } } + // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), + // signal here to hold processing downloaded data. + let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let msg = if block_component.is_some() { "Searching for components of a block with unknown parent" } else { @@ -265,14 +274,11 @@ impl BlockLookups { self.log, "{}", msg; "peer_ids" => ?peers, - "block" => ?block_root, + "block_root" => ?block_root, + "id" => lookup.id, ); metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED); - // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), - // signal here to hold processing downloaded data. - let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); - // Add block components to the new request if let Some(block_component) = block_component { lookup.add_child_components(block_component); @@ -338,7 +344,8 @@ impl BlockLookups { Ok((response, seen_timestamp)) => { debug!(self.log, "Received lookup download success"; - "block_root" => %block_root, + "block_root" => ?block_root, + "id" => id, "peer_id" => %peer_id, "response_type" => ?response_type, ); @@ -357,7 +364,8 @@ impl BlockLookups { Err(e) => { debug!(self.log, "Received lookup download failure"; - "block_root" => %block_root, + "block_root" => ?block_root, + "id" => id, "peer_id" => %peer_id, "response_type" => ?response_type, "error" => %e, @@ -374,16 +382,13 @@ impl BlockLookups { /* Error responses */ pub fn peer_disconnected(&mut self, peer_id: &PeerId) { - /* Check disconnection for single lookups */ - self.single_block_lookups.retain(|_, req| { - let should_drop_lookup = - req.should_drop_lookup_on_disconnected_peer(peer_id ); - - if should_drop_lookup { - debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root()); + self.single_block_lookups.retain(|_, lookup| { + if lookup.remove_peer(peer_id) { + debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root()); + false + } else { + true } - - !should_drop_lookup }); } @@ -425,6 +430,7 @@ impl BlockLookups { "Received lookup processing result"; "component" => ?R::response_type(), "block_root" => ?block_root, + "id" => lookup_id, "result" => ?result, ); @@ -496,7 +502,7 @@ impl BlockLookups { debug!( self.log, "Single block lookup failed. Execution layer is offline / unsynced / misconfigured"; - "block_root" => %block_root, + "block_root" => ?block_root, "error" => ?e ); Action::Drop @@ -505,7 +511,7 @@ impl BlockLookups { if e.category() == AvailabilityCheckErrorCategory::Internal => { // There errors indicate internal problems and should not downscore the peer - warn!(self.log, "Internal availability check failure"; "block_root" => %block_root, "error" => ?e); + warn!(self.log, "Internal availability check failure"; "block_root" => ?block_root, "error" => ?e); // Here we choose *not* to call `on_processing_failure` because this could result in a bad // lookup state transition. This error invalidates both blob and block requests, and we don't know the @@ -514,7 +520,7 @@ impl BlockLookups { Action::Drop } other => { - debug!(self.log, "Invalid lookup component"; "block_root" => %block_root, "component" => ?R::response_type(), "error" => ?other); + debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other); let peer_id = request_state.on_processing_failure()?; cx.report_peer( peer_id, @@ -540,7 +546,7 @@ impl BlockLookups { Action::ParentUnknown { parent_root } => { let peers = lookup.all_available_peers().cloned().collect::>(); lookup.set_awaiting_parent(parent_root); - debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root); + debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root); self.search_parent_of_child(parent_root, block_root, &peers, cx); Ok(LookupResult::Pending) } @@ -562,7 +568,7 @@ impl BlockLookups { for (id, lookup) in self.single_block_lookups.iter_mut() { if lookup.awaiting_parent() == Some(block_root) { lookup.resolve_awaiting_parent(); - debug!(self.log, "Continuing child lookup"; "parent_root" => %block_root, "block_root" => %lookup.block_root()); + debug!(self.log, "Continuing child lookup"; "parent_root" => ?block_root, "id" => id, "block_root" => ?lookup.block_root()); let result = lookup.continue_requests(cx); lookup_results.push((*id, result)); } @@ -578,7 +584,7 @@ impl BlockLookups { /// dropped. pub fn drop_lookup_and_children(&mut self, dropped_id: SingleLookupId) { if let Some(dropped_lookup) = self.single_block_lookups.remove(&dropped_id) { - debug!(self.log, "Dropping child lookup"; "id" => ?dropped_id, "block_root" => %dropped_lookup.block_root()); + debug!(self.log, "Dropping child lookup"; "id" => ?dropped_id, "block_root" => ?dropped_lookup.block_root()); let child_lookups = self .single_block_lookups @@ -605,11 +611,13 @@ impl BlockLookups { Ok(LookupResult::Pending) => {} // no action Ok(LookupResult::Completed) => { if let Some(lookup) = self.single_block_lookups.remove(&id) { - debug!(self.log, "Dropping completed lookup"; "block" => %lookup.block_root()); + debug!(self.log, "Dropping completed lookup"; "block" => ?lookup.block_root(), "id" => id); metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED); // Block imported, continue the requests of pending child blocks self.continue_child_lookups(lookup.block_root(), cx); self.update_metrics(); + } else { + debug!(self.log, "Attempting to drop non-existent lookup"; "id" => id); } } Err(error) => { diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 55f2cfe1292..7f4fe5119f6 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -55,7 +55,7 @@ pub(crate) fn compute_parent_chains(nodes: &[Node]) -> Vec { // Iterate blocks with no children for tip in nodes { let mut block_root = tip.block_root; - if parent_to_child.get(&block_root).is_none() { + if !parent_to_child.contains_key(&block_root) { let mut chain = vec![]; // Resolve chain of blocks diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index b642ec8e5b2..a5729f39062 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,5 +1,5 @@ -use super::common::{AwaitingParent, BlockIsProcessed}; -use super::{BlockComponent, PeerId}; +use super::common::ResponseType; +use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; use crate::sync::network_context::SyncNetworkContext; @@ -150,7 +150,7 @@ impl SingleBlockLookup { } } - /// Wrapper around `RequestState::continue_request` to inject lookup data + /// Potentially makes progress on this request if it's in a progress-able state pub fn continue_request>( &mut self, cx: &mut SyncNetworkContext, @@ -163,26 +163,51 @@ impl SingleBlockLookup { .peek_downloaded_data() .map(|block| block.num_expected_blobs()); let block_is_processed = self.block_request_state.state.is_processed(); - R::request_state_mut(self).continue_request( - id, - AwaitingParent(awaiting_parent), - downloaded_block_expected_blobs, - BlockIsProcessed(block_is_processed), - cx, - ) - } + let request = R::request_state_mut(self); + + // Attempt to progress awaiting downloads + if request.get_state().is_awaiting_download() { + // Verify the current request has not exceeded the maximum number of attempts. + let request_state = request.get_state(); + if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { + let cannot_process = request_state.more_failed_processing_attempts(); + return Err(LookupRequestError::TooManyAttempts { cannot_process }); + } - /// Add all given peers to both block and blob request states. - pub fn add_peer(&mut self, peer_id: PeerId) { - self.block_request_state.state.add_peer(&peer_id); - self.blob_request_state.state.add_peer(&peer_id); - } + let peer_id = request + .get_state_mut() + .use_rand_available_peer() + .ok_or(LookupRequestError::NoPeers)?; - /// Add all given peers to both block and blob request states. - pub fn add_peers(&mut self, peers: &[PeerId]) { - for peer in peers { - self.add_peer(*peer); + // make_request returns true only if a request needs to be made + if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { + request.get_state_mut().on_download_start()?; + } else { + request.get_state_mut().on_completed_request()?; + } + + // Otherwise, attempt to progress awaiting processing + // If this request is awaiting a parent lookup to be processed, do not send for processing. + // The request will be rejected with unknown parent error. + } else if !awaiting_parent + && (block_is_processed || matches!(R::response_type(), ResponseType::Block)) + { + // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is + // useful to conditionally access the result data. + if let Some(result) = request.get_state_mut().maybe_start_processing() { + return R::send_for_processing(id, result, cx); + } } + + Ok(()) + } + + /// Add peer to all request states. The peer must be able to serve this request. + /// Returns true if the peer was newly inserted into some request state. + pub fn add_peer(&mut self, peer_id: PeerId) -> bool { + let inserted_block = self.block_request_state.state.add_peer(&peer_id); + let inserted_blob = self.blob_request_state.state.add_peer(&peer_id); + inserted_block || inserted_blob } /// Returns true if the block has already been downloaded. @@ -191,21 +216,11 @@ impl SingleBlockLookup { && self.blob_request_state.state.is_processed() } - /// Checks both the block and blob request states to see if the peer is disconnected. - /// - /// Returns true if the lookup should be dropped. - pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool { - self.block_request_state.state.remove_peer(peer_id); - self.blob_request_state.state.remove_peer(peer_id); - - if self.all_available_peers().count() == 0 { - return true; - } - - // Note: if the peer disconnected happens to have an on-going request associated with this - // lookup we will receive an RPCError and the lookup will fail. No need to manually retry - // now. - false + /// Remove peer from available peers. Return true if there are no more available peers and all + /// requests are not expecting any future event (AwaitingDownload). + pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool { + self.block_request_state.state.remove_peer(peer_id) + && self.blob_request_state.state.remove_peer(peer_id) } } @@ -464,14 +479,17 @@ impl SingleLookupRequestState { self.failed_processing >= self.failed_downloading } - /// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`. - pub fn add_peer(&mut self, peer_id: &PeerId) { - self.available_peers.insert(*peer_id); + /// Add peer to this request states. The peer must be able to serve this request. + /// Returns true if the peer is newly inserted. + pub fn add_peer(&mut self, peer_id: &PeerId) -> bool { + self.available_peers.insert(*peer_id) } - /// If a peer disconnects, this request could be failed. If so, an error is returned - pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) { + /// Remove peer from available peers. Return true if there are no more available peers and the + /// request is not expecting any future event (AwaitingDownload). + pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool { self.available_peers.remove(disconnected_peer_id); + self.available_peers.is_empty() && self.is_awaiting_download() } pub fn get_used_peers(&self) -> impl Iterator { diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 302a0489c3b..75e0fc524f1 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -450,8 +450,25 @@ impl TestRig { }) } - fn peer_disconnected(&mut self, peer_id: PeerId) { - self.send_sync_message(SyncMessage::Disconnect(peer_id)); + fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) { + self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id)); + + // Return RPCErrors for all active requests of peer + self.drain_network_rx(); + while let Ok(request_id) = self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request_id: RequestId::Sync(id), + .. + } if *peer_id == disconnected_peer_id => Some(*id), + _ => None, + }) { + self.send_sync_message(SyncMessage::RpcError { + peer_id: disconnected_peer_id, + request_id, + error: RPCError::Disconnected, + }); + } } fn drain_network_rx(&mut self) { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 0836d97c49f..56bce7acad5 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -56,7 +56,7 @@ use lighthouse_network::rpc::RPCError; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; use lighthouse_network::{PeerAction, PeerId}; -use slog::{crit, debug, error, info, trace, warn, Logger}; +use slog::{crit, debug, error, info, o, trace, warn, Logger}; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; @@ -257,9 +257,16 @@ impl SyncManager { beacon_chain.clone(), log.clone(), ), - range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), - backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()), - block_lookups: BlockLookups::new(log.clone()), + range_sync: RangeSync::new( + beacon_chain.clone(), + log.new(o!("service" => "range_sync")), + ), + backfill_sync: BackFillSync::new( + beacon_chain.clone(), + network_globals, + log.new(o!("service" => "backfill_sync")), + ), + block_lookups: BlockLookups::new(log.new(o!("service"=> "lookup_sync"))), log: log.clone(), } } @@ -366,9 +373,7 @@ impl SyncManager { self.range_sync.peer_disconnect(&mut self.network, peer_id); self.block_lookups.peer_disconnected(peer_id); // Regardless of the outcome, we update the sync status. - let _ = self - .backfill_sync - .peer_disconnected(peer_id, &mut self.network); + let _ = self.backfill_sync.peer_disconnected(peer_id); self.update_sync_state(); } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index c60cdb2cc9f..9a6c99ebf6c 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -174,30 +174,8 @@ impl SyncingChain { /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. - pub fn remove_peer( - &mut self, - peer_id: &PeerId, - network: &mut SyncNetworkContext, - ) -> ProcessingResult { - if let Some(batch_ids) = self.peers.remove(peer_id) { - // fail the batches - for id in batch_ids { - if let Some(batch) = self.batches.get_mut(&id) { - if let BatchOperationOutcome::Failed { blacklist } = - batch.download_failed(true)? - { - return Err(RemoveChain::ChainFailed { - blacklist, - failing_batch: id, - }); - } - self.retry_batch_download(network, id)?; - } else { - debug!(self.log, "Batch not found while removing peer"; - "peer" => %peer_id, "batch" => id) - } - } - } + pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { + self.peers.remove(peer_id); if self.peers.is_empty() { Err(RemoveChain::EmptyPeerPool) diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index c8e82666840..fe48db35b45 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -278,9 +278,8 @@ where /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain. fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - for (removed_chain, sync_type, remove_reason) in self - .chains - .call_all(|chain| chain.remove_peer(peer_id, network)) + for (removed_chain, sync_type, remove_reason) in + self.chains.call_all(|chain| chain.remove_peer(peer_id)) { self.on_chain_removed( removed_chain, diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index c2dc2732020..6645416d4b0 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -1060,7 +1060,9 @@ mod release_tests { op_pool .insert_attestation(att.clone_as_attestation(), attesting_indices.clone()) .unwrap(); - op_pool.insert_attestation(att.clone_as_attestation(), attesting_indices).unwrap(); + op_pool + .insert_attestation(att.clone_as_attestation(), attesting_indices) + .unwrap(); } assert_eq!(op_pool.num_attestations(), committees.len()); diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index 2efa1218829..98671f82b9f 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -508,6 +508,7 @@ pub fn get_expected_withdrawals( let mut withdrawal_index = state.next_withdrawal_index()?; let mut validator_index = state.next_withdrawal_validator_index()?; let mut withdrawals = vec![]; + let fork_name = state.fork_name_unchecked(); let bound = std::cmp::min( state.validators().len() as u64, @@ -518,7 +519,7 @@ pub fn get_expected_withdrawals( let balance = *state.balances().get(validator_index as usize).ok_or( BeaconStateError::BalancesOutOfBounds(validator_index as usize), )?; - if validator.is_fully_withdrawable_at(balance, epoch, spec) { + if validator.is_fully_withdrawable_at(balance, epoch, spec, fork_name) { withdrawals.push(Withdrawal { index: withdrawal_index, validator_index, @@ -528,7 +529,7 @@ pub fn get_expected_withdrawals( amount: balance, }); withdrawal_index.safe_add_assign(1)?; - } else if validator.is_partially_withdrawable_validator(balance, spec) { + } else if validator.is_partially_withdrawable_validator(balance, spec, fork_name) { withdrawals.push(Withdrawal { index: withdrawal_index, validator_index, diff --git a/consensus/state_processing/src/per_epoch_processing/registry_updates.rs b/consensus/state_processing/src/per_epoch_processing/registry_updates.rs index 4b2f940e5f8..3d02d797366 100644 --- a/consensus/state_processing/src/per_epoch_processing/registry_updates.rs +++ b/consensus/state_processing/src/per_epoch_processing/registry_updates.rs @@ -19,19 +19,20 @@ pub fn process_registry_updates( validator.is_active_at(current_epoch) && validator.effective_balance <= spec.ejection_balance }; + let fork_name = state.fork_name_unchecked(); let indices_to_update: Vec<_> = state .validators() .iter() .enumerate() .filter(|(_, validator)| { - validator.is_eligible_for_activation_queue(spec) || is_ejectable(validator) + validator.is_eligible_for_activation_queue(spec, fork_name) || is_ejectable(validator) }) .map(|(idx, _)| idx) .collect(); for index in indices_to_update { let validator = state.get_validator_mut(index)?; - if validator.is_eligible_for_activation_queue(spec) { + if validator.is_eligible_for_activation_queue(spec, fork_name) { validator.activation_eligibility_epoch = current_epoch.safe_add(1)?; } if is_ejectable(validator) { diff --git a/consensus/state_processing/src/per_epoch_processing/single_pass.rs b/consensus/state_processing/src/per_epoch_processing/single_pass.rs index 7a95de3317e..a9629e73e40 100644 --- a/consensus/state_processing/src/per_epoch_processing/single_pass.rs +++ b/consensus/state_processing/src/per_epoch_processing/single_pass.rs @@ -466,7 +466,7 @@ fn process_single_registry_update( ) -> Result<(), Error> { let current_epoch = state_ctxt.current_epoch; - if validator.is_eligible_for_activation_queue(spec) { + if validator.is_eligible_for_activation_queue(spec, state_ctxt.fork_name) { validator.make_mut()?.activation_eligibility_epoch = current_epoch.safe_add(1)?; } diff --git a/consensus/types/src/validator.rs b/consensus/types/src/validator.rs index 8ed449ec8a7..9e26d1eeca6 100644 --- a/consensus/types/src/validator.rs +++ b/consensus/types/src/validator.rs @@ -1,5 +1,5 @@ use crate::{ - test_utils::TestRandom, Address, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, + test_utils::TestRandom, Address, BeaconState, ChainSpec, Epoch, EthSpec, ForkName, Hash256, PublicKeyBytes, }; use serde::{Deserialize, Serialize}; @@ -57,8 +57,31 @@ impl Validator { /// Returns `true` if the validator is eligible to join the activation queue. /// - /// Modified in electra - pub fn is_eligible_for_activation_queue(&self, spec: &ChainSpec) -> bool { + /// Calls the correct function depending on the provided `fork_name`. + pub fn is_eligible_for_activation_queue( + &self, + spec: &ChainSpec, + current_fork: ForkName, + ) -> bool { + if current_fork >= ForkName::Electra { + self.is_eligible_for_activation_queue_electra(spec) + } else { + self.is_eligible_for_activation_queue_base(spec) + } + } + + /// Returns `true` if the validator is eligible to join the activation queue. + /// + /// Spec v0.12.1 + fn is_eligible_for_activation_queue_base(&self, spec: &ChainSpec) -> bool { + self.activation_eligibility_epoch == spec.far_future_epoch + && self.effective_balance == spec.max_effective_balance + } + + /// Returns `true` if the validator is eligible to join the activation queue. + /// + /// Modified in electra as part of EIP 7251. + fn is_eligible_for_activation_queue_electra(&self, spec: &ChainSpec) -> bool { self.activation_eligibility_epoch == spec.far_future_epoch && self.effective_balance >= spec.min_activation_balance } @@ -131,8 +154,40 @@ impl Validator { /// Returns `true` if the validator is fully withdrawable at some epoch. /// - /// Note: Modified in electra. - pub fn is_fully_withdrawable_at(&self, balance: u64, epoch: Epoch, spec: &ChainSpec) -> bool { + /// Calls the correct function depending on the provided `fork_name`. + pub fn is_fully_withdrawable_at( + &self, + balance: u64, + epoch: Epoch, + spec: &ChainSpec, + current_fork: ForkName, + ) -> bool { + if current_fork >= ForkName::Electra { + self.is_fully_withdrawable_at_electra(balance, epoch, spec) + } else { + self.is_fully_withdrawable_at_capella(balance, epoch, spec) + } + } + + /// Returns `true` if the validator is fully withdrawable at some epoch. + fn is_fully_withdrawable_at_capella( + &self, + balance: u64, + epoch: Epoch, + spec: &ChainSpec, + ) -> bool { + self.has_eth1_withdrawal_credential(spec) && self.withdrawable_epoch <= epoch && balance > 0 + } + + /// Returns `true` if the validator is fully withdrawable at some epoch. + /// + /// Modified in electra as part of EIP 7251. + fn is_fully_withdrawable_at_electra( + &self, + balance: u64, + epoch: Epoch, + spec: &ChainSpec, + ) -> bool { self.has_execution_withdrawal_credential(spec) && self.withdrawable_epoch <= epoch && balance > 0 @@ -140,8 +195,35 @@ impl Validator { /// Returns `true` if the validator is partially withdrawable. /// - /// Note: Modified in electra. - pub fn is_partially_withdrawable_validator(&self, balance: u64, spec: &ChainSpec) -> bool { + /// Calls the correct function depending on the provided `fork_name`. + pub fn is_partially_withdrawable_validator( + &self, + balance: u64, + spec: &ChainSpec, + current_fork: ForkName, + ) -> bool { + if current_fork >= ForkName::Electra { + self.is_partially_withdrawable_validator_electra(balance, spec) + } else { + self.is_partially_withdrawable_validator_capella(balance, spec) + } + } + + /// Returns `true` if the validator is partially withdrawable. + fn is_partially_withdrawable_validator_capella(&self, balance: u64, spec: &ChainSpec) -> bool { + self.has_eth1_withdrawal_credential(spec) + && self.effective_balance == spec.max_effective_balance + && balance > spec.max_effective_balance + } + + /// Returns `true` if the validator is partially withdrawable. + /// + /// Modified in electra as part of EIP 7251. + pub fn is_partially_withdrawable_validator_electra( + &self, + balance: u64, + spec: &ChainSpec, + ) -> bool { let max_effective_balance = self.get_validator_max_effective_balance(spec); let has_max_effective_balance = self.effective_balance == max_effective_balance; let has_excess_balance = balance > max_effective_balance; diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index b497abd7dde..04554786f6f 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -23,7 +23,7 @@ pub const POOL_SIZE: u32 = 1; #[cfg(not(test))] pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); #[cfg(test)] -pub const CONNECTION_TIMEOUT: Duration = Duration::from_millis(500); +pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(1); /// Supported version of the interchange format. pub const SUPPORTED_INTERCHANGE_FORMAT_VERSION: u64 = 5; diff --git a/validator_client/src/signing_method.rs b/validator_client/src/signing_method.rs index 19824d76fb0..d89c9b82292 100644 --- a/validator_client/src/signing_method.rs +++ b/validator_client/src/signing_method.rs @@ -7,7 +7,7 @@ use crate::http_metrics::metrics; use eth2_keystore::Keystore; use lockfile::Lockfile; use parking_lot::Mutex; -use reqwest::Client; +use reqwest::{header::ACCEPT, Client}; use std::path::PathBuf; use std::sync::Arc; use task_executor::TaskExecutor; @@ -243,6 +243,7 @@ impl SigningMethod { // Request a signature from the Web3Signer instance via HTTP(S). let response: SigningResponse = http_client .post(signing_url.clone()) + .header(ACCEPT, "application/json") .json(&request) .send() .await diff --git a/watch/src/server/mod.rs b/watch/src/server/mod.rs index 25dd242aab6..08036db9510 100644 --- a/watch/src/server/mod.rs +++ b/watch/src/server/mod.rs @@ -31,7 +31,7 @@ pub async fn serve(config: FullConfig) -> Result<(), Error> { ) })?; - let server = start_server(&config, slots_per_epoch as u64, db)?; + let (_addr, server) = start_server(&config, slots_per_epoch as u64, db)?; server.await?; @@ -58,7 +58,13 @@ pub fn start_server( config: &FullConfig, slots_per_epoch: u64, pool: PgPool, -) -> Result> + 'static, Error> { +) -> Result< + ( + SocketAddr, + impl Future> + 'static, + ), + Error, +> { let mut routes = Router::new() .route("/v1/slots", get(handler::get_slots_by_range)) .route("/v1/slots/:slot", get(handler::get_slot)) @@ -106,11 +112,15 @@ pub fn start_server( let addr = SocketAddr::new(config.server.listen_addr, config.server.listen_port); let listener = TcpListener::bind(addr)?; listener.set_nonblocking(true)?; + + // Read the socket address (it may be different from `addr` if listening on port 0). + let socket_addr = listener.local_addr()?; + let serve = axum::serve(tokio::net::TcpListener::from_std(listener)?, app); info!("HTTP server listening on {}", addr); - Ok(serve.into_future()) + Ok((socket_addr, serve.into_future())) } // The default route indicating that no available routes matched the request. diff --git a/watch/tests/tests.rs b/watch/tests/tests.rs index 0e29e7f0cd8..5461508edd8 100644 --- a/watch/tests/tests.rs +++ b/watch/tests/tests.rs @@ -14,7 +14,6 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::collections::HashMap; use std::env; -use std::net::SocketAddr; use std::time::Duration; use testcontainers::{clients::Cli, core::WaitFor, Image, RunnableImage}; use tokio::{runtime, task::JoinHandle}; @@ -154,7 +153,7 @@ impl TesterBuilder { * Create a watch configuration */ let database_port = unused_tcp4_port().expect("Unable to find unused port."); - let server_port = unused_tcp4_port().expect("Unable to find unused port."); + let server_port = 0; let config = Config { database: DatabaseConfig { dbname: random_dbname(), @@ -187,14 +186,9 @@ impl TesterBuilder { /* * Spawn a Watch HTTP API. */ - let watch_server = start_server(&self.config, SLOTS_PER_EPOCH, pool).unwrap(); + let (addr, watch_server) = start_server(&self.config, SLOTS_PER_EPOCH, pool).unwrap(); tokio::spawn(watch_server); - let addr = SocketAddr::new( - self.config.server.listen_addr, - self.config.server.listen_port, - ); - /* * Create a HTTP client to talk to the watch HTTP API. */