Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime rpc request sizes #4841

Merged
merged 19 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};

mod availability_view;
Expand Down Expand Up @@ -389,7 +388,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(|current_epoch| {
std::cmp::max(
fork_epoch,
current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
current_epoch
.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests),
)
})
})
Expand Down Expand Up @@ -484,7 +484,8 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
let cutoff_epoch = std::cmp::max(
finalized_epoch + 1,
std::cmp::max(
current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
current_epoch
.saturating_sub(chain.spec.min_epochs_for_blob_sidecars_requests),
deneb_fork_epoch,
),
);
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::consts::deneb::MAX_BLOBS_PER_BLOCK;
use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId};
use types::{EthSpec, Slot};
use work_reprocessing_queue::IgnoredRpcBlock;
Expand Down Expand Up @@ -168,8 +167,7 @@ const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlobsByRangeRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize =
MAX_BLOCKS_BY_RANGE_QUEUE_LEN * MAX_BLOBS_PER_BLOCK as usize;
const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1024;
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved

/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/rpc/codec/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ mod tests {
));

// Request limits
let limit = protocol_id.rpc_request_limits();
let limit = protocol_id.rpc_request_limits(&fork_context);
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
Expand Down
59 changes: 37 additions & 22 deletions beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::{light_client_bootstrap::LightClientBootstrap, BlobSidecar};
use types::{
EthSpec, ForkContext, ForkName, Hash256, SignedBeaconBlock, SignedBeaconBlockAltair,
SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockDeneb,
SignedBeaconBlockMerge,
EthSpec, ForkContext, ForkName, Hash256, RuntimeVariableList, SignedBeaconBlock,
SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella,
SignedBeaconBlockDeneb, SignedBeaconBlockMerge,
};
use unsigned_varint::codec::Uvi;

Expand Down Expand Up @@ -142,7 +142,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {

// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
// packet size for ssz container corresponding to `self.protocol`.
let ssz_limits = self.protocol.rpc_request_limits();
let ssz_limits = self.protocol.rpc_request_limits(&self.fork_context);
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
return Err(RPCError::InvalidData(format!(
"RPC request length for protocol {:?} is out of bounds, length {}",
Expand All @@ -163,7 +163,11 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
let n = reader.get_ref().get_ref().position();
self.len = None;
let _read_bytes = src.split_to(n as usize);
handle_rpc_request(self.protocol.versioned_protocol, &decoded_buffer)
handle_rpc_request(
self.protocol.versioned_protocol,
&decoded_buffer,
&self.fork_context,
)
}
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
Expand Down Expand Up @@ -455,6 +459,7 @@ fn handle_length(
fn handle_rpc_request<T: EthSpec>(
versioned_protocol: SupportedProtocol,
decoded_buffer: &[u8],
fork_context: &ForkContext,
) -> Result<Option<InboundRequest<T>>, RPCError> {
match versioned_protocol {
SupportedProtocol::StatusV1 => Ok(Some(InboundRequest::Status(
Expand All @@ -471,20 +476,29 @@ fn handle_rpc_request<T: EthSpec>(
))),
SupportedProtocol::BlocksByRootV2 => Ok(Some(InboundRequest::BlocksByRoot(
BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
block_roots: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
fork_context.spec.max_request_blocks as usize,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
)?,
}),
))),
SupportedProtocol::BlocksByRootV1 => Ok(Some(InboundRequest::BlocksByRoot(
BlocksByRootRequest::V1(BlocksByRootRequestV1 {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
block_roots: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
fork_context.spec.max_request_blocks as usize,
)?,
}),
))),
SupportedProtocol::BlobsByRangeV1 => Ok(Some(InboundRequest::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
SupportedProtocol::BlobsByRootV1 => {
Ok(Some(InboundRequest::BlobsByRoot(BlobsByRootRequest {
blob_ids: VariableList::from_ssz_bytes(decoded_buffer)?,
blob_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
fork_context.spec.max_request_blob_sidecars as usize,
)?,
})))
}
SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping {
Expand Down Expand Up @@ -777,21 +791,22 @@ mod tests {
}
}

fn bbroot_request_v1() -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()].into())
fn bbroot_request_v1(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()], spec)
}

fn bbroot_request_v2() -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()].into())
fn bbroot_request_v2(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()], spec)
}

fn blbroot_request() -> BlobsByRootRequest {
BlobsByRootRequest {
blob_ids: VariableList::from(vec![BlobIdentifier {
fn blbroot_request(spec: &ChainSpec) -> BlobsByRootRequest {
BlobsByRootRequest::new(
vec![BlobIdentifier {
block_root: Hash256::zero(),
index: 0,
}]),
}
}],
spec,
)
}

fn ping_message() -> Ping {
Expand Down Expand Up @@ -1395,22 +1410,22 @@ mod tests {

#[test]
fn test_encode_then_decode_request() {
let chain_spec = Spec::default_spec();

let requests: &[OutboundRequest<Spec>] = &[
OutboundRequest::Ping(ping_message()),
OutboundRequest::Status(status_message()),
OutboundRequest::Goodbye(GoodbyeReason::Fault),
OutboundRequest::BlocksByRange(bbrange_request_v1()),
OutboundRequest::BlocksByRange(bbrange_request_v2()),
OutboundRequest::BlocksByRoot(bbroot_request_v1()),
OutboundRequest::BlocksByRoot(bbroot_request_v2()),
OutboundRequest::BlocksByRoot(bbroot_request_v1(&chain_spec)),
OutboundRequest::BlocksByRoot(bbroot_request_v2(&chain_spec)),
OutboundRequest::MetaData(MetadataRequest::new_v1()),
OutboundRequest::BlobsByRange(blbrange_request()),
OutboundRequest::BlobsByRoot(blbroot_request()),
OutboundRequest::BlobsByRoot(blbroot_request(&chain_spec)),
OutboundRequest::MetaData(MetadataRequest::new_v2()),
];

let chain_spec = Spec::default_spec();

for req in requests.iter() {
for fork_name in ForkName::list_all() {
encode_then_decode_request(req.clone(), fork_name, &chain_spec);
Expand Down
8 changes: 3 additions & 5 deletions beacon_node/lighthouse_network/src/rpc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
time::Duration,
};

use super::{methods, rate_limiter::Quota, Protocol};
use super::{rate_limiter::Quota, Protocol};

use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -99,11 +99,9 @@ impl RateLimiterConfig {
pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5);
pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15);
pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10);
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota =
Quota::n_every(methods::MAX_REQUEST_BLOCKS, 10);
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = Quota::n_every(1024, 10);
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota =
Quota::n_every(methods::MAX_REQUEST_BLOB_SIDECARS, 10);
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10);
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
}
Expand Down
45 changes: 20 additions & 25 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,22 @@ use regex::bytes::Regex;
use serde::Serialize;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::{
typenum::{U1024, U128, U256, U768},
VariableList,
};
use ssz_types::{typenum::U256, VariableList};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::consts::deneb::MAX_BLOBS_PER_BLOCK;
use types::{
blob_sidecar::BlobSidecar, light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec,
Hash256, SignedBeaconBlock, Slot,
blob_sidecar::BlobSidecar, light_client_bootstrap::LightClientBootstrap, ChainSpec, Epoch,
EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, Slot,
};

/// Maximum number of blocks in a single request.
pub type MaxRequestBlocks = U1024;
pub const MAX_REQUEST_BLOCKS: u64 = 1024;

/// Maximum length of error message.
pub type MaxErrorLen = U256;
pub const MAX_ERROR_LEN: u64 = 256;

pub type MaxRequestBlocksDeneb = U128;
pub const MAX_REQUEST_BLOCKS_DENEB: u64 = 128;

pub type MaxRequestBlobSidecars = U768;
pub const MAX_REQUEST_BLOB_SIDECARS: u64 = MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK;

/// Wrapper over SSZ List to represent error message in rpc responses.
#[derive(Debug, Clone)]
pub struct ErrorType(pub VariableList<u8, MaxErrorLen>);
Expand Down Expand Up @@ -344,22 +330,23 @@ impl OldBlocksByRangeRequest {
}

/// Request a number of beacon block bodies from a peer.
#[superstruct(
variants(V1, V2),
variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq))
)]
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]
pub struct BlocksByRootRequest {
/// The list of beacon block bodies being requested.
pub block_roots: VariableList<Hash256, MaxRequestBlocks>,
pub block_roots: RuntimeVariableList<Hash256>,
}

impl BlocksByRootRequest {
pub fn new(block_roots: VariableList<Hash256, MaxRequestBlocks>) -> Self {
pub fn new(block_roots: Vec<Hash256>, spec: &ChainSpec) -> Self {
let block_roots =
RuntimeVariableList::from_vec(block_roots, spec.max_request_blocks as usize);
Self::V2(BlocksByRootRequestV2 { block_roots })
}

pub fn new_v1(block_roots: VariableList<Hash256, MaxRequestBlocks>) -> Self {
pub fn new_v1(block_roots: Vec<Hash256>, spec: &ChainSpec) -> Self {
let block_roots =
RuntimeVariableList::from_vec(block_roots, spec.max_request_blocks as usize);
Self::V1(BlocksByRootRequestV1 { block_roots })
}
}
Expand All @@ -368,7 +355,15 @@ impl BlocksByRootRequest {
#[derive(Clone, Debug, PartialEq)]
pub struct BlobsByRootRequest {
/// The list of beacon block roots being requested.
pub blob_ids: VariableList<BlobIdentifier, MaxRequestBlobSidecars>,
pub blob_ids: RuntimeVariableList<BlobIdentifier>,
}

impl BlobsByRootRequest {
pub fn new(blob_ids: Vec<BlobIdentifier>, spec: &ChainSpec) -> Self {
let blob_ids =
RuntimeVariableList::from_vec(blob_ids, spec.max_request_blob_sidecars as usize);
Self { blob_ids }
}
}

/* RPC Handling and Grouping */
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) use protocol::InboundRequest;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
MaxRequestBlocks, RPCResponseErrorCode, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS,
RPCResponseErrorCode, ResponseTermination, StatusMessage,
};
pub(crate) use outbound::OutboundRequest;
pub use protocol::{max_rpc_size, Protocol, RPCError};
Expand Down
45 changes: 10 additions & 35 deletions beacon_node/lighthouse_network/src/rpc/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::methods::*;
use crate::rpc::{
codec::{base::BaseInboundCodec, ssz_snappy::SSZSnappyInboundCodec, InboundCodec},
methods::{MaxErrorLen, ResponseTermination, MAX_ERROR_LEN},
MaxRequestBlocks, MAX_REQUEST_BLOCKS,
};
use futures::future::BoxFuture;
use futures::prelude::{AsyncRead, AsyncWrite};
Expand All @@ -22,7 +21,7 @@ use tokio_util::{
};
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockCapella, BeaconBlockMerge,
BlobSidecar, EmptyBlock, EthSpec, ForkContext, ForkName, Hash256, MainnetEthSpec, Signature,
BlobSidecar, EmptyBlock, EthSpec, ForkContext, ForkName, MainnetEthSpec, Signature,
SignedBeaconBlock,
};

Expand Down Expand Up @@ -89,32 +88,6 @@ lazy_static! {
+ (<types::KzgCommitment as Encode>::ssz_fixed_len() * <MainnetEthSpec>::max_blobs_per_block())
+ ssz::BYTES_PER_LENGTH_OFFSET; // Length offset for the blob commitments field.

pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize =
VariableList::<Hash256, MaxRequestBlocks>::from(Vec::<Hash256>::new())
.as_ssz_bytes()
.len();
pub static ref BLOCKS_BY_ROOT_REQUEST_MAX: usize =
VariableList::<Hash256, MaxRequestBlocks>::from(vec![
Hash256::zero();
MAX_REQUEST_BLOCKS
as usize
])
.as_ssz_bytes()
.len();

pub static ref BLOBS_BY_ROOT_REQUEST_MIN: usize =
VariableList::<Hash256, MaxRequestBlobSidecars>::from(Vec::<Hash256>::new())
.as_ssz_bytes()
.len();
pub static ref BLOBS_BY_ROOT_REQUEST_MAX: usize =
VariableList::<Hash256, MaxRequestBlobSidecars>::from(vec![
Hash256::zero();
MAX_REQUEST_BLOB_SIDECARS
as usize
])
.as_ssz_bytes()
.len();

pub static ref ERROR_TYPE_MIN: usize =
VariableList::<u8, MaxErrorLen>::from(Vec::<u8>::new())
.as_ssz_bytes()
Expand Down Expand Up @@ -375,7 +348,7 @@ impl AsRef<str> for ProtocolId {

impl ProtocolId {
/// Returns min and max size for messages of given protocol id requests.
pub fn rpc_request_limits(&self) -> RpcLimits {
pub fn rpc_request_limits(&self, fork_context: &ForkContext) -> RpcLimits {
match self.versioned_protocol.protocol() {
Protocol::Status => RpcLimits::new(
<StatusMessage as Encode>::ssz_fixed_len(),
Expand All @@ -390,16 +363,18 @@ impl ProtocolId {
<OldBlocksByRangeRequestV2 as Encode>::ssz_fixed_len(),
<OldBlocksByRangeRequestV2 as Encode>::ssz_fixed_len(),
),
Protocol::BlocksByRoot => {
RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX)
}
Protocol::BlocksByRoot => RpcLimits::new(
fork_context.blocks_by_root_request_min,
fork_context.blocks_by_root_request_max,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
),
Protocol::BlobsByRange => RpcLimits::new(
<BlobsByRangeRequest as Encode>::ssz_fixed_len(),
<BlobsByRangeRequest as Encode>::ssz_fixed_len(),
),
Protocol::BlobsByRoot => {
RpcLimits::new(*BLOBS_BY_ROOT_REQUEST_MIN, *BLOBS_BY_ROOT_REQUEST_MAX)
}
Protocol::BlobsByRoot => RpcLimits::new(
fork_context.blobs_by_root_request_min,
fork_context.blobs_by_root_request_max,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
),
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
<Ping as Encode>::ssz_fixed_len(),
Expand Down
Loading