Skip to content

Commit

Permalink
Runtime rpc request sizes (sigp#4841)
Browse files Browse the repository at this point in the history
* add runtime variable list type

* add configs to ChainSpec

* git rid of max request blocks type

* fix tests and lints

* remove todos

* git rid of old const usage

* fix decode impl

* add new config to `Config` api struct

* add docs fix compilt

* move methods for per-fork-spec to chainspec

* get values off chain spec

* fix compile

* remove min by root size

* add tests for runtime var list

---------

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>
  • Loading branch information
realbigsean and jimmygchen authored Jan 8, 2024
1 parent 5c8c8da commit b47e3f2
Show file tree
Hide file tree
Showing 25 changed files with 507 additions and 179 deletions.
7 changes: 4 additions & 3 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};

mod availability_view;
Expand Down Expand Up @@ -424,7 +423,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(|current_epoch| {
std::cmp::max(
fork_epoch,
current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
current_epoch
.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests),
)
})
})
Expand Down Expand Up @@ -517,7 +517,8 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
let cutoff_epoch = std::cmp::max(
finalized_epoch + 1,
std::cmp::max(
current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
current_epoch
.saturating_sub(chain.spec.min_epochs_for_blob_sidecars_requests),
deneb_fork_epoch,
),
);
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::consts::deneb::MAX_BLOBS_PER_BLOCK;
use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId};
use types::{EthSpec, Slot};
use work_reprocessing_queue::IgnoredRpcBlock;
Expand Down Expand Up @@ -168,8 +167,7 @@ const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `BlobsByRangeRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize =
MAX_BLOCKS_BY_RANGE_QUEUE_LEN * MAX_BLOBS_PER_BLOCK as usize;
const MAX_BLOBS_BY_RANGE_QUEUE_LEN: usize = 1024;

/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/rpc/codec/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ mod tests {
));

// Request limits
let limit = protocol_id.rpc_request_limits();
let limit = protocol_id.rpc_request_limits(&fork_context.spec);
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
Expand Down
60 changes: 38 additions & 22 deletions beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ use std::io::{Read, Write};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::ChainSpec;
use types::{
BlobSidecar, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap, SignedBeaconBlock,
SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella,
SignedBeaconBlockDeneb, SignedBeaconBlockMerge,
BlobSidecar, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap,
RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase,
SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockMerge,
};
use unsigned_varint::codec::Uvi;

Expand Down Expand Up @@ -140,7 +141,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {

// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
// packet size for ssz container corresponding to `self.protocol`.
let ssz_limits = self.protocol.rpc_request_limits();
let ssz_limits = self.protocol.rpc_request_limits(&self.fork_context.spec);
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
return Err(RPCError::InvalidData(format!(
"RPC request length for protocol {:?} is out of bounds, length {}",
Expand All @@ -161,7 +162,11 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
let n = reader.get_ref().get_ref().position();
self.len = None;
let _read_bytes = src.split_to(n as usize);
handle_rpc_request(self.protocol.versioned_protocol, &decoded_buffer)
handle_rpc_request(
self.protocol.versioned_protocol,
&decoded_buffer,
&self.fork_context.spec,
)
}
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
Expand Down Expand Up @@ -451,6 +456,7 @@ fn handle_length(
fn handle_rpc_request<T: EthSpec>(
versioned_protocol: SupportedProtocol,
decoded_buffer: &[u8],
spec: &ChainSpec,
) -> Result<Option<InboundRequest<T>>, RPCError> {
match versioned_protocol {
SupportedProtocol::StatusV1 => Ok(Some(InboundRequest::Status(
Expand All @@ -467,20 +473,29 @@ fn handle_rpc_request<T: EthSpec>(
))),
SupportedProtocol::BlocksByRootV2 => Ok(Some(InboundRequest::BlocksByRoot(
BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
block_roots: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blocks as usize,
)?,
}),
))),
SupportedProtocol::BlocksByRootV1 => Ok(Some(InboundRequest::BlocksByRoot(
BlocksByRootRequest::V1(BlocksByRootRequestV1 {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
block_roots: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blocks as usize,
)?,
}),
))),
SupportedProtocol::BlobsByRangeV1 => Ok(Some(InboundRequest::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
SupportedProtocol::BlobsByRootV1 => {
Ok(Some(InboundRequest::BlobsByRoot(BlobsByRootRequest {
blob_ids: VariableList::from_ssz_bytes(decoded_buffer)?,
blob_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blob_sidecars as usize,
)?,
})))
}
SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping {
Expand Down Expand Up @@ -773,21 +788,22 @@ mod tests {
}
}

fn bbroot_request_v1() -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()].into())
fn bbroot_request_v1(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()], spec)
}

fn bbroot_request_v2() -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()].into())
fn bbroot_request_v2(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()], spec)
}

fn blbroot_request() -> BlobsByRootRequest {
BlobsByRootRequest {
blob_ids: VariableList::from(vec![BlobIdentifier {
fn blbroot_request(spec: &ChainSpec) -> BlobsByRootRequest {
BlobsByRootRequest::new(
vec![BlobIdentifier {
block_root: Hash256::zero(),
index: 0,
}]),
}
}],
spec,
)
}

fn ping_message() -> Ping {
Expand Down Expand Up @@ -1391,22 +1407,22 @@ mod tests {

#[test]
fn test_encode_then_decode_request() {
let chain_spec = Spec::default_spec();

let requests: &[OutboundRequest<Spec>] = &[
OutboundRequest::Ping(ping_message()),
OutboundRequest::Status(status_message()),
OutboundRequest::Goodbye(GoodbyeReason::Fault),
OutboundRequest::BlocksByRange(bbrange_request_v1()),
OutboundRequest::BlocksByRange(bbrange_request_v2()),
OutboundRequest::BlocksByRoot(bbroot_request_v1()),
OutboundRequest::BlocksByRoot(bbroot_request_v2()),
OutboundRequest::BlocksByRoot(bbroot_request_v1(&chain_spec)),
OutboundRequest::BlocksByRoot(bbroot_request_v2(&chain_spec)),
OutboundRequest::MetaData(MetadataRequest::new_v1()),
OutboundRequest::BlobsByRange(blbrange_request()),
OutboundRequest::BlobsByRoot(blbroot_request()),
OutboundRequest::BlobsByRoot(blbroot_request(&chain_spec)),
OutboundRequest::MetaData(MetadataRequest::new_v2()),
];

let chain_spec = Spec::default_spec();

for req in requests.iter() {
for fork_name in ForkName::list_all() {
encode_then_decode_request(req.clone(), fork_name, &chain_spec);
Expand Down
8 changes: 3 additions & 5 deletions beacon_node/lighthouse_network/src/rpc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
time::Duration,
};

use super::{methods, rate_limiter::Quota, Protocol};
use super::{rate_limiter::Quota, Protocol};

use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -99,11 +99,9 @@ impl RateLimiterConfig {
pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5);
pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15);
pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10);
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota =
Quota::n_every(methods::MAX_REQUEST_BLOCKS, 10);
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = Quota::n_every(1024, 10);
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota =
Quota::n_every(methods::MAX_REQUEST_BLOB_SIDECARS, 10);
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10);
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
}
Expand Down
45 changes: 20 additions & 25 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,22 @@ use regex::bytes::Regex;
use serde::Serialize;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::{
typenum::{U1024, U128, U256, U768},
VariableList,
};
use ssz_types::{typenum::U256, VariableList};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::consts::deneb::MAX_BLOBS_PER_BLOCK;
use types::{
blob_sidecar::BlobSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, SignedBeaconBlock,
Slot,
blob_sidecar::BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, LightClientBootstrap,
RuntimeVariableList, SignedBeaconBlock, Slot,
};

/// Maximum number of blocks in a single request.
pub type MaxRequestBlocks = U1024;
pub const MAX_REQUEST_BLOCKS: u64 = 1024;

/// Maximum length of error message.
pub type MaxErrorLen = U256;
pub const MAX_ERROR_LEN: u64 = 256;

pub type MaxRequestBlocksDeneb = U128;
pub const MAX_REQUEST_BLOCKS_DENEB: u64 = 128;

pub type MaxRequestBlobSidecars = U768;
pub const MAX_REQUEST_BLOB_SIDECARS: u64 = MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK;

/// Wrapper over SSZ List to represent error message in rpc responses.
#[derive(Debug, Clone)]
pub struct ErrorType(pub VariableList<u8, MaxErrorLen>);
Expand Down Expand Up @@ -344,22 +330,23 @@ impl OldBlocksByRangeRequest {
}

/// Request a number of beacon block bodies from a peer.
#[superstruct(
variants(V1, V2),
variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq))
)]
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]
pub struct BlocksByRootRequest {
/// The list of beacon block bodies being requested.
pub block_roots: VariableList<Hash256, MaxRequestBlocks>,
pub block_roots: RuntimeVariableList<Hash256>,
}

impl BlocksByRootRequest {
pub fn new(block_roots: VariableList<Hash256, MaxRequestBlocks>) -> Self {
pub fn new(block_roots: Vec<Hash256>, spec: &ChainSpec) -> Self {
let block_roots =
RuntimeVariableList::from_vec(block_roots, spec.max_request_blocks as usize);
Self::V2(BlocksByRootRequestV2 { block_roots })
}

pub fn new_v1(block_roots: VariableList<Hash256, MaxRequestBlocks>) -> Self {
pub fn new_v1(block_roots: Vec<Hash256>, spec: &ChainSpec) -> Self {
let block_roots =
RuntimeVariableList::from_vec(block_roots, spec.max_request_blocks as usize);
Self::V1(BlocksByRootRequestV1 { block_roots })
}
}
Expand All @@ -368,7 +355,15 @@ impl BlocksByRootRequest {
#[derive(Clone, Debug, PartialEq)]
pub struct BlobsByRootRequest {
/// The list of beacon block roots being requested.
pub blob_ids: VariableList<BlobIdentifier, MaxRequestBlobSidecars>,
pub blob_ids: RuntimeVariableList<BlobIdentifier>,
}

impl BlobsByRootRequest {
pub fn new(blob_ids: Vec<BlobIdentifier>, spec: &ChainSpec) -> Self {
let blob_ids =
RuntimeVariableList::from_vec(blob_ids, spec.max_request_blob_sidecars as usize);
Self { blob_ids }
}
}

/* RPC Handling and Grouping */
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) use protocol::InboundRequest;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
MaxRequestBlocks, RPCResponseErrorCode, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS,
RPCResponseErrorCode, ResponseTermination, StatusMessage,
};
pub(crate) use outbound::OutboundRequest;
pub use protocol::{max_rpc_size, Protocol, RPCError};
Expand Down
39 changes: 4 additions & 35 deletions beacon_node/lighthouse_network/src/rpc/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::methods::*;
use crate::rpc::{
codec::{base::BaseInboundCodec, ssz_snappy::SSZSnappyInboundCodec, InboundCodec},
methods::{MaxErrorLen, ResponseTermination, MAX_ERROR_LEN},
MaxRequestBlocks, MAX_REQUEST_BLOCKS,
};
use futures::future::BoxFuture;
use futures::prelude::{AsyncRead, AsyncWrite};
Expand All @@ -22,7 +21,7 @@ use tokio_util::{
};
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockCapella, BeaconBlockMerge,
BlobSidecar, EmptyBlock, EthSpec, ForkContext, ForkName, Hash256, MainnetEthSpec, Signature,
BlobSidecar, ChainSpec, EmptyBlock, EthSpec, ForkContext, ForkName, MainnetEthSpec, Signature,
SignedBeaconBlock,
};

Expand Down Expand Up @@ -89,32 +88,6 @@ lazy_static! {
+ (<types::KzgCommitment as Encode>::ssz_fixed_len() * <MainnetEthSpec>::max_blobs_per_block())
+ ssz::BYTES_PER_LENGTH_OFFSET; // Length offset for the blob commitments field.

pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize =
VariableList::<Hash256, MaxRequestBlocks>::from(Vec::<Hash256>::new())
.as_ssz_bytes()
.len();
pub static ref BLOCKS_BY_ROOT_REQUEST_MAX: usize =
VariableList::<Hash256, MaxRequestBlocks>::from(vec![
Hash256::zero();
MAX_REQUEST_BLOCKS
as usize
])
.as_ssz_bytes()
.len();

pub static ref BLOBS_BY_ROOT_REQUEST_MIN: usize =
VariableList::<Hash256, MaxRequestBlobSidecars>::from(Vec::<Hash256>::new())
.as_ssz_bytes()
.len();
pub static ref BLOBS_BY_ROOT_REQUEST_MAX: usize =
VariableList::<Hash256, MaxRequestBlobSidecars>::from(vec![
Hash256::zero();
MAX_REQUEST_BLOB_SIDECARS
as usize
])
.as_ssz_bytes()
.len();

pub static ref ERROR_TYPE_MIN: usize =
VariableList::<u8, MaxErrorLen>::from(Vec::<u8>::new())
.as_ssz_bytes()
Expand Down Expand Up @@ -375,7 +348,7 @@ impl AsRef<str> for ProtocolId {

impl ProtocolId {
/// Returns min and max size for messages of given protocol id requests.
pub fn rpc_request_limits(&self) -> RpcLimits {
pub fn rpc_request_limits(&self, spec: &ChainSpec) -> RpcLimits {
match self.versioned_protocol.protocol() {
Protocol::Status => RpcLimits::new(
<StatusMessage as Encode>::ssz_fixed_len(),
Expand All @@ -390,16 +363,12 @@ impl ProtocolId {
<OldBlocksByRangeRequestV2 as Encode>::ssz_fixed_len(),
<OldBlocksByRangeRequestV2 as Encode>::ssz_fixed_len(),
),
Protocol::BlocksByRoot => {
RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX)
}
Protocol::BlocksByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request),
Protocol::BlobsByRange => RpcLimits::new(
<BlobsByRangeRequest as Encode>::ssz_fixed_len(),
<BlobsByRangeRequest as Encode>::ssz_fixed_len(),
),
Protocol::BlobsByRoot => {
RpcLimits::new(*BLOBS_BY_ROOT_REQUEST_MIN, *BLOBS_BY_ROOT_REQUEST_MAX)
}
Protocol::BlobsByRoot => RpcLimits::new(0, spec.max_blobs_by_root_request),
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
<Ping as Encode>::ssz_fixed_len(),
Expand Down
Loading

0 comments on commit b47e3f2

Please sign in to comment.