Skip to content

Commit

Permalink
feat: add rate limiter for RPC calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeytimoshin committed Jan 27, 2025
1 parent d532c7e commit b197435
Show file tree
Hide file tree
Showing 19 changed files with 887 additions and 391 deletions.
73 changes: 73 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions forester/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ pub struct StartArgs {
default_value = "28807"
)]
pub address_queue_processing_length: u16,

#[arg(long, env = "FORESTER_ENABLE_RPC_RATE_LIMIT", default_value = "false")]
pub rpc_rate_limit_enabled: bool,

#[arg(long, env = "FORESTER_RPC_RATE_LIMIT", default_value = "100")]
pub rpc_rate_limit: u32,
}

#[derive(Parser, Clone, Debug)]
Expand Down
8 changes: 8 additions & 0 deletions forester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct ExternalServicesConfig {
pub photon_api_key: Option<String>,
pub pushgateway_url: Option<String>,
pub pagerduty_routing_key: Option<String>,
pub rpc_rate_limit: Option<u32>,
}

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -142,6 +143,11 @@ impl ForesterConfig {
.clone()
.ok_or(ConfigError::MissingField { field: "rpc_url" })?;

let mut rpc_rate_limit = None;
if args.rpc_rate_limit_enabled {
rpc_rate_limit = Some(args.rpc_rate_limit);
}

Ok(Self {
external_services: ExternalServicesConfig {
rpc_url,
Expand All @@ -151,6 +157,7 @@ impl ForesterConfig {
photon_api_key: args.photon_api_key.clone(),
pushgateway_url: args.push_gateway_url.clone(),
pagerduty_routing_key: args.pagerduty_routing_key.clone(),
rpc_rate_limit,
},
retry_config: RetryConfig {
max_retries: args.max_retries,
Expand Down Expand Up @@ -203,6 +210,7 @@ impl ForesterConfig {
photon_api_key: None,
pushgateway_url: args.push_gateway_url.clone(),
pagerduty_routing_key: args.pagerduty_routing_key.clone(),
rpc_rate_limit: None,
},
retry_config: RetryConfig::default(),
queue_config: QueueConfig::default(),
Expand Down
3 changes: 3 additions & 0 deletions forester/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use config::{ForesterConfig, ForesterEpochInfo};
use forester_utils::forester_epoch::{TreeAccounts, TreeType};
use light_client::{
indexer::Indexer,
rate_limiter::RateLimiter,
rpc::{RpcConnection, SolanaRpcConnection},
rpc_pool::SolanaRpcPool,
};
Expand Down Expand Up @@ -83,6 +84,7 @@ pub async fn run_queue_info(

pub async fn run_pipeline<R: RpcConnection, I: Indexer<R> + IndexerType<R>>(
config: Arc<ForesterConfig>,
rate_limiter: Option<RateLimiter>,
indexer: Arc<Mutex<I>>,
shutdown: oneshot::Receiver<()>,
work_report_sender: mpsc::Sender<WorkReport>,
Expand All @@ -91,6 +93,7 @@ pub async fn run_pipeline<R: RpcConnection, I: Indexer<R> + IndexerType<R>>(
config.external_services.rpc_url.to_string(),
CommitmentConfig::confirmed(),
config.general_config.rpc_pool_size as u32,
rate_limiter.clone(),
)
.await?;

Expand Down
21 changes: 19 additions & 2 deletions forester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use forester::{
};
use light_client::{
indexer::photon_indexer::PhotonIndexer,
rate_limiter::RateLimiter,
rpc::{RpcConnection, SolanaRpcConnection},
};
use tokio::{
Expand Down Expand Up @@ -50,15 +51,31 @@ async fn main() -> Result<(), ForesterError> {
}
});

let indexer_rpc =
let mut rate_limiter = None;
if let Some(rate_limit) = config.external_services.rpc_rate_limit {
rate_limiter = Some(RateLimiter::new(rate_limit));
}

let mut indexer_rpc =
SolanaRpcConnection::new(config.external_services.rpc_url.clone(), None);
if let Some(limiter) = &rate_limiter {
indexer_rpc.set_rate_limiter(limiter.clone());
}

let indexer = Arc::new(tokio::sync::Mutex::new(PhotonIndexer::new(
config.external_services.indexer_url.clone().unwrap(),
config.external_services.photon_api_key.clone(),
indexer_rpc,
)));

run_pipeline(config, indexer, shutdown_receiver, work_report_sender).await?
run_pipeline(
config,
rate_limiter,
indexer,
shutdown_receiver,
work_report_sender,
)
.await?
}
Commands::Status(args) => {
forester_status::fetch_forester_status(args).await;
Expand Down
2 changes: 2 additions & 0 deletions forester/tests/batched_address_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async fn test_address_batched() {
config.external_services.rpc_url.to_string(),
CommitmentConfig::processed(),
config.general_config.rpc_pool_size as u32,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -217,6 +218,7 @@ async fn test_address_batched() {

let service_handle = tokio::spawn(run_pipeline(
config.clone(),
None,
Arc::new(Mutex::new(env.indexer)),
shutdown_receiver,
work_report_sender,
Expand Down
2 changes: 2 additions & 0 deletions forester/tests/batched_state_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async fn test_state_batched() {
config.external_services.rpc_url.to_string(),
CommitmentConfig::processed(),
config.general_config.rpc_pool_size as u32,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -203,6 +204,7 @@ async fn test_state_batched() {

let service_handle = tokio::spawn(run_pipeline(
Arc::from(config.clone()),
None,
Arc::new(Mutex::new(e2e_env.indexer)),
shutdown_receiver,
work_report_sender,
Expand Down
7 changes: 7 additions & 0 deletions forester/tests/e2e_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async fn test_epoch_monitor_with_test_indexer_and_1_forester() {
config.external_services.rpc_url.to_string(),
CommitmentConfig::confirmed(),
config.general_config.rpc_pool_size as u32,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -174,6 +175,7 @@ async fn test_epoch_monitor_with_test_indexer_and_1_forester() {
// Run the forester as pipeline
let service_handle = tokio::spawn(run_pipeline(
config.clone(),
None,
Arc::new(Mutex::new(env.indexer)),
shutdown_receiver,
work_report_sender,
Expand Down Expand Up @@ -311,6 +313,7 @@ async fn test_epoch_monitor_with_2_foresters() {
config1.external_services.rpc_url.to_string(),
CommitmentConfig::confirmed(),
config1.general_config.rpc_pool_size as u32,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -463,12 +466,14 @@ async fn test_epoch_monitor_with_2_foresters() {

let service_handle1 = tokio::spawn(run_pipeline(
config1.clone(),
None,
indexer.clone(),
shutdown_receiver1,
work_report_sender1,
));
let service_handle2 = tokio::spawn(run_pipeline(
config2.clone(),
None,
indexer,
shutdown_receiver2,
work_report_sender2,
Expand Down Expand Up @@ -656,6 +661,7 @@ async fn test_epoch_double_registration() {
config.external_services.rpc_url.to_string(),
CommitmentConfig::confirmed(),
config.general_config.rpc_pool_size as u32,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -715,6 +721,7 @@ async fn test_epoch_double_registration() {
// Run the forester pipeline
let service_handle = tokio::spawn(run_pipeline(
config.clone(),
None,
indexer.clone(),
shutdown_receiver,
work_report_sender.clone(),
Expand Down
1 change: 1 addition & 0 deletions forester/tests/priority_fee_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::test_utils::init;
mod test_utils;

#[tokio::test]
#[ignore]
async fn test_priority_fee_request() {
dotenvy::dotenv().ok();

Expand Down
5 changes: 4 additions & 1 deletion sdk-libs/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ num-bigint = { workspace = true }
num-traits = { workspace = true }
reqwest = { workspace = true }

governor = "0.8.0"


[dev-dependencies]
light-test-utils = { workspace = true, features=["devenv"]}
light-program-test = { workspace = true }
light-system-program = { workspace = true }
light-compressed-token = { workspace = true }
spl-token = { workspace = true }
rand = { workspace = true }
light-utils = { workspace = true }
light-utils = { workspace = true }
Loading

0 comments on commit b197435

Please sign in to comment.