Skip to content

Commit

Permalink
Generalize sync ActiveRequests (#6398)
Browse files Browse the repository at this point in the history
* Generalize sync ActiveRequests

* Remove impossible to hit test

* Update beacon_node/lighthouse_network/src/service/api_types.rs

Co-authored-by: realbigsean <sean@sigmaprime.io>

* Update beacon_node/network/src/sync/network_context.rs

Co-authored-by: realbigsean <sean@sigmaprime.io>

* Update beacon_node/network/src/sync/network_context.rs

Co-authored-by: realbigsean <sean@sigmaprime.io>

* Simplify match

* Fix display

* Merge remote-tracking branch 'sigp/unstable' into sync-active-request-generalize

* Sampling requests should not expect all responses

* Merge remote-tracking branch 'sigp/unstable' into sync-active-request-generalize

* Fix sampling_batch_requests_not_enough_responses_returned test

* Merge remote-tracking branch 'sigp/unstable' into sync-active-request-generalize

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into sync-active-request-generalize
  • Loading branch information
dapplion authored Oct 17, 2024
1 parent 606a113 commit a074e9e
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 333 deletions.
18 changes: 11 additions & 7 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ pub struct SingleLookupReqId {
pub req_id: Id,
}

/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly.
/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct DataColumnsByRootRequestId(pub Id);

/// Id of rpc requests sent by sync to the network.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum SyncRequestId {
Expand All @@ -35,11 +30,19 @@ pub enum SyncRequestId {
/// Request searching for a set of blobs given a hash.
SingleBlob { id: SingleLookupReqId },
/// Request searching for a set of data columns given a hash and list of column indices.
DataColumnsByRoot(DataColumnsByRootRequestId, DataColumnsByRootRequester),
DataColumnsByRoot(DataColumnsByRootRequestId),
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
}

/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
/// Wrapping this particular req_id, ensures not mixing this request with a custody req_id.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct DataColumnsByRootRequestId {
pub id: Id,
pub requester: DataColumnsByRootRequester,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum DataColumnsByRootRequester {
Sampling(SamplingId),
Expand Down Expand Up @@ -173,8 +176,9 @@ impl slog::Value for RequestId {
}
}

// This custom impl reduces log boilerplate not printing `DataColumnsByRootRequestId` on each id log
impl std::fmt::Display for DataColumnsByRootRequestId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
write!(f, "{} {:?}", self.id, self.requester)
}
}
58 changes: 20 additions & 38 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use beacon_chain::{
use beacon_processor::WorkEvent;
use lighthouse_network::rpc::{RPCError, RequestType, RpcErrorResponse};
use lighthouse_network::service::api_types::{
AppRequestId, DataColumnsByRootRequester, Id, SamplingRequester, SingleLookupReqId,
SyncRequestId,
AppRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingRequester,
SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::types::SyncState;
use lighthouse_network::NetworkConfig;
Expand Down Expand Up @@ -745,10 +745,10 @@ impl TestRig {
let first_dc = data_columns.first().unwrap();
let block_root = first_dc.block_root();
let sampling_request_id = match id.0 {
SyncRequestId::DataColumnsByRoot(
_,
_requester @ DataColumnsByRootRequester::Sampling(sampling_id),
) => sampling_id.sampling_request_id,
SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
requester: DataColumnsByRootRequester::Sampling(sampling_id),
..
}) => sampling_id.sampling_request_id,
_ => unreachable!(),
};
self.complete_data_columns_by_root_request(id, data_columns);
Expand All @@ -773,14 +773,15 @@ impl TestRig {
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
missing_components: bool,
) {
let lookup_id =
if let SyncRequestId::DataColumnsByRoot(_, DataColumnsByRootRequester::Custody(id)) =
ids.first().unwrap().0
{
id.requester.0.lookup_id
} else {
panic!("not a custody requester")
};
let lookup_id = if let SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
requester: DataColumnsByRootRequester::Custody(id),
..
}) = ids.first().unwrap().0
{
id.requester.0.lookup_id
} else {
panic!("not a custody requester")
};

let first_column = data_columns.first().cloned().unwrap();

Expand Down Expand Up @@ -1189,6 +1190,7 @@ impl TestRig {
penalty_msg, expect_penalty_msg,
"Unexpected penalty msg for {peer_id}"
);
self.log(&format!("Found expected penalty {penalty_msg}"));
}

pub fn expect_single_penalty(&mut self, peer_id: PeerId, expect_penalty_msg: &'static str) {
Expand Down Expand Up @@ -1416,7 +1418,7 @@ fn test_single_block_lookup_empty_response() {

// The peer does not have the block. It should be penalized.
r.single_lookup_block_response(id, peer_id, None);
r.expect_penalty(peer_id, "NoResponseReturned");
r.expect_penalty(peer_id, "NotEnoughResponsesReturned");
// it should be retried
let id = r.expect_block_lookup_request(block_root);
// Send the right block this time.
Expand Down Expand Up @@ -2160,7 +2162,7 @@ fn sampling_batch_requests_not_enough_responses_returned() {
r.assert_sampling_request_ongoing(block_root, &column_indexes);

// Split the indexes to simulate the case where the supernode doesn't have the requested column.
let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) =
let (column_indexes_supernode_does_not_have, column_indexes_to_complete) =
column_indexes.split_at(1);

// Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs.
Expand All @@ -2176,7 +2178,7 @@ fn sampling_batch_requests_not_enough_responses_returned() {

// The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses.
r.log_sampling_requests(block_root, &column_indexes);
r.assert_sampling_request_nopeers(block_root, &column_indexes);
r.assert_sampling_request_nopeers(block_root, column_indexes_supernode_does_not_have);

// The sampling request stalls.
r.expect_empty_network();
Expand Down Expand Up @@ -2721,11 +2723,6 @@ mod deneb_only {
self.blobs.pop().expect("blobs");
self
}
fn invalidate_blobs_too_many(mut self) -> Self {
let first_blob = self.blobs.first().expect("blob").clone();
self.blobs.push(first_blob);
self
}
fn expect_block_process(mut self) -> Self {
self.rig.expect_block_process(ResponseType::Block);
self
Expand Down Expand Up @@ -2814,21 +2811,6 @@ mod deneb_only {
.expect_no_block_request();
}

#[test]
fn single_block_response_then_too_many_blobs_response_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
return;
};
tester
.block_response_triggering_process()
.invalidate_blobs_too_many()
.blobs_response()
.expect_penalty("TooManyResponses")
// Network context returns "download success" because the request has enough blobs + it
// downscores the peer for returning too many.
.expect_no_block_request();
}

// Test peer returning block that has unknown parent, and a new lookup is created
#[test]
fn parent_block_unknown_parent() {
Expand Down Expand Up @@ -2869,7 +2851,7 @@ mod deneb_only {
};
tester
.empty_block_response()
.expect_penalty("NoResponseReturned")
.expect_penalty("NotEnoughResponsesReturned")
.expect_block_request()
.expect_no_blobs_request()
.block_response_and_expect_blob_request()
Expand Down
16 changes: 5 additions & 11 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::SingleBlob { id } => {
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::DataColumnsByRoot(req_id, requester) => self
.on_data_columns_by_root_response(
req_id,
requester,
peer_id,
RpcEvent::RPCError(error),
),
SyncRequestId::DataColumnsByRoot(req_id) => {
self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::RangeBlockAndBlobs { id } => {
if let Some(sender_id) = self.network.range_request_failed(id) {
match sender_id {
Expand Down Expand Up @@ -1104,10 +1100,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
SyncRequestId::DataColumnsByRoot(req_id, requester) => {
SyncRequestId::DataColumnsByRoot(req_id) => {
self.on_data_columns_by_root_response(
req_id,
requester,
peer_id,
match data_column {
Some(data_column) => RpcEvent::Response(data_column, seen_timestamp),
Expand Down Expand Up @@ -1149,15 +1144,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn on_data_columns_by_root_response(
&mut self,
req_id: DataColumnsByRootRequestId,
requester: DataColumnsByRootRequester,
peer_id: PeerId,
data_column: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
) {
if let Some(resp) =
self.network
.on_data_columns_by_root_response(req_id, peer_id, data_column)
{
match requester {
match req_id.requester {
DataColumnsByRootRequester::Sampling(id) => {
if let Some((requester, result)) =
self.sampling
Expand Down
Loading

0 comments on commit a074e9e

Please sign in to comment.