Skip to content

Commit

Permalink
catch-up size as associated constant
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Dec 16, 2024
1 parent e163ef5 commit 15bdee2
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 27 deletions.
4 changes: 2 additions & 2 deletions crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ This should only be enabled for debugging purposes as it adds substantial proces
"The number of aggregate event bloom filters to cache in memory. Each filter covers a {} block range.
This cache speeds up event related RPC queries at the cost of using extra memory.
Each cached filter takes 16 MiB of memory.",
pathfinder_storage::BLOCK_RANGE_LEN
pathfinder_storage::AGGREGATE_BLOOM_BLOCK_RANGE_LEN
),
env = "PATHFINDER_STORAGE_EVENT_FILTER_CACHE_SIZE",
default_value = "64"
Expand All @@ -283,7 +283,7 @@ This should only be enabled for debugging purposes as it adds substantial proces
"The number of uncached aggregate Bloom filters to load when querying for events.
Each filter covers a {} block range.
This limit is used to prevent queries from taking too long.",
pathfinder_storage::BLOCK_RANGE_LEN
pathfinder_storage::AGGREGATE_BLOOM_BLOCK_RANGE_LEN
),
env = "PATHFINDER_RPC_GET_EVENTS_MAX_UNCACHED_EVENT_FILTERS_TO_LOAD",
default_value = "12"
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use pathfinder_common::{BlockHash, BlockNumber};
pub use request::RpcRequest;
pub use response::RpcResponse;
#[cfg(test)]
pub use router::{handle_json_rpc_socket, CATCH_UP_BATCH_SIZE};
pub use router::handle_json_rpc_socket;
pub use router::{
rpc_handler,
CatchUp,
Expand Down
2 changes: 0 additions & 2 deletions crates/rpc/src/jsonrpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use axum::response::IntoResponse;
use futures::{Future, FutureExt, StreamExt};
use http::HeaderValue;
use method::RpcMethodEndpoint;
#[cfg(test)]
pub use subscription::CATCH_UP_BATCH_SIZE;
pub use subscription::{handle_json_rpc_socket, CatchUp, RpcSubscriptionFlow, SubscriptionMessage};
use subscription::{split_ws, RpcSubscriptionEndpoint};

Expand Down
6 changes: 3 additions & 3 deletions crates/rpc/src/jsonrpc/router/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use crate::error::ApplicationError;
use crate::jsonrpc::{RpcError, RpcRequest, RpcResponse};
use crate::{RpcVersion, SubscriptionId};

pub const CATCH_UP_BATCH_SIZE: u64 = 64;

/// See [`RpcSubscriptionFlow`].
#[axum::async_trait]
pub(super) trait RpcSubscriptionEndpoint: Send + Sync {
Expand Down Expand Up @@ -66,6 +64,8 @@ pub trait RpcSubscriptionFlow: Send + Sync {
type Params: crate::dto::DeserializeForVersion + Clone + Send + Sync + 'static;
/// The notification type to be sent to the client.
type Notification: crate::dto::serialize::SerializeForVersion + Send + Sync + 'static;
/// The maximum number of blocks to catch up to in a single batch.
const CATCH_UP_BATCH_SIZE: u64 = 64;

/// Validate the subscription parameters. If the parameters are invalid,
/// return an error.
Expand Down Expand Up @@ -204,7 +204,7 @@ where
// -1 because the end is inclusive, otherwise we get batches of
// `CATCH_UP_BATCH_SIZE + 1` which probably doesn't really
// matter, but it's misleading.
let end = *current_block + CATCH_UP_BATCH_SIZE - 1;
let end = *current_block + Self::CATCH_UP_BATCH_SIZE - 1;
let catch_up =
match T::catch_up(&router.context, &params, *current_block, end).await {
Ok(messages) => messages,
Expand Down
18 changes: 10 additions & 8 deletions crates/rpc/src/method/subscribe_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use axum::async_trait;
use pathfinder_common::{BlockId, BlockNumber, ContractAddress, EventKey};
use pathfinder_storage::EVENT_KEY_FILTER_LIMIT;
use pathfinder_storage::{AGGREGATE_BLOOM_BLOCK_RANGE_LEN, EVENT_KEY_FILTER_LIMIT};
use tokio::sync::mpsc;

use super::REORG_SUBSCRIPTION_NAME;
Expand Down Expand Up @@ -65,6 +65,7 @@ const SUBSCRIPTION_NAME: &str = "starknet_subscriptionEvents";
impl RpcSubscriptionFlow for SubscribeEvents {
type Params = Option<Params>;
type Notification = Notification;
const CATCH_UP_BATCH_SIZE: u64 = AGGREGATE_BLOOM_BLOCK_RANGE_LEN;

fn validate_params(params: &Self::Params) -> Result<(), RpcError> {
if let Some(params) = params {
Expand Down Expand Up @@ -256,14 +257,15 @@ mod tests {
use tokio::sync::mpsc;

use crate::context::{RpcConfig, RpcContext};
use crate::jsonrpc::{handle_json_rpc_socket, RpcRouter, CATCH_UP_BATCH_SIZE};
use crate::jsonrpc::{handle_json_rpc_socket, RpcRouter, RpcSubscriptionFlow};
use crate::method::subscribe_events::SubscribeEvents;
use crate::pending::PendingWatcher;
use crate::types::syncing::Syncing;
use crate::{v08, Notifications, Reorg, SyncState};

#[tokio::test]
async fn no_filtering() {
let num_blocks = 2000;
let num_blocks = SubscribeEvents::CATCH_UP_BATCH_SIZE + 10;
let router = setup(num_blocks).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
Expand Down Expand Up @@ -325,7 +327,7 @@ mod tests {

#[tokio::test]
async fn filter_from_address() {
let router = setup(2000).await;
let router = setup(SubscribeEvents::CATCH_UP_BATCH_SIZE + 10).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
Expand Down Expand Up @@ -391,7 +393,7 @@ mod tests {

#[tokio::test]
async fn filter_keys() {
let router = setup(2000).await;
let router = setup(SubscribeEvents::CATCH_UP_BATCH_SIZE + 10).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
Expand Down Expand Up @@ -457,7 +459,7 @@ mod tests {

#[tokio::test]
async fn filter_from_address_and_keys() {
let router = setup(2000).await;
let router = setup(SubscribeEvents::CATCH_UP_BATCH_SIZE + 10).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
Expand Down Expand Up @@ -524,7 +526,7 @@ mod tests {

#[tokio::test]
async fn too_many_keys_filter() {
let router = setup(2000).await;
let router = setup(SubscribeEvents::CATCH_UP_BATCH_SIZE + 10).await;
let (sender_tx, mut sender_rx) = mpsc::channel(1024);
let (receiver_tx, receiver_rx) = mpsc::channel(1024);
handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx);
Expand Down Expand Up @@ -691,7 +693,7 @@ mod tests {
}

async fn setup(num_blocks: u64) -> RpcRouter {
assert!(num_blocks == 0 || num_blocks > CATCH_UP_BATCH_SIZE);
assert!(num_blocks == 0 || num_blocks > SubscribeEvents::CATCH_UP_BATCH_SIZE);

let storage = StorageBuilder::in_memory().unwrap();
tokio::task::spawn_blocking({
Expand Down
13 changes: 7 additions & 6 deletions crates/rpc/src/method/subscribe_new_heads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,27 +176,28 @@ mod tests {
use starknet_gateway_client::Client;
use tokio::sync::mpsc;

use super::*;
use crate::context::{RpcConfig, RpcContext};
use crate::jsonrpc::{handle_json_rpc_socket, RpcResponse, RpcRouter, CATCH_UP_BATCH_SIZE};
use crate::jsonrpc::{handle_json_rpc_socket, RpcResponse, RpcRouter};
use crate::pending::PendingWatcher;
use crate::types::syncing::Syncing;
use crate::{v08, Notifications, Reorg, SubscriptionId, SyncState};

#[tokio::test]
async fn happy_path_with_historic_blocks() {
happy_path_test(2000).await;
happy_path_test(SubscribeNewHeads::CATCH_UP_BATCH_SIZE + 10).await;
}

#[tokio::test]
async fn happy_path_with_historic_blocks_no_batching() {
happy_path_test(CATCH_UP_BATCH_SIZE - 5).await;
happy_path_test(SubscribeNewHeads::CATCH_UP_BATCH_SIZE - 5).await;
}

#[tokio::test]
async fn happy_path_with_historic_blocks_batching_edge_cases() {
happy_path_test(2 * CATCH_UP_BATCH_SIZE).await;
happy_path_test(2 * (CATCH_UP_BATCH_SIZE - 1)).await;
happy_path_test(2 * (CATCH_UP_BATCH_SIZE + 1)).await;
happy_path_test(2 * SubscribeNewHeads::CATCH_UP_BATCH_SIZE).await;
happy_path_test(2 * (SubscribeNewHeads::CATCH_UP_BATCH_SIZE - 1)).await;
happy_path_test(2 * (SubscribeNewHeads::CATCH_UP_BATCH_SIZE + 1)).await;
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/method/trace_block_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ pub(crate) mod tests {
// Need to avoid skipping blocks for `insert_transaction_data`
// so that there is no gap in event filters.
(0..619596)
.step_by(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.step_by(pathfinder_storage::AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize)
.for_each(|block: u64| {
let block = BlockNumber::new_or_panic(block.saturating_sub(1));
transaction
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/v06/method/trace_block_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ pub(crate) mod tests {
// Need to avoid skipping blocks for `insert_transaction_data`
// so that there is no gap in event filters.
(0..619596)
.step_by(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.step_by(pathfinder_storage::AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize)
.for_each(|block: u64| {
let block = BlockNumber::new_or_panic(block.saturating_sub(1));
transaction
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/v06/method/trace_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ pub mod tests {

// Need to avoid skipping blocks for `insert_transaction_data`.
(0..619596)
.step_by(pathfinder_storage::BLOCK_RANGE_LEN as usize)
.step_by(pathfinder_storage::AGGREGATE_BLOOM_BLOCK_RANGE_LEN as usize)
.for_each(|block: u64| {
let block = BlockNumber::new_or_panic(block.saturating_sub(1));
transaction
Expand Down
9 changes: 8 additions & 1 deletion crates/storage/src/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use cached::{Cached, SizedCache};
use pathfinder_common::BlockNumber;
use pathfinder_crypto::Felt;

pub const BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN;
pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 = AggregateBloom::BLOCK_RANGE_LEN;

/// An aggregate of all Bloom filters for a given range of blocks.
/// Before being added to `AggregateBloom`, each [`BloomFilter`] is
Expand Down Expand Up @@ -99,6 +99,7 @@ impl AggregateBloom {
Self::from_parts(from_block, to_block, bitmap)
}

/// Create an `AggregateBloom` from a compressed bitmap.
pub fn from_existing_compressed(
from_block: BlockNumber,
to_block: BlockNumber,
Expand Down Expand Up @@ -127,13 +128,19 @@ impl AggregateBloom {
}
}

/// Compress the bitmap of the aggregate Bloom filter.
pub fn compress_bitmap(&self) -> Vec<u8> {
zstd::bulk::compress(&self.bitmap, 10).expect("Compressing aggregate Bloom filter")
}

/// Rotate the [`BloomFilter`] by 90 degrees (transpose) and add it to the
/// aggregate. It is up to the user to keep track of when the aggregate
/// filter's block range has been exhausted and respond accordingly.
///
/// # Panics
///
/// Panics if the block number is not in the range of blocks that this
/// aggregate covers.
pub fn add_bloom(&mut self, bloom: &BloomFilter, block_number: BlockNumber) {
assert!(
(self.from_block..=self.to_block).contains(&block_number),
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod prelude;

mod bloom;
use bloom::AggregateBloomCache;
pub use bloom::BLOCK_RANGE_LEN;
pub use bloom::AGGREGATE_BLOOM_BLOCK_RANGE_LEN;
mod connection;
pub mod fake;
mod params;
Expand Down

0 comments on commit 15bdee2

Please sign in to comment.